In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("CustomerData").getOrCreate()

# The table below uses a three-level name (catalog.schema.table), so make sure the
# parent catalog exists before creating the schema inside it. Both statements are
# idempotent, so the cell can be re-run safely.
spark.sql("CREATE CATALOG IF NOT EXISTS retail")
spark.sql("CREATE SCHEMA IF NOT EXISTS retail.customer_data")

# Ingest every column as STRING first (as raw data would arrive), then cast to typed columns.
raw_schema = """
    customer_id STRING,
    name STRING,
    region STRING,
    status STRING,
    signup_date STRING
"""
customer_data = [
    ("1", "Alice Johnson", "West", "Active", "2023-01-15"),
    ("2", "Bob Smith", "East", "Inactive", "2023-02-20"),
    ("3", "Charlie Lee", "West", "Active", "2023-03-10")
]
customers_df = (
    spark.createDataFrame(customer_data, schema=raw_schema)
    .withColumn("customer_id", col("customer_id").cast("int"))
    .withColumn("signup_date", col("signup_date").cast("date"))
)

# With ANSI mode off, cast() returns NULL for unparseable input instead of failing;
# catch that here rather than persisting silently broken keys/dates.
assert customers_df.filter(
    col("customer_id").isNull() | col("signup_date").isNull()
).count() == 0, "customer_id/signup_date has NULLs after casting"

# Overwrite keeps the cell re-runnable: each run replaces the table contents.
(
    customers_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("retail.customer_data.customers")
)

# Active customers in the West region.
result_df = spark.sql("""
    SELECT customer_id, name, region
    FROM retail.customer_data.customers
    WHERE status = 'Active' AND region = 'West'
""")
result_df.show()


+-----------+-------------+------+
|customer_id|         name|region|
+-----------+-------------+------+
|          1|Alice Johnson|  West|
|          3|  Charlie Lee|  West|
+-----------+-------------+------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("InventoryUpdates").getOrCreate()

# Create the Unity Catalog catalog and schema if they don't already exist
# (both statements are idempotent, so the cell can be re-run).
spark.sql("CREATE CATALOG IF NOT EXISTS ecommerce")
spark.sql("CREATE SCHEMA IF NOT EXISTS ecommerce.inventory")

# Explicit schema for the inventory table; product_id is the merge key and is
# declared non-nullable.
inventory_schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("stock", IntegerType(), True),
    StructField("warehouse", StringType(), True)
])

# Seed the table with the initial inventory. Overwrite resets the table on every
# run, so the merge below always starts from the same state.
initial_data = [
    (101, "Laptop", 50, "A"),
    (102, "Smartphone", 100, "B")
]
initial_df = spark.createDataFrame(initial_data, schema=inventory_schema)
initial_df.write.format("delta").mode("overwrite").saveAsTable("ecommerce.inventory.products")

# Incoming batch: product 101 already exists (stock change), product 103 is new.
update_data = [
    (101, "Laptop", 45, "A"),
    (103, "Tablet", 30, "A")
]
update_df = spark.createDataFrame(update_data, schema=inventory_schema)

# Upsert the batch on product_id:
#   - matched rows get ONLY their stock updated (name/warehouse are left as-is);
#   - unmatched rows are inserted with all columns.
# NOTE: Delta raises an error if several source rows match the same target row,
# so the batch is expected to contain at most one row per product_id.
delta_table = DeltaTable.forName(spark, "ecommerce.inventory.products")
(
    delta_table.alias("target")
    .merge(
        update_df.alias("source"),
        "target.product_id = source.product_id"
    )
    .whenMatchedUpdate(set={
        "stock": "source.stock"
    })
    .whenNotMatchedInsert(values={
        "product_id": "source.product_id",
        "name": "source.name",
        "stock": "source.stock",
        "warehouse": "source.warehouse"
    })
    .execute()
)

# Final inventory after the upsert.
final_df = spark.sql("""
    SELECT *
    FROM ecommerce.inventory.products
    ORDER BY product_id
""")
final_df.show()


+----------+----------+-----+---------+
|product_id|      name|stock|warehouse|
+----------+----------+-----+---------+
|       101|    Laptop|   45|        A|
|       102|Smartphone|  100|        B|
|       103|    Tablet|   30|        A|
+----------+----------+-----+---------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()

# Create the Unity Catalog catalog and schema if they don't already exist
# (both statements are idempotent, so the cell can be re-run).
spark.sql("CREATE CATALOG IF NOT EXISTS finance")
spark.sql("CREATE SCHEMA IF NOT EXISTS finance.sales")

# Explicit schema for the sales table; sale_id is the merge key.
sales_schema = StructType([
    StructField("sale_id", IntegerType(), False),
    StructField("product", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("quarter", StringType(), True)
])

# Initial sales data.
initial_data = [
    (1, "Stocks", 10000, "Q1-2023"),
    (2, "Bonds", 15000, "Q1-2023")
]
initial_df = spark.createDataFrame(initial_data, schema=sales_schema)

# Write the initial data as a Delta table. Every write to a Delta table is a
# versioned commit, which is what makes the time-travel query below possible.
initial_df.write.format("delta").mode("overwrite").saveAsTable("finance.sales.quarterly_sales")

# Correction batch: sale 1 should have been 12000.
update_data = [
    (1, "Stocks", 12000, "Q1-2023")
]
update_df = spark.createDataFrame(update_data, schema=sales_schema)

delta_table = DeltaTable.forName(spark, "finance.sales.quarterly_sales")

# Capture the table version *before* applying the correction. This is more robust
# than picking a row by position out of history() afterwards: it does not depend on
# the ordering of the history rows or on how many commits the merge produces, and
# it stays correct when the cell is re-run (versions keep increasing on re-runs).
previous_version = delta_table.history(1).select("version").first()["version"]

# Apply the correction: only `amount` is updated on matching sale_ids.
delta_table.alias("target").merge(
    update_df.alias("source"),
    "target.sale_id = source.sale_id"
).whenMatchedUpdate(set={"amount": "source.amount"}).execute()

# Total amount as of the version before the correction (time travel).
spark.sql(
    "SELECT SUM(amount) AS total_amount "
    f"FROM finance.sales.quarterly_sales VERSION AS OF {previous_version}"
).show()

# Total amount for the current version.
spark.sql("SELECT SUM(amount) AS total_amount FROM finance.sales.quarterly_sales").show()


+------------+
|total_amount|
+------------+
|       25000|
+------------+

+------------+
|total_amount|
+------------+
|       27000|
+------------+

