In [1]:
!pip install delta-spark==3.2.0 -q
import pyspark
from delta import *
from pyspark.sql.functions import *
# Explicit module alias, imported after the wildcard imports so that nothing
# they export can rebind it. Lets cells call F.sum(...) unambiguously.
from pyspark.sql import functions as F

# Build a SparkSession wired up for Delta Lake.
# Both settings are required: the first registers Delta's SQL session
# extension, the second points the default catalog at Delta's catalog class.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("DeltaTutorial")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Pass the builder through delta's pip helper, then reuse an existing
# session or start a new one.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("Spark and Delta Lake are ready!")

Spark and Delta Lake are ready!


In [9]:
from google.colab import drive

drive.mount('/content/drive')


def _read_csv_with_header(path):
    """Read the CSV at ``path`` using its first row as the header and letting Spark infer column types."""
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    return reader.csv(path)


# CLEANED DATA (stock movements)
file_path = "/content/drive/MyDrive/data engineering/Week 4/stock_movements.csv"
df = _read_csv_with_header(file_path)
df.show()

# PRODUCTS DATA
file_path_p = "/content/drive/MyDrive/data engineering/Week 4/products.csv"
df_p = _read_csv_with_header(file_path_p)
df_p.show()

# WAREHOUSE DATA
file_path_w = "/content/drive/MyDrive/data engineering/Week 4/warehouse.csv"
df_w = _read_csv_with_header(file_path_w)
df_w.show()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
+-----------+----------+------------+-----------+-------------+--------+-------------+
|movement_id|product_id|warehouse_id|supplier_id|movement_type|quantity|movement_date|
+-----------+----------+------------+-----------+-------------+--------+-------------+
|       M001|      P001|         W01|      SUP01|           IN|      50|   2025-07-01|
|       M002|      P001|         W01|      SUP01|          OUT|     -10|   2025-07-03|
|       M003|      P002|         W01|      SUP02|           IN|     120|   2025-07-02|
|       M004|      P002|         W01|      SUP02|          OUT|     -30|   2025-07-05|
|       M005|      P003|         W01|      SUP03|           IN|       5|   2025-07-06|
|       M006|      P001|         W02|      SUP01|           IN|      40|   2025-07-02|
|       M007|      P003|         W02|      SUP03|           IN|     100|   2025-07-01|
|

In [14]:
# Aggregation: total quantity per (product, warehouse) pair.
# The sum is referenced as F.sum so this cell does not depend on the wildcard
# import having replaced Python's built-in sum(); the built-in would raise a
# TypeError when handed the column name "quantity".
stock_summary_df = (
    df.groupBy("product_id", "warehouse_id")
    .agg(F.sum("quantity").alias("current_stock"))
)

# The product and warehouse tables each have a "name" column; give them
# distinct names so they stay distinguishable after the joins below.
df_p = df_p.withColumnRenamed("name", "product_name")
df_w = df_w.withColumnRenamed("name", "warehouse_name")

# Join stock with product info (left join keeps every stock row even when
# the product lookup has no match)
stock_product_df = stock_summary_df.join(df_p, on="product_id", how="left")

# Join with warehouse info (same left-join reasoning)
master_inventory_df = stock_product_df.join(df_w, on="warehouse_id", how="left")
master_inventory_df.show()

+------------+----------+-------------+------------+-------------+-------+--------------+---------+
|warehouse_id|product_id|current_stock|product_name|reorder_level|  price|warehouse_name| location|
+------------+----------+-------------+------------+-------------+-------+--------------+---------+
|         W01|      P003|            5|     Monitor|            3| 7999.0|Main Warehouse|  Chennai|
|         W03|      P005|          150|      Laptop|           10|55000.0|    South Zone|Bangalore|
|         W01|      P007|           10|      Webcam|            8| 3500.0|Main Warehouse|  Chennai|
|         W03|      P006|            5|     Headset|           20| 1999.0|    South Zone|Bangalore|
|         W01|      P002|           90|    Keyboard|            5|  499.5|Main Warehouse|  Chennai|
|         W01|      P001|           40|       Mouse|          100| 299.99|Main Warehouse|  Chennai|
|         W02|      P003|           80|     Monitor|            3| 7999.0|    North Zone|    Delhi|


In [15]:
# Reorder flag: label each row by comparing its stock to its reorder level.

stock_level = col("current_stock")
reorder_threshold = col("reorder_level")

# Below the threshold -> UNDERSTOCK; more than double it -> OVERSTOCKED;
# everything else falls through to NORMAL.
reorder_status = (
    when(stock_level < reorder_threshold, "UNDERSTOCK")
    .when(stock_level > reorder_threshold * 2, "OVERSTOCKED")
    .otherwise("NORMAL")
)

master_inventory_df = master_inventory_df.withColumn("reorder_flag", reorder_status)
master_inventory_df.show()

+------------+----------+-------------+------------+-------------+-------+--------------+---------+------------+
|warehouse_id|product_id|current_stock|product_name|reorder_level|  price|warehouse_name| location|reorder_flag|
+------------+----------+-------------+------------+-------------+-------+--------------+---------+------------+
|         W01|      P003|            5|     Monitor|            3| 7999.0|Main Warehouse|  Chennai|      NORMAL|
|         W03|      P005|          150|      Laptop|           10|55000.0|    South Zone|Bangalore| OVERSTOCKED|
|         W01|      P007|           10|      Webcam|            8| 3500.0|Main Warehouse|  Chennai|      NORMAL|
|         W03|      P006|            5|     Headset|           20| 1999.0|    South Zone|Bangalore|  UNDERSTOCK|
|         W01|      P002|           90|    Keyboard|            5|  499.5|Main Warehouse|  Chennai| OVERSTOCKED|
|         W01|      P001|           40|       Mouse|          100| 299.99|Main Warehouse|  Chenn

In [16]:
# Save in CSV format: coalesced to one partition, header row included,
# replacing whatever a previous run wrote to the same location.
csv_writer = master_inventory_df.coalesce(1).write.mode("overwrite").option("header", "true")
csv_writer.csv("/content/drive/MyDrive/data engineering/Week 4/final_stock.csv")

# Save in Delta format, likewise overwriting any earlier copy.
delta_writer = master_inventory_df.write.mode("overwrite").format("delta")
delta_writer.save("/content/drive/MyDrive/data engineering/Week 4/final_stock.delta")


In [18]:
from google.colab import files
import glob, shutil

# Path where Spark saved the CSV output. Despite the ".csv" suffix, the code
# below treats it as a directory and looks for the part file inside it.
drive_csv_path = "/content/drive/MyDrive/data engineering/Week 4/final_stock.csv"

# Find the actual part file inside. Sorting makes the choice deterministic
# should more than one part file ever be present.
csv_files = sorted(glob.glob(drive_csv_path + "/part-*.csv"))

# Fail with a clear message instead of an opaque IndexError when the export
# has not been written yet (or Drive is not mounted).
if not csv_files:
    raise FileNotFoundError(
        f"No part-*.csv file found in {drive_csv_path!r}; "
        "run the CSV export cell before downloading."
    )

# Copy to Colab local storage with a nice name
local_csv = "/content/final_stock.csv"
shutil.copy(csv_files[0], local_csv)

# Download to your system
files.download(local_csv)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>