# SARIMA(X) Model

#### Implementation of a SARIMAX/SARIMA (extension of ARIMA) model

Links:
- [SARIMAX Documentation](https://www.statsmodels.org/devel/generated/statsmodels.tsa.statespace.sarimax.SARIMAX.html)
- [A Gentle Introduction to SARIMA for Time Series Forecasting in Python](https://machinelearningmastery.com/sarima-for-time-series-forecasting-in-python/)
- [How to Grid Search SARIMA Hyperparameters for Time Series Forecasting in Python](https://machinelearningmastery.com/how-to-grid-search-sarima-model-hyperparameters-for-time-series-forecasting-in-python/)
- [Complete Guide to SARIMAX in Python for Time Series Modeling](https://analyticsindiamag.com/complete-guide-to-sarimax-in-python-for-time-series-modeling/)
- [Implementation of Time Series Forecasting Methods (SARIMA, SARIMAX, and Prophet)](https://medium.com/@shawanugya12/implementation-of-time-series-forecasting-methods-sarima-sarimax-and-prophet-ff8407b25aaa)

Import necessary modules and define helper functions:

In [None]:
from dotenv import load_dotenv
import os

from datetime import datetime, time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pmdarima as pm
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.tsa.arima.model import ARIMA

load_dotenv()
DATASET_PATH = os.environ.get("DATASET_PATH")


def is_stationary(time_series: pd.Series, sig_level: float = 0.01) -> bool:
    """
    Tests whether the given time series is stationary at the given significance
    threshold using the ADF unit root test.

    Parameters:
        time_series: time series to test stationarity
        sig_level: significance level to use for the test. defaults to 0.05

    Returns:
        (bool): True if the time series is stationary, False otherwise
    """
    results = adfuller(time_series.values)
    p_value = results[1]

    return p_value < sig_level


def calc_d(time_series: pd.Series, iter: int = 5) -> int:
    """
    Calculates the value of the parameter d (order of differencing) for an
    ARIMA model.

    Parameters:
        time_series: time series to calculate d for
        iter: how many times to difference the time series, defaults to 5

    Returns:
        (int): optimal order of differencing
    """
    differences = [(time_series, np.std(time_series), 0)]
    # optimal order of differencing given by the (stationary) time series with
    # the smallest standard deviation

    # repeatedly difference the time series and store each time series, order
    # and standard deviation in the list
    df_diff = time_series.copy()
    for order in range(1, iter+1):
        # difference the previous time series and store it in list
        df_diff = df_diff.diff().dropna()
        differences.append((df_diff, np.std(df_diff), order))

    # sort list by ascending standard deviation
    sorted(differences, key=lambda x: x[1])

    # find first time series in list that is stationary
    for difference, _, order in differences:
        if is_stationary(difference):  # optimal order of differencing found
            d = order
            break

    return d


def calc_p(time_series: pd.Series) -> int:
    """
    Calculates the value of the parameter p (order of AR term) for an ARIMA
    model.

    Parameters:
        time_series: time series to calculate p for
    """
    # get values and confidence intervals from PACF plot
    nlags = min(int(10*np.log10(time_series.size)), time_series.size//2 - 1)
    values, conf_int = pacf(time_series, alpha=0.05, nlags=nlags)
    # trim first value
    values = values[1:]
    conf_int = conf_int[1:]

    # confidence interval width is constant for all PACFs
    conf_int_width = (conf_int[0][1] - conf_int[0][0]) / 2

    p = 0
    for value in values:
        if abs(value) >= conf_int_width:
            # value outside critical region
            p += 1
        else:
            break

    return p


def calc_q(time_series: pd.Series) -> int:
    """
    Calculates the value of the parameter q (order of MA term) for an ARIMA
    model.

    Parameters:
        time_series: time series to calculate q for
    """
    # get values and confidence intervals from ACF plot
    nlags = min(int(10*np.log10(time_series.size)), time_series.size//2 - 1)
    values, conf_int = acf(time_series, alpha=0.05, nlags=nlags, fft=True)
    # trim first value
    values = values[1:]
    conf_int = conf_int[1:]

    q = 0
    for i, value in enumerate(values):
        if abs(value) >= abs(conf_int[i][0] - value):
            # value is outside critical region
            q += 1
        else:
            break

    return q

Data preparation:

In [None]:
df_main = pd.read_excel(DATASET_PATH + "Conversion by Day.xlsx")

dimension_col = int(input("Column number of dimension variable: "))
response_col = int(input("Column number of response variable: "))

df = df_main.loc[df_main.iloc[:, 0] == "NSW"]

# get the list of categories (possible values) for the selected dimension
categories = df_main[df_main.columns[dimension_col]].tolist()
categories = sorted(list(set(categories)))  # remove duplicates and sort alphabetically
# store response variable as a string
response = df_main.columns[response_col]

Determine ARIMA parameters (p, d, q):

In [None]:
# find p, d, q in ARIMA model
d = calc_d(df[response])

df_diff = df[response].copy()
for _ in range(d):
    df_diff = df_diff.diff().dropna()

p = calc_p(df_diff)
q = calc_q(df_diff)
print(f"ARIMA parameters: {p, d, q}")

# split data into training vs. test dataset
train, test = pm.model_selection.train_test_split(df[response], train_size=df.shape[0]-30)

# grid search to find the optimal parameters
model = pm.auto_arima(
    train, start_p=0, d=d, start_q=0, max_p=p, max_d=d, max_q=q,
    start_P=0, D=0, start_Q=0, max_P=3, max_D=3, max_Q=3,
    m=12, seasonal=True
)

prediction, ci = model.predict(n_periods=test.shape[0], return_conf_int=True, dynamic=False)

# plot ARIMA model against original dataset
x_axis = np.arange(train.shape[0] + prediction.shape[0])

plt.rcParams["figure.figsize"] = (9, 6)
plt.plot(x_axis[:train.shape[0]], train, alpha=0.75)
plt.plot(x_axis[train.shape[0]:], prediction, alpha=0.75)
plt.scatter(x_axis[train.shape[0]:], test, alpha=0.4, marker='x')
plt.fill_between(x_axis[-prediction.shape[0]:], ci[:, 0], ci[:, 1], alpha=0.1, color='b')
plt.title('Actual test samples vs. forecasts')
plt.show()

Output outliers as a dataframe:

In [None]:
outliers = []
for i in range(len(test)):
    if not (ci[i][0] <= test.tolist()[i] <= ci[i][1]):  # outlier
        outliers.append(train.shape[0] + i)

df.iloc[outliers, :]

All the categories (states) at once:

In [None]:
df_main = pd.read_excel(DATASET_PATH + "Conversion by Day.xlsx")

dimension_col = int(input("Column number of dimension variable: "))
response_col = int(input("Column number of response variable: "))

# get the list of categories (possible values) for the selected dimension
categories = df_main[df_main.columns[dimension_col]].tolist()
categories = sorted(list(set(categories)))  # remove duplicates and sort alphabetically
# store response variable as a string
response = df_main.columns[response_col]

anomaly_dfs = []

for category in categories:
    df = df_main.loc[df_main.iloc[:, 0] == category]
    
    # find p, d, q in ARIMA model
    d = calc_d(df[response])

    # use the optimally differenced time series to calculate p and q
    df_diff = df[response].copy()
    for _ in range(d):
        df_diff = df_diff.diff().dropna()

    p = calc_p(df_diff)
    q = calc_q(df_diff)
    
    # get the number of days that have passed since the first day of the last month;
    # this will be the size of the test dataset
    last_month = datetime.now() - pd.DateOffset(months=1)
    last_month = datetime(last_month.year, last_month.month, 1)  # round to first day of month
    days = (datetime.now() - last_month).days  # number of days to include in the test dataset
    
    train, test = pm.model_selection.train_test_split(df[response], train_size=df.shape[0]-days)
    
    # starting p, d, q parameters will be 2 less than the calculated parameters
    start_p = 0 if (p - 2 < 0) else (p - 2)
    start_d = 0 if (d - 2 < 0) else (d - 2)
    start_q = 0 if (q - 2 < 0) else (q - 2)

    # grid search to find the most optimal parameters
    model = pm.auto_arima(
        train, start_p=start_p, d=start_d, start_q=start_q, max_p=p, max_d=d, max_q=q,
        start_P=0, D=0, start_Q=0, m=12, seasonal=True)

    prediction, ci = model.predict(n_periods=test.shape[0], return_conf_int=True, dynamic=False)
    x_axis = np.arange(train.shape[0] + prediction.shape[0])
    
    # find outliers by comparing values against confidence interval of forecast
    anomaly_indices = []
    for i in range(len(test)):
        if not (ci[i][0] <= test.tolist()[i] <= ci[i][1]):  # outlier
            anomaly_indices.append(train.shape[0] + i)
    anomaly_df = df.iloc[anomaly_indices, :].copy()
    anomaly_dfs.append(anomaly_df)
    
    print(category)
    # plot original dataset, ARIMA model forecast, and test dataset points (as scatterplot)
    plt.figure()
    plt.plot(x_axis[:train.shape[0]], train, alpha=0.75)
    plt.plot(x_axis[train.shape[0]:], prediction, alpha=0.75)
    plt.scatter(x_axis[train.shape[0]:], test, alpha=0.4, marker='x')
    plt.fill_between(x_axis[-prediction.shape[0]:], ci[:, 0], ci[:, 1], alpha=0.1, color='b')
    plt.title('Actual test samples vs. forecasts')
    plt.show()

# collate all outliers into a dataframe and output it
df_outliers = pd.concat(anomaly_dfs)
display(df_outliers)