### Kafka Producer

In [0]:
from kafka import KafkaProducer
import json
import ssl
import random
from datetime import datetime, timedelta

# Retrieve Event Hub connection string from Azure Key Vault-backed Databricks secret
eh_connection_string = dbutils.secrets.get(scope="kafka-scope", key="kafka-secrets")

# Create Kafka Producer against the Event Hubs Kafka endpoint (port 9093, SASL_SSL).
# Authentication is SASL PLAIN with the literal username "$ConnectionString" and the
# Event Hubs connection string as the password.
producer = KafkaProducer(
    bootstrap_servers="ecommerce-project.servicebus.windows.net:9093",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="$ConnectionString",
    sasl_plain_password=eh_connection_string,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    api_version=(0, 10),
    ssl_context=ssl.create_default_context()
)

# Sample values
payment_modes = ["CREDIT_CARD", "DEBIT_CARD", "UPI", "NET_BANKING", "CASH_ON_DELIVERY"]
order_statuses = ["PLACED", "SHIPPED", "DELIVERED", "CANCELLED"]
product_ids = [1001, 1002, 1003, 1004, 1005]

base_date = datetime(2024, 12, 20)
num_records = 10          # how many synthetic orders to produce
send_timeout_s = 30       # max seconds to wait for each delivery acknowledgement

# NOTE(review): order_id is 50000 + i, so every run re-sends IDs 50001..50010 with new
# random contents; downstream layers will see the same OrderIDs again on each run.
pending_sends = []
try:
    for i in range(1, num_records + 1):
        quantity = random.randint(1, 5)
        unit_price = random.randint(200, 1500)

        order = {
            "order_id": 50000 + i,
            "order_date": (base_date + timedelta(days=random.randint(0, 8))).strftime("%Y-%m-%d"),
            "customer_id": random.randint(100, 999),
            "product_id": random.choice(product_ids),
            "quantity": quantity,
            "unit_price": unit_price,
            "total_amount": quantity * unit_price,
            "payment_mode": random.choice(payment_modes),
            "order_status": random.choice(order_statuses)
        }

        pending_sends.append(producer.send("orders_topic", order))

    producer.flush()
    # flush() only waits for in-flight requests; per-record delivery errors are stored on
    # the futures returned by send(). get() re-raises them, so a failed send is never
    # reported as a success below.
    for pending in pending_sends:
        pending.get(timeout=send_timeout_s)
finally:
    # Release sockets / background I/O thread even if a send failed.
    producer.close()

print(f"{num_records} order records sent successfully to Event Hubs!")




10 order records sent successfully to Event Hubs!


### Read the Kafka (Event Hubs) order events from their Avro capture files and write them to the bronze Delta table

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, from_json

# -----------------------------
# Paths
# -----------------------------
orders_path = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net/ecommerce-project/orders_topic/*/*/*/*/*/*"
bronze_table_path = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net/bronze/delta_table/orders_kafka"

# -----------------------------
# Define actual order schema
# (mirrors the dict built by the Kafka producer cell)
# -----------------------------
orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("unit_price", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("payment_mode", StringType(), True),
    StructField("order_status", StringType(), True)
])

# -----------------------------
# Read Event Hub Avro files (batch) - every file under orders_path on every run
# -----------------------------
raw_df = spark.read.format("avro").load(orders_path)

# -----------------------------
# Extract actual JSON data from 'Body' column
# -----------------------------
orders_df = (
    raw_df
    # 'Body' is case-sensitive; presumably binary, so cast to text before parsing.
    .withColumn("body_str", col("Body").cast("string"))
    .withColumn("data", from_json(col("body_str"), orders_schema))
    # from_json yields a null or all-null struct for an unparseable body; drop those
    # instead of writing empty rows into bronze.
    .filter(col("data.order_id").isNotNull())
    .select("data.*")  # keep only the order columns
)

display(orders_df)

# -----------------------------
# Write to Bronze Delta table
# -----------------------------
# The read above re-scans the full capture history each run, so appending would
# duplicate every previously ingested order; overwrite keeps this cell idempotent.
# NOTE(review): if capture files are ever purged by a lifecycle policy, switch to an
# incremental read (e.g. Auto Loader) before relying on overwrite.
orders_df.write.format("delta").mode("overwrite").save(bronze_table_path)


order_id,order_date,customer_id,product_id,quantity,unit_price,total_amount,payment_mode,order_status
50001,2024-12-27,511,1003,4,998.0,3992.0,CREDIT_CARD,PLACED
50002,2024-12-20,756,1004,1,309.0,309.0,UPI,DELIVERED
50004,2024-12-23,865,1003,5,573.0,2865.0,NET_BANKING,SHIPPED
50005,2024-12-21,504,1001,4,309.0,1236.0,UPI,PLACED
50006,2024-12-28,742,1005,3,303.0,909.0,CREDIT_CARD,DELIVERED
50010,2024-12-24,968,1003,2,1337.0,2674.0,UPI,DELIVERED
50003,2024-12-25,646,1005,4,632.0,2528.0,CASH_ON_DELIVERY,DELIVERED
50007,2024-12-20,237,1005,2,1279.0,2558.0,UPI,CANCELLED
50008,2024-12-25,343,1004,3,1313.0,3939.0,CASH_ON_DELIVERY,CANCELLED
50009,2024-12-24,302,1001,5,622.0,3110.0,DEBIT_CARD,SHIPPED


### Bronze Layer

In [0]:
## Reading data
# Raw bronze inputs: three CSV extracts (schema inferred from headers/values) plus the
# Delta table produced from the Kafka capture files.
bronze_root = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net/bronze"
csv_read_options = {"header": True, "inferSchema": True}

customers_df = spark.read.csv(f"{bronze_root}/customers.csv", **csv_read_options)
products_df = spark.read.csv(f"{bronze_root}/products.csv", **csv_read_options)
orders_df = spark.read.csv(f"{bronze_root}/orders.csv", **csv_read_options)

orders_kafka = spark.read.format('delta').load(f"{bronze_root}/delta_table/orders_kafka")

display(orders_kafka)



order_id,order_date,customer_id,product_id,quantity,unit_price,total_amount,payment_mode,order_status
50001,2024-12-27,511,1003,4,998.0,3992.0,CREDIT_CARD,PLACED
50002,2024-12-20,756,1004,1,309.0,309.0,UPI,DELIVERED
50004,2024-12-23,865,1003,5,573.0,2865.0,NET_BANKING,SHIPPED
50005,2024-12-21,504,1001,4,309.0,1236.0,UPI,PLACED
50006,2024-12-28,742,1005,3,303.0,909.0,CREDIT_CARD,DELIVERED
50010,2024-12-24,968,1003,2,1337.0,2674.0,UPI,DELIVERED
50003,2024-12-25,646,1005,4,632.0,2528.0,CASH_ON_DELIVERY,DELIVERED
50007,2024-12-20,237,1005,2,1279.0,2558.0,UPI,CANCELLED
50008,2024-12-25,343,1004,3,1313.0,3939.0,CASH_ON_DELIVERY,CANCELLED
50009,2024-12-24,302,1001,5,622.0,3110.0,DEBIT_CARD,SHIPPED


In [0]:
from pyspark.sql.functions import col, to_date

# Nested JSON export: each record holds customer / item / order_details / transaction
# structs (see the printed schema below).
orders_json_df = (
    spark.read
    .option("multiline", "true")
    .json("abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net/bronze/orders.json")
)

orders_json_df.printSchema()

# Flatten the structs into the flat order layout; dates arrive as dd-MM-yyyy text.
order_id_col = col("order_details.order_id").cast("int").alias("order_id")
order_date_col = to_date(col("order_details.order_date"), "dd-MM-yyyy").alias("order_date")
orders_json_flat_df = orders_json_df.select(
    order_id_col,
    order_date_col,
    col("customer.customer_id").cast("int").alias("customer_id"),
    col("item.product_id").cast("int").alias("product_id"),
    col("item.quantity").cast("int").alias("quantity"),
    col("item.unit_price").cast("double").alias("unit_price"),
    col("transaction.total_amount").cast("double").alias("total_amount"),
    col("transaction.payment_method").alias("payment_mode"),
    col("transaction.status").alias("order_status"),
)

orders_json_flat_df.show(5)
orders_json_flat_df.count()


root
 |-- customer: struct (nullable = true)
 |    |-- customer_id: long (nullable = true)
 |-- item: struct (nullable = true)
 |    |-- product_id: long (nullable = true)
 |    |-- quantity: long (nullable = true)
 |    |-- unit_price: double (nullable = true)
 |-- order_details: struct (nullable = true)
 |    |-- order_date: string (nullable = true)
 |    |-- order_id: long (nullable = true)
 |-- transaction: struct (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- total_amount: double (nullable = true)

+--------+----------+-----------+----------+--------+----------+------------+------------+------------+
|order_id|order_date|customer_id|product_id|quantity|unit_price|total_amount|payment_mode|order_status|
+--------+----------+-----------+----------+--------+----------+------------+------------+------------+
|   20001|2025-06-02|       2567|       907|       1|  73992.79|    73992.79|  Debit Card|     Shipped|


2000

In [0]:
from pyspark.sql.functions import col, to_date

# Combine the three order sources in the CSV column order.
# orders_kafka.order_date is a string (the producer formats it as %Y-%m-%d), while the
# JSON source's order_date is already a date. Cast it explicitly so the union does not
# have to coerce the mismatched types (which may silently widen the column to string).
target_columns = orders_df.columns
orders_json_flat_df = orders_json_flat_df.select(target_columns)
orders_kafka_aligned_df = (
    orders_kafka
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
    .select(target_columns)
)
orders_final_df = orders_df.unionByName(orders_json_flat_df).unionByName(orders_kafka_aligned_df)

orders_final_df.count()


12010

In [0]:
# NOTE(review): superseded by the previous cell, which also unions orders_kafka.
# This commented-out CSV + JSON-only variant (and its stale 12000-row output) can be deleted.
# final_orders_df = orders_df.unionByName(orders_json_flat_df)
# final_orders_df.show(5)
# final_orders_df.count()


+--------+----------+-----------+----------+--------+----------+------------+------------+------------+
|order_id|order_date|customer_id|product_id|quantity|unit_price|total_amount|payment_mode|order_status|
+--------+----------+-----------+----------+--------+----------+------------+------------+------------+
|       1|2024-09-01|       1610|       362|       1|  15288.59|    15288.59| Credit Card|   Delivered|
|       2|2025-05-30|       1201|       307|       1|  43925.35|    43925.35| Credit Card|    Returned|
|       3|2024-06-26|        897|       116|       2|   1001.97|     2003.94| Credit Card|   Delivered|
|       4|2024-12-23|       1082|       213|       4|   1462.82|     5851.28| Net Banking|   Delivered|
|       5|2025-03-28|        989|       470|       3|   1638.69|     4916.07| Credit Card|   Delivered|
+--------+----------+-----------+----------+--------+----------+------------+------------+------------+
only showing top 5 rows


12000

In [0]:
### Writing as delta table
# Persist the bronze datasets as Delta tables; every write fully replaces the table data.
bronze_delta_root = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net/bronze/delta_table"

for bronze_frame, bronze_table in ((customers_df, "customers"), (products_df, "products")):
    bronze_frame.write.format("delta").mode("overwrite").save(f"{bronze_delta_root}/{bronze_table}")

# orders additionally sets overwriteSchema, so a changed column set/type replaces the
# existing table schema instead of failing the overwrite.
(
    orders_final_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{bronze_delta_root}/orders")
)



### Silver Layer Transformation

In [0]:
bronze_base_path = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net"
bronze_tables_root = f"{bronze_base_path}/bronze/delta_table"

# Reload the bronze Delta tables as the starting point for silver cleansing.
customers_df, products_df, orders_df = (
    spark.read.format("delta").load(f"{bronze_tables_root}/{table}")
    for table in ("customers", "products", "orders")
)


In [0]:
# Preview the orders loaded from bronze and report the row count.
orders_df.show(n=5)
orders_df.count()

+--------+----------+-----------+----------+--------+----------+------------+------------+------------+
|order_id|order_date|customer_id|product_id|quantity|unit_price|total_amount|payment_mode|order_status|
+--------+----------+-----------+----------+--------+----------+------------+------------+------------+
|       1|2024-09-01|       1610|       362|       1|  15288.59|    15288.59| Credit Card|   Delivered|
|       2|2025-05-30|       1201|       307|       1|  43925.35|    43925.35| Credit Card|    Returned|
|       3|2024-06-26|        897|       116|       2|   1001.97|     2003.94| Credit Card|   Delivered|
|       4|2024-12-23|       1082|       213|       4|   1462.82|     5851.28| Net Banking|   Delivered|
|       5|2025-03-28|        989|       470|       3|   1638.69|     4916.07| Credit Card|   Delivered|
+--------+----------+-----------+----------+--------+----------+------------+------------+------------+
only showing top 5 rows


12010

In [0]:
### STANDARDIZE DATE FORMATS - Orders: make order_date a DateType (yyyy-MM-dd) column

from pyspark.sql.functions import to_date, col

# Without a pattern, to_date behaves like CAST(order_date AS DATE): DateType values pass
# through unchanged and ISO yyyy-MM-dd strings are parsed.
order_date_as_date = to_date(col("order_date"))
orders_df = orders_df.withColumn("order_date", order_date_as_date)


In [0]:
### CASE NORMALIZATION (Strings) - lower-case customer emails and order statuses

from pyspark.sql.functions import lower

# Only email and order_status are lower-cased; country keeps its original casing.
# NOTE(review): payment_mode differs across sources ("CREDIT_CARD" from Kafka vs
# "Credit Card" from CSV/JSON) and is not normalized here.
customers_df = customers_df.withColumn("email", lower(col("email")))
orders_df = orders_df.withColumn("order_status", lower(col("order_status")))


In [0]:
### HANDLE NULL VALUES

# Missing customer contact fields get a placeholder string.
customer_defaults = {"email": "unknown", "country": "unknown"}
customers_df = customers_df.na.fill(customer_defaults)

# Missing order measures default to 0 (a 0 quantity is later flagged "Inconsistent").
order_defaults = {"quantity": 0, "total_amount": 0}
orders_df = orders_df.na.fill(order_defaults)


In [0]:
### RENAME COLUMNS (BUSINESS FRIENDLY)
## customer_id → CustomerID, email → Email (customers)
## order_id → OrderID, customer_id → CustomerID (orders)

customer_renames = {"customer_id": "CustomerID", "email": "Email"}
order_renames = {"order_id": "OrderID", "customer_id": "CustomerID"}

for old_name, new_name in customer_renames.items():
    customers_df = customers_df.withColumnRenamed(old_name, new_name)

for old_name, new_name in order_renames.items():
    orders_df = orders_df.withColumnRenamed(old_name, new_name)

orders_df.show(5)
customers_df.show(5)
orders_df.count()

+-------+----------+----------+----------+--------+----------+------------+------------+------------+
|OrderID|order_date|CustomerID|product_id|quantity|unit_price|total_amount|payment_mode|order_status|
+-------+----------+----------+----------+--------+----------+------------+------------+------------+
|      1|2024-09-01|      1610|       362|       1|  15288.59|    15288.59| Credit Card|   delivered|
|      2|2025-05-30|      1201|       307|       1|  43925.35|    43925.35| Credit Card|    returned|
|      3|2024-06-26|       897|       116|       2|   1001.97|     2003.94| Credit Card|   delivered|
|      4|2024-12-23|      1082|       213|       4|   1462.82|     5851.28| Net Banking|   delivered|
|      5|2025-03-28|       989|       470|       3|   1638.69|     4916.07| Credit Card|   delivered|
+-------+----------+----------+----------+--------+----------+------------+------------+------------+
only showing top 5 rows
+----------+----------------+--------------------+--------

12010

In [0]:
## DEDUPLICATION
# Keep one row per (OrderID, order_date); which duplicate survives is not specified.
dedup_keys = ["OrderID", "order_date"]
orders_df = orders_df.drop_duplicates(subset=dedup_keys)


In [0]:
### QUALITY FLAGGING (DO NOT DELETE BAD DATA)
# Rows are tagged, never dropped:
#   total_amount < 0 OR quantity <= 0  -> "Inconsistent"
#   anything else (including NULL comparisons) -> "Valid"

from pyspark.sql.functions import when, lit

is_inconsistent = (orders_df.total_amount < 0) | (orders_df.quantity <= 0)
quality_flag_col = when(is_inconsistent, lit("Inconsistent")).otherwise(lit("Valid"))
orders_df = orders_df.withColumn("quality_flag", quality_flag_col)


In [0]:
# REGEX DATA VALIDATION (EMAIL)
# Rule: Email must contain "@" with at least one character on each side.
# rlike performs an unanchored search, so ".+@.+" may match anywhere in the value.
# NULL emails make the predicate NULL and therefore fall through to "Invalid".

# Import everything this cell uses; previously when/lit leaked in from another cell,
# so running this cell on a fresh kernel raised NameError.
from pyspark.sql.functions import col, lit, when

customers_df = customers_df.withColumn(
    "email_valid",
    when(col("Email").rlike(".+@.+"), lit("Valid"))
    .otherwise(lit("Invalid"))
)


In [0]:
# DERIVED COLUMNS (BUSINESS METRICS)
# calculated_total_amount = quantity × unit_price, kept alongside the source total_amount.

line_total = orders_df.quantity * orders_df.unit_price
orders_df = orders_df.withColumn("calculated_total_amount", line_total)


In [0]:
# %sql
# CREATE SCD TYPE 2 TABLE (FIRST RUN ONLY)
# Run this only once

from pyspark.sql.functions import lower, current_date, lit

customers_scd_init_df = customers_df \
    .withColumn("customer_sk", customers_df.CustomerID) \
    .withColumn("start_date", current_date()) \
    .withColumn("end_date", lit(None).cast("date")) \
    .withColumn("is_current", lit(True))

customers_scd_init_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save('abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net/silver/customers_scd')



In [0]:
# SCD TYPE 2 MERGE LOGIC (CORE STEP)
# This handles:
# New customers
# Changed customers
# History preservation

from delta.tables import DeltaTable
import time

# Configuration
max_retries = 5        # Number of retry attempts
retry_interval = 5     # Seconds to wait between retries

# Delta table path
scd_table_path = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net/silver/customers_scd"

# Load Delta table
customers_scd_table = DeltaTable.forPath(spark, scd_table_path)

# Retry loop
retry_count = 0
while retry_count < max_retries:
    try:
        # SCD Type 2 MERGE logic
        customers_scd_table.alias("target").merge(
            customers_df.alias("source"),
            "target.CustomerID = source.CustomerID AND target.is_current = true"
        ).whenMatchedUpdate(
            condition="""target.Email <> source.Email OR target.Country <> source.Country""",
            set={
                "end_date": "current_date()",
                "is_current": "false"
            }
        ).whenNotMatchedInsert(
            values={
                "customer_sk": "source.CustomerID",
                "CustomerID": "source.CustomerID",
                "Email": "source.Email",
                "Country": "source.Country",
                "start_date": "current_date()",
                "end_date": "null",
                "is_current": "true"
            }
        ).execute()
        
        print("MERGE completed successfully!")
        break  # Exit retry loop if successful

    except Exception as e:
        # Handle concurrent append exception
        if "DELTA_CONCURRENT_APPEND" in str(e):
            retry_count += 1
            print(f"[Retry {retry_count}/{max_retries}] Conflict detected. Waiting {retry_interval}s before retrying...")
            time.sleep(retry_interval)
        else:
            # Raise other exceptions
            raise
else:
    raise Exception(f"MERGE failed after {max_retries} retries due to concurrent writes.")


DEBUG:ThreadMonitor:Logging python thread stack frames for MainThread and py4j threads:
DEBUG:ThreadMonitor:Logging Thread-48 (run) stack frames:
  File "/usr/lib/python3.12/threading.py", line 1030, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/databricks/python/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/databricks/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 521, in run
    self.wait_for_commands()
  File "/databricks/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 593, in wait_for_commands
    command = smart_decode(self.stream.readline())[:-1]
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)

DEBUG:ThreadMonitor:Logging Th

MERGE completed successfully!


In [0]:
### Writing the data to silver layer
# Customers are persisted via the SCD2 table; products and orders are full overwrites.
silver_root = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net/silver"

(
    products_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_root}/products")
)

# NOTE(review): with overwriteSchema set, mergeSchema likely adds nothing here.
(
    orders_df.write
    .format("delta")
    .mode("overwrite")
    .options(mergeSchema="true", overwriteSchema="true")
    .save(f"{silver_root}/orders")
)

### Gold Layer


In [0]:

base_path = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net"

# Load the silver tables that feed the gold layer.
silver_tables = {
    table: spark.read.format("delta").load(f"{base_path}/silver/{table}")
    for table in ("customers_scd", "products", "orders")
}
customers_df = silver_tables["customers_scd"]
products_df = silver_tables["products"]
orders_df = silver_tables["orders"]

# Only the current SCD2 version of each customer goes to gold.
customers_current_df = customers_df.where("is_current = true")

orders_df.show(5)
orders_df.count()


+-------+----------+----------+----------+--------+----------+------------+------------+------------+------------+-----------------------+
|OrderID|order_date|CustomerID|product_id|quantity|unit_price|total_amount|payment_mode|order_status|quality_flag|calculated_total_amount|
+-------+----------+----------+----------+--------+----------+------------+------------+------------+------------+-----------------------+
|      1|2024-09-01|      1610|       362|       1|  15288.59|    15288.59| Credit Card|   delivered|       Valid|               15288.59|
|      2|2025-05-30|      1201|       307|       1|  43925.35|    43925.35| Credit Card|    returned|       Valid|               43925.35|
|      3|2024-06-26|       897|       116|       2|   1001.97|     2003.94| Credit Card|   delivered|       Valid|                2003.94|
|      4|2024-12-23|      1082|       213|       4|   1462.82|     5851.28| Net Banking|   delivered|       Valid|                5851.28|
|      5|2025-03-28|       

12010

In [0]:
# Daily order KPIs: order count, units sold and revenue per order_date.
# Spark's sum/count are referenced through the F namespace; importing them by name
# (`from pyspark.sql.functions import sum, count`) shadows Python's built-in sum()
# for every later cell in this kernel.
from pyspark.sql import functions as F

orders_daily_agg_df = orders_df.groupBy("order_date").agg(
    F.count("OrderID").alias("total_orders"),
    F.sum("quantity").alias("total_quantity"),
    F.sum("total_amount").alias("total_revenue"),
)


In [0]:
# Customers → Gold (Partitioned)
# Partition on a low-cardinality column (Country) so each partition holds many rows.

base_path = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net"

customers_current_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("Country") \
    .save(f"{base_path}/gold/customers")


products_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(f"{base_path}/gold/products")

# Orders → Gold (Partition by Date)

orders_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .save(f"{base_path}/gold/orders")

# Orders Daily Aggregate → Gold
# Not partitioned: the aggregate has exactly one row per order_date, so partitioning by
# it would create one tiny file per day. overwriteSchema lets this overwrite replace the
# order_date partitioning of a previously written table instead of reusing it.

orders_daily_agg_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(f"{base_path}/gold/orders_daily_agg")


DEBUG:ThreadMonitor:Logging python thread stack frames for MainThread and py4j threads:
DEBUG:ThreadMonitor:Logging Thread-48 (run) stack frames:
  File "/usr/lib/python3.12/threading.py", line 1030, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/databricks/python/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/databricks/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 521, in run
    self.wait_for_commands()
  File "/databricks/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 593, in wait_for_commands
    command = smart_decode(self.stream.readline())[:-1]
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)

DEBUG:ThreadMonitor:Logging Th

In [0]:
## optimization
# Compact gold/orders and co-locate rows by OrderID / CustomerID for data skipping.
optimize_orders_sql = f"""
OPTIMIZE delta.`{base_path}/gold/orders`
ZORDER BY (OrderID, CustomerID)
"""
spark.sql(optimize_orders_sql)


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# Compact gold/customers and cluster rows by CustomerID.
gold_customers_location = f"delta.`{base_path}/gold/customers`"
spark.sql(f"""
OPTIMIZE {gold_customers_location}
ZORDER BY (CustomerID)
""")


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
### WINDOW FUNCTION
# Most Recent Purchase per Customer (row_number)
# For each customer keep exactly one order: the one with the latest order_date.
# OrderID (descending) breaks ties between same-day orders, so the chosen row is
# deterministic across runs instead of depending on partition/file order.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

base_path = "abfss://datalake@datalakehouse1ecommerce.dfs.core.windows.net"

orders_gold_df = spark.read.format("delta").load(f"{base_path}/gold/orders")
customers_gold_df = spark.read.format("delta").load(f"{base_path}/gold/customers")

window_spec = Window.partitionBy("CustomerID") \
                    .orderBy(F.col("order_date").desc(), F.col("OrderID").desc())

latest_purchase_df = orders_gold_df \
    .withColumn("rn", F.row_number().over(window_spec)) \
    .filter(F.col("rn") == 1) \
    .drop("rn")

display(latest_purchase_df)




DEBUG:ThreadMonitor:Logging python thread stack frames for MainThread and py4j threads:
DEBUG:ThreadMonitor:Logging Thread-49 (run) stack frames:
  File "/usr/lib/python3.12/threading.py", line 1030, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/databricks/python/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/databricks/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 521, in run
    self.wait_for_commands()
  File "/databricks/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 593, in wait_for_commands
    command = smart_decode(self.stream.readline())[:-1]
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)

DEBUG:ThreadMonitor:Logging Th

OrderID,order_date,CustomerID,product_id,quantity,unit_price,total_amount,payment_mode,order_status,quality_flag,calculated_total_amount
1969,2025-07-12,1,175,2,27338.54,54677.08,UPI,delivered,Valid,54677.08
4282,2025-12-23,2,364,3,34942.84,104828.52,Cash,cancelled,Valid,104828.52
3632,2025-08-18,3,412,1,48684.24,48684.24,UPI,delivered,Valid,48684.24
4625,2025-08-02,4,309,5,11977.56,59887.8,Net Banking,delivered,Valid,59887.8
3440,2025-11-16,5,271,2,7116.41,14232.82,Debit Card,delivered,Valid,14232.82
8151,2025-12-15,6,203,3,34210.07,102630.21,Cash,delivered,Valid,102630.21
6439,2025-10-03,7,455,5,43882.67,219413.35,Net Banking,returned,Valid,219413.35
1663,2025-12-13,8,305,3,13313.67,39941.01,Cash,returned,Valid,39941.01
9213,2025-06-04,9,116,1,1001.97,1001.97,UPI,cancelled,Valid,1001.97
3186,2025-08-16,10,65,2,8981.33,17962.66,Cash,returned,Valid,17962.66


In [0]:
## BROADCAST JOIN OPTIMIZATION
# customers (assumed small enough to fit in executor memory) is broadcast so the orders
# side is not shuffled for the join.

from pyspark.sql.functions import broadcast

broadcast_customers_df = broadcast(customers_gold_df)
orders = orders_gold_df.join(broadcast_customers_df, on="CustomerID", how="inner")

orders.show()



+----------+-------+----------+----------+--------+----------+------------+------------+------------+------------+-----------------------+--------------------+--------------------+--------------------+------------------+------------+-------+-----------+-----------+-----------+----------+--------+----------+
|CustomerID|OrderID|order_date|product_id|quantity|unit_price|total_amount|payment_mode|order_status|quality_flag|calculated_total_amount|       customer_name|               Email|               phone|              city|       state|country|signup_date|email_valid|customer_sk|start_date|end_date|is_current|
+----------+-------+----------+----------+--------+----------+------------+------------+------------+------------+-----------------------+--------------------+--------------------+--------------------+------------------+------------+-------+-----------+-----------+-----------+----------+--------+----------+
|       317|    140|2024-12-24|       142|       1|   4945.33|     4945.3