In [1]:
# Setting up the spark session

from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
appName("Agam_aggregations"). \
enableHiveSupport(). \
master("yarn"). \
getOrCreate()

In [2]:
spark

<div class="alert alert-block alert-success">
<b>Let's define the schema so we can enforce it later while reading the file rather than infer.
    I am using the hotel data for carrying out the analysis in this notebook.
</div>

In [3]:
hotel_schema = "customer_id long, customer_name string, check_in_date date, check_out_date date, room_type string, price float"

# Read the data from HDFS location
hotel_data = spark.read.format("csv").option("header", "true").schema(hotel_schema).load("/public/trendytech/datasets/hotel_data.csv")

# Print the first 20 rows to see the data format
hotel_data.show(truncate=False)

+-----------+-----------------+-------------+--------------+---------+-----+
|customer_id|customer_name    |check_in_date|check_out_date|room_type|price|
+-----------+-----------------+-------------+--------------+---------+-----+
|2          |Jane Smith       |2023-05-02   |2023-05-06    |Deluxe   |600.0|
|3          |Mark Johnson     |2023-05-03   |2023-05-08    |Standard |450.0|
|4          |Sarah Wilson     |2023-05-04   |2023-05-07    |Executive|750.0|
|5          |Emily Brown      |2023-05-06   |2023-05-09    |Deluxe   |550.0|
|6          |Michael Davis    |2023-05-07   |2023-05-10    |Standard |400.0|
|7          |Samantha Thompson|2023-05-08   |2023-05-12    |Deluxe   |600.0|
|8          |William Lee      |2023-05-10   |2023-05-13    |Standard |450.0|
|9          |Amanda Harris    |2023-05-11   |2023-05-16    |Executive|750.0|
|10         |David Rodriguez  |2023-05-12   |2023-05-15    |Deluxe   |550.0|
|11         |Linda Wilson     |2023-05-14   |2023-05-18    |Standard |400.0|

In [4]:
# Printing the dataframe schema to ensure all columns are of correct data types as intended

hotel_data.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- check_in_date: date (nullable = true)
 |-- check_out_date: date (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: float (nullable = true)



**Now let's deep dive to perform some simple and Grouping aggregations first:**

`Note: I will follow the Spark SQL style in this notebook to perform different aggregations. We will cover the below scenarios as part of simple aggregations:`

* Get total records from the dataframe
* Get all distinct room types.
* Calculate total price for each room type
* Calculate avg. price for each room type
* Find out customers who have stayed more than or equal to 10 times in different types of rooms

In [5]:
# Importing all the sql funcations from Pyspark3 to run the code

from pyspark.sql import *
from pyspark.sql.functions import *

##### `Creating a spark table from the dataframe which can be then used to perform different actions.`

In [6]:
hotel_data.createOrReplaceTempView("hoteldata")

_`Let's use the basic count() aggregate first to see how many records we have in our dataframe:`_

In [7]:
spark.sql("select count(*) as total_rows from hoteldata")

total_rows
106


_`Let's now see how many number of customers are there for distinct room types in this dataset:`_

In [8]:
spark.sql("select room_type, count(customer_id) as number_of_customers from hoteldata group by room_type")

room_type,number_of_customers
Executive,20
Deluxe,43
Standard,43


_`Let's find out the total price for different categories of rooms listed:`_

In [9]:
spark.sql("select sum(price) as total_price, room_type from hoteldata group by room_type order by total_price DESC")

total_price,room_type
24750.0,Deluxe
18300.0,Standard
15000.0,Executive


<div class="alert alert-block alert-success">
<b>We get to know that most people prefer Deluxe rooms so the total price is highest for these rooms over all the entries
</div>

_`Let's find out the average price for different categories of rooms listed:`_

In [10]:
spark.sql("select round(avg(price),2) as average_price, room_type from hoteldata group by room_type")

average_price,room_type
750.0,Executive
575.58,Deluxe
425.58,Standard


<div class="alert alert-block alert-success">
<b>Even when most people prefer Deluxe rooms for their stay, we see Average price for Executive rooms to be highest and that is because all the price entries for Executive rooms have 750 conistently, however Deluxe rooms are offered on different rates across different dates. 
</div>

_`Let's find out top customers who have stayed more than or equal to 10 times in different types of rooms listed:`_

In [11]:
spark.sql("""select customer_name, room_type, 
count(*) as total_entries from hoteldata 
group by customer_name, room_type
having count(customer_name) >= 10""")

customer_name,room_type,total_entries
James Smith,Executive,10
Robert Johnson,Deluxe,10
William Lee,Standard,11
Michael Davis,Standard,11


<div class="alert alert-block alert-success">
<b>We can conclude that there are only 4 customers out of 22 distinct who have stayed 10 or more times in different rooms making them frequent customers.
</div>

### The above data only had 106 records in total so I will be using a different dataset (with more records) to perform Advanced Grouping and Windowing aggregations.

In [12]:
# Setting up a new spark session

from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master("yarn"). \
getOrCreate()

In [13]:
# Using the Window data modified for this analysis.
windows_schema = "country string, weeknum long, numinvoices long, totalquantity long, invoicevalue float"

window_data = spark.read.format("csv").option("header", "true").schema(windows_schema).load("/public/trendytech/datasets/windowdatamodified.csv")

# Print the first 20 rows to see the data format
window_data.show(truncate=False)

+--------------+-------+-----------+-------------+------------+
|country       |weeknum|numinvoices|totalquantity|invoicevalue|
+--------------+-------+-----------+-------------+------------+
|Spain         |49     |1          |67           |174.72      |
|Germany       |48     |11         |1795         |1600.0      |
|Lithuania     |48     |3          |622          |1598.06     |
|Germany       |49     |12         |1852         |1800.0      |
|Bahrain       |51     |1          |54           |205.74      |
|Iceland       |49     |1          |319          |711.79      |
|India         |51     |5          |95           |300.0       |
|Australia     |50     |2          |133          |387.95      |
|Italy         |49     |1          |-2           |-17.0       |
|India         |49     |5          |1280         |3284.1      |
|Spain         |50     |2          |400          |1049.01     |
|United Kingdom|51     |200        |28782        |75103.46    |
|Norway        |49     |1          |1730

In [14]:
# Printing the dataframe schema to ensure all columns are of correct data types as intended

window_data.printSchema()

root
 |-- country: string (nullable = true)
 |-- weeknum: long (nullable = true)
 |-- numinvoices: long (nullable = true)
 |-- totalquantity: long (nullable = true)
 |-- invoicevalue: float (nullable = true)



**Now let's deep dive to perform some windowing aggregations:**

`Note: I will follow the Spark SQL style in this notebook to perform different aggregations. We will cover the below scenarios as part of windowing aggregations:`

* Use Rank() function
* Use denserank() function
* Use row_number() function
* Use lag and lead functions.
* Run a pivot table.

`As we know that, in order to perform all above windowing functions, we need to define a window which should contain a:`

* Partitioning Column
* Sorting Column
* Size of Window

In [15]:
# Let's define the window based on the data we have read

mywindow_1 = Window.partitionBy("country").orderBy("weeknum").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Calculating the running cost after partitioning by Country and sorting by Week number
windows_df_1 = window_data.withColumn("running_total", sum("invoicevalue").over(mywindow_1))
windows_df_1.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|
|Germany|     48|         11|         1795|      1600.0|            1600.0|
|Germany|     49|         12|         1852|      1800.0|            3400.0|
|Germany|     50|         15|         1973|      1800.0|            5200.0|
|Germany|     51|          5|         1103|      1600.0|            6800.0|
| France|     48|          4|         1299|       500.0|             500.0|
| France|     49|          9|         2303|       500.0|            1000.0|
| France|     50|          6|          529|      537.32|1537.3200073242188|
| France|     51|          5|          847|       500.0|2037.3200073242188|
|Belgium|     48|          1|          528|       800.0|             800.0|
|Belgium|   

In [16]:
windows_df_rank_1 = windows_df_1.withColumn("rank", rank().over(mywindow_1))
windows_df_rank_1.show()

+-------+-------+-----------+-------------+------------+------------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|rank|
+-------+-------+-----------+-------------+------------+------------------+----+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|   1|
|Germany|     48|         11|         1795|      1600.0|            1600.0|   1|
|Germany|     49|         12|         1852|      1800.0|            3400.0|   2|
|Germany|     50|         15|         1973|      1800.0|            5200.0|   3|
|Germany|     51|          5|         1103|      1600.0|            6800.0|   4|
| France|     48|          4|         1299|       500.0|             500.0|   1|
| France|     49|          9|         2303|       500.0|            1000.0|   2|
| France|     50|          6|          529|      537.32|1537.3200073242188|   3|
| France|     51|          5|          847|       500.0|2037.3200073242188|   4|
|Belgium|     48|          1

In [17]:
windows_df_denserank_1 = windows_df_1.withColumn("rank", dense_rank().over(mywindow_1))
windows_df_denserank_1.show()

+-------+-------+-----------+-------------+------------+------------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|rank|
+-------+-------+-----------+-------------+------------+------------------+----+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|   1|
|Germany|     48|         11|         1795|      1600.0|            1600.0|   1|
|Germany|     49|         12|         1852|      1800.0|            3400.0|   2|
|Germany|     50|         15|         1973|      1800.0|            5200.0|   3|
|Germany|     51|          5|         1103|      1600.0|            6800.0|   4|
| France|     48|          4|         1299|       500.0|             500.0|   1|
| France|     49|          9|         2303|       500.0|            1000.0|   2|
| France|     50|          6|          529|      537.32|1537.3200073242188|   3|
| France|     51|          5|          847|       500.0|2037.3200073242188|   4|
|Belgium|     48|          1

In [18]:
windows_df_row_num_1 = windows_df_1.withColumn("rank", row_number().over(mywindow_1))
windows_df_row_num_1.show()

+-------+-------+-----------+-------------+------------+------------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|rank|
+-------+-------+-----------+-------------+------------+------------------+----+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|   1|
|Germany|     48|         11|         1795|      1600.0|            1600.0|   1|
|Germany|     49|         12|         1852|      1800.0|            3400.0|   2|
|Germany|     50|         15|         1973|      1800.0|            5200.0|   3|
|Germany|     51|          5|         1103|      1600.0|            6800.0|   4|
| France|     48|          4|         1299|       500.0|             500.0|   1|
| France|     49|          9|         2303|       500.0|            1000.0|   2|
| France|     50|          6|          529|      537.32|1537.3200073242188|   3|
| France|     51|          5|          847|       500.0|2037.3200073242188|   4|
|Belgium|     48|          1

<div class="alert alert-block alert-success">
<b>When we used partitionBy column as 'country' and orderBy column as 'weeknum', we can conclude see that there is no difference at all in the outputs of rank, denserank and row_num functions because there are no repetiting weeks for any country.
</div>

`Now, let's try to use the orderBy column as 'invoicevalue and see what difference does it make in the output of all the 3 functions'`

In [19]:
# Let's define the window based on the data we have read

mywindow_2 = Window.partitionBy("country").orderBy("invoicevalue").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Calculating the running cost after partitioning by Country and sorting by Week number
windows_df_2 = window_data.withColumn("running_total", sum("invoicevalue").over(mywindow_2))
windows_df_2.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|
|Germany|     48|         11|         1795|      1600.0|            1600.0|
|Germany|     51|          5|         1103|      1600.0|            3200.0|
|Germany|     49|         12|         1852|      1800.0|            5000.0|
|Germany|     50|         15|         1973|      1800.0|            6800.0|
| France|     51|          5|          847|       500.0|             500.0|
| France|     49|          9|         2303|       500.0|            1000.0|
| France|     48|          4|         1299|       500.0|            1500.0|
| France|     50|          6|          529|      537.32|2037.3200073242188|
|Belgium|     50|          2|          285|      625.16| 625.1599731445312|
|Belgium|   

In [20]:
windows_df_rank_2 = windows_df_2.withColumn("rank", rank().over(mywindow_2))
windows_df_rank_2.show()

+-------+-------+-----------+-------------+------------+------------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|rank|
+-------+-------+-----------+-------------+------------+------------------+----+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|   1|
|Germany|     48|         11|         1795|      1600.0|            1600.0|   1|
|Germany|     51|          5|         1103|      1600.0|            3200.0|   1|
|Germany|     49|         12|         1852|      1800.0|            5000.0|   3|
|Germany|     50|         15|         1973|      1800.0|            6800.0|   3|
| France|     51|          5|          847|       500.0|             500.0|   1|
| France|     49|          9|         2303|       500.0|            1000.0|   1|
| France|     48|          4|         1299|       500.0|            1500.0|   1|
| France|     50|          6|          529|      537.32|2037.3200073242188|   4|
|Belgium|     50|          2

In [21]:
windows_df_denserank_2 = windows_df_2.withColumn("rank", dense_rank().over(mywindow_2))
windows_df_denserank_2.show()

+-------+-------+-----------+-------------+------------+------------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|rank|
+-------+-------+-----------+-------------+------------+------------------+----+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|   1|
|Germany|     48|         11|         1795|      1600.0|            1600.0|   1|
|Germany|     51|          5|         1103|      1600.0|            3200.0|   1|
|Germany|     49|         12|         1852|      1800.0|            5000.0|   2|
|Germany|     50|         15|         1973|      1800.0|            6800.0|   2|
| France|     51|          5|          847|       500.0|             500.0|   1|
| France|     49|          9|         2303|       500.0|            1000.0|   1|
| France|     48|          4|         1299|       500.0|            1500.0|   1|
| France|     50|          6|          529|      537.32|2037.3200073242188|   2|
|Belgium|     50|          2

In [22]:
windows_df_row_num_3 = windows_df_2.withColumn("rank", row_number().over(mywindow_2))
windows_df_row_num_3.show()

+-------+-------+-----------+-------------+------------+------------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|rank|
+-------+-------+-----------+-------------+------------+------------------+----+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|   1|
|Germany|     48|         11|         1795|      1600.0|            1600.0|   1|
|Germany|     51|          5|         1103|      1600.0|            3200.0|   2|
|Germany|     49|         12|         1852|      1800.0|            5000.0|   3|
|Germany|     50|         15|         1973|      1800.0|            6800.0|   4|
| France|     51|          5|          847|       500.0|             500.0|   1|
| France|     49|          9|         2303|       500.0|            1000.0|   2|
| France|     48|          4|         1299|       500.0|            1500.0|   3|
| France|     50|          6|          529|      537.32|2037.3200073242188|   4|
|Belgium|     50|          2

<div class="alert alert-block alert-success">
<b>As we can see that when we used partitionBy column as 'country' and orderBy column as 'invoicevalue', we can notice a clear difference in the outputs of rank() and denserank() functions becaause we have duplicate values in invoice value column for differently grouped countries.
</div>

`Now, let's try to use lag and lead functions'`

In [23]:
mywindow_lag = Window.partitionBy("country").orderBy("weeknum")

In [24]:
windows_df_lag_1 = windows_df_1.withColumn("previous_week", lag("invoicevalue").over(mywindow_lag))

windows_df_lag_1.show()

+-------+-------+-----------+-------------+------------+------------------+-------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|previous_week|
+-------+-------+-----------+-------------+------------+------------------+-------------+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|         null|
|Germany|     48|         11|         1795|      1600.0|            1600.0|         null|
|Germany|     49|         12|         1852|      1800.0|            3400.0|       1600.0|
|Germany|     50|         15|         1973|      1800.0|            5200.0|       1800.0|
|Germany|     51|          5|         1103|      1600.0|            6800.0|       1800.0|
| France|     48|          4|         1299|       500.0|             500.0|         null|
| France|     49|          9|         2303|       500.0|            1000.0|        500.0|
| France|     50|          6|          529|      537.32|1537.3200073242188|        500.0|
| France| 

In [25]:
mywindow_lead = Window.partitionBy("country").orderBy("weeknum")

In [26]:
windows_df_lead_1 = windows_df_1.withColumn("next_week", lead("invoicevalue").over(mywindow_lead))

windows_df_lead_1.show()

+-------+-------+-----------+-------------+------------+------------------+---------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|next_week|
+-------+-------+-----------+-------------+------------+------------------+---------+
| Sweden|     50|          3|         3714|      2646.3| 2646.300048828125|     null|
|Germany|     48|         11|         1795|      1600.0|            1600.0|   1800.0|
|Germany|     49|         12|         1852|      1800.0|            3400.0|   1800.0|
|Germany|     50|         15|         1973|      1800.0|            5200.0|   1600.0|
|Germany|     51|          5|         1103|      1600.0|            6800.0|     null|
| France|     48|          4|         1299|       500.0|             500.0|    500.0|
| France|     49|          9|         2303|       500.0|            1000.0|   537.32|
| France|     50|          6|          529|      537.32|1537.3200073242188|    500.0|
| France|     51|          5|          847|       500.

### Dealing with NULLS in Apache Spark

In [39]:
data = [
    ('Alice', 25, 'New York'),
     ('Bob', 25, 'Los Angeles'),
    ('Charlie', None, 'Chicago'),
    (None, 33, 'Houston'),
    ('Frank', 27, None),
    ('Henry', 30, 'Seattle'),
    (None, None, None),
    ('Ivy', None, 'Denver')
]

In [40]:
sampledata_schema = "student_name string, student_age int, city string"

In [41]:
sampledata_df = spark.createDataFrame(data=data, schema=sampledata_schema)
sampledata_df.show()

+------------+-----------+-----------+
|student_name|student_age|       city|
+------------+-----------+-----------+
|       Alice|         25|   New York|
|         Bob|         25|Los Angeles|
|     Charlie|       null|    Chicago|
|        null|         33|    Houston|
|       Frank|         27|       null|
|       Henry|         30|    Seattle|
|        null|       null|       null|
|         Ivy|       null|     Denver|
+------------+-----------+-----------+



`We can drop the nulls by dropping the rows having NA's`

In [42]:
refined_df = sampledata_df.dropna()
refined_df.show()

+------------+-----------+-----------+
|student_name|student_age|       city|
+------------+-----------+-----------+
|       Alice|         25|   New York|
|         Bob|         25|Los Angeles|
|       Henry|         30|    Seattle|
+------------+-----------+-----------+



`We can also drop Nulls based on specific columns`

In [46]:
refined_df_columns = sampledata_df.dropna(subset=['student_name', 'student_age'])
refined_df_columns.show()

+------------+-----------+-----------+
|student_name|student_age|       city|
+------------+-----------+-----------+
|       Alice|         25|   New York|
|         Bob|         25|Los Angeles|
|       Frank|         27|       null|
|       Henry|         30|    Seattle|
+------------+-----------+-----------+

