# In [0]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, lit, to_date, col, lag
from pyspark.sql.window import Window
import random
from datetime import datetime, timedelta

# -----------------------------------------
# Step 0: Create database
# -----------------------------------------
# IF NOT EXISTS makes re-running this cell safe when the database is present.
spark.sql("CREATE DATABASE IF NOT EXISTS inflation")

# -----------------------------------------
# Step 1: Item Master Table
# -----------------------------------------
# Reference rows: (item_id, item_name, category, unit).
items = [
    (1, "Rice", "Food", "kg"),
    (2, "Wheat Flour", "Food", "kg"),
    (3, "Cooking Oil", "Food", "liter"),
    (4, "Milk", "Food", "liter"),
    (5, "Potato", "Vegetable", "kg"),
    (6, "Onion", "Vegetable", "kg"),
    (7, "Sugar", "Food", "kg"),
]

items_df = spark.createDataFrame(
    data=items,
    schema=["item_id", "item_name", "category", "unit"],
)

# Persist the master list so it can be queried on its own. The price
# generation below joins against the in-memory items_df, not this table.
(
    items_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("inflation.item_master")
)

# -----------------------------------------
# Step 2: Generate Mock Daily Prices
# -----------------------------------------
# Baseline price per item_id; daily mock prices are jittered around these.
# Keys match item_id in the item master (item names noted alongside).
base_prices = {
    1: 65,   # Rice
    2: 55,   # Wheat Flour
    3: 180,  # Cooking Oil
    4: 90,   # Milk
    5: 40,   # Potato
    6: 60,   # Onion
    7: 85,   # Sugar
}

# Mock price generator: one row per item for a single date.
def generate_prices_for_date(run_date):
    """Return a DataFrame of mock prices for every item on ``run_date``.

    Each item in ``base_prices`` gets its base price plus a uniform random
    offset in [-2, 2], rounded to 2 decimals. The prices are joined onto
    ``items_df`` and tagged with the date, a fixed market ("Kathmandu") and
    source ("mock").

    Args:
        run_date: Date string parsed by Spark ``to_date`` (the caller in this
            file passes "YYYY-MM-DD").

    Returns:
        Spark DataFrame with the item master columns plus price, date,
        market and source.
    """
    # One random draw per item, in base_prices iteration order.
    quotes = [
        (item_id, round(base_price + random.uniform(-2, 2), 2))
        for item_id, base_price in base_prices.items()
    ]
    quotes_df = spark.createDataFrame(quotes, ["item_id", "price"])

    # Attach item attributes, then stamp the descriptive columns.
    enriched = items_df.join(quotes_df, "item_id")
    enriched = enriched.withColumn("date", to_date(lit(run_date)))
    enriched = enriched.withColumn("market", lit("Kathmandu"))
    return enriched.withColumn("source", lit("mock"))

# -----------------------------------------
# Step 3: Backfill multiple days (optional)
# -----------------------------------------
# Number of past days to generate, counting back from (and including) today.
days_to_backfill = 30  # change to however many days you want
if days_to_backfill < 1:
    # At least one day is needed to seed the union below; fail clearly
    # instead of with an IndexError on all_days_data[0].
    raise ValueError(f"days_to_backfill must be >= 1, got {days_to_backfill}")

# Capture "today" once so every generated date is anchored to the same day.
# Re-reading the clock per iteration could duplicate or skip a date if the
# loop runs across midnight.
backfill_anchor = datetime.today()
all_days_data = []

for i in range(days_to_backfill):
    run_date = (backfill_anchor - timedelta(days=i)).strftime("%Y-%m-%d")
    daily_df = generate_prices_for_date(run_date)
    all_days_data.append(daily_df)

# Combine all days into one DataFrame. unionByName matches columns by name
# rather than position, so a column reorder cannot silently misalign data.
prices_df = all_days_data[0]
for df in all_days_data[1:]:
    prices_df = prices_df.unionByName(df)

# -----------------------------------------
# Step 4: Save as Delta Table
# -----------------------------------------
# Replace the whole price table with the freshly generated history.
(
    prices_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("inflation.daily_prices")
)

print("✅ Daily prices table generated and saved!")

# -----------------------------------------
# Step 5: Calculate Daily Inflation %
# -----------------------------------------
# A price series is per item AND market (rows carry a market column).
# Partitioning on item_id alone would make lag() compare prices across
# markets once more than one market exists. For the current single-market
# data the result is unchanged.
window = Window.partitionBy("item_id", "market").orderBy("date")

inflation_df = (
    spark.table("inflation.daily_prices")
    # Price from the preceding row (by date) in the same item/market series;
    # null for the earliest date.
    .withColumn("prev_price", lag("price").over(window))
    # Percent change versus that preceding price; null when prev_price is null.
    .withColumn(
        "daily_inflation_pct",
        ((col("price") - col("prev_price")) / col("prev_price")) * 100,
    )
)

inflation_df.write.format("delta").mode("overwrite").saveAsTable("inflation.daily_inflation")

print("✅ Daily inflation % table generated and saved!")

# -----------------------------------------
# Step 6: Preview
# -----------------------------------------
# Render both output tables, sorted by date and then item_id.
for preview_table in ("inflation.daily_prices", "inflation.daily_inflation"):
    display(spark.table(preview_table).orderBy("date", "item_id"))
