# Corporate Hybrid Forecast Notebook (Prophet vs ARIMA vs TBATS/ETS) – v6


## 1. Imports & Config

In [None]:
"""
Capacity Forecast - Hybrid (Prophet / ARIMA / TBATS-ETS) - v6.1

Hotfix:
- Hardened 'forecast_per_department_monthly' to always include CV/weights columns.
- Hardened 'build_stability_report' to add missing CV/weights columns on the fly (avoids KeyError).

Keeps v6 features:
- Auto Christmas CSV with Xmas days counts (2024-2027)
- Exogenous features from:
   * case_reason.xlsx ('Global outage reported' proxy per dept)
   * christmas_holidays_*.csv (xmas days per month)
- Rate-per-workday modelling, wMAPE blending, robust smoothing (MAD), bias correction
- Monthly->Daily reconciliation
- Sheets: capacity_error, daily_capacity_plan, mape_table_cv, stability_report
"""

import os
import warnings
from typing import Optional, Dict, Tuple
import numpy as np
import pandas as pd

from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.holtwinters import ExponentialSmoothing
try:
    from prophet import Prophet
except Exception:
    Prophet = None
try:
    from tbats import TBATS
except Exception:
    TBATS = None

# Silence every warning category for the whole process (no filter on module or
# message), so warnings raised by the libraries used below will not be shown.
warnings.filterwarnings("ignore")

# ==================== Configuration ====================
# All constants below are module-level settings. The paths are absolute Windows
# paths tied to one workstation; adjust them before running elsewhere.

# Inputs (adjust if your file locations change)
INCOMING_SOURCE_PATH = r"C:\Users\pt3canro\Desktop\CAPACITY\input_model\Incoming_new.xlsx"  # Sheet 'Main'
INCOMING_SHEET = "Main"
DEPT_MAP_PATH = r"C:\Users\pt3canro\Desktop\CAPACITY\input_model\department.xlsx"
DEPT_MAP_SHEET = "map"
PRODUCTIVITY_PATH = r"C:\Users\pt3canro\Desktop\CAPACITY\input_model\productivity_agents.xlsx"

# Outage proxy (case reasons)
CASE_REASON_PATH = r"C:\Users\pt3canro\Desktop\CAPACITY\input_model\case_reason.xlsx"
CASE_REASON_SHEET = "Main"             # sheet read from case_reason.xlsx
CASE_REASON_FILTER = "Global outage reported"  # only rows whose case_reason equals this text are kept

# Christmas holidays CSV (created automatically by ensure_christmas_csv when missing)
HOLIDAYS_CSV_PATH = r"C:\Users\pt3canro\Desktop\CAPACITY\input_model\christmas_holidays_2024_2027.csv"
HOLIDAYS_YEARS = [2024, 2025, 2026, 2027]
INCLUDE_JAN6 = True                    # include Jan 6 (common in ES/PT/IT)
INCLUDE_JAN_POSTXMAS = False           # monthly extra dummy for January (off by default)

# Output
OUTPUT_XLSX = r"C:\Users\pt3canro\Desktop\CAPACITY\outputs\capacity_forecast_hybrid.xlsx"

# Horizons and switches
H_MONTHS = 12             # monthly forecast horizon
DAILY_HORIZON_DAYS = 90   # daily plan horizon
REPORT_START_MONTH = "2025-01"  # show historical Actuals from this month in capacity_error

# Top-down reconciliation for daily forecasts
USE_DAILY_FROM_MONTHLY = True

# Final growth guard. NOTE(review): an earlier comment described this guard as
# "disabled by default", but the switch below is currently True (enabled).

APPLY_LOCAL_GROWTH_GUARD = True
# NOTE(review): the code that consumes these two multipliers is outside this
# section; presumably they bound the forecast relative to a historical monthly
# mean (at most +60%, at most -30%) — confirm against the guard implementation.
MAX_GROWTH = 1.6   # upper multiplier (1.6 = +60%)
MIN_GROWTH = 0.7   # lower multiplier (0.7 = -30%)


# Language shares (the six fractions below add up to 1.0)
LANGUAGE_SHARES = {
    'English': 0.6435,
    'French': 0.0741,
    'German': 0.0860,
    'Italian': 0.0667,
    'Portuguese': 0.0162,
    'Spanish': 0.1135
}

## 2. Load & Clean Data (2023–Current)

In [11]:

def load_incoming(path: str, sheet_name: Optional[str] = None) -> pd.DataFrame:
    """Load daily incoming volumes and normalise the key columns.

    The file may be an Excel workbook (``.xlsx``/``.xlsm``/``.xls``, read with
    openpyxl) or a CSV. ``ticket_total`` is derived when absent, either from
    ``total_incoming`` or from ``incoming_from_customers`` +
    ``incoming_from_transfers``.

    Args:
        path: Location of the incoming-volume file.
        sheet_name: Sheet to read; mandatory for Excel files, ignored for CSV.

    Returns:
        DataFrame with at least ``Date`` (datetime), ``department_id``
        (stripped string), ``ticket_total`` (float, unparseable values -> 0)
        and ``department_name``. Missing ``department_name`` / ``vertical``
        values stay missing (NaN) so that a later ``fillna`` from the
        department map can fill them.

    Raises:
        FileNotFoundError: ``path`` does not exist.
        ValueError: unsupported extension, missing sheet name, missing
            required columns, or unparseable dates.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Incoming file not found:\n{path}\n")
    ext = os.path.splitext(path)[1].lower()
    if ext in (".xlsx", ".xlsm", ".xls"):
        if not sheet_name:
            raise ValueError("Excel file detected but no sheet_name provided (e.g., 'Main').")
        df = pd.read_excel(path, sheet_name=sheet_name, engine="openpyxl")
    elif ext == ".csv":
        df = pd.read_csv(path)
    else:
        raise ValueError(f"Unsupported extension for incoming data: {ext}")

    base_required = {'Date', 'department_id'}
    missing = base_required - set(df.columns)
    if missing:
        raise ValueError(f"Incoming file must contain {sorted(list(base_required))}. Found: {list(df.columns)}")

    if 'ticket_total' not in df.columns:
        if 'total_incoming' in df.columns:
            df['ticket_total'] = pd.to_numeric(df['total_incoming'], errors='coerce').fillna(0)
        elif {'incoming_from_customers', 'incoming_from_transfers'}.issubset(df.columns):
            df['ticket_total'] = (
                pd.to_numeric(df['incoming_from_customers'], errors='coerce').fillna(0) +
                pd.to_numeric(df['incoming_from_transfers'], errors='coerce').fillna(0)
            )
        else:
            raise ValueError("Missing 'ticket_total' or components to create it.")

    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
    if df['Date'].isna().any():
        bad = df.loc[df['Date'].isna()]
        raise ValueError(f"Some Date values could not be parsed. Example rows:\n{bad.head(5)}")
    df['department_id'] = df['department_id'].astype(str).str.strip()
    df['ticket_total'] = pd.to_numeric(df['ticket_total'], errors='coerce').fillna(0).astype(float)

    for col in ('department_name', 'vertical'):
        if col in df.columns:
            raw = df[col]
            # astype(str) would turn NaN into the literal text 'nan', which a
            # later fillna() cannot detect; restore NaN where the source was empty.
            df[col] = raw.astype(str).str.strip().where(raw.notna())
        elif col == 'department_name':
            # Only department_name is guaranteed to exist in the output.
            df[col] = None

    return df


def load_dept_map(path: str, sheet: Optional[str] = None) -> pd.DataFrame:
    """Read the department mapping (department_id -> department_name, vertical).

    A missing file is not an error: an empty frame with the three output
    columns is returned. Excel files are read with openpyxl (first sheet when
    ``sheet`` is not given); anything else is read as CSV. Common alias column
    names are renamed to the canonical ones, and only the first row per
    ``department_id`` is kept.

    Raises:
        ValueError: the file has no ``department_id`` column (after aliasing).
    """
    output_cols = ['department_id', 'department_name', 'vertical']
    if not os.path.exists(path):
        return pd.DataFrame(columns=output_cols)

    suffix = os.path.splitext(path)[1].lower()
    is_excel = suffix in (".xlsx", ".xlsm", ".xls")
    if is_excel and sheet:
        mapping = pd.read_excel(path, sheet_name=sheet, engine="openpyxl")
    elif is_excel:
        workbook = pd.ExcelFile(path, engine="openpyxl")
        mapping = pd.read_excel(workbook, sheet_name=workbook.sheet_names[0])
    else:
        mapping = pd.read_csv(path)

    # Accepted alternative headers and the canonical name each one maps to.
    aliases = {
        'dept_id': 'department_id',
        'dept_name': 'department_name',
        'name': 'department_name',
        'segment': 'vertical',
        'vertical_name': 'vertical'
    }
    present_aliases = {old: new for old, new in aliases.items() if old in mapping.columns}
    mapping = mapping.rename(columns=present_aliases)
    if 'department_id' not in mapping.columns:
        raise ValueError(f"Department map must contain 'department_id'. Found: {list(mapping.columns)}")

    mapping['department_id'] = mapping['department_id'].astype(str).str.strip()
    for optional_col in ('department_name', 'vertical'):
        if optional_col in mapping.columns:
            mapping[optional_col] = mapping[optional_col].astype(str).str.strip()
        else:
            mapping[optional_col] = None

    return mapping[output_cols].drop_duplicates('department_id')


def load_productivity(path: str) -> pd.DataFrame:
    """Read agent productivity and average it per department.

    Returns a frame with ``department_id`` and ``avg_tickets_per_agent_day``,
    the plain mean of ``prod_total_model`` over all rows of each department.

    Raises:
        FileNotFoundError: ``path`` does not exist.
        ValueError: one of the required columns is missing.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Productivity file not found: {path}")

    raw = pd.read_excel(path, engine="openpyxl")

    required_cols = {'Date', 'agent_id', 'department_id', 'prod_total_model'}
    absent = required_cols - set(raw.columns)
    if absent:
        raise ValueError(f"productivity_agents.xlsx missing columns: {sorted(list(absent))}. Found: {list(raw.columns)}")

    # Normalise types; non-numeric productivity becomes NaN and is skipped by mean().
    raw['Date'] = pd.to_datetime(raw['Date'], errors='coerce')
    raw['department_id'] = raw['department_id'].astype(str).str.strip()
    raw['prod_total_model'] = pd.to_numeric(raw['prod_total_model'], errors='coerce')

    dept_means = raw.groupby('department_id', as_index=False)['prod_total_model'].mean()
    return dept_means.rename(columns={'prod_total_model': 'avg_tickets_per_agent_day'})

## 3. Helpers, metrics and exogenous features

In [None]:
def business_days_in_month(year: int, month: int) -> int:
    """Count the Monday-to-Friday days of a calendar month (public holidays are not excluded)."""
    first_day = pd.Timestamp(year=year, month=month, day=1)
    last_day = first_day + pd.offsets.MonthEnd(0)
    calendar_days = pd.date_range(start=first_day, end=last_day, freq='D')
    # weekday: Monday=0 ... Sunday=6, so values below 5 are working days.
    is_weekday = calendar_days.weekday < 5
    return int(np.sum(is_weekday))


def smape(y_true, y_pred) -> float:
    """Symmetric MAPE in percent (0-200); pairs where both values are 0 contribute 0."""
    actual = np.array(y_true, dtype=float)
    forecast = np.array(y_pred, dtype=float)
    scale = np.abs(actual) + np.abs(forecast)
    # Avoid 0/0: when actual and forecast are both zero the error is zero anyway.
    both_zero = scale == 0
    scale[both_zero] = 1.0
    per_point = 2.0 * np.abs(forecast - actual) / scale
    return float(np.mean(per_point) * 100.0)


def wmape(y_true, y_pred) -> float:
    """Weighted MAPE in percent: 100 * sum(|error|) / sum(|actual|).

    Returns the sentinel 200.0 when the actuals sum to zero (or less), since
    the ratio is undefined there.
    """
    actual = np.array(y_true, dtype=float)
    forecast = np.array(y_pred, dtype=float)
    total_actual = np.sum(np.abs(actual))
    if total_actual <= 0:
        return 200.0
    total_error = np.sum(np.abs(actual - forecast))
    return float(100.0 * (total_error / total_actual))


def apply_mapping(incoming: pd.DataFrame, mapping: pd.DataFrame) -> pd.DataFrame:
    """Left-join the department map onto the incoming data by department_id.

    Values already present in ``incoming`` win; gaps are filled from the map
    and any remaining gaps get a placeholder ("Unknown" for department_name,
    "Unmapped" for vertical). Every column ending in ``_map`` is dropped from
    the result.
    """
    merged = incoming.merge(mapping, on='department_id', how='left', suffixes=('', '_map'))

    for column, placeholder in (('department_name', "Unknown"), ('vertical', "Unmapped")):
        from_map = f"{column}_map"
        # Either side may lack the column; create empty ones so fillna works uniformly.
        if column not in merged.columns:
            merged[column] = None
        if from_map not in merged.columns:
            merged[from_map] = None
        merged[column] = merged[column].fillna(merged[from_map]).fillna(placeholder)

    helper_cols = [name for name in merged.columns if name.endswith('_map')]
    return merged.drop(columns=helper_cols, errors='ignore')


def winsorize_monthly(ts_m: pd.Series, lower_q: float = 0.01, upper_q: float = 0.99) -> pd.Series:
    """Clip a series to its [lower_q, upper_q] quantile band; an empty series is returned untouched."""
    if ts_m.empty:
        return ts_m
    floor, ceiling = (ts_m.quantile(q) for q in (lower_q, upper_q))
    return ts_m.clip(lower=floor, upper=ceiling)

# ---------- Safe inverse & dynamic cap ----------

def expm1_safe(log_vals: np.ndarray, cap_original: Optional[float] = None) -> np.ndarray:
    """
    Invert log1p defensively and return non-negative values.

    - non-finite inputs (NaN, +inf, -inf) are replaced by -50, which maps to ~0
    - inputs are floored at -50
    - when ``cap_original`` is a finite positive number it is enforced twice:
      as log1p(cap) before exponentiating and as cap afterwards
    """
    logs = np.array(log_vals, dtype=float)
    logs = np.where(np.isfinite(logs), logs, -50.0)
    logs = np.maximum(logs, -50.0)

    use_cap = cap_original is not None and np.isfinite(cap_original) and cap_original > 0
    if use_cap:
        logs = np.minimum(logs, np.log1p(cap_original))

    restored = np.expm1(logs)
    if use_cap:
        # Second pass guards against rounding pushing expm1(log1p(cap)) above cap.
        restored = np.minimum(restored, cap_original)
    return np.clip(restored, 0, None)


def compute_dynamic_cap(ts_m: pd.Series) -> float:
    """Upper bound (original scale) used to stop forecasts from exploding.

    Returns ``np.inf`` (no cap) for an empty series or one whose maximum is
    not positive. Otherwise the cap is twice the largest of: 1.0, the recent
    mean (last 12 points when there are at least 3, else all points), the
    median, and 1.1 x the historical maximum.
    """
    if ts_m.empty or (ts_m.max() <= 0):
        return np.inf

    recent_window = ts_m.tail(12) if len(ts_m) >= 3 else ts_m
    recent_mean = float(recent_window.mean())
    median_level = float(ts_m.median())
    peak = float(ts_m.max())

    reference = max(1.0, recent_mean, median_level, 1.1 * peak)
    return reference * 2.0  # multiplier can be raised (2.0–6.0) for a looser cap

# ---------- Rate modelling, Xmas CSV and robust smoothing ----------

def monthly_rate_series(ts_m: pd.Series) -> Tuple[pd.Series, pd.Series]:
    """Convert monthly volumes into volume per working day.

    Returns ``(rate, workdays)``, both indexed like ``ts_m``. ``workdays`` is
    the Mon-Fri day count of each period; a zero count is replaced by NaN so
    the division never divides by zero.
    """
    def _workdays_of(period) -> int:
        month_start = period.start_time
        return business_days_in_month(month_start.year, month_start.month)

    workdays = ts_m.index.to_series().apply(_workdays_of).astype(float)
    workdays = workdays.replace(0, np.nan)
    return ts_m / workdays, workdays


def robust_roll_cap(series: pd.Series, window: int = 12, K: float = 6.0) -> pd.Series:
    """Cap spikes with a trailing Median ± K*MAD band.

    Each point is clipped to ``[max(0, med - K*MAD), med + K*MAD]`` where the
    median and MAD come from the up-to-``window`` points that precede it.
    The pass is sequential: the reference window contains values that were
    already capped earlier in the same pass. Points with fewer than 4
    predecessors are only floored at 0.

    Args:
        series: Values to stabilise; the input is never modified.
        window: Maximum number of preceding points used as reference.
        K: Band half-width in MAD units.

    Returns:
        A new float Series with the same index as ``series``.
    """
    # Take an explicit writable copy. `Series.values` may be a read-only view
    # when pandas Copy-on-Write is active, which makes in-place writes raise.
    vals = series.astype(float).to_numpy(copy=True)
    for i, current in enumerate(vals):
        ref = vals[max(0, i - window):i]
        if len(ref) >= 4:
            med = np.median(ref)
            # Tiny epsilon keeps the band from collapsing to zero width.
            mad = np.median(np.abs(ref - med)) + 1e-9
            upper = med + K * mad
            lower = max(0.0, med - K * mad)
            vals[i] = min(max(current, lower), upper)
        else:
            vals[i] = max(current, 0.0)
    return pd.Series(vals, index=series.index)

# ==================== Christmas Holidays CSV ====================

def ensure_christmas_csv(path: str = HOLIDAYS_CSV_PATH,
                         years = HOLIDAYS_YEARS,
                         include_jan6: bool = INCLUDE_JAN6) -> str:
    """Create the Christmas-holidays CSV when it does not exist yet.

    An existing file is never overwritten. For every year ``y`` in ``years``
    the file gets Dec 24, 25, 26 and 31 of ``y`` plus Jan 1 (and, when
    ``include_jan6`` is true, Jan 6) of ``y + 1``. January of the first year
    itself is therefore not covered.

    Args:
        path: Target CSV location.
        years: Iterable of season start years.
        include_jan6: Whether to add Epiphany (Jan 6) rows.

    Returns:
        ``path``, unchanged, so the call can be chained.
    """
    if os.path.exists(path):
        return path

    rows = []
    for y in years:
        ny = y + 1
        rows.extend([
            (f"{y}-12-24", "Christmas Eve", 1),
            (f"{y}-12-25", "Christmas Day", 1),
            (f"{y}-12-26", "Boxing/St. Stephen", 1),
            (f"{y}-12-31", "New Year Eve", 1),
            (f"{ny}-01-01", "New Year Day", 1),
        ])
        if include_jan6:
            rows.append((f"{ny}-01-06", "Epiphany", 1))

    # A bare filename has an empty dirname, and os.makedirs('') raises;
    # only create the directory when the path actually names one.
    target_dir = os.path.dirname(path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    pd.DataFrame(rows, columns=["date","label","is_xmas"]).to_csv(path, index=False, encoding="utf-8")
    return path


def load_christmas_csv(path: str = HOLIDAYS_CSV_PATH) -> pd.DataFrame:
    """Return the Christmas-holidays table, creating the CSV first when it is missing.

    The result has columns ``date`` (datetime, unparseable -> NaT), ``label``
    and ``is_xmas`` (int, unparseable -> 0).
    """
    ensure_christmas_csv(path)
    holidays = pd.read_csv(path)
    holidays = holidays.assign(
        date=pd.to_datetime(holidays['date'], errors='coerce'),
        is_xmas=pd.to_numeric(holidays['is_xmas'], errors='coerce').fillna(0).astype(int),
    )
    return holidays[['date', 'label', 'is_xmas']]

# ==================== Outage proxy (case_reason.xlsx) ====================

def load_case_reason_proxy(path: str = CASE_REASON_PATH,
                           sheet=CASE_REASON_SHEET) -> pd.DataFrame:
    """Load case_reason.xlsx and keep the rows usable as an outage proxy.

    A missing file is not an error: an empty frame with the output columns is
    returned. Rows without a parseable ``Date`` or without a
    ``department_id`` are dropped. When the file has a ``case_reason`` column
    and ``CASE_REASON_FILTER`` is set, only rows whose reason equals the
    filter are kept. When the column is absent there is nothing to filter on,
    so all rows are kept and ``case_reason`` is returned empty.

    Args:
        path: Location of the Excel workbook.
        sheet: Sheet to read.

    Returns:
        DataFrame with columns ``Date``, ``department_id``, ``case_reason``.

    Raises:
        ValueError: ``Date`` or ``department_id`` is missing from the sheet.
    """
    out_cols = ['Date', 'department_id', 'case_reason']
    if not os.path.exists(path):
        return pd.DataFrame(columns=out_cols)

    df = pd.read_excel(path, sheet_name=sheet, engine="openpyxl")

    required = {'Date', 'department_id'}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"case_reason.xlsx must contain {sorted(list(required))}. "
                         f"Found: {list(df.columns)}")

    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
    # Drop unusable rows BEFORE the string cast: astype(str) turns a missing
    # department_id into the text 'nan', which dropna can no longer detect.
    df = df.dropna(subset=['Date', 'department_id']).copy()
    df['department_id'] = df['department_id'].astype(str).str.strip()

    if 'case_reason' in df.columns:
        df['case_reason'] = df['case_reason'].astype(str).str.strip()
        if CASE_REASON_FILTER:
            df = df[df['case_reason'].fillna('') == CASE_REASON_FILTER]
    else:
        # The column is optional on input but part of the output contract;
        # without it the final column selection would raise KeyError.
        df['case_reason'] = None

    return df[out_cols].copy()

# ==================== Monthly exogenous features ====================

def build_monthly_exog_from_proxy_and_xmas(month_index: pd.PeriodIndex,
                                           department_id: str,
                                           case_reason_df: pd.DataFrame,
                                           xmas_df: pd.DataFrame) -> pd.DataFrame:
    """
    Assemble the monthly exogenous regressors for one department.

    Columns of the returned frame (indexed by ``month_index``, all float):
      - outage_cases_z: z-scored number of case-reason rows of this department per month
      - xmas_days_cnt_z: z-scored number of flagged Christmas days per month
      - jan_postxmas: 1.0 for January months when INCLUDE_JAN_POSTXMAS is on, else 0.0

    The z-scores use the mean and population std of the counts over
    ``month_index`` itself, so two calls with different indexes are scaled
    independently. A source that is None or empty yields a column of zeros.
    """
    def _zscored_monthly_counts(frame: pd.DataFrame, date_col: str, count_col: str) -> pd.Series:
        """Count rows per month, align to month_index (gaps -> 0) and z-score."""
        with_month = frame.assign(month=frame[date_col].dt.to_period('M'))
        counts = (with_month.groupby('month', as_index=False)
                            .size()
                            .rename(columns={'size': count_col})
                            .set_index('month')
                            .reindex(month_index)
                            .fillna(0.0))[count_col]
        centre = float(counts.mean())
        spread = float(counts.std(ddof=0)) + 1e-6  # epsilon avoids division by zero
        return (counts - centre) / spread

    X = pd.DataFrame(index=month_index)

    # ---- Outage proxy: rows belonging to this department only ----
    if case_reason_df is not None and not case_reason_df.empty:
        dept_cases = case_reason_df[case_reason_df['department_id'] == str(department_id)]
        X['outage_cases_z'] = _zscored_monthly_counts(dept_cases, 'Date', 'outage_cases')
    else:
        X['outage_cases_z'] = 0.0

    # ---- Christmas days per month (rows flagged is_xmas == 1) ----
    if xmas_df is not None and not xmas_df.empty:
        flagged_days = xmas_df[xmas_df['is_xmas'] == 1]
        X['xmas_days_cnt_z'] = _zscored_monthly_counts(flagged_days, 'date', 'xmas_days_cnt')
    else:
        X['xmas_days_cnt_z'] = 0.0

    # ---- Optional January post-Christmas dummy ----
    X['jan_postxmas'] = [
        1.0 if (INCLUDE_JAN_POSTXMAS and period.start_time.month == 1) else 0.0
        for period in month_index
    ]

    return X.fillna(0.0).astype(float)

## 4. Monthly modelling (log-scale, rate-aware, with exog)

In [13]:
def fit_prophet_monthly_log(ts_m: pd.Series, is_rate: bool = False,
                            exog_train: Optional[pd.DataFrame] = None):
    """Fit Prophet on log1p(ts_m), optionally with exogenous regressors.

    Args:
        ts_m: Monthly series on a PeriodIndex (volume, or rate per workday).
        is_rate: When true, forecasts are multiplied by ``future_workdays``
            (if given) and no dynamic cap is applied on back-transformation.
        exog_train: Optional regressors; reindexed to ``ts_m.index`` with
            gaps filled by 0.0. Every column becomes a Prophet regressor.

    Returns:
        ``(model, fcast)``, or ``(None, None)`` when Prophet is not installed.
        ``fcast(h_months, future_workdays, exog_future)`` returns the forecast
        for the ``h_months`` periods after the history as a Series on a
        monthly PeriodIndex.
    """
    if Prophet is None:
        return None, None

    y = np.log1p(ts_m.values)
    dfp = pd.DataFrame({'ds': ts_m.index.to_timestamp(), 'y': y})
    m = Prophet(weekly_seasonality=False, yearly_seasonality=True, daily_seasonality=False)

    exog_cols = []
    if exog_train is not None and not exog_train.empty:
        ex_al = exog_train.reindex(ts_m.index).fillna(0.0)
        for c in ex_al.columns:
            m.add_regressor(c, standardize=True)
            dfp[c] = ex_al[c].values
            exog_cols.append(c)

    m.fit(dfp)

    def fcast(h_months=H_MONTHS, future_workdays: Optional[pd.Series] = None,
              exog_future: Optional[pd.DataFrame] = None):
        # make_future_dataframe returns the history rows followed by the horizon.
        future = m.make_future_dataframe(periods=h_months, freq='MS')

        if exog_cols:
            # Every regressor registered at fit time must exist in the frame
            # passed to predict(); a missing frame or column becomes 0.0
            # instead of making predict() fail.
            future_periods = pd.PeriodIndex(future['ds'], freq='M')
            if exog_future is not None and not exog_future.empty:
                exf = exog_future.reindex(index=future_periods, columns=exog_cols).fillna(0.0)
            else:
                exf = pd.DataFrame(0.0, index=future_periods, columns=exog_cols)
            for c in exog_cols:
                future[c] = exf[c].values

        pred = m.predict(future)
        pred = pred.set_index(pd.PeriodIndex(pred['ds'], freq='M'))['yhat'].iloc[-h_months:]

        vals = expm1_safe(pred.values, cap_original=None if is_rate else compute_dynamic_cap(ts_m))
        if is_rate and future_workdays is not None:
            # Rate per workday -> monthly volume.
            vals = vals * future_workdays.values
        return pd.Series(vals, index=pred.index)

    return m, fcast


def fit_arima_monthly_log(ts_m: pd.Series, is_rate: bool = False,
                          exog_train: Optional[pd.DataFrame] = None):
    """Fit SARIMAX on log1p(ts_m) over a small order grid and keep the lowest-AIC fit.

    Grid: p, q in {0, 1}; d = 1 for fewer than 36 points, else {0, 1};
    seasonal P, D, Q in {0, 1} with period 12 once there are at least 12
    points, otherwise no seasonal part. Candidates that fail to fit are
    skipped.

    Args:
        ts_m: Monthly series on a PeriodIndex (volume, or rate per workday).
        is_rate: When true, forecasts are multiplied by ``future_workdays``
            (if given) and no dynamic cap is applied on back-transformation.
        exog_train: Optional regressors, reindexed to ``ts_m.index`` with gaps
            filled by 0.0.

    Returns:
        ``(best_model, fcast)``. ``best_model`` is None when no candidate
        could be fitted; calling ``fcast`` then raises ``RuntimeError``.
        ``fcast(h_months, future_workdays, exog_future)`` returns a Series on
        the ``h_months`` periods following the history; only the first
        ``h_months`` rows of ``exog_future`` are used.
    """
    from itertools import product

    y = np.log1p(ts_m)
    X = None
    if exog_train is not None and not exog_train.empty:
        X = exog_train.reindex(ts_m.index).fillna(0.0).values

    seasonal = len(ts_m) >= 12
    arma_orders = [0, 1]
    diff_orders = [1] if len(ts_m) < 36 else [0, 1]
    seasonal_arma_orders = [0, 1] if seasonal else [0]
    seasonal_diff_orders = [0, 1] if seasonal else [0]
    period = 12 if seasonal else 0

    best_aic, best_model = np.inf, None
    # product() walks the grid in the same order as nested p/d/q/P/D/Q loops,
    # so ties on AIC resolve to the same (first) candidate.
    grid = product(arma_orders, diff_orders, arma_orders,
                   seasonal_arma_orders, seasonal_diff_orders, seasonal_arma_orders)
    for p, d, q, P, D, Q in grid:
        try:
            model = SARIMAX(
                y, order=(p, d, q),
                seasonal_order=(P, D, Q, period),
                exog=X,
                enforce_stationarity=False,
                enforce_invertibility=False
            ).fit(disp=False)
            if model.aic < best_aic:
                best_aic = model.aic
                best_model = model
        except Exception:
            # Best-effort grid search: an unfittable candidate is simply skipped.
            continue

    def fcast(h_months=H_MONTHS, future_workdays: Optional[pd.Series] = None,
              exog_future: Optional[pd.DataFrame] = None):
        if best_model is None:
            # Fail with an explicit message instead of an AttributeError on None.
            raise RuntimeError("No SARIMAX candidate could be fitted; ARIMA forecast is unavailable.")
        Xf = None
        if exog_future is not None and not exog_future.empty:
            Xf = exog_future.iloc[:h_months].fillna(0.0).values
        fc_log = best_model.get_forecast(h_months, exog=Xf).predicted_mean
        idx = pd.period_range(ts_m.index[-1] + 1, periods=h_months, freq='M')
        vals = expm1_safe(fc_log, cap_original=None if is_rate else compute_dynamic_cap(ts_m))
        if is_rate and future_workdays is not None:
            # Rate per workday -> monthly volume.
            vals = vals * future_workdays.values
        return pd.Series(vals, index=idx)

    return best_model, fcast


def fit_tbats_or_ets_monthly_log(ts_m: pd.Series, is_rate: bool = False):
    """Fit TBATS on log1p(ts_m) when available, otherwise additive-trend ETS.

    TBATS (seasonal period 12, no ARMA errors) is used when the package is
    installed and there are at least 12 points. The ETS fallback adds an
    additive 12-month season only from 24 points on.

    Returns:
        ``(model, fcast)`` where ``fcast(h_months, future_workdays)`` gives a
        Series for the ``h_months`` periods after the history. With
        ``is_rate`` the values are multiplied by ``future_workdays`` (if
        given) and no dynamic cap is applied.
    """
    log_series = np.log1p(ts_m)

    if TBATS is not None and len(ts_m) >= 12:
        stamped = pd.Series(log_series.values, index=ts_m.index.to_timestamp())
        model = TBATS(use_arma_errors=False, seasonal_periods=[12]).fit(stamped)

        def _predict_log(steps):
            return model.forecast(steps=steps)
    else:
        season_len = 12 if len(ts_m) >= 24 else None
        model = ExponentialSmoothing(
            log_series,
            trend='add',
            seasonal=('add' if season_len else None),
            seasonal_periods=season_len,
        ).fit()

        def _predict_log(steps):
            return model.forecast(steps)

    def fcast(h_months=H_MONTHS, future_workdays: Optional[pd.Series] = None):
        # Both back-ends share the same back-transformation and indexing.
        log_path = _predict_log(h_months)
        horizon = pd.period_range(ts_m.index[-1] + 1, periods=h_months, freq='M')
        cap = None if is_rate else compute_dynamic_cap(ts_m)
        volumes = expm1_safe(log_path, cap_original=cap)
        if is_rate and future_workdays is not None:
            volumes = volumes * future_workdays.values
        return pd.Series(volumes, index=horizon)

    return model, fcast


def fit_ets_damped_monthly_log(ts_m: pd.Series, is_rate: bool = False):
    """Fit ETS with a damped additive trend on log1p(ts_m).

    An additive 12-month season is included only from 24 points on.

    Returns:
        ``(model, fcast)`` where ``fcast(h_months, future_workdays)`` gives a
        Series for the ``h_months`` periods after the history. With
        ``is_rate`` the values are multiplied by ``future_workdays`` (if
        given) and no dynamic cap is applied.
    """
    season_len = 12 if len(ts_m) >= 24 else None
    smoother = ExponentialSmoothing(
        np.log1p(ts_m),
        trend='add',
        damped_trend=True,
        seasonal=('add' if season_len else None),
        seasonal_periods=season_len,
    )
    model = smoother.fit()

    def fcast(h_months=H_MONTHS, future_workdays: Optional[pd.Series] = None):
        log_path = model.forecast(h_months)
        horizon = pd.period_range(ts_m.index[-1] + 1, periods=h_months, freq='M')
        cap = None if is_rate else compute_dynamic_cap(ts_m)
        volumes = expm1_safe(log_path, cap_original=cap)
        if is_rate and future_workdays is not None:
            volumes = volumes * future_workdays.values
        return pd.Series(volumes, index=horizon)

    return model, fcast

# ==================== Adaptive CV (rate-aware) ====================

def _score_cv_candidate(fit_fn, train_rate: pd.Series, test_vol: pd.Series,
                        h: int, future_w: pd.Series) -> Optional[Tuple[float, float]]:
    """Fit one candidate on a CV fold and return its (sMAPE, wMAPE) on volumes.

    Returns None when the candidate is unavailable (its fit function returns
    no forecaster) and the penalty ``(200.0, 200.0)`` when fitting or
    forecasting fails or yields no finite value.
    """
    try:
        _, forecaster = fit_fn(train_rate, is_rate=True)
        if forecaster is None:
            return None
        pred_vol = forecaster(h_months=h, future_workdays=future_w)
        pv = np.array(pred_vol.values[:h], dtype=float)
        pv[~np.isfinite(pv)] = np.nan
        if np.isnan(pv).all():
            return 200.0, 200.0
        # Isolated non-finite predictions are scored as a forecast of 0.
        filled = np.nan_to_num(pv, nan=0.0)
        return smape(test_vol.values, filled), wmape(test_vol.values, filled)
    except Exception:
        # A failing candidate is penalised, not fatal for the whole CV.
        return 200.0, 200.0


def rolling_cv_monthly_adaptive_rate(ts_vol: pd.Series) -> Tuple[Optional[Dict[str, float]], Optional[Dict[str, float]]]:
    """Rolling-origin CV of all candidate models, fitted on rate per workday.

    Each fold trains on the rate series of the first ``start`` months and
    scores the volume forecast for the next ``h`` months (``h`` = 3 with at
    least 15 months of history, else 1). Training sets have at least 12
    months, so histories of 12 months or fewer produce no fold.

    Args:
        ts_vol: Monthly volumes on a PeriodIndex.

    Returns:
        ``(smape_by_model, wmape_by_model)`` averaged over folds, keyed by
        'Prophet', 'ARIMA', 'TBATS/ETS' and 'ETS_Damped' ('Prophet' is absent
        when Prophet is not installed). ``(None, None)`` for fewer than 9
        points; two empty dicts when no fold could be formed.
    """
    n = len(ts_vol)
    if n < 9:
        return None, None
    h = 3 if n >= 15 else 1
    min_train = max(12, n - (h + 2))

    candidates = (
        ('Prophet', fit_prophet_monthly_log),
        ('ARIMA', fit_arima_monthly_log),
        ('TBATS/ETS', fit_tbats_or_ets_monthly_log),
        ('ETS_Damped', fit_ets_damped_monthly_log),
    )

    s_out, w_out = [], []
    for start in range(min_train, n - h + 1):
        train_vol = ts_vol.iloc[:start]
        test_vol = ts_vol.iloc[start:start + h]

        train_rate, _ = monthly_rate_series(train_vol)
        future_idx = pd.period_range(train_vol.index[-1] + 1, periods=h, freq='M')
        future_w = future_idx.to_series().apply(
            lambda p: business_days_in_month(p.start_time.year, p.start_time.month)
        ).astype(float)

        s_metrics, w_metrics = {}, {}
        for label, fit_fn in candidates:
            scored = _score_cv_candidate(fit_fn, train_rate, test_vol, h, future_w)
            if scored is not None:
                s_metrics[label], w_metrics[label] = scored

        s_out.append(s_metrics)
        w_out.append(w_metrics)

    sm = pd.DataFrame(s_out).mean().to_dict()
    wm = pd.DataFrame(w_out).mean().to_dict()
    return sm, wm

## 5. Blending

In [14]:
def select_or_blend_forecasts(fc_dict: Dict[str, pd.Series],
                              cv_scores_wmape: Dict[str, float],
                              blend: bool = True):
    """Pick or blend forecasts using inverse-wMAPE weights (lower score = more weight).

    Only models present in both ``fc_dict`` and ``cv_scores_wmape`` take
    part. Missing or non-finite scores count as 1e6, i.e. almost no weight.

    Args:
        fc_dict: Forecast Series per model name.
        cv_scores_wmape: Cross-validated wMAPE per model name.
        blend: When False, return the single best-scoring model.

    Returns:
        ``(forecast, info)`` with ``info = {'winner': name, 'weights': {...}}``.
        If no model has a score, the first forecast in ``fc_dict`` is
        returned with weight 1.0. In a blend, the result is indexed by the
        union of all forecast indexes and a period missing from a model
        contributes 0 for that model.
    """
    # Floor for scores so that a perfect CV score (0) yields a very large
    # weight instead of a division by zero.
    eps = 1e-6

    scores = {k: (v if v is not None and np.isfinite(v) else 1e6) for k, v in cv_scores_wmape.items()}
    models = [m for m in fc_dict if m in scores]
    if not models:
        k0 = list(fc_dict.keys())[0]
        return fc_dict[k0], {'winner': k0, 'weights': {k0: 1.0}}

    best = min(models, key=lambda m: scores[m])
    if not blend:
        return fc_dict[best], {'winner': best, 'weights': {best: 1.0}}

    # Negative scores are invalid for wMAPE and get no weight.
    inv = {m: (1.0 / max(scores[m], eps) if scores[m] >= 0 else 0.0) for m in models}
    total = sum(inv.values())
    if total == 0:
        return fc_dict[best], {'winner': best, 'weights': {best: 1.0}}
    w = {m: inv[m] / total for m in models}

    idx = None
    for s in fc_dict.values():
        idx = s.index if idx is None else idx.union(s.index)
    blended = sum(w[m] * fc_dict[m].reindex(idx).fillna(0) for m in models)
    return blended, {'winner': best, 'weights': w}

## 6. Monthly pipeline

In [15]:

def build_monthly_series(df: pd.DataFrame) -> pd.DataFrame:
    """Sum daily ``ticket_total`` per department and calendar month.

    Returns a frame with columns ``department_id``, ``month`` (monthly
    Period) and ``incoming_monthly``; the input frame is not modified.
    """
    with_month = df.assign(month=df['Date'].dt.to_period('M'))
    totals = with_month.groupby(['department_id', 'month'], as_index=False)['ticket_total'].sum()
    return totals.rename(columns={'ticket_total': 'incoming_monthly'})


def bias_correction(blended: pd.Series, hist_actuals: pd.Series, window: int = 6) -> pd.Series:
    """Rescale a forecast by the recent actual/predicted ratio.

    The ratio is computed on the index labels where both series have a
    value, each ratio is clipped to [0.5, 1.5], and the last ``window``
    ratios are averaged. With fewer than 3 overlapping points the forecast
    is returned unchanged.
    """
    paired = pd.concat([hist_actuals, blended], axis=1)
    paired.columns = ['y', 'yhat']
    paired = paired.dropna()
    if len(paired) < 3:
        return blended

    recent_ratios = (paired['y'] / paired['yhat']).tail(window)
    factor = recent_ratios.clip(lower=0.5, upper=1.5).mean()
    return blended * float(factor)


def forecast_per_department_monthly(monthly: pd.DataFrame,
                                    case_reason_df: pd.DataFrame,
                                    xmas_df: pd.DataFrame) -> pd.DataFrame:
    """
    Rate-aware hybrid with exogenous features + adaptive CV + robust sanitation.
    Ensures CV/weights columns always exist.

    For every department in ``monthly`` the function builds the monthly volume
    series, fits each available model on the derived rate series, blends the
    model forecasts with ``select_or_blend_forecasts`` and post-processes the
    blend (finiteness repair, robust cap, bias correction, optional growth
    guard). The horizon is the module-level ``H_MONTHS``.

    Args:
        monthly: one row per department and month; the columns
            'department_id', 'month' and 'incoming_monthly' are read.
        case_reason_df: forwarded to ``build_monthly_exog_from_proxy_and_xmas``.
        xmas_df: forwarded to ``build_monthly_exog_from_proxy_and_xmas``.

    Returns:
        DataFrame with one row per department and forecast month and the
        columns 'department_id', 'month', 'forecast_monthly' (floored at 0),
        'cv_prophet_smape', 'cv_arima_smape', 'cv_tbats_ets_smape',
        'winner_model', 'blend_prophet_w', 'blend_arima_w' and
        'blend_tbats_ets_w'. The non-key columns are created as NaN when no
        rows were produced.

    NOTE(review): only the Prophet / ARIMA / TBATS-ETS weights and CV scores
    are exported; when 'ETS_Damped' or 'NaiveMean' take part in the blend their
    weight is not reported, and 'winner_model' may name them.
    """
    out_rows = []
    dept_ids = monthly['department_id'].unique().tolist()

    for dept in dept_ids:
        # Monthly volume series for this department, indexed by month.
        ts_vol = (monthly.loc[monthly['department_id'] == dept, ['month', 'incoming_monthly']]
                  .sort_values('month')
                  .set_index('month')['incoming_monthly'])
        if not pd.api.types.is_period_dtype(ts_vol.index):
            ts_vol.index = pd.PeriodIndex(ts_vol.index, freq='M')
        if len(ts_vol) == 0:
            continue

        # Winsorize + rate
        # (helpers defined elsewhere; presumably the rate is volume per workday - confirm)
        ts_vol = winsorize_monthly(ts_vol, 0.01, 0.99)
        ts_rate, _ = monthly_rate_series(ts_vol)
        ts_rate = ts_rate.fillna(ts_rate.median()).clip(lower=0)

        # Future index & workdays
        future_idx = pd.period_range(ts_vol.index[-1] + 1, periods=H_MONTHS, freq='M')
        future_w = future_idx.to_series().apply(lambda p: business_days_in_month(p.start_time.year, p.start_time.month)).astype(float)

        # EXOG construction (train + future)
        X_train = build_monthly_exog_from_proxy_and_xmas(ts_rate.index, str(dept), case_reason_df, xmas_df)
        X_future = build_monthly_exog_from_proxy_and_xmas(future_idx,      str(dept), case_reason_df, xmas_df)

        # CV (rate-aware)
        # Any failure in cross-validation degrades to "no scores" instead of aborting the department.
        try:
            cv_smape, cv_wmape = rolling_cv_monthly_adaptive_rate(ts_vol)
            cv_smape = cv_smape or {}
            cv_wmape = cv_wmape or {}
        except Exception:
            cv_smape, cv_wmape = {}, {}

        # Collect forecasts (on volume scale)
        # Each model below is best-effort: a failing fit/forecast is silently skipped.
        fc_dict: Dict[str, pd.Series] = {}

        # Prophet (with exog)
        # Only attempted when the optional dependency is installed and at least 12 months exist.
        if Prophet is not None and len(ts_rate) >= 12:
            try:
                _, fp = fit_prophet_monthly_log(ts_rate, is_rate=True, exog_train=X_train)
                if fp is not None:
                    fc_dict['Prophet'] = fp(H_MONTHS, future_workdays=future_w, exog_future=X_future)
            except Exception:
                pass

        # ARIMA (SARIMAX with exog)
        try:
            _, fa = fit_arima_monthly_log(ts_rate, is_rate=True, exog_train=X_train)
            fc_dict['ARIMA'] = fa(H_MONTHS, future_workdays=future_w, exog_future=X_future)
        except Exception:
            pass

        # TBATS/ETS (no exog)
        try:
            _, ft = fit_tbats_or_ets_monthly_log(ts_rate, is_rate=True)
            fc_dict['TBATS/ETS'] = ft(H_MONTHS, future_workdays=future_w)
        except Exception:
            pass

        # ETS Damped (no exog)
        try:
            _, fe = fit_ets_damped_monthly_log(ts_rate, is_rate=True)
            fc_dict['ETS_Damped'] = fe(H_MONTHS, future_workdays=future_w)
        except Exception:
            pass

        # Last resort when every model failed: flat forecast at the historical mean volume.
        if not fc_dict:
            idx = pd.period_range(ts_vol.index[-1] + 1, periods=H_MONTHS, freq='M')
            val = max(0.0, float(ts_vol.mean()))
            fc_dict['NaiveMean'] = pd.Series([val] * H_MONTHS, index=idx)

        # Blend/select using wMAPE
        blended, meta = select_or_blend_forecasts(fc_dict, cv_scores_wmape=cv_wmape, blend=True)

        # Enforce finiteness
        # Non-finite points are replaced by the mean of the finite ones; if none
        # are finite the whole forecast becomes the flat historical mean.
        if not np.isfinite(blended.values).all():
            finite_mask = np.isfinite(blended.values)
            if finite_mask.any():
                finite_mean = float(np.nanmean(blended.values[finite_mask]))
                vals = np.where(finite_mask, blended.values, finite_mean)
                blended = pd.Series(vals, index=blended.index)
            else:
                idx = pd.period_range(ts_vol.index[-1] + 1, periods=H_MONTHS, freq='M')
                val = max(0.0, float(ts_vol.mean()))
                blended = pd.Series([val] * H_MONTHS, index=idx)

        # Robust smoothing (Median ± K·MAD) + bias correction
        # NOTE(review): bias_correction aligns both series on their index; blended
        # looks like it covers future months and ts_vol past months, so the call
        # may be a no-op - confirm.
        blended = robust_roll_cap(blended, window=12, K=6.0)
        blended = bias_correction(blended, ts_vol, window=6)

        # Optional growth guard
        # Clamps the forecast to [MIN_GROWTH, MAX_GROWTH] times the mean of the last 12 months (at least 1.0).
        if APPLY_LOCAL_GROWTH_GUARD:
            ref = max(1.0, float(ts_vol.tail(12).mean())) if len(ts_vol) else 1.0
            blended = blended.clip(lower=ref * MIN_GROWTH, upper=ref * MAX_GROWTH)

        # Extract safe weights for sheet columns
        w_prophet = meta['weights'].get('Prophet', np.nan)
        w_arima = meta['weights'].get('ARIMA', np.nan)
        w_tbats = meta['weights'].get('TBATS/ETS', np.nan)

        # One output row per forecast month; CV scores and weights repeat on every row of the department.
        for per, val in blended.items():
            out_rows.append({
                'department_id': dept,
                'month': per,
                'forecast_monthly': max(0.0, float(val)),
                'cv_prophet_smape': cv_smape.get('Prophet', np.nan),
                'cv_arima_smape': cv_smape.get('ARIMA', np.nan),
                'cv_tbats_ets_smape': cv_smape.get('TBATS/ETS', np.nan),
                'winner_model': meta.get('winner', np.nan),
                'blend_prophet_w': w_prophet,
                'blend_arima_w': w_arima,
                'blend_tbats_ets_w': w_tbats,
            })

    df_out = pd.DataFrame(out_rows)

    # --- Hotfix: ensure expected columns always exist ---
    expected_cols = [
        'forecast_monthly',
        'cv_prophet_smape','cv_arima_smape','cv_tbats_ets_smape',
        'winner_model','blend_prophet_w','blend_arima_w','blend_tbats_ets_w'
    ]
    for c in expected_cols:
        if c not in df_out.columns:
            df_out[c] = np.nan

    # Normalise key dtypes so downstream merges see string ids and monthly periods.
    if not df_out.empty:
        df_out['department_id'] = df_out['department_id'].astype(str)
        if not pd.api.types.is_period_dtype(df_out['month']):
            df_out['month'] = pd.PeriodIndex(df_out['month'], freq='M')
    return df_out


def compute_monthly_accuracy_with_history(monthly: pd.DataFrame,
                                          fc_monthly: pd.DataFrame,
                                          report_start: str) -> pd.DataFrame:
    """Build capacity_error-like table with historical Actuals and future Forecasts.

    Args:
        monthly: historical volumes with 'department_id', 'month' and
            'incoming_monthly'.
        fc_monthly: forecasts with 'department_id', 'month',
            'forecast_monthly' plus the CV / winner / blend-weight columns.
        report_start: first month to report, parseable by ``pd.Period``
            with monthly frequency (e.g. '2024-01').

    Returns:
        Historical rows (month >= ``report_start``) carrying 'Actual_Volume'
        followed by forecast rows (month after the last actual month) carrying
        'Forecast' and the model diagnostics, plus 'Forecast_Accuracy'.

    NOTE(review): historical rows are given a NaN 'Forecast' and future rows a
    NaN 'Actual_Volume', so 'Forecast_Accuracy' is NaN on every row produced
    here; it only becomes meaningful if a caller fills in one of the two.
    """
    monthly = monthly.copy()
    monthly['department_id'] = monthly['department_id'].astype(str)
    # isinstance on the dtype replaces the deprecated pd.api.types.is_period_dtype.
    if not isinstance(monthly['month'].dtype, pd.PeriodDtype):
        monthly['month'] = pd.PeriodIndex(monthly['month'], freq='M')

    fc = fc_monthly.copy()
    fc['department_id'] = fc['department_id'].astype(str)
    if not isinstance(fc['month'].dtype, pd.PeriodDtype):
        fc['month'] = pd.PeriodIndex(fc['month'], freq='M')

    start_per = pd.Period(report_start, freq='M')
    last_actual = monthly['month'].max()

    # History: actuals from the reporting start onwards, no forecast attached.
    hist = (monthly.loc[monthly['month'] >= start_per, ['department_id', 'month', 'incoming_monthly']]
            .rename(columns={'incoming_monthly': 'Actual_Volume'}))
    hist['Forecast'] = np.nan

    # Future: forecasts strictly after the last month that has actuals.
    fut = fc[['department_id', 'month', 'forecast_monthly',
              'cv_prophet_smape', 'cv_arima_smape', 'cv_tbats_ets_smape',
              'winner_model', 'blend_prophet_w', 'blend_arima_w', 'blend_tbats_ets_w']].copy()
    fut = fut.loc[fut['month'] > last_actual]
    fut = fut.rename(columns={'forecast_monthly': 'Forecast'})
    fut['Actual_Volume'] = np.nan

    base = pd.concat([hist, fut], ignore_index=True, sort=False)

    # Accuracy = 100 * (1 - |F - A| / A), only where both values exist and A > 0.
    comparable = (base['Actual_Volume'].notna()) & (base['Forecast'].notna()) & (base['Actual_Volume'] > 0)
    base['Forecast_Accuracy'] = np.where(
        comparable,
        (1 - (np.abs(base['Forecast'] - base['Actual_Volume']) / base['Actual_Volume'])) * 100.0,
        np.nan
    )
    return base


def compute_capacity_monthly(cap_df: pd.DataFrame, prod_dept: pd.DataFrame) -> pd.DataFrame:
    """Derive the FTE needed per working day for each department-month.

    ``cap_df`` is left-joined with ``prod_dept`` on 'department_id'. The
    productivity column 'avg_tickets_per_agent_day' is coerced to numeric with
    zeros turned into NaN, 'workdays_in_month' is looked up through
    ``business_days_in_month`` and 'Capacity_FTE_per_day' is
    Forecast / (productivity * workdays), or NaN when any of the three inputs
    is missing or non-positive.
    """
    merged = cap_df.merge(prod_dept, on='department_id', how='left')

    productivity = pd.to_numeric(merged['avg_tickets_per_agent_day'], errors='coerce')
    merged['avg_tickets_per_agent_day'] = productivity
    merged['avg_tickets_per_agent_day'] = merged['avg_tickets_per_agent_day'].replace(0, np.nan)

    workday_counts = []
    for period in merged['month']:
        first_day = period.start_time
        workday_counts.append(business_days_in_month(first_day.year, first_day.month))
    merged['workdays_in_month'] = workday_counts

    per_agent = merged['avg_tickets_per_agent_day']
    workdays = merged['workdays_in_month']
    computable = (per_agent > 0) & (workdays > 0) & (merged['Forecast'].notna())
    fte = merged['Forecast'] / (per_agent * workdays)
    merged['Capacity_FTE_per_day'] = np.where(computable, fte, np.nan)
    return merged


def build_cv_table(fc_monthly: pd.DataFrame, mapping: pd.DataFrame) -> pd.DataFrame:
    """Build mape_table_cv with sMAPE, best model and weights.

    Keeps the first row per 'department_id' from ``fc_monthly``, renames the
    CV / winner / weight columns to their report names, enriches the rows via
    ``apply_mapping`` and returns them sorted by 'vertical' and
    'department_id'.

    Args:
        fc_monthly: monthly forecast table; any of the expected CV / weight
            columns that is missing is treated as NaN. The caller's frame is
            not modified.
        mapping: department mapping handed to ``apply_mapping``.

    Raises:
        ValueError: if ``fc_monthly`` is None or empty.
    """
    if fc_monthly is None or fc_monthly.empty:
        raise ValueError("fc_monthly is empty; cannot build CV table.")
    cols_keep = [
        'department_id',
        'cv_prophet_smape', 'cv_arima_smape', 'cv_tbats_ets_smape',
        'winner_model',
        'blend_prophet_w', 'blend_arima_w', 'blend_tbats_ets_w'
    ]
    # reindex selects the wanted columns and creates missing ones as NaN on a
    # new frame, so the hotfix no longer writes columns into the caller's data.
    df = (fc_monthly.reindex(columns=cols_keep)
          .drop_duplicates(subset=['department_id'])
          .copy())
    df = df.rename(columns={
        'cv_prophet_smape': 'sMAPE_Prophet_CV',
        'cv_arima_smape': 'sMAPE_ARIMA_CV',
        'cv_tbats_ets_smape': 'sMAPE_TBATS_ETS_CV',
        'winner_model': 'Best_Model',
        'blend_prophet_w': 'Weight_Prophet',
        'blend_arima_w': 'Weight_ARIMA',
        'blend_tbats_ets_w': 'Weight_TBATS_ETS',
    })
    df['department_id'] = df['department_id'].astype(str)
    df = apply_mapping(df, mapping)
    ordered_cols = [
        'department_id', 'department_name', 'vertical',
        'sMAPE_Prophet_CV', 'sMAPE_ARIMA_CV', 'sMAPE_TBATS_ETS_CV',
        'Best_Model',
        'Weight_Prophet', 'Weight_ARIMA', 'Weight_TBATS_ETS'
    ]
    df = df[ordered_cols]
    return df.sort_values(['vertical', 'department_id'])

## 7. Daily plan (reconciled)

In [16]:
# ================ Language share utilities (NEW) ================
# Margin used when clipping shares so they stay strictly inside (0, 1) before the logit transform.
EPS_SHARE = 1e-6  # avoid 0/1 in logit
# Prefix that marks a wide-format input column as a per-language column.
KNOWN_LANG_PREFIX = "lang_"
# Languages kept under their own name; parse_languages_incoming maps every other language to 'English'.
SERVED_LANGS = {'English','Spanish','Italian','French','Portuguese','German'}


def _logit(x):
    """Return log(p / (1 - p)) with ``x`` first clipped to [EPS_SHARE, 1 - EPS_SHARE]."""
    clipped = np.clip(x, EPS_SHARE, 1.0 - EPS_SHARE)
    odds = clipped / (1.0 - clipped)
    return np.log(odds)


def _inv_logit(z):
    """Logistic function 1 / (1 + exp(-z)), the inverse of the logit."""
    decay = np.exp(-z)
    return 1.0 / (1.0 + decay)


def _safe_row_renorm(m):
    """Rescale every row of ``m`` to sum to 1.

    Rows whose sum is zero or negative are divided by 1 instead, i.e. returned
    as they are. Index and columns of ``m`` are preserved; ``m`` itself is not
    modified.
    """
    values = m.to_numpy().astype(float)
    totals = values.sum(axis=1, keepdims=True)
    divisors = np.where(totals <= 0, 1.0, totals)
    return pd.DataFrame(values / divisors, index=m.index, columns=m.columns)


# ================ Monthly by-language (NEW) ================
from typing import Tuple

def parse_languages_incoming(df_in):
    """
    Split raw incoming data into department totals and per-language detail.

    Returns:
      daily_total  : DataFrame with Date, department_id, ticket_total
      daily_bylang : long DataFrame with Date, department_id, language, ticket_lang
    Accepted layouts:
      - Long: columns ['Date','department_id','language','ticket_total'], one row per language
      - Wide: one column per language (name title-cases to a served language, or starts with 'lang_')
    Business rule: any language outside the six served ones is handled in English
    (it is mapped to 'English').
    When no language information is found, daily_bylang is an empty frame.
    """
    frame = df_in.copy()
    frame['Date'] = pd.to_datetime(frame['Date'], errors='coerce')
    frame['department_id'] = frame['department_id'].astype(str).str.strip()

    def _service_language(raw):
        # Title-case the name, then route anything not served to English.
        titled = raw.title() if isinstance(raw, str) else raw
        if titled and titled in SERVED_LANGS:
            return titled
        return 'English'

    def _totals_from(detail):
        # Daily total per department, summed after the language mapping.
        summed = detail.groupby(['Date','department_id'], as_index=False)['ticket_lang'].sum()
        return summed.rename(columns={'ticket_lang':'ticket_total'})

    # Long layout
    if 'language' in frame.columns:
        cleaned = frame['language'].astype(str).str.strip()
        frame['service_language'] = cleaned.apply(_service_language)
        frame['ticket_total'] = pd.to_numeric(frame['ticket_total'], errors='coerce').fillna(0.0)
        detail = frame[['Date','department_id','service_language','ticket_total']]
        detail = detail.rename(columns={'service_language':'language', 'ticket_total':'ticket_lang'})
        detail = detail.dropna(subset=['Date','department_id','language'])
        return _totals_from(detail), detail

    # Wide layout: look for language columns
    wide_cols = []
    for col in frame.columns:
        if (col.title() in SERVED_LANGS) or col.startswith(KNOWN_LANG_PREFIX):
            wide_cols.append(col)

    if not wide_cols:
        # No language detail at all
        return (frame[['Date','department_id','ticket_total']].copy(),
                pd.DataFrame(columns=['Date','department_id','language','ticket_lang']))

    melted = frame[['Date','department_id'] + wide_cols].melt(
        id_vars=['Date','department_id'], var_name='language_col', value_name='ticket_lang')
    without_prefix = melted['language_col'].str.replace(f'^{KNOWN_LANG_PREFIX}', '', regex=True)
    melted['language'] = without_prefix
    melted['language'] = melted['language'].str.strip().str.title()
    # Route non-served languages to English
    melted['language'] = melted['language'].apply(
        lambda name: name if name in SERVED_LANGS else 'English')
    melted['ticket_lang'] = pd.to_numeric(melted['ticket_lang'], errors='coerce').fillna(0.0)
    detail = melted[['Date','department_id','language','ticket_lang']].dropna(subset=['Date'])
    return _totals_from(detail), detail


def build_monthly_language_totals(daily_bylang):
    """
    Aggregate per-language daily tickets into monthly totals.

    Returns a DataFrame with the columns
      ['department_id','month','language','incoming_monthly_lang']
    (empty, with those columns, when the input is None or has no rows).
    'department_id' is returned as string.
    """
    out_cols = ['department_id','month','language','incoming_monthly_lang']
    if daily_bylang is None or len(daily_bylang) == 0:
        return pd.DataFrame(columns=out_cols)

    months = pd.to_datetime(daily_bylang['Date']).dt.to_period('M')
    detail = daily_bylang.assign(month=months)
    grouped = detail.groupby(['department_id','month','language'], as_index=False)
    monthly = grouped['ticket_lang'].sum()
    monthly = monthly.rename(columns={'ticket_lang':'incoming_monthly_lang'})
    monthly['department_id'] = monthly['department_id'].astype(str)
    return monthly


# ================ Language share forecasting (NEW) ================
from statsmodels.tsa.holtwinters import ExponentialSmoothing

def _fit_ets_logit_share(ts_share):
    """Damped additive-trend ETS fitted on logit(share). Returns (model, forecast_fn).

    An additive seasonal component with period 12 is added when the series has
    at least 24 points. ``forecast_fn(h_months)`` returns the forecast mapped
    back to the share scale through the inverse logit.
    """
    logit_values = _logit(np.array(ts_share.values, dtype=float))

    if len(ts_share) >= 24:
        seasonal_kind, season_length = 'add', 12
    else:
        seasonal_kind, season_length = None, None

    fitted = ExponentialSmoothing(
        logit_values,
        trend='add',
        damped_trend=True,
        seasonal=seasonal_kind,
        seasonal_periods=season_length,
    ).fit()

    def fcast(h_months):
        logit_path = fitted.forecast(h_months)
        return _inv_logit(np.array(logit_path, dtype=float))

    return fitted, fcast


def forecast_language_shares_per_dept(monthly_total, monthly_bylang, h_months=12, roll_fallback=6):
    """Forecast each language's share of a department's monthly volume.

    For every department present in both inputs the historical shares
    (language volume / department volume, rows renormalised) are forecast
    ``h_months`` ahead, one language at a time:
      - at least 10 observations: ETS on the logit of the share;
      - otherwise at least max(3, roll_fallback) observations: mean of the
        last ``roll_fallback`` shares;
      - otherwise, or if the chosen method raises: mean of the history, or a
        uniform 1 / number-of-languages when nothing is observed.
    The forecast rows are renormalised before being returned.

    Returns:
        Long DataFrame ['department_id','month','language','share_fc'];
        empty with those columns when an input is None/empty or no department
        has language data.
    """
    result_cols = ['department_id','month','language','share_fc']
    if monthly_bylang is None or len(monthly_bylang) == 0 or monthly_total is None or len(monthly_total) == 0:
        return pd.DataFrame(columns=result_cols)

    def _with_monthly_period(frame):
        # Work on a copy whose 'month' column is a monthly period.
        frame = frame.copy()
        if str(frame['month'].dtype) != 'period[M]':
            frame['month'] = pd.PeriodIndex(frame['month'], freq='M')
        return frame

    totals = _with_monthly_period(monthly_total)
    by_lang = _with_monthly_period(monthly_bylang)

    pieces = []
    for dept, dept_totals in totals.groupby('department_id'):
        dept_lang = by_lang[by_lang['department_id'] == dept].copy()
        if dept_lang.empty:
            continue
        languages = sorted(dept_lang['language'].dropna().unique().tolist())

        # Month x language volume matrix over every month the department has totals for.
        volume = dept_lang.pivot_table(index='month', columns='language',
                                       values='incoming_monthly_lang', aggfunc='sum')
        volume = volume.reindex(sorted(dept_totals['month'].unique()), fill_value=0.0)
        dept_volume = dept_totals.set_index('month')['incoming_monthly'].reindex(volume.index).fillna(0.0)
        raw_shares = volume.div(dept_volume.replace(0, np.nan), axis=0).fillna(0.0)
        history = _safe_row_renorm(raw_shares)

        future_months = pd.period_range(history.index[-1] + 1, periods=h_months, freq='M')
        forecast = pd.DataFrame(index=future_months, columns=languages, dtype=float)
        uniform_share = 1.0 / max(1, len(languages))

        for language in languages:
            series = history[language].astype(float)
            flat_level = float(series.mean() if series.notna().any() else uniform_share)
            try:
                observed = series.notna().sum()
                if observed >= 10:
                    _, predict = _fit_ets_logit_share(series)
                    forecast[language] = predict(h_months)
                elif observed >= max(3, roll_fallback):
                    forecast[language] = float(series.tail(roll_fallback).mean())
                else:
                    forecast[language] = flat_level
            except Exception:
                forecast[language] = flat_level

        forecast = _safe_row_renorm(forecast)
        long_form = forecast.reset_index().melt(id_vars='index', var_name='language', value_name='share_fc')
        long_form = long_form.rename(columns={'index':'month'})
        long_form.insert(0, 'department_id', dept)
        pieces.append(long_form)

    if not pieces:
        return pd.DataFrame(columns=result_cols)
    return pd.concat(pieces, ignore_index=True)


def rolling_cv_language_shares(monthly_total, monthly_bylang):
    """Rolling-origin error of the language-share forecasts, per department and language.

    Departments with fewer than 14 months are not cross-validated: their
    'sMAPE_share' field holds the standard deviation of the share times 100.
    Otherwise the horizon is 1 month (fewer than 24 months) or 3 months, the
    first training cut is max(12, n - (horizon + 4)), and each fold uses the
    ETS-on-logit model when the training slice has at least 10 observations,
    else the mean of the last 6 training shares; if a fold raises, the mean of
    the training slice is used instead.

    Returns:
        DataFrame with 'department_id', 'language', 'sMAPE_share' and
        'obs_count' (number of months available for the department).
    """
    if monthly_bylang is None or len(monthly_bylang) == 0:
        return pd.DataFrame(columns=['department_id','language','sMAPE_share','obs_count'])

    totals = monthly_total.copy()
    by_lang = monthly_bylang.copy()
    if str(totals['month'].dtype) != 'period[M]':
        totals['month'] = pd.PeriodIndex(totals['month'], freq='M')
    if str(by_lang['month'].dtype) != 'period[M]':
        by_lang['month'] = pd.PeriodIndex(by_lang['month'], freq='M')

    def _smape(actual, predicted):
        # Symmetric MAPE in percent; a zero denominator counts as 1.
        actual = np.array(actual, dtype=float)
        predicted = np.array(predicted, dtype=float)
        scale = np.abs(actual) + np.abs(predicted)
        scale = np.where(scale == 0, 1.0, scale)
        return float(np.mean(2.0 * np.abs(predicted - actual) / scale) * 100.0)

    def _flat(level, steps):
        # Constant forecast of length ``steps``.
        return np.array([float(level)] * steps, dtype=float)

    records = []
    for dept, dept_totals in totals.groupby('department_id'):
        dept_lang = by_lang[by_lang['department_id'] == dept]
        if dept_lang.empty:
            continue
        languages = sorted(dept_lang['language'].dropna().unique().tolist())
        volume = dept_lang.pivot_table(index='month', columns='language',
                                       values='incoming_monthly_lang', aggfunc='sum')
        volume = volume.reindex(sorted(dept_totals['month'].unique()), fill_value=0.0)
        dept_volume = dept_totals.set_index('month')['incoming_monthly'].reindex(volume.index).fillna(0.0)
        history = _safe_row_renorm(volume.div(dept_volume.replace(0, np.nan), axis=0).fillna(0.0))
        n_months = len(history.index)

        if n_months < 14:
            # Too short for rolling CV: report share volatility instead.
            for language in languages:
                spread = history[language].std()
                records.append({'department_id': dept, 'language': language,
                                'sMAPE_share': float(spread * 100.0), 'obs_count': n_months})
            continue

        horizon = 1 if n_months < 24 else 3
        first_cut = max(12, n_months - (horizon + 4))
        for language in languages:
            series = history[language].astype(float)
            fold_errors = []
            for cut in range(first_cut, n_months - horizon + 1):
                train = series.iloc[:cut]
                test = series.iloc[cut:cut + horizon]
                try:
                    if train.notna().sum() >= 10:
                        _, predict = _fit_ets_logit_share(train)
                        predicted = predict(horizon)
                        fold_errors.append(_smape(test.values, np.array(predicted[:horizon], dtype=float)))
                    else:
                        level = train.tail(6).mean() if train.notna().any() else 0.0
                        fold_errors.append(_smape(test.values, _flat(level, horizon)))
                except Exception:
                    level = train.mean() if train.notna().any() else 0.0
                    fold_errors.append(_smape(test.values, _flat(level, horizon)))
            score = float(np.mean(fold_errors)) if fold_errors else np.nan
            records.append({'department_id': dept, 'language': language,
                            'sMAPE_share': score, 'obs_count': n_months})
    return pd.DataFrame(records)


# ================ Apply shares & daily allocation by language (NEW) ================

def apply_language_shares_to_monthly(fc_monthly, shares_fc):
    """Split each department-month forecast across languages.

    ``fc_monthly`` ('department_id', 'month', 'forecast_monthly') is left-joined
    with ``shares_fc`` ('department_id', 'month', 'language', 'share_fc').
    Within every department-month the shares are renormalised to sum to 1; when
    they sum to zero or less, the forecast is spread evenly over the languages
    of that group.

    Returns:
        DataFrame ['department_id','month','language','forecast_monthly_language']
        ordered by department and month; empty with those columns when either
        input is None or has no rows.
    """
    out_cols = ['department_id','month','language','forecast_monthly_language']
    if fc_monthly is None or len(fc_monthly) == 0 or shares_fc is None or len(shares_fc) == 0:
        return pd.DataFrame(columns=out_cols)

    keys = ['department_id','month']
    forecast = fc_monthly[['department_id','month','forecast_monthly']].copy()
    if str(forecast['month'].dtype) != 'period[M]':
        forecast['month'] = pd.PeriodIndex(forecast['month'], freq='M')
    shares = shares_fc.copy()
    if str(shares['month'].dtype) != 'period[M]':
        shares['month'] = pd.PeriodIndex(shares['month'], freq='M')

    merged = forecast.merge(shares, on=keys, how='left')
    merged['share_fc'] = merged['share_fc'].fillna(0.0)
    # Stable sort keeps the group-by-group row order the former groupby.apply produced.
    merged = merged.sort_values(keys, kind='stable').reset_index(drop=True)

    # Vectorised per-group renormalisation. groupby.apply with a callback that
    # mutates each group (and touches the grouping columns) is deprecated in
    # recent pandas, so transform is used instead.
    per_group = merged.groupby(keys, sort=False)
    group_total = per_group['share_fc'].transform('sum')
    group_langs = per_group['language'].transform('nunique').clip(lower=1)
    has_mass = group_total > 0
    merged['share_fc'] = np.where(
        has_mass,
        merged['share_fc'] / group_total.where(has_mass, 1.0),
        1.0 / group_langs,
    )

    merged['forecast_monthly_language'] = merged['forecast_monthly'] * merged['share_fc']
    return merged[out_cols].reset_index(drop=True)


def build_daily_from_monthly_by_language(incoming_bylang, fc_monthly_bylang, horizon_days):
    """Spread monthly per-language forecasts over the days of the horizon.

    The horizon starts the day after the last date in ``incoming_bylang`` (or
    after today when there is no history) and lasts ``horizon_days`` days. Each
    positive monthly target is distributed over the days of its month with
    day-of-week weights, and only the days inside the horizon are kept.

    Weights come from, in order of preference:
      1. the history of the same department and language (at least 30 rows),
      2. the history of the department across languages,
      3. a uniform profile.

    Args:
        incoming_bylang: daily history with 'Date', 'department_id',
            'language', 'ticket_lang'; may be None or empty.
        fc_monthly_bylang: 'department_id', 'month' (monthly period),
            'language', 'forecast_monthly_language'.
        horizon_days: number of days to plan.

    Returns:
        DataFrame with 'department_id', 'language', 'Date' and
        'forecast_daily_language' (an empty frame with those four names when
        there is nothing to allocate).
    """
    empty_cols = ['department_id','Date','language','forecast_daily_language']
    if fc_monthly_bylang is None or len(fc_monthly_bylang) == 0:
        return pd.DataFrame(columns=empty_cols)

    # Normalise the history before anything reads it, so that string dates work
    # and a missing history never gets indexed.
    history = None
    if incoming_bylang is not None and not incoming_bylang.empty:
        history = incoming_bylang.copy()
        history['Date'] = pd.to_datetime(history['Date'])
        history['department_id'] = history['department_id'].astype(str)

    last_date = history['Date'].max() if history is not None else pd.Timestamp.today().normalize()
    start = last_date + pd.Timedelta(days=1)
    end = start + pd.Timedelta(days=horizon_days - 1)
    future_months = pd.period_range(start=start.to_period('M'), end=end.to_period('M'), freq='M')

    by_key = {}
    if history is not None:
        for key, grp in history.groupby(['department_id','language']):
            by_key[key] = grp.sort_values('Date')

    def _dow_weights(hist):
        # Mean tickets per weekday, scaled to mean 1 (left unscaled if the mean is 0).
        prof = hist.assign(dow=hist['Date'].dt.dayofweek).groupby('dow')['ticket_lang'].mean()
        level = prof.mean()
        return prof / (level if level != 0 else 1.0)

    def _profile_for(dept, lang):
        # Pick the most specific history that is available for this pair.
        hist_lang = by_key.get((dept, lang))
        if hist_lang is not None and len(hist_lang) >= 30:
            return _dow_weights(hist_lang)
        if history is not None:
            dept_hist = history[history['department_id'] == dept]
            if not dept_hist.empty:
                return _dow_weights(dept_hist)
        return pd.Series(1.0, index=range(7))

    rows = []
    for (dept, lang), gmon in fc_monthly_bylang.groupby(['department_id','language']):
        profile = None  # computed lazily, once per (department, language)
        for m in future_months:
            fcm = gmon[gmon['month'] == m]
            if fcm.empty:
                continue
            target = float(fcm['forecast_monthly_language'].iloc[0])
            if target <= 0:
                continue
            if profile is None:
                profile = _profile_for(dept, lang)

            days = pd.date_range(start=m.start_time, end=m.end_time, freq='D')
            weights = np.array([float(profile.get(d.dayofweek, 1.0)) for d in days], dtype=float)
            weights = np.maximum(weights, 1e-6)  # keep every day strictly positive
            weights = weights / weights.sum()

            alloc_df = pd.DataFrame({'Date': days, 'forecast_daily_language': target * weights})
            alloc_df = alloc_df[(alloc_df['Date'] >= start) & (alloc_df['Date'] <= end)].copy()
            alloc_df.insert(0, 'language', lang)
            alloc_df.insert(0, 'department_id', dept)
            rows.append(alloc_df)

    if not rows:
        return pd.DataFrame(columns=empty_cols)
    return pd.concat(rows, ignore_index=True)


def dow_profile(g: pd.DataFrame) -> pd.Series:
    """Build normalized day-of-week profile for a department, fallback to uniform.

    The profile is the mean 'ticket_total' per weekday (0 = Monday, taken from
    'Date') divided by the mean of those weekday means, so it averages 1.

    A uniform profile (1.0 for each of the 7 weekdays) is returned when fewer
    than three weekdays have data, or when the overall level is zero, negative
    or not finite. Dividing by such a level would yield NaN/inf weights and
    NaN daily allocations downstream.
    """
    prof = (g.assign(dow=g['Date'].dt.dayofweek)
              .groupby('dow')['ticket_total']
              .mean())
    level = prof.mean()
    if prof.notna().sum() >= 3 and np.isfinite(level) and level > 0:
        return prof / level
    return pd.Series(1.0, index=range(7))


def disaggregate_month_to_days(dept_df: pd.DataFrame,
                               month_period: pd.Period,
                               target_sum: float) -> pd.DataFrame:
    """Distribute a monthly total over the calendar days of ``month_period``.

    Day weights follow the day-of-week profile of the 90 most recent rows of
    ``dept_df`` (ordered by 'Date'); each weight is floored at 1e-6 and the
    weights are scaled to sum to 1, so the daily values add up to
    ``target_sum``.

    Returns:
        DataFrame with 'Date' (every day of the month) and 'forecast_daily'.
    """
    month_days = pd.date_range(start=month_period.start_time,
                               end=month_period.end_time,
                               freq='D')

    recent_history = dept_df.sort_values('Date').tail(90)
    weekday_weights = dow_profile(recent_history)

    raw = np.array([weekday_weights.get(day.dayofweek, 1.0) for day in month_days], dtype=float)
    floored = np.maximum(raw, 1e-6)
    day_shares = floored / floored.sum()

    return pd.DataFrame({'Date': month_days, 'forecast_daily': target_sum * day_shares})


def build_daily_from_monthly(incoming: pd.DataFrame,
                             fc_monthly: pd.DataFrame,
                             horizon_days: int) -> pd.DataFrame:
    """Top-down daily plan derived from the monthly forecast.

    The plan covers ``horizon_days`` days starting the day after the last
    'Date' in ``incoming``. For each department in ``incoming`` and each month
    touched by that window, the first matching positive 'forecast_monthly' is
    split into days with ``disaggregate_month_to_days`` and trimmed to the
    window.

    Returns:
        DataFrame with 'department_id', 'Date', 'forecast_daily' (empty with
        those columns when nothing could be allocated).
    """
    first_day = incoming['Date'].max() + pd.Timedelta(days=1)
    last_day = first_day + pd.Timedelta(days=horizon_days - 1)
    months_in_window = pd.period_range(start=first_day.to_period('M'),
                                       end=last_day.to_period('M'), freq='M')

    pieces = []
    for dept, dept_history in incoming.groupby('department_id'):
        for month in months_in_window:
            is_dept = fc_monthly['department_id'] == dept
            is_month = fc_monthly['month'] == month
            match = fc_monthly[is_dept & is_month]
            if match.empty:
                continue
            monthly_target = float(match['forecast_monthly'].iloc[0])
            if monthly_target <= 0:
                continue
            daily = disaggregate_month_to_days(dept_history, month, monthly_target)
            inside_window = (daily['Date'] >= first_day) & (daily['Date'] <= last_day)
            daily = daily[inside_window]
            daily.insert(0, 'department_id', dept)
            pieces.append(daily)

    if not pieces:
        return pd.DataFrame(columns=['department_id', 'Date', 'forecast_daily'])
    return pd.concat(pieces, ignore_index=True)


def split_daily_by_language(df_daily_fc: pd.DataFrame) -> pd.DataFrame:
    """Replicate the daily forecast once per language using the fixed shares.

    For every (language, share) entry of ``LANGUAGE_SHARES`` a copy of the
    input is emitted with 'language' set and 'forecast_daily_language' equal to
    'forecast_daily' times the share. Returns an empty DataFrame when
    ``LANGUAGE_SHARES`` has no entries.
    """
    pieces = [
        df_daily_fc.assign(language=lang,
                           forecast_daily_language=df_daily_fc['forecast_daily'] * share)
        for lang, share in LANGUAGE_SHARES.items()
    ]
    if not pieces:
        return pd.DataFrame()
    return pd.concat(pieces, ignore_index=True)


def forecast_daily_baseline(df_daily: pd.DataFrame, horizon_days: int) -> pd.DataFrame:
    """Independent daily baseline (optional).

    Per department the level is the mean 'ticket_total' of the last 28 rows
    (when at least 28 rows exist and that mean is finite) or the overall mean
    otherwise. Each future day gets level times its day-of-week weight from
    ``dow_profile``, floored at 0.

    Returns:
        DataFrame with 'department_id', 'Date', 'forecast_daily' covering
        ``horizon_days`` days after the last date in the input.

    Raises:
        ValueError: if the input contains no valid date.
    """
    daily = df_daily.copy()
    daily['Date'] = pd.to_datetime(daily['Date'])
    daily['department_id'] = daily['department_id'].astype(str).str.strip()
    daily = daily.sort_values(['department_id', 'Date'])

    latest = daily['Date'].max()
    if pd.isna(latest):
        raise ValueError("forecast_daily_baseline: No valid dates in incoming.")
    horizon = pd.date_range(start=latest + pd.Timedelta(days=1), periods=horizon_days, freq='D')

    frames = []
    for dept, history in daily.groupby('department_id'):
        history = history.sort_values('Date')

        recent_level = np.nan
        if len(history) >= 28:
            rolling_mean = (history.set_index('Date')['ticket_total']
                            .rolling(window=28, min_periods=1)
                            .mean())
            recent_level = rolling_mean.iloc[-1]
        if np.isfinite(recent_level):
            level = float(recent_level)
        else:
            level = float(history['ticket_total'].mean())

        weekday_weights = dow_profile(history)
        forecast_values = []
        for day in horizon:
            known = day.dayofweek in weekday_weights.index
            weight = weekday_weights[day.dayofweek] if known else 1.0
            forecast_values.append(max(0.0, level * float(weight)))
        frames.append(pd.DataFrame({'department_id': dept, 'Date': horizon, 'forecast_daily': forecast_values}))

    if not frames:
        return pd.DataFrame(columns=['department_id', 'Date', 'forecast_daily'])
    return pd.concat(frames, ignore_index=True)


def build_daily_capacity_plan(incoming: pd.DataFrame,
                              mapping: pd.DataFrame,
                              prod_dept: pd.DataFrame,
                              fc_monthly: pd.DataFrame,
                              horizon_days: int,
                              daily_bylang: Optional[pd.DataFrame] = None,
                              monthly_total: Optional[pd.DataFrame] = None,
                              monthly_bylang: Optional[pd.DataFrame] = None) -> pd.DataFrame:
    """
    End-to-end daily plan per language (dynamic shares). Without per-language
    detail it falls back to the legacy fixed-share split (not recommended).

    Dynamic path: language shares are forecast from ``monthly_total`` and
    ``monthly_bylang``, applied to ``fc_monthly`` and allocated to days with
    ``daily_bylang`` as history.

    Legacy path: taken when ``daily_bylang``, ``monthly_bylang`` or
    ``monthly_total`` is None or empty. The daily forecast (top-down from the
    monthly forecast when ``USE_DAILY_FROM_MONTHLY`` is set, otherwise the
    independent baseline) is split with ``split_daily_by_language``.
    ``monthly_total`` is part of the check because the share forecast returns
    nothing without it, which used to produce a silently empty plan.

    Returns:
        DataFrame with 'Date', 'department_id', 'department_name', 'vertical',
        'language', 'forecast_daily_language' and 'FTE_per_day' (forecast
        divided by 'avg_tickets_per_agent_day', NaN when productivity is
        missing or not positive), sorted by date, vertical, department and
        language.
    """
    def _missing(frame) -> bool:
        return frame is None or frame.empty

    if _missing(daily_bylang) or _missing(monthly_bylang) or _missing(monthly_total):
        # Legacy fallback: no usable per-language detail.
        if USE_DAILY_FROM_MONTHLY:
            daily_fc = build_daily_from_monthly(incoming, fc_monthly, horizon_days)
        else:
            daily_fc = forecast_daily_baseline(incoming, horizon_days)
        daily_fc_lang = split_daily_by_language(daily_fc)
    else:
        shares_fc = forecast_language_shares_per_dept(monthly_total, monthly_bylang, h_months=H_MONTHS, roll_fallback=6)
        fc_mon_by_lang = apply_language_shares_to_monthly(fc_monthly, shares_fc)
        daily_fc_lang = build_daily_from_monthly_by_language(daily_bylang, fc_mon_by_lang, horizon_days)

    daily_fc_lang = apply_mapping(daily_fc_lang, mapping)
    daily_fc_lang = daily_fc_lang.merge(prod_dept, on='department_id', how='left')
    daily_fc_lang['avg_tickets_per_agent_day'] = pd.to_numeric(daily_fc_lang['avg_tickets_per_agent_day'], errors='coerce')
    daily_fc_lang['FTE_per_day'] = np.where(
        daily_fc_lang['avg_tickets_per_agent_day'] > 0,
        daily_fc_lang['forecast_daily_language'] / daily_fc_lang['avg_tickets_per_agent_day'],
        np.nan
    )
    cols = ['Date','department_id','department_name','vertical','language','forecast_daily_language','FTE_per_day']
    daily_plan = daily_fc_lang[cols].sort_values(['Date','vertical','department_id','language'])
    return daily_plan

def build_stability_report(monthly: pd.DataFrame,
                           fc_monthly: pd.DataFrame,
                           daily_capacity_plan: pd.DataFrame,
                           mapping: pd.DataFrame) -> pd.DataFrame:
    """
    Build the diagnostic 'stability_report' table: one row per department and
    forecast month (months strictly after the last actual month in `monthly`).

    Columns produced:
      - forecast_monthly vs daily_sum_monthly (sum of the daily plan across
        languages) and their difference (reconcile_diff)
      - ref_mean_12m: mean of the last 12 actual months per department
      - forecast_vs_ref_ratio: forecast_monthly / ref_mean_12m (NaN if ref <= 0)
      - CV sMAPE per model, winner_model and blend weights (one value per
        department, taken from the first row of that department in fc_monthly)

    Parameters:
      monthly: actuals; reads 'department_id', 'month', 'incoming_monthly'.
      fc_monthly: forecasts; reads 'department_id', 'month', 'forecast_monthly'
        and, when present, the CV/weights columns. It is NOT modified.
      daily_capacity_plan: reads 'department_id', 'Date', 'forecast_daily_language'.
      mapping: passed through to apply_mapping (defined elsewhere in this file).

    Returns:
      DataFrame with a fixed column order, sorted by vertical, department_id,
      month. Any expected column that is still missing is added as NaN.
    """
    diag_cols = ['cv_prophet_smape', 'cv_arima_smape', 'cv_tbats_ets_smape',
                 'winner_model', 'blend_prophet_w', 'blend_arima_w', 'blend_tbats_ets_w']

    # --- Actuals: normalise key types on a copy ---
    m = monthly.copy()
    m['department_id'] = m['department_id'].astype(str)
    if not isinstance(m['month'].dtype, pd.PeriodDtype):
        m['month'] = pd.PeriodIndex(m['month'], freq='M')
    last_actual = m['month'].max()

    # --- Forecasts: work on a copy so the caller's frame is never mutated ---
    fc = fc_monthly.copy()
    fc['department_id'] = fc['department_id'].astype(str)
    if not isinstance(fc['month'].dtype, pd.PeriodDtype):
        fc['month'] = pd.PeriodIndex(fc['month'], freq='M')
    for c in diag_cols:
        if c not in fc.columns:
            fc[c] = np.nan

    # One row of CV/weights per department, taken from the normalised copy so
    # the merge key has the same (string) dtype as the report.
    head = fc[['department_id'] + diag_cols].drop_duplicates('department_id')

    # Future rows only. The CV/weights columns are dropped here because they are
    # re-attached from `head`; keeping both would make merge() emit '_x'/'_y'
    # suffixed columns and the plain-named columns would end up all-NaN.
    f = fc.loc[fc['month'] > last_actual].drop(columns=diag_cols)

    # Reference mean (last 12 actual months per dept)
    ref12 = (m.sort_values('month')
               .groupby('department_id')['incoming_monthly']
               .apply(lambda s: s.tail(12).mean())
               .rename('ref_mean_12m')
               .reset_index())

    # Daily reconciliation: sum by dept-month across languages
    d = daily_capacity_plan.copy()
    d['department_id'] = d['department_id'].astype(str)
    d['month'] = pd.to_datetime(d['Date']).dt.to_period('M')
    daily_sum = (d.groupby(['department_id', 'month'], as_index=False)['forecast_daily_language']
                   .sum()
                   .rename(columns={'forecast_daily_language': 'daily_sum_monthly'}))

    rep = (f.merge(ref12, on='department_id', how='left')
             .merge(daily_sum, on=['department_id', 'month'], how='left')
             .merge(head, on='department_id', how='left'))

    rep['reconcile_diff'] = rep['daily_sum_monthly'] - rep['forecast_monthly']
    rep['forecast_vs_ref_ratio'] = np.where(rep['ref_mean_12m'] > 0,
                                            rep['forecast_monthly'] / rep['ref_mean_12m'],
                                            np.nan)

    rep = apply_mapping(rep, mapping)

    # Order columns for readability (add any missing as NaN to avoid KeyError)
    cols = ['vertical', 'department_id', 'department_name', 'month',
            'forecast_monthly', 'daily_sum_monthly', 'reconcile_diff',
            'ref_mean_12m', 'forecast_vs_ref_ratio'] + diag_cols

    for c in cols:
        if c not in rep.columns:
            rep[c] = np.nan

    rep = rep[cols].sort_values(['vertical', 'department_id', 'month'])
    return rep

## 8. Main & Metrics

In [17]:

def main():
    """
    Run the end-to-end forecast pipeline and write the Excel workbook.

    Steps: ensure/load the Christmas CSV, load the inputs, build the monthly
    forecast, derive the accuracy/capacity table, the per-language daily plan,
    the CV table, the stability report and the 12-month language-share history,
    then write every non-empty table to OUTPUT_XLSX (one sheet each).

    All paths and tuning constants come from the module-level configuration;
    the function takes no arguments and returns None.
    """
    # 0) Xmas CSV
    ensure_christmas_csv(HOLIDAYS_CSV_PATH, HOLIDAYS_YEARS, INCLUDE_JAN6)
    xmas_df = load_christmas_csv(HOLIDAYS_CSV_PATH)

    # 1) Load inputs
    incoming_raw = load_incoming(INCOMING_SOURCE_PATH, sheet_name=INCOMING_SHEET)
    # Split into totals and per-language detail (when available).
    # NOTE(review): the original comment mentions a business rule
    # "non-served -> English" applied by parse_languages_incoming - confirm there.
    incoming_total, incoming_bylang = parse_languages_incoming(incoming_raw)

    mapping = load_dept_map(DEPT_MAP_PATH, DEPT_MAP_SHEET)
    prod = load_productivity(PRODUCTIVITY_PATH)

    # 2) Exogenous proxy
    case_reason_df = load_case_reason_proxy(CASE_REASON_PATH, CASE_REASON_SHEET)

    # 3) Monthly forecast (total)
    monthly = build_monthly_series(incoming_total)
    fc_monthly = forecast_per_department_monthly(monthly, case_reason_df, xmas_df)

    # 4) capacity_error (history + future) + capacity
    cap_err = compute_monthly_accuracy_with_history(monthly, fc_monthly, REPORT_START_MONTH)
    cap_err = compute_capacity_monthly(cap_err, prod)
    cap_err = apply_mapping(cap_err, mapping)

    # 5) Monthly totals per language (history) + CV of language shares
    monthly_bylang = build_monthly_language_totals(incoming_bylang)
    monthly_total = monthly[['department_id', 'month', 'incoming_monthly']].copy()
    lang_cv = rolling_cv_language_shares(monthly_total, monthly_bylang)

    # 6) Daily plan per language (dynamic, reconciled)
    daily_capacity_plan = build_daily_capacity_plan(
        incoming=incoming_total,
        mapping=mapping,
        prod_dept=prod,
        fc_monthly=fc_monthly,
        horizon_days=DAILY_HORIZON_DAYS,
        daily_bylang=incoming_bylang,
        monthly_total=monthly_total,
        monthly_bylang=monthly_bylang
    )

    # 7) CV table + Stability report
    cv_table = build_cv_table(fc_monthly, mapping)
    stability_report = build_stability_report(monthly, fc_monthly, daily_capacity_plan, mapping)

    # 8) Language trend sheet (shares over the last 12 months of history)
    if not monthly_bylang.empty:
        last_m = monthly_total['month'].max()
        hist12_idx = pd.period_range(last_m - 11, last_m, freq='M')
        mh = monthly_total[monthly_total['month'].isin(hist12_idx)]
        # Wide table: one row per (department, month), one column per language.
        lang_wide = (monthly_bylang[monthly_bylang['month'].isin(hist12_idx)]
                     .pivot_table(index=['department_id', 'month'], columns='language',
                                  values='incoming_monthly_lang', aggfunc='sum')
                     .fillna(0.0))
        ytot = (mh.set_index(['department_id', 'month'])['incoming_monthly']
                  .reindex(lang_wide.index)
                  .fillna(0.0))
        # Zero totals become NaN before dividing so the share is 0 rather than inf.
        shares_hist12 = _safe_row_renorm(lang_wide.div(ytot.replace(0, np.nan), axis=0).fillna(0.0))
        language_trends = shares_hist12.reset_index()
    else:
        language_trends = pd.DataFrame(columns=['department_id', 'month'])

    # 9) Clean infs
    for df_out in [cap_err, daily_capacity_plan, cv_table, stability_report, lang_cv, language_trends]:
        if df_out is not None and not df_out.empty:
            df_out.replace([np.inf, -np.inf], np.nan, inplace=True)

    # 10) Write Excel
    # Make sure the target folder exists so a long run does not fail at the last step.
    out_dir = os.path.dirname(OUTPUT_XLSX)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # sheet_name is passed by keyword: positional use is deprecated in pandas 2.1+
    # and becomes keyword-only in pandas 3.0.
    with pd.ExcelWriter(OUTPUT_XLSX, engine="openpyxl", mode="w") as w:
        (cap_err[['vertical', 'department_id', 'department_name', 'month',
                  'Actual_Volume', 'Forecast', 'Forecast_Accuracy',
                  'Capacity_FTE_per_day',
                  'winner_model', 'cv_prophet_smape', 'cv_arima_smape', 'cv_tbats_ets_smape',
                  'blend_prophet_w', 'blend_arima_w', 'blend_tbats_ets_w']]
         .sort_values(['vertical', 'department_id', 'month'])
         .to_excel(w, sheet_name="capacity_error", index=False))

        daily_capacity_plan.to_excel(w, sheet_name="daily_capacity_plan", index=False)
        cv_table.to_excel(w, sheet_name="mape_table_cv", index=False)
        stability_report.to_excel(w, sheet_name="stability_report", index=False)

        # Optional sheets are only written when they contain data.
        if language_trends is not None and not language_trends.empty:
            language_trends.to_excel(w, sheet_name="language_trends_hist", index=False)
        if lang_cv is not None and not lang_cv.empty:
            lang_cv.to_excel(w, sheet_name="language_share_cv", index=False)

    print("Excel written:", OUTPUT_XLSX)
    print("Christmas CSV at:", HOLIDAYS_CSV_PATH)


if __name__ == "__main__":
    main()

16:28:41 - cmdstanpy - INFO - Chain [1] start processing
16:28:41 - cmdstanpy - INFO - Chain [1] done processing
16:29:03 - cmdstanpy - INFO - Chain [1] start processing
16:29:03 - cmdstanpy - INFO - Chain [1] done processing
16:29:24 - cmdstanpy - INFO - Chain [1] start processing
16:29:24 - cmdstanpy - INFO - Chain [1] done processing
16:29:45 - cmdstanpy - INFO - Chain [1] start processing
16:29:45 - cmdstanpy - INFO - Chain [1] done processing
16:30:06 - cmdstanpy - INFO - Chain [1] start processing
16:30:17 - cmdstanpy - INFO - Chain [1] done processing
16:30:39 - cmdstanpy - INFO - Chain [1] start processing
16:30:52 - cmdstanpy - INFO - Chain [1] done processing
16:31:16 - cmdstanpy - INFO - Chain [1] start processing
16:31:17 - cmdstanpy - INFO - Chain [1] done processing
16:31:42 - cmdstanpy - INFO - Chain [1] start processing
16:31:42 - cmdstanpy - INFO - Chain [1] done processing
16:32:07 - cmdstanpy - INFO - Chain [1] start processing
16:32:07 - cmdstanpy - INFO - Chain [1]

Excel written: C:\Users\pt3canro\Desktop\CAPACITY\outputs\capacity_forecast_hybrid.xlsx
Christmas CSV at: C:\Users\pt3canro\Desktop\CAPACITY\input_model\christmas_holidays_2024_2027.csv
