In [None]:
#default_exp models

In [None]:
#hide
from nbdev import *
%load_ext autoreload
%autoreload 2

# Models

> Uniserie models implementations.

In [None]:
#hide
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)
from nbdev.showdoc import add_docs, show_doc

In [None]:
#export
from itertools import count
from numbers import Number
from typing import Collection, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
from numba import njit
from scipy.optimize import minimize

from statsforecast.arima import auto_arima_f, forecast_arima, fitted_arima
from statsforecast.ets import ets_f, forecast_ets

In [None]:
#hide
from statsforecast.utils import AirPassengers as ap

In [None]:
#exporti
class _TS:
    
    def new(self):
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b

## ARIMA methods 

In [None]:
#export
class AutoARIMA(_TS):
    
    def __init__(
            self, 
            season_length: int = 1, # Number of observations per cycle
            approximation: bool = False,
        ):
        self.season_length = season_length
        self.approximation = approximation
        
    def __repr__(self):
        return f'AutoARIMA()'
    
    def fit(
            self, 
            y: np.ndarray, # time series
            X: Optional[np.ndarray] = None,
        ):
        with np.errstate(invalid='ignore'):
            self.model_ = auto_arima_f(
                y, 
                xreg=X,
                period=self.season_length, 
                approximation=self.approximation,
                allowmean=False, allowdrift=False #not implemented yet
            )
        return self
    
    def predict(
            self, 
            h: int, # Forecast horizon
            X: np.ndarray = None, 
            level: Optional[Tuple[int]] = None,
        ):
        fcst = forecast_arima(self.model_, h=h, xreg=X, level=level)
        if level is None:
            return fcst['mean']
        out = [
            fcst['mean'],
            *[fcst['lower'][f'{l}%'] for l in level],
            *[fcst['upper'][f'{l}%'] for l in level],
        ]
        return np.vstack(out).T
    
    def predict_in_sample(self):
        return fitted_arima(self.model_)
    
    def forecast(
            self, 
            y: np.ndarray, 
            h: int, 
            X: np.ndarray = None, 
            X_future: np.ndarray = None,
            level: Optional[Tuple[int]] = None, # level
            fitted: bool = False, # return fitted values?
        ):
        with np.errstate(invalid='ignore'):
            mod = auto_arima_f(
                y,
                xreg=X,
                period=self.season_length,
                approximation=self.approximation,
                allowmean=False, allowdrift=False #not implemented yet
            )
        fcst = forecast_arima(mod, h, xreg=X_future, level=level)
        mean = fcst['mean']
        if fitted:
            return {'mean': mean, 'fitted': fitted_arima(mod)}
        if level is None:
            return {'mean': mean}
        return {
            'mean': mean,
            **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
            **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
        }

In [None]:
#hide
from statsforecast.utils import AirPassengers as ap

def test_class(cls_, x, h, skip_insample=False):
    cls_ = cls_.fit(x)
    fcst_cls = cls_.predict(h=h)
    test_eq(len(fcst_cls), h)
    test_eq(len(arima.predict_in_sample()), len(x))
    # test fit + predict equals forecast
    test_eq(
        cls_.forecast(y=x, h=h)['mean'],
        fcst_cls
    )
    if not skip_insample:
        assert isinstance(cls_.predict_in_sample(), np.ndarray)
        np.testing.assert_array_equal(
            cls_.forecast(y=x, h=h, fitted=True)['fitted'],
            cls_.predict_in_sample(), 
        )

In [None]:
#hide
arima = AutoARIMA(season_length=12)
test_class(arima, x=ap, h=12)

In [None]:
#hide
add_docs(
    AutoARIMA, 'Classical AutoARIMA model',
    forecast='Fit and predict without storing objects',
    fit='Fits the model and saves it',
    predict='Predict using the fitted model',
    predict_in_sample='Return fitted values',
)

In [None]:
show_doc(AutoARIMA, default_cls_level=3)

The `AutoARIMA` class instantiates the model.

In [None]:
arima = AutoARIMA(season_length=12)

In [None]:
show_doc(AutoARIMA.forecast)

Once `AutoARIMA` is instantiated, you can make forecasts directly using the `AutoARIMA.forecast` method. The purpose of this method is to avoid computational burden due to object storage. This method can be thought of as a `fit_predict` without storing information. This method assumes you know the forecast horizon in advance. Observe that the method a returns a `dict` object instead of a numpy array.

In [None]:
from statsforecast.utils import AirPassengers as ap
arima.forecast(y=ap, h=12)

In [None]:
show_doc(AutoARIMA.fit)

If you want to store the fitted model for future analysis, you can use the `AutoARIMA.fit` method. 

In [None]:
arima = arima.fit(y=ap)

In [None]:
show_doc(AutoARIMA.predict)

You can compute forecasts for the fitted model using `AutoARIMA.predict`.

In [None]:
arima.predict(h=12)

In [None]:
show_doc(AutoARIMA.predict_in_sample)

To obtain the fitted values, you can use `AutoARIMA.predict_in_sample`.

In [None]:
arima.predict_in_sample()

### Model Description

**Auto ARIMA**: Automatically selects the best ARIMA (AutoRegressive Integrated Moving Average) model using an information criterion. Default is the corrected Akaike Information Criterion (AICc). Information criterions are tests used to check how well a model fits the data it is trying to describe. 

The ARIMA models are based in the autocorrelations in the data and the autocorrelations of the forecast errors. Every model has three components: AR, I, and MA. 

An AR(p) model captures the autocorrelations in the data at lags $1,2,\dots p$. 

$$y_t = \beta_0+\beta_1y_{t-1}+\beta_2y_{t-2}+\dots+\beta_py{t_p}+\epsilon_t$$

If the autocorrelations of the forecast errors are added up to lag $q$, an ARMA(p,q) model is obtained. 

$$y_t = \beta_0+\beta_1y_{t-1}+\beta_2y_{t-2}+\dots+\beta_py{t_p} + \epsilon_t+\theta_1 \epsilon_{t-1}+\theta_2\epsilon_{t-2}+\dots\theta_q\epsilon_{t-q}$$

The last component of an ARIMA model is the integrated (I) part, which is a differencing operation. The order of differencing, denoted by $d$, indicates how many rounds of lag-1 differencing are performed. 

ARIMA models requiere the user to select $p$, $q$, and $d$. The values for $\beta_i, i=1,2,\dots,p$ and $\theta_j, j=1,2,\dots,q$ are then estimated. Auto ARIMA automatically makes this selection, searching over a range of possible values for $p$, $q$, and $d$, and then choosing the best model using an information criterion. 

## Exponential smoothing methods 

In [None]:
#export
class ETS(_TS):
    
    def __init__(self, season_length: int = 1, model: str = 'ZZZ'):
        self.season_length = season_length
        self.model = model
    
    def __repr__(self):
        return f'ETS(sl={self.season_length},model={self.model})'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        self.model_ = ets_f(y, m=self.season_length, model=self.model)
        return self
    
    def predict(self, h: int, X: np.ndarray = None):
        return forecast_ets(self.model_, h=h)['mean']
    
    def predict_in_sample(self):
        return self.model_['fitted']
    
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        mod = ets_f(y, m=self.season_length, model=self.model)
        fcst = forecast_ets(mod, h)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        return {key: fcst[key] for key in keys}

In [None]:
#hide
ets = ETS(season_length=12)
test_class(ets, x=ap, h=12)

**Error, Trend, and Seasonality**: Statistical models for exponential smoothing. These models are stochastic data generating processes than can produce complete forecast distributions. Each model consists of a set of equations to describe the observed data and the unobserved components or states, which are level, trend, and seasonal. Errors can be either additive or multiplicative. The notation ETS(Z,Z,Z) is used to describe the ETS model being used, where Z can take one of the following values. 

Notation| 
----- |
N = None |     
A = Additive |   
Ad = Additive (damped) | 
M = Multiplicative | 

The possibilities for each state are shown below. 

State | Possible values 
----- | -----
Error | A, M
Trend | N, A, Ad
Seasonal | N, A, M

In [None]:
#export
@njit
def _ses_fcst_mse(x: np.ndarray, alpha: float) -> Tuple[float, float]:
    """Perform simple exponential smoothing on a series.

    This function returns the one step ahead prediction
    as well as the mean squared error of the fit.
    """
    smoothed = x[0]
    n = x.size
    mse = 0.
    fitted = np.full(n, np.nan, np.float32)

    for i in range(1, n):
        smoothed = (alpha * x[i - 1] + (1 - alpha) * smoothed).item()
        error = x[i] - smoothed
        mse += error * error
        fitted[i] = smoothed

    mse /= n
    forecast = alpha * x[-1] + (1 - alpha) * smoothed
    return forecast, mse, fitted


def _ses_mse(alpha: float, x: np.ndarray) -> float:
    """Compute the mean squared error of a simple exponential smoothing fit."""
    _, mse, _ = _ses_fcst_mse(x, alpha)
    return mse


@njit
def _ses_forecast(x: np.ndarray, alpha: float) -> float:
    """One step ahead forecast with simple exponential smoothing."""
    forecast, _, fitted = _ses_fcst_mse(x, alpha)
    return forecast, fitted


@njit
def _demand(x: np.ndarray) -> np.ndarray:
    """Extract the positive elements of a vector."""
    return x[x > 0]


@njit
def _intervals(x: np.ndarray) -> np.ndarray:
    """Compute the intervals between non zero elements of a vector."""
    y = []

    ctr = 1
    for val in x:
        if val == 0:
            ctr += 1
        else:
            y.append(ctr)
            ctr = 1

    y = np.array(y)
    return y


@njit
def _probability(x: np.ndarray) -> np.ndarray:
    """Compute the element probabilities of being non zero."""
    return (x != 0).astype(np.int32)


def _optimized_ses_forecast(
        x: np.ndarray,
        bounds: Sequence[Tuple[float, float]] = [(0.1, 0.3)]
    ) -> float:
    """Searches for the optimal alpha and computes SES one step forecast."""
    alpha = minimize(
        fun=_ses_mse,
        x0=(0,),
        args=(x,),
        bounds=bounds,
        method='L-BFGS-B'
    ).x[0]
    forecast, fitted = _ses_forecast(x, alpha)
    return forecast, fitted


@njit
def _chunk_sums(array: np.ndarray, chunk_size: int) -> np.ndarray:
    """Splits an array into chunks and returns the sum of each chunk."""
    n = array.size
    n_chunks = n // chunk_size
    sums = np.empty(n_chunks)
    for i, start in enumerate(range(0, n, chunk_size)):
        sums[i] = array[start : start + chunk_size].sum()
    return sums

@njit
def _repeat_val(val: float, h: int):
    return np.full(h, val, np.float32)

@njit
def _repeat_val_seas(season_vals: np.ndarray, h: int, season_length: int):
    out = np.empty(h, np.float32)
    for i in range(h):
        out[i] = season_vals[i % season_length]
    return out

In [None]:
#exporti
@njit
def _ses(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
        alpha: float, # smoothing parameter
    ): 
    fcst, _, fitted_vals = _ses_fcst_mse(y, alpha)
    mean = _repeat_val(val=fcst, h=h)
    fcst = {'mean': mean}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst

In [None]:
#export
class SimpleExponentialSmoothing(_TS):
    
    def __init__(self, alpha: float):
        self.alpha = alpha
        
    def __repr__(self):
        return f'SES(alpha={self.alpha})'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _ses(y=y, alpha=self.alpha, h=1, fitted=True)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(val=self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        return self.model_['fitted']
    
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _ses(y=y, h=h, fitted=fitted, alpha=self.alpha)
        return out

In [None]:
#hide
ses = SimpleExponentialSmoothing(alpha=0.1)
test_class(ses, x=ap, h=12)
#more tests
ses = ses.fit(ap)
fcst_ses = ses.predict(12)
test_close(fcst_ses, np.repeat(460.3028, 12), eps=1e-4)
#to recover these residuals from R
#you have to pass initial="simple"
#in the `ses` function
np.testing.assert_allclose(
    ses.predict_in_sample()[[0, 1, -1]], 
    np.array([np.nan, 118 - 6., 432 + 31.447525])
)

**Simple (or single) exponential smoothing**: Uses a weighted average of all past observations where the weights decrease exponentially into the past. Suitable for data with no clear trend or seasonality. Assuming there are $t$ observations, the one-step forecast is given by

$$\hat{y}_{t+1}= \alpha y_{t} + \alpha(1-\alpha)y_{t-1} + \alpha(1-\alpha)^2 y_{t-2} \dots$$

which can also be written as 

$$\hat{y}_{t+1} = \alpha y_t + \alpha(1-\alpha) \hat{y}_{t-1}$$

The rate $0 \leq \alpha \leq 1$ at which the weights decrease is called the smoothing parameter. When $\alpha = 1$, SES is equal to the naive method. 

In [None]:
#exporti
def _ses_optimized(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
    ):
    fcst, fitted_vals = _optimized_ses_forecast(y, [(0.01, 0.99)])
    mean = _repeat_val(val=fcst, h=h)
    fcst = {'mean': mean}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst

In [None]:
#export
class SimpleExponentialSmoothingOptimized(_TS):
    
    def __init__(self):
        pass
    
    def __repr__(self):
        return f'SESOpt()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _ses_optimized(y=y, h=1, fitted=True)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(val=self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        return self.model_['fitted']
    
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _ses_optimized(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
ses_op = SimpleExponentialSmoothingOptimized()
test_class(ses_op, x=ap, h=12)

**Simple exponential smoothing optimized**: A version of SES where the smoothing parameter $\alpha$ is chosen automatically by minimizing the mean squared error of the fit. 

In [None]:
#exporti
@njit
def _seasonal_exponential_smoothing(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
        season_length: int, # length of season
        alpha: float, # smoothing parameter
    ): 
    if y.size < season_length:
        return {'mean': np.full(h, np.nan, np.float32)}
    season_vals = np.empty(season_length, np.float32)
    fitted_vals = np.full(y.size, np.nan, np.float32)
    for i in range(season_length):
        season_vals[i], fitted_vals[i::season_length] = _ses_forecast(y[i::season_length], alpha)
    out = _repeat_val_seas(season_vals=season_vals, h=h, season_length=season_length)
    fcst = {'mean': out}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst

In [None]:
#export
class SeasonalExponentialSmoothing(_TS):
    
    def __init__(self, season_length: int, alpha: float):
        self.season_length = season_length
        self.alpha = alpha
    
    def __repr__(self):
        return f'SeasonalES(sl={self.season_length},alpha={self.alpha})'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _seasonal_exponential_smoothing(
            y=y, 
            season_length=self.season_length, 
            alpha=self.alpha,
            fitted=True,
            h=self.season_length,
        )
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val_seas(self.model_['mean'], season_length=self.season_length, h=h)
    
    def predict_in_sample(self):
        return self.model_['fitted']
    
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _seasonal_exponential_smoothing(
            y=y, h=h, fitted=fitted, 
            alpha=self.alpha,
            season_length=self.season_length
        )
        return out

In [None]:
#hide
seas_es = SeasonalExponentialSmoothing(season_length=12, alpha=1.)
test_class(seas_es, x=ap, h=12)
test_eq(seas_es.predict_in_sample()[-3:],  np.array([461 - 54., 390 - 28., 432 - 27.]))

**Seasonal exponential smoothing**: A seasonal version of exponential smoothing. 

In [None]:
#exporti
def _seasonal_ses_optimized(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool , # fitted values
        season_length: int, # season length
    ): 
    if y.size < season_length:
        return {'mean': np.full(h, np.nan, np.float32)}
    season_vals = np.empty(season_length, np.float32)
    fitted_vals = np.full(y.size, np.nan, np.float32)
    for i in range(season_length):
        season_vals[i], fitted_vals[i::season_length] = _optimized_ses_forecast(y[i::season_length], [(0.01, 0.99)])
    out = _repeat_val_seas(season_vals=season_vals, h=h, season_length=season_length)
    fcst = {'mean': out}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst

In [None]:
#export
class SeasonalExponentialSmoothingOptimized(_TS):
    
    def __init__(self, season_length: int):
        self.season_length = season_length
    
    def __repr__(self):
        return f'SeasESOpt(sl={self.season_length})'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _seasonal_ses_optimized(
            y=y, 
            season_length=self.season_length, 
            fitted=True,
            h=self.season_length,
        )
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val_seas(self.model_['mean'], season_length=self.season_length, h=h)
    
    def predict_in_sample(self):
        return self.model_['fitted']
    
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _seasonal_ses_optimized(
            y=y, h=h, fitted=fitted, 
            season_length=self.season_length
        )
        return out

In [None]:
#hide
seas_es_opt = SeasonalExponentialSmoothingOptimized(season_length=12)
test_class(seas_es_opt, x=ap, h=12)

**Seasonal SES optimized**: A seasonal version of SES optimized 

## Simple methods

In [None]:
#exporti
@njit
def _historic_average(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
    ):
    mean = _repeat_val(val=y.mean(), h=h)
    fcst = {'mean': mean}
    if fitted:
        fitted_vals = np.full(y.size, np.nan, np.float32)
        fitted_vals[1:] = y.cumsum()[:-1] / np.arange(1, y.size)
        fcst['fitted'] = fitted_vals
    return fcst

In [None]:
#export
class HistoricAverage(_TS):
    
    def __init__(self):
        pass
    
    def __repr__(self):
        return f'HistoricAverage()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _historic_average(y, h=1, fitted=True)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(val=self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        return self.model_['fitted']
    
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _historic_average(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
ha = HistoricAverage()
test_class(ha, x=ap, h=12)
#more tests
ha.fit(ap)
fcst_ha = ha.predict(12)
test_close(fcst_ha, np.repeat(ap.mean(), 12), eps=1e-5)
np.testing.assert_almost_equal(
    ha.predict_in_sample()[:4],
    np.array([np.nan, 112., 115., 120.6666667]), 
    decimal=5
)

**Historic average:** Also known as mean method. Uses a simple average of all past observations. Assuming there are $t$ observations, the one-step forecast is given by 

$$ \hat{y}_{t+1} = \frac{1}{t} \sum_{j=1}^t y_j $$


In [None]:
#exporti
@njit
def _naive(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
    ): 
    mean = _repeat_val(val=y[-1], h=h)
    if fitted:
        fitted_vals = np.full(y.size, np.nan, np.float32)
        fitted_vals[1:] = np.roll(y, 1)[1:]
        return {'mean': mean, 'fitted': fitted_vals}
    return {'mean': mean}

In [None]:
#export
class Naive(_TS):
    
    def __init__(self):
        pass
    
    def __repr__(self):
        return f'Naive()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _naive(y, h=1, fitted=True)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        return self.model_['fitted']
    
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _naive(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
naive = Naive()
test_class(naive, x=ap, h=12)
naive.fit(ap)
fcst_naive = naive.predict(12)
test_close(fcst_naive, np.repeat(ap[-1], 12), eps=1e-5)

**Naive**: A random walk model, defined as 

$$ \hat{y}_{t+1} = y_t $$

In [None]:
#exporti
@njit
def _random_walk_with_drift(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
    ): 
    slope = (y[-1] - y[0]) / (y.size - 1)
    mean = slope * (1 + np.arange(h)) + y[-1]
    fcst = {'mean': mean.astype(np.float32), 
            'slope': np.array([slope], dtype=np.float32), 
            'last_y': np.array([y[-1]], dtype=np.float32)}
    if fitted:
        fitted_vals = np.full(y.size, np.nan, dtype=np.float32)
        fitted_vals[1:] = (slope + y[:-1]).astype(np.float32)
        fcst['fitted'] = fitted_vals
    return fcst

In [None]:
#export
class RandomWalkWithDrift(_TS):
    
    def __init__(self):
        pass
    
    def __repr__(self):
        return f'RWD()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _random_walk_with_drift(y, h=1, fitted=True)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        hrange = np.arange(h, dtype=np.float32)
        return self.model_['slope'] * (1 + hrange) + self.model_['last_y']
    
    def predict_in_sample(self):
        return self.model_['fitted']

    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _random_walk_with_drift(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
rwd = RandomWalkWithDrift()
test_class(rwd, x=ap, h=12)
rwd = rwd.fit(ap)
fcst_rwd = rwd.predict(12)
test_close(fcst_rwd[:2], np.array([434.2378, 436.4755]), eps=1e-4)
np.testing.assert_almost_equal(
    rwd.predict_in_sample()[:3], 
    np.array([np.nan, 118 - 3.7622378, 132 - 11.7622378]),
    decimal=6
)

**Random walk with drift**: A variation of the naive method allows the forecasts to change over time. The amout of change, called drift, is the average change seen in the historical data. 

$$ \hat{y}_{t+1} = y_t+\frac{1}{t-1}\sum_{j=1}^t (y_j-y_{j-1}) = y_t+ \frac{y_t-y_1}{t-1} $$

From the previous equation, we can see that this is equivalent to extrapolating a line between the first and the last observation. 

In [None]:
#exporti
@njit
def _seasonal_naive(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, #fitted values
        season_length: int, # season length
    ): 
    if y.size < season_length:
        return {'mean': np.full(h, np.nan, np.float32)}
    season_vals = np.empty(season_length, np.float32)
    fitted_vals = np.full(y.size, np.nan, np.float32)
    for i in range(season_length):
        s_naive = _naive(y[i::season_length], h=1, fitted=fitted)
        season_vals[i] = s_naive['mean'].item()
        if fitted:
            fitted_vals[i::season_length] = s_naive['fitted']
    out = _repeat_val_seas(season_vals=season_vals, h=h, season_length=season_length)
    fcst = {'mean': out}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst

In [None]:
#export
class SeasonalNaive(_TS):
    
    def __init__(self, season_length: int):
        self.season_length = season_length

    def __repr__(self):
        return f'SeasonalNaive(sl={self.season_length})'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _seasonal_naive(
            y=y, 
            season_length=self.season_length, 
            h=self.season_length, 
            fitted=True,
        )
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val_seas(season_vals=self.model_['mean'], season_length=self.season_length, h=h)
    
    def predict_in_sample(self):
        return self.model_['fitted']
    
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _seasonal_naive(
            y=y, h=h, fitted=fitted, 
            season_length=self.season_length
        )
        return out

In [None]:
#hide
seas_naive = SeasonalNaive(season_length=12)
test_class(seas_naive, x=ap, h=12)
seas_naive = seas_naive.fit(ap)
fcst_seas_naive = seas_naive.predict(12)
test_eq(seas_naive.predict_in_sample()[-3:], np.array([461 - 54., 390 - 28., 432 - 27.]))

**Seasonal naive**: Similar to the naive method, but uses the last known observation of the same period (e.g. the same month of the previous year) in order to capture seasonal variations. 

In [None]:
#exporti
@njit
def _window_average(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
        window_size: int, # window size
    ): 
    if fitted:
        raise NotImplementedError('return fitted')
    if y.size < window_size:
        return {'mean': np.full(h, np.nan, np.float32)}
    wavg = y[-window_size:].mean()
    mean = _repeat_val(val=wavg, h=h)
    return {'mean': mean}

In [None]:
#export
class WindowAverage(_TS):
    
    def __init__(self, window_size: int):
        self.window_size = window_size

    def __repr__(self):
        return f'WindowAverage(ws={self.window_size})'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _window_average(y=y, h=1, window_size=self.window_size, fitted=False)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        raise NotImplementedError
        
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _window_average(y=y, h=h, fitted=fitted, window_size=self.window_size)
        return out

In [None]:
#hide
w_avg = WindowAverage(window_size=24)
test_class(w_avg, x=ap, h=12, skip_insample=True)
w_avg = w_avg.fit(ap)
fcst_w_avg = w_avg.predict(12)
test_close(fcst_w_avg, np.repeat(ap[-24:].mean(), 12))

**Window average**: Uses the average of the last $k$ observations, with $k$ the length of the window. Wider windows will capture global trends, while narrow windows will reveal local trends. The length of the window selected should take into account the importance of past observations and how fast the series changes. 

In [None]:
#exporti
@njit
def _seasonal_window_average(
        y: np.ndarray,
        h: int,
        fitted: bool,
        season_length: int,
        window_size: int,
    ):
    if fitted:
        raise NotImplementedError('return fitted')
    min_samples = season_length * window_size
    if y.size < min_samples:
        return {'mean': np.full(h, np.nan, np.float32)}
    season_avgs = np.zeros(season_length, np.float32)
    for i, value in enumerate(y[-min_samples:]):
        season = i % season_length
        season_avgs[season] += value / window_size
    out = _repeat_val_seas(season_vals=season_avgs, h=h, season_length=season_length)
    return {'mean': out}

In [None]:
#export
class SeasonalWindowAverage(_TS):
    
    def __init__(self, season_length: int, window_size: int):
        self.season_length = season_length
        self.window_size = window_size

    def __repr__(self):
        return f'SeasWA(sl={self.season_length},ws={self.window_size})'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _seasonal_window_average(
            y=y, 
            h=self.season_length,
            fitted=False,
            season_length=self.season_length, 
            window_size=self.window_size,
        )
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val_seas(season_vals=self.model_['mean'], season_length=self.season_length, h=h)
    
    def predict_in_sample(self):
        raise NotImplementedError

    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _seasonal_window_average(
            y=y, h=h, fitted=fitted, 
            season_length=self.season_length,
            window_size=self.window_size
        )
        return out

In [None]:
#hide
seas_w_avg = SeasonalWindowAverage(season_length=12, window_size=1)
#test_class(seas_w_avg, x=ap, h=12, skip_insample=True)
seas_w_avg = seas_w_avg.fit(ap)
fcst_seas_w_avg = w_avg.predict(12)
test_eq(fcst_w_avg, fcst_seas_w_avg)

**Seasonal window average**: An average of the last $k$ observations of the same period, with $k$ the length of the window.

## Sparse or intermittent series 

Sparse or intermittent series are series with very few non-zero observations. They are notoriously hard to forecast, and so, different methods have been developed especifically for them. Before the development of Croston's method and its variants, SES was usually used to forecast them. 

In [None]:
#exporti
def _adida(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
    ):
    if fitted:
        raise NotImplementedError('return fitted')
    if (y == 0).all():
        return {'mean': np.repeat(np.float32(0), h)}
    y_intervals = _intervals(y)
    mean_interval = y_intervals.mean()
    aggregation_level = round(mean_interval)
    lost_remainder_data = len(y) % aggregation_level
    y_cut = y[lost_remainder_data:]
    aggregation_sums = _chunk_sums(y_cut, aggregation_level)
    sums_forecast, _ = _optimized_ses_forecast(aggregation_sums)
    forecast = sums_forecast / aggregation_level
    mean = _repeat_val(val=forecast, h=h)
    return {'mean': mean}

In [None]:
#export
class ADIDA(_TS):
    
    def __init__(self):
        pass

    def __repr__(self):
        return f'ADIDA()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _adida(y=y, h=1, fitted=False)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(val=self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        raise NotImplementedError
        
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _adida(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
adida = ADIDA()
test_class(adida, x=ap, h=12, skip_insample=True)

**Aggregate-Dissagregate Intermittent Demand Approach**: Uses temporal aggregation to reduce the number of zero observations. Once the data has been agregated, it uses the optimized SES to generate the forecasts at the new level. It then breaks down the forecast to the original level using equal weights.  

In [None]:
#exporti
@njit
def _croston_classic(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
    ): 
    if fitted:
        raise NotImplementedError('return fitted')
    yd = _demand(y)
    yi = _intervals(y)
    ydp, _ = _ses_forecast(yd, 0.1)
    yip, _ = _ses_forecast(yi, 0.1)
    mean = ydp / yip
    mean = _repeat_val(val=mean, h=h)
    return {'mean': mean}

In [None]:
#export
class CrostonClassic(_TS):
    
    def __init__(self):
        pass

    def __repr__(self):
        return f'CrostonClassic()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _croston_classic(y=y, h=1, fitted=False)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(val=self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        raise NotImplementedError
        
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _croston_classic(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
croston = CrostonClassic()
test_class(croston, x=ap, h=12, skip_insample=True)

**Croston classic**: A method to forecast time series that exhibit intermittent demand. It decomposes the original time series into a non-zero demand size $z_t$ and inter-demand intervals $p_t$. Then the forecast is given by 

$$ \hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t} $$ 

where $\hat{z}_t$ and $\hat{p}_t$ are forecasted using SES. The smoothing parameter of both components is set equal to 0.1

In [None]:
#exporti
def _croston_optimized(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
    ): 
    if fitted:
        raise NotImplementedError('return fitted')
    yd = _demand(y)
    yi = _intervals(y)
    ydp, _ = _optimized_ses_forecast(yd)
    yip, _ = _optimized_ses_forecast(yi)
    mean = ydp / yip
    mean = _repeat_val(val=mean, h=h)
    return {'mean': mean}

In [None]:
#export
class CrostonOptimized(_TS):
    
    def __init__(self):
        pass
    
    def __repr__(self):
        return f'CrostonSBA()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _croston_optimized(y=y, h=1, fitted=False)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(val=self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        raise NotImplementedError

    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _croston_optimized(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
croston_op = CrostonOptimized()
test_class(croston_op, x=ap, h=12, skip_insample=True)

**Croston Optimized**: A variation of the classic Croston's method where the smooting paramater is optimally selected from the range $[0.1,0.3]$. Both the non-zero demand $z_t$ and the inter-demand intervals $p_t$ are smoothed separately, so their smoothing parameters can be different. 

In [None]:
#exporti
@njit
def _croston_sba(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool,  # fitted values
    ):
    if fitted:
        raise NotImplementedError('return fitted')
    mean = _croston_classic(y, h, fitted)
    mean['mean'] *= 0.95
    return mean

In [None]:
#export
class CrostonSBA(_TS):
    
    def __init__(self):
        pass

    def __repr__(self):
        return f'CrostonSBA()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _croston_sba(y=y, h=1, fitted=False)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(val=self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        raise NotImplementedError
        
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _croston_sba(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
croston_sba = CrostonSBA()
test_class(croston_sba, x=ap, h=12, skip_insample=True)

**Croston with Syntetos-Boylan Approximation**: A variation of the classic Croston's method that uses a debiasing factor, so that the forecast is given by 

$$ \hat{y}_t = 0.95  \frac{\hat{z}_t}{\hat{p}_t} $$ 


In [None]:
#exporti
def _imapa(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: bool, # fitted values
    ): 
    if fitted:
        raise NotImplementedError('return fitted')
    if (y == 0).all():
        return {'mean': np.repeat(np.float32(0), h)}
    y_intervals = _intervals(y)
    mean_interval = y_intervals.mean().item()
    max_aggregation_level = round(mean_interval)
    forecasts = np.empty(max_aggregation_level, np.float32)
    for aggregation_level in range(1, max_aggregation_level + 1):
        lost_remainder_data = len(y) % aggregation_level
        y_cut = y[lost_remainder_data:]
        aggregation_sums = _chunk_sums(y_cut, aggregation_level)
        forecast, _ = _optimized_ses_forecast(aggregation_sums)
        forecasts[aggregation_level - 1] = (forecast / aggregation_level)
    forecast = forecasts.mean()
    mean = _repeat_val(val=forecast, h=h)
    return {'mean': mean}

In [None]:
#export
class IMAPA(_TS):
    
    def __init__(self):
        pass

    def __repr__(self):
        return f'IMAPA()'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _imapa(y=y, h=1, fitted=False)
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(val=self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        raise NotImplementedError
        
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _imapa(y=y, h=h, fitted=fitted)
        return out

In [None]:
#hide
imapa = IMAPA()
test_class(imapa, x=ap, h=12, skip_insample=True)

**Intermittent Multiple Aggregation Prediction Algorithm**: Similar to ADIDA, but instead of using a single aggregation level, it considers multiple in order to capture different dynamics of the data. Uses the optimized SES to generate the forecasts at the new levels and then combines them using a simple average. 

In [None]:
#exporti
@njit
def _tsb(
        y: np.ndarray, # time series
        h: int, # forecasting horizon
        fitted: int, # fitted values
        alpha_d: float,
        alpha_p: float,
    ):
    if fitted:
        raise NotImplementedError('return fitted')
    if (y == 0).all():
        return {'mean': np.repeat(np.float32(0), h)}
    yd = _demand(y)
    yp = _probability(y)
    ypf, _ = _ses_forecast(yp, alpha_p)
    ydf, _ = _ses_forecast(yd, alpha_d)
    forecast = np.float32(ypf * ydf)
    mean = _repeat_val(val=forecast, h=h)
    return {'mean': mean}

In [None]:
#export
class TSB(_TS):
    
    def __init__(self, alpha_d: float, alpha_p: float):
        self.alpha_d = alpha_d
        self.alpha_p = alpha_p
        
    def __repr__(self):
        return f'TSB(d={self.alpha_d},p={self.alpha_p})'
    
    def fit(self, y: np.ndarray, X: np.ndarray = None):
        mod = _tsb(
            y=y, h=1, 
            fitted=False, 
            alpha_d=self.alpha_d, 
            alpha_p=self.alpha_p
        )
        self.model_ = dict(mod)
        return self
        
    def predict(self, h: int, X: np.ndarray = None):
        return _repeat_val(self.model_['mean'][0], h=h)
    
    def predict_in_sample(self):
        raise NotImplementedError
        
    def forecast(
        self, 
        y: np.ndarray, 
        h: int, 
        X: np.ndarray = None, 
        X_future: np.ndarray = None,
        fitted: bool = False, # return fitted values?
    ):
        out = _tsb(
            y=y, h=h, 
            fitted=fitted, 
            alpha_d=self.alpha_d, 
            alpha_p=self.alpha_p
        )
        return out

In [None]:
#hide
tsb = TSB(alpha_d=0.9, alpha_p=0.1)
test_class(tsb, x=ap, h=12, skip_insample=True)

**Teunter-Syntetos-Babai**: A modification of Croston's method that replaces the inter-demand intervals with the demand probability $d_t$, which is defined as follows. 

$$
d_t = \begin{cases}
    1  & \text{if demand occurs at time t}\\ 
    0 & \text{otherwise.}
\end{cases}
$$

Hence, the forecast is given by 

$$\hat{y}_t= \hat{d}_t\hat{z_t}$$

Both $d_t$ and $z_t$ are forecasted using SES. The smooting paramaters of each may differ, like in the optimized Croston's method.

## References

#### **General**
- Hyndman, R.J., & Athanasopoulos, G. (2021) Forecasting: principles and practice, 3rd edition, OTexts: Melbourne, Australia. [OTexts.com/fpp3](https://otexts.com/fpp3/). Accessed on July 2022.

- Shmueli, G., & Lichtendahl Jr, K. C. (2016). Practical time series forecasting with r: A hands-on guide. Axelrod Schnall Publishers.

#### **For sparse or intermittent series**

- [Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50)


- [Nikolopoulos, K., Syntetos, A. A., Boylan, J. E., Petropoulos, F., & Assimakopoulos, V. (2011). An aggregate–disaggregate intermittent demand approach (ADIDA) to forecasting: an empirical proposition and analysis. Journal of the Operational Research Society, 62(3), 544-554.](https://researchportal.bath.ac.uk/en/publications/an-aggregate-disaggregate-intermittent-demand-approach-adida-to-f)

- [Syntetos, A. A., & Boylan, J. E. (2005). The accuracy of intermittent demand estimates. International Journal of forecasting, 21(2), 303-314.](https://www.academia.edu/1527250/The_accuracy_of_intermittent_demand_estimates)

- Syntetos, A. A., & Boylan, J. E. (2021). Intermittent demand forecasting: Context, methods and applications. John Wiley & Sons.

- [Teunter, R. H., Syntetos, A. A., & Babai, M. Z. (2011). Intermittent demand: Linking forecasting to inventory obsolescence. European Journal of Operational Research, 214(3), 606-615.](https://www.sciencedirect.com/science/article/abs/pii/S0377221711004437)


In [None]:
from statsforecast.utils import AirPassengers as ap

In [None]:
arima = AutoARIMA(season_length=12)
arima = arima.fit(ap)
arima.predict(12)

In [None]:
ets = ETS(season_length=12)
ets = ets.fit(ap)
ets.predict(12)

External regressors

In [None]:
drift = np.arange(1, ap.size + 1)
X = np.vstack([np.log(drift), np.sqrt(drift)]).T

In [None]:
newdrift = np.arange(ap.size + 1, ap.size + 7 + 1).reshape(-1, 1)
newxreg = np.concatenate([np.log(newdrift), np.sqrt(newdrift)], axis=1)

In [None]:
arima = AutoARIMA(season_length=12)
arima = arima.fit(y=ap, X=X)
arima.predict(12, X=newxreg)

Confidence intervals

In [None]:
pd.DataFrame(arima.predict(12, X=newxreg, level=(80, 50, 95))).plot()