In [0]:
# STEP 0 — Setup
# These variables store the paths for each stage of the pipeline

# Landing folders: raw files exactly as they arrive
LANDING_ORDERS    = "dbfs:/FileStore/tables/dlt/landing/orders"
LANDING_CUSTOMERS = "dbfs:/FileStore/tables/dlt/landing/customers"

# Silver folder: cleaned data in Delta format
DELTA_SILVER_PATH = "dbfs:/tmp/delta/sil_orders"

# SQL table name pointing to the silver Delta folder
DELTA_TABLE_NAME  = "sil_orders_tbl"

print("We will store data in:")
print(f"Landing Orders folder:    {LANDING_ORDERS}")
print(f"Landing Customers folder: {LANDING_CUSTOMERS}")
print(f"Silver Delta folder:      {DELTA_SILVER_PATH}")
print(f"SQL Table name:           {DELTA_TABLE_NAME}")

We will store data in:
Landing Orders folder:    dbfs:/FileStore/tables/dlt/landing/orders
Landing Customers folder: dbfs:/FileStore/tables/dlt/landing/customers
Silver Delta folder:      dbfs:/tmp/delta/sil_orders
SQL Table name:           sil_orders_tbl


In [0]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
print("STEP 1: Seeding inline data to landing (JSON) ...")

orders_rows = [
    (1, "C001", "2025-08-08 09:00:00", 12000, "placed"),
    (2, "C002", "2025-08-08 09:05:00",  4500, "placed"),
    (3, "C001", "2025-08-08 09:10:00", 22000, "cancelled"),
    (4, "C003", "2025-08-08 09:15:00",   800, "placed")
]
customers_rows = [
    ("C001", "Ananya", "Bengaluru"),
    ("C002", "Rahul",  "Hyderabad"),
    ("C003", "Meera",  "Pune")
]

orders_schema = T.StructType([
    T.StructField("order_id",    T.IntegerType()),
    T.StructField("customer_id", T.StringType()),
    T.StructField("order_ts",    T.StringType()),
    T.StructField("amount",      T.IntegerType()),
    T.StructField("status",      T.StringType())
])
cust_schema = T.StructType([
    T.StructField("customer_id", T.StringType()),
    T.StructField("name",        T.StringType()),
    T.StructField("city",        T.StringType())
])

orders_df = (spark.createDataFrame(orders_rows, orders_schema)
             .withColumn("order_ts", F.to_timestamp("order_ts")))
customers_df = spark.createDataFrame(customers_rows, cust_schema)

orders_df.write.mode("overwrite").json(LANDING_ORDERS)
customers_df.write.mode("overwrite").json(LANDING_CUSTOMERS)

print("✅ Seeded landing JSON:")
print(f"  {LANDING_ORDERS}")
print(f"  {LANDING_CUSTOMERS}")


STEP 1: Seeding inline data to landing (JSON) ...
✅ Seeded landing JSON:
  dbfs:/FileStore/tables/dlt/landing/orders
  dbfs:/FileStore/tables/dlt/landing/customers


In [0]:
print("step 2 print raw data-bronze data")
bronze_orders = spark.read.json(LANDING_ORDERS)
bronze_customers = spark.read.json(LANDING_CUSTOMERS)

print("raw orders schema and sample")
bronze_orders.printSchema()
bronze_orders.show()

print("raw customers and sampe")
bronze_customers.printSchema()
bronze_customers.show()

step 2 print raw data-bronze data
raw orders schema and sample
root
 |-- amount: long (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_ts: string (nullable = true)
 |-- status: string (nullable = true)

+------+-----------+--------+--------------------+---------+
|amount|customer_id|order_id|            order_ts|   status|
+------+-----------+--------+--------------------+---------+
| 22000|       C001|       3|2025-08-08T09:10:...|cancelled|
| 12000|       C001|       1|2025-08-08T09:00:...|   placed|
|  4500|       C002|       2|2025-08-08T09:05:...|   placed|
|   800|       C003|       4|2025-08-08T09:15:...|   placed|
+------+-----------+--------+--------------------+---------+

raw customers and sampe
root
 |-- city: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)

+---------+-----------+------+
|     city|customer_id|  name|
+---------+-----------+------+
|Bengalur

In [0]:
sil_orders = bronze_orders.select("order_id", "customer_id", "order_ts", "amount", "status").filter("order_id is not null and amount > 0")
sil_orders.write.format("delta").mode("overwrite").save(DELTA_SILVER_PATH)
print("wrote silver orders to delta")
print(f"{DELTA_SILVER_PATH}")
print("reading back from delta silver path")
spark.read.format("delta").load(DELTA_SILVER_PATH).show()


wrote silver orders to delta
dbfs:/tmp/delta/sil_orders
reading back from delta silver path
+--------+-----------+--------------------+------+---------+
|order_id|customer_id|            order_ts|amount|   status|
+--------+-----------+--------------------+------+---------+
|       3|       C001|2025-08-08T09:10:...| 22000|cancelled|
|       1|       C001|2025-08-08T09:00:...| 12000|   placed|
|       4|       C003|2025-08-08T09:15:...|   800|   placed|
|       2|       C002|2025-08-08T09:05:...|  4500|   placed|
+--------+-----------+--------------------+------+---------+



In [0]:
print("gold enriched orders by joining with customers")
sil_orders_df = spark.read.format("delta").load(DELTA_SILVER_PATH)
gold_enriched = (sil_orders_df.alias("o").join(bronze_customers.alias("c"), on = "customer_id", how = "left"))
print("gold enriched sample")
gold_enriched.show()

gold enriched orders by joining with customers
gold enriched sample
+-----------+--------+--------------------+------+---------+---------+------+
|customer_id|order_id|            order_ts|amount|   status|     city|  name|
+-----------+--------+--------------------+------+---------+---------+------+
|       C001|       3|2025-08-08T09:10:...| 22000|cancelled|Bengaluru|Ananya|
|       C001|       1|2025-08-08T09:00:...| 12000|   placed|Bengaluru|Ananya|
|       C003|       4|2025-08-08T09:15:...|   800|   placed|     Pune| Meera|
|       C002|       2|2025-08-08T09:05:...|  4500|   placed|Hyderabad| Rahul|
+-----------+--------+--------------------+------+---------+---------+------+



In [0]:
# update, delete and upsert operations
from pyspark.sql.functions import col
from delta.tables import DeltaTable
delta_Table = DeltaTable.forPath(spark, DELTA_SILVER_PATH)
delta_Table.update(
   condition= col('order_id') == 1,
   set = {"status": "'cancelled'"}
)
delta_Table.toDF().show()
delta_Table.delete(condition = "order_id = 4")
delta_Table.toDF().show()
delta_Table.alias("t").merge(
    gold_enriched.alias("s"),
    "t.order_id = s.order_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
delta_Table.toDF().show()

+--------+-----------+--------------------+------+---------+
|order_id|customer_id|            order_ts|amount|   status|
+--------+-----------+--------------------+------+---------+
|       1|       C001|2025-08-08T09:00:...| 12000|cancelled|
|       3|       C001|2025-08-08T09:10:...| 22000|cancelled|
|       4|       C003|2025-08-08T09:15:...|   800|   placed|
|       2|       C002|2025-08-08T09:05:...|  4500|   placed|
+--------+-----------+--------------------+------+---------+

+--------+-----------+--------------------+------+---------+
|order_id|customer_id|            order_ts|amount|   status|
+--------+-----------+--------------------+------+---------+
|       1|       C001|2025-08-08T09:00:...| 12000|cancelled|
|       3|       C001|2025-08-08T09:10:...| 22000|cancelled|
|       2|       C002|2025-08-08T09:05:...|  4500|   placed|
+--------+-----------+--------------------+------+---------+

+--------+-----------+--------------------+------+---------+
|order_id|customer_id|