In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable

# Obtain a Spark session: getOrCreate() returns the already-active session when
# the notebook runtime provides one, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName("InventoryUpdates")
    .getOrCreate()
)



In [0]:
# Make sure the target schema exists before any table is written into it.
# IF NOT EXISTS keeps this cell safe to re-run.
# NOTE(review): the two-part name implies a catalog called `ecommerce` must
# already exist in the workspace — confirm before running elsewhere.
spark.sql(
    "CREATE SCHEMA IF NOT EXISTS ecommerce.inventory"
)


DataFrame[]

In [0]:
# Schema shared by the initial load and the update batch, so both DataFrames
# line up column-for-column with the Delta table.
# product_id is the only non-nullable column; it is the merge join key.
inventory_schema = (
    StructType()
    .add("product_id", IntegerType(), nullable=False)
    .add("name", StringType(), nullable=True)
    .add("stock", IntegerType(), nullable=True)
    .add("warehouse", StringType(), nullable=True)
)

# Seed rows, in schema order: (product_id, name, stock, warehouse).
initial_data = [
    (101, "Laptop", 50, "A"),
    (102, "Smartphone", 100, "B"),
]

In [0]:

# Write the seed rows as a Delta table. mode="overwrite" replaces any previous
# contents, so re-running this cell resets the table to its initial state.
initial_df = spark.createDataFrame(initial_data, schema=inventory_schema)
initial_df.write.saveAsTable(
    "ecommerce.inventory.products",
    format="delta",
    mode="overwrite",
)

# Incoming batch: 101 already exists (stock 50 -> 45); 103 is a new product.
update_data = [
    (101, "Laptop", 45, "A"),
    (103, "Tablet", 30, "A"),
]
update_df = spark.createDataFrame(update_data, schema=inventory_schema)

In [0]:


# Merge (upsert) the update batch into the products table, keyed on product_id:
#   - product already present -> only `stock` is overwritten (name/warehouse kept)
#   - product not present     -> the full source row is inserted
#
# Why the guard below: Delta only rejects duplicate source keys when several
# source rows match the SAME existing target row. Duplicated keys that do not
# match anything are all inserted, silently creating duplicate product_ids.
# Check source-key uniqueness up front so both cases fail with a clear message.
duplicate_key_count = (
    update_df
    .groupBy("product_id")
    .count()
    .where("`count` > 1")
    .count()
)
if duplicate_key_count:
    raise ValueError(
        f"update batch contains {duplicate_key_count} product_id value(s) "
        "that appear more than once; MERGE requires unique source keys"
    )

delta_table = DeltaTable.forName(spark, "ecommerce.inventory.products")
delta_table.alias("target").merge(
    update_df.alias("source"),
    "target.product_id = source.product_id"
).whenMatchedUpdate(set={
    "stock": "source.stock"
}).whenNotMatchedInsert(values={
    "product_id": "source.product_id",
    "name": "source.name",
    "stock": "source.stock",
    "warehouse": "source.warehouse"
}).execute()


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Read the merged table back, ordered by key, and print it for inspection.
FINAL_INVENTORY_SQL = """
    SELECT * FROM ecommerce.inventory.products
    ORDER BY product_id
"""
final_inventory = spark.sql(FINAL_INVENTORY_SQL)
final_inventory.show()

+----------+----------+-----+---------+
|product_id|      name|stock|warehouse|
+----------+----------+-----+---------+
|       101|    Laptop|   45|        A|
|       102|Smartphone|  100|        B|
|       103|    Tablet|   30|        A|
+----------+----------+-----+---------+

