In [0]:
from pyspark.sql import functions as F
from delta.tables import *

In [0]:
# Glob matching every file of the online-retail sample dataset.
# Plain string literal: there is nothing to interpolate, so no f-prefix is needed.
file_path = "/databricks-datasets/online_retail/data-001/*"

In [0]:
# Load the CSV files into a DataFrame, taking column names from the header row
# and letting Spark infer the column types.
try:
    df = spark.read.csv(file_path, header=True, inferSchema=True)
except Exception as e:
    print(f"Error reading CSV file: {e}")
    # Re-raise: later cells use `df` unconditionally, so swallowing the error
    # here would only surface later as a NameError (or silently reuse a stale
    # `df` left in the kernel by an earlier run).
    raise
else:
    # Preview only after a successful read, so a display failure is not
    # misreported as a CSV read error.
    display(df.limit(10))

In [0]:
# Save the loaded DataFrame in Delta format at the retail_sales path,
# using overwrite mode for anything already stored there.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/retail_sales")
)

In [0]:
# Obtain a DeltaTable handle for the Delta data stored at the retail_sales path;
# the merge cell below operates through this handle.
delta_table = DeltaTable.forPath(
    spark,
    "/mnt/delta/retail_sales",
)

In [0]:
# Register the Delta location as a named table, unless a table of that name
# already exists.
spark.sql(
    "CREATE TABLE IF NOT EXISTS retail_sales_table "
    "USING DELTA "
    "LOCATION '/mnt/delta/retail_sales'"
)

In [0]:
# One nine-field row whose last field is a DiscountCode value.
# NOTE(review): presumably meant to demonstrate Delta schema enforcement when
# appended to the retail_sales table -- confirm; no cell here uses bad_df yet.
bad_data = [
    ("536365", "85123A", "WHITE HEART", 6, "12/1/10 8:26", 2.55, 17850, "UK", "SAVE10"),
]
bad_df = spark.createDataFrame(
    bad_data,
    [
        "InvoiceNo",
        "StockCode",
        "Description",
        "Quantity",
        "InvoiceDate",
        "UnitPrice",
        "CustomerID",
        "Country",
        "DiscountCode",
    ],
)  # closing parenthesis was missing in the original, which made the cell a SyntaxError

In [0]:
# Single row to upsert, identified by its InvoiceNo / StockCode pair.
update_data = [
    (
        "536365",
        "85123A",
        "WHITE HANGING HEART T-LIGHT HOLDER",
        10,
        "12/1/10 8:26",
        2.55,
        17850,
        "United Kingdom",
    ),
]
# Reuse the schema of the loaded DataFrame so the update row has the same
# column names and types as the data it will be merged into.
update_df = spark.createDataFrame(update_data, df.schema)

In [0]:
# Upsert update_df into the Delta table: rows matching on InvoiceNo and
# StockCode have all their columns updated; source rows with no match are
# inserted as new rows.
(
    delta_table.alias("old")
    .merge(
        update_df.alias("new"),
        "old.InvoiceNo = new.InvoiceNo AND old.StockCode = new.StockCode",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Upsert completed successfully.")