In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp,col

"""1.Ingest sample order data into a Spark DataFrame."""

spark=SparkSession.builder.getOrCreate()

df=spark.read.csv("/Volumes/charishma_s/default/chubb/orders_dataset.csv",header=True,inferSchema=True)

df.printSchema()
display(df.limit(10))

root
 |-- order_id: string (nullable = true)
 |-- order_timestamp: timestamp (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- status: string (nullable = true)



order_id,order_timestamp,customer_id,country,amount,currency,status
ORD100000,2025-01-30T09:29:00Z,CUST3871,CA,420.48,INR,CANCELLED
ORD100001,2025-01-12T23:39:00Z,CUST5772,UK,23.95,USD,PAID
ORD100002,2025-01-18T07:58:00Z,CUST5426,DE,332.6,EUR,CREATED
ORD100003,2025-01-16T13:52:00Z,CUST8047,AU,7.54,GBP,CANCELLED
ORD100004,2025-01-08T00:19:00Z,CUST4794,DE,304.45,CAD,CANCELLED
ORD100005,2025-01-16T18:30:00Z,CUST5063,CA,191.96,USD,CREATED
ORD100006,2025-01-18T09:38:00Z,CUST7434,UK,499.38,INR,PAID
ORD100007,2025-01-08T12:43:00Z,CUST2039,UK,453.93,INR,PAID
ORD100008,2025-01-05T17:04:00Z,CUST8651,US,342.09,CAD,PAID
ORD100009,2025-01-27T18:47:00Z,CUST5500,DE,360.16,INR,CANCELLED


In [0]:
"""2.Add a derived column order_date (date only from order_timestamp)."""

from pyspark.sql.functions import to_date

new_df=df.withColumn('order_timestamp',to_timestamp(col('order_timestamp'))) \
        .withColumn('order_date',to_date(col('order_timestamp')))

display(new_df.limit(10))

order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
ORD100000,2025-01-30T09:29:00Z,CUST3871,CA,420.48,INR,CANCELLED,2025-01-30
ORD100001,2025-01-12T23:39:00Z,CUST5772,UK,23.95,USD,PAID,2025-01-12
ORD100002,2025-01-18T07:58:00Z,CUST5426,DE,332.6,EUR,CREATED,2025-01-18
ORD100003,2025-01-16T13:52:00Z,CUST8047,AU,7.54,GBP,CANCELLED,2025-01-16
ORD100004,2025-01-08T00:19:00Z,CUST4794,DE,304.45,CAD,CANCELLED,2025-01-08
ORD100005,2025-01-16T18:30:00Z,CUST5063,CA,191.96,USD,CREATED,2025-01-16
ORD100006,2025-01-18T09:38:00Z,CUST7434,UK,499.38,INR,PAID,2025-01-18
ORD100007,2025-01-08T12:43:00Z,CUST2039,UK,453.93,INR,PAID,2025-01-08
ORD100008,2025-01-05T17:04:00Z,CUST8651,US,342.09,CAD,PAID,2025-01-05
ORD100009,2025-01-27T18:47:00Z,CUST5500,DE,360.16,INR,CANCELLED,2025-01-27


In [0]:
"""3.Write the DataFrame as a Delta table partitioned by country and order_date."""

delta_path = "/mnt/delta/orders_dataset_delta"

(new_df.write
     .format("delta")
     .mode("overwrite")
     .partitionBy("country", "order_date")
     .option("overwriteSchema", "true")
     .save(delta_path))


In [0]:
"""4.Verify the partition structure in the storage path."""

display(dbutils.fs.ls(delta_path))

display(dbutils.fs.ls(delta_path + "/country=UK"))

path,name,size,modificationTime
dbfs:/mnt/delta/orders_dataset_delta/_delta_log/,_delta_log/,0,1764608480000
dbfs:/mnt/delta/orders_dataset_delta/country=AU/,country=AU/,0,1764608481000
dbfs:/mnt/delta/orders_dataset_delta/country=CA/,country=CA/,0,1764608482000
dbfs:/mnt/delta/orders_dataset_delta/country=DE/,country=DE/,0,1764608485000
dbfs:/mnt/delta/orders_dataset_delta/country=IN/,country=IN/,0,1764608481000
dbfs:/mnt/delta/orders_dataset_delta/country=SG/,country=SG/,0,1764608483000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/,country=UK/,0,1764608482000
dbfs:/mnt/delta/orders_dataset_delta/country=US/,country=US/,0,1764608482000


path,name,size,modificationTime
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-01/,order_date=2025-01-01/,0,1764608486000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-02/,order_date=2025-01-02/,0,1764608500000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-03/,order_date=2025-01-03/,0,1764608504000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-04/,order_date=2025-01-04/,0,1764608504000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-05/,order_date=2025-01-05/,0,1764608486000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-06/,order_date=2025-01-06/,0,1764608502000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-07/,order_date=2025-01-07/,0,1764608501000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-08/,order_date=2025-01-08/,0,1764608506000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-09/,order_date=2025-01-09/,0,1764608507000
dbfs:/mnt/delta/orders_dataset_delta/country=UK/order_date=2025-01-10/,order_date=2025-01-10/,0,1764608492000


In [0]:
"""5.Run queries that demonstrate partition pruning (e.g., filter on a single country and/or date)."""

delta_table=spark.read.format("delta").load(delta_path)

#filter on single country
display(delta_table.filter("country='UK'"))

#filter on single date
display(delta_table.filter("order_date='2025-01-10'"))

#filter on both country and date
display(delta_table.filter("country='UK' AND order_date='2025-01-10'"))

#filter on country or date
display(delta_table.filter("country='UK' OR order_date='2025-01-10'"))

order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
ORD100994,2025-01-10T05:11:00Z,CUST6274,UK,253.14,GBP,CANCELLED,2025-01-10
ORD100918,2025-01-10T15:07:00Z,CUST3040,UK,201.57,USD,CREATED,2025-01-10
ORD100805,2025-01-10T17:24:00Z,CUST1402,UK,301.9,EUR,CREATED,2025-01-10
ORD100742,2025-01-10T23:01:00Z,CUST2695,UK,454.06,GBP,CANCELLED,2025-01-10
ORD100693,2025-01-10T04:32:00Z,CUST6468,UK,30.09,CAD,CREATED,2025-01-10
ORD100374,2025-01-10T11:17:00Z,CUST3809,UK,387.67,USD,CANCELLED,2025-01-10
ORD100373,2025-01-10T05:47:00Z,CUST1743,UK,124.98,EUR,CREATED,2025-01-10
ORD100330,2025-01-10T09:46:00Z,CUST2447,UK,158.37,GBP,CREATED,2025-01-10
ORD100135,2025-01-10T23:56:00Z,CUST2598,UK,322.07,INR,CREATED,2025-01-10
ORD100122,2025-01-10T00:30:00Z,CUST6970,UK,343.35,CAD,CANCELLED,2025-01-10


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
ORD100994,2025-01-10T05:11:00Z,CUST6274,UK,253.14,GBP,CANCELLED,2025-01-10
ORD100918,2025-01-10T15:07:00Z,CUST3040,UK,201.57,USD,CREATED,2025-01-10
ORD100805,2025-01-10T17:24:00Z,CUST1402,UK,301.9,EUR,CREATED,2025-01-10
ORD100742,2025-01-10T23:01:00Z,CUST2695,UK,454.06,GBP,CANCELLED,2025-01-10
ORD100693,2025-01-10T04:32:00Z,CUST6468,UK,30.09,CAD,CREATED,2025-01-10
ORD100374,2025-01-10T11:17:00Z,CUST3809,UK,387.67,USD,CANCELLED,2025-01-10
ORD100373,2025-01-10T05:47:00Z,CUST1743,UK,124.98,EUR,CREATED,2025-01-10
ORD100330,2025-01-10T09:46:00Z,CUST2447,UK,158.37,GBP,CREATED,2025-01-10
ORD100135,2025-01-10T23:56:00Z,CUST2598,UK,322.07,INR,CREATED,2025-01-10
ORD100122,2025-01-10T00:30:00Z,CUST6970,UK,343.35,CAD,CANCELLED,2025-01-10


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
ORD100994,2025-01-10T05:11:00Z,CUST6274,UK,253.14,GBP,CANCELLED,2025-01-10
ORD100918,2025-01-10T15:07:00Z,CUST3040,UK,201.57,USD,CREATED,2025-01-10
ORD100805,2025-01-10T17:24:00Z,CUST1402,UK,301.9,EUR,CREATED,2025-01-10
ORD100742,2025-01-10T23:01:00Z,CUST2695,UK,454.06,GBP,CANCELLED,2025-01-10
ORD100693,2025-01-10T04:32:00Z,CUST6468,UK,30.09,CAD,CREATED,2025-01-10
ORD100374,2025-01-10T11:17:00Z,CUST3809,UK,387.67,USD,CANCELLED,2025-01-10
ORD100373,2025-01-10T05:47:00Z,CUST1743,UK,124.98,EUR,CREATED,2025-01-10
ORD100330,2025-01-10T09:46:00Z,CUST2447,UK,158.37,GBP,CREATED,2025-01-10
ORD100135,2025-01-10T23:56:00Z,CUST2598,UK,322.07,INR,CREATED,2025-01-10
ORD100122,2025-01-10T00:30:00Z,CUST6970,UK,343.35,CAD,CANCELLED,2025-01-10


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
ORD100994,2025-01-10T05:11:00Z,CUST6274,UK,253.14,GBP,CANCELLED,2025-01-10
ORD100918,2025-01-10T15:07:00Z,CUST3040,UK,201.57,USD,CREATED,2025-01-10
ORD100805,2025-01-10T17:24:00Z,CUST1402,UK,301.9,EUR,CREATED,2025-01-10
ORD100742,2025-01-10T23:01:00Z,CUST2695,UK,454.06,GBP,CANCELLED,2025-01-10
ORD100693,2025-01-10T04:32:00Z,CUST6468,UK,30.09,CAD,CREATED,2025-01-10
ORD100374,2025-01-10T11:17:00Z,CUST3809,UK,387.67,USD,CANCELLED,2025-01-10
ORD100373,2025-01-10T05:47:00Z,CUST1743,UK,124.98,EUR,CREATED,2025-01-10
ORD100330,2025-01-10T09:46:00Z,CUST2447,UK,158.37,GBP,CREATED,2025-01-10
ORD100135,2025-01-10T23:56:00Z,CUST2598,UK,322.07,INR,CREATED,2025-01-10
ORD100122,2025-01-10T00:30:00Z,CUST6970,UK,343.35,CAD,CANCELLED,2025-01-10


In [0]:
"""6.Demonstrate Delta Lake Time Travel:Write data, update some rows, then query older versions."""

from delta.tables import DeltaTable
d_tb=DeltaTable.forPath(spark,delta_path)

d_tb.history().show()

#update some rows
d_tb.update("country='UK' AND order_date='2025-01-10'",{'status': lit('shipped')})
d_tb.history().show()

#query older versions   
display(d_tb.history().filter("version=0"))

old_version_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
display(old_version_df)



+-------+-------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|         userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+---------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     23|2025-12-01 18:46:09|148593850855805|s.vijaya.charishm...|   DELETE|{predicate -> ["(...|NULL|{2065271677097478}|1121-101913-kir4gy3f|         22|WriteSerializable|        false|{numRemovedFiles ...|        NULL|Databricks-Runtim...|
|     22|2025-12-01 18:46:07|148

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-12-01T17:01:55Z,148593850855805,s.vijaya.charishma@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""country"",""order_date""])",,List(2065271677097478),1121-101913-kir4gy3f,,WriteSerializable,False,"Map(numFiles -> 207, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 1000, numOutputBytes -> 398797)",,Databricks-Runtime/17.3.x-photon-scala2.13


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
ORD100999,2025-01-25T11:17:00Z,CUST5823,SG,226.45,CAD,PAID,2025-01-25
ORD100961,2025-01-25T15:43:00Z,CUST5571,SG,203.37,USD,CANCELLED,2025-01-25
ORD100732,2025-01-25T14:58:00Z,CUST8094,SG,333.13,EUR,CREATED,2025-01-25
ORD100706,2025-01-25T08:01:00Z,CUST5968,SG,105.43,INR,PAID,2025-01-25
ORD100564,2025-01-25T08:29:00Z,CUST4212,SG,89.91,GBP,CREATED,2025-01-25
ORD100531,2025-01-25T21:50:00Z,CUST8527,SG,287.12,INR,PAID,2025-01-25
ORD100499,2025-01-25T19:52:00Z,CUST1139,SG,6.48,USD,PAID,2025-01-25
ORD100260,2025-01-25T20:10:00Z,CUST9709,SG,61.66,EUR,CREATED,2025-01-25
ORD100246,2025-01-25T23:35:00Z,CUST6653,SG,133.72,INR,CANCELLED,2025-01-25
ORD100178,2025-01-25T02:10:00Z,CUST9861,SG,478.11,USD,CANCELLED,2025-01-25


In [0]:
"""7.Demonstrate Schema Evolution"""
#Add payment_method & coupon_code to new data.
from pyspark.sql.functions import lit, rand, when, col

coupon_list = ["NEW50", "SAVE10", "WELCOME20", "FREESHIP", "MEGA30"]

# create DataFrame that has two new columns
new_rows = (
    new_df
        .withColumn("payment_method", lit("CARD"))
        .withColumn(
        "coupon_code",
            when(rand() < 0.2, lit("NEW50"))
            .when(rand() < 0.4, lit("SAVE10"))
            .when(rand() < 0.6, lit("WELCOME20"))
            .when(rand() < 0.8, lit("FREESHIP"))
            .otherwise(lit("MEGA30"))
        )
)


#Write to the same Delta table, allowing schema evolution.
(new_rows
  .write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "true")
  .partitionBy("country","order_date")
  .save(delta_path))

#display
new_df=spark.read.format("delta").load(delta_path)
display(new_df)



order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
ORD100066,2025-01-10T20:40:00Z,CUST6943,AU,207.58,INR,PAID,2025-01-10,CARD,SAVE10
ORD100200,2025-01-10T19:30:00Z,CUST8693,AU,321.1,USD,PAID,2025-01-10,CARD,SAVE10
ORD100337,2025-01-10T03:06:00Z,CUST7941,AU,455.75,CAD,PAID,2025-01-10,CARD,NEW50
ORD100392,2025-01-10T11:32:00Z,CUST6154,AU,10.68,EUR,PAID,2025-01-10,CARD,WELCOME20
ORD100930,2025-01-10T18:07:00Z,CUST4255,AU,280.95,INR,PAID,2025-01-10,CARD,WELCOME20
ORD100936,2025-01-10T17:49:00Z,CUST7970,AU,246.02,CAD,PAID,2025-01-10,CARD,NEW50
ORD100066,2025-01-10T20:40:00Z,CUST6943,AU,207.58,INR,PAID,2025-01-10,CARD,NEW50
ORD100200,2025-01-10T19:30:00Z,CUST8693,AU,321.1,USD,PAID,2025-01-10,CARD,FREESHIP
ORD100337,2025-01-10T03:06:00Z,CUST7941,AU,455.75,CAD,PAID,2025-01-10,CARD,NEW50
ORD100392,2025-01-10T11:32:00Z,CUST6154,AU,10.68,EUR,PAID,2025-01-10,CARD,WELCOME20


In [0]:
#adding payment_method and coupon_code to complete data 
from pyspark.sql.functions import lit

d_tb.update("payment_method IS NULL", {"payment_method": lit("CARD")})
d_tb.update(
    "coupon_code IS NULL",
    {
        "coupon_code": when(col("amount") >= 500, lit("MEGA30"))
                        .when(col("amount") >= 200, lit("SAVE10"))
                        .when(col("amount") >= 100, lit("WELCOME20"))
                        .otherwise(lit("FREESHIP"))
    }
)


In [0]:
"""8.Demonstrate Updates & Deletes using Delta:"""

#update
#Mark some orders as CANCELLED.
d_tb.update("country='UK' AND order_date='2025-01-20'",{"status":lit("CANCELLED")})
#display
display(d_tb.toDF().filter("country='UK' AND order_date='2025-01-20'"))


#delete
#Delete orders below a certain amount (e.g., test data cleanup)
d_tb.delete("country='US' AND order_date='2025-01-20' AND amount<100")
#display
display(d_tb.toDF().filter("country='US' AND order_date='2025-01-20' AND amount<100"))




order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
ORD100540,2025-01-20T04:22:00Z,CUST4720,UK,206.75,INR,CANCELLED,2025-01-20,CARD,SAVE10
ORD100425,2025-01-20T08:24:00Z,CUST6366,UK,286.75,GBP,CANCELLED,2025-01-20,CARD,SAVE10
ORD100391,2025-01-20T04:34:00Z,CUST7242,UK,25.69,INR,CANCELLED,2025-01-20,CARD,FREESHIP
ORD100391,2025-01-20T04:34:00Z,CUST7242,UK,25.69,INR,CANCELLED,2025-01-20,CARD,FREESHIP
ORD100425,2025-01-20T08:24:00Z,CUST6366,UK,286.75,GBP,CANCELLED,2025-01-20,CARD,SAVE10
ORD100540,2025-01-20T04:22:00Z,CUST4720,UK,206.75,INR,CANCELLED,2025-01-20,CARD,SAVE10
ORD100540,2025-01-20T04:22:00Z,CUST4720,UK,206.75,INR,CANCELLED,2025-01-20,CARD,SAVE10
ORD100425,2025-01-20T08:24:00Z,CUST6366,UK,286.75,GBP,CANCELLED,2025-01-20,CARD,SAVE10
ORD100391,2025-01-20T04:34:00Z,CUST7242,UK,25.69,INR,CANCELLED,2025-01-20,CARD,FREESHIP
ORD100391,2025-01-20T04:34:00Z,CUST7242,UK,25.69,INR,CANCELLED,2025-01-20,CARD,FREESHIP


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code


In [0]:
"""9.(Bonus) Optimize the table:
Use OPTIMIZE and optionally ZORDER on customer_id or order
"""

spark.sql(
    f"OPTIMIZE delta.`{delta_path}` ZORDER BY (customer_id)"
)



DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,