# Lab 8: Cube, Rollup, Sliding Window grouping and Window functions

In [0]:
from pyspark.sql.functions import col, avg, to_date, round, window, desc, rank, max, sum
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.sql.window import Window

In [0]:
mpgdf = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("dbfs:/FileStore/mpg.csv")
aapl_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("dbfs:/FileStore/aapl_2017.csv").withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

(5 marks) The file mpg.csv contains data about various automobiles. Load this file into a dataframe and perform the following:
- Filter the dataframe to only include 'ford' and 'dodge' vehicles
- Perform a Rollup on the manufacturer and number of cylinders columns, which displays the total vehicles from that manufacturer with the given number of cylinders. Be sure to order the rows so the null values are contained last.
- Perform the same steps using the Cube function instead of Rollup

In [0]:
mpgdf = mpgdf.filter((mpgdf.manufacturer == 'ford') | (mpgdf.manufacturer == 'dodge')) # Filter by Ford and Dodge
mpgdf.show()

+---+------------+-----------------+-----+----+---+----------+---+---+---+---+-------+
|_c0|manufacturer|            model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+---+------------+-----------------+-----+----+---+----------+---+---+---+---+-------+
| 38|       dodge|      caravan 2wd|  2.4|1999|  4|  auto(l3)|  f| 18| 24|  r|minivan|
| 39|       dodge|      caravan 2wd|  3.0|1999|  6|  auto(l4)|  f| 17| 24|  r|minivan|
| 40|       dodge|      caravan 2wd|  3.3|1999|  6|  auto(l4)|  f| 16| 22|  r|minivan|
| 41|       dodge|      caravan 2wd|  3.3|1999|  6|  auto(l4)|  f| 16| 22|  r|minivan|
| 42|       dodge|      caravan 2wd|  3.3|2008|  6|  auto(l4)|  f| 17| 24|  r|minivan|
| 43|       dodge|      caravan 2wd|  3.3|2008|  6|  auto(l4)|  f| 17| 24|  r|minivan|
| 44|       dodge|      caravan 2wd|  3.3|2008|  6|  auto(l4)|  f| 11| 17|  e|minivan|
| 45|       dodge|      caravan 2wd|  3.8|1999|  6|  auto(l4)|  f| 15| 22|  r|minivan|
| 46|       dodge|      caravan 2wd|  3.8|1

In [0]:
mpgdfRolledUp = mpgdf.rollup(mpgdf.manufacturer, mpgdf.cyl).count().orderBy('cyl', ascending=False).show()

+------------+----+-----+
|manufacturer| cyl|count|
+------------+----+-----+
|       dodge|   8|   21|
|        ford|   8|   15|
|        ford|   6|   10|
|       dodge|   6|   15|
|       dodge|   4|    1|
|       dodge|null|   37|
|        null|null|   62|
|        ford|null|   25|
+------------+----+-----+



In [0]:
mpgdfCubed = mpgdf.cube(mpgdf.manufacturer, mpgdf.cyl).count().orderBy('cyl', 'manufacturer', ascending=False).show()

+------------+----+-----+
|manufacturer| cyl|count|
+------------+----+-----+
|        ford|   8|   15|
|       dodge|   8|   21|
|        null|   8|   36|
|        ford|   6|   10|
|       dodge|   6|   15|
|        null|   6|   25|
|       dodge|   4|    1|
|        null|   4|    1|
|        ford|null|   25|
|       dodge|null|   37|
|        null|null|   62|
+------------+----+-----+



(5 marks) #The file aapl-2017.csv contains information about daily apple stock prices. - Load this file into a dataframe and perform the following:
- Calculate the monthy average closing price using a window function inside the groupBy transformation. 
- For the window function, use a sliding window which slides by 1 week at a time. 
- Be sure to order the results in ascending order by the start date of the window
- Output the start time, end time for the window as well as the monthly average

In [0]:
aapl_df.groupBy(window(col("date"), "4 week", "1 week")) \
    .agg(round(avg(col("close")), 2).alias("monthly avg")) \
    .orderBy(col("window.start")) \
    .show(truncate=False)

+------------------------------------------+-----------+
|window                                    |monthly avg|
+------------------------------------------+-----------+
|{2016-12-08 00:00:00, 2017-01-05 00:00:00}|116.08     |
|{2016-12-15 00:00:00, 2017-01-12 00:00:00}|117.79     |
|{2016-12-22 00:00:00, 2017-01-19 00:00:00}|118.44     |
|{2016-12-29 00:00:00, 2017-01-26 00:00:00}|119.03     |
|{2017-01-05 00:00:00, 2017-02-02 00:00:00}|120.42     |
|{2017-01-12 00:00:00, 2017-02-09 00:00:00}|123.53     |
|{2017-01-19 00:00:00, 2017-02-16 00:00:00}|126.86     |
|{2017-01-26 00:00:00, 2017-02-23 00:00:00}|130.54     |
|{2017-02-02 00:00:00, 2017-03-02 00:00:00}|134.3      |
|{2017-02-09 00:00:00, 2017-03-09 00:00:00}|136.67     |
|{2017-02-16 00:00:00, 2017-03-16 00:00:00}|138.15     |
|{2017-02-23 00:00:00, 2017-03-23 00:00:00}|139.17     |
|{2017-03-02 00:00:00, 2017-03-30 00:00:00}|140.34     |
|{2017-03-09 00:00:00, 2017-04-06 00:00:00}|141.52     |
|{2017-03-16 00:00:00, 2017-04-

In [0]:
data = [
("2025-01-01", "Eric's Bikes",  "Norco Storm",          4500.75),
("2025-01-01", "Eric's Bikes",  "Cannondale Optimo",    5200.50),
("2025-01-01", "CNA Bikes",     "Specialized S-Works",  4800.25),
("2025-01-01", "CNA Bikes",     "Trek Madone",          4600.10),
("2025-01-01", "Canary Cycles", "Norco Storm",          5100.95),
("2025-01-01", "Canary Cycles", "Cannondale Optimo",    4750.60),
("2025-01-02", "Eric's Bikes",  "Norco Storm",          4450.00),
("2025-01-02", "Eric's Bikes",  "Specialized S-Works",  5350.30),
("2025-01-02", "CNA Bikes",     "Trek Madone",          4300.75),
("2025-01-02", "CNA Bikes",     "Cannondale Optimo",    4900.20),
("2025-01-02", "Canary Cycles", "Norco Storm",          5500.00),
("2025-01-02", "Canary Cycles", "Specialized S-Works",  5600.40),
("2025-01-03", "Eric's Bikes",  "Cannondale Optimo",    5100.30),
("2025-01-03", "Eric's Bikes",  "Trek Madone",          4500.90),
("2025-01-03", "CNA Bikes",     "Norco Storm",          5200.75),
("2025-01-03", "CNA Bikes",     "Specialized S-Works",  5450.80),
("2025-01-03", "Canary Cycles", "Norco Storm",          5700.40),
("2025-01-03", "Canary Cycles", "Trek Madone",          4600.50),
("2025-01-04", "Eric's Bikes",  "Norco Storm",          4800.60),
("2025-01-04", "Eric's Bikes",  "Specialized S-Works",  5000.80),
("2025-01-04", "CNA Bikes",     "Trek Madone",          4800.25),
("2025-01-04", "CNA Bikes",     "Cannondale Optimo",    4700.10),
("2025-01-04", "Canary Cycles", "Norco Storm",          5400.85),
("2025-01-04", "Canary Cycles", "Specialized S-Works",  5100.60),
("2025-01-05", "Eric's Bikes",  "Cannondale Optimo",    5300.40),
("2025-01-05", "Eric's Bikes",  "Trek Madone",          4500.35),
("2025-01-05", "CNA Bikes",     "Norco Storm",          4700.50),
("2025-01-05", "CNA Bikes",     "Specialized S-Works",  5200.00),
("2025-01-05", "Canary Cycles", "Norco Storm",          5500.90),
("2025-01-05", "Canary Cycles", "Cannondale Optimo",    4950.10)
]
# Define schema
schema = StructType([
StructField("date", StringType(), True),
StructField("store", StringType(), True),
StructField("product", StringType(), True),
StructField("sales_amount", DoubleType(), True)
])
# Create DataFrame
df = spark.createDataFrame(data, schema)

In [0]:
df.show()

+----------+-------------+-------------------+------------+
|      date|        store|            product|sales_amount|
+----------+-------------+-------------------+------------+
|2025-01-01| Eric's Bikes|        Norco Storm|     4500.75|
|2025-01-01| Eric's Bikes|  Cannondale Optimo|      5200.5|
|2025-01-01|    CNA Bikes|Specialized S-Works|     4800.25|
|2025-01-01|    CNA Bikes|        Trek Madone|      4600.1|
|2025-01-01|Canary Cycles|        Norco Storm|     5100.95|
|2025-01-01|Canary Cycles|  Cannondale Optimo|      4750.6|
|2025-01-02| Eric's Bikes|        Norco Storm|      4450.0|
|2025-01-02| Eric's Bikes|Specialized S-Works|      5350.3|
|2025-01-02|    CNA Bikes|        Trek Madone|     4300.75|
|2025-01-02|    CNA Bikes|  Cannondale Optimo|      4900.2|
|2025-01-02|Canary Cycles|        Norco Storm|      5500.0|
|2025-01-02|Canary Cycles|Specialized S-Works|      5600.4|
|2025-01-03| Eric's Bikes|  Cannondale Optimo|      5100.3|
|2025-01-03| Eric's Bikes|        Trek M

In [0]:
ranking_window_by_sales = Window.partitionBy("store").orderBy(desc("sales_amount")).rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("rank", rank().over(ranking_window_by_sales)).show()

+----------+-------------+-------------------+------------+----+
|      date|        store|            product|sales_amount|rank|
+----------+-------------+-------------------+------------+----+
|2025-01-03|    CNA Bikes|Specialized S-Works|      5450.8|   1|
|2025-01-03|    CNA Bikes|        Norco Storm|     5200.75|   2|
|2025-01-05|    CNA Bikes|Specialized S-Works|      5200.0|   3|
|2025-01-02|    CNA Bikes|  Cannondale Optimo|      4900.2|   4|
|2025-01-01|    CNA Bikes|Specialized S-Works|     4800.25|   5|
|2025-01-04|    CNA Bikes|        Trek Madone|     4800.25|   5|
|2025-01-05|    CNA Bikes|        Norco Storm|      4700.5|   7|
|2025-01-04|    CNA Bikes|  Cannondale Optimo|      4700.1|   8|
|2025-01-01|    CNA Bikes|        Trek Madone|      4600.1|   9|
|2025-01-02|    CNA Bikes|        Trek Madone|     4300.75|  10|
|2025-01-03|Canary Cycles|        Norco Storm|      5700.4|   1|
|2025-01-02|Canary Cycles|Specialized S-Works|      5600.4|   2|
|2025-01-05|Canary Cycles

In [0]:
running_sales_window = Window.partitionBy("store").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("culm_sum", sum(col("sales_amount")).over(running_sales_window)).show()

+----------+-------------+-------------------+------------+------------------+
|      date|        store|            product|sales_amount|          culm_sum|
+----------+-------------+-------------------+------------+------------------+
|2025-01-01|    CNA Bikes|Specialized S-Works|     4800.25|           4800.25|
|2025-01-01|    CNA Bikes|        Trek Madone|      4600.1|           9400.35|
|2025-01-02|    CNA Bikes|        Trek Madone|     4300.75|           13701.1|
|2025-01-02|    CNA Bikes|  Cannondale Optimo|      4900.2|           18601.3|
|2025-01-03|    CNA Bikes|        Norco Storm|     5200.75|          23802.05|
|2025-01-03|    CNA Bikes|Specialized S-Works|      5450.8|          29252.85|
|2025-01-04|    CNA Bikes|        Trek Madone|     4800.25|           34053.1|
|2025-01-04|    CNA Bikes|  Cannondale Optimo|      4700.1|           38753.2|
|2025-01-05|    CNA Bikes|        Norco Storm|      4700.5|           43453.7|
|2025-01-05|    CNA Bikes|Specialized S-Works|      

In [0]:
sales_last_3_days_window = Window.partitionBy("store").orderBy("date").rowsBetween(-2, 0)
df.withColumn("avg", avg("sales_amount").over(sales_last_3_days_window)).show()

+----------+-------------+-------------------+------------+-----------------+
|      date|        store|            product|sales_amount|              avg|
+----------+-------------+-------------------+------------+-----------------+
|2025-01-01|    CNA Bikes|Specialized S-Works|     4800.25|          4800.25|
|2025-01-01|    CNA Bikes|        Trek Madone|      4600.1|         4700.175|
|2025-01-02|    CNA Bikes|        Trek Madone|     4300.75|4567.033333333334|
|2025-01-02|    CNA Bikes|  Cannondale Optimo|      4900.2|4600.349999999999|
|2025-01-03|    CNA Bikes|        Norco Storm|     5200.75|4800.566666666667|
|2025-01-03|    CNA Bikes|Specialized S-Works|      5450.8|5183.916666666667|
|2025-01-04|    CNA Bikes|        Trek Madone|     4800.25|5150.599999999999|
|2025-01-04|    CNA Bikes|  Cannondale Optimo|      4700.1|4983.716666666666|
|2025-01-05|    CNA Bikes|        Norco Storm|      4700.5|4733.616666666667|
|2025-01-05|    CNA Bikes|Specialized S-Works|      5200.0|4866.