**1. Create a small sample e-commerce dataset (orders, customers, products)**


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Hand-written sample order lines, one tuple per order:
# (order_id, customer_id, product_id, order_date as ISO string, quantity).
orders_data = [
    (1, 101, 1001, "2024-01-05", 1),
    (2, 102, 1002, "2024-01-07", 2),
    (3, 101, 1003, "2024-01-10", 1),
    (4, 103, 1001, "2024-01-12", 3),
    (5, 101, 1002, "2024-01-15", 2),
    (6, 102, 1003, "2024-01-20", 1),
]

orders_columns = ["order_id", "customer_id", "product_id", "order_date", "quantity"]

# Parse the ISO date strings into a DateType column (same column name and position).
orders = (
    spark.createDataFrame(orders_data, orders_columns)
    .withColumn("order_date", F.to_date(F.col("order_date")))
)

In [0]:
# Sample customer dimension: (customer_id, customer_name, country).
customers_data = [
    (101, "Alice", "USA"),
    (102, "Bob", "Canada"),
    (103, "Charlie", "USA"),
]

customers_columns = ["customer_id", "customer_name", "country"]
customers = spark.createDataFrame(customers_data, customers_columns)

In [0]:
# Sample product dimension: (product_id, product_name, unit price).
products_data = [
    (1001, "Laptop", 1200),
    (1002, "Phone", 800),
    (1003, "Headphones", 200),
]

products_columns = ["product_id", "product_name", "price"]
products = spark.createDataFrame(products_data, products_columns)

**Join orders with customer and product details**

In [0]:
# Inner join: orders whose customer_id is missing from `customers` are dropped.
orders_with_customers = orders.join(customers, on="customer_id", how="inner")

# Left join: orders are kept even when the product is unknown (product columns become null).
orders_joined = orders_with_customers.join(products, on="product_id", how="left")

In [0]:
# Preview the joined rows (first 20, long values truncated).
orders_joined.show(n=20, truncate=True)


+----------+-----------+--------+----------+--------+-------------+-------+------------+-----+
|product_id|customer_id|order_id|order_date|quantity|customer_name|country|product_name|price|
+----------+-----------+--------+----------+--------+-------------+-------+------------+-----+
|      1001|        101|       1|2024-01-05|       1|        Alice|    USA|      Laptop| 1200|
|      1002|        102|       2|2024-01-07|       2|          Bob| Canada|       Phone|  800|
|      1003|        101|       3|2024-01-10|       1|        Alice|    USA|  Headphones|  200|
|      1001|        103|       4|2024-01-12|       3|      Charlie|    USA|      Laptop| 1200|
|      1002|        101|       5|2024-01-15|       2|        Alice|    USA|       Phone|  800|
|      1003|        102|       6|2024-01-20|       1|          Bob| Canada|  Headphones|  200|
+----------+-----------+--------+----------+--------+-------------+-------+------------+-----+



**Calculate Running Totals (Window Function)**

In [0]:
# Line value of each order = units ordered x unit price
# (null when the product, and hence the price, is unknown).
order_value_expr = F.col("quantity") * F.col("price")
orders_enriched = orders_joined.withColumn("order_value", order_value_expr)

In [0]:
# Running total of order value per customer, in chronological order.
# order_id is a secondary sort key: with orderBy("order_date") alone, two orders
# from the same customer on the same day would be accumulated in an arbitrary
# (run-to-run nondeterministic) order under the ROWS frame below.
window_spec = (
    Window
    .partitionBy("customer_id")
    .orderBy("order_date", "order_id")
    # ROWS frame: each row sums every earlier row in the partition plus itself.
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

orders_running = orders_enriched.withColumn(
    "running_total",
    F.sum("order_value").over(window_spec)
)

**Create Derived Features**

In [0]:
# Orders whose value strictly exceeds this amount are flagged as high value.
HIGH_VALUE_THRESHOLD = 500

final_df = (
    orders_running
    # Integer 1/0 flag (not boolean); a null order_value falls through to 0.
    .withColumn(
        "high_value_order",
        F.when(F.col("order_value") > HIGH_VALUE_THRESHOLD, 1).otherwise(0),
    )
    .withColumn("order_year", F.year("order_date"))
    # Abbreviated month name (e.g. "Jan").
    # NOTE(review): F.monthname appears to need PySpark 4.0+ — confirm; on older
    # versions F.date_format("order_date", "MMM") gives the same text.
    .withColumn("order_month", F.monthname("order_date"))
)

In [0]:
# View Final Output
final_df.select(
    "order_id",
    "customer_name",
    "order_year",
    "order_month",
    "product_name",
    "order_value",
    "running_total",
    "high_value_order"
).show()

+--------+-------------+----------+-----------+------------+-----------+-------------+----------------+
|order_id|customer_name|order_year|order_month|product_name|order_value|running_total|high_value_order|
+--------+-------------+----------+-----------+------------+-----------+-------------+----------------+
|       1|        Alice|      2024|        Jan|      Laptop|       1200|         1200|               1|
|       3|        Alice|      2024|        Jan|  Headphones|        200|         1400|               0|
|       5|        Alice|      2024|        Jan|       Phone|       1600|         3000|               1|
|       2|          Bob|      2024|        Jan|       Phone|       1600|         1600|               1|
|       6|          Bob|      2024|        Jan|  Headphones|        200|         1800|               0|
|       4|      Charlie|      2024|        Jan|      Laptop|       3600|         3600|               1|
+--------+-------------+----------+-----------+------------+----

## **Practice queries**

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Raw October-2019 event log. inferSchema=True makes Spark scan the CSV an extra
# time to derive column types.
events = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
)

# Top 5 products by revenue: sum of price over purchase events only.
purchases = events.filter(F.col("event_type") == "purchase")
revenue = (
    purchases
    .groupBy("product_id")
    .agg(F.sum("price").alias("revenue"))
    .orderBy(F.col("revenue").desc())
    .limit(5)
)

revenue.show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|   1005115|1.2406807350000003E7|
|   1005105|1.0239248679999996E7|
|   1004249|   6730112.920000011|
|   1005135|   5567806.640000007|
|   1004767|   5430723.430000007|
+----------+--------------------+



In [0]:
# Running count of events per user, in event-time order.
# An explicit ROWS frame makes the count advance by exactly one per event. The
# default frame for an ordered window is RANGE-based, which would give every
# event sharing the same event_time the same, already-summed count. Among such
# ties the numbering order is arbitrary, since there is no unique tiebreaker column.
window = (
    Window
    .partitionBy("user_id")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Bind the result so later cells can use it (it was previously discarded).
events_cumulative = events.withColumn("cumulative_events", F.count("*").over(window))
events_cumulative


DataFrame[event_time: timestamp, event_type: string, product_id: int, category_id: bigint, category_code: string, brand: string, price: double, user_id: int, user_session: string, cumulative_events: bigint]

In [0]:
# View-to-purchase conversion per category: purchases as a percentage of views.
# Listing the pivot values explicitly avoids an extra pass to discover them.
# Events with a null category_code are grouped together under a null key.
conversion = (
    events
    .groupBy("category_code")
    .pivot("event_type", ["view", "purchase"])
    .count()
    # pivot().count() yields null (not 0) when a category has no events of a type;
    # treat those as zero so "viewed but never purchased" reports 0% instead of null.
    .na.fill(0, subset=["view", "purchase"])
    .withColumn(
        "conversion_rate",
        # Undefined (null) when a category has no views, rather than dividing by zero.
        F.when(F.col("view") > 0, (F.col("purchase") / F.col("view")) * 100)
    )
)

conversion.show()

+--------------------+-------+--------+-------------------+
|       category_code|   view|purchase|    conversion_rate|
+--------------------+-------+--------+-------------------+
|auto.accessories....|  12305|      46| 0.3738317757009346|
|furniture.living_...| 215471|    1084| 0.5030839416905292|
| stationery.cartrige|   7380|     134| 1.8157181571815717|
|       sport.bicycle| 128759|     838| 0.6508282916145668|
|        apparel.sock|   2621|      21| 0.8012209080503624|
|appliances.enviro...|   2172|      27| 1.2430939226519337|
|          kids.swing|  31596|     330|  1.044436004557539|
|electronics.audio...|  28394|     430| 1.5144044516447137|
|auto.accessories....|  42350|     494|  1.166469893742621|
|  electronics.clocks|1272783|   17906| 1.4068384005757462|
|electronics.audio...|  35409|     423| 1.1946115394391257|
|appliances.kitche...| 105149|     936| 0.8901653843593377|
|appliances.kitche...| 155551|    2382|  1.531330560394983|
|  electronics.tablet| 301992|    5603| 