<a href="https://colab.research.google.com/github/abhinandhan21/retail-etl-pipeline/blob/main/retail_etl_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

 **Step 1: Install & Import Libraries**

In [2]:
# Install PySpark
!pip install pyspark

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg, to_date, month, year, when, count, row_number
from pyspark.sql.window import Window
from google.colab import files
import shutil



**Step 2: Upload File & Start Spark Session**

In [3]:
# Upload the retail sales dataset from the local machine.
# files.upload() returns a mapping of {filename: file contents as bytes}.
uploaded = files.upload()
if not uploaded:
    raise ValueError("No file was uploaded; please upload the retail sales CSV.")

# When a file with the same name already exists, Colab saves the new upload
# under a suffixed name (e.g. "retail_sales_dataset (1).csv"), so reading a
# fixed path would silently load the stale copy from an earlier run.
# Write the bytes that were just uploaded to one canonical path instead.
# If several files are selected, only the first one is used.
DATA_PATH = "retail_sales_dataset.csv"
with open(DATA_PATH, "wb") as csv_file:
    csv_file.write(next(iter(uploaded.values())))

# Start Spark Session (reuses the active session if one already exists)
spark = SparkSession.builder.appName("EnhancedRetailETL").getOrCreate()

# Read CSV: the first row is the header, column types are inferred from the data
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
df.show(5)

Saving retail_sales_dataset.csv to retail_sales_dataset (1).csv
+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
|Transaction ID|      Date|Customer ID|Gender|Age|Product Category|Quantity|Price per Unit|Total Amount|
+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
|             1|2023-11-24|    CUST001|  Male| 34|          Beauty|       3|            50|         150|
|             2|2023-02-27|    CUST002|Female| 26|        Clothing|       2|           500|        1000|
|             3|2023-01-13|    CUST003|  Male| 50|     Electronics|       1|            30|          30|
|             4|2023-05-21|    CUST004|  Male| 37|        Clothing|       1|           500|         500|
|             5|2023-05-06|    CUST005|  Male| 30|          Beauty|       2|            50|         100|
+--------------+----------+-----------+------+---+----------------+--------+--------------+-----

**Step 3: Data Quality Checks**

In [4]:
# Null report: one output column per source column, holding its null count.
# when(...) without otherwise() yields NULL for non-matching rows and count()
# skips NULLs, so only the null cells are tallied.
null_counts = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df.columns
]
df.select(null_counts).show()

# Keep only rows whose quantity and unit price are both strictly positive.
has_valid_quantity = col("Quantity") > 0
has_valid_price = col("Price per Unit") > 0
df = df.where(has_valid_quantity & has_valid_price)

+--------------+----+-----------+------+---+----------------+--------+--------------+------------+
|Transaction ID|Date|Customer ID|Gender|Age|Product Category|Quantity|Price per Unit|Total Amount|
+--------------+----+-----------+------+---+----------------+--------+--------------+------------+
|             0|   0|          0|     0|  0|               0|       0|             0|           0|
+--------------+----+-----------+------+---+----------------+--------+--------------+------------+



**Step 4: Enrich Data (Date & Revenue)**

In [5]:
# Parse the Date column (yyyy-MM-dd) into a proper date value.
parsed_date = to_date(col("Date"), "yyyy-MM-dd")

# Add the parsed date, its month and year, and the revenue earned per unit
# sold (Total Amount divided by Quantity) in one chained pass.
df = (
    df.withColumn("ParsedDate", parsed_date)
    .withColumn("Month", month("ParsedDate"))
    .withColumn("Year", year("ParsedDate"))
    .withColumn("RevenuePerUnit", col("Total Amount") / col("Quantity"))
)

# Preview only the newly derived columns.
df.select("ParsedDate", "Month", "Year", "RevenuePerUnit").show(5)

+----------+-----+----+--------------+
|ParsedDate|Month|Year|RevenuePerUnit|
+----------+-----+----+--------------+
|2023-11-24|   11|2023|          50.0|
|2023-02-27|    2|2023|         500.0|
|2023-01-13|    1|2023|          30.0|
|2023-05-21|    5|2023|         500.0|
|2023-05-06|    5|2023|          50.0|
+----------+-----+----+--------------+
only showing top 5 rows



**Step 5: Monthly Aggregation by Product Category**

In [6]:
# Total sales and mean per-unit revenue for every (year, month, category),
# sorted by the same keys used for grouping.
grouping_keys = ["Year", "Month", "Product Category"]
total_sales = _sum("Total Amount").alias("Total Sales")
avg_price = avg("RevenuePerUnit").alias("Avg Price")

monthly_sales = (
    df.groupBy(*grouping_keys)
    .agg(total_sales, avg_price)
    .orderBy(*grouping_keys)
)

monthly_sales.show(5)

+----+-----+----------------+-----------+------------------+
|Year|Month|Product Category|Total Sales|         Avg Price|
+----+-----+----------------+-----------+------------------+
|2023|    1|          Beauty|      12430|             196.0|
|2023|    1|        Clothing|      13125|194.42307692307693|
|2023|    1|     Electronics|       9895|             166.4|
|2023|    2|          Beauty|      14035|180.57692307692307|
|2023|    2|        Clothing|      14560| 180.9090909090909|
+----+-----+----------------+-----------+------------------+
only showing top 5 rows



**Step 6: Customer Segmentation by Age Group & Gender**

In [7]:
# Create Age Groups.
# Conditions are evaluated top to bottom and the first match wins.
# A NULL age fails every comparison, so without the explicit first branch it
# would fall through to otherwise() and be reported as "55+".
df = df.withColumn(
    "AgeGroup",
    when(col("Age").isNull(), "Unknown")
    .when(col("Age") < 25, "18–24")
    .when(col("Age").between(25, 34), "25–34")  # between() is inclusive on both ends
    .when(col("Age").between(35, 44), "35–44")
    .when(col("Age").between(45, 54), "45–54")
    .otherwise("55+"),
)

# Spend and purchase count per (age group, gender).
# Sorting on Gender as well makes the row order within each age group
# reproducible between runs.
age_gender_sales = (
    df.groupBy("AgeGroup", "Gender")
    .agg(
        _sum("Total Amount").alias("Total Spent"),
        count("Transaction ID").alias("Total Purchases"),
    )
    .orderBy("AgeGroup", "Gender")
)

age_gender_sales.show()

+--------+------+-----------+---------------+
|AgeGroup|Gender|Total Spent|Total Purchases|
+--------+------+-----------+---------------+
|   18–24|  Male|      38730|             77|
|   18–24|Female|      35920|             72|
|   25–34|Female|      51850|            102|
|   25–34|  Male|      45240|            101|
|   35–44|  Male|      43870|             95|
|   35–44|Female|      52965|            112|
|   45–54|Female|      46825|            115|
|   45–54|  Male|      50410|            110|
|     55+|Female|      45280|            109|
|     55+|  Male|      44910|            107|
+--------+------+-----------+---------------+



**Step 7: Top Product Categories per Month**

In [8]:
# Rank categories within each (year, month) by revenue, highest first.
# row_number() hands out distinct ranks even when totals tie, so the category
# name is added as a tie-breaker to keep the ranking reproducible.
windowSpec = Window.partitionBy("Year", "Month").orderBy(
    col("Total Amount").desc(), col("Product Category").asc()
)

# The aggregate reuses the "Total Amount" name, so the window ordering above
# refers to the monthly sum per category, not the per-transaction amount.
df_ranked = (
    df.groupBy("Year", "Month", "Product Category")
    .agg(_sum("Total Amount").alias("Total Amount"))
    .withColumn("rank", row_number().over(windowSpec))
    .filter(col("rank") <= 3)  # keep the top three categories per month
)

df_ranked.orderBy("Year", "Month", "rank").show()

+----+-----+----------------+------------+----+
|Year|Month|Product Category|Total Amount|rank|
+----+-----+----------------+------------+----+
|2023|    1|        Clothing|       13125|   1|
|2023|    1|          Beauty|       12430|   2|
|2023|    1|     Electronics|        9895|   3|
|2023|    2|     Electronics|       15465|   1|
|2023|    2|        Clothing|       14560|   2|
|2023|    2|          Beauty|       14035|   3|
|2023|    3|        Clothing|       15065|   1|
|2023|    3|          Beauty|       10545|   2|
|2023|    3|     Electronics|        3380|   3|
|2023|    4|        Clothing|       13940|   1|
|2023|    4|          Beauty|       11905|   2|
|2023|    4|     Electronics|        8025|   3|
|2023|    5|     Electronics|       23245|   1|
|2023|    5|        Clothing|       17455|   2|
|2023|    5|          Beauty|       12450|   3|
|2023|    6|     Electronics|       15550|   1|
|2023|    6|          Beauty|       10995|   2|
|2023|    6|        Clothing|       1017

**Step 8: Save Multiple Outputs and Download**

In [9]:
# Results to export, keyed by the output folder / archive base name.
exports = {
    "monthly_sales": monthly_sales,
    "age_gender_sales": age_gender_sales,
    "top_products": df_ranked,
}

# Save outputs to CSV; coalesce(1) collapses each result to a single partition
# before writing, and mode="overwrite" replaces output from a previous run.
for folder, frame in exports.items():
    frame.coalesce(1).write.csv(folder, header=True, mode="overwrite")

# Zip folders: each "<folder>" directory becomes "<folder>.zip"
for folder in exports:
    shutil.make_archive(folder, "zip", folder)

# Download files to the local machine through the browser
for folder in exports:
    files.download(folder + ".zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>