In [1]:
!pip install prophet optuna statsmodels

Collecting optuna
  Downloading optuna-4.6.0-py3-none-any.whl.metadata (17 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.10.1-py3-none-any.whl.metadata (11 kB)
Downloading optuna-4.6.0-py3-none-any.whl (404 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m404.7/404.7 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.10.1-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, optuna
Successfully installed colorlog-6.10.1 optuna-4.6.0


In [5]:
# Install core dependencies (fast)
!pip install -q pandas numpy scikit-learn statsmodels

# Install prophet (may require cmdstan). If this stalls, try using a smaller Prophet version or skip if you plan to not forecast Retail.
!pip install -q prophet cmdstanpy

# Optional (improves wholesale modeling if available)
!pip install -q xgboost optuna

# Optional for LSTM (if you want tf-based LSTM candidate)
!pip install -q tensorflow

# After installation, it's recommended to restart the runtime:
print("Installed. PLEASE RESTART RUNTIME (Runtime -> Restart runtime) before running the pipeline.")

Installed. PLEASE RESTART RUNTIME (Runtime -> Restart runtime) before running the pipeline.


In [6]:
# Run in a Python cell after restarting runtime.
# Downloads and builds CmdStan (the recorded output shows it landing under ~/.cmdstan).
from cmdstanpy import install_cmdstan
install_cmdstan()  # takes several minutes and ~1GB

CmdStan install directory: /root/.cmdstan
Installing CmdStan version: 2.37.0
Downloading CmdStan version 2.37.0
Download successful, file: /tmp/tmpu873tieq
Extracting distribution
Unpacked download as cmdstan-2.37.0
Building version cmdstan-2.37.0, may take several minutes, depending on your system.
Installed cmdstan-2.37.0
Test model compilation


True

In [2]:
# Colab-only: upload the input CSV files into the runtime's working directory.
from google.colab import files

# NOTE(review): presumably maps each uploaded file name to its contents - confirm against the Colab docs.
uploaded = files.upload()

Saving wholesale_price_data.csv to wholesale_price_data.csv
Saving retail_price_data.csv to retail_price_data.csv


In [8]:
"""
Agri-market intelligence — Bucket 3 (Onion, Potato, Salt Pack)

This is the updated Colab-friendly version with a robustness fix:
- forecast_wholesale_prices and _hw_forecast_from_model_dict are hardened so they
  never return None. They always return a DataFrame with the expected columns,
  avoiding the TypeError: 'NoneType' object is not subscriptable seen previously.

Other behavior (Retail using Prophet, Wholesale multi-model selection) unchanged.
"""

from typing import List, Optional, Dict, Any
import pandas as pd
import numpy as np
import os
import logging
import warnings

# Prophet retained for Retail path only
from prophet import Prophet
from sklearn.metrics import mean_absolute_error

# Statsmodels (ETS/ARIMA)
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.arima.model import ARIMA

# sklearn models
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import MinMaxScaler

# Root logger at INFO: the logger.debug(...) diagnostics used by the model helpers stay hidden at this level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# State-column values that mark summary rows; preprocess_prices drops rows whose state matches one of these.
SUMMARY_ROW_KEYWORDS = ["Average Price", "Maximum", "Minimum", "Modal Price"]


# -----------------------------
# Environment helpers
# -----------------------------
def _is_running_in_colab() -> bool:
    try:
        import google.colab  # type: ignore
        return True
    except Exception:
        pass
    if os.environ.get("COLAB_GPU", None) is not None:
        return True
    return False


# -----------------------------
# Utilities (unchanged)
# -----------------------------
def _detect_state_column(df: pd.DataFrame) -> str:
    for name in ['State', 'state', 'Region', 'region', 'STATE', 'STATE/UT', 'State/UT']:
        if name in df.columns:
            return name
    return df.columns[0]


def _detect_date_column(df: pd.DataFrame) -> str:
    for col in df.columns[::-1]:
        parsed = pd.to_datetime(df[col], errors='coerce')
        if parsed.notna().sum() > 0:
            return col
    raise ValueError("No date column detected")


def _load_table(filepath: str, chunksize: Optional[int] = None) -> pd.DataFrame:
    if filepath.lower().endswith(".csv"):
        if chunksize:
            return pd.concat(
                pd.read_csv(filepath, chunksize=chunksize, low_memory=False),
                ignore_index=True
            )
        return pd.read_csv(filepath, low_memory=False)
    return pd.read_excel(filepath)


# -----------------------------
# Preprocessing (unchanged)
# -----------------------------
def preprocess_prices(
    filepath: str,
    target_crops: List[str],
    market_type: str,
    chunksize: Optional[int] = None
) -> pd.DataFrame:
    """
    Load a wide price table and return long-format rows for the target crops.

    Steps: load the file, detect the state and date columns, parse dates
    (day-first) and forward-fill unparseable ones, drop summary rows, coerce
    every remaining column to numeric, melt to one row per
    (state, date, crop), keep crops whose name contains any entry of
    ``target_crops`` (case-insensitive substring match), and fill missing
    prices per (market, crop, state) by interpolation followed by
    forward/backward fill.

    Filtered frames are copied explicitly and no ``inplace`` operation runs on
    a slice, so the function no longer relies on chained assignment.

    Returns:
        DataFrame with columns State, Date, Crop, Price, MarketType.
    """
    df_raw = _load_table(filepath, chunksize=chunksize)

    state_col = _detect_state_column(df_raw)
    date_col = _detect_date_column(df_raw)

    # Rows without a parseable date inherit the most recent valid date above them.
    df_raw[date_col] = pd.to_datetime(
        df_raw[date_col], errors="coerce", dayfirst=True
    ).ffill()

    # Copy after filtering so the column assignments below write to an owned frame.
    is_summary = df_raw[state_col].astype(str).isin(SUMMARY_ROW_KEYWORDS)
    df_wide = df_raw.loc[~is_summary].copy()

    crop_cols = [c for c in df_wide.columns if c not in (state_col, date_col)]
    for c in crop_cols:
        df_wide[c] = pd.to_numeric(df_wide[c], errors="coerce")

    df_long = df_wide.melt(
        id_vars=[state_col, date_col],
        value_vars=crop_cols,
        var_name="Crop",
        value_name="Price"
    ).rename(columns={state_col: "State", date_col: "Date"})
    df_long["MarketType"] = market_type

    wanted = [c.lower() for c in target_crops]
    is_target = df_long["Crop"].str.lower().apply(
        lambda name: any(t in name for t in wanted)
    )
    # sort_values returns a new frame, so the Price assignment below is not on a slice.
    df_long = df_long[is_target].sort_values(["MarketType", "Crop", "State", "Date"])

    df_long["Price"] = df_long.groupby(
        ["MarketType", "Crop", "State"]
    )["Price"].transform(lambda x: x.interpolate().ffill().bfill())

    return df_long[["State", "Date", "Crop", "Price", "MarketType"]]


# -----------------------------
# Prophet (Retail) - unchanged
# -----------------------------
def train_prophet_model(history_df: pd.DataFrame) -> Optional[Prophet]:
    """Fit a Prophet model on a price history, or return None when data is scarce.

    ``Date``/``Price`` are renamed to Prophet's ``ds``/``y`` and rows containing
    any missing value are dropped. At least 45 remaining rows are required.
    The model uses weekly seasonality, no yearly seasonality and 95% intervals.
    """
    min_rows = 45
    training = history_df.rename(columns={"Date": "ds", "Price": "y"}).dropna()
    if len(training) >= min_rows:
        prophet = Prophet(
            weekly_seasonality=True,
            yearly_seasonality=False,
            interval_width=0.95
        )
        prophet.fit(training)
        return prophet
    return None


def forecast_prices(model: Prophet, periods: int = 14) -> pd.DataFrame:
    """Predict over the model's history plus ``periods`` future rows.

    Returns only the ``ds``, ``yhat``, ``yhat_lower`` and ``yhat_upper`` columns.
    """
    wanted = ["ds", "yhat", "yhat_lower", "yhat_upper"]
    horizon_frame = model.make_future_dataframe(periods=periods)
    return model.predict(horizon_frame)[wanted]


# -----------------------------
# Wholesale helpers & multi-model training (unchanged except robustness)
# -----------------------------
def _prepare_series_for_hw(history_df: pd.DataFrame, enforce_daily: bool = True) -> pd.Series:
    """
    Return a daily-indexed pd.Series of prices for modeling.
    """
    s = history_df.sort_values("Date")[["Date", "Price"]].dropna().copy()
    s["Price"] = pd.to_numeric(s["Price"], errors="coerce")
    s = s.dropna(subset=["Price"])
    if s.empty:
        return pd.Series(dtype=float)

    # Ensure strictly positive for multiplicative components
    if (s["Price"] <= 0).any():
        min_positive = s.loc[s["Price"] > 0, "Price"].min() if (s["Price"] > 0).any() else 1.0
        eps = min_positive * 1e-3
        s["Price"] = s["Price"].clip(lower=eps)

    if enforce_daily:
        idx = pd.date_range(s["Date"].min(), s["Date"].max(), freq='D')
        ser = s.set_index("Date")["Price"].reindex(idx).interpolate().ffill().bfill()
        ser.index.name = "Date"
        return ser
    else:
        ser = s.set_index("Date")["Price"]
        ser.index.name = "Date"
        return ser


def _create_lag_features(series: pd.Series, nlags: int = 14) -> pd.DataFrame:
    """Create lag features dataframe for supervised learning. Return DataFrame with columns lag_1..lag_n and target."""
    df = pd.DataFrame({"y": series})
    for l in range(1, nlags + 1):
        df[f"lag_{l}"] = df["y"].shift(l)
    df = df.dropna()
    return df


def _iterative_forecast_supervised(model, last_window: np.ndarray, periods: int, model_type: str, scaler: Optional[MinMaxScaler] = None) -> np.ndarray:
    """
    Produce multi-step forecasts by iteratively feeding predictions back as lags.
    last_window: array of shape (nlags,) representing most recent lags (lag_1..lag_n)
    model_type: 'rf' | 'xgb' | 'lstm' (for LSTM scaler is required)
    """
    preds = []
    window = last_window.copy().astype(float)
    nlags = len(window)
    for _ in range(periods):
        X = window.reshape(1, -1)
        if model_type == "lstm":
            # Expecting scaler and model that accepts scaled input and returns scaled output
            if scaler is None:
                raise ValueError("Scaler required for LSTM forecasting")
            Xs = scaler.transform(X)
            Xs = Xs.reshape((1, nlags, 1))
            p_scaled = model.predict(Xs, verbose=0)[0, 0]
            p = scaler.inverse_transform(np.array([[p_scaled]]))[0, 0]
        else:
            p = model.predict(X)[0]
        preds.append(float(p))
        # shift window: drop last, insert prediction at front as lag_1
        window = np.roll(window, 1)
        window[0] = p
    return np.array(preds)


def _fit_and_forecast_ets(train: pd.Series, test: pd.Series, periods: int) -> Dict[str, Any]:
    """Score an additive-trend exponential smoothing model on the hold-out window.

    Returns a dict with ``mae``, the fitted result (``fit``), ``model_type`` and
    in-sample ``residuals``; any failure yields ``{"mae": inf}``.
    """
    try:
        smoother = ExponentialSmoothing(train, trend="add", damped_trend=False, seasonal=None, initialization_method="estimated")
        fitted = smoother.fit(optimized=True)
        horizon = fitted.forecast(steps=periods)
        holdout_mae = float(mean_absolute_error(test.values, horizon))
        in_sample_resid = train.values - fitted.fittedvalues
    except Exception as e:
        logger.debug(f"ETS fit error: {e}")
        return {"mae": float("inf")}
    return {"mae": holdout_mae, "fit": fitted, "model_type": "ets", "residuals": in_sample_resid}


def _fit_and_forecast_arima(train: pd.Series, test: pd.Series, periods: int) -> Dict[str, Any]:
    """Try a small grid of ARIMA orders and keep the one with the lowest hold-out MAE.

    Fit warnings are suppressed; an order that fails is skipped. Returns a dict
    with ``mae``, ``fit``, ``model_type`` and in-sample ``residuals``, or
    ``{"mae": inf}`` when every order fails.
    """
    # small grid to be safe
    candidate_orders = ((0, 1, 0), (1, 1, 0), (0, 1, 1), (1, 1, 1))
    winner = None
    winner_mae = float("inf")
    for order in candidate_orders:
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                fitted = ARIMA(train, order=order).fit()
                horizon = fitted.forecast(steps=periods)
            score = float(mean_absolute_error(test.values, horizon))
        except Exception as e:
            logger.debug(f"ARIMA order {order} failed: {e}")
            continue
        # Strict comparison: on ties the earlier order in the grid wins.
        if score < winner_mae:
            winner, winner_mae = fitted, score
    if winner is None:
        return {"mae": float("inf")}
    in_sample_resid = train.values - winner.fittedvalues
    return {"mae": winner_mae, "fit": winner, "model_type": "arima", "residuals": in_sample_resid}


def _fit_and_forecast_rf(train: pd.Series, test: pd.Series, periods: int, nlags: int = 14) -> Dict[str, Any]:
    """Score a random forest trained on lag features against the hold-out window.

    Forecasts are produced iteratively (each prediction becomes the next lag_1).
    Returns ``mae``, ``fit``, ``model_type``, in-sample ``residuals``, ``nlags``
    and ``original_series``; ``{"mae": inf}`` when there are too few rows for
    the lag frame or on any fitting error.
    """
    lagged = _create_lag_features(train, nlags)
    if lagged.empty:
        return {"mae": float("inf")}
    lag_cols = [f"lag_{k}" for k in range(1, nlags + 1)]
    features = lagged[lag_cols].values
    targets = lagged["y"].values
    try:
        forest = RandomForestRegressor(n_estimators=100, max_depth=6, random_state=42, n_jobs=1)
        forest.fit(features, targets)
        # Most recent observation first, matching the lag_1..lag_n column order.
        newest_first = train.values[-nlags:][::-1]
        horizon = _iterative_forecast_supervised(forest, newest_first, periods, model_type="rf")
        holdout_mae = float(mean_absolute_error(test.values, horizon))
        # Residuals are approximated as target minus the model's in-sample prediction.
        in_sample_resid = targets - forest.predict(features)
        return {"mae": holdout_mae, "fit": forest, "model_type": "rf", "residuals": in_sample_resid, "nlags": nlags, "original_series": train}
    except Exception as e:
        logger.debug(f"RF fit error: {e}")
        return {"mae": float("inf")}


def _fit_and_forecast_xgb(train: pd.Series, test: pd.Series, periods: int, nlags: int = 14) -> Dict[str, Any]:
    """Score an XGBoost regressor on lag features against the hold-out window.

    XGBoost is optional: when it cannot be imported the candidate is reported
    as ``{"mae": inf}``. The same value is returned when the lag frame is empty
    or fitting fails. On success the dict mirrors the random-forest candidate.
    """
    try:
        import xgboost as xgb  # type: ignore
    except Exception:
        return {"mae": float("inf")}

    lagged = _create_lag_features(train, nlags)
    if lagged.empty:
        return {"mae": float("inf")}
    lag_cols = [f"lag_{k}" for k in range(1, nlags + 1)]
    features = lagged[lag_cols].values
    targets = lagged["y"].values
    try:
        booster = xgb.XGBRegressor(n_estimators=100, max_depth=4, random_state=42, n_jobs=1, verbosity=0)
        booster.fit(features, targets)
        # Most recent observation first, matching the lag_1..lag_n column order.
        newest_first = train.values[-nlags:][::-1]
        horizon = _iterative_forecast_supervised(booster, newest_first, periods, model_type="xgb")
        holdout_mae = float(mean_absolute_error(test.values, horizon))
        in_sample_resid = targets - booster.predict(features)
        return {"mae": holdout_mae, "fit": booster, "model_type": "xgb", "residuals": in_sample_resid, "nlags": nlags, "original_series": train}
    except Exception as e:
        logger.debug(f"XGB fit error: {e}")
        return {"mae": float("inf")}


def _fit_and_forecast_lstm(train: pd.Series, test: pd.Series, periods: int, nlags: int = 14) -> Dict[str, Any]:
    """
    Fit a tiny LSTM if tensorflow is available. Use very small epochs to be Colab-safe.

    Lag features and the target are min-max scaled with separate scalers
    (``scaler_x`` / ``scaler_y``). During iterative forecasting, inputs are
    scaled with ``scaler_x`` and each predicted value is un-scaled with
    ``scaler_y``. Previously ``scaler_x`` (fitted on nlags columns) was also
    asked to un-scale the single predicted value, which raises, so this
    candidate always ended up with ``mae = inf``.

    Returns ``mae``, ``fit``, ``model_type``, in-sample ``residuals``,
    ``nlags``, both scalers and ``original_series``; ``{"mae": inf}`` when
    tensorflow is missing, the series is too short, or fitting fails.
    """
    try:
        from tensorflow.keras.models import Sequential
        from tensorflow.keras.layers import LSTM, Dense
        from tensorflow.keras.callbacks import EarlyStopping
    except Exception:
        return {"mae": float("inf")}

    if len(train) < nlags + 2:
        return {"mae": float("inf")}
    df_train = _create_lag_features(train, nlags)
    if df_train.empty:
        return {"mae": float("inf")}

    X = df_train[[f"lag_{l}" for l in range(1, nlags + 1)]].values
    y = df_train["y"].values.reshape(-1, 1)

    scaler_x = MinMaxScaler()
    scaler_y = MinMaxScaler()
    Xs = scaler_x.fit_transform(X)
    ys = scaler_y.fit_transform(y)

    Xs = Xs.reshape((Xs.shape[0], Xs.shape[1], 1))

    class _LagTargetScaler:
        """Scaler facade: lag inputs go through scaler_x, predictions come back through scaler_y."""

        def transform(self, values):
            return scaler_x.transform(values)

        def inverse_transform(self, values):
            return scaler_y.inverse_transform(values)

    # small model
    try:
        model = Sequential([
            LSTM(32, input_shape=(nlags, 1), activation="tanh"),
            Dense(1)
        ])
        model.compile(optimizer="adam", loss="mse")
        es = EarlyStopping(monitor="loss", patience=3, restore_best_weights=True, verbose=0)
        # Reduce epochs in Colab environment
        epochs = 10 if _is_running_in_colab() else 25
        model.fit(Xs, ys, epochs=epochs, batch_size=16, verbose=0, callbacks=[es])
        # iterative forecast; window is most-recent-first to match lag_1..lag_n
        last_window = train.values[-nlags:][::-1]
        preds = _iterative_forecast_supervised(model, last_window, periods, model_type="lstm", scaler=_LagTargetScaler())
        mae = float(mean_absolute_error(test.values, preds))
        # approximate residuals: target minus in-sample prediction, in original units
        fitted = model.predict(Xs, verbose=0)
        fitted = scaler_y.inverse_transform(fitted).flatten()
        residuals = y.flatten() - fitted
        return {"mae": mae, "fit": model, "model_type": "lstm", "residuals": residuals, "nlags": nlags, "scaler_x": scaler_x, "scaler_y": scaler_y, "original_series": train}
    except Exception as e:
        logger.debug(f"LSTM fit error: {e}")
        return {"mae": float("inf")}


def train_wholesale_model(history_df: pd.DataFrame, test_days: int = 14, n_trials: int = 30) -> Optional[Dict[str, Any]]:
    """
    Pick the strongest wholesale forecaster and refit it on the complete series.

    Each candidate (ETS, ARIMA, RF, XGB, LSTM) is scored on a single hold-out
    split: fitted on everything except the last ``test_days`` points and judged
    by MAE on those points. The winning model type is refitted on the full
    daily series.

    Returns a dict with ``model_type``, ``fit``, ``train_index``, ``residuals``
    and ``best_mae`` (lag-based models add ``nlags`` and ``original_series``;
    LSTM also adds ``scaler_x``/``scaler_y``). Returns None when the series has
    fewer than ``test_days + 30`` points, when no candidate succeeds, or when
    the final fit fails. ``n_trials`` is accepted but not used here.
    """
    ser_full = _prepare_series_for_hw(history_df, enforce_daily=True)
    if len(ser_full) < test_days + 30:
        logger.debug("Wholesale series too short for reliable multi-model training.")
        return None

    train, test = ser_full[:-test_days], ser_full[-test_days:]

    # (log label, fitter, extra keyword arguments) - evaluated in this order.
    contenders = (
        ("ETS", _fit_and_forecast_ets, {}),
        ("ARIMA", _fit_and_forecast_arima, {}),
        ("RF", _fit_and_forecast_rf, {"nlags": 14}),
        ("XGB", _fit_and_forecast_xgb, {"nlags": 14}),
        ("LSTM", _fit_and_forecast_lstm, {"nlags": 14}),
    )
    results = []
    for label, fitter, extra in contenders:
        try:
            results.append(fitter(train, test, test_days, **extra))
        except Exception as e:
            logger.debug(f"{label} candidate failure: {e}")

    # Lowest MAE wins; the strict comparison keeps the earliest candidate on ties.
    best, best_mae = None, float("inf")
    for outcome in results:
        if outcome is None:
            continue
        score = outcome.get("mae", float("inf"))
        if score is not None and score < best_mae:
            best, best_mae = outcome, score

    if best is None or best_mae == float("inf"):
        logger.warning("No wholesale candidate model could be fit successfully.")
        return None

    # Refit the chosen model type on the full series.
    model_type = best.get("model_type", "ets")
    try:
        if model_type == "ets":
            ets_fit = ExponentialSmoothing(ser_full, trend="add", damped_trend=False, seasonal=None, initialization_method="estimated").fit(optimized=True)
            ets_resid = ser_full.values - ets_fit.fittedvalues
            return {"model_type": "ets", "fit": ets_fit, "train_index": ser_full.index, "residuals": ets_resid, "best_mae": best_mae}

        if model_type == "arima":
            holdout_fit = best["fit"]
            try:
                # Reuse the order selected on the hold-out split when it can be read back.
                order = getattr(holdout_fit.model, "order", None) or (1, 1, 1)
                arima_fit = ARIMA(ser_full, order=order).fit()
            except Exception:
                arima_fit = ARIMA(ser_full, order=(1, 1, 1)).fit()
            arima_resid = ser_full.values - arima_fit.fittedvalues
            return {"model_type": "arima", "fit": arima_fit, "train_index": ser_full.index, "residuals": arima_resid, "best_mae": best_mae}

        if model_type in ("rf", "xgb"):
            nlags = best.get("nlags", 14)
            lagged = _create_lag_features(ser_full, nlags)
            if lagged.empty:
                return None
            lag_cols = [f"lag_{k}" for k in range(1, nlags + 1)]
            X_full, y_full = lagged[lag_cols].values, lagged["y"].values
            use_forest = model_type == "rf"
            if not use_forest:
                try:
                    import xgboost as xgb  # type: ignore
                    regressor = xgb.XGBRegressor(n_estimators=200, max_depth=4, random_state=42, n_jobs=1, verbosity=0)
                except Exception:
                    logger.warning("XGBoost not available for final fit; falling back to RF.")
                    use_forest = True
            if use_forest:
                regressor = RandomForestRegressor(n_estimators=200, max_depth=6, random_state=42, n_jobs=1)
            regressor.fit(X_full, y_full)
            tree_resid = y_full - regressor.predict(X_full)
            return {"model_type": model_type, "fit": regressor, "train_index": ser_full.index, "residuals": tree_resid, "nlags": nlags, "original_series": ser_full, "best_mae": best_mae}

        if model_type == "lstm":
            nlags = best.get("nlags", 14)
            try:
                import tensorflow as tf  # type: ignore
                from tensorflow.keras.models import Sequential
                from tensorflow.keras.layers import LSTM, Dense
                from tensorflow.keras.callbacks import EarlyStopping
                lagged = _create_lag_features(ser_full, nlags)
                if lagged.empty:
                    return None
                lag_cols = [f"lag_{k}" for k in range(1, nlags + 1)]
                X = lagged[lag_cols].values
                y = lagged["y"].values.reshape(-1, 1)
                scaler_x = MinMaxScaler()
                scaler_y = MinMaxScaler()
                Xs = scaler_x.fit_transform(X)
                ys = scaler_y.fit_transform(y)
                Xs = Xs.reshape((Xs.shape[0], Xs.shape[1], 1))
                network = Sequential([
                    LSTM(32, input_shape=(nlags, 1), activation="tanh"),
                    Dense(1)
                ])
                network.compile(optimizer="adam", loss="mse")
                stopper = EarlyStopping(monitor="loss", patience=3, restore_best_weights=True, verbose=0)
                epochs = 10 if _is_running_in_colab() else 25
                network.fit(Xs, ys, epochs=epochs, batch_size=16, verbose=0, callbacks=[stopper])
                in_sample = scaler_y.inverse_transform(network.predict(Xs, verbose=0)).flatten()
                lstm_resid = y.flatten() - in_sample
                return {"model_type": "lstm", "fit": network, "train_index": ser_full.index, "residuals": lstm_resid, "nlags": nlags, "scaler_x": scaler_x, "scaler_y": scaler_y, "original_series": ser_full, "best_mae": best_mae}
            except Exception as e:
                logger.warning(f"LSTM final fit failed: {e}")
                return None
    except Exception as e:
        logger.error(f"Final fit for chosen model type {model_type} failed: {e}")
        return None
    # Unrecognised model type: nothing to refit.
    return None


def _hw_forecast_from_model_dict(model_dict: Optional[Dict[str, Any]], periods: int) -> pd.DataFrame:
    """
    Given the model dict produced by train_wholesale_model, produce a DataFrame with
    in-sample fitted values + out-of-sample forecasts and approximate intervals.

    Robust: Will always return a DataFrame with columns ds, yhat, yhat_lower, yhat_upper.
    """
    # Default empty frame (in case of failure)
    empty_df = pd.DataFrame(columns=["ds", "yhat", "yhat_lower", "yhat_upper"])

    if model_dict is None:
        return empty_df

    try:
        model_type = model_dict.get("model_type", None)
        train_index = model_dict.get("train_index", None)
        residuals = model_dict.get("residuals", None)
        resid_std = float(np.std(residuals, ddof=1)) if (residuals is not None and len(residuals) > 1) else 0.0
        z = 1.96

        if train_index is None or len(train_index) == 0:
            return empty_df

        # ETS or ARIMA: Statsmodels-like results with fittedvalues & forecast
        if model_type == "ets":
            fit = model_dict["fit"]
            fitted = np.asarray(fit.fittedvalues)
            fc = fit.forecast(steps=periods)
            yhat = np.concatenate([fitted, np.asarray(fc)])
            ds_in = pd.DatetimeIndex(train_index)
            ds_out = pd.date_range(ds_in[-1] + pd.Timedelta(days=1), periods=periods, freq='D') if periods > 0 else pd.DatetimeIndex([])
            ds = ds_in.append(ds_out)
            yhat_lower = yhat - z * resid_std
            yhat_upper = yhat + z * resid_std
            df = pd.DataFrame({"ds": ds, "yhat": yhat, "yhat_lower": yhat_lower, "yhat_upper": yhat_upper})
            return df[["ds", "yhat", "yhat_lower", "yhat_upper"]]

        if model_type == "arima":
            fit = model_dict["fit"]
            fitted = np.asarray(fit.fittedvalues)
            fc = fit.forecast(steps=periods)
            yhat = np.concatenate([fitted, np.asarray(fc)])
            ds_in = pd.DatetimeIndex(train_index)
            ds_out = pd.date_range(ds_in[-1] + pd.Timedelta(days=1), periods=periods, freq='D') if periods > 0 else pd.DatetimeIndex([])
            ds = ds_in.append(ds_out)
            yhat_lower = yhat - z * resid_std
            yhat_upper = yhat + z * resid_std
            df = pd.DataFrame({"ds": ds, "yhat": yhat, "yhat_lower": yhat_lower, "yhat_upper": yhat_upper})
            return df[["ds", "yhat", "yhat_lower", "yhat_upper"]]

        # Supervised models (rf, xgb)
        if model_type in ("rf", "xgb"):
            fit = model_dict["fit"]
            nlags = model_dict.get("nlags", 14)
            orig = model_dict.get("original_series", None)
            # Build ds indexes
            ds_in = pd.DatetimeIndex(train_index)
            ds_out = pd.date_range(ds_in[-1] + pd.Timedelta(days=1), periods=periods, freq='D') if periods > 0 else pd.DatetimeIndex([])
            ds = ds_in.append(ds_out)
            # Try to compute in-sample fitted if original_series exists
            try:
                if orig is not None:
                    df_lags = _create_lag_features(orig, nlags)
                    if not df_lags.empty:
                        X_full = df_lags[[f"lag_{l}" for l in range(1, nlags + 1)]].values
                        fitted_vals = fit.predict(X_full)
                        preds_out = _iterative_forecast_supervised(fit, orig.values[-nlags:][::-1], periods, model_type)
                        yhat = np.concatenate([fitted_vals, preds_out])
                        # Align ds (if df_lags index shorter than train_index, pad front with NaNs)
                        if len(yhat) != len(ds):
                            # create ds aligned to yhat length
                            ds_adj_in = df_lags.index
                            ds_out_adj = pd.date_range(ds_adj_in[-1] + pd.Timedelta(days=1), periods=periods, freq='D') if periods > 0 else pd.DatetimeIndex([])
                            ds_adj = ds_adj_in.append(ds_out_adj)
                            df = pd.DataFrame({"ds": ds_adj, "yhat": yhat})
                        else:
                            df = pd.DataFrame({"ds": ds, "yhat": yhat})
                    else:
                        # fallback: zeros
                        yhat = np.concatenate([np.zeros(len(ds_in)), np.zeros(periods)])
                        df = pd.DataFrame({"ds": ds, "yhat": yhat})
                else:
                    # No original series; fallback to zeros
                    yhat = np.concatenate([np.zeros(len(ds_in)), np.zeros(periods)])
                    df = pd.DataFrame({"ds": ds, "yhat": yhat})
            except Exception:
                yhat = np.concatenate([np.zeros(len(ds_in)), np.zeros(periods)])
                df = pd.DataFrame({"ds": ds, "yhat": yhat})

            df["yhat_lower"] = df["yhat"] - z * resid_std
            df["yhat_upper"] = df["yhat"] + z * resid_std
            # Ensure required columns
            for c in ["ds", "yhat", "yhat_lower", "yhat_upper"]:
                if c not in df.columns:
                    df[c] = 0.0 if c != "ds" else pd.date_range("1970-01-01", periods=len(df))
            return df[["ds", "yhat", "yhat_lower", "yhat_upper"]]

        # LSTM
        if model_type == "lstm":
            fit = model_dict["fit"]
            nlags = model_dict.get("nlags", 14)
            scaler_x = model_dict.get("scaler_x", None)
            scaler_y = model_dict.get("scaler_y", None)
            orig = model_dict.get("original_series", None)
            ds_in = pd.DatetimeIndex(train_index)
            ds_out = pd.date_range(ds_in[-1] + pd.Timedelta(days=1), periods=periods, freq='D') if periods > 0 else pd.DatetimeIndex([])
            if orig is not None:
                try:
                    df_lags = _create_lag_features(orig, nlags)
                    if not df_lags.empty:
                        X_full = df_lags[[f"lag_{l}" for l in range(1, nlags + 1)]].values
                        Xs = scaler_x.transform(X_full).reshape((X_full.shape[0], X_full.shape[1], 1))
                        fitted_vals = fit.predict(Xs, verbose=0)
                        fitted_vals = scaler_y.inverse_transform(fitted_vals).flatten()
                        preds_out = _iterative_forecast_supervised(fit, orig.values[-nlags:][::-1], periods, model_type="lstm", scaler=scaler_x)
                        yhat = np.concatenate([fitted_vals, preds_out])
                        ds_adj_in = df_lags.index
                        ds = ds_adj_in.append(pd.date_range(ds_adj_in[-1] + pd.Timedelta(days=1), periods=periods, freq='D') if periods > 0 else pd.DatetimeIndex([]))
                        df = pd.DataFrame({"ds": ds, "yhat": yhat})
                    else:
                        yhat = np.concatenate([np.zeros(len(ds_in)), np.zeros(periods)])
                        df = pd.DataFrame({"ds": ds_in.append(ds_out), "yhat": yhat})
                except Exception:
                    yhat = np.concatenate([np.zeros(len(ds_in)), np.zeros(periods)])
                    df = pd.DataFrame({"ds": ds_in.append(ds_out), "yhat": yhat})
            else:
                yhat = np.concatenate([np.zeros(len(ds_in)), np.zeros(periods)])
                df = pd.DataFrame({"ds": ds_in.append(ds_out), "yhat": yhat})

            df["yhat_lower"] = df["yhat"] - z * resid_std
            df["yhat_upper"] = df["yhat"] + z * resid_std
            for c in ["ds", "yhat", "yhat_lower", "yhat_upper"]:
                if c not in df.columns:
                    df[c] = 0.0 if c != "ds" else pd.date_range("1970-01-01", periods=len(df))
            return df[["ds", "yhat", "yhat_lower", "yhat_upper"]]

        # Unknown model_type: safe fallback
        ds_in = pd.DatetimeIndex(train_index)
        ds_out = pd.date_range(ds_in[-1] + pd.Timedelta(days=1), periods=periods, freq='D') if periods > 0 else pd.DatetimeIndex([])
        ds = ds_in.append(ds_out)
        yhat = np.concatenate([np.zeros(len(ds_in)), np.zeros(periods)])
        df = pd.DataFrame({"ds": ds, "yhat": yhat})
        df["yhat_lower"] = df["yhat"] - z * resid_std
        df["yhat_upper"] = df["yhat"] + z * resid_std
        return df[["ds", "yhat", "yhat_lower", "yhat_upper"]]

    except Exception as e:
        logger.debug(f"Forecast building error: {e}")
        # Robust fallback to empty_df with a single timestamp for safety
        return pd.DataFrame({
            "ds": [pd.Timestamp.now()],
            "yhat": [0.0],
            "yhat_lower": [0.0],
            "yhat_upper": [0.0]
        })


def forecast_wholesale_prices(model_dict: Optional[Dict[str, Any]], periods: int = 14) -> pd.DataFrame:
    """
    Build the wholesale forecast frame (ds, yhat, yhat_lower, yhat_upper) for
    the selected model. Never returns None: a non-DataFrame result is replaced
    by an empty frame, and any missing column is filled in (dates counted from
    1970-01-01 for ``ds``, 0.0 for the numeric columns).
    """
    required = ["ds", "yhat", "yhat_lower", "yhat_upper"]
    frame = _hw_forecast_from_model_dict(model_dict, periods)
    # isinstance also rejects None.
    if not isinstance(frame, pd.DataFrame):
        return pd.DataFrame(columns=required)
    for name in required:
        if name in frame.columns:
            continue
        if name == "ds":
            frame[name] = pd.date_range(pd.Timestamp("1970-01-01"), periods=len(frame))
        else:
            frame[name] = 0.0
    return frame[required]


# -----------------------------
# Forecast Accuracy (unchanged overall behavior, but routes Wholesale)
# -----------------------------
def compute_mae(history_df: pd.DataFrame, test_days: int = 14) -> Optional[float]:
    """
    Mean absolute error of the forecaster against the most recent rows of
    one price history.

    Routing: when the first "MarketType" value is "Wholesale" the wholesale
    model is used; in every other case (including a missing or all-NaN
    "MarketType" column) the Prophet path is used.

    Parameters
    ----------
    history_df : frame with "Date" and "Price" columns and, optionally,
                 "MarketType".
    test_days  : number of most recent rows (by Date) used as actuals.

    Returns
    -------
    float or None
        The MAE as a plain float. None when test_days is not positive, when
        the history has fewer than test_days + 30 rows, when no model could
        be trained, or when the number of predictions does not match the
        number of actuals.
    """
    # A non-positive hold-out cannot be sliced meaningfully (iloc[:-0] is
    # empty and iloc[-0:] is the whole frame), and both paths need at least
    # 30 rows on top of the hold-out window.
    if test_days <= 0 or len(history_df) < test_days + 30:
        return None

    market_type = None
    if "MarketType" in history_df.columns and not history_df["MarketType"].isna().all():
        market_type = history_df["MarketType"].iloc[0]

    # Both paths take the hold-out window from the chronologically ordered
    # history, so an unsorted input cannot leak late rows into training.
    ordered = history_df.sort_values("Date") if "Date" in history_df.columns else history_df

    if market_type != "Wholesale":
        # Retail path (Prophet): fit on everything before the hold-out window.
        model = train_prophet_model(ordered.iloc[:-test_days])
        if model is None:
            return None
        fc = forecast_prices(model, test_days)
        preds = fc.tail(test_days)["yhat"].values
    else:
        # Wholesale path.
        # NOTE(review): the model is trained on the full history and rows dated
        # after the last observed Date are compared with the last test_days
        # actuals; whether that is a true hold-out depends on how
        # train_wholesale_model uses test_days — confirm against that function.
        wh_model = train_wholesale_model(
            history_df,
            test_days=test_days,
            n_trials=12 if _is_running_in_colab() else 25,
        )
        if wh_model is None:
            return None

        fc = forecast_wholesale_prices(wh_model, periods=test_days)
        future = fc[fc["ds"] > history_df["Date"].max()]
        if future.empty:
            preds = fc.tail(test_days)["yhat"].values if not fc.empty else None
        else:
            preds = future["yhat"].values

    actuals = ordered.iloc[-test_days:]["Price"].values
    if preds is None or len(preds) != len(actuals):
        return None
    return float(mean_absolute_error(actuals, preds))


def n_trials_default() -> int:
    """
    Return the constant 25.

    Judging by the name this is the default number of tuning trials; nothing
    in the visible part of the notebook calls it.
    """
    default_trials = 25
    return default_trials


# -----------------------------
# Metrics & Signals (unchanged)
# -----------------------------
def volatility_score(series: pd.Series) -> float:
    """
    Coefficient of variation of a series: sample standard deviation (ddof=1)
    divided by the absolute mean, computed after dropping missing values.

    Returns 0.0 when fewer than two values remain or when the mean is zero.
    """
    observed = series.dropna()

    # A sample standard deviation needs at least two observations.
    if len(observed) < 2:
        return 0.0

    centre = np.mean(observed)
    if centre == 0:
        # Avoid dividing by zero; treated as "no volatility".
        return 0.0

    spread = np.std(observed, ddof=1)
    return spread / abs(centre)


def generate_procurement_signal(
    forecast_df: pd.DataFrame,
    history_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Turn a forecast plus its price history into a procurement recommendation.

    Only forecast rows dated after the last "Date" in history_df are
    considered. The decision order is:

    1. no such rows                      -> "Monitor" (no future forecast)
    2. volatility_score(Price) above 0.3 -> "Risk Zone Alert"
    3. forecast rises over the horizon   -> "Stock Early"
    4. forecast falls over the horizon   -> "Wait / Defer Procurement"
    5. otherwise                         -> "Monitor" (stable trend)

    Returns a dict with the keys "signal" and "reason".
    """
    cv = volatility_score(history_df["Price"])

    last_observed = history_df["Date"].max()
    is_future = forecast_df["ds"] > last_observed
    horizon = forecast_df[is_future]

    if horizon.empty:
        return {"signal": "Monitor", "reason": "No future forecast"}

    # Net change between the first and last future point, averaged over the
    # number of future rows; only its sign is used below.
    predicted = horizon["yhat"]
    slope = (predicted.iloc[-1] - predicted.iloc[0]) / len(horizon)

    if cv > 0.3:
        signal, reason = "Risk Zone Alert", f"High volatility (CV={cv:.2f})"
    elif slope > 0:
        signal, reason = "Stock Early", "Upward trend expected"
    elif slope < 0:
        signal, reason = "Wait / Defer Procurement", "Downward trend expected"
    else:
        signal, reason = "Monitor", "Stable trend"

    return {"signal": signal, "reason": reason}


# -----------------------------
# Pipeline (unchanged high-level behavior, routes Wholesale to new functions)
# -----------------------------
def pipeline_all(
    retail_filepath: str,
    wholesale_filepath: str,
    target_crops: List[str],
    horizon_days: int = 14,
    chunksize: Optional[int] = None
):
    """
    Run the full pipeline for every (MarketType, Crop, State) group.

    Both price files are preprocessed and concatenated; each group is then
    forecast (wholesale model for "Wholesale", Prophet otherwise), given a
    procurement signal, and scored with compute_mae at its default test
    window. Groups for which no model could be trained are skipped entirely.

    Returns
    -------
    tuple of four DataFrames
        (forecasts, volatility, signals, accuracy). The forecast frame has the
        columns ds, yhat, yhat_lower, yhat_upper, MarketType, Crop, State and
        is empty (with those columns) when no group produced a forecast.
    """
    retail_prices = preprocess_prices(retail_filepath, target_crops, "Retail", chunksize)
    wholesale_prices = preprocess_prices(wholesale_filepath, target_crops, "Wholesale", chunksize)
    combined = pd.concat([retail_prices, wholesale_prices], ignore_index=True)

    forecast_frames, volatility_rows, signal_rows, accuracy_rows = [], [], [], []

    for (market, crop, state), hist in combined.groupby(["MarketType", "Crop", "State"]):
        # Wholesale goes through the multi-model path, everything else uses Prophet.
        if market == "Wholesale":
            trial_budget = 12 if _is_running_in_colab() else 30
            fitted = train_wholesale_model(hist, n_trials=trial_budget)
            if fitted is None:
                continue
            fc = forecast_wholesale_prices(fitted, horizon_days)
        else:
            fitted = train_prophet_model(hist[["Date", "Price"]])
            if fitted is None:
                continue
            fc = forecast_prices(fitted, horizon_days)

        sig = generate_procurement_signal(fc, hist)
        mae = compute_mae(hist)

        # Identifying columns shared by every output row of this group.
        group_key = {"MarketType": market, "Crop": crop, "State": state}

        forecast_frames.append(fc.assign(**group_key))
        volatility_rows.append({**group_key, "volatility": volatility_score(hist["Price"])})
        signal_rows.append({**group_key, **sig})
        accuracy_rows.append({**group_key, "MAE": mae})

    # With no forecasts at all, still hand back a frame with the expected columns.
    if forecast_frames:
        forecast_table = pd.concat(forecast_frames, ignore_index=True)
    else:
        forecast_table = pd.DataFrame(
            columns=["ds", "yhat", "yhat_lower", "yhat_upper", "MarketType", "Crop", "State"]
        )

    return (
        forecast_table,
        pd.DataFrame(volatility_rows),
        pd.DataFrame(signal_rows),
        pd.DataFrame(accuracy_rows)
    )


# =============================
# COLAB INPUT SECTION
# =============================
# Entry point: run the pipeline on the two price files and write the four
# result tables as CSVs under /content.
if __name__ == "__main__":
    # Input CSVs. NOTE(review): assumes the files uploaded in the earlier
    # upload cell ended up in /content — confirm.
    RETAIL_PATH = "/content/retail_price_data.csv"
    WHOLESALE_PATH = "/content/wholesale_price_data.csv"

    # Crops handed to pipeline_all as target_crops.
    TARGET_CROPS = ["Onion", "Potato", "Salt Pack"]

    # horizon_days=21 overrides pipeline_all's default of 14; chunksize is
    # forwarded to preprocess_prices. compute_mae is still called with its own
    # default test window inside pipeline_all.
    forecasts_df, volatility_df, procurement_df, accuracy_df = pipeline_all(
        retail_filepath=RETAIL_PATH,
        wholesale_filepath=WHOLESALE_PATH,
        target_crops=TARGET_CROPS,
        horizon_days=21,
        chunksize=50000
    )

    # One CSV per result table, written without the row index.
    forecasts_df.to_csv("/content/state_wise_forecast.csv", index=False)
    volatility_df.to_csv("/content/volatility_table.csv", index=False)
    procurement_df.to_csv("/content/procurement_recommendations.csv", index=False)
    accuracy_df.to_csv("/content/forecast_accuracy.csv", index=False)

    print("✅ Pipeline executed successfully with forecast accuracy (MAE)")

  parsed = pd.to_datetime(df[col], errors='coerce')
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run pr

✅ Pipeline executed successfully with forecast accuracy (MAE)


In [9]:
from google.colab import files

# Download the four result tables from the same absolute /content paths the
# pipeline cell wrote them to, so this cell does not depend on the kernel's
# current working directory.
files.download("/content/state_wise_forecast.csv")
files.download("/content/volatility_table.csv")
files.download("/content/procurement_recommendations.csv")
files.download("/content/forecast_accuracy.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>