In [0]:
df = spark.table("workspace.default.online_retail")
df.show()

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

In [0]:
bronze_df = df

In [0]:
bronze_df.show(5)
bronze_df.printSchema()

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
only showing top 5 rows
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: strin

In [0]:
bronze_df.write.format("delta").mode("overwrite").saveAsTable("bronze_retail")


In [0]:
bronze_df.select("InvoiceDate").show(20, truncate=False)


+------------+
|InvoiceDate |
+------------+
|12/1/10 8:26|
|12/1/10 8:26|
|12/1/10 8:26|
|12/1/10 8:26|
|12/1/10 8:26|
|12/1/10 8:26|
|12/1/10 8:26|
|12/1/10 8:28|
|12/1/10 8:28|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
|12/1/10 8:34|
+------------+
only showing top 20 rows


In [0]:
bronze_df.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

In [0]:
from pyspark.sql.functions import col, regexp_extract, to_timestamp

# 1) Extract only valid-looking timestamps (keeps rows like 1/4/11 10:00)
pattern = r"^\d{1,2}/\d{1,2}/\d{2} \d{1,2}:\d{2}$"

bronze_clean = (
    bronze_df
    .withColumn("InvoiceDateNorm", regexp_extract(col("InvoiceDate"), pattern, 0))
    .filter(col("InvoiceDateNorm") != "")
)

# 2) Parse using flexible month/day (M,d) â€” this handles 1/4/11 10:00 and 12/10/11 08:26
silver_final = (
    bronze_clean
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDateNorm"), "M/d/yy H:mm"))
    .filter(col("CustomerID").isNotNull())
    .filter(col("Quantity") > 0)
    .filter(col("UnitPrice") > 0)
    .select(                       # keep only clean columns for Delta
        "InvoiceNo", "StockCode", "Description",
        "Quantity", "UnitPrice", "CustomerID", "Country",
        "InvoiceDate"
    )
)

# Optional sanity check
silver_final.select("InvoiceDate").show(20, truncate=False)
silver_final.printSchema()


+-------------------+
|InvoiceDate        |
+-------------------+
|2010-12-01 08:26:00|
|2010-12-01 08:26:00|
|2010-12-01 08:26:00|
|2010-12-01 08:26:00|
|2010-12-01 08:26:00|
|2010-12-01 08:26:00|
|2010-12-01 08:26:00|
|2010-12-01 08:28:00|
|2010-12-01 08:28:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
|2010-12-01 08:34:00|
+-------------------+
only showing top 20 rows
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- Country: string (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)



In [0]:
# If you want to replace the table:
spark.sql("DROP TABLE IF EXISTS silver_retail")

silver_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver_retail")


In [0]:
silver_final.show(30
                  )

+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|        InvoiceDate|
+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|     7.65|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    21730|GLASS S

In [0]:
from pyspark.sql.functions import sum as _sum

gold_country = (
    silver_final
    .withColumn("Revenue", col("Quantity") * col("UnitPrice"))
    .groupBy("Country")
    .agg(_sum("Revenue").alias("TotalRevenue"))
    .orderBy(col("TotalRevenue").desc())
)

display(gold_country)


Country,TotalRevenue
United Kingdom,7308391.554003064
Netherlands,285446.34
EIRE,265545.8999999985
Germany,228867.13999999844
France,209024.0499999997
Australia,138521.30999999956
Spain,61577.10999999997
Switzerland,56443.95000000004
Belgium,41196.33999999997
Sweden,38378.33000000003


In [0]:
gold_customers = (
    silver_final
    .withColumn("Revenue", col("Quantity") * col("UnitPrice"))
    .groupBy("CustomerID")
    .agg(_sum("Revenue").alias("TotalSpent"))
    .orderBy(col("TotalSpent").desc())
    .limit(10)
)

display(gold_customers)


CustomerID,TotalSpent
14646,280206.01999999984
18102,259657.30000000005
17450,194550.79000000007
16446,168472.5
14911,143825.05999999944
12415,124914.52999999977
14156,117379.6299999998
17511,91062.37999999996
16029,81024.84000000001
12346,77183.6


In [0]:
from pyspark.sql.functions import month, year, date_format, col, sum as _sum

gold_monthly = (
    silver_final
    .withColumn("Revenue", col("Quantity") * col("UnitPrice"))
    .withColumn("Year", year("InvoiceDate"))
    .withColumn("Month", month("InvoiceDate"))
    .withColumn("YearMonth", date_format(col("InvoiceDate"), "yyyy-MM"))
    .groupBy("YearMonth")
    .agg(_sum("Revenue").alias("MonthlyRevenue"))
    .orderBy("YearMonth")
)

display(gold_monthly)


YearMonth,MonthlyRevenue
2010-12,572713.8900000163
2011-01,569445.0400000077
2011-02,447137.3500000165
2011-03,595500.760000013
2011-04,469200.3610000132
2011-05,678594.5600000018
2011-06,661213.6900000116
2011-07,600091.0110000141
2011-08,645343.900000009
2011-09,952838.3819999964


In [0]:
spark.sql("DROP TABLE IF EXISTS gold_monthly_revenue")


DataFrame[]

In [0]:
gold_country.write.format("delta").mode("overwrite").saveAsTable("gold_country_revenue")
gold_customers.write.format("delta").mode("overwrite").saveAsTable("gold_top_customers")
gold_monthly.write.format("delta").mode("overwrite").saveAsTable("gold_monthly_revenue")


In [0]:
display(gold_country)


Country,TotalRevenue
United Kingdom,7308391.554003064
Netherlands,285446.34
EIRE,265545.8999999985
Germany,228867.13999999844
France,209024.0499999997
Australia,138521.30999999956
Spain,61577.10999999997
Switzerland,56443.95000000004
Belgium,41196.33999999997
Sweden,38378.33000000003


Databricks visualization. Run in Databricks to view.

In [0]:
display(gold_monthly)


YearMonth,MonthlyRevenue
2010-12,572713.8900000163
2011-01,569445.0400000077
2011-02,447137.3500000165
2011-03,595500.760000013
2011-04,469200.3610000132
2011-05,678594.5600000018
2011-06,661213.6900000116
2011-07,600091.0110000141
2011-08,645343.900000009
2011-09,952838.3819999964


Databricks visualization. Run in Databricks to view.

In [0]:
display(gold_customers)

CustomerID,TotalSpent
14646,280206.01999999984
18102,259657.30000000005
17450,194550.79000000007
16446,168472.5
14911,143825.05999999944
12415,124914.52999999977
14156,117379.6299999998
17511,91062.37999999996
16029,81024.84000000001
12346,77183.6


Databricks visualization. Run in Databricks to view.

In [0]:
total_revenue = silver_final.selectExpr("sum(Quantity * UnitPrice) as TotalRevenue")
display(total_revenue)

TotalRevenue
8911407.904003216


Databricks visualization. Run in Databricks to view.

In [0]:
total_orders = silver_final.selectExpr("count(distinct InvoiceNo) as TotalOrders")
display(total_orders)


TotalOrders
18532


Databricks visualization. Run in Databricks to view.

In [0]:
unique_customers = silver_final.selectExpr("count(distinct CustomerID) as UniqueCustomers")
display(unique_customers)


UniqueCustomers
4338


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import sum as _sum, countDistinct

aov = (
    silver_final
    .agg(
        (_sum(col("Quantity") * col("UnitPrice")) / countDistinct("InvoiceNo")).alias("AOV")
    )
)
display(aov)


AOV
480.86595639973945


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count

customer_purchase_counts = (
    silver_final
    .groupBy("CustomerID")
    .agg(count("InvoiceNo").alias("NumPurchases"))
)

returning = customer_purchase_counts.filter("NumPurchases >= 2").count()
total = customer_purchase_counts.count()

returning_pct = (returning / total) * 100
from pyspark.sql import Row

kpi_df = spark.createDataFrame([Row(ReturningCustomerPct=returning_pct)])

display(kpi_df)


ReturningCustomerPct
98.3633010603965


Databricks visualization. Run in Databricks to view.