In [0]:
%pip install mlflow prophet scikit-learn pandas
# The %pip line above installs into the notebook's environment; restart the
# Python process so the newly installed packages can be imported below.
dbutils.library.restartPython()

In [0]:
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit, current_date, expr
from pyspark.sql.types import *
import pandas as pd
from prophet import Prophet
import logging

# 1. Prepare Data: Aggregate to Monthly Level to reduce noise
# Every market_date is mapped to the last day of its month, so all price rows
# of a crop within one month collapse into a single averaged point. Month-end
# dates are used because the forecast step generates future dates with the
# month-end ('M') frequency, keeping history and forecast on the same grid.
# Fewer, smoother points make training faster and better suited to long-term trends.
df_market = (
    spark.table("agriculture.silver.market_prices")
    .groupBy("crop_name", expr("last_day(market_date)").alias("ds"))
    .agg(expr("avg(modal_price_rs_quintal)").alias("y"))
    .orderBy("ds")
)

# 2. Output schema of the forecast frames returned for each crop.
# Built field by field; every column is declared nullable.
result_schema = (
    StructType()
    .add("crop_name", StringType(), True)
    .add("ds", DateType(), True)
    .add("yhat", DoubleType(), True)                   # predicted price
    .add("yhat_lower", DoubleType(), True)             # confidence interval, lower bound
    .add("yhat_upper", DoubleType(), True)             # confidence interval, upper bound
    .add("price_stability_score", DoubleType(), True)  # risk metric
)

# 3. The "Grouped Map UDF" (The Distributed Training Logic)
def forecast_crop_prices(history_pd):
    """Fit a Prophet model on one crop's price history and forecast 12 months ahead.

    Intended to be passed to ``groupBy("crop_name").applyInPandas``, which
    calls it once per crop. The signature must keep exactly one positional
    parameter: Spark passes ``(key, pdf)`` to two-argument functions.

    Args:
        history_pd: pandas DataFrame holding the rows of a single crop, with
            columns ``crop_name``, ``ds`` (observation date) and ``y`` (price).

    Returns:
        pandas DataFrame with the columns ``crop_name``, ``ds``, ``yhat``,
        ``yhat_lower``, ``yhat_upper`` and ``price_stability_score``, containing
        only dates after the last historical observation. The frame is empty
        when the crop has fewer than two usable (non-null) observations,
        because Prophet cannot be fitted on less.
    """
    # Column order and dtypes mirror the schema handed to applyInPandas.
    output_dtypes = {
        'crop_name': 'object',
        'ds': 'object',
        'yhat': 'float64',
        'yhat_lower': 'float64',
        'yhat_upper': 'float64',
        'price_stability_score': 'float64',
    }
    output_columns = list(output_dtypes)
    empty_result = pd.DataFrame(
        {name: pd.Series(dtype=dtype) for name, dtype in output_dtypes.items()}
    )
    horizon_months = 12

    # a. Setup Group
    if history_pd.empty:
        return empty_result
    crop = history_pd['crop_name'].iloc[0]

    # Normalise dates and drop rows Prophet cannot use. Without this guard a
    # single crop with too little data raises inside fit() and fails the
    # whole distributed job instead of just being skipped.
    history = history_pd[['ds', 'y']].copy()
    history['ds'] = pd.to_datetime(history['ds'])
    history = history.dropna(subset=['ds', 'y'])
    if len(history) < 2:
        return empty_result

    # b. Train Prophet Model
    # Daily and weekly seasonality are disabled as we are using monthly data.
    m = Prophet(daily_seasonality=False, weekly_seasonality=False, yearly_seasonality=True)
    m.fit(history)

    # c. Forecast Future (12 Months)
    future = m.make_future_dataframe(periods=horizon_months, freq='M')
    forecast = m.predict(future)

    # Only return future dates for the Gold Table
    last_historical_date = history['ds'].max()
    is_future = forecast['ds'] > last_historical_date
    results = forecast.loc[is_future, ['ds', 'yhat', 'yhat_lower', 'yhat_upper']].copy()

    # d. Calculate Stability Score (Risk)
    # Coefficient of variation (std / mean) of the future predictions:
    # a larger value means the predicted prices swing more relative to their level.
    volatility = results['yhat'].std()
    avg_price = results['yhat'].mean()
    stability_score = volatility / avg_price if avg_price != 0 else 0.0

    # e. Format Output
    # Hand back plain dates so `ds` matches the DateType declared in the schema.
    results['ds'] = results['ds'].dt.date
    results['crop_name'] = crop
    results['price_stability_score'] = stability_score
    return results[output_columns]

# 4. Execute Distributed Training
# Spark hands the rows of each crop to forecast_crop_prices as one pandas
# DataFrame, so the per-crop models are fitted in parallel.
print("Launching distributed training for all crops...")

forecast_df = (
    df_market
    .groupBy("crop_name")
    .applyInPandas(forecast_crop_prices, schema=result_schema)
)

# 5. Write to Gold Layer (materialize results)
# mode="overwrite" replaces the table contents, so every run refreshes the forecast.
forecast_df.write.saveAsTable("agriculture.gold.market_forecasts", mode="overwrite")

print("Forecasts generated and saved to 'agriculture.gold.market_forecasts'")
display(spark.table("agriculture.gold.market_forecasts"))