In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Reference existing Delta table by path
delta_path = "dbfs:/Volumes/workspace/default/kaggle_volume/delta/events"
deltaTable = DeltaTable.forPath(spark, delta_path)


In [0]:
# Create incremental data
updates = (
    spark.read.format("delta").load(delta_path)
    .sample(0.02, seed=42)          # small batch
    .withColumn("price", F.col("price") + 5)  # simulate change
)


In [0]:
# Deduplicate source
updates_clean = updates.dropDuplicates(
    ["user_id", "event_time", "product_id"]
)

In [0]:
# Merge
deltaTable.alias("t").merge(
    updates_clean.alias("s"),
    """
    t.user_id = s.user_id
    AND t.event_time = s.event_time
    AND t.product_id = s.product_id
    """
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

print("Incremental MERGE completed")


Incremental MERGE completed


In [0]:

%sql
-- Query Historical Versions (Time Travel)
-- step1- View table history
DESCRIBE HISTORY workspace.default.events_table;



version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2026-01-12T05:37:49.000Z,72099111605464,sonali.gupta@vensysco.in,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(1556992834470568),0112-050619-8lqz2v4h-v2n,,WriteSerializable,False,"Map(numFiles -> 43, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 42448764, numOutputBytes -> 1405244778)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


In [0]:
# step2- Read an old version
v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load(delta_path)

print("Version 0 rows:", v0.count())

Version 0 rows: 42448764


In [0]:
# step3- Read by timestamp
old_data = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01") \
    .load(delta_path)

old_data.show(5)


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7382243017409246>, line 6[0m
[1;32m      1[0m [38;5;66;03m# step3- Read by timestamp[39;00m
[1;32m      2[0m old_data [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m) \
[1;32m      3[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mtimestampAsOf[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m2024-01-01[39m[38;5;124m"[39m) \
[1;32m      4[0m     [38;5;241m.[39mload(delta_path)
[0;32m----> 6[0m old_data[38;5;241m.[39mshow([38;5;241m5[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/dataframe.py:1123[0m, in [0;36mDataFrame.show[0;34m(self, n, truncate, vertical)[0m
[1;32m   1122[0m [38;5;28;01mdef[39;00m [38;5;21mshow[39m([38;5;28mself[39m

In [0]:

%sql
-- OPTIMIZE & ZORDER
OPTIMIZE workspace.default.events_table
ZORDER BY (event_type, user_id);


path,metrics
,"List(26, 43, List(30174253, 56176753, 4.4598923115384616E7, 26, 1159572001), List(5827713, 35958801, 3.268011111627907E7, 43, 1405244778), 0, List(minCubeSize(107374182400), List(0, 0), List(43, 1405244778), 0, List(43, 1405244778), 1, null), null, 0, 1, 43, 0, false, 0, 0, 1768284401303, 1768284420459, 8, 1, null, List(0, 0), null, 9, 9, 64142, 0, null)"


In [0]:

%sql
-- Clean Old Files (VACUUM)
VACUUM workspace.default.events_table RETAIN 168 HOURS;


path
