In [0]:
# Column names, listed in the same order as the values inside each record below
columns = ["event_time", "event_type", "product_id", "category_id", "category_code", "brand", "price", "user_id", "user_session"]
# Records to append to the existing table; every value is supplied as a string
new_data = [
    (
        "2019-11-01 00:00:00 UTC", "view", "1003461", "2053013555631882655",
        "electronics.smartphone", "xiaomi", "489.07", "520088904",
        "4d3b30da-a5e4-49df-b1a8-ba5943f1dd33",
    ),
    (
        "2019-11-01 00:01:15 UTC", "view", "1003462", "2053013555631882655",
        "electronics.smartphone", "samsung", "699.99", "520088905",
        "7a1c42de-9b22-4c6d-9e18-2e9e1d1f4c21",
    ),
]
# Build the DataFrame holding the incremental rows
incremental_df = spark.createDataFrame(new_data, columns)
# The merge of incremental_df into the Delta table is performed in the following cell

In [0]:
# Import library DeltaTable
from delta.tables import DeltaTable

# Handle to the existing Delta table that receives the new rows
delta_table = DeltaTable.forName(spark, "workspace.ecommerce.ecommerce_data_2019_nov")

# Insert-only merge: a source row is inserted only when no target row matches it
# on every column. `<=>` is null-safe equality, so two rows whose NULL columns
# line up still count as duplicates. With plain `=`, a NULL on either side never
# matches and such a row would be inserted again on every run.
delta_table.alias("target").merge(
    incremental_df.alias("source"),
    """
    target.event_time    <=> source.event_time AND
    target.event_type    <=> source.event_type AND
    target.product_id    <=> source.product_id AND
    target.category_id   <=> source.category_id AND
    target.category_code <=> source.category_code AND
    target.brand         <=> source.brand AND
    target.price         <=> source.price AND
    target.user_id       <=> source.user_id AND
    target.user_session  <=> source.user_session
    """
).whenNotMatchedInsertAll().execute()

# Get the latest operation from Delta history
# NOTE(review): assumes no other writer committed to the table between the merge
# above and this lookup; otherwise the newest entry is not this merge.
history_df = delta_table.history(1)
history = history_df.collect()[0]

# Extract number of inserted rows. MERGE commits report it under
# "numTargetRowsInserted"; the previously used "numInsertedRows" key is kept
# only as a fallback. Metric values are strings, hence the int() conversion.
metrics = history.operationMetrics or {}
row_inserted = int(
    metrics.get("numTargetRowsInserted", metrics.get("numInsertedRows", 0))
)

# Check if the row was inserted or not
if row_inserted > 0:
    print(f"Row successfully inserted. Inserted rows: {row_inserted}")
else:
    print("Duplicate row detected. No insertion performed.")

Duplicate row detected. No insertion performed.


In [0]:
# Query historical versions of the Delta table
# Use the version numbers the history actually reports instead of assuming they
# run 0..count-1: once old log entries are cleaned up, the history no longer
# starts at version 0 and range(count) would query versions that do not exist
# while skipping the most recent ones.
# NOTE(review): a version can still be listed in the history after its data
# files were removed by VACUUM; reading such a version will fail - confirm the
# retention settings cover the versions of interest.
versions = sorted(
    row.version for row in delta_table.history().select("version").collect()
)
for v in versions:
    df_version = spark.read.format("delta").option("versionAsOf", v).table("workspace.ecommerce.ecommerce_data_2019_nov")
    print(f"Version: {v} | Shape: ({df_version.count()}, {len(df_version.columns)})")

Version: 0 | Shape: (67501979, 9)
Version: 1 | Shape: (67501979, 9)
Version: 2 | Shape: (67501979, 9)
Version: 3 | Shape: (67501980, 9)
Version: 4 | Shape: (67501980, 9)
Version: 5 | Shape: (67501980, 9)
Version: 6 | Shape: (67501980, 9)
Version: 7 | Shape: (67501980, 9)
Version: 8 | Shape: (67501980, 9)
Version: 9 | Shape: (67501981, 9)
Version: 10 | Shape: (67501981, 9)
Version: 11 | Shape: (67501981, 9)


In [0]:
from pyspark.sql.functions import col

# Run OPTIMIZE with Z-ordering on the Delta table to improve performance
# NOTE(review): the Z-order list names all nine columns; Z-ordering is usually
# applied to a few frequently filtered columns - confirm this is intentional.
optimize_sql = (
    "OPTIMIZE workspace.ecommerce.ecommerce_data_2019_nov "
    "ZORDER BY (event_time, event_type, product_id, category_id, category_code, brand, price, user_id, user_session)"
)
optimized_result = spark.sql(optimize_sql)

# Field of the OPTIMIZE result -> column label shown in the summary
metric_aliases = {
    "metrics.numFilesAdded": "files_added",
    "metrics.numFilesRemoved": "files_removed",
    "metrics.filesAdded.totalSize": "size_added",
    "metrics.filesRemoved.totalSize": "size_removed",
}
optimized_result.select(
    *[col(field).alias(label) for field, label in metric_aliases.items()]
).display()

files_added,files_removed,size_added,size_removed
0,0,0,0


In [0]:
# Clean up old files from the Delta table to reclaim storage,
# using a retention window of 168 hours (7 days)
vacuum_sql = "VACUUM workspace.ecommerce.ecommerce_data_2019_nov RETAIN 168 HOURS"
spark.sql(vacuum_sql)

DataFrame[path: string]