<div align="center">
  <img src="asset/Day3.png" alt="Databricks 14 Days AI Challenge - Day 03" width="800"/>
</div>

## DAY 3 (11/01/26) – PySpark Transformations Deep Dive

### 📚 Learning Objectives
Today we bridge the gap between "Data Analysis" and "Data Engineering." We will move beyond simple filtering and explore how distributed systems handle complex relationships:
* **Joins:** Merging datasets without causing data skew.
* **Window Functions:** Calculating trends (like running totals) while maintaining row-level granularity.
* **UDFs:** When to use them (and when to avoid them).

### 🚀 Strategy: The "Category Performance" Pipeline
Instead of running isolated commands, we will build a unified pipeline that answers business questions:
1.  **Ingest:** Combine the October and November files into one Q4 dataset.
2.  **Enrich (Join):** Calculate "Conversion Rates" per category and join these metrics back to the main event log to spot high-performing traffic.
3.  **Analyze (Window):** Track cumulative user spending to identify our "Whales" (high-value customers).
4.  **Refine (UDF):** Clean up the `category_code` column for better reporting.

```python
# Re-create the DataFrames so they are defined in this notebook's Spark session
df_oct = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv", header=True, inferSchema=True)
df_nov = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)
```

```python
from pyspark.sql.functions import col, lit, sum, count, when, desc, udf, avg
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Note: importing `sum` this way shadows Python's built-in sum() for the rest
# of the notebook; the window step below relies on it being Spark's sum.

# ---------------------------------------------------------
# STEP 1: LOAD & UNIFY
# Strategy: unionByName is preferred over union() because union() matches
# columns by position, while unionByName matches them by name. The merge
# stays correct even if the two files list their columns in a different order.
# ---------------------------------------------------------

# Add a 'month' column so we can track lineage (which file a row came from) after merging
df_full = df_oct.withColumn("month", lit("Oct")) \
                .unionByName(df_nov.withColumn("month", lit("Nov")))

print(f"✅ Combined Q4 Dataset (Oct + Nov) Loaded. Total Rows: {df_full.count():,}")
```
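The difference between positional `union()` and `unionByName()` is easiest to see with a plain-Python model (not Spark code). Here each row becomes a dict and each file has its own column order. The helpers `union_by_position` and `union_by_name` are hypothetical illustrations of the two matching rules. The swapped column order in the "November" sample is invented to show the failure mode.

```python
# Plain-Python model of Spark's two union rules; not the Spark API itself.

def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Like DataFrame.union(): the i-th value of B lands in A's i-th column."""
    out = [dict(zip(cols_a, r)) for r in rows_a]
    out += [dict(zip(cols_a, r)) for r in rows_b]  # B's own column names are ignored
    return out

def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Like DataFrame.unionByName(): values are matched by column name."""
    if set(cols_a) != set(cols_b):
        raise ValueError("column sets differ")
    out = [dict(zip(cols_a, r)) for r in rows_a]
    out += [dict(zip(cols_b, r)) for r in rows_b]  # B keeps its own column names
    return out

# Hypothetical files: October lists price before brand, November the reverse.
oct_cols, oct_rows = ["event_type", "price", "brand"], [("view", 9.99, "acme")]
nov_cols, nov_rows = ["event_type", "brand", "price"], [("purchase", "zeta", 19.5)]

by_pos = union_by_position(oct_cols, oct_rows, nov_cols, nov_rows)
by_name = union_by_name(oct_cols, oct_rows, nov_cols, nov_rows)

print(by_pos[1])   # {'event_type': 'purchase', 'price': 'zeta', 'brand': 19.5}  (misaligned)
print(by_name[1])  # {'event_type': 'purchase', 'brand': 'zeta', 'price': 19.5}  (correct)
```

In real Spark, a positional mismatch like this may not raise an error, because Spark can coerce the column types. Matching by name is therefore the safer default.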

```python
# ---------------------------------------------------------
# STEP 2: COMPLEX JOINS & AGGREGATION
# Goal: Calculate Conversion Rate (Purchases / Views) per Category
# and join it back to the main dataset.
# ---------------------------------------------------------

# 1. Create a Pivot Table (Practice Task Adapted)
# Pivot 'event_type' into columns to get view and purchase counts in one row per category
category_stats = (
    df_full
    .filter(col("category_code").isNotNull())
    .groupBy("category_code")
    .pivot("event_type", ["view", "purchase"])  # Listing the values up front avoids an extra pass to discover them
    .count()
    .na.fill(0)  # Categories with no views (or no purchases) get null counts from the pivot; replace with 0
)

# 2. Calculate Derived Metric: Conversion Rate
# Guard against division by zero explicitly instead of adding +1 to the denominator,
# which would bias the rate downward. Categories with zero views get a null (undefined) rate.
category_stats = category_stats.withColumn(
    "conversion_rate_pct",
    when(col("view") > 0, col("purchase") / col("view") * 100)
)

# 3. Join back to main data (Left Join)
# Why DataFrame API? It lets us chain the logic clearly and reuse `category_stats`
# without building SQL strings by hand.
df_enriched = df_full.join(category_stats, on="category_code", how="left")

print("--- Category Performance Stats (Top 5 by Conversion) ---")
category_stats.orderBy(desc("conversion_rate_pct")).show(5)
```
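Step 2 can be modelled in plain Python to show what the pivot, the guarded conversion rate, and the left join each produce. This is a sketch on a handful of invented events (the categories and counts are made up for illustration), not Spark code. The dictionary lookup in the last step follows the same idea as a hash join: the small per-category table is held in memory, and each event probes it by key.

```python
from collections import Counter

# Invented events: (category_code, event_type); None mimics a missing category.
events = [
    ("electronics.smartphone", "view"),
    ("electronics.smartphone", "view"),
    ("electronics.smartphone", "view"),
    ("electronics.smartphone", "view"),
    ("electronics.smartphone", "purchase"),
    ("appliances.kettle", "view"),
    ("appliances.kettle", "cart"),   # not in the pivot value list, so not counted
    ("kids.toys", "purchase"),       # purchases but no recorded views
    (None, "view"),                  # dropped, like filter(isNotNull())
]

# 1. Pivot: one entry per category with a count per listed event type.
pivot_values = ["view", "purchase"]
counts = Counter((cat, ev) for cat, ev in events if cat is not None and ev in pivot_values)
categories = sorted({cat for cat, _ in events if cat is not None})
category_stats = {
    cat: {ev: counts.get((cat, ev), 0) for ev in pivot_values}  # missing combos -> 0, like na.fill(0)
    for cat in categories
}

# 2. Conversion rate with an explicit zero-view guard (None = undefined).
for stats in category_stats.values():
    views, purchases = stats["view"], stats["purchase"]
    stats["conversion_rate_pct"] = purchases / views * 100 if views > 0 else None

# 3. Left join: every event keeps its row; an unmatched key gets None for the stats.
enriched = [(cat, ev, category_stats.get(cat)) for cat, ev in events]

print(category_stats["electronics.smartphone"])
# {'view': 4, 'purchase': 1, 'conversion_rate_pct': 25.0}
print(category_stats["kids.toys"]["conversion_rate_pct"])  # None
```

With a `+1` denominator, the smartphone rate would come out as 1 / 5 × 100 = 20% instead of 25%. `kids.toys` would show 100% despite having no recorded views.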

```python
# ---------------------------------------------------------
# STEP 3: WINDOW FUNCTIONS
# Goal: Calculate "Running Total Spend" per user.
# This lets us see *when* a user became a high-value customer.
# ---------------------------------------------------------
# Define the Window: Partition by User, Order by Time
# rowsBetween(unboundedPreceding, currentRow) sums from the start of the
# user's history up to and including the current row
w_user_history = Window.partitionBy("user_id") \
                       .orderBy("event_time") \
                       .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Apply Window Function
# Only purchase rows contribute their price; every other event adds 0
df_with_history = df_enriched.withColumn(
    "cumulative_spend",
    sum(when(col("event_type") == "purchase", col("price")).otherwise(0)).over(w_user_history)
)

# ---------------------------------------------------------
# VISUALIZATION
# Verify the running total for a specific high-value user
# ---------------------------------------------------------
# Pick any user whose running total passed 500 at some point
sample_row = df_with_history.filter(col("cumulative_spend") > 500).select("user_id").first()
if sample_row is None:  # first() returns None when no row matches
    raise ValueError("No user reached a cumulative spend above 500")
sample_user = sample_row["user_id"]

display(
    df_with_history
    .filter(col("user_id") == sample_user)
    .filter(col("event_type") == "purchase")
    .select("event_time", "category_code", "price", "cumulative_spend")
    .orderBy("event_time")
)
```
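The running total defined by `w_user_history` can be modelled in plain Python. The model groups events by user, sorts them by time, and accumulates purchase prices row by row. The events and the 500 threshold are illustrative. `first_crossing` is a hypothetical helper that answers the "*when* did this user become high-value" question from the Step 3 goal.

```python
from itertools import accumulate, groupby
from operator import itemgetter

# Invented events: (user_id, event_time, event_type, price)
events = [
    (1, "2019-10-01 10:00", "view", 300.0),
    (1, "2019-10-01 10:05", "purchase", 300.0),
    (1, "2019-10-03 09:00", "purchase", 250.0),
    (2, "2019-10-02 12:00", "purchase", 80.0),
    (1, "2019-11-02 18:30", "view", 40.0),
]

def running_spend(events):
    """Model of sum(when(purchase, price).otherwise(0)) over
    partitionBy(user_id).orderBy(event_time).rowsBetween(unboundedPreceding, currentRow)."""
    out = []
    ordered = sorted(events, key=itemgetter(0, 1))          # partition key, then order key
    for user, rows in groupby(ordered, key=itemgetter(0)):  # one partition per user
        rows = list(rows)
        spend = (price if ev == "purchase" else 0.0 for _, _, ev, price in rows)
        for row, total in zip(rows, accumulate(spend)):     # ROWS frame: one row at a time
            out.append(row + (total,))
    return out

def first_crossing(rows, threshold):
    """Hypothetical helper: earliest event_time at which each user's running total exceeds threshold."""
    crossed = {}
    for user, t, _, _, total in rows:
        if total > threshold and user not in crossed:
            crossed[user] = t
    return crossed

history = running_spend(events)
for row in history:
    print(row)
print(first_crossing(history, 500))  # {1: '2019-10-03 09:00'}
```

The 300.0 price on user 1's first `view` row is never counted. That is why the `when(... == "purchase")` filter sits inside the sum.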

```python
# ---------------------------------------------------------
# STEP 4: UDFS (User Defined Functions)
# Requirement: Create derived features.
# Mentor Note: Prefer native Spark functions (col, split, etc.) over UDFs
# for performance. Catalyst cannot optimize inside a Python UDF, and every
# row has to be serialized between the JVM and a Python worker.
# ---------------------------------------------------------
from pyspark.sql.functions import col, element_at, lit, split, udf, when
from pyspark.sql.types import StringType

# METHOD A: The Pythonic UDF Way (Good for complex, non-standard logic)
# Extract the "Sub-Category" as the last segment (e.g., "smartphone" from "electronics.smartphone")
def extract_subcategory(code):
    if code and "." in code:
        return code.split(".")[-1]
    return "unknown"

# Wrap the Python function as a Spark UDF that returns a string
extract_subcat_udf = udf(extract_subcategory, StringType())

# METHOD B: The Optimized Spark Way (Better Performance)
# Same logic with native functions: split on a literal dot (regex r"\."), take the
# last element, and fall back to "unknown" when there is no dot or the code is null
subcategory_native = when(
    col("category_code").contains("."),
    element_at(split(col("category_code"), r"\."), -1)
).otherwise(lit("unknown"))

df_final = df_with_history.withColumn("subcategory_udf", extract_subcat_udf(col("category_code"))) \
                          .withColumn("subcategory_native", subcategory_native)

print("--- Derived Features Verification ---")
df_final.select("category_code", "subcategory_udf", "subcategory_native", "cumulative_spend").show(5, truncate=False)
```
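Taking the second segment of the split (as `getItem(1)` does) and taking the last segment (as `extract_subcategory` does) only agree for two-level codes. This plain-Python sketch compares the two rules on invented sample codes. `second_segment` is a hypothetical stand-in for `split(...).getItem(1)`. It returns `None` for a missing index, which is what Spark returns with ANSI mode off.

```python
def extract_subcategory(code):
    # Same logic as the UDF above: last dot-separated segment, else "unknown"
    if code and "." in code:
        return code.split(".")[-1]
    return "unknown"

def second_segment(code):
    # Hypothetical model of split(col, r"\.").getItem(1): index 1, None if missing
    if code is None:
        return None
    parts = code.split(".")
    return parts[1] if len(parts) > 1 else None

samples = [
    "electronics.smartphone",
    "appliances.kitchen.kettle",   # three levels: the two rules disagree
    "furniture",                   # no dot
    None,                          # missing category
]

for code in samples:
    print(f"{code!s:28} last={extract_subcategory(code)!s:12} second={second_segment(code)}")
```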

### 🧠 Key Learnings & Takeaways
* **DataFrame API vs SQL:** Used the API for the Pivot and Join operations because it allows cleaner variable reuse (the `category_stats` DataFrame) than complex nested SQL subqueries.
* **Window Logic:** The `rowsBetween(Window.unboundedPreceding, Window.currentRow)` frame is crucial for running totals. Without an explicit frame, a window with `orderBy` defaults to `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so rows that share the same `event_time` are summed together instead of one at a time. A window without `orderBy` defaults to the entire partition.
* **UDF Performance:** We implemented the same feature both ways. UDFs offer flexibility (arbitrary Python logic), but native Spark functions (like `split`) are preferred for speed. They run inside the JVM, where Catalyst can optimize them, with no row serialization to and from Python workers.
* **Pivot Power:** Converting rows (`event_type`) to columns allowed us to calculate conversion rates in a single efficient pass.
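The frame behaviour from the Window Logic takeaway is easiest to see with a tie. The sketch below is a plain-Python model of the two frames, not Spark code, and the timestamps and prices are invented. A ROWS frame adds one row at a time. The default RANGE frame of an ordered window includes every row whose ordering value is less than or equal to the current row's, so tied rows all receive the same total.

```python
def rows_frame_totals(rows):
    """ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: strictly row by row."""
    totals, running = [], 0.0
    for _, price in rows:
        running += price
        totals.append(running)
    return totals

def range_frame_totals(rows):
    """RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: every row whose
    ordering value is <= the current one is in the frame, including ties."""
    return [sum(p for t, p in rows if t <= current_t) for current_t, _ in rows]

# One user's purchases, already sorted by event_time; two share a timestamp.
purchases = [
    ("2019-10-01 10:00", 100.0),
    ("2019-10-01 10:00", 50.0),
    ("2019-10-02 09:00", 25.0),
]

print(rows_frame_totals(purchases))   # [100.0, 150.0, 175.0]
print(range_frame_totals(purchases))  # [150.0, 150.0, 175.0]
```

With ties, the ROWS result also depends on which tied row Spark orders first. Adding a tiebreaker column to `orderBy` makes the running total deterministic.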