In [0]:
# Seed rows for the demo table, one tuple per product:
# (product_id, name, category, price).
initial_data = [
    ("P001", "Laptop", "Electronics", 999.99),
    ("P002", "Smartphone", "Electronics", 499.99),
    ("P003", "T-Shirt", "Clothing", 29.99),
]

initial_columns = ["product_id", "name", "category", "price"]

df_initial = spark.createDataFrame(initial_data, initial_columns)

# (Re)create the table from the seed rows. `overwriteSchema` makes the
# overwrite also replace the table schema, so re-running the notebook after a
# later cell has added the `discount` column resets the table to these four
# columns instead of keeping the previously evolved schema.
(
    df_initial.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("default.products")
)


In [0]:
# Display the rows of the products table right after the initial load.
(
    spark
    .sql("SELECT * FROM default.products")
    .show()
)

+----------+----------+-----------+------+
|product_id|      name|   category| price|
+----------+----------+-----------+------+
|      P001|    Laptop|Electronics|999.99|
|      P002|Smartphone|Electronics|499.99|
|      P003|   T-Shirt|   Clothing| 29.99|
+----------+----------+-----------+------+



In [0]:
from pyspark.sql.functions import lit
from delta.tables import DeltaTable

df_existing = spark.read.format("delta").table("default.products")

# Add the nullable `discount` column only when it is missing, so re-running
# this cell is a no-op for the schema. ALTER TABLE ... ADD COLUMNS is a
# metadata-only change: existing rows read back NULL for the new column and
# no data files are rewritten (unlike overwriting the whole table with
# overwriteSchema just to append one column).
if "discount" not in df_existing.columns:
    spark.sql("ALTER TABLE default.products ADD COLUMNS (discount DOUBLE)")
    # Re-read so df_existing reflects the evolved schema.
    df_existing = spark.read.format("delta").table("default.products")

# Incoming changes: P001 already exists in the seed data (price change plus a
# discount); P004 and P005 are new products.
update_data = [
    ("P001", "Laptop", "Electronics", 1099.99, 50.0),
    ("P004", "Headphones", "Electronics", 149.99, 10.0),
    ("P005", "Jeans", "Clothing", 59.99, 5.0),
]
update_columns = ["product_id", "name", "category", "price", "discount"]
df_update = spark.createDataFrame(update_data, update_columns)

delta_table = DeltaTable.forName(spark, "default.products")

# Upsert keyed on product_id: matching rows take every column from the
# source row, unmatched source rows are inserted as new products.
(
    delta_table.alias("target")
    .merge(df_update.alias("source"), "target.product_id = source.product_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Show the table's column names and data types after the merge cell has run.
(
    spark
    .sql("DESCRIBE TABLE default.products")
    .show()
)

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|product_id|   string|   NULL|
|      name|   string|   NULL|
|  category|   string|   NULL|
|     price|   double|   NULL|
|  discount|   double|   NULL|
+----------+---------+-------+



In [0]:
# Display the current rows of the products table (post-merge state).
(
    spark
    .sql("SELECT * FROM default.products")
    .show()
)

+----------+----------+-----------+-------+--------+
|product_id|      name|   category|  price|discount|
+----------+----------+-----------+-------+--------+
|      P002|Smartphone|Electronics| 499.99|    NULL|
|      P003|   T-Shirt|   Clothing|  29.99|    NULL|
|      P001|    Laptop|Electronics|1099.99|    50.0|
|      P004|Headphones|Electronics| 149.99|    10.0|
|      P005|     Jeans|   Clothing|  59.99|     5.0|
+----------+----------+-----------+-------+--------+



In [0]:
# Print the table's Delta commit history; truncate=False keeps the wide
# columns from being cut off in the output.
(
    spark
    .sql("DESCRIBE HISTORY default.products")
    .show(truncate=False)
)

+-------+-------------------+---------------+-----------------------------+---------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+------------------------+-----------+-----------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Time travel: read the table as of version 0 and display it.
(
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .table("default.products")
    .show()
)

+----------+----------+-----------+------+
|product_id|      name|   category| price|
+----------+----------+-----------+------+
|      P001|    Laptop|Electronics|999.99|
|      P002|Smartphone|Electronics|499.99|
|      P003|   T-Shirt|   Clothing| 29.99|
+----------+----------+-----------+------+



In [0]:
# Time travel: read the table as of version 1 and display it.
(
    spark.read
    .format("delta")
    .option("versionAsOf", 1)
    .table("default.products")
    .show()
)

+----------+----------+-----------+-------+
|product_id|      name|   category|  price|
+----------+----------+-----------+-------+
|      P002|Smartphone|Electronics| 499.99|
|      P003|   T-Shirt|   Clothing|  29.99|
|      P004|Headphones|Electronics| 149.99|
|      P001|    Laptop|Electronics|1099.99|
|      P005|     Jeans|   Clothing|  59.99|
+----------+----------+-----------+-------+



In [0]:
# Time travel to the most recent version. The version number is looked up
# from the commit history instead of being hard-coded: a fixed number such as
# 10 only exists after the notebook has been re-run many times, and reading a
# version that does not exist fails.
# DESCRIBE HISTORY lists the newest commit first, so LIMIT 1 is the latest.
latest_version = spark.sql(
    "DESCRIBE HISTORY default.products LIMIT 1"
).first()["version"]

(
    spark.read
    .format("delta")
    .option("versionAsOf", latest_version)
    .table("default.products")
    .show()
)

+----------+----------+-----------+-------+--------+
|product_id|      name|   category|  price|discount|
+----------+----------+-----------+-------+--------+
|      P002|Smartphone|Electronics| 499.99|    NULL|
|      P003|   T-Shirt|   Clothing|  29.99|    NULL|
|      P004|Headphones|Electronics| 149.99|    10.0|
|      P001|    Laptop|Electronics|1099.99|    50.0|
|      P005|     Jeans|   Clothing|  59.99|     5.0|
+----------+----------+-----------+-------+--------+



In [0]:
from pyspark.sql.functions import col

# Snapshots of the table at the two versions being compared.
df_v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .table("default.products")
)
df_v1 = (
    spark.read.format("delta")
    .option("versionAsOf", 1)
    .table("default.products")
)

# Pair up each product that exists in both snapshots.
joined_versions = df_v0.alias("v0").join(df_v1.alias("v1"), on="product_id", how="inner")

# Put the two prices side by side under distinct column names.
price_pairs = joined_versions.select(
    "product_id",
    col("v0.price").alias("price_before"),
    col("v1.price").alias("price_after"),
)

# Keep only the products whose price differs between the two versions.
df_price_change = price_pairs.where("price_before != price_after")

df_price_change.show()

+----------+------------+-----------+
|product_id|price_before|price_after|
+----------+------------+-----------+
|      P001|      999.99|    1099.99|
+----------+------------+-----------+



In [0]:
# Register the table's commit history as a temp view so it can be queried
# with ordinary SQL.
history_df = spark.sql("DESCRIBE HISTORY default.products")
history_df.createOrReplaceTempView("product_history")

# List only the MERGE commits: version, timestamp, operation and user name.
merge_commits = spark.sql("""
    SELECT version, timestamp, operation, userName
    FROM product_history
    WHERE operation = 'MERGE'
""")
merge_commits.show(truncate=False)

+-------+-------------------+---------+-----------------------------+
|version|timestamp          |operation|userName                     |
+-------+-------------------+---------+-----------------------------+
|10     |2025-07-10 07:23:53|MERGE    |vallurusumanthkumar@gmail.com|
|6      |2025-07-10 07:14:24|MERGE    |vallurusumanthkumar@gmail.com|
|1      |2025-07-08 05:40:08|MERGE    |vallurusumanthkumar@gmail.com|
+-------+-------------------+---------+-----------------------------+



In [0]:

# Walk every version that actually exists in the table history rather than a
# hard-coded range: reading a version that does not exist fails, and a fixed
# upper bound silently ignores newer versions.
history_rows = (
    spark.sql("DESCRIBE HISTORY default.products")
    .select("version")
    .collect()
)

# History is returned newest-first; sort so the output reads oldest to newest.
for version in sorted(row["version"] for row in history_rows):
    print(f"--- Version: {version} ---")
    df = spark.read.format("delta").option("versionAsOf", version).table("default.products")
    # Show how the price of product P001 looked at this version.
    df.filter("product_id = 'P001'").select("product_id", "price").show()


--- Version: 0 ---
+----------+------+
|product_id| price|
+----------+------+
|      P001|999.99|
+----------+------+

--- Version: 1 ---
+----------+-------+
|product_id|  price|
+----------+-------+
|      P001|1099.99|
+----------+-------+

--- Version: 2 ---
+----------+-------+
|product_id|  price|
+----------+-------+
|      P001|1099.99|
+----------+-------+

--- Version: 3 ---
+----------+------+
|product_id| price|
+----------+------+
|      P001|999.99|
+----------+------+

--- Version: 4 ---
+----------+------+
|product_id| price|
+----------+------+
|      P001|999.99|
+----------+------+

--- Version: 5 ---
+----------+------+
|product_id| price|
+----------+------+
|      P001|999.99|
+----------+------+

--- Version: 6 ---
+----------+-------+
|product_id|  price|
+----------+-------+
|      P001|1099.99|
+----------+-------+

--- Version: 7 ---
+----------+-------+
|product_id|  price|
+----------+-------+
|      P001|1099.99|
+----------+-------+

--- Version: 8 ---
+

In [0]:
# Display the current rows of the products table (row order is whatever the
# query returns; no ORDER BY is applied here).
(
    spark
    .sql("SELECT * FROM default.products")
    .show()
)

+----------+----------+-----------+-------+--------+
|product_id|      name|   category|  price|discount|
+----------+----------+-----------+-------+--------+
|      P002|Smartphone|Electronics| 499.99|    NULL|
|      P003|   T-Shirt|   Clothing|  29.99|    NULL|
|      P001|    Laptop|Electronics|1099.99|    50.0|
|      P004|Headphones|Electronics| 149.99|    10.0|
|      P005|     Jeans|   Clothing|  59.99|     5.0|
+----------+----------+-----------+-------+--------+



In [0]:
# Same listing as the unordered query, but sorted by product_id so the rows
# come back in a stable order.
ordered_products_query = """
SELECT *
FROM default.products
ORDER BY product_id
"""
spark.sql(ordered_products_query).show()


+----------+----------+-----------+-------+--------+
|product_id|      name|   category|  price|discount|
+----------+----------+-----------+-------+--------+
|      P001|    Laptop|Electronics|1099.99|    50.0|
|      P002|Smartphone|Electronics| 499.99|    NULL|
|      P003|   T-Shirt|   Clothing|  29.99|    NULL|
|      P004|Headphones|Electronics| 149.99|    10.0|
|      P005|     Jeans|   Clothing|  59.99|     5.0|
+----------+----------+-----------+-------+--------+

