####ETL Pipeline – Product & Customer Data

####1. Setup

In [0]:
storage_account = "hexdatastoragegen2"
storage_key = "3yXeqbl+vxjfPvR/0TDxvMsa7Q56vlBDcdyXjput7irnJ4TK6rHd/ETsfIBlCqO68tdiqKvAvf1n+AStOdiylw=="

# Configure for ADLS Gen2 (ABFS)
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", storage_key)

# Define paths
raw_path = "abfss://raw@hexdatastoragegen2.dfs.core.windows.net/retail_sales_dataset.csv"
curated_path = "abfss://synapse@hexdatastoragegen2.dfs.core.windows.net/curated"

####2. Extract – Load Raw Data

In [0]:
df = spark.read.option("header", "true").csv(raw_path)

df.show(5)

+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
|Transaction ID|      Date|Customer ID|Gender|Age|Product Category|Quantity|Price per Unit|Total Amount|
+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
|             1|2023-11-24|    CUST001|  Male| 34|          Beauty|       3|            50|         150|
|             2|2023-02-27|    CUST002|Female| 26|        Clothing|       2|           500|        1000|
|             3|2023-01-13|    CUST003|  Male| 50|     Electronics|       1|            30|          30|
|             4|2023-05-21|    CUST004|  Male| 37|        Clothing|       1|           500|         500|
|             5|2023-05-06|    CUST005|  Male| 30|          Beauty|       2|            50|         100|
+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
only showing top 5 rows


In [0]:
df.printSchema()

root
 |-- Transaction ID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price per Unit: string (nullable = true)
 |-- Total Amount: string (nullable = true)



Rename columns first

In [0]:
# Clean column names (replace spaces with underscores)
df = df.toDF(*[c.replace(" ", "_") for c in df.columns])

df.printSchema()

root
 |-- Transaction_ID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price_per_Unit: string (nullable = true)
 |-- Total_Amount: string (nullable = true)



In [0]:
from pyspark.sql.functions import col

df = df.select(
    col("Transaction_ID").cast("int"),
    col("Date").cast("date"),
    col("Customer_ID").cast("string"),
    col("Gender").cast("string"),
    col("Age").cast("int"),
    col("Product_Category").cast("string"),
    col("Quantity").cast("int"),
    col("Price_per_Unit").cast("int"),
    (col("Quantity") * col("Price_per_Unit")).alias("Total_Amount")
)

In [0]:
print("Total rows:", df.count())

Total rows: 1000


####3. Transform – Create Dimension Tables

3.1 Customer Dimension

In [0]:
# Dim Customer
dim_customer = df.select(
    "Customer_ID", "Gender", "Age"
).dropDuplicates(["Customer_ID"])

3.2 Product Dimension

In [0]:
# Dim Product
from pyspark.sql.functions import monotonically_increasing_id

# Proper Dim_Product with surrogate key
dim_product = df.select("Product_Category").dropDuplicates() \
                .withColumn("Product_ID", monotonically_increasing_id())

3.3 Other Dimensions (Date, Store, etc.)

In [0]:
from pyspark.sql.functions import col, to_date, year, month, dayofmonth, 
weekofyear, quarter, date_format, monotonically_increasing_id

dim_date = df.withColumn("Date", to_date("Date")) \
             .select(
                 col("Date"),
                 year("Date").alias("Year"),
                 month("Date").alias("Month"),
                 dayofmonth("Date").alias("Day"),
                 weekofyear("Date").alias("Week"),
                 quarter("Date").alias("Quarter"),
                 date_format("Date", "EEEE").alias("DayName"),
                 date_format("Date", "MMMM").alias("MonthName")
             ) \
             .dropDuplicates() \
             .withColumn("Date_ID", monotonically_increasing_id())

####4. Transform – Create Fact Table

In [0]:
from pyspark.sql.functions import col, to_date

# Ensure Date is proper type
df = df.withColumn("Date", to_date("Date"))

# Join with dim_product (to get Product_ID)
# Join with dim_date (to get Date_ID)
fact_sales = (
    df.join(dim_product, on="Product_Category", how="left")
      .join(dim_date, on="Date", how="left")
      .select(
          col("Transaction_ID"),
          col("Date_ID"),       # now exists after join with dim_date
          col("Customer_ID"),
          col("Product_ID"),    # surrogate key from dim_product
          col("Quantity"),
          col("Price_per_Unit"),
          (col("Quantity") * col("Price_per_Unit")).alias("Total_Amount")
      )
)

In [0]:
display(fact_sales)

Transaction_ID,Date_ID,Customer_ID,Product_ID,Quantity,Price_per_Unit,Total_Amount
1,197,CUST001,2,3,50,150
2,219,CUST002,1,2,500,1000
3,337,CUST003,0,1,30,30
4,230,CUST004,1,1,500,500
5,150,CUST005,2,2,50,100
6,114,CUST006,2,1,30,30
7,54,CUST007,1,2,25,50
8,257,CUST008,0,4,25,100
9,265,CUST009,0,2,300,600
10,239,CUST010,1,4,50,200


####5. Load – Write to Parquet (Clean Zone)

In [0]:
dim_customer.write.mode("overwrite").parquet(f"{curated_path}/dim_customer")
dim_product.write.mode("overwrite").parquet(f"{curated_path}/dim_product")
dim_date.write.mode("overwrite").parquet(f"{curated_path}/dim_date")
fact_sales.write.mode("overwrite").parquet(f"{curated_path}/fact_sales")

####6. Verify

In [0]:
print("Customers:", dim_customer.count())
print("Products:", dim_product.count())
print("Dates:", dim_date.count())
print("Sales:", fact_sales.count())

Customers: 1000
Products: 3
Dates: 345
Sales: 1000
