In [0]:
silver = "abfss://silver@onlineretail96.dfs.core.windows.net/"
gold = "abfss://gold@onlineretail96.dfs.core.windows.net/"

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
df_gold = spark.read.format("delta").option("Header","true").option("InferSchema","true").load(f"{silver}/silver_final")

In [0]:
df_gold.display()

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerID,Country,Year,Month
512807,22086,PAPER CHAIN KIT 50'S CHRISTMAS,2,2010-06-18,2.95,14646.0,Others,2010,6
510727,21432,SET OF 3 CASES WOODLAND DESIGN,12,2010-06-03,5.45,14646.0,Others,2010,6
510727,22503,CABIN BAG VINTAGE PAISLEY,2,2010-06-03,29.95,14646.0,Others,2010,6
510727,22712,CARD DOLLY GIRL,12,2010-06-03,0.42,14646.0,Others,2010,6
510727,21243,PINK SPOTTY PLATE,2,2010-06-03,1.69,14646.0,Others,2010,6
510727,22367,CHILDS APRON SPACEBOY DESIGN,50,2010-06-03,1.65,14646.0,Others,2010,6
513647,22352,LUNCH BOX WITH CUTLERY RETROSPOT,60,2010-06-28,2.1,14646.0,Others,2010,6
512807,21670,BLUE SPOT CERAMIC DRAWER KNOB,192,2010-06-18,1.06,14646.0,Others,2010,6
510727,22326,ROUND SNACK BOXES SET OF4 WOODLAND,2,2010-06-03,2.95,14646.0,Others,2010,6
513647,22423,REGENCY CAKESTAND 3 TIER,32,2010-06-28,10.95,14646.0,Others,2010,6


In [0]:
df_gold.dtypes

[('Invoice', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'int'),
 ('InvoiceDate', 'date'),
 ('Price', 'double'),
 ('CustomerID', 'string'),
 ('Country', 'string'),
 ('Year', 'int'),
 ('Month', 'int')]

In [0]:
df_gold = df_gold.withColumn("Revenue", round(col("Quantity")*col("Price"), 2))

# Aggregations

In [0]:
df_gold.createOrReplaceTempView("df_gold_temp")

In [0]:
df_gold_revenue_by_product = spark.sql("""
                                       SELECT Description, Count(DISTINCT(Invoice)) AS TotalOrders, SUM(Quantity) AS TotalQuantity, ROUND(SUM(Revenue), 2) as TotalRevenue
                                       FROM df_gold_temp
                                       GROUP BY Description
                                       ORDER BY TotalRevenue DESC
                                       LIMIT 10
                                       """)

In [0]:
df_gold_revenue_by_product.display()

Description,TotalOrders,TotalQuantity,TotalRevenue
Manual,787,6835,338914.86
REGENCY CAKESTAND 3 TIER,3918,26478,330590.32
DOTCOM POSTAGE,1415,1415,309854.11
WHITE HANGING HEART T-LIGHT HOLDER,5452,89788,248164.32
PARTY BUNTING,2674,28200,148318.28
JUMBO BAG RED RETROSPOT,3266,74499,142513.47
POSTAGE,1851,5363,125682.42
ASSORTED COLOUR BIRD ORNAMENT,2803,71882,117434.49
PAPER CHAIN KIT 50'S CHRISTMAS,2017,34064,115159.29
CHILLI LIGHTS,1135,15841,80540.88


Databricks visualization. Run in Databricks to view.

In [0]:
df_gold_revenue_by_month = spark.sql("""
                                     SELECT to_date(concat(Year,'-',Month,'-01')) AS MonthYear, ROUND(Sum(Revenue),2) AS TotalRevenue
                                     FROM df_gold_temp
                                     GROUP BY to_date(concat(Year,'-',Month,'-01'))
                                     ORDER BY MonthYear
                                     """) 

In [0]:
df_gold_revenue_by_month.display()

MonthYear,TotalRevenue
2009-12-01,815213.75
2010-01-01,643594.87
2010-02-01,532559.93
2010-03-01,810544.36
2010-04-01,678178.29
2010-05-01,640179.36
2010-06-01,737411.35
2010-07-01,648810.27
2010-08-01,681938.39
2010-09-01,898207.11


Databricks visualization. Run in Databricks to view.

In [0]:
df_top10_products_by_month = spark.sql("""
    WITH monthly_revenue AS (
        SELECT 
            TO_DATE(CONCAT(Year, '-', Month, '-01')) AS MonthYear,
            Description,
            ROUND(SUM(Revenue), 2) AS TotalRevenue
        FROM df_gold_temp
        GROUP BY Year, Month, Description
    ),
    ranked_products AS (
        SELECT 
            MonthYear,
            Description,
            TotalRevenue,
            ROW_NUMBER() OVER (PARTITION BY MonthYear ORDER BY TotalRevenue DESC) AS rank
        FROM monthly_revenue
    )
    SELECT 
        MonthYear,
        Description,
        TotalRevenue,
        rank
    FROM ranked_products
    WHERE rank <= 10
    ORDER BY MonthYear, rank
""")


In [0]:
df_top10_products_by_month.display()

MonthYear,Description,TotalRevenue,rank
2009-12-01,DOTCOM POSTAGE,18574.58,1
2009-12-01,WHITE HANGING HEART T-LIGHT HOLDER,17255.35,2
2009-12-01,PAPER CHAIN KIT 50'S CHRISTMAS,10169.36,3
2009-12-01,EDWARDIAN PARASOL BLACK,8697.75,4
2009-12-01,SCOTTIE DOG HOT WATER BOTTLE,8027.76,5
2009-12-01,ASSORTED COLOUR BIRD ORNAMENT,7623.93,6
2009-12-01,JUMBO BAG RED WHITE SPOTTY,6701.5,7
2009-12-01,WHITE CHERRY LIGHTS,6408.42,8
2009-12-01,HOT WATER BOTTLE TEA AND SYMPATHY,6322.75,9
2009-12-01,EDWARDIAN PARASOL RED,6205.2,10


Databricks visualization. Run in Databricks to view.

In [0]:
df_gold.write.mode("append")\
             .format("delta")\
             .option("header","true")\
             .save(f"{gold}/Data_warehouse")

In [0]:
df_gold_revenue_by_product.write.mode("append")\
                                .format("delta")\
                                .option("header","true")\
                                .save(f"{gold}/Revenue_by_product")

In [0]:
df_top10_products_by_month.write.mode("append")\
                                .format("delta")\
                                .option("header","true")\
                                .save(f"{gold}/Top10_products_by_month")