In [0]:
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Build a small Delta table of customers, then query back the customers that
# are both 'Active' and in the 'West' region.
spark = SparkSession.builder.appName("CustomerData").getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS default")

# Column spec as (name, type, nullable); customer_id is the only
# non-nullable column.
customer_columns = [
    ("customer_id", IntegerType(), False),
    ("name", StringType(), True),
    ("region", StringType(), True),
    ("status", StringType(), True),
    ("signup_date", DateType(), True),
]
customer_schema = StructType(
    [StructField(col_name, col_type, nullable) for col_name, col_type, nullable in customer_columns]
)

customer_data = [
    (1, "Alice Johnson", "West", "Active", datetime.date(2023, 1, 15)),
    (2, "Bob Smith", "East", "Inactive", datetime.date(2023, 2, 20)),
    (3, "Charlie Lee", "West", "Active", datetime.date(2023, 3, 10)),
]

customers_df = spark.createDataFrame(customer_data, schema=customer_schema)

# mode("overwrite") replaces the table contents, so re-running this cell
# does not duplicate rows.
(
    customers_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.customers")
)

active_west_query = """
    SELECT customer_id, name, region
    FROM default.customers
    WHERE status = 'Active' AND region = 'West'
    """
result_df = spark.sql(active_west_query)

result_df.show()


+-----------+-------------+------+
|customer_id|         name|region|
+-----------+-------------+------+
|          1|Alice Johnson|  West|
|          3|  Charlie Lee|  West|
+-----------+-------------+------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable

# Seed a Delta "products" table, then upsert a batch of inventory changes into
# it with MERGE keyed on product_id:
#   - matched rows get only their `stock` replaced (name/warehouse untouched);
#   - unmatched rows are inserted with every column taken from the source.
spark = SparkSession.builder.appName("InventoryUpdates").getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS default")

inventory_schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("stock", IntegerType(), True),
    StructField("warehouse", StringType(), True),
])

initial_data = [(101, "Laptop", 50, "A"), (102, "Smartphone", 100, "B")]
initial_df = spark.createDataFrame(initial_data, schema=inventory_schema)
# Overwrite so a re-run starts again from the same seed rows.
initial_df.write.format("delta").mode("overwrite").saveAsTable("default.products")

update_data = [
    (101, "Laptop", 45, "A"),  # existing product_id -> stock update
    (103, "Tablet", 30, "A"),  # new product_id -> insert
]
update_df = spark.createDataFrame(update_data, schema=inventory_schema)

# Column -> source expression used when a source row has no target match.
insert_values = {
    "product_id": "source.product_id",
    "name": "source.name",
    "stock": "source.stock",
    "warehouse": "source.warehouse",
}

target_table = DeltaTable.forName(spark, "default.products")
merge_builder = (
    target_table.alias("target")
    .merge(update_df.alias("source"), "target.product_id = source.product_id")
    .whenMatchedUpdate(set={"stock": "source.stock"})
    .whenNotMatchedInsert(values=insert_values)
)
merge_builder.execute()

spark.sql("SELECT * FROM default.products ORDER BY product_id").show()


+----------+----------+-----+---------+
|product_id|      name|stock|warehouse|
+----------+----------+-----+---------+
|       101|    Laptop|   45|        A|
|       102|Smartphone|  100|        B|
|       103|    Tablet|   30|        A|
+----------+----------+-----+---------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable

# Write quarterly sales to a Delta table, apply a MERGE correction, and compare
# the total amount before and after the correction using Delta time travel.
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")

sales_schema = StructType([
    StructField("sale_id", IntegerType(), False),
    StructField("product", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("quarter", StringType(), True)
])

initial_data = [
    (1, "Stocks", 10000, "Q1-2023"),
    (2, "Bonds", 15000, "Q1-2023")
]
initial_df = spark.createDataFrame(initial_data, schema=sales_schema)

# Overwrite so a re-run starts again from the same seed rows.
initial_df.write.format("delta").mode("overwrite").saveAsTable("sales_db.quarterly_sales")

update_data = [
    (1, "Stocks", 12000, "Q1-2023")
]
update_df = spark.createDataFrame(update_data, schema=sales_schema)

delta_table = DeltaTable.forName(spark, "sales_db.quarterly_sales")

# Pin the version the table is at *before* the merge. Reading it afterwards as
# "the second row of the history" depends on the history's row order and on
# the merge producing exactly one commit (an IndexError if it produced none,
# the wrong version if another writer committed in between).
previous_version = delta_table.history(1).select("version").first()["version"]

# Only `amount` is changed for matching sale_ids; unmatched source rows are ignored.
delta_table.alias("target").merge(
    update_df.alias("source"),
    "target.sale_id = source.sale_id"
).whenMatchedUpdate(set={"amount": "source.amount"}).execute()

# Commit log after the merge, kept for inspection in later cells.
history_df = delta_table.history()

# int() makes explicit that only a numeric version is spliced into the SQL text.
prev_total = spark.sql(f"""
SELECT SUM(amount) AS total_amount
FROM sales_db.quarterly_sales VERSION AS OF {int(previous_version)}
""")
prev_total.show()

curr_total = spark.sql("""
SELECT SUM(amount) AS total_amount
FROM sales_db.quarterly_sales
""")
curr_total.show()


+------------+
|total_amount|
+------------+
|       25000|
+------------+

+------------+
|total_amount|
+------------+
|       27000|
+------------+

