# Scenario 1
A retail company receives daily updates for its product catalog: new products, price changes, and discontinued items. Rather than overwriting the entire catalog or blindly appending records, it needs to **upsert** the incoming data, updating existing products with the latest information and inserting new ones, so that the catalog stays accurate after every load.

## **Querying Source**

In [0]:
%sql
SELECT * FROM pyspark_cata.source.products

id,name,price,category,updatedDate
1,iPhone,1000,electronics,2025-08-19T14:59:08.969Z
2,Macbook,2000,electronics,2025-08-19T14:59:08.969Z
3,T-shirt,50,clothing,2025-08-19T14:59:08.969Z
4,Shirt,100,clothing,2025-08-19T14:59:08.969Z
5,Pants,150,clothing,2025-08-19T14:59:08.969Z
4,Long shirt,300,clothing,2025-08-19T18:17:33.283Z
5,Trouser,150,clothing,2025-08-19T16:21:51.276Z


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, row_number

In [0]:
df = spark.sql("select * from pyspark_cata.source.products")

# Deduplicate: keep only the most recent row per id (latest updatedDate wins)
df = df.withColumn("dedup", row_number().over(Window.partitionBy("id").orderBy(desc("updatedDate"))))
df = df.filter(col("dedup")==1).drop("dedup")

display(df)

id,name,price,category,updatedDate
1,iPhone,1000,electronics,2025-08-19T14:59:08.969Z
2,Macbook,2000,electronics,2025-08-19T14:59:08.969Z
3,T-shirt,50,clothing,2025-08-19T14:59:08.969Z
4,Long shirt,300,clothing,2025-08-19T18:17:33.283Z
5,Trouser,150,clothing,2025-08-19T16:21:51.276Z
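
The window-function step above keeps, for each `id`, the row with the latest `updatedDate`. The same rule can be sketched in plain Python, without Spark, so the logic is easy to check by hand. The helper names `parse_ts` and `latest_per_id` and the tuple layout are illustrative assumptions, not part of the notebook; the rows are copied from the source table shown earlier.

```python
from datetime import datetime

# Rows from the source table: (id, name, price, category, updatedDate)
rows = [
    (1, "iPhone", 1000, "electronics", "2025-08-19T14:59:08.969Z"),
    (2, "Macbook", 2000, "electronics", "2025-08-19T14:59:08.969Z"),
    (3, "T-shirt", 50, "clothing", "2025-08-19T14:59:08.969Z"),
    (4, "Shirt", 100, "clothing", "2025-08-19T14:59:08.969Z"),
    (5, "Pants", 150, "clothing", "2025-08-19T14:59:08.969Z"),
    (4, "Long shirt", 300, "clothing", "2025-08-19T18:17:33.283Z"),
    (5, "Trouser", 150, "clothing", "2025-08-19T16:21:51.276Z"),
]

def parse_ts(value):
    # Parse the ISO-8601 timestamps used in the table (trailing "Z" means UTC)
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")

def latest_per_id(records):
    """Keep one record per id: the one with the greatest updatedDate."""
    latest = {}
    for record in records:
        key = record[0]
        if key not in latest or parse_ts(record[4]) > parse_ts(latest[key][4]):
            latest[key] = record
    # Return the survivors ordered by id for readability
    return [latest[key] for key in sorted(latest)]

deduped = latest_per_id(rows)
for record in deduped:
    print(record)
```

On ties, this sketch keeps the first record it sees. Spark's `row_number` gives no such guarantee when two rows share the same `updatedDate`, so adding a tiebreaker column to the `orderBy` may be worth considering.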


## **UPSERTS**

In [0]:
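# Upsert the deduplicated batch into the Delta sink; create the sink on the first run

from delta.tables import DeltaTable

sink_path = "/Volumes/pyspark_cata/source/db_volume/products_sink/"

# isDeltaTable also returns False when the path does not exist yet,
# whereas dbutils.fs.ls raises an error on a missing directory
if DeltaTable.isDeltaTable(spark, sink_path):

    dlt_obj = DeltaTable.forPath(spark, sink_path)

    # Update matched rows only if the incoming row is at least as recent; insert the rest
    dlt_obj.alias("trg").merge(
        df.alias("src"),
        "src.id = trg.id"
        )\
        .whenMatchedUpdateAll(condition="src.updatedDate >= trg.updatedDate")\
        .whenNotMatchedInsertAll()\
        .execute()
    print("This is upserting now.")

else:

    # First run: no Delta table at the sink path yet, so write the initial load
    df.write.format("delta")\
        .mode("overwrite")\
        .save(sink_path)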

This is upserting now.


In [0]:
%sql
SELECT * FROM delta.`/Volumes/pyspark_cata/source/db_volume/products_sink/`

id,name,price,category,updatedDate
1,iPhone,1000,electronics,2025-08-19T14:59:08.969Z
4,Long shirt,300,clothing,2025-08-19T18:17:33.283Z
3,T-shirt,50,clothing,2025-08-19T14:59:08.969Z
2,Macbook,2000,electronics,2025-08-19T14:59:08.969Z
5,Trouser,150,clothing,2025-08-19T16:21:51.276Z
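
To make the merge rules concrete, here is a minimal plain-Python sketch of what the `merge` call does to the sink: a matched row is updated only when the incoming `updatedDate` is at least as recent, and a row whose `id` is missing from the target is inserted. The function `upsert` and the dict-based rows are hypothetical stand-ins for the Delta table, and the stale id 1 record and the id 6 "Jacket" record are invented purely for illustration.

```python
def upsert(target, source):
    """Merge source rows into target (both keyed by id) and return a new dict.

    Mirrors the merge conditions used in the notebook cell:
      - matched and src.updatedDate >= trg.updatedDate -> update all columns
      - not matched                                    -> insert
    """
    merged = dict(target)
    for key, src in source.items():
        trg = merged.get(key)
        # Same-format ISO-8601 UTC timestamps compare correctly as strings
        if trg is None or src["updatedDate"] >= trg["updatedDate"]:
            merged[key] = src
    return merged

sink = {
    1: {"name": "iPhone", "price": 1000, "updatedDate": "2025-08-19T14:59:08.969Z"},
    4: {"name": "Shirt", "price": 100, "updatedDate": "2025-08-19T14:59:08.969Z"},
}
batch = {
    # Hypothetical stale record: older than the sink row, so it is ignored
    1: {"name": "iPhone (old)", "price": 900, "updatedDate": "2025-08-18T00:00:00.000Z"},
    # Newer version of an existing product: replaces the sink row
    4: {"name": "Long shirt", "price": 300, "updatedDate": "2025-08-19T18:17:33.283Z"},
    # Hypothetical new product: inserted
    6: {"name": "Jacket", "price": 250, "updatedDate": "2025-08-19T18:17:33.283Z"},
}

result = upsert(sink, batch)
for key in sorted(result):
    print(key, result[key]["name"], result[key]["price"])
```

Because the comparison is `>=`, applying the same batch a second time leaves the row contents unchanged, which suggests the upsert cell can be re-executed without corrupting the catalog.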
