# Retail Analytics (PySpark)

This notebook analyzes the retail dataset using PySpark to answer business questions.
The input data is pre-processed by `prepare_data.py` and loaded here from Parquet.

In [None]:
import os

import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    count,
    countDistinct,
    date_format,
    date_trunc,
    desc,
    lit,
    row_number,
    stddev,
    sum as _sum,
    when,
    window,
)
from pyspark.sql.window import Window

# Start (or reuse) the Spark session for this notebook and keep the log quiet,
# showing only ERROR-level messages.
spark = (
    SparkSession.builder
    .appName("RetailAnalytics")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

# Seaborn theme applied to every figure drawn below
sns.set_theme(style="whitegrid")

## 1. Load Processed Data

In [None]:
# Parquet outputs written by prepare_data.py
PROCESSED_DIR = 'data/processed_transactions'
PROMO_PROCESSED_DIR = 'data/processed_promotions'

# Transactions (df) and promotion definitions (df_promotions; Section 4 uses its
# product_id / start_date / end_date / promotion_id columns).
df = spark.read.parquet(PROCESSED_DIR)
df_promotions = spark.read.parquet(PROMO_PROCESSED_DIR)

# Quick sanity check of the transaction data
record_count = df.count()
print("Total records:", record_count)
df.printSchema()
df.show(5)

## 2. Analysis: Weekly Purchase Data for Customers
Question: How much does each customer spend per week?

In [None]:
# Total spend per customer per calendar week.
# date_trunc("week", ...) snaps each date to the Monday that starts its week
# (in the Spark session time zone). A tumbling window(col("date"), "1 week")
# instead aligns windows to the Unix epoch (1970-01-01 00:00 UTC, a Thursday),
# so its "weeks" would run Thursday-Wednesday in UTC.
weekly_sales = (
    df.groupBy(
        "customer_name",
        date_trunc("week", col("date")).alias("week_start"),
    )
    .agg(_sum("price_paid").alias("total_spend"))
    .select("customer_name", "week_start", "total_spend")
    .orderBy("customer_name", "week_start")
)

weekly_sales.show(10)

In [None]:
# Bring the (already aggregated) weekly table to the driver for plotting
pdf_weekly_sales = weekly_sales.toPandas()

# Restrict the chart to the five customers with the highest overall spend
spend_by_customer = pdf_weekly_sales.groupby('customer_name')['total_spend'].sum()
top_customers = spend_by_customer.nlargest(5).index
top_customer_rows = pdf_weekly_sales[pdf_weekly_sales['customer_name'].isin(top_customers)]

fig, ax = plt.subplots(figsize=(12, 6))
sns.lineplot(
    data=top_customer_rows,
    x='week_start',
    y='total_spend',
    hue='customer_name',
    marker='o',
    ax=ax,
)
ax.set_title('Weekly Spend for Top 5 Customers')
ax.set_xlabel('Week')
ax.set_ylabel('Total Spend ($)')
ax.legend(title='Customer')
plt.show()

## 3. Classification: Fast, Medium, Slow Items & Stores
Group items and stores into three tiers by average weekly sales, using the 33rd and 66th percentiles as cut-offs:
- **Fast**: Top 33%
- **Medium**: Middle 33%
- **Slow**: Bottom 33%

In [None]:
def classify_entity(df, entity_col, entity_name_col, metric_col='price_paid',
                    low_quantile=0.33, high_quantile=0.66, relative_error=0.01):
    """Tier entities (e.g. products or shops) as Fast / Medium / Slow by average weekly sales.

    Average weekly sales is the entity's total ``metric_col`` divided by the number
    of distinct calendar weeks (Monday-start, via ``date_trunc``) present anywhere in
    ``df``. Weeks in which the entity sold nothing therefore count as zero, so an item
    that sold once is not ranked as if it sold that amount every week.

    Cut-offs are approximate quantiles of that average across entities:
    above the high cut-off is Fast, below the low cut-off is Slow, and everything
    in between (inclusive) is Medium.

    Args:
        df: transaction DataFrame with a ``date`` column plus ``metric_col``,
            ``entity_col`` and ``entity_name_col``.
        entity_col: id column to group by (e.g. ``"product_id"``).
        entity_name_col: display-name column carried along with the id.
        metric_col: numeric column to sum; defaults to ``"price_paid"``.
        low_quantile: quantile probability used as the Slow cut-off.
        high_quantile: quantile probability used as the Fast cut-off.
        relative_error: ``approxQuantile`` relative error (0 = exact but slower).

    Returns:
        DataFrame with ``entity_col``, ``entity_name_col``, ``avg_weekly_sales`` and
        ``classification``, sorted by ``avg_weekly_sales`` descending.

    Raises:
        ValueError: if ``df`` has no dated rows, or no entity has a non-null
            total, so no cut-offs can be computed.
    """
    # 1. Number of calendar weeks covered by the data: the shared denominator for
    #    every entity, so inactive weeks pull an entity's average down.
    n_weeks = df.select(countDistinct(date_trunc("week", col("date")))).first()[0]
    if not n_weeks:
        raise ValueError("classify_entity: no dated rows to compute weekly sales from")

    # 2. Average weekly sales per entity over the whole period
    avg_weekly_sales = df.groupBy(entity_col, entity_name_col).agg(
        (_sum(metric_col) / lit(n_weeks)).alias("avg_weekly_sales")
    )

    # 3. Cut-offs; approxQuantile ignores nulls and returns [] when nothing remains
    quantiles = avg_weekly_sales.approxQuantile(
        "avg_weekly_sales", [low_quantile, high_quantile], relative_error
    )
    if len(quantiles) < 2:
        raise ValueError(f"classify_entity: no non-null '{metric_col}' totals to rank")
    low_threshold, high_threshold = quantiles

    print(f"Classification Thresholds for {entity_name_col}:")
    print(f"  Slow < {low_threshold:.2f}")
    print(f"  {low_threshold:.2f} <= Medium <= {high_threshold:.2f}")
    print(f"  Fast > {high_threshold:.2f}")

    # 4. Classify
    classified_df = avg_weekly_sales.withColumn(
        "classification",
        when(col("avg_weekly_sales") > high_threshold, "Fast")
        .when(col("avg_weekly_sales") < low_threshold, "Slow")
        .otherwise("Medium")
    )

    return classified_df.orderBy(col("avg_weekly_sales").desc())

# Products are keyed by product_id; the name is carried along for display.
print("--- Product Classification ---")
classified_products = classify_entity(df, entity_col="product_id", entity_name_col="product_name")
classified_products.show(10)

# Shops are keyed the same way, by shop_id with shop_name for display.
print("\n--- Shop Classification ---")
classified_shops = classify_entity(df, entity_col="shop_id", entity_name_col="shop_name")
classified_shops.show(10)

## 4. Promotion Impact Analysis
Analyze the impact of promotions on sales.

**Questions:**
1.  Which items experienced the biggest sale increase during promotions?
2.  Are there stores that react more strongly to promotions?
3.  Is there any significant difference between promotion impacts of the Fast versus Slow items?
4.  Is there any significant difference between promotion impacts of the Fast versus Slow stores?

In [None]:
# 1. Flag transactions as promo (1) or regular (0).
# A row is "promo" when its product has at least one promotion whose
# [start_date, end_date] range contains the transaction date.
# A plain left join would emit one row per *matching promotion*, so a sale covered
# by overlapping promotions would be duplicated and its price_paid double-counted
# in every downstream sum. Semi/anti joins partition the rows instead: each
# transaction row appears exactly once, in one side or the other.
promo_match = (
    (col("t.product_id") == col("p.product_id")) &
    (col("t.date") >= col("p.start_date")) &
    (col("t.date") <= col("p.end_date"))
)

promo_rows = (
    df.alias("t")
    .join(df_promotions.alias("p"), promo_match, "left_semi")
    .withColumn("is_promo", lit(1))
)
regular_rows = (
    df.alias("t")
    .join(df_promotions.alias("p"), promo_match, "left_anti")
    .withColumn("is_promo", lit(0))
)

df_with_promo = promo_rows.unionByName(regular_rows)

df_with_promo.show(5)

In [None]:
# Helper function to calculate promotion lift
def calculate_lift(df_input, group_cols):
    """Compare average daily sales in promo vs. regular periods for each group.

    Rows are split by the ``is_promo`` flag. For each side, total ``price_paid`` is
    divided by the number of distinct dates that had sales on that side. This is a
    proxy for days active: calendar days without any sales are not counted.

    Args:
        df_input: DataFrame with ``price_paid``, ``date`` and ``is_promo`` (0/1)
            columns plus every column in ``group_cols``.
        group_cols: sequence of column names identifying a group
            (e.g. ``["product_id", "product_name"]``).

    Returns:
        DataFrame with one row per group that has regular-period sales, with
        ``reg_sales``, ``reg_days``, ``promo_sales``, ``promo_days``,
        ``avg_daily_reg``, ``avg_daily_promo`` and
        ``lift = (avg_daily_promo - avg_daily_reg) / avg_daily_reg``, sorted by
        ``lift`` descending (nulls last). ``lift`` is null when the group has no
        promo sales or its regular baseline is not positive.
    """
    group_cols = list(group_cols)

    # Total sales and number of distinct sale dates per group and promo flag
    stats = df_input.groupBy(group_cols + ["is_promo"]).agg(
        _sum("price_paid").alias("total_sales"),
        countDistinct("date").alias("days_active"),
    )

    # One frame per side. is_promo is dropped here; otherwise the join below would
    # carry two ambiguous "is_promo" columns into the result.
    regular = stats.filter(col("is_promo") == 0).select(
        *group_cols,
        col("total_sales").alias("reg_sales"),
        col("days_active").alias("reg_days"),
    )
    promo = stats.filter(col("is_promo") == 1).select(
        *group_cols,
        col("total_sales").alias("promo_sales"),
        col("days_active").alias("promo_days"),
    )

    # Left join: groups without promo sales keep a null promo side (and null lift).
    # Groups with only promo sales have no baseline and are excluded.
    joined = regular.join(promo, group_cols, "left")

    # Average daily sales; a zero day count yields null instead of a division error
    joined = joined.withColumn(
        "avg_daily_reg",
        when(col("reg_days") > 0, col("reg_sales") / col("reg_days")),
    ).withColumn(
        "avg_daily_promo",
        when(col("promo_days") > 0, col("promo_sales") / col("promo_days")),
    )

    # Lift is defined only for a positive baseline. A zero baseline would divide by
    # zero (an error under ANSI mode). A negative, returns-dominated baseline
    # (price_paid < 0) would flip the sign of the lift.
    joined = joined.withColumn(
        "lift",
        when(
            col("avg_daily_reg") > 0,
            (col("avg_daily_promo") - col("avg_daily_reg")) / col("avg_daily_reg"),
        ),
    )

    return joined.orderBy(col("lift").desc())

# Columns reported for both the item and the store lift tables
lift_report_cols = ["avg_daily_reg", "avg_daily_promo", "lift"]

# Q1: Which items experienced the biggest sale increase during promotions?
print("--- Q1: Item Sales Lift ---")
item_lift = calculate_lift(df_with_promo, ["product_id", "product_name"])
item_lift.select("product_name", *lift_report_cols).show(10)

# Q2: Are there stores that have higher promotion reaction?
print("\n--- Q2: Store Sales Lift ---")
store_lift = calculate_lift(df_with_promo, ["shop_id", "shop_name"])
store_lift.select("shop_name", *lift_report_cols).show(10)

In [None]:
def summarize_lift_by_class(lift_df, classified_df, key_col):
    """Attach Fast/Medium/Slow classes to per-entity lift and summarize lift per class.

    A mean alone cannot answer "is there a significant difference", so the summary
    also reports the spread and the number of entities behind each mean.

    Args:
        lift_df: output of ``calculate_lift`` (must contain ``key_col`` and ``lift``).
        classified_df: output of ``classify_entity`` (must contain ``key_col`` and
            ``classification``).
        key_col: entity id column used to join the two (e.g. ``"product_id"``).

    Returns:
        ``(impact, summary)``: the joined per-entity DataFrame, and one row per class
        with ``avg_lift``, ``stddev_lift`` and ``n_with_lift`` (entities with a
        non-null lift, i.e. the sample size behind ``avg_lift``), ordered by ``avg_lift``.
    """
    impact = lift_df.join(classified_df.select(key_col, "classification"), key_col)
    summary = impact.groupBy("classification").agg(
        avg("lift").alias("avg_lift"),
        stddev("lift").alias("stddev_lift"),
        count("lift").alias("n_with_lift"),
    ).orderBy("avg_lift")
    return impact, summary


# Q3: Is there any significant difference between promotion impacts of the Fast versus Slow items?
print("\n--- Q3: Fast vs Slow Items Impact ---")
item_impact, item_impact_summary = summarize_lift_by_class(
    item_lift, classified_products, "product_id"
)
item_impact_summary.show()

# Q4: Is there any significant difference between promotion impacts of the Fast versus Slow stores?
print("\n--- Q4: Fast vs Slow Stores Impact ---")
store_impact, store_impact_summary = summarize_lift_by_class(
    store_lift, classified_shops, "shop_id"
)
store_impact_summary.show()

## 5. Advanced Insights & Trends
Additional insights into returns, customers, geography, and timing.

**Questions:**
5.  Which product categories have the highest return rate?
6.  Who are the top customers by frequency and monetary value?
7.  What is the top-selling product category for each store?
8.  What is the busiest day of the week?

In [None]:
# Q5: Return Rate Analysis by Category
print("--- Q5: Return Rate by Category ---")

# A return is a row with price_paid < 0.
# Numerator and denominator are counted over the same rows: count(lit(1)) counts
# every row, whereas count("transaction_id") would skip rows with a null id while
# the return count still included them (allowing rates above 1).
is_return = when(col("price_paid") < 0, 1).otherwise(0)
category_stats = df.groupBy("category").agg(
    count(lit(1)).alias("total_txns"),
    _sum(is_return).alias("return_count"),
)

category_returns = category_stats.withColumn(
    "return_rate", col("return_count") / col("total_txns")
).orderBy(col("return_rate").desc())

category_returns.show()

# Visualize
pdf_returns = category_returns.toPandas()
fig, ax = plt.subplots(figsize=(10, 5))
sns.barplot(data=pdf_returns, x='category', y='return_rate', ax=ax)
ax.set_title('Return Rate by Product Category')
ax.set_ylabel('Return Rate')
# Rotate category labels so long names do not overlap
ax.tick_params(axis='x', rotation=45)
plt.show()

In [None]:
# Q6: Customer Segmentation (Frequency & Monetary)
print("--- Q6: Top Customers (Frequency & Monetary) ---")

# Frequency = distinct transaction ids per customer; Monetary = summed price_paid,
# so returns (negative price_paid) reduce it.
# NOTE(review): customers are keyed by name only; two customers sharing a name
# would be merged. Confirm whether a customer id exists upstream.
customer_metrics = (
    df.groupBy("customer_name")
    .agg(
        countDistinct("transaction_id").alias("frequency"),
        _sum("price_paid").alias("monetary"),
    )
    .orderBy(col("monetary").desc())
)

customer_metrics.show(10)

# Scatter plot of the two dimensions
pdf_customers = customer_metrics.toPandas()
fig, ax = plt.subplots(figsize=(10, 6))
sns.scatterplot(data=pdf_customers, x='frequency', y='monetary', alpha=0.6, ax=ax)
ax.set_title('Customer Segments: Frequency vs Monetary')
ax.set_xlabel('Frequency (Number of Visits)')
ax.set_ylabel('Monetary (Total Spend)')
plt.show()

In [None]:
# Q7: Geographic Preferences (Top Category per Store)
print("--- Q7: Top Category per Store ---")

# Key stores by shop_id, as the classification and lift sections do, so two shops
# that share a display name are not merged. shop_name is kept for readability.
store_category_sales = df.groupBy("shop_id", "shop_name", "category").agg(
    _sum("price_paid").alias("total_sales")
)

# Highest sales first. Ties are broken by category name so the pick is
# deterministic; row_number otherwise chooses arbitrarily among tied rows.
window_spec = Window.partitionBy("shop_id", "shop_name").orderBy(
    col("total_sales").desc(), col("category").asc()
)

top_categories = store_category_sales.withColumn(
    "rank", row_number().over(window_spec)
).filter(col("rank") == 1).drop("rank")

top_categories.orderBy("shop_name").show()

In [None]:
# Q8: Peak Trading Times (Busiest Day of Week)
print("--- Q8: Busiest Day of Week ---")

# "E" renders the abbreviated weekday name (Mon, Tue, ...), matching weekday_order below
df_with_day = df.withColumn("day_of_week", date_format(col("date"), "E"))

# Count distinct transaction ids rather than rows. Q6 counts distinct
# transaction_id as visits, which implies one id can span several rows; counting
# rows would report line items, not the "Number of Transactions" the chart shows.
daily_volume = df_with_day.groupBy("day_of_week").agg(
    countDistinct("transaction_id").alias("txn_count")
).orderBy(col("txn_count").desc())

daily_volume.show()

# Visualize in calendar order (the table above is sorted by volume)
weekday_order = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
pdf_daily = daily_volume.toPandas()
fig, ax = plt.subplots(figsize=(10, 5))
sns.barplot(data=pdf_daily, x='day_of_week', y='txn_count', order=weekday_order, ax=ax)
ax.set_title('Transaction Volume by Day of Week')
ax.set_ylabel('Number of Transactions')
plt.show()