In [0]:
# imports
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType, DateType, StringType


In [0]:
# read raw CSVs
orders = spark.read.option("header",True).option("inferSchema",True).csv("/Volumes/workspace/default/ecom-learning/orders.csv")
customers = spark.read.option("header",True).csv("/Volumes/workspace/default/ecom-learning/customers.csv")
products = spark.read.option("header",True).csv("/Volumes/workspace/default/ecom-learning/products.csv")

print("orders rows:", orders.count())
print("customers rows:", customers.count())
print("products rows:", products.count())

display(orders.limit(5))


orders rows: 5397
customers rows: 500
products rows: 200


order_id,customer_id,order_date,product_id,quantity,price,channel,order_status
1,88,2024-01-01,104,1,57.75,web,completed
2,239,2024-01-01,182,2,25.28,store,completed
3,180,2024-01-01,139,2,115.65,mobile,completed
4,412,2024-01-01,181,2,110.29,web,completed
5,35,2024-01-01,79,1,108.2,store,completed


In [0]:
# basic casting and cleaning
orders = orders.withColumn("order_date", F.to_date(F.col("order_date"), "yyyy-MM-dd")) \
               .withColumn("quantity", F.col("quantity").cast(IntegerType())) \
               .withColumn("price", F.col("price").cast(DoubleType()))

# remove exact duplicate order_id rows (if any)
orders = orders.dropDuplicates(["order_id"])

# sanity checks
print("orders schema:")
orders.printSchema()
display(orders.filter(F.col("order_status").isNull()).limit(5))


orders schema:
root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- channel: string (nullable = true)
 |-- order_status: string (nullable = true)



order_id,customer_id,order_date,product_id,quantity,price,channel,order_status


In [0]:
# Cell 4 - compute revenue and join
orders = orders.withColumn("revenue", F.col("quantity") * F.col("price"))

sales = orders.join(customers, on="customer_id", how="left") \
              .join(products, on="product_id", how="left")

display(sales.select("order_id","order_date","customer_id","product_id","quantity","price","revenue","product_name","category").limit(10))


order_id,order_date,customer_id,product_id,quantity,price,revenue,product_name,category
1021,2024-02-03,252,63,1,293.65,293.65,Product_63,Toys
2737,2024-04-02,228,147,1,22.25,22.25,Product_147,Beauty
2936,2024-04-09,478,135,1,12.12,12.12,Product_135,Electronics
3139,2024-04-16,6,180,1,71.31,71.31,Product_180,Electronics
3683,2024-05-03,52,28,1,26.49,26.49,Product_28,Electronics
4366,2024-05-27,162,116,3,137.68,413.04,Product_116,Home
4885,2024-06-13,261,192,1,24.17,24.17,Product_192,Home
5156,2024-06-22,383,92,1,133.41,133.41,Product_92,Home
94,2024-01-04,240,66,3,10.74,32.22,Product_66,Beauty
1721,2024-02-26,52,168,1,119.68,119.68,Product_168,Toys


In [0]:
# save as csv (curated)
curated_path = "/Volumes/workspace/default/ecom-learning/sales_curated.csv"

sales.toPandas().to_csv(curated_path, index=False)

# read it back to verify
import pandas as pd
sales_curated = pd.read_csv(curated_path)
sales_curated.head(10)

Unnamed: 0,product_id,customer_id,order_id,order_date,quantity,price,channel,order_status,revenue,name,signup_date,city,state,age_group,product_name,category,cost_price,list_price
0,104,88,1,2024-01-01,1,57.75,web,completed,57.75,Shelia Gregory,2025-07-15,Wardborough,Kansas,26-35,Product_104,Clothing,130.12,329.51
1,182,239,2,2024-01-01,2,25.28,store,completed,50.56,Teresa Harris,2023-03-07,North Kathy,Missouri,above 55,Product_182,Electronics,190.97,107.09
2,139,180,3,2024-01-01,2,115.65,mobile,completed,231.3,Michael Simpson,2024-11-23,Kellyton,New Jersey,26-35,Product_139,Clothing,128.26,140.13
3,181,412,4,2024-01-01,2,110.29,web,completed,220.58,Raymond Robinson,2025-03-27,Lake Johnfurt,New Jersey,above 55,Product_181,Clothing,155.53,268.44
4,79,35,5,2024-01-01,1,108.2,store,completed,108.2,Curtis Foster,2025-01-04,West Wandatown,Rhode Island,46-55,Product_79,Electronics,182.02,98.98
5,109,54,6,2024-01-01,1,62.77,mobile,completed,62.77,Jason Zuniga,2024-02-24,Port Amychester,Connecticut,46-55,Product_109,Toys,91.8,305.11
6,130,42,7,2024-01-01,1,13.99,web,completed,13.99,Amanda Johnson,2024-04-13,Butlerfurt,Ohio,above 55,Product_130,Electronics,151.5,374.59
7,37,467,8,2024-01-01,1,55.96,store,completed,55.96,Raymond Mclean,2023-06-08,West William,Washington,36-45,Product_37,Electronics,131.04,355.41
8,164,137,9,2024-01-01,3,144.3,mobile,completed,432.9,Gary Simmons,2025-02-05,West Victor,North Dakota,18-25,Product_164,Electronics,111.16,131.63
9,147,409,10,2024-01-01,3,65.3,mobile,completed,195.9,Alexandra Johnson,2024-01-06,East Stephenberg,Maine,26-35,Product_147,Beauty,75.85,209.34


In [0]:
# Register the sales DataFrame as a temporary view
sales.createOrReplaceTempView("ecommerce_sales")

# Now run SQL queries on it
spark.sql("""
SELECT date_trunc('month', order_date) AS month,
       SUM(revenue) AS total_revenue,
       COUNT(DISTINCT customer_id) AS unique_customers
FROM ecommerce_sales
WHERE order_status = 'completed'
GROUP BY month
ORDER BY month
""").show()


+-------------------+------------------+----------------+
|              month|     total_revenue|unique_customers|
+-------------------+------------------+----------------+
|2024-01-01 00:00:00|125315.79999999999|             413|
|2024-02-01 00:00:00|112860.05000000006|             396|
|2024-03-01 00:00:00|117627.79999999999|             392|
|2024-04-01 00:00:00|128004.28999999989|             410|
|2024-05-01 00:00:00|122371.32999999991|             406|
|2024-06-01 00:00:00|110685.64999999995|             404|
+-------------------+------------------+----------------+



In [0]:
# simulate an incremental batch (re-run after uploading new orders for real usage)
from delta.tables import DeltaTable

target_path = curated_path
if DeltaTable.isDeltaTable(spark, target_path):
    delta_table = DeltaTable.forPath(spark, target_path)
    updatesDF = sales # in real pipeline this is new batch data
    delta_table.alias("t").merge(
        updatesDF.alias("s"),
        "t.order_id = s.order_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
else:
    sales.write.format("csv").mode("overwrite").save(target_path)


In [0]:
# validation checks
print("total rows curated:", spark.read.format("csv").load(curated_path).count())
print("duplicates by order_id:")
display(spark.sql("SELECT order_id, count(*) as c FROM ecommerce_sales GROUP BY order_id HAVING c>1"))
print("null customer ids:", spark.sql("SELECT count(*) FROM ecommerce_sales WHERE customer_id IS NULL").collect())


total rows curated: 5397
duplicates by order_id:


order_id,c


null customer ids: [Row(count(*)=0)]
