In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from datetime import datetime

spark = SparkSession.builder.appName("CustomerData").getOrCreate()

# Stub for creating the schema in Unity Catalog (assume catalog 'retail' exists)
spark.sql("CREATE SCHEMA IF NOT EXISTS retail.customer_data")

# Define schema for customer data
customer_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("region", StringType(), True),
    StructField("status", StringType(), True),
    StructField("signup_date", DateType(), True)
])

# Sample input data as a list of rows
customer_data = [
    (1, "Alice Johnson", "West", "Active",datetime(2023,1,15)),
    (2, "Bob Smith", "East", "Inactive",datetime(2023,2,20)),
    (3, "Charlie Lee", "West", "Active", datetime(2023,3,10))
]

# Create DataFrame from sample data
customers_df = spark.createDataFrame(customer_data, schema=customer_schema)
display(customers_df)
customers_df.write.format("delta").mode("overwrite").saveAsTable("retail.customer_data.customers")
spark.sql("SELECT customer_id, name, region FROM retail.customer_data.customers WHERE status = 'Active' AND region = 'West'")

customer_id,name,region,status,signup_date
1,Alice Johnson,West,Active,2023-01-15
2,Bob Smith,East,Inactive,2023-02-20
3,Charlie Lee,West,Active,2023-03-10


DataFrame[customer_id: int, name: string, region: string]

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("InventoryUpdates").getOrCreate()

# Stub for creating the schema in Unity Catalog (assume catalog 'ecommerce' exists)
spark.sql("CREATE SCHEMA IF NOT EXISTS ecommerce.inventory")

# Define schema for inventory
inventory_schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("stock", IntegerType(), True),
    StructField("warehouse", StringType(), True)
])

# Initial inventory data
initial_data = [
    (101, "Laptop", 50, "A"),
    (102, "Smartphone", 100, "B")
]
initial_df = spark.createDataFrame(initial_data, schema=inventory_schema)
#display(initial_df)
#Write initial data as Delta table
initial_df.write.format("delta").mode("overwrite").saveAsTable("ecommerce.inventory.products")

# Update batch data
update_data = [
    (101, "Laptop", 45, "A"),
    (103, "Tablet", 30, "A")
]
update_df = spark.createDataFrame(update_data, schema=inventory_schema)



In [0]:
from delta.tables import DeltaTable

# Get reference to the Delta table
delta_inventory = DeltaTable.forName(spark, "ecommerce.inventory.products")

# Merge (upsert) update_df into the target table
delta_inventory.alias("target").merge(
    update_df.alias("source"),
    "target.product_id = source.product_id"
).whenMatchedUpdate(
    set={"stock": "source.stock"}
).whenNotMatchedInsert(
    values={
        "product_id": "source.product_id",
        "name": "source.name",
        "stock": "source.stock",
        "warehouse": "source.warehouse"
    }
).execute()

# Query the final inventory
final_df = spark.sql("""
    SELECT *
    FROM ecommerce.inventory.products
    ORDER BY product_id
""")
final_df.show()


+----------+----------+-----+---------+
|product_id|      name|stock|warehouse|
+----------+----------+-----+---------+
|       101|    Laptop|   45|        A|
|       102|Smartphone|  100|        B|
|       103|    Tablet|   30|        A|
+----------+----------+-----+---------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()

# Stub for creating the schema in Unity Catalog (assume catalog 'finance' exists)
spark.sql("CREATE SCHEMA IF NOT EXISTS finance.sales")

# Define schema for sales
sales_schema = StructType([
    StructField("sale_id", IntegerType(), False),
    StructField("product", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("quarter", StringType(), True)
])

# Initial sales data
initial_data = [
    (1, "Stocks", 10000, "Q1-2023"),
    (2, "Bonds", 15000, "Q1-2023")
]
initial_df = spark.createDataFrame(initial_data, schema=sales_schema)

# Write initial data as Delta table with history enabled
initial_df.write.format("delta").mode("overwrite").saveAsTable("finance.sales.quarterly_sales")

# Update data (correction)
update_data = [
    (1, "Stocks", 12000, "Q1-2023")
]
update_df = spark.createDataFrame(update_data, schema=sales_schema)

# Perform update
delta_table = DeltaTable.forName(spark, "finance.sales.quarterly_sales")
delta_table.alias("target").merge(
    update_df.alias("source"),
    "target.sale_id = source.sale_id"
).whenMatchedUpdate(set={"amount": "source.amount"}).execute()

# Create schema if not exists
spark.sql("CREATE SCHEMA IF NOT EXISTS finance.sales")

# Initial sales data
initial_df.write.format("delta").mode("overwrite") \
    .saveAsTable("finance.sales.quarterly_sales")

# Update data
update_df = spark.createDataFrame(
    [(1, "Stocks", 12000, "Q1-2023")],
    schema=sales_schema
)

delta_sales = DeltaTable.forName(spark, "finance.sales.quarterly_sales")
delta_sales.alias("target").merge(
    update_df.alias("source"),
    "target.sale_id = source.sale_id"
).whenMatchedUpdate(
    set={"amount": "source.amount"}
).execute()

# History
history_df = delta_sales.history()
history_df.show()

# Previous version
previous_version = history_df.select("version").collect()[1][0]
prev_total_df = spark.sql(f"""
    SELECT SUM(amount) AS total_amount
    FROM finance.sales.quarterly_sales VERSION AS OF {previous_version}
""")
prev_total_df.show()

# Current version
spark.sql("""
    SELECT SUM(amount) AS total_amount
    FROM finance.sales.quarterly_sales
""").show()


+-------+--------------------+--------------+--------------------+--------------------+--------------------+----+--------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|        userId|            userName|           operation| operationParameters| job|notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+--------------+--------------------+--------------------+--------------------+----+--------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     11| 2025-08-12 10:00:48|70887064104782|vijaykumarsbsp@gm...|               MERGE|{predicate -> ["(...|NULL|    NULL|0812-091116-xn91f...|         10|WriteSerializable|        false|{numTargetRowsCop...|        NULL|Databricks-Runtim...|
|     10| 2025-08-12 10:00:4