
## Handling UPSERTS uisng PySpark

### Upsert

**Upsert = Update + Insert**

An **upsert** is a data operation where:
- If a record **already exists**, it is **updated**
- If a record **does not exist**, it is **inserted**

**Upserts are operations that let you maintain accurate, de-duplicated, and current tables by inserting new records and updating existing ones in a single atomic process**

This ensures that the **data is not duplicated** while keeping it **current**, making it essential in analytics, pipelines, CDC (Change Data Capture), and real-time systems

### Use Cases
- **Incremental ETL/ELT**: Only changed rows are updated, new rows are inserted
- **Change Data Capture (CDC)**: Apply inserts and updates from source systems
- **Slowly Changing Dimensions (SCD Type 1)**: Overwrite old dimension values with new ones
- **Kafka Pipelines**: Keep sync table in sync with event streams
- **API Data Ingestion**: External APIs often return both new + updated data

### MySQL Syntax

```sql
INSERT INTO customers (customer_id, email, last_login)
VALUES (101, 'plaidt@gmail.com', '2025-12-11')
ON DUPLICATE KEY UPDATE
    email = VALUES(email),
    last_login = VALUES(last_login);
```

### Scenario

A retail company recives daily updates for its product catalog, including new products, price changes, and discounted items. Instead of overwriting the entire catalog or simply appending new records, they need to upsert the incoming data, updating existing products with the latest information and inserting new products. This ensures that the catalog remains accurate and up-to-date in real-time

Firstly, we need to create a source table which has the products on which we need to perform upserting. To do that,
- Go to `Catalog` and create a catalog named `pyspark_cata`
- Configure the catalog as prompted, click `next` and `save`
- Create a new schema `source`
- Go to `SQL Editor` and select workspace as `pyspark_cata` and schema as `source`
- Turn on the Serverless Starter Warehouse compute to run SQL Query
- Run the following query before returning to the notebook

```sql
CREATE TABLE products
(
  id INT,
  name STRING,
  price INT,
  category STRING,
  updated_date TIMESTAMP
);

INSERT INTO products VALUES
(1, 'iPhone', 1000, 'electronics', current_timestamp()),
(2, 'Macbook', 2000, 'electronics', current_timestamp()),
(3, 'T-Shirt', 50, 'clothing', current_timestamp()),
(4, 'Shirt', 100, 'clothing', current_timestamp()),
(5, 'Pants', 200, 'clothing', current_timestamp()),
(6, 'Shoes', 300, 'shoes', current_timestamp()),
(7, 'Sneakers', 50, 'shoes', current_timestamp());
```

#### Querying Source

In [0]:
%sql

SELECT * FROM pyspark_cata.source.products

id,name,price,category,updated_date
1,iPhone,1000,electronics,2025-12-11T14:17:04.127Z
2,Macbook,2000,electronics,2025-12-11T14:17:04.127Z
3,T-Shirt,50,clothing,2025-12-11T14:17:04.127Z
4,Shirt,100,clothing,2025-12-11T14:17:04.127Z
5,Pants,200,clothing,2025-12-11T14:17:04.127Z
6,Shoes,300,shoes,2025-12-11T14:17:04.127Z
7,Sneakers,50,shoes,2025-12-11T14:17:04.127Z


In [0]:
df = spark.sql("select * from pyspark_cata.source.products")
display(df)

id,name,price,category,updated_date
1,iPhone,1000,electronics,2025-12-11T14:17:04.127Z
2,Macbook,2000,electronics,2025-12-11T14:17:04.127Z
3,T-Shirt,50,clothing,2025-12-11T14:17:04.127Z
4,Shirt,100,clothing,2025-12-11T14:17:04.127Z
5,Pants,200,clothing,2025-12-11T14:17:04.127Z
6,Shoes,300,shoes,2025-12-11T14:17:04.127Z
7,Sneakers,50,shoes,2025-12-11T14:17:04.127Z


Now we can see the data through the table we created. The next part is performing the upserts operation but for that, we require a **destination** where the upserted data will be stored. If you are not provided with destination in interviews, make sure to ask where is the destination or where do we sink the data.

So we create the destination in Databricks
- Goto our `source` and create volume `db_volume`
- Create a directory `products_sink` in `db_volume`

PS: Volumes are a managed storage location iside Unity Catalog used for storing files instead of tables. Think of it as a file sytem directory inside a Catalog and Schema, governed by Unity Catalog 

#### Upserting Data

In [0]:
# Creating Delta Object

from delta.tables import DeltaTable

# Setting guard-rails by using try-except so that Upsert doesn't fail if the table doesn't 
# We can also use 'if spark.catalog.tableExists():'
# or
# 'if dbutils.fs.ls("/Volumes/pyspark_cata/source/db_volume/products_sink/"):'

try:
    dlt_obj = DeltaTable.forPath(spark, "/Volumes/pyspark_cata/source/db_volume/products_sink/")

    # The condition in whenMatchedUpdateAll() is optional.
    # It is used to prevent updating the data if the source data is older than the target data.
    # However, in real-world, if we need to backfill the data, this condition can be useful to not lose latest
    # data

    dlt_obj.alias("tgt").merge(
        df.alias("src"), "src.id = tgt.id"
    ).whenMatchedUpdateAll(condition="src.updated_date >= tgt.updated_date")\
        .whenNotMatchedInsertAll().execute()
    print("The data is being Upserted")

except:
    df.write.format("delta")\
        .mode("overwrite")\
        .save("/Volumes/pyspark_cata/source/db_volume/products_sink/")
    print("The data is being Written")


The data is being Upserted


In [0]:
%sql
SELECT * FROM delta.`/Volumes/pyspark_cata/source/db_volume/products_sink/`

id,name,price,category,updated_date
1,iPhone,1000,electronics,2025-12-11T14:17:04.127Z
2,Macbook,2000,electronics,2025-12-11T14:17:04.127Z
3,T-Shirt,50,clothing,2025-12-11T14:17:04.127Z
4,Shirt,100,clothing,2025-12-11T14:17:04.127Z
5,Pants,200,clothing,2025-12-11T14:17:04.127Z
6,Shoes,300,shoes,2025-12-11T14:17:04.127Z
7,Sneakers,50,shoes,2025-12-11T14:17:04.127Z



Now this should perform Upsert efficiently. But there is a catch! Let's say we go back to our SQL Editor and insert value where for id 5, instead of pants, it now has trousers. The pipeline would fail. Let's see how.

In [0]:
df = spark.sql("select * from pyspark_cata.source.products")
display(df)

id,name,price,category,updated_date
1,iPhone,1000,electronics,2025-12-11T14:17:04.127Z
2,Macbook,2000,electronics,2025-12-11T14:17:04.127Z
3,T-Shirt,50,clothing,2025-12-11T14:17:04.127Z
4,Shirt,100,clothing,2025-12-11T14:17:04.127Z
5,Pants,200,clothing,2025-12-11T14:17:04.127Z
6,Shoes,300,shoes,2025-12-11T14:17:04.127Z
7,Sneakers,50,shoes,2025-12-11T14:17:04.127Z
5,Trouser,150,clothing,2025-12-11T15:07:35.984Z


In [0]:
dlt_obj = DeltaTable.forPath(spark, "/Volumes/pyspark_cata/source/db_volume/products_sink/")
dlt_obj.alias("tgt").merge(
    df.alias("src"), "src.id = tgt.id"
).whenMatchedUpdateAll(condition="src.updated_date >= tgt.updated_date")\
    .whenNotMatchedInsertAll().execute()
print("The data is being Upserted")

Error
```
[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
as it is unclear which source row should be used to update or delete the matching
target row. You can preprocess the source table to eliminate the possibility of
multiple matches. Please refer to
https://docs.databricks.com/delta/merge.html#merge-error
 SQLSTATE: 21506
```

To fix this, we need de-duplication

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [0]:
df = spark.sql("select * from pyspark_cata.source.products")
#Dedup
df = df.withColumn('dedup', row_number().over(Window.partitionBy('id').orderBy(desc('updated_date'))))
df = df.filter(col('dedup') == 1).drop('dedup')
display(df)

id,name,price,category,updated_date
1,iPhone,1000,electronics,2025-12-11T14:17:04.127Z
2,Macbook,2000,electronics,2025-12-11T14:17:04.127Z
3,T-Shirt,50,clothing,2025-12-11T14:17:04.127Z
4,Shirt,100,clothing,2025-12-11T14:17:04.127Z
5,Trouser,150,clothing,2025-12-11T15:07:35.984Z
6,Shoes,300,shoes,2025-12-11T14:17:04.127Z
7,Sneakers,50,shoes,2025-12-11T14:17:04.127Z


In [0]:
# Creating Delta Object

from delta.tables import DeltaTable

# Setting guard-rails by using try-except so that Upsert doesn't fail if the table doesn't exist
try:
    dlt_obj = DeltaTable.forPath(spark, "/Volumes/pyspark_cata/source/db_volume/products_sink/")
    dlt_obj.alias("tgt").merge(
        df.alias("src"), "src.id = tgt.id"
    ).whenMatchedUpdateAll(condition="src.updated_date >= tgt.updated_date")\
        .whenNotMatchedInsertAll().execute()
    print("The data is being Upserted")

except:
    df.write.format("delta")\
        .mode("overwrite")\
        .save("/Volumes/pyspark_cata/source/db_volume/products_sink/")
    print("The data is being Written")

The data is being Upserted


In [0]:
%sql
SELECT * FROM delta.`/Volumes/pyspark_cata/source/db_volume/products_sink/`

id,name,price,category,updated_date
1,iPhone,1000,electronics,2025-12-11T14:17:04.127Z
2,Macbook,2000,electronics,2025-12-11T14:17:04.127Z
3,T-Shirt,50,clothing,2025-12-11T14:17:04.127Z
4,Shirt,100,clothing,2025-12-11T14:17:04.127Z
5,Trouser,150,clothing,2025-12-11T15:07:35.984Z
6,Shoes,300,shoes,2025-12-11T14:17:04.127Z
7,Sneakers,50,shoes,2025-12-11T14:17:04.127Z


Now we have also solved the problem of duplications


https://docs.delta.io/latest/delta/delta-update.html