In [1]:
# Install required packages
!pip install -q pyspark==3.5.1 delta-spark==3.1.0

# Set environment variables
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"

# Create SparkSession with Delta support
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("OnlineCourseAnalytics") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [2]:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RetailInventoryDeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


inventory_df = spark.read.option("header", True).option("inferSchema", True).csv("inventory_supply.csv")


restock_logs_df = spark.read.option("header", True).option("inferSchema", True).csv("restock_logs.csv")

inventory_df.show()
restock_logs_df.show()


+------+------------+-----------+----------+--------+------------+-------------+---------+---------+
|ItemID|    ItemName|   Category| Warehouse|StockQty|ReorderLevel|LastRestocked|UnitPrice| Supplier|
+------+------------+-----------+----------+--------+------------+-------------+---------+---------+
|  I001|      LED TV|Electronics|WarehouseA|      50|          20|   2024-03-15|    30000|   AVTech|
|  I002|      Laptop|Electronics|WarehouseB|      10|          15|   2024-04-01|    70000|TechWorld|
|  I003|Office Chair|  Furniture|WarehouseA|      40|          10|   2024-03-25|     6000|  ChairCo|
|  I004|Refrigerator| Appliances|WarehouseC|       5|          10|   2024-02-20|    25000| FreezeIt|
|  I005|     Printer|Electronics|WarehouseB|       3|           5|   2024-03-30|     8000|PrintFast|
+------+------------+-----------+----------+--------+------------+-------------+---------+---------+

+------+-----------+-------------+
|ItemID|RestockDate|QuantityAdded|
+------+-----------+

In [4]:
from pyspark.sql.functions import avg, count, when, col  # <-- col added here

# 1. Average price per supplier
avg_price_by_supplier = inventory_df.groupBy("Supplier").agg(avg("UnitPrice").alias("AvgPrice"))
avg_price_by_supplier.show()

# 2. Suppliers offering items below average price in their category
category_avg = inventory_df.groupBy("Category").agg(avg("UnitPrice").alias("CategoryAvg"))
below_avg_items = inventory_df.join(category_avg, on="Category") \
    .filter(col("UnitPrice") < col("CategoryAvg"))
below_avg_items.select("Supplier", "ItemName", "UnitPrice", "Category", "CategoryAvg").show()

# 3. Tag suppliers with Good Deal if >50% of their items are below market average
flagged = inventory_df.join(category_avg, "Category") \
    .withColumn("IsGoodDeal", (col("UnitPrice") < col("CategoryAvg")).cast("int"))

good_deal_tagged = flagged.groupBy("Supplier") \
    .agg((avg("IsGoodDeal") * 100).alias("BelowAvgPercent")) \
    .withColumn("Tag", when(col("BelowAvgPercent") > 50, "Good Deal").otherwise(""))

good_deal_tagged.show()


+---------+--------+
| Supplier|AvgPrice|
+---------+--------+
|   AVTech| 30000.0|
|TechWorld| 70000.0|
|PrintFast|  8000.0|
| FreezeIt| 25000.0|
|  ChairCo|  6000.0|
+---------+--------+

+---------+--------+---------+-----------+-----------+
| Supplier|ItemName|UnitPrice|   Category|CategoryAvg|
+---------+--------+---------+-----------+-----------+
|   AVTech|  LED TV|    30000|Electronics|    36000.0|
|PrintFast| Printer|     8000|Electronics|    36000.0|
+---------+--------+---------+-----------+-----------+

+---------+---------------+---------+
| Supplier|BelowAvgPercent|      Tag|
+---------+---------------+---------+
|   AVTech|          100.0|Good Deal|
|TechWorld|            0.0|         |
|PrintFast|          100.0|Good Deal|
| FreezeIt|            0.0|         |
|  ChairCo|            0.0|         |
+---------+---------------+---------+



In [5]:
# 3rd
from pyspark.sql.functions import col

cost_df = inventory_df.withColumn("TotalStockValue", col("StockQty") * col("UnitPrice"))
cost_df.show()

top_items = cost_df.orderBy(col("TotalStockValue").desc()).limit(3)
top_items.show()

output_path = "/mnt/data/top_inventory_items_by_warehouse"
top_items.write.mode("overwrite").partitionBy("Warehouse").parquet(output_path)

print(f"✅ Exported to Parquet at: {output_path}")


+------+------------+-----------+----------+--------+------------+-------------+---------+---------+---------------+
|ItemID|    ItemName|   Category| Warehouse|StockQty|ReorderLevel|LastRestocked|UnitPrice| Supplier|TotalStockValue|
+------+------------+-----------+----------+--------+------------+-------------+---------+---------+---------------+
|  I001|      LED TV|Electronics|WarehouseA|      50|          20|   2024-03-15|    30000|   AVTech|        1500000|
|  I002|      Laptop|Electronics|WarehouseB|      10|          15|   2024-04-01|    70000|TechWorld|         700000|
|  I003|Office Chair|  Furniture|WarehouseA|      40|          10|   2024-03-25|     6000|  ChairCo|         240000|
|  I004|Refrigerator| Appliances|WarehouseC|       5|          10|   2024-02-20|    25000| FreezeIt|         125000|
|  I005|     Printer|Electronics|WarehouseB|       3|           5|   2024-03-30|     8000|PrintFast|          24000|
+------+------------+-----------+----------+--------+-----------

In [6]:
# 4 th
from pyspark.sql.functions import avg, sum

items_per_warehouse = inventory_df.groupBy("Warehouse").count().withColumnRenamed("count", "ItemCount")
items_per_warehouse.show()

avg_stock_per_category = inventory_df.groupBy("Warehouse", "Category") \
    .agg(avg("StockQty").alias("AvgStock"))
avg_stock_per_category.show()

total_stock_per_warehouse = inventory_df.groupBy("Warehouse") \
    .agg(sum("StockQty").alias("TotalStock")) \
    .filter(col("TotalStock") < 100)
total_stock_per_warehouse.show()


+----------+---------+
| Warehouse|ItemCount|
+----------+---------+
|WarehouseA|        2|
|WarehouseC|        1|
|WarehouseB|        2|
+----------+---------+

+----------+-----------+--------+
| Warehouse|   Category|AvgStock|
+----------+-----------+--------+
|WarehouseB|Electronics|     6.5|
|WarehouseA|  Furniture|    40.0|
|WarehouseC| Appliances|     5.0|
|WarehouseA|Electronics|    50.0|
+----------+-----------+--------+

+----------+----------+
| Warehouse|TotalStock|
+----------+----------+
|WarehouseA|        90|
|WarehouseC|         5|
|WarehouseB|        13|
+----------+----------+



In [8]:
# 5th
from delta.tables import DeltaTable
from pyspark.sql.functions import expr

import os

# 1. Save as Delta table
delta_path = "/mnt/data/retail_inventory"
inventory_df.write.format("delta").mode("overwrite").save(delta_path)

# Load as DeltaTable object for update/delete
retail_inventory = DeltaTable.forPath(spark, delta_path)

# 2. Update stock of 'Laptop' to 20
retail_inventory.update(
    condition=col("ItemName") == "Laptop",
    set={"StockQty": expr("20")}
)

# 3. Delete any item with StockQty = 0
retail_inventory.delete(condition=col("StockQty") == 0)

# 4. View table history
spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`").show(truncate=False)

# Bonus: Query previous version (if needed)
# spark.read.format("delta").option("versionAsOf", 0).load(delta_path).show()


+-------+-----------------------+------+--------+---------+-------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                        |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                                                                                                                       

In [10]:
# 6th
from pyspark.sql.functions import col, expr, lit

# Load Delta table
delta_path = "/mnt/data/retail_inventory"
inventory_delta = DeltaTable.forPath(spark, delta_path)

# Read current Delta table as DataFrame
current_inventory = inventory_delta.toDF()

# Join with restock logs
updated_stock = current_inventory.alias("inv").join(
    restock_logs_df.alias("log"),
    col("inv.ItemID") == col("log.ItemID"),
    "inner"
).withColumn("NewStockQty", col("inv.StockQty") + col("log.QuantityAdded")) \
 .withColumn("RestockedRecently", lit(True)) \
 .select(
     col("inv.ItemID"),
     col("inv.ItemName"),
     col("inv.Category"),
     col("inv.Warehouse"),
     col("NewStockQty").alias("StockQty"),
     col("inv.ReorderLevel"),
     col("inv.LastRestocked"),
     col("inv.UnitPrice"),
     col("inv.Supplier"),
     col("RestockedRecently")
 )

# Write updated data using MERGE INTO
inventory_delta.alias("target").merge(
    updated_stock.alias("source"),
    "target.ItemID = source.ItemID"
).whenMatchedUpdate(set={
    "StockQty": "source.StockQty",
    "ItemName": "source.ItemName",
    "Category": "source.Category",
    "Warehouse": "source.Warehouse",
    "ReorderLevel": "source.ReorderLevel",
    "LastRestocked": "source.LastRestocked",
    "UnitPrice": "source.UnitPrice",
    "Supplier": "source.Supplier"
}).execute()

# Show updated inventory
inventory_delta.toDF().show()



+------+------------+-----------+----------+--------+------------+-------------+---------+---------+
|ItemID|    ItemName|   Category| Warehouse|StockQty|ReorderLevel|LastRestocked|UnitPrice| Supplier|
+------+------------+-----------+----------+--------+------------+-------------+---------+---------+
|  I001|      LED TV|Electronics|WarehouseA|      70|          20|   2024-03-15|    30000|   AVTech|
|  I002|      Laptop|Electronics|WarehouseB|      30|          15|   2024-04-01|    70000|TechWorld|
|  I003|Office Chair|  Furniture|WarehouseA|      40|          10|   2024-03-25|     6000|  ChairCo|
|  I004|Refrigerator| Appliances|WarehouseC|       5|          10|   2024-02-20|    25000| FreezeIt|
|  I005|     Printer|Electronics|WarehouseB|       8|           5|   2024-03-30|     8000|PrintFast|
+------+------------+-----------+----------+--------+------------+-------------+---------+---------+



In [12]:
from pyspark.sql.functions import col


inventory_view_df = DeltaTable.forPath(spark, "/mnt/data/retail_inventory").toDF()

summary_df = inventory_view_df \
    .withColumn("NeedsReorder", col("StockQty") < col("ReorderLevel")) \
    .withColumn("TotalStockValue", col("StockQty") * col("UnitPrice"))

summary_df.createOrReplaceTempView("inventory_summary")

spark.sql("SELECT ItemName, Category, StockQty, NeedsReorder, TotalStockValue FROM inventory_summary").show()

spark.sql("""
    CREATE OR REPLACE TEMP VIEW supplier_leaderboard AS
    SELECT Supplier, ROUND(AVG(UnitPrice), 2) AS AvgPrice
    FROM inventory_summary
    GROUP BY Supplier
    ORDER BY AvgPrice DESC
""")

spark.sql("SELECT * FROM supplier_leaderboard").show()


+------------+-----------+--------+------------+---------------+
|    ItemName|   Category|StockQty|NeedsReorder|TotalStockValue|
+------------+-----------+--------+------------+---------------+
|      LED TV|Electronics|      70|       false|        2100000|
|      Laptop|Electronics|      30|       false|        2100000|
|Office Chair|  Furniture|      40|       false|         240000|
|Refrigerator| Appliances|       5|        true|         125000|
|     Printer|Electronics|       8|       false|          64000|
+------------+-----------+--------+------------+---------------+

+---------+--------+
| Supplier|AvgPrice|
+---------+--------+
|TechWorld| 70000.0|
|   AVTech| 30000.0|
| FreezeIt| 25000.0|
|PrintFast|  8000.0|
|  ChairCo|  6000.0|
+---------+--------+



In [13]:
# 8th
from pyspark.sql.functions import when

inv_df = DeltaTable.forPath(spark, "/mnt/data/retail_inventory").toDF()

categorized_df = inv_df.withColumn(
    "StockStatus",
    when(col("StockQty") > 2 * col("ReorderLevel"), "Overstocked").otherwise("LowStock")
)

categorized_df.select("ItemName", "StockQty", "ReorderLevel", "StockStatus").show()

overstocked_filter = categorized_df.filter(col("StockStatus") == "Overstocked")
print("Using .filter():")
overstocked_filter.show()

overstocked_where = categorized_df.where(col("StockStatus") == "Overstocked")
print("Using .where():")
overstocked_where.show()


+------------+--------+------------+-----------+
|    ItemName|StockQty|ReorderLevel|StockStatus|
+------------+--------+------------+-----------+
|      LED TV|      70|          20|Overstocked|
|      Laptop|      30|          15|   LowStock|
|Office Chair|      40|          10|Overstocked|
|Refrigerator|       5|          10|   LowStock|
|     Printer|       8|           5|   LowStock|
+------------+--------+------------+-----------+

Using .filter():
+------+------------+-----------+----------+--------+------------+-------------+---------+--------+-----------+
|ItemID|    ItemName|   Category| Warehouse|StockQty|ReorderLevel|LastRestocked|UnitPrice|Supplier|StockStatus|
+------+------------+-----------+----------+--------+------------+-------------+---------+--------+-----------+
|  I001|      LED TV|Electronics|WarehouseA|      70|          20|   2024-03-15|    30000|  AVTech|Overstocked|
|  I003|Office Chair|  Furniture|WarehouseA|      40|          10|   2024-03-25|     6000| Ch

In [14]:
from pyspark.sql.functions import month, current_date, datediff

feature_df = DeltaTable.forPath(spark, "/mnt/data/retail_inventory").toDF()

feature_df = feature_df.withColumn("RestockMonth", month(col("LastRestocked")))

feature_df = feature_df.withColumn("StockAge", datediff(current_date(), col("LastRestocked")))

feature_df = feature_df.withColumn(
    "StockAgeBucket",
    when(col("StockAge") <= 30, "New")
    .when(col("StockAge") <= 90, "Moderate")
    .otherwise("Stale")
)


feature_df.select("ItemName", "LastRestocked", "RestockMonth", "StockAge", "StockAgeBucket").show()


+------------+-------------+------------+--------+--------------+
|    ItemName|LastRestocked|RestockMonth|StockAge|StockAgeBucket|
+------------+-------------+------------+--------+--------------+
|      LED TV|   2024-03-15|           3|     461|         Stale|
|      Laptop|   2024-04-01|           4|     444|         Stale|
|Office Chair|   2024-03-25|           3|     451|         Stale|
|Refrigerator|   2024-02-20|           2|     485|         Stale|
|     Printer|   2024-03-30|           3|     446|         Stale|
+------------+-------------+------------+--------+--------------+



In [15]:
# 10 export it
stale_items_df = feature_df.filter(col("StockAgeBucket") == "Stale")

stale_items_df.write.mode("overwrite").option("header", True).csv("/mnt/data/export/inventory/stale_items/csv")
stale_items_df.write.mode("overwrite").json("/mnt/data/export/inventory/stale_items/json")
stale_items_df.write.format("delta").mode("overwrite").save("/mnt/data/export/inventory/stale_items/delta")

print("✅ Exports complete! Formats saved at:")
print("- CSV:    /mnt/data/export/inventory/stale_items/csv")
print("- JSON:   /mnt/data/export/inventory/stale_items/json")
print("- DELTA:  /mnt/data/export/inventory/stale_items/delta")


✅ Exports complete! Formats saved at:
- CSV:    /mnt/data/export/inventory/stale_items/csv
- JSON:   /mnt/data/export/inventory/stale_items/json
- DELTA:  /mnt/data/export/inventory/stale_items/delta
