In [0]:
from pyspark.sql.functions import col, count

# Load the mpg CSV, treating the first row as the header and inferring column types.
mpg_df = spark.read.csv("/FileStore/mpg___Copy.csv", header=True, inferSchema=True)

# Restrict the data to the two manufacturers being compared.
filtered_mpg_df = mpg_df.where(col("manufacturer").isin("ford", "dodge"))

# Rollup: vehicle counts per (manufacturer, cyl), subtotals per manufacturer,
# and a grand total. Subtotal rows carry nulls, so nulls are sorted last.
rollup_df = (
    filtered_mpg_df
    .rollup("manufacturer", "cyl")
    .agg(count("*").alias("total_vehicles"))
    .orderBy(
        col("manufacturer").asc_nulls_last(),
        col("cyl").asc_nulls_last(),
    )
)

# Cube: everything the rollup produces plus per-cyl subtotals across
# manufacturers (the rows with a null manufacturer and a concrete cyl).
cube_df = (
    filtered_mpg_df
    .cube("manufacturer", "cyl")
    .agg(count("*").alias("total_vehicles"))
    .orderBy(
        col("manufacturer").asc_nulls_last(),
        col("cyl").asc_nulls_last(),
    )
)

rollup_df.show()
cube_df.show()

+------------+----+--------------+
|manufacturer| cyl|total_vehicles|
+------------+----+--------------+
|       dodge|   4|             1|
|       dodge|   6|            15|
|       dodge|   8|            21|
|       dodge|null|            37|
|        ford|   6|            10|
|        ford|   8|            15|
|        ford|null|            25|
|        null|null|            62|
+------------+----+--------------+

+------------+----+--------------+
|manufacturer| cyl|total_vehicles|
+------------+----+--------------+
|       dodge|   4|             1|
|       dodge|   6|            15|
|       dodge|   8|            21|
|       dodge|null|            37|
|        ford|   6|            10|
|        ford|   8|            15|
|        ford|null|            25|
|        null|   4|             1|
|        null|   6|            25|
|        null|   8|            36|
|        null|null|            62|
+------------+----+--------------+



In [0]:
from pyspark.sql.functions import avg
from pyspark.sql.functions import col, avg, min, max
from pyspark.sql.window import Window

# Load the AAPL 2017 CSV, treating the first row as the header and inferring column types.
aapl_df = spark.read.csv("/FileStore/aapl_2017___Copy.csv", header=True, inferSchema=True)

# Cast the "date" column to a date type before it is used for window ordering.
aapl_df = aapl_df.withColumn("date", col("date").cast("date"))

# Trailing frame ordered by date: the current row plus the six rows before it.
# NOTE(review): this is a 7-row frame, not a calendar month, even though the
# column name and the printed heading say "monthly" - confirm the intended span.
window_spec = Window.orderBy("date").rowsBetween(-6, Window.currentRow)

# For every row compute, over its frame: the mean closing price and the
# earliest / latest date covered. `min` and `max` here are the
# pyspark.sql.functions versions imported above, not the Python builtins.
aapl_avg_df = (
    aapl_df
    .withColumn("monthly_avg", avg("close").over(window_spec))
    .withColumn("window_start", min("date").over(window_spec))
    .withColumn("window_end", max("date").over(window_spec))
)

# Keep only the window summary columns, drop duplicate rows, and sort by window start.
result_df = (
    aapl_avg_df
    .select("window_start", "window_end", "monthly_avg")
    .distinct()
    .orderBy("window_start")
)

print("Monthly Average Closing Prices with Start and End Dates:")
result_df.show()

Monthly Average Closing Prices with Start and End Dates:
+------------+----------+------------------+
|window_start|window_end|       monthly_avg|
+------------+----------+------------------+
|  2017-01-03|2017-01-03|        116.150002|
|  2017-01-03|2017-01-04|116.08499950000001|
|  2017-01-03|2017-01-05|            116.26|
|  2017-01-03|2017-01-06|116.67250100000001|
|  2017-01-03|2017-01-09|117.13600040000001|
|  2017-01-03|2017-01-10|117.46500050000002|
|  2017-01-03|2017-01-11|117.79142900000001|
|  2017-01-04|2017-01-12|118.23428585714285|
|  2017-01-05|2017-01-13|118.66571499999999|
|  2017-01-06|2017-01-17|119.15000057142856|
|  2017-01-09|2017-01-18|119.44714257142857|
|  2017-01-10|2017-01-19|119.55999985714287|
|  2017-01-11|2017-01-20|119.68714257142857|
|  2017-01-12|2017-01-23|119.73428571428573|
|  2017-01-13|2017-01-24|119.83714300000001|
|  2017-01-17|2017-01-25|120.24285671428572|
|  2017-01-18|2017-01-26|120.51999985714285|
|  2017-01-19|2017-01-27|120.79999971428573

In [0]:
# All imports this cell needs, grouped at the top. `col` is imported here
# explicitly because the window definition below uses it; previously the cell
# only worked if an earlier cell had already imported it.
from pyspark.sql.functions import col, rank
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
from pyspark.sql.window import Window

# Sample sales records: (date, store, product, sales_amount).
data = [
    ("2025-01-01", "Eric's Bikes", "Norco Storm", 4500.75),
    ("2025-01-01", "Eric's Bikes", "Cannondale Optimo", 5200.50),
    ("2025-01-01", "CNA Bikes", "Specialized S-Works", 4800.25),
    ("2025-01-01", "CNA Bikes", "Trek Madone", 4600.10),
    ("2025-01-01", "Canary Cycles", "Norco Storm", 5100.95),
    ("2025-01-01", "Canary Cycles", "Cannondale Optimo", 4750.60),
]

# Explicit schema; note that "date" is kept as a string, not a date type.
schema = StructType([
    StructField("date", StringType(), True),
    StructField("store", StringType(), True),
    StructField("product", StringType(), True),
    StructField("sales_amount", DoubleType(), True)
])

df = spark.createDataFrame(data, schema)

# Within each store, order rows from highest to lowest sales amount.
window_spec = Window.partitionBy("store").orderBy(col("sales_amount").desc())

# Rank sales within each store (rank 1 = largest sales_amount in that store).
df_ranked = df.withColumn("rank", rank().over(window_spec))

print("Sales Ranking within each store:")
df_ranked.show()

Sales Ranking within each store:
+----------+-------------+-------------------+------------+----+
|      date|        store|            product|sales_amount|rank|
+----------+-------------+-------------------+------------+----+
|2025-01-01|    CNA Bikes|Specialized S-Works|     4800.25|   1|
|2025-01-01|    CNA Bikes|        Trek Madone|      4600.1|   2|
|2025-01-01|Canary Cycles|        Norco Storm|     5100.95|   1|
|2025-01-01|Canary Cycles|  Cannondale Optimo|      4750.6|   2|
|2025-01-01| Eric's Bikes|  Cannondale Optimo|      5200.5|   1|
|2025-01-01| Eric's Bikes|        Norco Storm|     4500.75|   2|
+----------+-------------+-------------------+------------+----+



In [0]:
from pyspark.sql.functions import sum

# Cumulative frame per store: every row from the start of the partition up to
# and including the current row, ordered by date.
# NOTE(review): all sample rows share the same date, so "date" alone does not
# pin down the row order inside a store - confirm whether a tie-breaker column
# is wanted to make the running totals reproducible.
window_spec_running_total = (
    Window.partitionBy("store")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# `sum` is the pyspark.sql.functions aggregate imported above, not the builtin.
running_total_col = sum("sales_amount").over(window_spec_running_total)
df_running_total = df.withColumn("running_total", running_total_col)

print("Running Total of Sales Per Store:")
df_running_total.show()

Running Total of Sales Per Store:
+----------+-------------+-------------------+------------+-------------+
|      date|        store|            product|sales_amount|running_total|
+----------+-------------+-------------------+------------+-------------+
|2025-01-01|    CNA Bikes|Specialized S-Works|     4800.25|      4800.25|
|2025-01-01|    CNA Bikes|        Trek Madone|      4600.1|      9400.35|
|2025-01-01|Canary Cycles|        Norco Storm|     5100.95|      5100.95|
|2025-01-01|Canary Cycles|  Cannondale Optimo|      4750.6|      9851.55|
|2025-01-01| Eric's Bikes|        Norco Storm|     4500.75|      4500.75|
|2025-01-01| Eric's Bikes|  Cannondale Optimo|      5200.5|      9701.25|
+----------+-------------+-------------------+------------+-------------+



In [0]:
# Trailing frame per store, ordered by date: the current row and the two rows
# before it.
# NOTE(review): the frame counts rows, not calendar days, and every sample row
# has the same date - confirm that a 3-row average is what "3 day" should mean.
window_spec_3_day_avg = (
    Window.partitionBy("store")
    .orderBy("date")
    .rowsBetween(-2, Window.currentRow)
)

# Mean sales_amount over each row's trailing frame.
trailing_avg_col = avg("sales_amount").over(window_spec_3_day_avg)
df_avg_sales = df.withColumn("3_day_avg_sales", trailing_avg_col)

print("3 Day Average Sales Per Store:")
df_avg_sales.show()




3 Day Average Sales Per Store:
+----------+-------------+-------------------+------------+---------------+
|      date|        store|            product|sales_amount|3_day_avg_sales|
+----------+-------------+-------------------+------------+---------------+
|2025-01-01|    CNA Bikes|Specialized S-Works|     4800.25|        4800.25|
|2025-01-01|    CNA Bikes|        Trek Madone|      4600.1|       4700.175|
|2025-01-01|Canary Cycles|        Norco Storm|     5100.95|        5100.95|
|2025-01-01|Canary Cycles|  Cannondale Optimo|      4750.6|       4925.775|
|2025-01-01| Eric's Bikes|        Norco Storm|     4500.75|        4500.75|
|2025-01-01| Eric's Bikes|  Cannondale Optimo|      5200.5|       4850.625|
+----------+-------------+-------------------+------------+---------------+

