<a href="https://colab.research.google.com/github/EdiNel0407/us-ie-big-data-technologies/blob/main/postblock2/q5/postblock2_q5_1_multi_forecast_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# === Q5.1 — Many models in parallel with Spark (Colab/local prototype) ===
# - Ingest data (or synthesize if not provided)
# - Partition by (store_id, item_id)
# - Fit & forecast per group in parallel via grouped Pandas UDF
# - Persist forecasts to local filesystem
# - Evaluate each model (evaluate_forecast) and print results

# 0) Environment
!pip -q install pyspark==3.5.1 statsmodels==0.14.2

from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("multi-forecast-parallel")
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .getOrCreate())
print("Spark:", spark.version)

from pyspark.sql import functions as F, types as T
import pandas as pd
import numpy as np
import os, pathlib, datetime as dt


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/10.7 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.7/10.7 MB[0m [31m107.7 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━[0m [32m7.8/10.7 MB[0m [31m108.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m10.7/10.7 MB[0m [31m114.8 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m10.7/10.7 MB[0m [31m114.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.7/10.7 MB[0m [31m62.7 MB/s[0m eta [36m0:00:00[0m
[?25hSpark: 3.5.1


In [2]:
# 1) Ingest data from source (CSV) or synthesize if not found
# Expected schema if reading: store_id (int/str), item_id (int/str), ds (date/str), y (float / demand)

DATA_PATH = "/content/data/store_item_demand.csv"   # <-- change if you have a real file
pathlib.Path("/content/data").mkdir(parents=True, exist_ok=True)

def load_or_make_data():
    if os.path.exists(DATA_PATH):
        dfp = pd.read_csv(DATA_PATH)
        # normalize column names
        cols = {c.lower():c for c in dfp.columns}
        # ensure expected names exist
        rename_map = {}
        for want in ["store_id","item_id","ds","y"]:
            # try exact first, then any lower-cased match
            if want in dfp.columns:
                continue
            elif want in [c.lower() for c in dfp.columns]:
                # find original
                orig = [c for c in dfp.columns if c.lower()==want][0]
                rename_map[orig] = want
        if rename_map:
            dfp = dfp.rename(columns=rename_map)
        # type cleanup
        dfp["ds"] = pd.to_datetime(dfp["ds"])
        dfp["y"]  = pd.to_numeric(dfp["y"], errors="coerce").astype(float)
        return dfp

    # --- synthesize weekly data for 50 stores x 20 items x 104 weeks ---
    rng = np.random.default_rng(7)
    stores, items = range(1, 51), range(1, 21)
    weeks = pd.date_range("2019-01-06", periods=104, freq="W-SUN")
    rows = []
    for s in stores:
        for it in items:
            base = 20 + 5*np.sin(np.arange(len(weeks))/6) + 0.1*s + 0.05*it
            noise = rng.normal(0, 2.5, size=len(weeks))
            y = np.maximum(0, base + noise)
            rows.append(pd.DataFrame({
                "store_id": s,
                "item_id":  it,
                "ds": weeks,
                "y": y
            }))
    dfp = pd.concat(rows, ignore_index=True)
    # Also save so you can inspect/restart quickly
    dfp.to_csv(DATA_PATH, index=False)
    return dfp

pdf_raw = load_or_make_data()
print(pdf_raw.head())
print(pdf_raw.shape, pdf_raw["ds"].min(), pdf_raw["ds"].max())

sdf_raw = spark.createDataFrame(pdf_raw)\
               .withColumn("ds", F.to_timestamp("ds"))
sdf_raw.cache().count()
sdf_raw.printSchema()


   store_id  item_id         ds          y
0         1        1 2019-01-06  20.153075
1         1        1 2019-01-13  21.726345
2         1        1 2019-01-20  21.100629
3         1        1 2019-01-27  20.320648
4         1        1 2019-02-03  22.105172
(104000, 4) 2019-01-06 00:00:00 2020-12-27 00:00:00
root
 |-- store_id: long (nullable = true)
 |-- item_id: long (nullable = true)
 |-- ds: timestamp (nullable = true)
 |-- y: double (nullable = true)



In [3]:
# 2) Prepare data & choose horizon
# We'll forecast the last H periods per group and train on the rest.
H = 8   # forecast horizon (e.g., 8 weeks)

# For convenience, compute the per-group cutoff date (max(ds) - H + 1 step)
# We'll make this inside the UDF; but it's handy to know globally as well.


In [5]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F, types as T
import pandas as pd
import numpy as np


In [6]:
forecast_schema = T.StructType([
    T.StructField("store_id", T.IntegerType(), False),
    T.StructField("item_id",  T.IntegerType(), False),
    T.StructField("ds",       T.TimestampType(), False),
    T.StructField("yhat",     T.DoubleType(), False),
])

@pandas_udf(forecast_schema, PandasUDFType.GROUPED_MAP)   # <-- enum, not string
def fit_and_forecast(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.sort_values("ds")
    sid = int(pdf["store_id"].iloc[0])
    iid = int(pdf["item_id"].iloc[0])

    last_ds = pd.to_datetime(pdf["ds"].max())
    freq = pd.infer_freq(pdf["ds"]) or "W-SUN"
    from pandas.tseries.frequencies import to_offset
    future_ds = pd.date_range(last_ds + to_offset(freq), periods=H, freq=freq)

    y = pdf["y"].astype(float).values
    if len(y) < max(13, H):
        yhat = np.repeat(float(np.nanmean(y)), H)
    else:
        from statsmodels.tsa.holtwinters import ExponentialSmoothing
        try:
            model = ExponentialSmoothing(y, trend="add", seasonal="add", seasonal_periods=52).fit(optimized=True)
            yhat = model.forecast(H)
        except Exception:
            yhat = np.repeat(float(np.nanmean(y)), H)

    return pd.DataFrame({
        "store_id": sid,
        "item_id":  iid,
        "ds":       future_ds.tz_localize(None),
        "yhat":     np.asarray(yhat, dtype=float),
    })


In [7]:
holdout_schema = T.StructType([
    T.StructField("store_id", T.IntegerType(), False),
    T.StructField("item_id",  T.IntegerType(), False),
    T.StructField("ds",       T.TimestampType(), False),
    T.StructField("yhat",     T.DoubleType(), False),
])

@pandas_udf(holdout_schema, PandasUDFType.GROUPED_MAP)    # <-- enum here too
def backtest_fit_forecast(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.sort_values("ds")
    sid = int(pdf["store_id"].iloc[0]); iid = int(pdf["item_id"].iloc[0])
    if len(pdf) <= H:
        return pd.DataFrame(columns=["store_id","item_id","ds","yhat"])

    train = pdf.iloc[:-H]
    valid = pdf.iloc[-H:]
    y_train = train["y"].astype(float).values
    future_ds = pd.to_datetime(valid["ds"].values)

    if len(y_train) < max(13, H):
        yhat = np.repeat(float(np.nanmean(y_train)), H)
    else:
        from statsmodels.tsa.holtwinters import ExponentialSmoothing
        try:
            model = ExponentialSmoothing(y_train, trend="add", seasonal="add", seasonal_periods=52).fit(optimized=True)
            yhat = model.forecast(H)
        except Exception:
            yhat = np.repeat(float(np.nanmean(y_train)), H)

    return pd.DataFrame({
        "store_id": sid,
        "item_id":  iid,
        "ds":       future_ds.tz_localize(None),
        "yhat":     np.asarray(yhat, dtype=float),
    })


In [8]:
forecasts = (
    sdf_raw
    .groupBy("store_id", "item_id")
    .apply(fit_and_forecast)
)
forecasts.cache().count()
forecasts.orderBy("store_id","item_id","ds").show(6, truncate=False)




+--------+-------+-------------------+------------------+
|store_id|item_id|ds                 |yhat              |
+--------+-------+-------------------+------------------+
|1       |1      |2021-01-03 00:00:00|18.432629908086195|
|1       |1      |2021-01-10 00:00:00|19.52189844319622 |
|1       |1      |2021-01-17 00:00:00|17.888720210849673|
|1       |1      |2021-01-24 00:00:00|18.196130749170198|
|1       |1      |2021-01-31 00:00:00|17.741818604821212|
|1       |1      |2021-02-07 00:00:00|17.875982660232278|
+--------+-------+-------------------+------------------+
only showing top 6 rows



In [9]:
bt_forecasts = (
    sdf_raw
    .groupBy("store_id","item_id")
    .apply(backtest_fit_forecast)
    .cache()
)
bt_forecasts.count()


8000

In [10]:
FORECAST_DIR = "/content/forecasts_parquet"
(forecasts
  .repartition(1)               # just to get a single file for convenience
  .write.mode("overwrite")
  .parquet(FORECAST_DIR))
print("Saved forecasts to:", FORECAST_DIR)


Saved forecasts to: /content/forecasts_parquet


In [11]:
from pyspark.sql import functions as F, Window

H = 8  # same horizon you used

w = Window.partitionBy("store_id","item_id").orderBy(F.col("ds").desc())
lastH_actuals = (sdf_raw
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") <= H)
    .drop("rn"))

eval_df = (bt_forecasts.alias("f")
           .join(lastH_actuals.alias("a"),
                 on=["store_id","item_id","ds"], how="inner")
           .select("store_id","item_id","ds","a.y","f.yhat")
           .cache())
eval_df.show(6, truncate=False)
print("eval_df rows:", eval_df.count())


+--------+-------+-------------------+------------------+-----------------+
|store_id|item_id|ds                 |y                 |yhat             |
+--------+-------+-------------------+------------------+-----------------+
|4       |19     |2020-12-27 00:00:00|17.424108517592984|22.19792189600221|
|4       |19     |2020-12-20 00:00:00|10.24830665236501 |22.19792189600221|
|4       |19     |2020-12-13 00:00:00|16.880208187806403|22.19792189600221|
|4       |19     |2020-12-06 00:00:00|15.069959663021121|22.19792189600221|
|4       |19     |2020-11-29 00:00:00|16.548755516507995|22.19792189600221|
|4       |19     |2020-11-22 00:00:00|20.28239288922161 |22.19792189600221|
+--------+-------+-------------------+------------------+-----------------+
only showing top 6 rows

eval_df rows: 8000


In [13]:
from pyspark.sql import functions as F, types as T
import pandas as pd
import numpy as np

metric_schema = T.StructType([
    T.StructField("store_id", T.IntegerType(), False),
    T.StructField("item_id",  T.IntegerType(), False),
    T.StructField("rmse",     T.DoubleType(), True),
    T.StructField("mape",     T.DoubleType(), True),
    T.StructField("smape",    T.DoubleType(), True),
    T.StructField("n",        T.IntegerType(), True),
])

def evaluate_forecast(pdf: pd.DataFrame) -> pd.DataFrame:
    sid = int(pdf["store_id"].iloc[0])
    iid = int(pdf["item_id"].iloc[0])
    y    = pdf["y"].astype(float).values
    yhat = pdf["yhat"].astype(float).values
    n = len(y)

    rmse = float(np.sqrt(np.mean((y - yhat)**2))) if n else np.nan
    mask = (y != 0)
    mape = float(np.mean(np.abs((y[mask] - yhat[mask]) / y[mask]))) if mask.any() else np.nan
    denom = (np.abs(y) + np.abs(yhat))
    mask2 = (denom != 0)
    smape = float(np.mean(2*np.abs(y - yhat)[mask2] / denom[mask2])) if mask2.any() else np.nan

    return pd.DataFrame([{
        "store_id": sid, "item_id": iid, "rmse": rmse, "mape": mape, "smape": smape, "n": n
    }])

per_model_metrics = (
    eval_df
    .groupBy("store_id","item_id")
    .applyInPandas(evaluate_forecast, schema=metric_schema)
    .orderBy("rmse")
)

per_model_metrics.show(10, truncate=False)


+--------+-------+------------------+-------------------+-------------------+---+
|store_id|item_id|rmse              |mape               |smape              |n  |
+--------+-------+------------------+-------------------+-------------------+---+
|46      |6      |2.29708174780037  |0.08596043662653152|0.08107873968512481|8  |
|19      |19     |2.3405969888022136|0.09645478653464229|0.0897401545784619 |8  |
|48      |8      |2.3671454543765384|0.09023741841787944|0.08516959093641564|8  |
|1       |6      |2.4725953727087333|0.12104604515369524|0.11158686789852995|8  |
|41      |10     |2.5916782803802088|0.10461183112974799|0.09772844744624379|8  |
|6       |15     |2.611368887409247 |0.11371421288015085|0.10427135186484802|8  |
|35      |12     |2.6380204695697285|0.0928721790219631 |0.08527861137155032|8  |
|50      |2      |2.6786313024117008|0.09791202239740054|0.0910024032914927 |8  |
|42      |20     |2.7055566375180726|0.10137772776905044|0.09459250296174146|8  |
|4       |18    

In [14]:
overall = (per_model_metrics
           .agg(F.avg("rmse").alias("rmse_avg"),
                F.avg("mape").alias("mape_avg"),
                F.avg("smape").alias("smape_avg"),
                F.sum("n").alias("total_points")))
overall.show(truncate=False)

METRICS_DIR = "/content/forecast_metrics_parquet"
per_model_metrics.repartition(1).write.mode("overwrite").parquet(METRICS_DIR)
print("Saved metrics to:", METRICS_DIR)


+-----------------+-------------------+-------------------+------------+
|rmse_avg         |mape_avg           |smape_avg          |total_points|
+-----------------+-------------------+-------------------+------------+
|4.941213258390197|0.24849697962337464|0.21027958835810126|8000        |
+-----------------+-------------------+-------------------+------------+

Saved metrics to: /content/forecast_metrics_parquet


In [15]:
from pyspark.sql import functions as F

overall = (per_model_metrics
           .agg(F.avg("rmse").alias("rmse_avg"),
                F.avg("mape").alias("mape_avg"),
                F.avg("smape").alias("smape_avg"),
                F.sum("n").alias("total_points")))
overall.show(truncate=False)


+-----------------+-------------------+-------------------+------------+
|rmse_avg         |mape_avg           |smape_avg          |total_points|
+-----------------+-------------------+-------------------+------------+
|4.941213258390197|0.24849697962337464|0.21027958835810126|8000        |
+-----------------+-------------------+-------------------+------------+

