In [0]:
from pyspark.sql import SparkSession
import pandas as pd 
import matplotlib.pyplot as plt

In [0]:
# Single Spark entry point for the notebook; getOrCreate() returns an already-active
# session if there is one instead of starting a second.
spark = (
    SparkSession.builder
    .appName("Advanced_retail_ETL")
    .getOrCreate()
)

In [0]:
# Raw retail transactions: first CSV row is the header, column types are inferred by Spark.
RETAIL_CSV_PATH = "/Volumes/retail_catalog/default/volume_retail/OnlineRetail.csv"

df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(RETAIL_CSV_PATH)
)

In [0]:
# Inspect the raw rows exactly as read from the CSV.
df.display()

# Bronze Layer

In [0]:
# Bronze: persist the raw CSV rows (with Spark-inferred types) as a Delta table,
# replacing whatever a previous run wrote.
df.write.saveAsTable(
    "retail_catalog.bronze_retail.bronze_layer_retail",
    format="delta",
    mode="overwrite",
)

In [0]:
from pyspark.sql.functions import *

In [0]:
# Drop every row that is missing a value in any of these columns.
required_columns = [
    "InvoiceNo", "StockCode", "Description", "Quantity",
    "InvoiceDate", "UnitPrice", "CustomerID", "Country",
]
df_cleaned = df.na.drop(subset=required_columns)
df_cleaned.display()

# Silver Layer

In [0]:
# Strip whitespace from BOTH ends of Description. Stripping only the left side
# (ltrim) leaves any trailing spaces in place, so two otherwise-identical
# descriptions would end up in separate groups in the Gold groupBy("Description").
df_cleaned = df_cleaned.withColumn("Description", trim(col("Description")))
df_cleaned.display()

In [0]:
# Line amount = Quantity * UnitPrice, rounded to 4 decimal places.
# (`round` here is pyspark.sql.functions.round from the star import, not the builtin.)
line_amount = col("Quantity") * col("UnitPrice")
df_cleaned = df_cleaned.withColumn("TotalPrice", round(line_amount, 4))
df_cleaned.display()

In [0]:
# Parse InvoiceDate with the pattern M/d/yyyy H:mm, then keep only the calendar date.
invoice_ts = to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm")
df_cleaned = df_cleaned.withColumn("InvoiceDate", to_date(invoice_ts))

In [0]:
# Inspect the cleaned rows after the InvoiceDate conversion.
df_cleaned.display()

In [0]:
# Silver: persist the cleaned and enriched rows as a Delta table, replacing any previous run.
(
    df_cleaned.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("retail_catalog.silver_retail.silver_layer_retail")
)

# Gold Layer

In [0]:
# Gold aggregations start from the persisted Silver table, not the in-memory DataFrame.
silver_table_name = "retail_catalog.silver_retail.silver_layer_retail"
delta_df_cleaned = spark.read.table(silver_table_name)

In [0]:
# Inspect the Silver table as read back from the catalog.
delta_df_cleaned.display()

In [0]:
# Total Quantity per product Description, largest total first.
# (`sum` is pyspark.sql.functions.sum from the star import.)
description_quantity = (
    delta_df_cleaned
    .groupBy("Description")
    .agg(sum("Quantity").alias("total_quantity"))
    .orderBy(desc("total_quantity"))
)

In [0]:
# Inspect product-level totals (highest total_quantity first).
description_quantity.display()

In [0]:
# Gold: product-level quantity ranking, replacing any previous run.
description_quantity.write.saveAsTable(
    "retail_catalog.gold_retail.gold_description_quantity",
    format="delta",
    mode="overwrite",
)

In [0]:
# Horizontal bar chart of the 10 descriptions with the largest total_quantity.
description_quantity_pd = description_quantity.limit(10).toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
ax.barh(
    description_quantity_pd['Description'],
    description_quantity_pd['total_quantity'],
    color='skyblue',
)
ax.set_xlabel('Total Quantity')
ax.set_ylabel('Description')
ax.set_title('Top 10 Quantity')
# barh draws the first row at the bottom; flip so the largest total is on top.
ax.invert_yaxis()
plt.show()

In [0]:
# Total Quantity per invoice date, largest daily total first.
sales_trend = (
    delta_df_cleaned
    .groupBy("InvoiceDate")
    .agg(sum("Quantity").alias("total_quantity"))
    .orderBy(col("total_quantity").desc())
)

In [0]:
# Inspect daily totals (highest total_quantity first).
sales_trend.display()

In [0]:
# Derive calendar Year and Month from InvoiceDate for the monthly roll-up.
sales_trend = (
    sales_trend
    .withColumn("Year", year("InvoiceDate"))
    .withColumn("Month", month("InvoiceDate"))
)

In [0]:
# Inspect daily totals together with the derived Year/Month columns.
sales_trend.display()

In [0]:
# Roll daily totals up to one row per (Year, Month), in chronological order.
monthly_sales_trend = (
    sales_trend
    .groupBy("Year", "Month")
    .agg(sum(col("total_quantity")).alias("monthly_quantity"))
    .orderBy(col("Year"), col("Month"))
)

In [0]:
# Inspect monthly totals, ordered by Year then Month.
monthly_sales_trend.display()

In [0]:
# Gold: monthly quantity totals, replacing any previous run.
(
    monthly_sales_trend.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("retail_catalog.gold_retail.gold_monthly_sales")
)

In [0]:
# Line chart of monthly_quantity over time.
# monthly_sales_trend has one row per (Year, Month), a tiny aggregate, so all of it
# is brought to pandas. Capping it with .limit(10) silently dropped every month
# after the tenth.
# The x value is a "YYYY-MM" label rather than Month alone. When the data covers
# more than one year, plotting on Month alone puts the same month of different
# years on one x position, and the line jumps backwards.
monthly_sales_pd = (
    monthly_sales_trend
    .withColumn("YearMonth", format_string("%d-%02d", col("Year"), col("Month")))
    .toPandas()
)

fig, ax = plt.subplots(figsize=(14, 6))
ax.plot(
    monthly_sales_pd['YearMonth'],
    monthly_sales_pd['monthly_quantity'],
    marker='o',
    linestyle='-',
    color='blue',
)
ax.set_xlabel('Month (YYYY-MM)')
# monthly_quantity is a sum of Quantity (units), not a monetary amount.
ax.set_ylabel('Total Quantity Sold')
ax.set_title("Monthly Sales Trend")
ax.grid(True)
ax.tick_params(axis='x', labelrotation=45)
plt.show()

In [0]:
# Total Quantity per weekday over the whole period (dayofweek: 1 = Sunday ... 7 = Saturday).
weekly_sales_cleaned = delta_df_cleaned.withColumn('day_of_week', dayofweek('InvoiceDate'))
weekly_sales = (
    weekly_sales_cleaned
    .groupBy('day_of_week')
    .agg(sum(col('Quantity')).alias('weekly_quantity'))
    .orderBy(col('day_of_week'))
)

In [0]:
# Gold: per-weekday quantity totals, replacing any previous run.
weekly_sales.write.saveAsTable(
    'retail_catalog.gold_retail.gold_weekly_sales',
    format='delta',
    mode='overwrite',
)

In [0]:
# Bar chart of total Quantity per weekday.
weekly_sales_pd = weekly_sales.toPandas()

# Spark's dayofweek() encodes days as 1 = Sunday ... 7 = Saturday. Label the ticks
# with day names instead of bare codes. All seven ticks are always drawn, so a
# weekday with no rows shows up as an empty slot rather than disappearing.
day_names = {1: 'Sun', 2: 'Mon', 3: 'Tue', 4: 'Wed', 5: 'Thu', 6: 'Fri', 7: 'Sat'}

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(weekly_sales_pd['day_of_week'], weekly_sales_pd['weekly_quantity'], color='orange')
ax.set_xticks(list(day_names.keys()))
ax.set_xticklabels(list(day_names.values()))
ax.set_xlabel("Day Of Week")
# weekly_quantity is a sum of Quantity (units), not a monetary amount.
ax.set_ylabel("Total Quantity Sold")
ax.set_title("Sales distribution by day of the week")
plt.show()