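# Distributed Forecasting (Ray + StatsForecast)

Fit `AutoARIMA` and `Naive` models on 20 synthetic daily demand series by splitting the series into chunks that run as parallel Ray tasks, then gather the per-chunk forecasts into a single DataFrame.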

In [1]:
import ray
import pandas as pd
import numpy as np
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, Naive
import time

# 1. Initialize Ray
# Shut down any existing Ray session first so re-running this cell starts clean.
if ray.is_initialized():
    ray.shutdown()
ray.init(ignore_reinit_error=True)

# 2. Generate Synthetic Data (20 unique series)
SEED = 42  # fixed seed so Restart & Run All reproduces the data and forecasts
n_series = 20
horizon = 7
dates = pd.date_range(start='2023-01-01', periods=100, freq='D')

rng = np.random.default_rng(SEED)

series_list = []
for i in range(n_series):
    # Poisson demand whose mean is drawn uniformly from [20, 30) for each series.
    y = rng.poisson(lam=20 + rng.random() * 10, size=len(dates))
    series_list.append(pd.DataFrame({
        'unique_id': f'store_{i}',
        'ds': dates,
        'y': y,
    }))

# Each per-series frame carries its own 0..len(dates)-1 index; ignore_index=True
# renumbers the combined frame so index labels are unique.
df = pd.concat(series_list, ignore_index=True)
print(f"Generated dataset with {len(df)} rows across {n_series} unique series.")

# 3. Define Distributed Logic
@ray.remote
def train_and_predict_chunk(df_chunk, h, freq='D', season_length=7):
    """Fit AutoARIMA and Naive on one chunk of series and forecast ``h`` steps ahead.

    Executed as a Ray task; the caller passes each task a different subset of
    ``unique_id``s.

    Args:
        df_chunk: Long-format frame with ``unique_id``, ``ds`` and ``y`` columns,
            as built in the data-generation step of this notebook.
        h: Forecast horizon, in periods of ``freq``.
        freq: Pandas frequency alias of ``ds``. Defaults to ``'D'`` to match the
            daily synthetic data.
        season_length: Seasonal period handed to ``AutoARIMA`` (default 7, i.e.
            a weekly cycle for daily data).

    Returns:
        The DataFrame produced by ``StatsForecast.forecast``: the horizon rows
        for every series in the chunk, with one forecast column per model.
    """
    models = [AutoARIMA(season_length=season_length), Naive()]

    # Parallelism comes from Ray (one task per chunk), so StatsForecast itself
    # runs single-process inside each task to avoid oversubscribing cores.
    sf = StatsForecast(models=models, freq=freq, n_jobs=1)

    return sf.forecast(df=df_chunk, h=h)

# 4. Scatter (Map)
N_CHUNKS = 4  # number of Ray tasks the series are split across

print(f"Distributing {n_series} series across Ray cluster...")
start_time = time.perf_counter()

# Split unique_ids into at most N_CHUNKS groups. Clamping to the number of ids
# (and dropping any empty split) avoids dispatching tasks that have no data.
unique_ids = df['unique_id'].unique()
n_chunks = max(1, min(N_CHUNKS, len(unique_ids)))
chunks = [ids for ids in np.array_split(unique_ids, n_chunks) if len(ids) > 0]

futures = []
for chunk_ids in chunks:
    # Filter data for this chunk
    df_subset = df[df['unique_id'].isin(chunk_ids)]
    # Dispatch remote task; .remote() returns an object reference, not the result
    futures.append(train_and_predict_chunk.remote(df_subset, h=horizon))

# 5. Gather (Reduce)
results = ray.get(futures)
# NOTE(review): assumes unique_id is returned as a column (as in the printed
# head), so the per-chunk index carries no information and can be renumbered
# to keep labels unique across chunks.
forecast_df = pd.concat(results, ignore_index=True)
total_time = time.perf_counter() - start_time

print(f"Done! Processed {n_series} series in {total_time:.2f} seconds.")
print(forecast_df.head())

2026-02-03 14:34:45,455	INFO worker.py:1843 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


Generated dataset with 2000 rows across 20 unique series.
Distributing 20 series across Ray cluster...
Done! Processed 20 series in 29.64 seconds.
  unique_id         ds  AutoARIMA  Naive
0   store_0 2023-04-11       24.5   26.0
1   store_0 2023-04-12       24.5   26.0
2   store_0 2023-04-13       24.5   26.0
3   store_0 2023-04-14       24.5   26.0
4   store_0 2023-04-15       24.5   26.0


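# Hierarchical Reconciliation (MinT)

Build a two-level hierarchy (Total = RegionA + RegionB) with noisy base forecasts, reconcile them with `BottomUp` and `MinTrace(method='ols')`, and check that the reconciled Total equals the sum of the reconciled regions.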

In [2]:
from hierarchicalforecast.core import HierarchicalReconciliation
from hierarchicalforecast.methods import BottomUp, MinTrace

# 1. Construct a simple Hierarchy
print("\n--- Hierarchical Reconciliation ---")

SEED_H = 0  # fixed seed so the reconciliation demo is reproducible
n_obs = 50
dates_h = pd.date_range('2023-01-01', periods=n_obs, freq='D')

rng_h = np.random.default_rng(SEED_H)

# Region A ~ Normal(100, 10), Region B ~ Normal(200, 10)
y_a = rng_h.normal(100, 10, n_obs)
y_b = rng_h.normal(200, 10, n_obs)
y_total = y_a + y_b

# Actuals per node, and the std-dev of the noise added to form each node's base
# forecast. The Total forecast is noisier than the regional ones, so the base
# forecasts disagree and reconciliation has something to fix.
level_series = {'Total': y_total, 'RegionA': y_a, 'RegionB': y_b}
base_noise_sd = {'Total': 15, 'RegionA': 5, 'RegionB': 5}

# Create DataFrame (long format: one block of rows per node, Total first)
df_h = pd.concat(
    [
        pd.DataFrame({
            'unique_id': uid,
            'ds': dates_h,
            'y': y,
            'y_hat': y + rng_h.normal(0, base_noise_sd[uid], len(y)),
        })
        for uid, y in level_series.items()
    ],
    ignore_index=True,
)

# 2. Define the Summing Matrix (S_df): row = node, column = bottom-level series
S_df = pd.DataFrame(
    [[1, 1],  # Total
     [1, 0],  # RegionA
     [0, 1]], # RegionB
    columns=['RegionA', 'RegionB'],
    index=pd.Index(['Total', 'RegionA', 'RegionB'], name='unique_id'),
).reset_index()

tags = {
    'Total': ['Total'],
    'Total/Regions': ['RegionA', 'RegionB']
}

# 3. Apply MinTrace Reconciliation
# Observed values and base forecasts, each as (unique_id, ds, value) columns.
Y_df = df_h.loc[:, ['unique_id', 'ds', 'y']]
Y_hat_df = df_h.loc[:, ['unique_id', 'ds', 'y_hat']]

# BottomUp serves as the baseline; MinTrace uses its OLS variant.
reconcilers = [BottomUp(), MinTrace(method='ols')]
hrec = HierarchicalReconciliation(reconcilers=reconcilers)

Y_rec = hrec.reconcile(
    Y_hat_df=Y_hat_df,
    Y_df=Y_df,
    S_df=S_df,
    tags=tags,
)

# 4. Check Coherency
sample_date = dates_h[0]
rec_values = Y_rec[Y_rec['ds'] == sample_date].set_index('unique_id')

print(f"\nChecking Coherency for date: {sample_date.date()}")
mint_col = 'y_hat/MinTrace_method-ols'

# The reconciled column name may differ between hierarchicalforecast versions:
# fall back to the first column mentioning MinTrace, and fail with a clear
# message (instead of a bare IndexError) if there is none.
if mint_col not in rec_values.columns:
    candidates = [c for c in rec_values.columns if 'MinTrace' in str(c)]
    if not candidates:
        raise KeyError(
            f"No MinTrace column in reconciled output; columns: {list(rec_values.columns)}"
        )
    mint_col = candidates[0]

print(f"Using column: {mint_col}")
print(rec_values[['y_hat', mint_col]])

total_mint = rec_values.loc['Total', mint_col]
sum_bottom_mint = rec_values.loc['RegionA', mint_col] + rec_values.loc['RegionB', mint_col]

print(f"\nMinTrace Total: {total_mint:.4f}")
print(f"Sum of MinTrace Regions: {sum_bottom_mint:.4f}")
print(f"Difference: {total_mint - sum_bottom_mint:.10f} (Should be near zero)")

# Enforce coherency on every date, not just the sample printed above.
rec_wide = Y_rec.pivot(index='ds', columns='unique_id', values=mint_col)
max_gap = (rec_wide['Total'] - rec_wide['RegionA'] - rec_wide['RegionB']).abs().max()
assert max_gap < 1e-6, f"MinTrace forecasts are not coherent (max gap {max_gap:.3g})"


--- Hierarchical Reconciliation ---

Checking Coherency for date: 2023-01-01
Using column: y_hat/MinTrace_method-ols
                y_hat  y_hat/MinTrace_method-ols
unique_id                                       
Total      306.408645                 304.537567
RegionA    100.586664                 102.457742
RegionB    200.208748                 202.079826

MinTrace Total: 304.5376
Sum of MinTrace Regions: 304.5376
Difference: 0.0000000000 (Should be near zero)
