<a href="https://colab.research.google.com/github/HARSHAVARDHAN5696/DATA-SCIENCE-BEGINNER-PROJECTS/blob/main/retail_sales_dataengineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download and extract Spark (latest stable version)
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz

# Install PySpark helper
!pip install -q findspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"


In [4]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RetailProject") \
    .getOrCreate()

spark


In [5]:
from google.colab import files

# This will open a file picker
uploaded = files.upload()


Saving retail_sales.csv to retail_sales.csv


In [6]:
# Read the uploaded CSV file into Spark DataFrame
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("retail_sales.csv")
)

df.show(5)


+--------+-------------------+-----------+----------+--------+-----+--------+------+
|order_id|         order_date|customer_id|product_id|quantity|price|store_id|region|
+--------+-------------------+-----------+----------+--------+-----+--------+------+
|   O1000|2023-04-13 00:00:00|         C3|       P19|       7| 9.91|      S4| North|
|   O1001|2023-12-15 00:00:00|        C33|       P12|       5|52.96|      S6| North|
|   O1002|2023-09-28 00:00:00|         C6|       P18|       5|73.25|      S5|  West|
|   O1003|2023-04-17 00:00:00|        C50|        P6|       2|86.95|      S7| South|
|   O1004|2023-03-13 00:00:00|        C10|       P18|       4|22.03|     S10| South|
+--------+-------------------+-----------+----------+--------+-----+--------+------+
only showing top 5 rows



In [7]:
import pandas as pd

# Load original file using pandas
df_full = pd.read_csv("retail_sales.csv")

# Create 5 mini-batches and save them as separate files
for i in range(5):
    batch = df_full.sample(50, random_state=i)
    batch.to_csv(f"batch_{i+1}.csv", index=False)


In [8]:
import time
from pyspark.sql.functions import *

# Create an empty DataFrame to simulate the warehouse table
fact_sales_df = None

# Simulate streaming by processing each batch
for i in range(1, 6):
    print(f"🚀 Ingesting batch_{i}.csv ...")

    # Read the mini-batch
    batch_df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(f"batch_{i}.csv")
    )

    # Add timestamp column for demo
    batch_df = batch_df.withColumn("ingested_at", current_timestamp())

    # Combine with the master fact_sales_df
    if fact_sales_df is None:
        fact_sales_df = batch_df
    else:
        fact_sales_df = fact_sales_df.union(batch_df)

    # Show current fact table
    fact_sales_df.groupBy("region").agg(sum("quantity").alias("total_quantity")).show()

    time.sleep(1)  # Wait 1 sec before next batch


🚀 Ingesting batch_1.csv ...
+------+--------------+
|region|total_quantity|
+------+--------------+
| South|            53|
|  East|            38|
|  West|            77|
| North|            97|
+------+--------------+

🚀 Ingesting batch_2.csv ...
+------+--------------+
|region|total_quantity|
+------+--------------+
| South|           135|
|  East|           124|
|  West|           118|
| North|           164|
+------+--------------+

🚀 Ingesting batch_3.csv ...
+------+--------------+
|region|total_quantity|
+------+--------------+
| South|           208|
|  East|           182|
|  West|           188|
| North|           208|
+------+--------------+

🚀 Ingesting batch_4.csv ...
+------+--------------+
|region|total_quantity|
+------+--------------+
| South|           230|
|  East|           279|
|  West|           246|
| North|           279|
+------+--------------+

🚀 Ingesting batch_5.csv ...
+------+--------------+
|region|total_quantity|
+------+--------------+
| South|        

In [9]:
# Save the final fact_sales_df to Parquet format (acts like data warehouse storage)
fact_sales_df.write.mode("overwrite").parquet("fact_sales_parquet")


In [10]:
# Read from the saved parquet file
fact_sales = spark.read.parquet("fact_sales_parquet")

# Show the schema
fact_sales.printSchema()

# Show total revenue by region
fact_sales.withColumn("total_sale", fact_sales.quantity * fact_sales.price) \
    .groupBy("region") \
    .agg(round(sum("total_sale"), 2).alias("Total_Revenue")) \
    .orderBy("Total_Revenue", ascending=False) \
    .show()


root
 |-- order_id: string (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- store_id: string (nullable = true)
 |-- region: string (nullable = true)
 |-- ingested_at: timestamp (nullable = true)

+------+-------------+
|region|Total_Revenue|
+------+-------------+
|  East|     17336.44|
| North|     16837.01|
|  West|     15798.24|
| South|      14983.7|
+------+-------------+



In [11]:
dim_customer = fact_sales.select("customer_id").distinct()
dim_customer = dim_customer.withColumn("customer_key", monotonically_increasing_id())

dim_customer.show(5)


+-----------+------------+
|customer_id|customer_key|
+-----------+------------+
|         C6|           0|
|        C22|           1|
|        C78|           2|
|        C95|           3|
|        C98|           4|
+-----------+------------+
only showing top 5 rows



In [12]:
dim_product = fact_sales.select("product_id").distinct()
dim_product = dim_product.withColumn("product_key", monotonically_increasing_id())

dim_product.show(5)


+----------+-----------+
|product_id|product_key|
+----------+-----------+
|       P13|          0|
|        P2|          1|
|       P15|          2|
|       P11|          3|
|        P7|          4|
+----------+-----------+
only showing top 5 rows



In [13]:
dim_store = fact_sales.select("store_id", "region").distinct()
dim_store = dim_store.withColumn("store_key", monotonically_increasing_id())

dim_store.show(5)


+--------+------+---------+
|store_id|region|store_key|
+--------+------+---------+
|      S7| South|        0|
|      S8| South|        1|
|      S7| North|        2|
|      S6|  West|        3|
|      S2|  East|        4|
+--------+------+---------+
only showing top 5 rows



In [14]:
from pyspark.sql.functions import to_date, year, month, dayofmonth

dim_date = fact_sales.select(to_date("order_date").alias("order_date")).distinct()
dim_date = dim_date.withColumn("day", dayofmonth("order_date")) \
                   .withColumn("month", month("order_date")) \
                   .withColumn("year", year("order_date")) \
                   .withColumn("date_key", monotonically_increasing_id())

dim_date.show(5)


+----------+---+-----+----+--------+
|order_date|day|month|year|date_key|
+----------+---+-----+----+--------+
|2023-11-08|  8|   11|2023|       0|
|2023-05-22| 22|    5|2023|       1|
|2023-02-25| 25|    2|2023|       2|
|2023-06-23| 23|    6|2023|       3|
|2023-02-08|  8|    2|2023|       4|
+----------+---+-----+----+--------+
only showing top 5 rows



In [16]:
dim_date = dim_date.withColumnRenamed("order_date", "date_value")


In [17]:
from pyspark.sql.functions import to_date

fs = fact_sales \
    .join(dim_customer, on="customer_id", how="left") \
    .join(dim_product, on="product_id", how="left") \
    .join(dim_store, on=["store_id", "region"], how="left") \
    .join(dim_date, to_date("order_date") == dim_date["date_value"], "left") \
    .select("order_id", "customer_key", "product_key", "store_key", "date_key",
            "quantity", "price", "ingested_at")

fs.show(5)



+--------+------------+-----------+---------+--------+--------+-----+--------------------+
|order_id|customer_key|product_key|store_key|date_key|quantity|price|         ingested_at|
+--------+------------+-----------+---------+--------+--------+-----+--------------------+
|   O1642|          18|          7|       17|      86|       1| 80.3|2025-07-29 17:10:...|
|   O1762|          41|          9|       34|      37|       9|43.23|2025-07-29 17:10:...|
|   O1909|           3|          1|       28|      44|       5|83.32|2025-07-29 17:10:...|
|   O1199|          41|          3|        8|     102|       4|55.88|2025-07-29 17:10:...|
|   O1586|           7|          1|       31|      82|       1|84.81|2025-07-29 17:10:...|
+--------+------------+-----------+---------+--------+--------+-----+--------------------+
only showing top 5 rows



In [18]:
# Save all tables
dim_customer.write.mode("overwrite").parquet("dim_customer")
dim_product.write.mode("overwrite").parquet("dim_product")
dim_store.write.mode("overwrite").parquet("dim_store")
dim_date.write.mode("overwrite").parquet("dim_date")
fs.write.mode("overwrite").parquet("fact_sales")


In [19]:
# Create final project folder and move everything
!mkdir -p Retail_Data_Engineering_Project
!cp -r fact_sales dim_* pipeline_notebook.ipynb Retail_Data_Engineering_Project/
!zip -r Retail_Data_Engineering_Project.zip Retail_Data_Engineering_Project


cp: cannot stat 'pipeline_notebook.ipynb': No such file or directory
  adding: Retail_Data_Engineering_Project/ (stored 0%)
  adding: Retail_Data_Engineering_Project/dim_customer/ (stored 0%)
  adding: Retail_Data_Engineering_Project/dim_customer/._SUCCESS.crc (stored 0%)
  adding: Retail_Data_Engineering_Project/dim_customer/part-00000-84b81625-5dc0-4c54-8acb-515a84b69e45-c000.snappy.parquet (deflated 38%)
  adding: Retail_Data_Engineering_Project/dim_customer/.part-00000-84b81625-5dc0-4c54-8acb-515a84b69e45-c000.snappy.parquet.crc (stored 0%)
  adding: Retail_Data_Engineering_Project/dim_customer/_SUCCESS (stored 0%)
  adding: Retail_Data_Engineering_Project/dim_date/ (stored 0%)
  adding: Retail_Data_Engineering_Project/dim_date/._SUCCESS.crc (stored 0%)
  adding: Retail_Data_Engineering_Project/dim_date/part-00000-95567ce4-6ee9-45ce-80d6-af57aaf55a0a-c000.snappy.parquet (deflated 46%)
  adding: Retail_Data_Engineering_Project/dim_date/_SUCCESS (stored 0%)
  adding: Retail_Data_Engi

In [20]:
from google.colab import files
files.download("Retail_Data_Engineering_Project.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>