In [0]:
# Basic setup â€“ importing required Delta + Spark functions.
# Also creates the storage path inside a Databricks Volume 
from delta.tables import DeltaTable
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
import datetime, traceback

path = "dbfs:/Volumes/rithika/default/data/shopez_new/orders"
dbutils.fs.mkdirs(path)
print("Using path:", path)


Using path: dbfs:/Volumes/rithika/default/data/shopez_new/orders


In [0]:
# Task 1: Ingest sample order data into a Spark DataFrame.
# We simulate incoming JSON/CSV by creating Python dictionaries and applying a schema.
# This represents daily data ingestion step.

sample_data_v1 = [
    {"order_id": "o1001", "order_timestamp": "2025-11-28 10:05:00", "customer_id": "c100", "country": "US", "amount": 120.50, "currency": "USD", "status": "CREATED"},
    {"order_id": "o1002", "order_timestamp": "2025-11-28 11:10:00", "customer_id": "c101", "country": "IN", "amount": 2500.00, "currency": "INR", "status": "PAID"},
    {"order_id": "o1003", "order_timestamp": "2025-11-29 09:00:00", "customer_id": "c102", "country": "US", "amount": 15.00, "currency": "USD", "status": "PAID"},
    {"order_id": "o1004", "order_timestamp": "2025-11-29 12:30:00", "customer_id": "c103", "country": "GB", "amount": 75.00, "currency": "GBP", "status": "CREATED"},
]

schema_v1 = StructType([
    StructField("order_id", StringType(), False),
    StructField("order_timestamp", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("country", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("currency", StringType(), False),
    StructField("status", StringType(), False),
])

df_v1 = spark.createDataFrame(sample_data_v1, schema=schema_v1)

# Task 2: Add derived column 'order_date' for partitioning.
# Extracting only the DATE from a TIMESTAMP reduces the number of partitions and speeds queries.
df_v1 = df_v1.withColumn("order_timestamp", col("order_timestamp").cast(TimestampType()))
df_v1 = df_v1.withColumn("order_date", to_date(col("order_timestamp")))

display(df_v1)


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
o1001,2025-11-28T10:05:00Z,c100,US,120.5,USD,CREATED,2025-11-28
o1002,2025-11-28T11:10:00Z,c101,IN,2500.0,INR,PAID,2025-11-28
o1003,2025-11-29T09:00:00Z,c102,US,15.0,USD,PAID,2025-11-29
o1004,2025-11-29T12:30:00Z,c103,GB,75.0,GBP,CREATED,2025-11-29


In [0]:
# Task 3: Write the DataFrame as a Delta table partitioned by country + order_date.
# Partitioning helps reduce file scanning and improves performance for country/date filters.

df_v1.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("country", "order_date") \
    .save(path)

print("Delta table written successfully.")


Delta table written successfully.


In [0]:

registered_in_hive = False

try:
    spark.sql("USE CATALOG hive_metastore")
    spark.sql("CREATE DATABASE IF NOT EXISTS default")
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS default.shopez_orders
        USING DELTA
        LOCATION '{path}'
    """)
    registered_in_hive = True
    print("Table registered in hive_metastore.")
except:
    print("Registration failed, falling back to TEMP VIEW.")

if not registered_in_hive:
    spark.read.format("delta").load(path).createOrReplaceTempView("shopez_orders_v")
    print("Created TEMP VIEW: shopez_orders_v")


Table registered in hive_metastore.


In [0]:
# Task 4: Verify the partition structure inside the storage path.
# This confirms that country and order_date folders were created properly.

print("Parent folder:")
display(dbutils.fs.ls("dbfs:/Volumes/rithika/default/data/shopez_new"))

print("Delta table contents:")
display(dbutils.fs.ls(path))

print("Partition directories:")
for p in dbutils.fs.ls(path):
    if p.isDir():
        print("-", p.path)


Parent folder:


path,name,size,modificationTime
dbfs:/Volumes/rithika/default/data/shopez_new/orders/,orders/,0,1764599065000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/,orders_smallfiles_demo/,0,1764599837000


Delta table contents:


path,name,size,modificationTime
dbfs:/Volumes/rithika/default/data/shopez_new/orders/_delta_log/,_delta_log/,0,1764599288000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/country=GB/,country=GB/,0,1764599290000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/country=IN/,country=IN/,0,1764599290000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/country=US/,country=US/,0,1764599289000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/deletion_vector_23f98b53-b8f9-486d-9f5b-7b49b5be48d7.bin,deletion_vector_23f98b53-b8f9-486d-9f5b-7b49b5be48d7.bin,43,1764599305000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/deletion_vector_3fcc21d0-1811-4c21-9c6e-ce37fa0a6508.bin,deletion_vector_3fcc21d0-1811-4c21-9c6e-ce37fa0a6508.bin,43,1764601015000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/deletion_vector_5eff52c8-2f2c-4754-bdb4-17f1ae6b1687.bin,deletion_vector_5eff52c8-2f2c-4754-bdb4-17f1ae6b1687.bin,43,1764599873000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/deletion_vector_7d25368c-e50e-443f-98f7-7c565cebddca.bin,deletion_vector_7d25368c-e50e-443f-98f7-7c565cebddca.bin,43,1764599299000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/deletion_vector_8674d8bb-b3a6-4fb1-8985-bebf4739881d.bin,deletion_vector_8674d8bb-b3a6-4fb1-8985-bebf4739881d.bin,43,1764599758000
dbfs:/Volumes/rithika/default/data/shopez_new/orders/deletion_vector_9d3d622f-0d51-4043-8cb8-ddeae2fd650a.bin,deletion_vector_9d3d622f-0d51-4043-8cb8-ddeae2fd650a.bin,43,1764599752000


Partition directories:
- dbfs:/Volumes/rithika/default/data/shopez_new/orders/_delta_log/
- dbfs:/Volumes/rithika/default/data/shopez_new/orders/country=GB/
- dbfs:/Volumes/rithika/default/data/shopez_new/orders/country=IN/
- dbfs:/Volumes/rithika/default/data/shopez_new/orders/country=US/


In [0]:
# Task 5: Partition pruning demo.
# When filtering by partition columns, Spark will skip scanning unrelated folders.

q = spark.read.format("delta").load(path) \
    .filter((col("country") == "US") & (col("order_date") == "2025-11-28"))

display(q)

print("Explain Plan (to verify partition pruning):")
q.explain(True)


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
o1001,2025-11-28T10:05:00Z,c100,US,120.5,USD,CREATED,2025-11-28,,


Explain Plan (to verify partition pruning):
== Parsed Logical Plan ==
'Filter 'and('`=`('country, US), '`=`('order_date, 2025-11-28))
+- Relation [order_id#12379,order_timestamp#12380,customer_id#12381,country#12382,amount#12383,currency#12384,status#12385,order_date#12386,payment_method#12387,coupon_code#12388] parquet

== Analyzed Logical Plan ==
order_id: string, order_timestamp: timestamp, customer_id: string, country: string, amount: double, currency: string, status: string, order_date: date, payment_method: string, coupon_code: string
Filter ((country#12382 = US) AND (order_date#12386 = cast(2025-11-28 as date)))
+- Relation [order_id#12379,order_timestamp#12380,customer_id#12381,country#12382,amount#12383,currency#12384,status#12385,order_date#12386,payment_method#12387,coupon_code#12388] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(order_date#12386) AND (order_date#12386 = 2025-11-28)) AND isnotnull(country#12382)) AND (country#12382 = US))
+- Relation [order_id#12

In [0]:
# Task 6: Time Travel demo.
# Delta Lake keeps versions of data, allowing rollback or audit via version numbers.

dt = DeltaTable.forPath(spark, path)

print("History before update:")
dt.history().show(truncate=False)

dt.update("order_id = 'o1001'", {"status": "'PAID'"})  # creates new version

print("History after update:")
dt.history().show(truncate=False)

print("Reading version 0 (before update):")
display(spark.read.format("delta").option("versionAsOf", 0).load(path))


History before update:
+-------+-------------------+---------------+-------------------------+---------+----------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------------------------------+
|version|timestamp          |userId         |userName                 |operation|operationParameters                                                               |job |notebook          |clusterId           |readVersion|isolationLevel   |isBlindAppend|operationMetrics                                                                               

order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
o1001,2025-11-28T10:05:00Z,c100,US,120.5,USD,CREATED,2025-11-28
o1004,2025-11-29T12:30:00Z,c103,GB,75.0,GBP,CREATED,2025-11-29
o1002,2025-11-28T11:10:00Z,c101,IN,2500.0,INR,PAID,2025-11-28
o1003,2025-11-29T09:00:00Z,c102,US,15.0,USD,PAID,2025-11-29


In [0]:
# Task 7: Schema evolution.
# New columns (payment_method, coupon_code) are added in new incoming data.
# mergeSchema=true allows the Delta table to automatically update column definitions.

sample_data_v2 = [
    {"order_id": "o1005", "order_timestamp": "2025-11-30 14:00:00", "customer_id": "c104", "country": "IN",
     "amount": 499.0, "currency": "INR", "status": "PAID", "payment_method": "UPI", "coupon_code": None},
    {"order_id": "o1006", "order_timestamp": "2025-11-30 15:30:00", "customer_id": "c105", "country": "US",
     "amount": 199.99, "currency": "USD", "status": "CREATED", "payment_method": "CARD", "coupon_code": "WELCOME10"},
]

schema_v2 = StructType(schema_v1.fields + [
    StructField("payment_method", StringType(), True),
    StructField("coupon_code", StringType(), True),
])

df_v2 = spark.createDataFrame(sample_data_v2, schema=schema_v2)
df_v2 = df_v2.withColumn("order_timestamp", col("order_timestamp").cast(TimestampType()))
df_v2 = df_v2.withColumn("order_date", to_date(col("order_timestamp")))

df_v2.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .partitionBy("country", "order_date") \
    .save(path)

display(spark.read.format("delta").load(path))


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
o1006,2025-11-30T15:30:00Z,c105,US,199.99,USD,CREATED,2025-11-30,CARD,WELCOME10
o1001,2025-11-28T10:05:00Z,c100,US,120.5,USD,PAID,2025-11-28,,
o1005,2025-11-30T14:00:00Z,c104,IN,499.0,INR,PAID,2025-11-30,UPI,
o1004,2025-11-29T12:30:00Z,c103,GB,75.0,GBP,CREATED,2025-11-29,,
o1002,2025-11-28T11:10:00Z,c101,IN,2500.0,INR,PAID,2025-11-28,,
o1003,2025-11-29T09:00:00Z,c102,US,15.0,USD,PAID,2025-11-29,,


In [0]:
# Task 8: Updates & Deletes.
# Demonstrates how Delta supports ACID transactions for modifying existing data safely.

dt = DeltaTable.forPath(spark, path)

dt.update("amount < 20", {"status": "'CANCELLED'"})  # update rule
dt.delete("amount < 10")  # delete rule

display(spark.read.format("delta").load(path))


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
o1006,2025-11-30T15:30:00Z,c105,US,199.99,USD,CREATED,2025-11-30,CARD,WELCOME10
o1003,2025-11-29T09:00:00Z,c102,US,15.0,USD,CANCELLED,2025-11-29,,
o1001,2025-11-28T10:05:00Z,c100,US,120.5,USD,PAID,2025-11-28,,
o1005,2025-11-30T14:00:00Z,c104,IN,499.0,INR,PAID,2025-11-30,UPI,
o1004,2025-11-29T12:30:00Z,c103,GB,75.0,GBP,CREATED,2025-11-29,,
o1002,2025-11-28T11:10:00Z,c101,IN,2500.0,INR,PAID,2025-11-28,,


In [0]:
# Task 9: OPTIMIZE and optional ZORDER.
# These improve performance by compacting small files and clustering data.

try:
    spark.sql(f"OPTIMIZE delta.`{path}`")
    spark.sql(f"OPTIMIZE delta.`{path}` ZORDER BY (customer_id)")
    print("OPTIMIZE + ZORDER finished.")
except Exception as e:
    print("OPTIMIZE/ZORDER not supported:", e)


OPTIMIZE + ZORDER finished.


In [0]:
# Task 10: Small file problem demo.
# Creating many tiny partitions shows why OPTIMIZE is needed to compact Delta files.

small_path = path + "_smallfiles_demo"
dbutils.fs.rm(small_path, recurse=True)

rows = []
base = datetime.date(2025, 11, 1)
for i in range(50):
    rows.append({
        "order_id": f"tiny_{i}",
        "order_timestamp": f"2025-11-{(i % 30) + 1:02d} 10:00:00",
        "customer_id": f"ct_{i}",
        "country": "US",
        "amount": float(i + 1),
        "currency": "USD",
        "status": "PAID",
        "order_date": base + datetime.timedelta(days=i)
    })

tiny_df = spark.createDataFrame(rows)
tiny_df = tiny_df.withColumn("order_timestamp", col("order_timestamp").cast(TimestampType()))
tiny_df = tiny_df.withColumn("order_date", to_date(col("order_date")))

tiny_df.write.format("delta").mode("overwrite").partitionBy("order_date").save(small_path)

display(dbutils.fs.ls(small_path))

# Try compacting small files
try:
    spark.sql(f"OPTIMIZE delta.`{small_path}`")
except:
    print("OPTIMIZE not supported on this runtime.")


path,name,size,modificationTime
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/_delta_log/,_delta_log/,0,1764602779000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-01/,order_date=2025-11-01/,0,1764602779000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-02/,order_date=2025-11-02/,0,1764602780000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-03/,order_date=2025-11-03/,0,1764602780000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-04/,order_date=2025-11-04/,0,1764602780000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-05/,order_date=2025-11-05/,0,1764602780000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-06/,order_date=2025-11-06/,0,1764602780000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-07/,order_date=2025-11-07/,0,1764602780000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-08/,order_date=2025-11-08/,0,1764602780000
dbfs:/Volumes/rithika/default/data/shopez_new/orders_smallfiles_demo/order_date=2025-11-09/,order_date=2025-11-09/,0,1764602780000
