# TIME Forecasting

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.gridspec import GridSpec

import statsmodels.api as sm
from statsmodels.tsa.arima.model import ARIMA as sm_ARIMA

from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures

from sktime.utils.plotting import plot_interval, plot_series
from sktime.forecasting.naive import NaiveForecaster
from sktime.forecasting.arima import ARIMA, AutoARIMA

from utils import time_plot, get_figure, baseline_forecasting, residual_analysis_plots, acf_plot, pacf_plot, stationarity_tests, split_time_series, evaluate_forecaster
from data import get_apple_5y, get_switzerland_temperature

## Baselines

In [None]:
data = get_apple_5y().resample('W').mean().ffill()['Close']
train_size = int(len(data) * 0.8)
fig, ax = time_plot(
    x=data.index,
    y=data,
    label="Signal",
    title='Weekly Apple Inc. (AAPL) Closing Prices',
    xlabel='Date',
    ylabel='Closing Price (USD)',
    return_fig=True,
)
ax.axvline(x=data.index[train_size], color='gray', linestyle='--', label='Train/Test Split')

baselines = baseline_forecasting(data, train_size, period=52)
for name, (_, pred, _) in baselines.items():
    ax.plot(pred.index, pred, label=name, linestyle='--', alpha=0.75)
ax.legend()

In [None]:
fig, ax = time_plot(
    x=data.index,
    y=data,
    label="Signal",
    title='Weekly Apple Inc. (AAPL) Closing Prices',
    xlabel='Date',
    ylabel='Closing Price (USD)',
    return_fig=True,
)
ax.axvline(x=data.index[train_size], color='gray', linestyle='--', label='Train/Test Split')

name = 'Naïve with drift'
_, pred, pred_interval = baselines[name]
ax.plot(pred.index, pred, label=name, linestyle='--', alpha=0.75)
plot_interval(ax, pred_interval)
ax.legend()

In [None]:
data = get_switzerland_temperature().set_index('dt').asfreq('ME')['AverageTemperature']
train_size = int(len(data) * 0.8)
fig, ax = time_plot(
    x=data.index,
    y=data,
    label="Signal",
    title='Monthly Average Temperature in Switzerland',
    xlabel='Year',
    ylabel='Average Temperature (°C)',
    return_fig=True,
)
ax.axvline(x=data.index[train_size], color='gray', linestyle='--', label='Train/Test Split')
baselines = baseline_forecasting(data, train_size, period=12)
for name, (_, pred, _) in baselines.items():
    ax.plot(pred.index, pred, label=name, linestyle='--', alpha=0.75)
ax.legend(loc='upper left')

In [None]:
fig, ax = time_plot(
    x=data.index,
    y=data,
    label="Signal",
    title='Monthly Average Temperature in Switzerland',
    xlabel='Year',
    ylabel='Average Temperature (°C)',
    return_fig=True,
)
ax.axvline(x=data.index[train_size], color='gray', linestyle='--', label='Train/Test Split')

name = 'Seasonal naïve'
_, pred, pred_interval = baselines[name]
ax.plot(pred.index, pred, label=name, linestyle='--', alpha=0.75)
plot_interval(ax, pred_interval)
ax.legend(loc='upper left')

## Underfitting and overfitting

In [None]:
np.random.seed(42)
n = 120
t = np.arange(n)
data = 10 + 0.5 * t + 20 * np.sin(2 * np.pi * t / 40) + np.random.normal(size=n)

train_size = int(n * 0.8)
x_train, y_train = t[:train_size], data[:train_size]
x_test, y_test = t[train_size:], data[train_size:]

linear_model = LinearRegression()
linear_model.fit(x_train.reshape(-1, 1), y_train)
y_pred_linear = linear_model.predict(t.reshape(-1, 1))

poly_features = PolynomialFeatures(degree=10)
x_poly_train = poly_features.fit_transform(x_train.reshape(-1, 1))
x_poly = poly_features.transform(t.reshape(-1, 1))
poly_model = LinearRegression()
poly_model.fit(x_poly_train, y_train)
y_pred_poly = poly_model.predict(x_poly)

fig, [ax] = get_figure()
time_plot(t, data, label='Signal', ax=ax, ylim=(0, 150))
ax.axvline(x=t[train_size], color='gray', linestyle='--', label='Train/Test Split')
ax.plot(t, y_pred_linear, label='Linear model', color='red', linestyle='--', alpha=0.75)
ax.plot(t, y_pred_poly, label='Polynomial model', color='purple', linestyle='-.', alpha=0.75)
ax.set_title('Underfitting vs overfitting in Time Series')
ax.legend(loc='upper left')
fig.tight_layout()

## Rolling cross-validation

In [None]:
rng = np.random.RandomState(42)
cmap_cv = plt.cm.coolwarm
scatter_kwargs = {'lw': 14, 'marker': '|', 'cmap': cmap_cv, 'vmin': 0, 'vmax': 1}
nsamples = 100
train_size = int(nsamples * 0.8)
sample_index = np.arange(nsamples)
samples = rng.randn(100)
n_splits = 4
cv = TimeSeriesSplit(n_splits=n_splits)

fig, [ax] = get_figure()
for ii, (tr, tt) in enumerate(cv.split(X=samples[:train_size])):
    # Fill in indices with the training/test groups
    indices = np.array([np.nan] * train_size)
    indices[tt] = .7
    indices[tr] = 0.1
    
    # Visualize the results
    ax.scatter(
        range(len(indices)),
        [ii + 0.5] * len(indices),
        c=indices,
        **scatter_kwargs,
    )

# Plot final train-test split
indices = np.zeros_like(samples)+.1
indices[train_size+1:] = .85
ax.scatter(
    range(len(indices)), 
    [ii + 2.5] * nsamples, 
    c=indices,
    **scatter_kwargs,
)

# Formatting
yticklabels = list(range(n_splits))
ax.set(
    yticks=np.arange(n_splits) + 0.5,
    yticklabels=yticklabels,
    xlabel="Sample index",
    ylim=[n_splits + 2.2, -0.2],
    xlim=[0, nsamples],
)
ax.set_title("Splitting time series data", fontsize=15)
ax.axvline(x=sample_index[train_size], color='gray', linestyle='--')
ax.axhline(y=ii+1.5, color='black', linestyle='-')
ax.text(-2.5, 2, "Cross-validation folds", va='center', ha='right', rotation=90)
ax.text(-2.5, 5.4, "Final split", va='center', ha='right', rotation=90)
ax.grid(False)

ax.legend(
    [Patch(color=cmap_cv(.1)), Patch(color=cmap_cv(0.7)), Patch(color=cmap_cv(.85))],
    ["Training", "Validation", "Testing"],
    loc=(1.02, 0.4),
    title="Data splits"
)
fig.tight_layout()

## Residual analysis

In [None]:
np.random.seed(42)
n = 120
t = np.arange(n)
data = 10 + 0.5 * t + 10 * np.sin(2 * np.pi * t / 12) + np.random.normal(size=n)

# Fit a simple linear regression model
X = sm.add_constant(t)
result_linear = sm.OLS(data, X).fit()
data_pred_linear = result_linear.predict()
residuals_linear = data - data_pred_linear
residuals_linear /= np.std(residuals_linear)

# Fit a Seasonal ARIMA model
results_arima = sm_ARIMA(data, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)).fit()
data_pred_arima = results_arima.predict()
residuals_arima = results_arima.standardized_forecasts_error[0]

# Plot the original data and fitted values from both models
fig = plt.figure(figsize=(16, 10))
gs = GridSpec(4, 4, figure=fig)
ax1 = fig.add_subplot(gs[0, :])
ax2 = fig.add_subplot(gs[1, :2])
ax3 = fig.add_subplot(gs[1, 2:], sharey=ax2)
linear_axs = [
    fig.add_subplot(gs[2, 0]),
    fig.add_subplot(gs[2, 1]),
    fig.add_subplot(gs[3, 0]),
    fig.add_subplot(gs[3, 1]),
]
arima_axs = [
    fig.add_subplot(gs[2, 2]),
    fig.add_subplot(gs[2, 3]),
    fig.add_subplot(gs[3, 2]),
    fig.add_subplot(gs[3, 3]),
]

ax1.plot(data, label='Original Data', color='blue')
ax1.plot(data_pred_linear, label='Linear regression', color='red')
ax1.plot(data_pred_arima, label='SARIMA', color='green')
ax1.set_title('Original Data and Fitted Values')
ax1.legend(loc="upper left")
ax1.grid(True)
ax2.plot(residuals_linear, label='Residuals: Linear Regression', color='red')
ax2.set_title('Standardized residuals of the linear model')
ax2.grid(True)
ax3.plot(residuals_arima, label='Residuals: Seasonal ARIMA', color='green')
ax3.set_title('Standardized residuals of the SARIMA model')
ax3.grid(True)
residual_analysis_plots(residuals_linear, axs=linear_axs)
residual_analysis_plots(residuals_arima, axs=arima_axs)
fig.tight_layout()

## Examples: Apple closing prices

In [None]:
data = get_apple_5y().resample('W').mean().ffill()['Close']
full_train, (train, eval), test = split_time_series(data)
plot_series(full_train, test, labels=['Train', 'Test'])
fh = list(range(1, 5))  # 4 weeks forecasting horizon

### Baselines

In [None]:
naive = NaiveForecaster(strategy="last")
pred_naive = evaluate_forecaster(naive, train, eval, fh)

naive_with_drift = NaiveForecaster(strategy="drift")
pred_naive_with_drift = evaluate_forecaster(naive_with_drift, train, eval, fh)

plot_series(train, eval, pred_naive, pred_naive_with_drift, labels=['Training', 'Validation', 'Naïve', 'Naïve with drift'])

In [None]:
naive_with_drift_residuals = train - naive_with_drift.predict(train.index)
residual_analysis_plots(naive_with_drift_residuals['AAPL']/np.std(naive_with_drift_residuals['AAPL']))

### ARIMA

In [None]:
fig, [ax1, ax2] = get_figure(ncols=2, figsize=(16,4))
time_plot(
    x=train.index,
    y=train,
    title='Weekly average of Apple Closing Prices',
    xlabel='Date',
    ylabel='Closing Price (USD)',
    ax=ax1,
)
differenced_data = train.diff().dropna()
stationarity_tests(differenced_data)
time_plot(
    x=differenced_data.index,
    y=differenced_data,
    title='Differenced data',
    ylabel='$\\nabla x_t$',
    ax=ax2,
)
fig.tight_layout()
fig, [ax1, ax2] = get_figure(ncols=2)
acf_plot(differenced_data, 30, ax1, title="Differenced data")
pacf_plot(differenced_data, 30, ax2, title="Differenced data")

In [None]:
arima110 = ARIMA(order=(1,1,0))
pred_arima110 = evaluate_forecaster(arima110, train, eval, fh)

arima = AutoARIMA(d=1, seasonal=False, start_p=0, start_q=0, suppress_warnings=True)
pred_arima = evaluate_forecaster(arima, train, eval, fh)
print(arima.get_fitted_params())

plot_series(train, eval, pred_arima110, pred_arima, labels=['Training', 'Validation', 'ARIMA(1,1,0)', 'AutoARIMA'])

In [None]:
arima110_residuals = (train - arima110.predict(train.index)).dropna()
residual_analysis_plots(arima110_residuals['AAPL']/np.std(arima110_residuals['AAPL']))

### Test results

In [None]:
naive_with_drift = NaiveForecaster(strategy="drift")
pred_naive_with_drift = evaluate_forecaster(naive_with_drift, full_train, test, fh, is_test=True)

arima110 = ARIMA(order=(1,1,0))
pred_arima110 = evaluate_forecaster(arima110, full_train, test, fh, is_test=True)

plot_series(full_train, test, pred_naive_with_drift, pred_arima110, labels=['Training', 'Testing', 'Naïve with drift', 'ARIMA(1,1,0)'])

## Examples: Swiss temperatures

In [None]:
data = get_switzerland_temperature().set_index('dt').asfreq('ME')['AverageTemperature']
full_train, (train, eval), test = split_time_series(data)
plot_series(full_train, test, labels=['Train', 'Test'])
fh = list(range(1, 5))  # 4 months forecasting horizon

### Baselines

In [None]:
naive = NaiveForecaster(strategy="last")
pred_naive = evaluate_forecaster(naive, train, eval, fh)

seasonal_naive = NaiveForecaster(strategy="last", sp=12)
pred_seasonal_naive = evaluate_forecaster(seasonal_naive, train, eval, fh)

plot_series(train, eval, pred_naive, pred_seasonal_naive, labels=['Training', 'Validation', 'Naïve', 'Seasonal naïve'])

In [None]:
seasonal_naive_residuals = train - seasonal_naive.predict(train.index)
residual_analysis_plots(seasonal_naive_residuals/np.std(seasonal_naive_residuals))

### ARIMA

In [None]:
fig, [ax1, ax2] = get_figure(ncols=2)
time_plot(
    x=train.index,
    y=train,
    title='Monthly Average Temperature in Switzerland',
    xlabel='Year',
    ylabel='Average Temperature (°C)',
    ax=ax1,
)
differenced_data = train.diff(12).dropna()
stationarity_tests(differenced_data)
time_plot(
    x=differenced_data.index,
    y=differenced_data,
    title='Differenced data',
    ylabel='$\\nabla x_t$',
    ax=ax2,
)
fig, [ax1, ax2] = get_figure(ncols=2)
acf_plot(differenced_data, 30, ax1, title="Differenced data")
pacf_plot(differenced_data, 30, ax2, title="Differenced data")

In [None]:
arima000110 = ARIMA(order=(0,0,0), seasonal_order=(1,1,0,12))
pred_arima110 = evaluate_forecaster(arima000110, train, eval, fh)

arima = AutoARIMA(d=0, D=1, seasonal=True, sp=12, suppress_warnings=True)
pred_arima = evaluate_forecaster(arima, train, eval, fh)
print(arima.get_fitted_params())

plot_series(train, eval, pred_arima110, pred_arima, labels=['Training', 'Validation', "ARIMA(0,0,0)(1,1,0)12", 'AutoARIMA'])

In [None]:
arima000211 = ARIMA(order=(0,0,0), seasonal_order=(2,1,1,12))
pred_arima000211 = evaluate_forecaster(arima000211, train, eval, fh)

arima000211_residuals = (train - arima000211.predict(train.index)).dropna()
residual_analysis_plots(arima000211_residuals/np.std(arima000211_residuals))

### Test results

In [None]:
seasonal_naive = NaiveForecaster(strategy="last", sp=12)
pred_seasonal_naive = evaluate_forecaster(seasonal_naive, full_train, test, fh, is_test=True)

arima000211 = ARIMA(order=(0,0,0), seasonal_order=(2,1,1,12))
pred_arima000211 = evaluate_forecaster(arima000211, full_train, test, fh, is_test=True)

plot_series(full_train, test, pred_seasonal_naive, pred_arima000211, labels=['Training', 'Testing', 'Seasonal naïve', 'ARIMA(0,0,0)(2,1,1)12'])