# **Querying Source**

In [0]:
%sql
SELECT * FROM pyspark_cata.source.products

id,name,price,category,updated_date
1,iPhone 15,999,Electronics,2025-01-05T10:30:00.000Z
2,Samsung Galaxy S24,899,Electronics,2025-02-10T15:45:00.000Z
3,"MacBook Pro 16""",2499,Electronics,2025-03-01T12:00:00.000Z
4,Nike Air Max,150,Footwear,2025-03-15T09:20:00.000Z
5,Adidas Ultraboost,180,Footwear,2025-03-18T14:10:00.000Z
6,Wooden Dining Table,750,Furniture,2025-04-01T11:00:00.000Z
7,Office Chair,120,Furniture,2025-04-03T16:25:00.000Z
8,Levi’s Jeans,70,Clothing,2025-04-10T13:50:00.000Z
9,Polo T-Shirt,40,Clothing,2025-04-15T17:30:00.000Z
10,Sony WH-1000XM5,400,Electronics,2025-05-01T08:45:00.000Z


### Creating a dataframe

In [0]:
df = spark.sql("SELECT * FROM pyspark_cata.source.products")
display(df)

id,name,price,category,updated_date
1,iPhone 15,999,Electronics,2025-01-05T10:30:00.000Z
2,Samsung Galaxy S24,899,Electronics,2025-02-10T15:45:00.000Z
3,"MacBook Pro 16""",2499,Electronics,2025-03-01T12:00:00.000Z
4,Nike Air Max,150,Footwear,2025-03-15T09:20:00.000Z
5,Adidas Ultraboost,180,Footwear,2025-03-18T14:10:00.000Z
6,Wooden Dining Table,750,Furniture,2025-04-01T11:00:00.000Z
7,Office Chair,120,Furniture,2025-04-03T16:25:00.000Z
8,Levi’s Jeans,70,Clothing,2025-04-10T13:50:00.000Z
9,Polo T-Shirt,40,Clothing,2025-04-15T17:30:00.000Z
10,Sony WH-1000XM5,400,Electronics,2025-05-01T08:45:00.000Z


### Applying row_number() to further filter out the duplicate data

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

df = df.withColumn("dedup", row_number().over(Window.partitionBy("id").orderBy(desc("updated_date"))))

display(df)

id,name,price,category,updated_date,dedup
1,iPhone 15,999,Electronics,2025-01-05T10:30:00.000Z,1
2,Samsung Galaxy S24,899,Electronics,2025-02-10T15:45:00.000Z,1
3,"MacBook Pro 16""",2499,Electronics,2025-03-01T12:00:00.000Z,1
4,Nike Air Max,150,Footwear,2025-03-15T09:20:00.000Z,1
5,Adidas Ultraboost,180,Footwear,2025-03-18T14:10:00.000Z,1
6,Wooden Dining Table,750,Furniture,2025-04-01T11:00:00.000Z,1
7,Office Chair,120,Furniture,2025-04-03T16:25:00.000Z,1
8,Levi’s Jeans,70,Clothing,2025-04-10T13:50:00.000Z,1
9,Polo T-Shirt,50,Clothing,2025-08-26T21:39:09.426Z,1
9,Polo T-Shirt,40,Clothing,2025-04-15T17:30:00.000Z,2


In [0]:
df = df.filter(col('dedup')==1).drop('dedup')
display(df)

id,name,price,category,updated_date
1,iPhone 15,999,Electronics,2025-01-05T10:30:00.000Z
2,Samsung Galaxy S24,899,Electronics,2025-02-10T15:45:00.000Z
3,"MacBook Pro 16""",2499,Electronics,2025-03-01T12:00:00.000Z
4,Nike Air Max,150,Footwear,2025-03-15T09:20:00.000Z
5,Adidas Ultraboost,180,Footwear,2025-03-18T14:10:00.000Z
6,Wooden Dining Table,750,Furniture,2025-04-01T11:00:00.000Z
7,Office Chair,120,Furniture,2025-04-03T16:25:00.000Z
8,Levi’s Jeans,70,Clothing,2025-04-10T13:50:00.000Z
9,Polo T-Shirt,50,Clothing,2025-08-26T21:39:09.426Z
10,Sony WH-1000XM5,200,Electronics,2025-08-26T21:39:09.426Z


## **Upserts**

In [0]:
from delta.tables import DeltaTable

path = "/Volumes/pyspark_cata/source/destination/products_dir/"

if len(dbutils.fs.ls(path))>0:
    # If table exists, perform merge
    dlt_obj = DeltaTable.forPath(spark, path)

    (
        dlt_obj.alias("tgt")
        .merge(
            df.alias("src"),
            "src.id = tgt.id"  # merge condition
        )
        .whenMatchedUpdateAll(condition="src.updated_date >= tgt.updated_date")\
        .whenNotMatchedInsertAll()\
        .execute()
    )
    print("Upserting the data...")

else:
    # If table does not exist, create it
    (
        df.write.format("delta")
        .mode("overwrite")   # overwrite on first create
        .save(path)
    )
    print("Delta table created...")

Upserting the data...


In [0]:
%sql
SELECT * FROM delta.`/Volumes/pyspark_cata/source/destination/products_dir/`

id,name,price,category,updated_date
1,iPhone 15,999,Electronics,2025-01-05T10:30:00.000Z
4,Nike Air Max,150,Footwear,2025-03-15T09:20:00.000Z
8,Levi’s Jeans,70,Clothing,2025-04-10T13:50:00.000Z
6,Wooden Dining Table,750,Furniture,2025-04-01T11:00:00.000Z
3,"MacBook Pro 16""",2499,Electronics,2025-03-01T12:00:00.000Z
2,Samsung Galaxy S24,899,Electronics,2025-02-10T15:45:00.000Z
7,Office Chair,120,Furniture,2025-04-03T16:25:00.000Z
10,Sony WH-1000XM5,200,Electronics,2025-08-26T21:39:09.426Z
9,Polo T-Shirt,50,Clothing,2025-08-26T21:39:09.426Z
5,Adidas Ultraboost,180,Footwear,2025-03-18T14:10:00.000Z


In [0]:
# # Removes the directory - (had some glitch that returned 2 rows with the same record.)
# dbutils.fs.rm(
#     "/Volumes/pyspark_cata/source/destination/products_dir/",
#     True  # recursive delete
# )

True