In [None]:
from statsmodels.tsa.seasonal import seasonal_decompose, STL
from statsmodels.tsa.stattools import adfuller, kpss
import hvplot.pandas
import hvplot as hv
import scipy.stats as stats
from statsmodels.stats.diagnostic import kstest_normal
from statsmodels.tsa.filters.hp_filter import hpfilter
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import statsmodels.api as sm
from scipy.stats import shapiro, kstest, normaltest
from statsmodels.tsa.api import AutoReg 
# testing for stability of the variance against the model's residuals.
from statsmodels.stats.api import (het_breuschpagan, het_goldfeldquandt, het_white)
from scipy.stats import boxcox
from statsmodels.stats.diagnostic import acorr_ljungbox
# from statsmodels.tsa.api import (kpss, adfuller, seasonal_decompose, STL)
from statsmodels.tools.eval_measures import rmspe, rmse
from sklearn.metrics import mean_absolute_percentage_error as mape
from itertools import product
from pathlib import Path
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.api import ExponentialSmoothing
from statsmodels.tsa.arima.model import ARIMA
import pmdarima as pm
from sklearn.model_selection import train_test_split 

In [2]:
# plot_outliers function
def plot_outliers(outliers, data, method = 'KNN', 
                 halignment = 'right', 
                 valignment = 'bottom', 
                 labels = False):
    ax = data.plot(alpha = 1.0)
    
    if labels:
        for i in outliers['value'].item():
            plt.plot(i[0], i[1], 'rx')
            plt.text(i[0], i[1], f'{i[0].date()}', 
                    horizontalalignment=halignment, 
                    verticalalignment=valignment)
    else:
        data.loc[outliers.index].plot(ax=ax, style='rx')

    plt.title(f'Name - {method}')
    plt.xlabel('date'); plt.ylabel('# of passengers')
    plt.legend(['outliers'])
    plt.show()
    
# As we proceed with the outlier detection recipes, the goal is to see how the different 
# techniques capture outliers and compare them to the ground truth labels

In [None]:
# outlier identifier function
def iqr_outliers(data):
    q1, q3 = np.percentile(data, [25, 75])
    IQR = q3 - q1
    lower_fence = q1 - (1.5 * IQR)
    upper_fence = q3 + (1.5 * IQR)
    return data[(data.value > upper_fence) | (data.value < lower_fence)]

outliers = iqr_outliers(dt)
outliers

In [None]:
# z score function
def zscore(df, column_name, degree=3):
    data = df.copy()
    data['zscore'] = (data[column_name] - data[column_name].mean()) / data[column_name].std()
    outliers = data[(data['zscore'] <= -degree) | (data['zscore'] >= degree)]
    return outliers[column_name], data


threshold = 2.5
outliers, transformed = zscore(dt, 'AAPL', threshold)

transformed[['AAPL', 'zscore']].hist(); 

In [None]:
def plot_zscore(data, d=3):
    n = len(data)
    plt.figure(figsize=(10, 6))
    plt.plot(data, 'k^')
    plt.plot([0, n], [d, d], 'r--')
    plt.plot([0, n], [-d, -d], 'r--')

data = transformed['zscore'].values
plot_zscore(data, d=2.5)

In [None]:
from statsmodels.stats.diagnostic import kstest_normal
def test_normal(df):
    t_test, p_value = kstest_normal(df)
    if p_value < 0.05:
        print('Reject null hypothesis. Data is not normally distributed')
    else:
        print('Fail to reject null hypothesis. Data is normally distributed')

test_normal(dt.AAPL)

In [None]:
from statsmodels.tsa.stattools import adfuller, kpss
def print_results(output, test='adf'):
    pval = output[1]
    test_score = output[0]
    lags = output[2]
    decision = 'Non-Stationary'
    if test == 'adf':
        critical = output[4]
        if pval < 0.05:
            decision = 'Stationary'
    elif test == 'kpss':
        critical = output[3]
        if pval >= 0.05:
            decision = 'Stationary'
    output_dict = {
        'Test Statistic': test_score, 
        'p-value': pval,
        'Numbers of lags': lags, 
        'decision': decision
    }

    for key, value in critical.items():
        output_dict['Critical Value (%s)' % key] = value

    return pd.Series(output_dict, name=test)


adf_output = adfuller(dt.AAPL)
kpss_output = kpss(dt.AAPL)

pd.concat([print_results(adf_output, 'adf'), print_results(kpss_output, 'kpss')], axis=1)

In [None]:
dt_STL_decomposed = STL(
    dt.AAPL, 
    seasonal = 13,
    robust = True # remove the impact of outliers
).fit()
dt_STL_decomposed.plot(); plt.show()

In [None]:
dt_cyclic, dt_trend = hpfilter(dt.AAPL)
# plt.rcParams["figure.figsize"] = (9,6)
fig, ax = plt.subplots(2, 1)
dt_cyclic.plot(ax=ax[0], title='Cyclic Component')
dt_trend.plot(ax=ax[1], title='Trend Component')
ax[0].title.set_size(10)
ax[1].title.set_size(10)

In [None]:
def is_normal(test, p_level = 0.05):
    stat, pval = test
    return 'Normal' if pval > 0.05 else 'Not Normal'

print(is_normal(shapiro(dt.AAPL)))

In [None]:
def het_test(model, test=het_breuschpagan):
    lm, lm_pvalue, fvalue, f_pvalue = (
        het_breuschpagan(model.resid, sm.add_constant(model.fittedvalues))
    )
    return 'Heteroskedastic' if f_pvalue < 0.05 else 'Homoskedastic'

het_test(model, test=het_breuschpagan)

In [None]:
xt, lmbda = boxcox(dt.AAPL)
xts = pd.Series(xt, index=dt.index)

fig, ax = plt.subplots(1, 2)
dt.AAPL.hist(ax=ax[0]) # original time series
xts.hist(ax=ax[1]) # Box-Cox Transformed
plt.show()

fig, ax = plt.subplots(2, 1)
dt.AAPL.plot(ax=ax[0]) # Original Time Series
xts.plot(ax=ax[1]) # Box-Cox Transformed
plt.show()

In [None]:
acorr_ljungbox(dt_diff.AAPL, lags = 10, return_df = True) # to check for autocorrelations

# apply the Ljung-Box test against residual from model_bx which was created using
# Power Transformations
acorr_ljungbox(model_bx.resid, return_df = True, lags = 10)

(acorr_ljungbox(results.resid, lags = 25, return_df=True) < 0.05)['lb_pvalue'].sum()

In [None]:
def get_best_model(score, c='AIC'):
    initial_score = score[0][c]
    best_model = 0
    for k, v, in score.items():
        if v[c] < initial_score:
            initial_score = v[c]
            best_model = k
    print(f'Best model: {best_model} with lowes {c} score: {initial_score}')
    return score[best_model]['model']

best_model = get_best_model(score, 'AIC')

In [None]:
def plot_forecast(model, start, train, test):
    forecast = pd.DataFrame(model.forecast(test.shape[0]), 
                           index = test.index)
    ax = train.loc[start:].plot(style='--')
    test.plot(ax=ax)
    forecast.plot(ax=ax, style='-.')
    ax.legend(['origin_train', 'origin_test', 'forecast'])
    plt.show()

In [None]:
def combinator(items):
    combo = [i for i in product(*items)]
    return combo

In [None]:
train2 = train.AAPL.values.ravel()
y = test.AAPL.values.ravel()
score={}
for i, (t, dp) in enumerate(dt_comb):
    exp = ExponentialSmoothing(train2, trend=t, damped_trend=dp, seasonal=None)
    model = exp.fit(use_brute=True, optimized=True)
    y_hat = model.forecast(len(y))
    score[i] = {'trend':t,
               'damped':dp, 
               'AIC':model.aic,
               'BIC': model.bic,
               'AICc': model.aicc,
               'RMSPE': rmspe(y, y_hat), 
               'RMSE': rmse(y, y_hat), 
               'MAPE': mape(y, y_hat), 
               'model': model}

In [None]:
train2 = train.AAPL.values.ravel()
y = test.AAPL.values.ravel()
score={}
for i, (t, dp) in enumerate(dt_comb):
    exp = ExponentialSmoothing(train2, trend=t, damped_trend=dp, seasonal=None)
    model = exp.fit(use_brute=True, optimized=True)
    y_hat = model.forecast(len(y))
    score[i] = {'trend':t,
               'damped':dp, 
               'AIC':model.aic,
               'BIC': model.bic,
               'AICc': model.aicc,
               'RMSPE': rmspe(y, y_hat), 
               'RMSE': rmse(y, y_hat), 
               'MAPE': mape(y, y_hat), 
               'model': model}

In [None]:
pv, dv, qv = [list(range(3))]*3
vals = combinator([pv, dv, qv])
score = {}
for i, (p, d, q) in enumerate(vals):
    m = ARIMA(train.AAPL, order=(p,d,q))
    res = m.fit()
    y = train.AAPL.values.ravel() 
    y_hat = res.forecast(steps=len(y))
    score[i] = {'order':(p,d,q),
                'AIC':res.aic, 
                'RMSPE':rmspe(y, y_hat),
                'BIC':res.bic,
                'AICc':res.aicc,
                'RMSE':rmse(y, y_hat),
                'MAPE':mape(y, y_hat),
                'model':res}
best_m = get_best_model(score, 'AIC')

In [None]:
pd.DataFrame(score).T.sort_values(by='AIC').reset_index().head()

In [None]:
train, test = pm.model_selection.train_test_split(dt, test_size=0.10)
print(f'Train: {train.shape}')
print(f'Test: {test.shape}')

In [None]:
auto_model = pm.auto_arima(train.AAPL, 
                          seasonal=True,
                          m=12, 
                          test='adf', 
                          stepwise=True, 
                          information_criterion='bic',
                          trace=True) # to observe the score at each iteration

auto_model

auto_model.summary()