In [1]:
# DATA CLEANING, TRANSFORMATIONS, LOADING TO DELTA TABLES
from pyspark.sql.functions import *

# Session-level writer settings, applied before any table is written below.
spark.conf.set('spark.sql.parquet.vorder.default', 'true')
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")


def _read_csv_with_header(path):
    """Load a CSV file, using its first line as the column names.

    No schema or inferSchema option is passed here, so column types are
    whatever the CSV reader assigns by default.
    """
    return spark.read.format("csv").option("header", "true").load(path)


# Reading data: three CSV extracts plus the JSON reviews file.
df_customers = _read_csv_with_header("Files/Shopping_Mart_Data/ShoppingMart_customers.csv")
df_orders = _read_csv_with_header("Files/Shopping_Mart_Data/ShoppingMart_orders.csv")
df_products = _read_csv_with_header("Files/Shopping_Mart_Data/ShoppingMart_products.csv")
df_reviews = spark.read.json("Files/Shopping_Mart_Data/ShoppingMart_review.json")

# Transforming data (orders only; the other frames pass through unchanged).
order_key_columns = ["OrderID", "CustomerID", "ProductID", "OrderDate", "TotalAmount"]
df_orders = (
    df_orders
    # how="all": a row is dropped only when every one of the key columns is null.
    .dropna(how="all", subset=order_key_columns)
    # Keep the raw value under OrderTimestamp so OrderDate can hold the date part.
    .withColumnRenamed("OrderDate", "OrderTimestamp")
    .withColumn(
        "OrderTimestamp",
        to_timestamp(col("OrderTimestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSSSSS"),
    )
    .withColumn("OrderDate", to_date(col("OrderTimestamp")))
)

# Writing transactional data to delta tables.
# NOTE(review): mode("append") adds the full input again on every run of this
# cell; confirm the source files only ever contain new rows.
for frame, table_name in (
    (df_orders, "Shopping_Mart_Silver_LH.Orders"),
    (df_reviews, "Shopping_Mart_Silver_LH.Order_Reviews"),
):
    frame.write.format("delta").mode("append").saveAsTable(table_name)

# Registering temp views so the SQL cell can use the dimension data as MERGE sources.
df_products.createOrReplaceTempView("products_vw")
df_customers.createOrReplaceTempView("customers_vw")

StatementMeta(, 79534f6c-4b62-4599-8bde-94b672f3754e, 3, Finished, Available, Finished, False)

In [2]:
%%sql
-- Bootstrap: create Customer_Details as an empty Delta table if it is missing.
-- The always-false predicate selects no rows, so only the column layout of
-- customers_vw is carried over. An existing table is left untouched.
CREATE TABLE IF NOT EXISTS Shopping_Mart_Silver_LH.Customer_Details
USING DELTA
AS
SELECT CustomerID, CustomerName, Email, Location, SignupDate
FROM customers_vw
WHERE FALSE;

-- Bootstrap: create Product_Details as an empty Delta table if it is missing.
-- The always-false predicate selects no rows, so only the column layout of
-- products_vw is carried over. An existing table is left untouched.
CREATE TABLE IF NOT EXISTS Shopping_Mart_Silver_LH.Product_Details
USING DELTA
AS
SELECT ProductID, ProductName, Category, Stock, UnitPrice
FROM products_vw
WHERE FALSE;

-- Upsert customers_vw into Customer_Details, keyed on CustomerID.
--   * Matched rows are rewritten only when at least one non-key column differs.
--   * Unmatched source rows are inserted.
-- Change detection uses the null-safe equality operator <=> rather than <>.
-- A plain <> evaluates to NULL when either side is NULL, which would skip the
-- update whenever a value changes to or from NULL.
MERGE INTO Shopping_Mart_Silver_LH.Customer_Details AS tgt
USING customers_vw AS src
ON tgt.CustomerID = src.CustomerID
WHEN MATCHED AND NOT (
    tgt.CustomerName <=> src.CustomerName AND
    tgt.Email        <=> src.Email        AND
    tgt.Location     <=> src.Location     AND
    tgt.SignupDate   <=> src.SignupDate
) THEN
  UPDATE SET
    tgt.CustomerName = src.CustomerName,
    tgt.Email        = src.Email,
    tgt.Location     = src.Location,
    tgt.SignupDate   = src.SignupDate
WHEN NOT MATCHED THEN
  INSERT (
    CustomerID,
    CustomerName,
    Email,
    Location,
    SignupDate
  )
  VALUES (
    src.CustomerID,
    src.CustomerName,
    src.Email,
    src.Location,
    src.SignupDate
  );

-- Upsert products_vw into Product_Details, keyed on ProductID.
--   * Matched rows are rewritten only when at least one non-key column differs.
--   * Unmatched source rows are inserted.
-- Change detection uses the null-safe equality operator <=> rather than <>.
-- A plain <> evaluates to NULL when either side is NULL, which would skip the
-- update whenever a value changes to or from NULL.
MERGE INTO Shopping_Mart_Silver_LH.Product_Details AS tgt
USING products_vw AS src
ON tgt.ProductID = src.ProductID
WHEN MATCHED AND NOT (
    tgt.ProductName <=> src.ProductName AND
    tgt.Category    <=> src.Category    AND
    tgt.Stock       <=> src.Stock       AND
    tgt.UnitPrice   <=> src.UnitPrice
) THEN
  UPDATE SET
    tgt.ProductName = src.ProductName,
    tgt.Category    = src.Category,
    tgt.Stock       = src.Stock,
    tgt.UnitPrice   = src.UnitPrice
WHEN NOT MATCHED THEN
  INSERT (
    ProductID,
    ProductName,
    Category,
    Stock,
    UnitPrice
  )
  VALUES (
    src.ProductID,
    src.ProductName,
    src.Category,
    src.Stock,
    src.UnitPrice
  );


StatementMeta(, 79534f6c-4b62-4599-8bde-94b672f3754e, 7, Finished, Available, Finished, True)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 1 rows and 4 fields>

<Spark SQL result set with 1 rows and 4 fields>

In [4]:
# Drop the two session temp views by name now that this cell no longer reads them.
spark.catalog.dropTempView("products_vw")
spark.catalog.dropTempView("customers_vw")

# 1. Define your Lakehouse / Database
lakehouse_name = "Shopping_Mart_Silver_LH"


def _quote_identifier(name):
    """Wrap a SQL identifier in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


# 2. Get all tables and optimize them in a loop
tables = spark.catalog.listTables(lakehouse_name)

for t in tables:
    # listTables also returns session temp views and catalog views. Neither is
    # a physical table under this lakehouse, and OPTIMIZE on them would fail
    # and abort the loop, so skip them.
    if t.isTemporary or t.tableType == "VIEW":
        continue

    table_path = f"{lakehouse_name}.{t.name}"
    print(f"Optimizing: {table_path}")

    # OPTIMIZE compacts the table's small files. Identifiers are backtick-quoted
    # so names containing special characters still parse.
    # NOTE(review): whether V-Order is applied by a plain OPTIMIZE depends on the
    # session/table V-Order settings in effect - confirm for this runtime.
    spark.sql(
        f"OPTIMIZE {_quote_identifier(lakehouse_name)}.{_quote_identifier(t.name)}"
    )

StatementMeta(, 79534f6c-4b62-4599-8bde-94b672f3754e, 9, Finished, Available, Finished, False)

Optimizing: Shopping_Mart_Silver_LH.orders
Optimizing: Shopping_Mart_Silver_LH.order_reviews
Optimizing: Shopping_Mart_Silver_LH.customer_details
Optimizing: Shopping_Mart_Silver_LH.product_details
