In [0]:
### upload ecommerce csv file 

BASE_PATH = "/Volumes/workspace/ecommerce/ecommerce_data"

df = spark.read.option("header", True).option("inferSchema", True).csv(
    f"{BASE_PATH}/*.csv"
)

In [0]:
Task 1 - Implement incremental merge 

In [0]:
#creating a new file updates_oct

data_updates = [
    ("2019-10-01 10:00:00", "view", 1001, 2053013555631883655, "electronics.smqartphone", "samsung", 899.99, 101, "u1"),
    ("2019-10-01 10:00:00", "view", 1001, 2053013555631883655, "electronics.smqartphone", "apple", 1299.00, 105, "u5"),
    ("2019-10-01 10:00:00", "view", 1001, 2053013555631883655, "electronics.smqartphone", "xiaomi", 499.99, 106, "u6")
]

columns = [
    "event_time", "event_type", "product_id", "category_id",
    "category_code", "brand", "price", "user_id", "user_session"
]

updates_df = spark.createDataFrame(data_updates, columns)

updates_df.coalesce(1).write\
    .mode("overwrite")\
    .option("header", True)\
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/events_oct.csv")

In [0]:
dbutils.fs.ls("/Volumes/workspace/ecommerce/ecommerce_data/")

[FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv', name='2019-Nov.csv', size=9006762395, modificationTime=1768205969000),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv', name='2019-Oct.csv', size=5668612855, modificationTime=1768206113000),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/events_oct.csv/', name='events_oct.csv/', size=0, modificationTime=1768325465923),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/output/', name='output/', size=0, modificationTime=1768325465923)]

In [0]:
base_df = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/events_oct.csv",
    header=True,
    inferSchema=True
)

In [0]:
## these code will convert the data in to delta format 

base_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/ecommerce_data/delta_events/")

In [0]:
## verfying is the file converted in to delta format 

dbutils.fs.ls("/Volumes/workspace/ecommerce/ecommerce_data/delta_events/")

[FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/delta_events/_delta_log/', name='_delta_log/', size=0, modificationTime=1768325685937),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/delta_events/part-00000-8bccf7dc-d05a-4bad-b8d3-a9ee674ee1c1.c000.snappy.parquet', name='part-00000-8bccf7dc-d05a-4bad-b8d3-a9ee674ee1c1.c000.snappy.parquet', size=2571, modificationTime=1768325607000),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/delta_events/part-00000-c9579313-51ee-4baf-8668-3a6fb5899ca4.c000.snappy.parquet', name='part-00000-c9579313-51ee-4baf-8668-3a6fb5899ca4.c000.snappy.parquet', size=2571, modificationTime=1768325567000)]

In [0]:
from delta.tables import DeltaTable

deltatable = DeltaTable.forPath(
    spark,
    "/Volumes/workspace/ecommerce/ecommerce_data/delta_events/"
)

updates = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/events_oct.csv",
    header=True,
    inferSchema=True
)

deltatable.alias("t") \
    .merge(
        updates.alias("s"),
        "t.user_session = s.user_session AND t.event_time = s.event_time"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               3|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [0]:
### Task 2 - Query Historical versions 


In [0]:
v0 = spark.read.format("delta").option("versionAsOf", 0).load("/Volumes/workspace/ecommerce/ecommerce_data/delta_events/")
yesterday = spark.read.format("delta").option("versionAsOf", "2024-01-12").load("/Volumes/workspace/ecommerce/ecommerce_data/delta_events/")
v0.show(2)

+-------------------+----------+----------+-------------------+--------------------+-------+------+-------+------------+
|         event_time|event_type|product_id|        category_id|       category_code|  brand| price|user_id|user_session|
+-------------------+----------+----------+-------------------+--------------------+-------+------+-------+------------+
|2019-10-01 10:00:00|      view|      1001|2053013555631883655|electronics.smqar...|samsung|899.99|    101|          u1|
|2019-10-01 10:00:00|      view|      1001|2053013555631883655|electronics.smqar...|  apple|1299.0|    105|          u5|
+-------------------+----------+----------+-------------------+--------------------+-------+------+-------+------------+
only showing top 2 rows


In [0]:
### Task 3 - optimize tables 

In [0]:
spark.sql("""
          OPTIMIZE delta.`/Volumes/workspace/ecommerce/ecommerce_data/delta_events/`
          ZORDER BY (event_type, user_id)
          """)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
### Task 4 - clean old files 

In [0]:
spark.sql("""
            VACUUM delta.`/Volumes/workspace/ecommerce/ecommerce_data/delta_events/` 
            RETAIN 168 HOURS""")

DataFrame[path: string]