In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz


In [None]:
!pip install -q pyspark pymongo python-dotenv pulp

In [None]:
import os

# Point the process at the Java 11 install and the unpacked Spark distribution
# before the Spark session is created.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.3-bin-hadoop3",
})

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, row_number, date_format
from pyspark.sql.window import Window
from pymongo import MongoClient
from urllib.parse import quote_plus

# Local-mode session using every available core. The LEGACY time parser policy is
# set for the date parsing done later in the notebook.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ShelfTransform")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# MongoDB connection string. It is read from the MONGO_URI environment variable so
# that no credential is ever saved in the notebook; when the variable is unset the
# value is requested with a hidden prompt instead.
# NOTE(review): a username/password was previously written on this line in plain
# text - that credential should be rotated.
from getpass import getpass

MONGO_URI = os.environ.get("MONGO_URI") or getpass("MongoDB connection URI: ")

In [None]:
from google.colab import drive

# Mount Google Drive; the source CSV files are read from the ShelfData folder.
drive.mount('/content/drive')
DRIVE_PATH = '/content/drive/My Drive/ShelfData'

# All four sources are read the same way: first row is the header and Spark
# infers the column types.
csv_options = {"header": True, "inferSchema": True}

sales_df = spark.read.csv(f'{DRIVE_PATH}/sales_perishables.csv', **csv_options)
# The wildcard reads every inventory chunk file into a single DataFrame.
inventory_df = spark.read.csv(f'{DRIVE_PATH}/inventory_chunk_*.csv', **csv_options)
weather_df = spark.read.csv(f'{DRIVE_PATH}/weather_daily_clean.csv', **csv_options)
promo_df = spark.read.csv(f'{DRIVE_PATH}/promotions_weekly.csv', **csv_options)

In [None]:
# Parse each source's date column into a date value. The format strings differ per
# source; adjust them if the parse produces errors.
sales_received = to_date(col('Date_Received'), 'MM/dd/yyyy')
inventory_date = to_date(col('Date'), 'yyyy-MM-dd')
weather_date = to_date(col('date'), 'yyyy-MM-dd')

sales_df = sales_df.withColumn('Date_Received', sales_received)
inventory_df = inventory_df.withColumn('Date', inventory_date)
weather_df = weather_df.withColumn('date', weather_date)

In [None]:
# 1. Align the inventory column names with the sales data.
inventory_df = inventory_df.withColumnRenamed("Product ID", "Product_ID") \
                           .withColumnRenamed("Date", "Date_Received")

# 2. Demo-only surrogate key: rows are paired purely by their position in a sorted
#    order, not by a real business key.
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Sort by product and then by date. Ordering by Product_ID alone leaves the order of
# rows that share a Product_ID undefined, so the row_id assignment (and therefore the
# sales/inventory pairing) could differ between runs. Rows that tie on both columns
# are still numbered in an unspecified order.
# NOTE(review): a window without partitionBy is evaluated on a single partition -
# acceptable for a demo, but confirm before using on large data.
product_date_order = Window.orderBy("Product_ID", "Date_Received")

sales_df_with_id = sales_df.withColumn("row_id", row_number().over(product_date_order))
inventory_df_with_id = inventory_df.withColumn("row_id", row_number().over(product_date_order))
weather_df_with_id = weather_df.withColumn("row_id", row_number().over(Window.orderBy("date")))


# 3. Join on the surrogate key. Sales and inventory must both have the row_id (inner);
#    weather is optional (left), so its columns may be null.
joined_df = (sales_df_with_id.join(inventory_df_with_id, "row_id", 'inner')
                     .join(weather_df_with_id, "row_id", 'left')
                     .select(sales_df_with_id['Product_ID'], sales_df_with_id['Category'], col('Stock_Quantity'), col('Units Sold').alias('Units_Sold'),
                             col('meantemp'), col('humidity'), sales_df_with_id['Date_Received']))

# Fact table: one row per joined record, with the weather columns renamed.
fact_inventory = joined_df.select('Product_ID', 'Date_Received', 'Stock_Quantity',
                                  'Units_Sold', col('meantemp').alias('Weather_Temp'),
                                  col('humidity').alias('Weather_Humidity'))

fact_inventory.show(5)

In [None]:
from pyspark.sql.functions import when

# Mock prediction: label each row with a waste-risk level derived from the weather.
#   High   - temperature above 25 and humidity above 80
#   Medium - temperature above 20 and humidity above 70
#   Low    - every other row
temperature = col("Weather_Temp")
humidity_level = col("Weather_Humidity")

is_high_risk = (temperature > 25) & (humidity_level > 80)
is_medium_risk = (temperature > 20) & (humidity_level > 70)

risk_label = when(is_high_risk, "High").when(is_medium_risk, "Medium").otherwise("Low")
fact_inventory = fact_inventory.withColumn("predicted_waste_risk", risk_label)

fact_inventory.show(5)  # Preview with new column

In [None]:
import json
import os
from getpass import getpass

from pyspark.sql.functions import date_format

# Convert the date column to a "yyyy-MM-dd" string before toPandas() so that the
# records are JSON-serialisable.
fact_inventory_json = fact_inventory.withColumn("Date_Received", date_format("Date_Received", "yyyy-MM-dd")).toPandas()
fact_records = fact_inventory_json.to_dict('records')
# NOTE(review): null numeric values (e.g. weather columns from the left join) are
# expected to arrive from pandas as NaN, which json.dump writes as the non-standard
# token NaN - confirm the consumer of result.json accepts that.

# `grouped` is not created by any cell visible in this notebook, so on a fresh
# kernel the name does not exist. Fall back to an empty "optimized" section with a
# loud warning instead of failing with a NameError.
if 'grouped' in globals():
    optimized_records = grouped.to_dict('records')
else:
    optimized_records = []
    print("WARNING: 'grouped' is not defined; writing an empty 'optimized' section")

# Payload with string dates
payload = {
    "fact": fact_records,
    "dim_products": sales_df.select('Product_ID', 'Product_Name', 'Category', 'Unit_Price').distinct().toPandas().to_dict('records'),
    "dim_dates": fact_inventory.select('Date_Received').distinct().withColumnRenamed('Date_Received', 'Date').withColumn("Date", date_format("Date", "yyyy-MM-dd")).toPandas().to_dict('records'),
    "optimized": optimized_records
}
with open('result.json', 'w') as f:
    json.dump(payload, f)
print("result.json created for Airflow (dates as strings)")

# Optional MongoDB load for testing.
# The connection string comes from the MONGO_URI environment variable, or from a
# hidden prompt when it is unset, so no credential is stored in the notebook.
# NOTE(review): a username/password was previously written here in plain text -
# that credential should be rotated.
MONGO_URI = os.environ.get("MONGO_URI") or getpass("MongoDB connection URI: ")
client = MongoClient(MONGO_URI)
db = client['shelfsense']
if fact_records:
    # insert_many adds an "_id" key to each document it is given, so pass copies
    # and leave the records referenced by `payload` untouched.
    db['fact_inventory'].insert_many([dict(record) for record in fact_records])
    print("Loaded to MongoDB for testing")
else:
    # insert_many rejects an empty document list, so skip the call entirely.
    print("No fact rows to load; skipped MongoDB insert")