# Weighted Reconciliation with Spark

## Feng Li

### Guanghua School of Management
### Peking University


### [feng.li@gsm.pku.edu.cn](feng.li@gsm.pku.edu.cn)
### Course home page: [https://feng.li/bdcf](https://feng.li/bdcf)

## MinT-WLS


- MinT-WLS assumes a **diagonal forecast error covariance matrix** $ W $, where each diagonal element is the **variance of forecast errors** for each series (e.g., Region_Category). The formula becomes:

$$
\tilde{y} = S (S^\top W^{-1} S)^{-1} S^\top W^{-1} \hat{y}
$$

- $ \hat{y} $: base forecasts (from ETS, etc.)
- $ S $: summing matrix
- $ W $: diagonal matrix of **forecast error variances** from the training residuals


## How to Approximate MinT-WLS in Spark?

Since Spark is not optimized for full matrix ops, you can:

- Estimate **error variance per Region_Category** using training residuals.
- To compute forecast error variances for use in MinT-WLS, you need **in-sample forecasts** (i.e., forecasts over the training period, not just for the future 12 months). These are often called fitted values from the model.
- Use the **inverse variance** as weights.
- Perform a **weighted projection** manually (just like MinT-OLS, but with weights).

In [1]:
import os, sys # Ensure All environment variables are properly set 
# os.environ["JAVA_HOME"] = os.path.dirname(sys.executable)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession # build Spark Session
spark = SparkSession.builder \
    .config("spark.ui.enabled", "false") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "4") \
    .config("spark.cores.max", "32") \
    .config("spark.driver.memory", "30g") \
    .config("spark.sql.shuffle.partitions", "96") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.5") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "4") \
    .config("spark.dynamicAllocation.maxExecutors", "8") \
    .appName("Spark Forecasting").getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/25 15:48:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
train_sdf = spark.read.csv("../data/tourism/tourism_train.csv", header=True, inferSchema=True)
test_sdf = spark.read.csv("../data/tourism/tourism_test.csv", header=True, inferSchema=True)

## Spark approach

- You can modify the `ets_forecast` function to return both the **fitted values** (in-sample) and **forecast values** (out-of-sample) in a single column, while tagging them with a type column ("fitted" or "forecast"). 
- Later, you can easily split or filter.

In [3]:
# Modified ets_forecast Function: Fitted + Forecast in One UDF
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

from pandas.tseries.offsets import MonthBegin
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import pandas as pd

# Schema with an extra 'type' column to label fitted vs forecast
schema = StructType([
    StructField("date", DateType(), False),
    StructField("Region_Category", StringType(), False),
    StructField("Forecast", DoubleType(), False),
    StructField("type", StringType(), False)
])

def ets_fitted_and_forecast(pdf):
    region = pdf["Region_Category"].iloc[0]
    pdf = pdf.sort_values("date")

    try:
        ts = pdf["Visitors"].dropna()
        dates = pdf["date"]

        if len(ts) >= 24:
            model = ExponentialSmoothing(ts, trend="add", seasonal="add", seasonal_periods=12)
            fitted_model = model.fit()

            # Fitted values (same length as training data)
            fitted_values = fitted_model.fittedvalues
            fitted_df = pd.DataFrame({
                "date": dates[-len(fitted_values):].values,
                "Region_Category": region,
                "Forecast": fitted_values.values,
                "type": "fitted"
            })

            # Forecast for next 12 months
            forecast_values = fitted_model.forecast(steps=12)
            last_date = pdf["date"].max()
            forecast_dates = pd.date_range(start=last_date, periods=12, freq="ME") + MonthBegin(1)

            forecast_df = pd.DataFrame({
                "date": forecast_dates,
                "Region_Category": region,
                "Forecast": forecast_values.values,
                "type": "forecast"
            })

            result = pd.concat([fitted_df, forecast_df], ignore_index=True)

        else:
            result = pd.DataFrame({
                "date": pdf["date"],
                "Region_Category": region,
                "Forecast": [None] * len(pdf),
                "type": "fitted"
            })

    except:
        result = pd.DataFrame({
            "date": pdf["date"],
            "Region_Category": region,
            "Forecast": [None] * len(pdf),
            "type": "fitted"
        })

    return result

In [4]:
forecast_all_sdf = train_sdf.groupBy("Region_Category").applyInPandas(
    ets_fitted_and_forecast,
    schema=schema
)

forecast_all_sdf.show()

[Stage 6:>                                                          (0 + 1) / 1]

+----------+---------------+------------------+------+
|      date|Region_Category|          Forecast|  type|
+----------+---------------+------------------+------+
|1998-01-01|         AAAAll|3246.4056861089557|fitted|
|1998-02-01|         AAAAll|1846.3514350015978|fitted|
|1998-03-01|         AAAAll| 2013.827299893829|fitted|
|1998-04-01|         AAAAll| 2312.356250241453|fitted|
|1998-05-01|         AAAAll| 1976.696739197145|fitted|
|1998-06-01|         AAAAll|1886.1704427003526|fitted|
|1998-07-01|         AAAAll|2252.3437065440035|fitted|
|1998-08-01|         AAAAll|1997.6286143492266|fitted|
|1998-09-01|         AAAAll|2190.5384498174626|fitted|
|1998-10-01|         AAAAll| 2422.643967592123|fitted|
|1998-11-01|         AAAAll|2099.5706305548833|fitted|
|1998-12-01|         AAAAll| 2158.740350374741|fitted|
|1999-01-01|         AAAAll|3193.7236309368136|fitted|
|1999-02-01|         AAAAll|1617.9434116495956|fitted|
|1999-03-01|         AAAAll| 1779.115328477587|fitted|
|1999-04-0

                                                                                

In [5]:
from pyspark.sql.functions import explode, col

train_forecast_sdf = forecast_all_sdf.filter(col("type") == "fitted")
forecast_sdf = forecast_all_sdf.filter(col("type") == "forecast")

train_forecast_sdf.show()
forecast_sdf.show()

                                                                                

+----------+---------------+------------------+------+
|      date|Region_Category|          Forecast|  type|
+----------+---------------+------------------+------+
|1998-01-01|         AAAAll|3246.4056861089557|fitted|
|1998-02-01|         AAAAll|1846.3514350015978|fitted|
|1998-03-01|         AAAAll| 2013.827299893829|fitted|
|1998-04-01|         AAAAll| 2312.356250241453|fitted|
|1998-05-01|         AAAAll| 1976.696739197145|fitted|
|1998-06-01|         AAAAll|1886.1704427003526|fitted|
|1998-07-01|         AAAAll|2252.3437065440035|fitted|
|1998-08-01|         AAAAll|1997.6286143492266|fitted|
|1998-09-01|         AAAAll|2190.5384498174626|fitted|
|1998-10-01|         AAAAll| 2422.643967592123|fitted|
|1998-11-01|         AAAAll|2099.5706305548833|fitted|
|1998-12-01|         AAAAll| 2158.740350374741|fitted|
|1999-01-01|         AAAAll|3193.7236309368136|fitted|
|1999-02-01|         AAAAll|1617.9434116495956|fitted|
|1999-03-01|         AAAAll| 1779.115328477587|fitted|
|1999-04-0

[Stage 12:>                                                         (0 + 1) / 1]

+----------+---------------+------------------+--------+
|      date|Region_Category|          Forecast|    type|
+----------+---------------+------------------+--------+
|2015-12-01|         AAAAll| 2058.838101212888|forecast|
|2016-01-01|         AAAAll|3162.9085270260384|forecast|
|2016-02-01|         AAAAll|1744.1909768938476|forecast|
|2016-03-01|         AAAAll|2059.3010229302345|forecast|
|2016-04-01|         AAAAll|  2060.36170915585|forecast|
|2016-05-01|         AAAAll| 1972.482680954841|forecast|
|2016-06-01|         AAAAll| 1846.108381522378|forecast|
|2016-07-01|         AAAAll| 2151.959971338432|forecast|
|2016-08-01|         AAAAll| 1872.768734964083|forecast|
|2016-09-01|         AAAAll|2014.0112033543805|forecast|
|2016-10-01|         AAAAll|2296.4055573391643|forecast|
|2016-11-01|         AAAAll| 2043.693618904705|forecast|
|2015-12-01|         AAABus|455.55263433165527|forecast|
|2016-01-01|         AAABus| 296.1898590727801|forecast|
|2016-02-01|         AAABus| 45

                                                                                

In [6]:
from pyspark.sql.functions import col, pow, avg

# Extract In-Sample Residuals and Estimate Variance

# Step 1: Get fitted values only
fitted_sdf = forecast_all_sdf.filter(col("type") == "fitted")

# Step 2: Join with actual visitors from train_sdf to compute residuals
residuals_sdf = fitted_sdf.join(train_sdf, on=["date", "Region_Category"], how="inner") \
    .withColumn("squared_error", pow(col("Forecast") - col("Visitors"), 2))

# Step 3: Compute variance per Region_Category (mean squared error)
error_variance_sdf = residuals_sdf.groupBy("Region_Category").agg(
    avg("squared_error").alias("Forecast_Variance")
)


In [7]:
from pyspark.sql.functions import col, sum as spark_sum


# Load the summing matrix file
summing_matrix_path = "../data/tourism/agg_mat.csv"  # Update with actual path

# Load the summing matrix file (skip the first column)
summing_sdf = spark.read.csv(summing_matrix_path, header=True, inferSchema=True)


# Convert from wide format to long format (Region_Category, Parent_Group, Weight)
summing_sdf_long = summing_sdf.selectExpr(
    "Parent_Group",
    "stack(" + str(len(summing_sdf.columns) - 1) + ", " +
    ", ".join([f"'{col}', {col}" for col in summing_sdf.columns if col != "Parent_Group"]) +
    ") as (Region_Category, Weight)"
)

# Show the reshaped summing matrix
summing_sdf_long.show()

25/03/25 15:49:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------------+---------------+------+
|Parent_Group|Region_Category|Weight|
+------------+---------------+------+
|    TotalAll|         AAAHol|   1.0|
|    TotalAll|         AAAVis|   1.0|
|    TotalAll|         AAABus|   1.0|
|    TotalAll|         AAAOth|   1.0|
|    TotalAll|         AABHol|   1.0|
|    TotalAll|         AABVis|   1.0|
|    TotalAll|         AABBus|   1.0|
|    TotalAll|         AABOth|   1.0|
|    TotalAll|         ABAHol|   1.0|
|    TotalAll|         ABAVis|   1.0|
|    TotalAll|         ABABus|   1.0|
|    TotalAll|         ABAOth|   1.0|
|    TotalAll|         ABBHol|   1.0|
|    TotalAll|         ABBVis|   1.0|
|    TotalAll|         ABBBus|   1.0|
|    TotalAll|         ABBOth|   1.0|
|    TotalAll|         ACAHol|   1.0|
|    TotalAll|         ACAVis|   1.0|
|    TotalAll|         ACABus|   1.0|
|    TotalAll|         ACAOth|   1.0|
+------------+---------------+------+
only showing top 20 rows



In [8]:
# Step 1: Extract 12-step forecasts
forecast_sdf = forecast_all_sdf.filter(col("type") == "forecast")

# Step 2: Join with summing matrix and forecast variances
forecast_wls_sdf = forecast_sdf.join(summing_sdf_long, on="Region_Category", how="inner") \
    .join(error_variance_sdf, on="Region_Category", how="inner")

In [9]:
from pyspark.sql.functions import sum as spark_sum

# Step 1: Compute weighted forecast (Forecast × Weight ÷ Variance)
forecast_wls_sdf = forecast_wls_sdf.withColumn(
    "Weighted_Forecast", col("Forecast") * col("Weight") / col("Forecast_Variance")
).withColumn(
    "Normalized_Weight", col("Weight") / col("Forecast_Variance")
)

# Step 2: Aggregate numerator and denominator per Parent_Group and date
numerator_sdf = forecast_wls_sdf.groupBy("date", "Parent_Group").agg(
    spark_sum("Weighted_Forecast").alias("Numerator"),
    spark_sum("Normalized_Weight").alias("Denominator")
)

# Step 3: Compute reconciled forecast
reconciled_wls_sdf = numerator_sdf.withColumn(
    "Reconciled_Forecast", col("Numerator") / col("Denominator")
)


In [11]:
from pyspark.sql.functions import abs

# Step 1: Prepare test set at Parent_Group level
test_parent_sdf = test_sdf.join(summing_sdf_long, on="Region_Category", how="inner") \
    .groupBy("date", "Parent_Group") \
    .agg(spark_sum("Visitors").alias("Actual_Visitors"))

# Step 2: Join with reconciled forecast and compute APE
evaluation_sdf = reconciled_wls_sdf.join(test_parent_sdf, on=["date", "Parent_Group"], how="inner") \
    .withColumn("APE", abs((col("Reconciled_Forecast") - col("Actual_Visitors")) / col("Actual_Visitors")))

# Step 3: Compute MAPE per Parent_Group
mape_sdf = evaluation_sdf.groupBy("Parent_Group").agg(avg("APE").alias("MAPE"))
mape_sdf.show()




+------------+------------------+
|Parent_Group|              MAPE|
+------------+------------------+
|      CBDAll|0.9990308809580983|
|       BCHol|0.9952348648671636|
|      BCBOth|0.9998273964952409|
|      DDBHol|0.9965713950479036|
|      CCBAll|0.9969357208629136|
|       CCOth|0.9993691868557918|
|      DCCAll|0.9998990568465357|
|      BDEAll|0.9998635234555749|
|      FBAVis|0.9996472874910226|
|      EABVis|0.9849344998635591|
|      GABVis|0.9997748506792165|
|      ADBAll|0.9982988253046847|
|      FAAHol|0.9929209612350367|
|      BDFAll|0.9998573749089767|
|      CBCHol| 0.996186434565634|
|      GBCAll|0.9995320207417416|
|      CDBHol| 0.998463935084155|
|      BEGAll|0.9989692918469983|
|       DABus| 0.999867619528087|
|      DAAVis|0.9899752829805119|
+------------+------------------+
only showing top 20 rows



                                                                                

In [12]:
from pyspark.sql.functions import mean

# Compute overall (mean of all Parent_Group MAPE values)
overall_mape = mape_sdf.agg(mean("MAPE").alias("Overall_MAPE"))

# Show the result
overall_mape.show()


                                                                                

+------------------+
|      Overall_MAPE|
+------------------+
|0.9972887320763939|
+------------------+



## Summary


- **MinT-WLS** in Spark is approximate — no matrix inversion is used. 

- For full matrix-based MinT with off-diagonal covariance, it requires distributed matrix inversion techniques 