In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, hour, date_format, concat_ws, to_timestamp, count, row_number
)
from pyspark.sql.window import Window

# Start (or reuse) the Spark session that every cell below runs against.
spark = (
    SparkSession.builder
    .appName("MidtermQ2")
    .getOrCreate()
)

In [3]:
# Load the three raw datasets from JSON files in the working directory.
# No explicit schema is passed to the reader.
customers_df, orders_df, order_items_df = (
    spark.read.json(json_path)
    for json_path in ("customers.json", "orders.json", "order_items.json")
)


                                                                                

In [4]:
# Print each raw dataset's schema followed by its rows (untruncated),
# in the order: customers, orders, order items.
for raw_df in (customers_df, orders_df, order_items_df):
    raw_df.printSchema()
    raw_df.show(truncate=False)

root
 |-- city: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- name: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- tier: string (nullable = true)

+------------+-----------+---------------------+----------------+-----------------+------+
|city        |customer_id|email                |name            |registration_date|tier  |
+------------+-----------+---------------------+----------------+-----------------+------+
|New York    |1          |john@example.com     |John Smith      |2022-01-15       |Gold  |
|Los Angeles |2          |mary@example.com     |Mary Johnson    |2022-03-20       |Silver|
|Chicago     |3          |robert@example.com   |Robert Brown    |2023-02-10       |Bronze|
|Houston     |4          |linda@example.com    |Linda Davis     |2021-11-05       |Gold  |
|Phoenix     |5          |michael@example.com  |Michael Wilson  |2022-07-22       |Silver|
|Philadelphia|6          |eliza

In [12]:
# 2.1 Customer Spending Analysis
# Keep only orders whose status is not "Cancelled".
# NOTE(review): orders with a null status are dropped as well, because the
# comparison evaluates to NULL for them — confirm that this is intended.
not_cancelled = col("status") != "Cancelled"
valid_orders_df = orders_df.where(not_cancelled)


In [13]:
from pyspark.sql.functions import col

# Attach the line items to every non-cancelled order (one row per order item).
valid_orders_items_df = valid_orders_df.join(
    order_items_df, on="order_id", how="inner"
)

# Add the purchasing customer's profile columns (city, email, name, tier, ...).
full_df = valid_orders_items_df.join(
    customers_df, on="customer_id", how="inner"
)

full_df.show(truncate=False)


+-----------+--------+--------------------------+----------+--------------+---------------+----------+-----------------+--------+----------+-----------+--------------------+---------------+-----------------+------+
|customer_id|order_id|categories                |order_date|payment_method|shipping_method|status    |product_name     |quantity|unit_price|city       |email               |name           |registration_date|tier  |
+-----------+--------+--------------------------+----------+--------------+---------------+----------+-----------------+--------+----------+-----------+--------------------+---------------+-----------------+------+
|1          |101     |[electronics, home]       |2023-09-01|Credit Card   |Standard       |Completed |Desk Lamp        |2       |49.99     |New York   |john@example.com    |John Smith     |2022-01-15       |Gold  |
|1          |101     |[electronics, home]       |2023-09-01|Credit Card   |Standard       |Completed |iPhone 13        |1       |999.99    |

In [14]:
from pyspark.sql.functions import sum as _sum

# Line-item revenue: units bought times the price per unit.
line_total = col("quantity") * col("unit_price")
full_df = full_df.withColumn("item_total", line_total)

# Total spend per customer over all of their non-cancelled order items.
per_customer = full_df.groupBy("customer_id")
customer_spending_df = per_customer.agg(_sum("item_total").alias("total_spent"))

In [15]:
from pyspark.sql.functions import desc

# Order customers from the highest to the lowest total spend.
top_spenders_df = customer_spending_df.sort(desc("total_spent"))
top_spenders_df.show(truncate=False)
# Result: customer_id 1 spent the most money overall.

+-----------+-----------------+
|customer_id|total_spent      |
+-----------+-----------------+
|1          |1488.92          |
|3          |679.95           |
|2          |577.9100000000001|
|7          |369.98           |
|10         |345.98           |
|8          |209.97           |
|4          |149.97           |
|5          |62.95            |
+-----------+-----------------+



In [16]:
# 2.2 Category Preference Analysis
# Find the most popular product categories for each customer tier (Gold, Silver, Bronze).
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, ArrayType

# One output row per (joined row, category) pair taken from the `categories` array.
exploded_df = full_df.withColumn("category", F.explode("categories"))

# Number of exploded rows for every tier/category pair.
# NOTE(review): `categories` comes from the order record while the rows here are
# per order item, so an order's categories are counted once for every item in
# that order — confirm this is the intended popularity measure.
tier_category_count_df = (
    exploded_df
    .groupBy("tier", "category")
    .agg(F.count("*").alias("purchase_count"))
)

# Show the counts tier by tier, most frequent category first.
tier_category_count_df.orderBy(
    "tier", F.col("purchase_count").desc()
).show(truncate=False)


+------+-----------+--------------+
|tier  |category   |purchase_count|
+------+-----------+--------------+
|Bronze|books      |2             |
|Bronze|electronics|2             |
|Bronze|home       |2             |
|Bronze|stationery |2             |
|Gold  |electronics|5             |
|Gold  |home       |4             |
|Gold  |sports     |4             |
|Gold  |garden     |2             |
|Gold  |clothing   |2             |
|Gold  |outdoors   |2             |
|Gold  |books      |2             |
|Silver|accessories|4             |
|Silver|clothing   |2             |
|Silver|health     |2             |
|Silver|beauty     |2             |
|Silver|electronics|2             |
|Silver|books      |2             |
|Silver|stationery |2             |
+------+-----------+--------------+



In [20]:
from pyspark.sql.window import Window

# Rank the categories inside each tier by purchase count, highest first.
# rank() gives tied categories the same rank, so every category tied for
# first place is kept by the filter below.
windowSpec = Window.partitionBy("tier").orderBy(F.col("purchase_count").desc())
ranked_df = tier_category_count_df.withColumn("rank", F.rank().over(windowSpec))
top_cats_each_tier_df = ranked_df.where(F.col("rank") == 1)

top_cats_each_tier_df.show(truncate=False)
# Most popular product categories per tier:
#   Bronze: books, electronics, home, stationery (four-way tie)
#   Gold:   electronics
#   Silver: accessories

+------+-----------+--------------+----+
|tier  |category   |purchase_count|rank|
+------+-----------+--------------+----+
|Bronze|books      |2             |1   |
|Bronze|electronics|2             |1   |
|Bronze|home       |2             |1   |
|Bronze|stationery |2             |1   |
|Gold  |electronics|5             |1   |
|Silver|accessories|4             |1   |
+------+-----------+--------------+----+



In [21]:
#2.3 Price Range Preferences
#Show how different customer tiers distribute their spending across budget,
#mid-range, and premium products, i.e. whether Gold tier customers buy more
#premium products than Silver or Bronze customers.

# Unit-price cut-offs for the price bands (upper bounds are exclusive).
BUDGET_MAX_UNIT_PRICE = 50       # Budget:    unit_price < 50
MID_RANGE_MAX_UNIT_PRICE = 200   # Mid-range: 50 <= unit_price < 200; Premium: >= 200

unit_price_col = col("unit_price")

# Every band has an explicit condition and there is no otherwise(): a row with a
# missing unit_price matches none of the conditions and gets a NULL price_range
# instead of being silently labelled "Premium".
price_range_df = full_df.withColumn(
    "price_range",
    F.when(unit_price_col < BUDGET_MAX_UNIT_PRICE, "Budget")
     .when(unit_price_col < MID_RANGE_MAX_UNIT_PRICE, "Mid-range")
     .when(unit_price_col >= MID_RANGE_MAX_UNIT_PRICE, "Premium"),
)

In [22]:
# Per tier and price band: total spend and number of purchased line items.
tier_band_groups = price_range_df.groupBy("tier", "price_range")
range_pref_df = tier_band_groups.agg(
    F.sum("item_total").alias("spend_amount"),
    F.count("*").alias("item_count"),
)

range_pref_df.show()

+------+-----------+------------------+----------+
|  tier|price_range|      spend_amount|item_count|
+------+-----------+------------------+----------+
|Bronze|    Premium|            399.99|         1|
|Bronze|  Mid-range|            189.99|         1|
|  Gold|     Budget|            214.93|         4|
|  Gold|    Premium|           1549.97|         3|
|  Gold|  Mid-range|            589.95|         4|
|Silver|     Budget|220.87999999999997|         5|
|Bronze|     Budget|             89.97|         2|
|Silver|  Mid-range|            629.95|         3|
+------+-----------+------------------+----------+



In [24]:
#Result (2.3 Price Range Preferences)
#Bronze: Small purchases in all ranges; one Premium purchase (399.99).
#Gold: Highest Premium spending (1549.97), also some Mid-range and Budget.
#Silver: No Premium, moderate Mid-range (629.95) and Budget (220.88).
#This indicates Gold leans toward high-end items, Silver focuses on mid- and lower-priced goods,
#and Bronze spreads minimal spending across all ranges.

#Do Gold tier customers buy more premium products than Silver or Bronze customers?
#Yes. Based on the results, Gold tier customers clearly spend more on Premium products.
#They recorded a premium spend of 1549.97, whereas Bronze customers only spent 399.99 and Silver customers did not record any premium spending.
#This strongly suggests that Gold tier customers buy more premium products than Silver or Bronze tiers.

In [30]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Top two spenders within each customer tier.

# Total spend per customer, computed from the order items in full_df.
customer_spending_df = (
    full_df
    .groupBy("customer_id")
    .agg(F.sum("item_total").alias("total_spent"))
)

# Look up each spender's tier in the customer table.
customer_tiers = customers_df.select("customer_id", "tier")
spending_with_tier_df = customer_spending_df.join(
    customer_tiers, on="customer_id", how="left"
)

# Rank spenders inside their tier, largest total first (ties share a rank).
# NOTE(review): windowSpec and ranked_df are rebound here; the category-ranking
# cell assigns the same two names to different objects.
windowSpec = Window.partitionBy("tier").orderBy(F.col("total_spent").desc())
ranked_df = spending_with_tier_df.withColumn("rank_in_tier", F.rank().over(windowSpec))
top2_each_tier_df = ranked_df.where(F.col("rank_in_tier") <= 2)
top2_each_tier_df.show(truncate=False)


+-----------+-----------------+------+------------+
|customer_id|total_spent      |tier  |rank_in_tier|
+-----------+-----------------+------+------------+
|3          |679.95           |Bronze|1           |
|1          |1488.92          |Gold  |1           |
|7          |369.98           |Gold  |2           |
|2          |577.9100000000001|Silver|1           |
|8          |209.97           |Silver|2           |
+-----------+-----------------+------+------------+



In [31]:
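#Bronze: Only one customer (customer_id 3 with 679.95) appears. The ranking is built from full_df, which only contains
#customers with at least one non-cancelled order, so this is expected if customer 3 is the only Bronze customer with such an order.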
#Gold: Customer_id 1 (1488.92) ranks first and customer_id 7 (369.98) ranks second.
#Silver: Customer_id 2 (577.91) is ranked first and customer_id 8 (209.97) is ranked second.