In [0]:
# Import schema types
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Import functions
from pyspark.sql.functions import col, to_date, to_timestamp, lit, expr

# Delta Lake API
from delta.tables import DeltaTable


1.Data Ingestion

In [0]:
# Creating 10 input order records with timestamp as strings
data = [
    ("o-1001","2025-11-30 10:12:00","c-501","US",120.50,"USD","CREATED"),
    ("o-1002","2025-11-30 10:15:00","c-502","IN",50.00,"INR","PAID"),
    ("o-1003","2025-11-30 11:00:00","c-503","US",249.99,"USD","PAID"),
    ("o-1004","2025-11-30 12:05:00","c-504","GB",30.00,"GBP","CANCELLED"),
    ("o-1005","2025-11-30 13:20:00","c-505","IN",10.00,"INR","CREATED"),
    ("o-1006","2025-12-01 09:00:00","c-506","US",75.00,"USD","PAID"),
    ("o-1007","2025-12-01 09:10:00","c-507","IN",20.00,"INR","PAID"),
    ("o-1008","2025-12-01 10:00:00","c-508","GB",99.00,"GBP","CREATED"),
    ("o-1009","2025-12-01 10:30:00","c-509","US",199.99,"USD","PAID"),
    ("o-1010","2025-12-01 11:00:00","c-510","IN",60.00,"INR","CREATED")
]

# Create schema BUT make timestamp temporarily StringType
schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("order_timestamp", StringType(), False),   # FIX: StringType first
    StructField("customer_id", StringType(), False),
    StructField("country", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("currency", StringType(), False),
    StructField("status", StringType(), False)
])

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Convert order_timestamp string â†’ actual timestamp using to_timestamp()
from pyspark.sql.functions import to_timestamp
df = df.withColumn("order_timestamp", to_timestamp("order_timestamp"))

display(df)


order_id,order_timestamp,customer_id,country,amount,currency,status
o-1001,2025-11-30T10:12:00.000Z,c-501,US,120.5,USD,CREATED
o-1002,2025-11-30T10:15:00.000Z,c-502,IN,50.0,INR,PAID
o-1003,2025-11-30T11:00:00.000Z,c-503,US,249.99,USD,PAID
o-1004,2025-11-30T12:05:00.000Z,c-504,GB,30.0,GBP,CANCELLED
o-1005,2025-11-30T13:20:00.000Z,c-505,IN,10.0,INR,CREATED
o-1006,2025-12-01T09:00:00.000Z,c-506,US,75.0,USD,PAID
o-1007,2025-12-01T09:10:00.000Z,c-507,IN,20.0,INR,PAID
o-1008,2025-12-01T10:00:00.000Z,c-508,GB,99.0,GBP,CREATED
o-1009,2025-12-01T10:30:00.000Z,c-509,US,199.99,USD,PAID
o-1010,2025-12-01T11:00:00.000Z,c-510,IN,60.0,INR,CREATED


2.Add derived column (order_date)


In [0]:
# Extracting only the date portion for efficient partitioning and daily reporting
df = df.withColumn("order_date", to_date(col("order_timestamp")))

display(df)


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
o-1001,2025-11-30T10:12:00.000Z,c-501,US,120.5,USD,CREATED,2025-11-30
o-1002,2025-11-30T10:15:00.000Z,c-502,IN,50.0,INR,PAID,2025-11-30
o-1003,2025-11-30T11:00:00.000Z,c-503,US,249.99,USD,PAID,2025-11-30
o-1004,2025-11-30T12:05:00.000Z,c-504,GB,30.0,GBP,CANCELLED,2025-11-30
o-1005,2025-11-30T13:20:00.000Z,c-505,IN,10.0,INR,CREATED,2025-11-30
o-1006,2025-12-01T09:00:00.000Z,c-506,US,75.0,USD,PAID,2025-12-01
o-1007,2025-12-01T09:10:00.000Z,c-507,IN,20.0,INR,PAID,2025-12-01
o-1008,2025-12-01T10:00:00.000Z,c-508,GB,99.0,GBP,CREATED,2025-12-01
o-1009,2025-12-01T10:30:00.000Z,c-509,US,199.99,USD,PAID,2025-12-01
o-1010,2025-12-01T11:00:00.000Z,c-510,IN,60.0,INR,CREATED,2025-12-01


3. Write Delta table with partitioning

Purpose: Store data efficiently using Delta Lake with partitionBy.

In [0]:
# Defining the cloud storage path (DBFS location - works on serverless)
delta_path = "/mnt/delta/orders_delta"

# Writing Delta table partitioned by country + order_date for faster queries
(df.write.format("delta")
    .mode("overwrite")   # initial load overwrites old content
    .partitionBy("country", "order_date")
    .save(delta_path)
)




4. Verify the partition structure
Confirm that partition folders are created in storage.

In [0]:
# List all files and folders inside the delta_path
dbutils.fs.ls(delta_path)

# Let's specifically check one country folder to ensure proper partitioning
dbutils.fs.ls(f"{delta_path}/country=US")


[FileInfo(path='dbfs:/mnt/delta/orders_delta/country=US/order_date=2025-11-30/', name='order_date=2025-11-30/', size=0, modificationTime=1764606117000),
 FileInfo(path='dbfs:/mnt/delta/orders_delta/country=US/order_date=2025-12-01/', name='order_date=2025-12-01/', size=0, modificationTime=1764606117000)]

5. Query with Partition Pruning
Show how filtering on partition columns reduces scan cost.

In [0]:
# Reading data from the Delta path and applying partition filters
pruned_df = (
    spark.read.format("delta")
    .load(delta_path)
    .filter("country = 'US' AND order_date = '2025-11-30'")
)

# Check the physical plan to verify partition pruning
pruned_df.explain(True)

display(pruned_df)


== Parsed Logical Plan ==
'Filter (('country = US) AND ('order_date = 2025-11-30))
+- Relation [order_id#12210,order_timestamp#12211,customer_id#12212,country#12213,amount#12214,currency#12215,status#12216,order_date#12217] parquet

== Analyzed Logical Plan ==
order_id: string, order_timestamp: timestamp, customer_id: string, country: string, amount: double, currency: string, status: string, order_date: date
Filter ((country#12213 = US) AND (order_date#12217 = cast(2025-11-30 as date)))
+- Relation [order_id#12210,order_timestamp#12211,customer_id#12212,country#12213,amount#12214,currency#12215,status#12216,order_date#12217] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(order_date#12217) AND (order_date#12217 = 2025-11-30)) AND isnotnull(country#12213)) AND (country#12213 = US))
+- Relation [order_id#12210,order_timestamp#12211,customer_id#12212,country#12213,amount#12214,currency#12215,status#12216,order_date#12217] parquet

== Physical Plan ==
*(1) ColumnarToRow
+- Photon

order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
o-1001,2025-11-30T10:12:00.000Z,c-501,US,120.5,USD,CREATED,2025-11-30
o-1003,2025-11-30T11:00:00.000Z,c-503,US,249.99,USD,PAID,2025-11-30


6. Time Travel
: Show Delta versioning with updates and reading old data.

View table history

In [0]:
from delta.tables import DeltaTable

# Load DeltaTable from the path
dt = DeltaTable.forPath(spark, delta_path)

# Display commit history of the Delta table
display(dt.history())


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2025-12-01T16:27:42.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,4.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
4,2025-12-01T16:27:23.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,3.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
3,2025-12-01T16:25:49.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,2.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
2,2025-12-01T16:25:35.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,1.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
1,2025-12-01T16:22:30.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,0.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
0,2025-12-01T16:21:58.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13


Update a record to generate a new version

In [0]:
# Updating a record to create a new version
dt.update(
    condition="order_id = 'o-1002'",
    set={"status": "'CANCELLED'"}
)

# Check history again after update
display(dt.history())


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
6,2025-12-01T16:32:37.000Z,144089211768666,22071a12j2@vnrvjiet.in,UPDATE,"Map(predicate -> [""(order_id#12418 = o-1002)""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,5.0,WriteSerializable,False,"Map(numRemovedFiles -> 0, numRemovedBytes -> 0, numCopiedRows -> 0, numDeletionVectorsAdded -> 1, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 2755, numDeletionVectorsUpdated -> 0, scanTimeMs -> 1576, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 1703, rewriteTimeMs -> 1163)",,Databricks-Runtime/17.2.x-photon-scala2.13
5,2025-12-01T16:27:42.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,4.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
4,2025-12-01T16:27:23.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,3.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
3,2025-12-01T16:25:49.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,2.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
2,2025-12-01T16:25:35.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,1.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
1,2025-12-01T16:22:30.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,0.0,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 6, numRemovedBytes -> 10282, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13
0,2025-12-01T16:21:58.000Z,144089211768666,22071a12j2@vnrvjiet.in,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(1407241198071361),1201-160138-mj89elkw-v2n,,WriteSerializable,False,"Map(numFiles -> 6, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 10, numOutputBytes -> 10282)",,Databricks-Runtime/17.2.x-photon-scala2.13


Read version 0

In [0]:
# Reading the initial version of the Delta table (before update)
version0_df = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load(delta_path)
)

# Show the original value of updated row
display(version0_df.filter("order_id = 'o-1002'"))

order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
o-1002,2025-11-30T10:15:00.000Z,c-502,IN,50.0,INR,PAID,2025-11-30


7.Schema Evolution
>  Append new columns (payment_method & coupon_code).

In [0]:
# Creating new data with additional columns introduced by business
new_data = [
    ("o-1011", "2025-12-02 09:00:00", "c-511", "US", 180.00, "USD", "PAID", "CARD", None),
    ("o-1012", "2025-12-02 09:15:00", "c-512", "IN", 40.00,  "INR", "PAID", "UPI", "NEW50")
]

# New schema including payment_method and coupon_code
schema2 = StructType([
    StructField("order_id", StringType(), False),
    StructField("order_timestamp", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("country", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("currency", StringType(), False),
    StructField("status", StringType(), False),
    StructField("payment_method", StringType(), True),
    StructField("coupon_code", StringType(), True)
])

df2 = spark.createDataFrame(new_data, schema2)

# Convert timestamp fields to TimestampType
df2 = df2.withColumn("order_timestamp", to_timestamp("order_timestamp"))
df2 = df2.withColumn("order_date", to_date("order_timestamp"))

display(df2)


order_id,order_timestamp,customer_id,country,amount,currency,status,payment_method,coupon_code,order_date
o-1011,2025-12-02T09:00:00.000Z,c-511,US,180.0,USD,PAID,CARD,,2025-12-02
o-1012,2025-12-02T09:15:00.000Z,c-512,IN,40.0,INR,PAID,UPI,NEW50,2025-12-02


In [0]:
# Appending new records with schema evolution enabled
(df2.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # this is the key
    .partitionBy("country", "order_date")
    .save(delta_path)
)

# Verify new schema
spark.read.format("delta").load(delta_path).printSchema()


root
 |-- order_id: string (nullable = true)
 |-- order_timestamp: timestamp (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- status: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- coupon_code: string (nullable = true)



8.Updates & Deletes (Delta Lake DML): Demonstrate ACID updates & deletes.

In [0]:
# Reload DeltaTable reference
dt = DeltaTable.forPath(spark, delta_path)

# Update: Set status = CANCELLED for selected orders
dt.update(
    condition="order_id IN ('o-1001', 'o-1005')",
    set={"status": "'CANCELLED'"}
)

# Delete: Remove test orders with very low amounts
dt.delete("amount < 15")

# Show final data
display(spark.read.format("delta").load(delta_path))


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
o-1002,2025-11-30T10:15:00.000Z,c-502,IN,50.0,INR,CANCELLED,2025-11-30,,
o-1012,2025-12-02T09:15:00.000Z,c-512,IN,40.0,INR,PAID,2025-12-02,UPI,NEW50
o-1001,2025-11-30T10:12:00.000Z,c-501,US,120.5,USD,CANCELLED,2025-11-30,,
o-1011,2025-12-02T09:00:00.000Z,c-511,US,180.0,USD,PAID,2025-12-02,CARD,
o-1006,2025-12-01T09:00:00.000Z,c-506,US,75.0,USD,PAID,2025-12-01,,
o-1009,2025-12-01T10:30:00.000Z,c-509,US,199.99,USD,PAID,2025-12-01,,
o-1007,2025-12-01T09:10:00.000Z,c-507,IN,20.0,INR,PAID,2025-12-01,,
o-1010,2025-12-01T11:00:00.000Z,c-510,IN,60.0,INR,CREATED,2025-12-01,,
o-1003,2025-11-30T11:00:00.000Z,c-503,US,249.99,USD,PAID,2025-11-30,,
o-1004,2025-11-30T12:05:00.000Z,c-504,GB,30.0,GBP,CANCELLED,2025-11-30,,


9.OPTIMIZE & ZORDER
Improve read performance & remove small files.

In [0]:
# Create RAW (unoptimized) Delta table in DBFS

raw_path = "/mnt/delta/orders_raw"

# Write the same DataFrame df to raw_path WITHOUT OPTIMIZE
df.write.format("delta") \
    .mode("overwrite") \
    .save(raw_path)


import time
start_time = time.time()
display(dbutils.fs.ls(raw_path))
end_time = time.time()
print("Time taken for the operation before optimising: ", end_time - start_time)

path,name,size,modificationTime
dbfs:/mnt/delta/orders_raw/_delta_log/,_delta_log/,0,1764607085000
dbfs:/mnt/delta/orders_raw/part-00000-ac70978e-2f88-4ad9-bd25-53bcb3992385.c000.snappy.parquet,part-00000-ac70978e-2f88-4ad9-bd25-53bcb3992385.c000.snappy.parquet,2431,1764607086000


Time taken for the operation before optimising:  0.3886380195617676


In [0]:
%sql
-- BONUS QUESTION:
-- Optimize Delta table to reduce small files and improve performance.
OPTIMIZE delta.`/mnt/delta/orders_raw`
ZORDER BY (customer_id)

path,metrics
dbfs:/mnt/delta/orders_raw,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 2431), 0, List(0, 0), 0, null), null, 0, 0, 1, 1, false, 0, 0, 1764607106361, 1764607106696, 8, 0, null, List(0, 0), null, 8, 8, 0, 0, null)"


In [0]:
import time
start_time = time.time()
display(dbutils.fs.ls(raw_path))
end_time = time.time()
print("Time taken for the operation after optimising: ", end_time - start_time)


path,name,size,modificationTime
dbfs:/mnt/delta/orders_raw/_delta_log/,_delta_log/,0,1764607085000
dbfs:/mnt/delta/orders_raw/part-00000-ac70978e-2f88-4ad9-bd25-53bcb3992385.c000.snappy.parquet,part-00000-ac70978e-2f88-4ad9-bd25-53bcb3992385.c000.snappy.parquet,2431,1764607086000


Time taken for the operation after optimising:  0.42958879470825195


10.Create small file problem (simulation)

In [0]:
# Generating many tiny files artificially
for i in range(20):
    tiny_df = df.limit(1).withColumn("order_id", expr(f"'demo_{i}'"))
    tiny_df.write.format("delta") \
        .mode("append") \
        .partitionBy("country", "order_date") \
        .save(delta_path)

# Count how many files exist in a specific partition
len(dbutils.fs.ls(f"{delta_path}/country=US/order_date=2025-11-30"))


27

In [0]:
%sql
OPTIMIZE delta.`/mnt/delta/orders_delta`;


path,metrics
dbfs:/mnt/delta/orders_delta,"List(1, 22, List(2350, 2350, 2350.0, 1, 2350), List(1659, 2050, 1683.2727272727273, 22, 37032), 8, null, null, 0, 1, 29, 7, true, 0, 0, 1764607217304, 1764607218458, 8, 1, null, List(1, 1), null, 10, 10, 299, 0, null)"
