# Retail Catalog Upsert with PySpark and Delta Lake

A retail company receives daily updates for its product catalog, including new products, price changes, and discontinued items.  
Instead of overwriting the entire catalog or simply appending new records, the goal is to **upsert** the incoming data: update existing products with the latest information and insert new products, so that the product catalog remains **accurate, consistent, and up to date after every load**.

# SQL: Table Setup

### Explanation:
- Drops the table `learning_db.qna.products` if it exists, ensuring a clean slate.
- Creates the table with columns for product details.
- Inserts three initial product records.

In [0]:
%sql
DROP TABLE IF EXISTS learning_db.qna.products;
CREATE TABLE IF NOT EXISTS learning_db.qna.products (
  product_id STRING,
  name STRING,
  category STRING,
  price DOUBLE,
  currency STRING,
  updated_at TIMESTAMP);

INSERT INTO learning_db.qna.products VALUES
('1', 'Wireless Mouse', 'Electronics', 25.99, 'USD', '2025-08-20 10:00:00'),
('2', 'Yoga Mat', 'Fitness', 19.99, 'USD', '2025-08-20 10:05:00'),
('3', 'Coffee Mug', 'Kitchen', 7.50, 'USD', '2025-08-20 10:10:00');

# Python: Load Table as DataFrame

In [0]:
df = spark.table("learning_db.qna.products")
display(df)

# Python: Delta Lake Merge
### Explanation:

- Imports Delta Lake functionality.
- Checks whether a Delta table already exists at the path `/Volumes/learning_db/qna/landing/produncts_upsert`.
- If it exists:
  - Loads the Delta table.
  - Merges the new data (`df`) into the Delta table, matching rows on `product_id`:
    - Updates a matched record if the incoming `updated_at` is the same as or later than the target's.
    - Inserts records whose `product_id` is not yet in the target.
- If it does not exist:
  - Writes the DataFrame as a new Delta table at the specified path.
- Displays the merged Delta table, ordered by `product_id`.

In [0]:
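# Merge into the Delta table if it already exists; otherwise create it
from delta.tables import DeltaTable

# dbutils.fs.ls raises an error when the path does not exist, so check for the table with isDeltaTable
if DeltaTable.isDeltaTable(spark, "/Volumes/learning_db/qna/landing/produncts_upsert"):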
    
    deltaObject = DeltaTable.forPath(spark, "/Volumes/learning_db/qna/landing/produncts_upsert")

    deltaObject.alias("tgt").merge(
        df.alias("src"),
        "src.product_id = tgt.product_id",
    ).whenMatchedUpdateAll(condition="src.updated_at >= tgt.updated_at")\
        .whenNotMatchedInsertAll()\
        .execute()

else:
    df.write.format("delta").mode("overwrite").save("/Volumes/learning_db/qna/landing/produncts_upsert")

display(spark.sql("select * from delta.`/Volumes/learning_db/qna/landing/produncts_upsert`").orderBy("product_id"))
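
The merge rules in this cell can be modelled in plain Python to make the matching logic explicit. This is only a sketch of the semantics, not the Delta Lake API: `merge_upsert` is a hypothetical helper, timestamps are compared as `YYYY-MM-DD HH:MM:SS` strings for simplicity, and the source rows for products 1 and 3 are made up for illustration.

```python
def merge_upsert(target, source):
    """Plain-Python model of the merge rules; not the Delta Lake API.

    target: dict mapping product_id -> row dict
    source: list of row dicts, at most one per product_id
    """
    result = dict(target)
    for row in source:
        key = row["product_id"]
        existing = result.get(key)
        if existing is None:
            # Models whenNotMatchedInsertAll: no target row has this key.
            result[key] = row
        elif row["updated_at"] >= existing["updated_at"]:
            # Models whenMatchedUpdateAll with src.updated_at >= tgt.updated_at.
            result[key] = row
        # Otherwise the source row is older and the target row is kept.
    return result


target = {
    "1": {"product_id": "1", "name": "Wireless Mouse", "price": 25.99,
          "updated_at": "2025-08-20 10:00:00"},
    "3": {"product_id": "3", "name": "Coffee Mug", "price": 7.50,
          "updated_at": "2025-08-20 10:10:00"},
}
source = [
    # Hypothetical stale row: older than the target, so it is ignored.
    {"product_id": "1", "name": "Wireless Mouse", "price": 29.99,
     "updated_at": "2025-08-19 09:00:00"},
    # Key not in the target: inserted.
    {"product_id": "2", "name": "Yoga Mat", "price": 19.99,
     "updated_at": "2025-08-20 10:05:00"},
    # Hypothetical row with an equal timestamp: updated because of ">=".
    {"product_id": "3", "name": "Coffee Mug", "price": 8.00,
     "updated_at": "2025-08-20 10:10:00"},
]

merged = merge_upsert(target, source)
print(sorted(merged))        # ['1', '2', '3']
print(merged["1"]["price"])  # 25.99
print(merged["3"]["price"])  # 8.0
```

Because the condition uses `>=`, re-running the same batch rewrites matched rows with identical values, so the table contents stay the same.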

# SQL: Insert More Products
### Explanation:
- Inserts five more rows into the products table.
  - Two reuse an existing `product_id` with updated details (these act as updates).
  - Two introduce new products.
  - One is an exact duplicate of the row before it.

In [0]:
%sql
INSERT INTO learning_db.qna.products VALUES
('2', 'Yoga Mat Pro', 'Fitness', 24.99, 'USD', '2025-08-20 12:00:00'), -- duplicate id, updated info
('3', 'Coffee Mug XL', 'Kitchen', 9.99, 'USD', '2025-08-20 12:05:00'), -- duplicate id, updated info
('4', 'Bluetooth Speaker', 'Electronics', 49.99, 'USD', '2025-08-20 12:10:00'), -- new
('5', 'Running Shoes', 'Fitness', 59.99, 'USD', '2025-08-20 12:15:00'), -- new
('5', 'Running Shoes', 'Fitness', 59.99, 'USD', '2025-08-20 12:15:00'); -- same as above, duplicate row
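
Delta Lake's merge fails when more than one source row matches the same target row, so the table now needs to be reduced to one row per `product_id` before it can be used as a merge source. As a rough illustration in plain Python (the list below simply restates the `product_id` values of the three initial rows plus the five rows inserted above), the repeated keys can be found by counting:

```python
from collections import Counter

# product_id values in learning_db.qna.products after this INSERT:
# the three initial rows followed by the five new ones.
product_ids = ["1", "2", "3", "2", "3", "4", "5", "5"]

counts = Counter(product_ids)
duplicated = sorted(pid for pid, n in counts.items() if n > 1)
print(duplicated)  # ['2', '3', '5']
```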

# Python: Deduplicate and Keep Latest
### Explanation:

- Imports necessary PySpark functions and windowing.
- Reloads the `products` table into a DataFrame.
- Defines a window partitioned by `product_id`, ordered by `updated_at` descending.
- Assigns a row number (`rnk`) within each partition, so the latest record gets `rnk = 1`.
- Filters to keep only the latest record for each `product_id` (deduplication).
- Displays the deduplicated DataFrame.

In [0]:
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window

df = spark.table("learning_db.qna.products")

WindowSpec = Window.partitionBy("product_id").orderBy(desc("updated_at"))

df_rnk = df.withColumn("rnk", row_number().over(WindowSpec))
df = df_rnk.filter(col("rnk") == 1).drop("rnk")

display(df)
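
To see what the window-and-filter step produces for this data, here is a plain-Python model of "keep the latest row per key". It is a sketch only: `keep_latest` is a hypothetical helper rather than a Spark function, only three columns are carried, and timestamps are compared as `YYYY-MM-DD HH:MM:SS` strings.

```python
def keep_latest(rows, key="product_id", order_by="updated_at"):
    """Keep one row per key: the one with the greatest order_by value."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        # On a tie the first row seen is kept; Spark's row_number makes no such promise.
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return sorted(latest.values(), key=lambda r: r[key])


rows = [
    {"product_id": "1", "name": "Wireless Mouse", "updated_at": "2025-08-20 10:00:00"},
    {"product_id": "2", "name": "Yoga Mat", "updated_at": "2025-08-20 10:05:00"},
    {"product_id": "3", "name": "Coffee Mug", "updated_at": "2025-08-20 10:10:00"},
    {"product_id": "2", "name": "Yoga Mat Pro", "updated_at": "2025-08-20 12:00:00"},
    {"product_id": "3", "name": "Coffee Mug XL", "updated_at": "2025-08-20 12:05:00"},
    {"product_id": "4", "name": "Bluetooth Speaker", "updated_at": "2025-08-20 12:10:00"},
    {"product_id": "5", "name": "Running Shoes", "updated_at": "2025-08-20 12:15:00"},
    {"product_id": "5", "name": "Running Shoes", "updated_at": "2025-08-20 12:15:00"},
]

deduped = keep_latest(rows)
print([r["name"] for r in deduped])
# ['Wireless Mouse', 'Yoga Mat Pro', 'Coffee Mug XL', 'Bluetooth Speaker', 'Running Shoes']
```

The two rows for product 5 are identical, so it does not matter which one survives. If tied rows could differ in other columns, adding a second column to the window ordering would make the choice deterministic.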

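The deduplicated DataFrame is then merged into the Delta table with the same logic as the first merge.

In [0]: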
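# Merge the deduplicated DataFrame into the Delta table, or create the table on first run
from delta.tables import DeltaTable

# dbutils.fs.ls raises an error when the path does not exist, so check for the table with isDeltaTable
if DeltaTable.isDeltaTable(spark, "/Volumes/learning_db/qna/landing/produncts_upsert"):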
    
    deltaObject = DeltaTable.forPath(spark, "/Volumes/learning_db/qna/landing/produncts_upsert")

    deltaObject.alias("tgt").merge(
        df.alias("src"),
        "src.product_id = tgt.product_id",
    ).whenMatchedUpdateAll(condition="src.updated_at >= tgt.updated_at")\
        .whenNotMatchedInsertAll()\
        .execute()

else:
    df.write.format("delta").mode("overwrite").save("/Volumes/learning_db/qna/landing/produncts_upsert")

display(spark.sql("select * from delta.`/Volumes/learning_db/qna/landing/produncts_upsert`").orderBy("product_id"))

# Final Combined Code: Deduplicate and Upsert Products Table to Delta Lake

### Summary of Steps:

- **Read** the products table into a Spark DataFrame.
- **Deduplicate** by keeping only the row with the latest `updated_at` for each `product_id`.
- **Upsert** the cleaned DataFrame into the Delta table, merging updates and inserting new records.
- **Display** the final upserted Delta table, ordered by product_id.

In [0]:
# Step 1: Read the products table into a Spark DataFrame
df = spark.table("learning_db.qna.products")

# Step 2: Remove duplicate product_id rows, keeping only the latest updated_at for each product
from pyspark.sql.functions import col, row_number, desc
from pyspark.sql.window import Window

WindowSpec = Window.partitionBy("product_id").orderBy(desc("updated_at"))
df_rnk = df.withColumn("rnk", row_number().over(WindowSpec))
df_uqi = df_rnk.filter(col("rnk") == 1).drop("rnk")

# Step 3: Upsert the cleaned DataFrame into the Delta table
from delta.tables import DeltaTable

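# dbutils.fs.ls raises an error when the path does not exist, so check for the table with isDeltaTable
if DeltaTable.isDeltaTable(spark, "/Volumes/learning_db/qna/landing/produncts_upsert"):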
    deltaObject = DeltaTable.forPath(spark, "/Volumes/learning_db/qna/landing/produncts_upsert")
    deltaObject.alias("tgt").merge(
        df_uqi.alias("src"),
        "src.product_id = tgt.product_id",
    ).whenMatchedUpdateAll(condition="src.updated_at >= tgt.updated_at")\
     .whenNotMatchedInsertAll()\
     .execute()
else:
    df_uqi.write.format("delta").mode("overwrite").save("/Volumes/learning_db/qna/landing/produncts_upsert")

# Step 4: Display the upserted Delta table
display(spark.sql("select * from delta.`/Volumes/learning_db/qna/landing/produncts_upsert`").orderBy("product_id"))
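
If the same pattern is needed for other tables, the steps could be wrapped in a function. The following is a minimal sketch under a few assumptions: `upsert_latest` and its parameter names are hypothetical, `DeltaTable.isDeltaTable` is used to test for an existing table, and the Spark and Delta imports sit inside the function so that defining it does not require a Spark runtime.

```python
def upsert_latest(spark, source_df, target_path,
                  key_col="product_id", ts_col="updated_at"):
    """Deduplicate source_df on key_col, then upsert it into the Delta table at target_path."""
    # Imported here so the function can be defined without Spark installed.
    from delta.tables import DeltaTable
    from pyspark.sql.functions import col, desc, row_number
    from pyspark.sql.window import Window

    # Keep only the most recent row for each key.
    window = Window.partitionBy(key_col).orderBy(desc(ts_col))
    latest = (
        source_df.withColumn("_rn", row_number().over(window))
        .filter(col("_rn") == 1)
        .drop("_rn")
    )

    if DeltaTable.isDeltaTable(spark, target_path):
        # Update matched rows that are not older than the target; insert new keys.
        (
            DeltaTable.forPath(spark, target_path).alias("tgt")
            .merge(latest.alias("src"), f"src.{key_col} = tgt.{key_col}")
            .whenMatchedUpdateAll(condition=f"src.{ts_col} >= tgt.{ts_col}")
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the Delta table from the deduplicated data.
        latest.write.format("delta").mode("overwrite").save(target_path)
    return latest
```

In this notebook it would be called as `upsert_latest(spark, spark.table("learning_db.qna.products"), "/Volumes/learning_db/qna/landing/produncts_upsert")`.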