In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir",f"/user/itv010130/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [22]:
orders_df = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/public/trendytech/datasets/order_data.csv")

In [4]:
orders_df.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536378|     null|PACK OF 60 DINOSA...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|     null|PACK OF 60 PINK P...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|    84991|60 TEATIME FAIRY ...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|   84519A|TOMATO CHARLIE+LO...|       6|01-12-2010 9.37|     2.95|     14688|United Kingdom|
|   536378|   85183B|CHARLIE & LOLA WA...|      48|01-12-2010 9.37|     1.25|     14688|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



# Aggregate functions

## 1. Simple Aggregation

### Count the total number of records in the dataset, distinct invoiceno, sum of quantities, avg unit price

### a. Programatic style

In [6]:
from pyspark.sql.functions import *

In [17]:
orders_df.select(count("*").alias("total_rows"),countDistinct("invoiceno").alias("unique_invoice"),sum("quantity").alias("total_quantity"),avg("unitprice").alias("avg_unitprice")).show()

+----------+--------------+--------------+-----------------+
|total_rows|unique_invoice|total_quantity|    avg_unitprice|
+----------+--------------+--------------+-----------------+
|    541782|         25858|       5175855|4.611565323321931|
+----------+--------------+--------------+-----------------+



### b. Select expression

In [16]:
orders_df.selectExpr("count(*) as total_rows","count(distinct(invoiceno)) as unique_invoice","sum(quantity) as total_quantity","avg(unitprice) as avg_unitprice").show()

+----------+--------------+--------------+-----------------+
|total_rows|unique_invoice|total_quantity|    avg_unitprice|
+----------+--------------+--------------+-----------------+
|    541782|         25858|       5175855|4.611565323321931|
+----------+--------------+--------------+-----------------+



### c. Spark sql

In [23]:
orders_df.createOrReplaceTempView("orders")

In [25]:
spark.sql("select count(*) as total_rows,count(distinct(invoiceno)) as unique_invoice,sum(quantity) as total_quantity,avg(unitprice) as avg_unitprice from orders").show()

+----------+--------------+--------------+-----------------+
|total_rows|unique_invoice|total_quantity|    avg_unitprice|
+----------+--------------+--------------+-----------------+
|    541782|         25858|       5175855|4.611565323321929|
+----------+--------------+--------------+-----------------+



## 2. Grouping Aggregation

### Total quantity, total invoice price of orders grouped by invoice number and country

### a. Programatic style

In [29]:
result_df = orders_df.groupBy("country","invoiceno").agg(sum("quantity").alias("total_quantity"),sum(expr("quantity * unitprice")).alias("invoice_cost")).sort("invoiceno")

In [32]:
result_df.show()

+--------------+---------+--------------+------------------+
|       country|invoiceno|total_quantity|      invoice_cost|
+--------------+---------+--------------+------------------+
|United Kingdom|   536378|           242|192.78000000000003|
|United Kingdom|   536380|            24|              34.8|
|United Kingdom|   536381|           198|449.97999999999996|
|United Kingdom|   536382|           134|430.59999999999997|
|United Kingdom|   536384|           190|             489.6|
|United Kingdom|   536385|            53|            130.85|
|United Kingdom|   536386|           236|508.20000000000005|
|United Kingdom|   536387|          1440|           3193.92|
|United Kingdom|   536388|           108|            226.14|
|     Australia|   536389|           107|            358.25|
|United Kingdom|   536390|          1568|           1825.74|
|United Kingdom|   536392|           103|318.14000000000004|
|United Kingdom|   536393|             8|              79.6|
|United Kingdom|   53639

### b. Select expression

In [44]:
result_df = orders_df.groupBy("country","invoiceno").agg(expr("sum(quantity) as total_quantity"),expr("sum(quantity * unitprice) as invoice_cost")).sort("invoiceno")

In [45]:
result_df.show()

+--------------+---------+--------------+------------------+
|       country|invoiceno|total_quantity|      invoice_cost|
+--------------+---------+--------------+------------------+
|United Kingdom|   536378|           242|192.78000000000003|
|United Kingdom|   536380|            24|              34.8|
|United Kingdom|   536381|           198|449.97999999999996|
|United Kingdom|   536382|           134|430.59999999999997|
|United Kingdom|   536384|           190|             489.6|
|United Kingdom|   536385|            53|            130.85|
|United Kingdom|   536386|           236|508.20000000000005|
|United Kingdom|   536387|          1440|           3193.92|
|United Kingdom|   536388|           108|            226.14|
|     Australia|   536389|           107|            358.25|
|United Kingdom|   536390|          1568|           1825.74|
|United Kingdom|   536392|           103|318.14000000000004|
|United Kingdom|   536393|             8|              79.6|
|United Kingdom|   53639

### c. Spark sql

In [46]:
orders_df.createOrReplaceTempView("orders")

In [50]:
spark.sql("select country,invoiceno,sum(distinct(quantity)) as total_quantity,sum(quantity * unitprice) as invoice_cost from orders group by country,invoiceno order by invoiceno").show()

+--------------+---------+--------------+------------------+
|       country|invoiceno|total_quantity|      invoice_cost|
+--------------+---------+--------------+------------------+
|United Kingdom|   536378|           184|192.78000000000003|
|United Kingdom|   536380|            24|              34.8|
|United Kingdom|   536381|           114|            449.98|
|United Kingdom|   536382|            86|             430.6|
|United Kingdom|   536384|           123|             489.6|
|United Kingdom|   536385|            31|            130.85|
|United Kingdom|   536386|           136|508.20000000000005|
|United Kingdom|   536387|           624|           3193.92|
|United Kingdom|   536388|            27|            226.14|
|     Australia|   536389|            59|            358.25|
|United Kingdom|   536390|           876|           1825.74|
|United Kingdom|   536392|            63|            318.14|
|United Kingdom|   536393|             8|              79.6|
|United Kingdom|   53639

In [72]:
spark.stop()

## 3. Windowing Aggregation

In [2]:
orders_df2 = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/public/trendytech/datasets/windowdatamodified.csv")

In [34]:
orders_df2.sort("country").show()

+---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|      Australia|     49|          1|          214|       258.9|
|      Australia|     48|          1|          107|      358.25|
|      Australia|     50|          2|          133|      387.95|
|        Austria|     50|          2|            3|      257.04|
|        Bahrain|     51|          1|           54|      205.74|
|        Belgium|     48|          1|          528|       800.0|
|        Belgium|     50|          2|          285|      625.16|
|        Belgium|     51|          2|          942|       800.0|
|Channel Islands|     49|          1|           80|      363.53|
|         Cyprus|     50|          1|          917|     1590.82|
|        Denmark|     49|          1|          454|      1281.5|
|        Finland|     50|          1|         1254|       892.8|
|         France|     49|

In [3]:
from pyspark.sql import *

In [36]:
my_orders = Window.partitionBy("country").orderBy("weeknum").rowsBetween(Window.unboundedPreceding,Window.currentRow)

In [37]:
result = orders_df2.withColumn("running_total",sum("invoicevalue").over(my_orders))

In [38]:
result.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3|            2646.3|
|Germany|     48|         11|         1795|      1600.0|            1600.0|
|Germany|     49|         12|         1852|      1800.0|            3400.0|
|Germany|     50|         15|         1973|      1800.0|            5200.0|
|Germany|     51|          5|         1103|      1600.0|            6800.0|
| France|     48|          4|         1299|       500.0|             500.0|
| France|     49|          9|         2303|       500.0|            1000.0|
| France|     50|          6|          529|      537.32|1537.3200000000002|
| France|     51|          5|          847|       500.0|2037.3200000000002|
|Belgium|     48|          1|          528|       800.0|             800.0|
|Belgium|   

In [39]:
my_orders2 = Window.partitionBy("country").orderBy("weeknum").rowsBetween(-2,Window.currentRow)

In [40]:
result2 = orders_df2.withColumn("running_total",sum("invoicevalue").over(my_orders2))

In [41]:
result2.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3|            2646.3|
|Germany|     48|         11|         1795|      1600.0|            1600.0|
|Germany|     49|         12|         1852|      1800.0|            3400.0|
|Germany|     50|         15|         1973|      1800.0|            5200.0|
|Germany|     51|          5|         1103|      1600.0|            5200.0|
| France|     48|          4|         1299|       500.0|             500.0|
| France|     49|          9|         2303|       500.0|            1000.0|
| France|     50|          6|          529|      537.32|1537.3200000000002|
| France|     51|          5|          847|       500.0|1537.3200000000002|
|Belgium|     48|          1|          528|       800.0|             800.0|
|Belgium|   

## Windowing Functions

### 1. Rank Function

In [42]:
my_orders3 = Window.partitionBy("country").orderBy(desc("invoicevalue"))

In [43]:
result3 = orders_df2.withColumn("rank",rank().over(my_orders3))

In [44]:
result3.show()

+-------+-------+-----------+-------------+------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|rank|
+-------+-------+-----------+-------------+------------+----+
| Sweden|     50|          3|         3714|      2646.3|   1|
|Germany|     49|         12|         1852|      1800.0|   1|
|Germany|     50|         15|         1973|      1800.0|   1|
|Germany|     48|         11|         1795|      1600.0|   3|
|Germany|     51|          5|         1103|      1600.0|   3|
| France|     50|          6|          529|      537.32|   1|
| France|     51|          5|          847|       500.0|   2|
| France|     49|          9|         2303|       500.0|   2|
| France|     48|          4|         1299|       500.0|   2|
|Belgium|     48|          1|          528|       800.0|   1|
|Belgium|     51|          2|          942|       800.0|   1|
|Belgium|     50|          2|          285|      625.16|   3|
|Finland|     50|          1|         1254|       892.8|   1|
|  India

### 2. Dense Rank

In [45]:
my_orders4 = Window.partitionBy("country").orderBy(desc("invoicevalue"))

In [46]:
result4 = orders_df2.withColumn("rank",dense_rank().over(my_orders4))

In [47]:
result4.show()

+-------+-------+-----------+-------------+------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|rank|
+-------+-------+-----------+-------------+------------+----+
| Sweden|     50|          3|         3714|      2646.3|   1|
|Germany|     49|         12|         1852|      1800.0|   1|
|Germany|     50|         15|         1973|      1800.0|   1|
|Germany|     48|         11|         1795|      1600.0|   2|
|Germany|     51|          5|         1103|      1600.0|   2|
| France|     50|          6|          529|      537.32|   1|
| France|     51|          5|          847|       500.0|   2|
| France|     49|          9|         2303|       500.0|   2|
| France|     48|          4|         1299|       500.0|   2|
|Belgium|     48|          1|          528|       800.0|   1|
|Belgium|     51|          2|          942|       800.0|   1|
|Belgium|     50|          2|          285|      625.16|   2|
|Finland|     50|          1|         1254|       892.8|   1|
|  India

### 3. Row Number

In [4]:
my_orders5 = Window.partitionBy("country").orderBy("invoicevalue")

In [7]:
result5 = orders_df2.withColumn("rank",row_number().over(my_orders5))

In [8]:
result5.show()

+-------+-------+-----------+-------------+------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|rank|
+-------+-------+-----------+-------------+------------+----+
| Sweden|     50|          3|         3714|      2646.3|   1|
|Germany|     48|         11|         1795|      1600.0|   1|
|Germany|     51|          5|         1103|      1600.0|   2|
|Germany|     49|         12|         1852|      1800.0|   3|
|Germany|     50|         15|         1973|      1800.0|   4|
| France|     51|          5|          847|       500.0|   1|
| France|     49|          9|         2303|       500.0|   2|
| France|     48|          4|         1299|       500.0|   3|
| France|     50|          6|          529|      537.32|   4|
|Belgium|     50|          2|          285|      625.16|   1|
|Belgium|     48|          1|          528|       800.0|   2|
|Belgium|     51|          2|          942|       800.0|   3|
|Finland|     50|          1|         1254|       892.8|   1|
|  India

In [9]:
result5.select("*").where("rank==1").show()

+---------------+-------+-----------+-------------+------------+----+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|rank|
+---------------+-------+-----------+-------------+------------+----+
|         Sweden|     50|          3|         3714|      2646.3|   1|
|        Germany|     48|         11|         1795|      1600.0|   1|
|         France|     51|          5|          847|       500.0|   1|
|        Belgium|     50|          2|          285|      625.16|   1|
|        Finland|     50|          1|         1254|       892.8|   1|
|          India|     51|          5|           95|       300.0|   1|
|          Italy|     49|          1|           -2|       -17.0|   1|
|      Lithuania|     49|          1|           30|        63.0|   1|
|         Norway|     49|          1|         1730|     1867.98|   1|
|          Spain|     49|          1|           67|      174.72|   1|
|        Denmark|     49|          1|          454|      1281.5|   1|
|        Iceland|   

In [10]:
result5.select("*").where("rank==1").drop("rank").show()

+---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|         Sweden|     50|          3|         3714|      2646.3|
|        Germany|     48|         11|         1795|      1600.0|
|         France|     51|          5|          847|       500.0|
|        Belgium|     50|          2|          285|      625.16|
|        Finland|     50|          1|         1254|       892.8|
|          India|     51|          5|           95|       300.0|
|          Italy|     49|          1|           -2|       -17.0|
|      Lithuania|     49|          1|           30|        63.0|
|         Norway|     49|          1|         1730|     1867.98|
|          Spain|     49|          1|           67|      174.72|
|        Denmark|     49|          1|          454|      1281.5|
|        Iceland|     49|          1|          319|      711.79|
|         Israel|     50|

In [11]:
result5.select("*").where("rank<3").show()

+---------+-------+-----------+-------------+------------+----+
|  country|weeknum|numinvoices|totalquantity|invoicevalue|rank|
+---------+-------+-----------+-------------+------------+----+
|   Sweden|     50|          3|         3714|      2646.3|   1|
|  Germany|     48|         11|         1795|      1600.0|   1|
|  Germany|     51|          5|         1103|      1600.0|   2|
|   France|     51|          5|          847|       500.0|   1|
|   France|     49|          9|         2303|       500.0|   2|
|  Belgium|     50|          2|          285|      625.16|   1|
|  Belgium|     48|          1|          528|       800.0|   2|
|  Finland|     50|          1|         1254|       892.8|   1|
|    India|     51|          5|           95|       300.0|   1|
|    India|     48|          7|         2822|       300.0|   2|
|    Italy|     49|          1|           -2|       -17.0|   1|
|    Italy|     51|          1|          131|       383.7|   2|
|Lithuania|     49|          1|         

### Lag Function

In [12]:
my_orders6 = Window.partitionBy("country").orderBy("weeknum")

In [16]:
result6 = orders_df2.withColumn("previous_week",lag("invoicevalue").over(my_orders6))

In [17]:
result6.show()

+-------+-------+-----------+-------------+------------+-------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|previous_week|
+-------+-------+-----------+-------------+------------+-------------+
| Sweden|     50|          3|         3714|      2646.3|         null|
|Germany|     48|         11|         1795|      1600.0|         null|
|Germany|     49|         12|         1852|      1800.0|       1600.0|
|Germany|     50|         15|         1973|      1800.0|       1800.0|
|Germany|     51|          5|         1103|      1600.0|       1800.0|
| France|     48|          4|         1299|       500.0|         null|
| France|     49|          9|         2303|       500.0|        500.0|
| France|     50|          6|          529|      537.32|        500.0|
| France|     51|          5|          847|       500.0|       537.32|
|Belgium|     48|          1|          528|       800.0|         null|
|Belgium|     50|          2|          285|      625.16|        800.0|
|Belgi

In [26]:
lag_result = result6.withColumn("invoice_diff",expr("invoicevalue - previous_week"))

In [27]:
lag_result.show()

+-------+-------+-----------+-------------+------------+-------------+-------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|previous_week|       invoice_diff|
+-------+-------+-----------+-------------+------------+-------------+-------------------+
| Sweden|     50|          3|         3714|      2646.3|         null|               null|
|Germany|     48|         11|         1795|      1600.0|         null|               null|
|Germany|     49|         12|         1852|      1800.0|       1600.0|              200.0|
|Germany|     50|         15|         1973|      1800.0|       1800.0|                0.0|
|Germany|     51|          5|         1103|      1600.0|       1800.0|             -200.0|
| France|     48|          4|         1299|       500.0|         null|               null|
| France|     49|          9|         2303|       500.0|        500.0|                0.0|
| France|     50|          6|          529|      537.32|        500.0|  37.32000000000005|

### Lead Function

In [30]:
result7 = orders_df2.withColumn("upper_week",lead("invoicevalue").over(my_orders6))

In [31]:
result7.show()

+-------+-------+-----------+-------------+------------+----------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|upper_week|
+-------+-------+-----------+-------------+------------+----------+
| Sweden|     50|          3|         3714|      2646.3|      null|
|Germany|     48|         11|         1795|      1600.0|    1800.0|
|Germany|     49|         12|         1852|      1800.0|    1800.0|
|Germany|     50|         15|         1973|      1800.0|    1600.0|
|Germany|     51|          5|         1103|      1600.0|      null|
| France|     48|          4|         1299|       500.0|     500.0|
| France|     49|          9|         2303|       500.0|    537.32|
| France|     50|          6|          529|      537.32|     500.0|
| France|     51|          5|          847|       500.0|      null|
|Belgium|     48|          1|          528|       800.0|    625.16|
|Belgium|     50|          2|          285|      625.16|     800.0|
|Belgium|     51|          2|          942|     

In [37]:
lead_result = result7.withColumn("invoice_diff",expr("upper_week - invoicevalue"))

In [38]:
lead_result.show()

+-------+-------+-----------+-------------+------------+----------+-------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|upper_week|       invoice_diff|
+-------+-------+-----------+-------------+------------+----------+-------------------+
| Sweden|     50|          3|         3714|      2646.3|      null|               null|
|Germany|     48|         11|         1795|      1600.0|    1800.0|              200.0|
|Germany|     49|         12|         1852|      1800.0|    1800.0|                0.0|
|Germany|     50|         15|         1973|      1800.0|    1600.0|             -200.0|
|Germany|     51|          5|         1103|      1600.0|      null|               null|
| France|     48|          4|         1299|       500.0|     500.0|                0.0|
| France|     49|          9|         2303|       500.0|    537.32|  37.32000000000005|
| France|     50|          6|          529|      537.32|     500.0| -37.32000000000005|
| France|     51|          5|   