In [0]:
from pyspark.sql import functions as F
from delta.tables import *


In [0]:
# Base path where our source data lives (from Day 3/4)
source_path = "/Volumes/workspace/ecommerce/ecommerce_data/processed_data/oct_2019"

**TASK 1: Design 3-Layer Architecture**

- We define physical paths for Bronze, Silver, and Gold layers.
- This structure separates raw ingestion from refined business data.

In [0]:
# Define new paths for our Medallion Layers
base_medallion = "/Volumes/workspace/ecommerce/ecommerce_data/medallion"
path_bronze = f"{base_medallion}/bronze"
path_silver = f"{base_medallion}/silver"
path_gold =   f"{base_medallion}/gold"

print(f"Pipeline Paths configured:\n  Bronze: {path_bronze}\n  Silver: {path_silver}\n  Gold:   {path_gold}")

Pipeline Paths configured:
  Bronze: /Volumes/workspace/ecommerce/ecommerce_data/medallion/bronze
  Silver: /Volumes/workspace/ecommerce/ecommerce_data/medallion/silver
  Gold:   /Volumes/workspace/ecommerce/ecommerce_data/medallion/gold


**TASK 2: Build BRONZE Layer (Raw Ingestion)**

- **GOAL**: Ingest raw data "as-is". Do not clean it yet. Just capture history.
- We add metadata columns like 'ingestion_time' to track lineage.

In [0]:
print("\n Building BRONZE Layer...")

# Read Raw Data (Simulating reading from an external landing zone)
raw_df = spark.read.parquet(source_path)

# Add Metadata and Write to Bronze
# We interpret schemas strictly but keep data raw.
bronze_df = raw_df.withColumn("ingestion_ts", F.current_timestamp()) \
                  .withColumn("source_file", F.col("_metadata.file_path"))

bronze_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(path_bronze)

print(f"Bronze Layer Created: {bronze_df.count():,} rows ingested.")


 Building BRONZE Layer...
Bronze Layer Created: 42,448,764 rows ingested.


**TASK 3: Build SILVER Layer (Cleaning & Validation)**

- **GOAL**: Filter bad data, deduplicate, and add useful derived columns.
- This becomes the "Enterprise Truth" table.

In [0]:
print("\n Building SILVER Layer...")

# Read from Bronze
bronze_read = spark.read.format("delta").load(path_bronze)

# Apply Transformations
silver_df = bronze_read \
    .filter(F.col("price") > 0) \
    .filter(F.col("price") < 100000) \
    .dropDuplicates(["user_session", "event_time", "product_id"]) \
    .withColumn("event_date", F.to_date("event_time")) \
    .withColumn("price_tier",
        F.when(F.col("price") < 50, "budget")
         .when(F.col("price") < 200, "mid-range")
         .otherwise("premium")
    )

# Write to Silver
silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(path_silver)

print(f"Silver Layer Created: {silver_df.count():,} rows cleaned.")


 Building SILVER Layer...
Silver Layer Created: 42,344,170 rows cleaned.


**TASK 4: Build GOLD Layer (Business Aggregates) 🥇**

- **GOAL**: Create specific tables for Reporting/Dashboards.
- Optimized for read performance, not write performance.

In [0]:
print("\n Building GOLD Layer...")

# Read from Silver
silver_read = spark.read.format("delta").load(path_silver)

# Create "Product Performance" Aggregate
# We calculate Views vs Purchases to get Conversion Rate
# Create "Product Performance" Aggregate
product_perf = silver_read.groupBy("product_id", "category_code", "brand") \
    .agg(
        F.countDistinct(F.when(F.col("event_type")=="view", F.col("user_session"))).alias("total_views"),
        F.countDistinct(F.when(F.col("event_type")=="cart", F.col("user_session"))).alias("total_carts"),
        F.countDistinct(F.when(F.col("event_type")=="purchase", F.col("user_session"))).alias("total_purchases"),
        F.sum(F.when(F.col("event_type")=="purchase", F.col("price"))).alias("total_revenue")
      ) 
    
product_perf = product_perf.withColumn("conversion_rate", 
        F.when(F.col("total_views") == 0, 0.0) \
         .otherwise(F.round((F.col("total_purchases") / F.col("total_views")) * 100, 2))
    ).fillna(0) # Replace nulls with 0

# Write to Gold
product_perf.write \
    .format("delta") \
    .mode("overwrite") \
    .save(path_gold)

print("Gold Layer Created: Product Performance Table.")

# Verify the Gold Table
print("\n Top 5 Performing Products:")
display(product_perf.orderBy(F.col("total_revenue").desc()).limit(5))


 Building GOLD Layer...
Gold Layer Created: Product Performance Table.

 Top 5 Performing Products:


product_id,category_code,brand,total_views,total_carts,total_purchases,total_revenue,conversion_rate
1005115,electronics.smartphone,apple,234359,11063,11626,12404835.949999962,4.96
1005105,electronics.smartphone,apple,148054,7414,6716,10239248.68,4.54
1004249,electronics.smartphone,apple,144639,10345,8323,6728639.859999994,5.75
1005135,electronics.smartphone,apple,76294,3720,2929,5567806.6400000015,3.84
1004767,electronics.smartphone,samsung,238486,24673,19842,5430222.719999989,8.32
