In [0]:
customers = spark.table("customer_transactions")
customers.printSchema()
customers.show(5)



root
 |-- customer_id: long (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- annual_income: long (nullable = true)
 |-- spending_score: long (nullable = true)
 |-- num_purchases: long (nullable = true)
 |-- avg_purchase_value: double (nullable = true)
 |-- membership_years: long (nullable = true)
 |-- website_visits_per_month: long (nullable = true)
 |-- cart_abandon_rate: double (nullable = true)
 |-- churned: long (nullable = true)
 |-- feedback_text: string (nullable = true)
 |-- last_purchase_date: date (nullable = true)

+-----------+---+------+---------+-------------+--------------+-------------+------------------+----------------+------------------------+-----------------+-------+--------------------+------------------+
|customer_id|age|gender|  country|annual_income|spending_score|num_purchases|avg_purchase_value|membership_years|website_visits_per_month|cart_abandon_rate|churned|       feedback

In [0]:
updates = spark.createDataFrame(
    [
        (1, 9999.0),
        (2, 8888.0),
        (50, 7777.0)   # new customer
    ],
    ["customer_id", "annual_income"]
)

updates.show()


+-----------+-------------+
|customer_id|annual_income|
+-----------+-------------+
|          1|       9999.0|
|          2|       8888.0|
|         50|       7777.0|
+-----------+-------------+



In [0]:
customers.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("customer_transactions_delta")

In [0]:
spark.sql("DESCRIBE DETAIL customer_transactions_delta").show()

+------+--------------------+--------------------+-----------+--------+--------------------+-------------------+----------------+-----------------+--------+-----------+--------------------+----------------+----------------+--------------------+--------------------+-------------+
|format|                  id|                name|description|location|           createdAt|       lastModified|partitionColumns|clusteringColumns|numFiles|sizeInBytes|          properties|minReaderVersion|minWriterVersion|       tableFeatures|          statistics|clusterByAuto|
+------+--------------------+--------------------+-----------+--------+--------------------+-------------------+----------------+-----------------+--------+-----------+--------------------+----------------+----------------+--------------------+--------------------+-------------+
| delta|9205a59f-8d71-4e0...|workspace.default...|       NULL|        |2026-01-12 15:03:...|2026-01-13 11:00:49|              []|               []|       1|    

In [0]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "customer_transactions_delta")

delta_table.alias("t").merge(
    updates.alias("s"),
    "t.customer_id = s.customer_id"
).whenMatchedUpdate(set={
    "annual_income": "s.annual_income"
}).whenNotMatchedInsert(values={
    "customer_id": "s.customer_id",
    "annual_income": "s.annual_income"
}).execute()


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.table("customer_transactions_delta") \
    .filter("customer_id IN (1,2,50)") \
    .show()

+-----------+---+------+-------+-------------+--------------+-------------+------------------+----------------+------------------------+-----------------+-------+--------------------+------------------+
|customer_id|age|gender|country|annual_income|spending_score|num_purchases|avg_purchase_value|membership_years|website_visits_per_month|cart_abandon_rate|churned|       feedback_text|last_purchase_date|
+-----------+---+------+-------+-------------+--------------+-------------+------------------+----------------+------------------------+-----------------+-------+--------------------+------------------+
|         50| 24|Female|  India|         7777|            53|           13|             31.19|               3|                       9|             0.36|      0|Love the products...|        2025-08-12|
|          1| 37|  Male|Germany|         9999|            14|           18|              41.2|               6|                      20|             0.95|      0|Very satisfied wi...|     

In [0]:
spark.read.format("delta") \
  .option("versionAsOf", 0) \
  .table("customer_transactions_delta") \
  .show(5)


+-----------+---+------+---------+-------------+--------------+-------------+------------------+----------------+------------------------+-----------------+-------+--------------------+------------------+
|customer_id|age|gender|  country|annual_income|spending_score|num_purchases|avg_purchase_value|membership_years|website_visits_per_month|cart_abandon_rate|churned|       feedback_text|last_purchase_date|
+-----------+---+------+---------+-------------+--------------+-------------+------------------+----------------+------------------------+-----------------+-------+--------------------+------------------+
|          1| 37|  Male|  Germany|        85886|            14|           18|              41.2|               6|                      20|             0.95|      0|Very satisfied wi...|        2025-06-22|
|          2| 40|  Male|    India|        41041|             4|           10|             31.73|               4|                      29|             0.21|      0|Good quality and

In [0]:
spark.sql("OPTIMIZE customer_transactions_delta")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
spark.sql("VACUUM customer_transactions_delta RETAIN 168 HOURS")

DataFrame[path: string]