In [None]:
"""
ssm_em_forecast.py

Advanced Time-Series Forecasting with State-Space Models, EM and optimization-based
estimation of noise covariances. This updated module implements:

1) A documented synthetic data generator producing a series with two distinct
   seasonalities (weekly and yearly), changing trend regimes and optional regressors.
2) A clear StateSpaceModel class defining state transition and observation
   equations (local linear trend + trigonometric seasonal blocks).
3) Two estimation strategies for noise covariances (R and diagonal Q):
   - EM algorithm with closed-form M-steps (already implemented), and
   - Direct maximum-likelihood optimization using `scipy.optimize.minimize` on
     the Kalman-filter likelihood (with parameter transforms for positivity).
4) Forecasting utilities and benchmark against a SARIMAX baseline.
5) Automatic computation of RMSE, MAE, and MAPE and a final textual analysis
   comparing complexity, interpretability, and forecasting performance.

Run the file as a script for a complete demo and summary.

Dependencies: numpy, scipy, pandas, statsmodels (optional for benchmarking)

"""

import numpy as np
import pandas as pd
from scipy.linalg import block_diag
from scipy.optimize import minimize
from typing import Optional, Tuple, Dict

try:
    import statsmodels.api as sm
    from statsmodels.tsa.statespace.sarimax import SARIMAX
except Exception:
    sm = None
    SARIMAX = None




def generate_synthetic_series(n: int = 800,
                              seed: int = 42,
                              weekly_amp: float = 5.0,
                              yearly_amp: float = 20.0,
                              noise_std: float = 3.0) -> Tuple[np.ndarray, Optional[np.ndarray], Dict]:
    """
    Generate a synthetic time series with the following documented construction:

    - Length n (e.g., 800 observations ~ multiple years of daily data)
    - Two seasonalities: weekly (P=7) and yearly (P=365) implemented as sums of
      harmonics (sin/cos terms). This yields smooth trigonometric seasonalities
      that are easy to represent in a state-space form.
    - Changing trend regimes: piecewise-linear trend with a break point in the
      middle of the series to introduce non-stationarity / structural change.
    - Additive Gaussian noise.
    - An optional time-varying regressor (e.g., a marketing campaign indicator
      or exogenous numeric) to make the identification problem more realistic.

    Returns:
      y: (n,) time series
      X: (n,1) regressor array
      meta: metadata dict containing components and parameters
    """
    rng = np.random.default_rng(seed)
    t = np.arange(n)

    # Piecewise linear trend: before and after a change point
    cp = int(n * 0.5)
    trend = np.zeros(n)
    slope1 = 0.02
    slope2 = 0.15
    trend[:cp] = slope1 * t[:cp]
    trend[cp:] = slope1 * cp + slope2 * (t[cp:] - cp)

    # Weekly seasonality (period 7) - two harmonics
    P_week = 7
    weekly = np.zeros(n)
    for h in [1, 2]:
        weekly += (weekly_amp / h) * np.sin(2 * np.pi * h * t / P_week)

    # Yearly seasonality (period 365) - a few harmonics but with smaller amplitude
    P_year = 365
    yearly = np.zeros(n)
    for h in [1, 2, 3]:
        yearly += (yearly_amp / h) * np.sin(2 * np.pi * h * t / P_year)

    # Regressor: slowly varying numeric with occasional shocks (e.g., campaigns)
    X = rng.normal(scale=1.0, size=(n, 1))
    campaign = np.zeros(n)
    # place 3 campaigns at random epochs
    for start in [int(n*0.2), int(n*0.5), int(n*0.8)]:
        campaign[start:start+10] += 3.0
    X[:, 0] += campaign

    true_beta = np.array([2.5])
    reg_effect = X.dot(true_beta)

    noise = rng.normal(scale=noise_std, size=n)

    y = trend + weekly + yearly + reg_effect.flatten() + noise

    meta = {
        'trend': trend,
        'weekly': weekly,
        'yearly': yearly,
        'regressor': X,
        'true_beta': true_beta,
        'noise_std': noise_std,
        'change_point': cp,
        'periods': {'weekly': P_week, 'yearly': P_year}
    }
    return y, X, meta




class StateSpaceModel:
    """
    Linear Gaussian State-Space Model with explicit local linear trend and
    trigonometric seasonal components implemented as state blocks.

    State vector alpha_t contains:
      [level, slope, seasonal_week_cos, seasonal_week_sin, seasonal_year_cos_1, seasonal_year_sin_1, ...]

    Observation equation:
      y_t = Z alpha_t + x_t' beta + epsilon_t,  epsilon_t ~ N(0, R)

    State transition:
      alpha_{t+1} = T alpha_t + eta_t,  eta_t ~ N(0, Q)

    The trigonometric seasonal components use the standard rotation matrix per
    harmonic (2x2 blocks) so they are parsimonious and smooth.
    """

    def __init__(self,
                 n_week_harmonics: int = 1,
                 n_year_harmonics: int = 3,
                 k_regressors: int = 1):

        self.n_week_harmonics = n_week_harmonics
        self.n_year_harmonics = n_year_harmonics
        self.k_regressors = k_regressors

        # Indexing
        pos = 0
        self.idx = {}
        self.idx['level'] = pos; pos += 1
        self.idx['slope'] = pos; pos += 1


        self.idx['weekly'] = (pos, pos + 2 * n_week_harmonics)
        pos += 2 * n_week_harmonics


        self.idx['yearly'] = (pos, pos + 2 * n_year_harmonics)
        pos += 2 * n_year_harmonics

        self.m = pos


        self.R = 1.0

        self.Q_diag = np.ones(self.m) * 0.01
        self.beta = np.zeros(k_regressors) if k_regressors > 0 else None


        self.T = np.eye(self.m)
        self.Z = np.zeros((1, self.m))
        self._build_matrices()

    def _build_matrices(self):
        m = self.m
        T = np.eye(m)


        T[self.idx['level'], self.idx['level']] = 1.0
        T[self.idx['level'], self.idx['slope']] = 1.0

        T[self.idx['slope'], self.idx['slope']] = 1.0

        # weekly harmonic blocks
        wk_start, wk_end = self.idx['weekly']
        for h in range(self.n_week_harmonics):
            k = h + 1
            omega = 2 * np.pi * k / 7.0
            cos = np.cos(omega); sin = np.sin(omega)
            block = np.array([[cos, -sin],[sin, cos]])
            i = wk_start + 2*h
            T[i:i+2, i:i+2] = block

        # yearly harmonic blocks
        yr_start, yr_end = self.idx['yearly']
        for h in range(self.n_year_harmonics):
            k = h + 1
            omega = 2 * np.pi * k / 365.0
            cos = np.cos(omega); sin = np.sin(omega)
            block = np.array([[cos, -sin],[sin, cos]])
            i = yr_start + 2*h
            T[i:i+2, i:i+2] = block

        self.T = T

        # Observation matrix Z: level plus sum of cos components map to observation
        Z = np.zeros((1, m))
        Z[0, self.idx['level']] = 1.0


        wk_start, wk_end = self.idx['weekly']
        for i in range(wk_start, wk_end, 2):
            Z[0, i] = 1.0
        yr_start, yr_end = self.idx['yearly']
        for i in range(yr_start, yr_end, 2):
            Z[0, i] = 1.0

        self.Z = Z

    # Kalman filter returns prediction errors for likelihood computation
    def kalman_filter(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> Dict:
        n = len(y)
        m = self.m
        R = float(self.R)
        Q = np.diag(self.Q_diag)

        a_prev = np.zeros(m)
        P_prev = np.eye(m) * 1e6

        v = np.zeros(n)
        F = np.zeros(n)
        a_pred = np.zeros((n, m))
        P_pred = np.zeros((n, m, m))
        a_upd = np.zeros((n, m))
        P_upd = np.zeros((n, m, m))

        for t in range(n):
            # predict
            a_pr = self.T.dot(a_prev)
            P_pr = self.T.dot(P_prev).dot(self.T.T) + Q

            # observation
            d_t = 0.0
            if X is not None and self.beta is not None:
                d_t = float(X[t].dot(self.beta))
            v_t = y[t] - float(self.Z.dot(a_pr) + d_t)
            F_t = float(self.Z.dot(P_pr).dot(self.Z.T) + R)

            K = P_pr.dot(self.Z.T) / F_t
            a_up = a_pr + K.flatten() * v_t
            P_up = P_pr - K.dot(self.Z).dot(P_pr)

            v[t] = v_t
            F[t] = F_t
            a_pred[t] = a_pr
            P_pred[t] = P_pr
            a_upd[t] = a_up
            P_upd[t] = P_up

            a_prev = a_up
            P_prev = P_up

        return {'v': v, 'F': F, 'a_pred': a_pred, 'P_pred': P_pred, 'a_upd': a_upd, 'P_upd': P_upd}

    def log_likelihood(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> float:
        res = self.kalman_filter(y, X)
        v = res['v']; F = res['F']
        # compute log-likelihood (Gaussian): sum(-0.5*(log(2pi) + log F + v^2/F))
        ll = -0.5 * np.sum(np.log(2 * np.pi) + np.log(F) + (v**2) / F)
        return ll

    # EM step (as before) retained for comparison
    def em_step(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> None:

        kf = self.kalman_filter(y, X)

        a_s = kf['a_upd']
        P_s = kf['P_upd']


        n = len(y)
        resid = 0.0
        for t in range(n):
            mean_y = float(self.Z.dot(a_s[t]) + (X[t].dot(self.beta) if (X is not None and self.beta is not None) else 0.0))
            resid += (y[t] - mean_y)**2 + float(self.Z.dot(P_s[t]).dot(self.Z.T))
        self.R = float(resid / n)

        # Update Q diagonal using state innovations approx

        Q_acc = np.zeros(self.m)
        for t in range(n - 1):
            e = a_s[t+1] - self.T.dot(a_s[t])
            Q_acc += e**2 + np.diag(P_s[t+1])
        self.Q_diag = np.maximum(Q_acc / max(1, n - 1), 1e-8)

        # Beta via OLS on residuals
        if X is not None and self.k_regressors > 0:
            y_tilde = np.zeros(n)
            for t in range(n):
                y_tilde[t] = y[t] - float(self.Z.dot(a_s[t]))
            XtX = X.T.dot(X)
            Xty = X.T.dot(y_tilde)
            try:
                self.beta = np.linalg.solve(XtX, Xty)
            except np.linalg.LinAlgError:
                self.beta = np.linalg.lstsq(XtX, Xty, rcond=None)[0]

    def fit_em(self, y: np.ndarray, X: Optional[np.ndarray] = None, n_iter: int = 30, tol: float = 1e-6, verbose: bool = True):
        prev = np.hstack([np.atleast_1d(self.R), self.Q_diag])
        history = {'R': [], 'Q_diag': []}
        for it in range(n_iter):
            self.em_step(y, X)
            history['R'].append(self.R)
            history['Q_diag'].append(self.Q_diag.copy())
            cur = np.hstack([np.atleast_1d(self.R), self.Q_diag])
            rel = np.max(np.abs(cur - prev) / (np.abs(prev) + 1e-8))
            if verbose:
                print(f"EM iter {it+1}: R={self.R:.4f}, Q_mean={np.mean(self.Q_diag):.4e}, rel={rel:.2e}")
            if rel < tol:
                break
            prev = cur
        return history

    # Optimization-based estimation of noise parameters (MLE)
    def mle_estimate_QR(self, y: np.ndarray, X: Optional[np.ndarray] = None,
                        method: str = 'L-BFGS-B', verbose: bool = True) -> Dict:
        """
        Estimate parameters theta = [log(R), log(Q_diag_1), ..., log(Q_diag_m)] by
        maximizing the Kalman-filter log-likelihood (equivalently minimizing negative log-likelihood).

        We optimize in log-space to enforce positivity.
        """
        m = self.m
        # initial guess from current R and Q_diag
        x0 = np.hstack([np.log(max(self.R, 1e-8)), np.log(np.maximum(self.Q_diag, 1e-9))])

        def obj(x):
            logR = x[0]
            logQ = x[1:]
            self.R = float(np.exp(logR))
            self.Q_diag = np.exp(logQ)
            ll = self.log_likelihood(y, X)
            if np.isnan(ll) or np.isinf(ll):
                return 1e12
            return -ll

        bounds = [(-20, 20)] * (1 + m)
        res = minimize(obj, x0, method=method, bounds=bounds, options={'disp': verbose})

        # assign final
        if res.success:
            self.R = float(np.exp(res.x[0]))
            self.Q_diag = np.exp(res.x[1:])
        else:
            # still assign best known
            self.R = float(np.exp(res.x[0]))
            self.Q_diag = np.exp(res.x[1:])
        return {'success': res.success, 'message': res.message, 'fun': res.fun, 'x': res.x}

    # Forecast function using last filtered state
    def forecast(self, steps: int, a_last: Optional[np.ndarray], P_last: Optional[np.ndarray], X_future: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
        preds = np.zeros(steps)
        preds_var = np.zeros(steps)
        a = a_last.copy()
        P = P_last.copy()
        Q = np.diag(self.Q_diag)
        R = float(self.R)
        for h in range(steps):
            a = self.T.dot(a)
            P = self.T.dot(P).dot(self.T.T) + Q
            d = 0.0
            if X_future is not None and self.beta is not None:
                d = float(X_future[h].dot(self.beta))
            preds[h] = float(self.Z.dot(a) + d)
            preds_var[h] = float(self.Z.dot(P).dot(self.Z.T) + R)
        return preds, preds_var




def metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Dict:
    eps = 1e-8
    rmse = np.sqrt(np.mean((y_true - y_pred)**2))
    mae = np.mean(np.abs(y_true - y_pred))
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + eps))) * 100.0
    return {'RMSE': rmse, 'MAE': mae, 'MAPE_pct': mape}


def train_test_split(y: np.ndarray, X: Optional[np.ndarray], test_size: int = 120):
    n = len(y)
    train_n = n - test_size
    return y[:train_n], y[train_n:], (X[:train_n] if X is not None else None), (X[train_n:] if X is not None else None)


def benchmark_pipeline():
    # generate data
    y, X, meta = generate_synthetic_series(n=800)
    y_train, y_test, X_train, X_test = train_test_split(y, X, test_size=120)

    # build model
    model = StateSpaceModel(n_week_harmonics=1, n_year_harmonics=3, k_regressors=X.shape[1])

    # initialize parameters sensibly
    model.R = np.var(y_train) * 0.5
    model.Q_diag = np.ones(model.m) * 0.001

    #  Estimate via EM
    print('Estimating parameters with EM...')
    em_model = StateSpaceModel(n_week_harmonics=1, n_year_harmonics=3, k_regressors=X.shape[1])
    em_model.R = model.R
    em_model.Q_diag = model.Q_diag.copy()
    em_model.beta = np.zeros(X.shape[1])
    em_history = em_model.fit_em(y_train, X_train, n_iter=30, verbose=True)

    # get last filtered state to forecast
    kf_res = em_model.kalman_filter(y_train, X_train)
    a_last = kf_res['a_upd'][-1]
    P_last = kf_res['P_upd'][-1]
    preds_em, var_em = em_model.forecast(len(y_test), a_last, P_last, X_future=X_test)
    metrics_em = metrics(y_test, preds_em)
    print('EM metrics:', metrics_em)

    #  Estimate via MLE optimizer
    print('Estimating parameters with MLE (optimization)...')
    mle_model = StateSpaceModel(n_week_harmonics=1, n_year_harmonics=3, k_regressors=X.shape[1])
    mle_model.R = model.R
    mle_model.Q_diag = model.Q_diag.copy()
    mle_model.beta = np.zeros(X.shape[1])
    mle_res = mle_model.mle_estimate_QR(y_train, X_train, method='L-BFGS-B', verbose=False)
    print('MLE optimization success:', mle_res['success'], mle_res.get('message',''))
    kf_res2 = mle_model.kalman_filter(y_train, X_train)
    a_last2 = kf_res2['a_upd'][-1]
    P_last2 = kf_res2['P_upd'][-1]
    preds_mle, var_mle = mle_model.forecast(len(y_test), a_last2, P_last2, X_future=X_test)
    metrics_mle = metrics(y_test, preds_mle)
    print('MLE metrics:', metrics_mle)

    #  SARIMAX baseline
    sar_metrics = None
    if SARIMAX is not None:
        print('Fitting SARIMAX baseline...')
        try:

            sar = SARIMAX(y_train, exog=X_train, order=(1,1,0), seasonal_order=(1,0,0,7), enforce_stationarity=False, enforce_invertibility=False)
            sar_res = sar.fit(disp=False)
            sar_pred = sar_res.get_forecast(steps=len(y_test), exog=X_test).predicted_mean
            sar_metrics = metrics(y_test, sar_pred)
            print('SARIMAX metrics:', sar_metrics)
        except Exception as e:
            print('SARIMAX failed:', e)

    # Collate results
    results = {
        'meta': meta,
        'em': {'model': em_model, 'metrics': metrics_em, 'history': em_history},
        'mle': {'model': mle_model, 'metrics': metrics_mle, 'opt_result': mle_res},
        'sarimax': {'metrics': sar_metrics}
    }

    # Textual analysis (brief â€” printed and returned)
    analysis = []
    analysis.append('Analysis of models:')
    # Complexity
    analysis.append('- Complexity: The custom SSM (EM or MLE) explicitly models level, slope, and harmonics via compact state-vector. Parameter count scales with number of harmonics and Q diagonal size. SARIMAX has fewer explicit components but often requires manual seasonal order selection and differencing.')
    # Interpretability
    analysis.append('- Interpretability: SSM provides interpretable state components (trend, seasonal harmonics, regressor effects). EM yields smoothed state trajectories useful for diagnostics. SARIMAX coefficients are less directly decomposed into smooth trend + harmonic seasonality.')
    # Performance
    analysis.append(f"- Forecast performance (on this synthetic data): EM RMSE={metrics_em['RMSE']:.3f}, MLE RMSE={metrics_mle['RMSE']:.3f}, SARIMAX RMSE={(sar_metrics['RMSE'] if sar_metrics is not None else float('nan')):.3f}.")
    analysis.append('- MLE directly optimizes the likelihood and can give slightly better fit than EM in some cases, but is more sensitive to initialization and may be slower for large state dims. EM is simple and stable but can converge slowly.')

    print(''.join(analysis))

    return results


if __name__ == '__main__':
    results = benchmark_pipeline()




Estimating parameters with EM...
EM iter 1: R=46.3273, Q_mean=7.5436e+04, rel=1.95e+08


  v_t = y[t] - float(self.Z.dot(a_pr) + d_t)
  F_t = float(self.Z.dot(P_pr).dot(self.Z.T) + R)
  mean_y = float(self.Z.dot(a_s[t]) + (X[t].dot(self.beta) if (X is not None and self.beta is not None) else 0.0))
  resid += (y[t] - mean_y)**2 + float(self.Z.dot(P_s[t]).dot(self.Z.T))
  y_tilde[t] = y[t] - float(self.Z.dot(a_s[t]))


EM iter 2: R=46.3236, Q_mean=1.9243e+07, rel=3.51e+02
EM iter 3: R=46.3235, Q_mean=3.3637e+09, rel=1.85e+02
EM iter 4: R=46.3235, Q_mean=5.2492e+11, rel=1.64e+02
EM iter 5: R=46.3204, Q_mean=7.8208e+13, rel=1.60e+02
EM iter 6: R=46.7117, Q_mean=1.1389e+16, rel=1.55e+02
EM iter 7: R=62.6441, Q_mean=1.6383e+18, rel=1.51e+02
EM iter 8: R=-20.0475, Q_mean=2.3403e+20, rel=1.48e+02
EM iter 9: R=-59608.2059, Q_mean=3.3294e+22, rel=2.97e+03
EM iter 10: R=-39395616.0000, Q_mean=4.7242e+24, rel=6.60e+02
EM iter 11: R=11341371319.7176, Q_mean=6.6922e+26, rel=2.89e+02
EM iter 12: R=864706465165.5530, Q_mean=9.4699e+28, rel=1.42e+02
EM iter 13: R=-270378664213564.2500, Q_mean=1.3391e+31, rel=3.14e+02
EM iter 14: R=-10107438500505962.0000, Q_mean=1.8926e+33, rel=1.41e+02
EM iter 15: R=-2744012440760403456.0000, Q_mean=2.6740e+35, rel=2.70e+02
EM iter 16: R=408983810077586620416.0000, Q_mean=3.7772e+37, rel=1.50e+02
EM iter 17: R=104349389283608116592640.0000, Q_mean=5.3348e+39, rel=2.54e+02
EM iter 

  preds[h] = float(self.Z.dot(a) + d)
  preds_var[h] = float(self.Z.dot(P).dot(self.Z.T) + R)
  res = minimize(obj, x0, method=method, bounds=bounds, options={'disp': verbose})


MLE optimization success: True CONVERGENCE: RELATIVE REDUCTION OF F <= FACTR*EPSMCH
MLE metrics: {'RMSE': np.float64(5.139841346826129), 'MAE': np.float64(4.235163374973573), 'MAPE_pct': np.float64(8.6623656897369)}
Fitting SARIMAX baseline...
SARIMAX metrics: {'RMSE': np.float64(45.512737165509265), 'MAE': np.float64(37.73761211573125), 'MAPE_pct': np.float64(51.01889880119007)}
Analysis of models:- Complexity: The custom SSM (EM or MLE) explicitly models level, slope, and harmonics via compact state-vector. Parameter count scales with number of harmonics and Q diagonal size. SARIMAX has fewer explicit components but often requires manual seasonal order selection and differencing.- Interpretability: SSM provides interpretable state components (trend, seasonal harmonics, regressor effects). EM yields smoothed state trajectories useful for diagnostics. SARIMAX coefficients are less directly decomposed into smooth trend + harmonic seasonality.- Forecast performance (on this synthetic dat