# Base Vector Auto-Regressive Model

In [None]:
# This makes sure Pandas keeps it mouth shut
import warnings
warnings.simplefilter(action = 'ignore', category = FutureWarning)
warnings.simplefilter(action = 'ignore', category = RuntimeWarning)

import pandas as pd
import numpy as np

from IPython.display import Markdown

## Data Preprocessing
Before we do anything, we first import our unmodified dataset and introduce some new useful columns.

In [None]:
dep = 'gdp_growth'
dep_name = 'GDP Growth'
gdp_type = 'rgdpna'
country_codes = ['DEU', 'USA']
features = ['emp', 'avh', 'rtfpna', 'rdana']

In [None]:
pwt = pd.read_excel('../../../Data/pwt1001.xlsx',
                    sheet_name = 'Data',
                    parse_dates = ['year'],
                    index_col = 3)

# Create a new column for GDP Growth
pwt[f'{gdp_type}_lag'] = pwt.groupby(['countrycode'])[gdp_type].shift(1)
pwt['gdp_growth'] = pwt.apply(lambda row : ((row[gdp_type] - row[f'{gdp_type}_lag']) / row[f'{gdp_type}_lag']) * 100, axis = 1)

Now, we:
- Filter out the countries we will make the predictions for
- Select the columns we use in the calculation
- Drop N/A values

In [None]:
def extract_countries(data: pd.DataFrame, countries: list[str]) -> bool:
    result = {}

    # Filter out the relevant countries
    data = data[data['countrycode'].isin(countries)]

    # Filter out the variables we need and drop N/A values
    data = data[['country', 'countrycode', dep] + features]
    data = data.dropna()

    for countrycode, name in zip(countries, data['country'].unique()):
        result[name] = {}
        result[name]['raw'] = data[data['countrycode'] == countrycode].drop(['country', 'countrycode'], axis = 1)

    return result

countries = extract_countries(pwt, country_codes)

## Testing Multicollinearity

In [None]:
from statsmodels.stats.outliers_influence import variance_inflation_factor 

#print([variance_inflation_factor(df.values, i) for i in range(df.values.shape[1])])

## Testing Causation
Granger's causality test is used to determine whether one time series influence another, i.e. it tests if there is causation or not.


In [None]:
from statsmodels.tsa.stattools import grangercausalitytests

def causation_test(data: pd.DataFrame, verbose = False) -> bool:
    maxlag = round(12 * pow(len(df) / 100, 1/4))
    alpha = 0.05
    results = pd.DataFrame([], columns = data.columns, index = data.columns)
    causation = True

    # TODO: find out whether we need to have causation between all variables,
    #       or only for the variable we want, the rgdpna
    for col in results.columns:
        for row in results.index:
            if row == col:
                results.loc[row, col] = ''
                continue

            result = grangercausalitytests(
                data[[row, col]],
                maxlag   = maxlag,
                addconst = True,
                verbose  = False
            )

            p_values = [round(result[i + 1][0]['ssr_chi2test'][1], 3) for i in range(maxlag)]
            p_value = np.min(p_values)

            results.loc[row, col] = p_value

            if p_value >= alpha:
                causation = False

    if verbose:
        display(results)

    return causation

for country, df in countries.items():
    display(Markdown(f'### {country}'))

    if causation_test(df['raw'], True):
        print('Causation is present between all variables')
    else:
        print('Causation is not present for all variables')

## Testing Cointegration
Cointegration means there is a long-term tendency for two or more time series to move together.

There are multiple ways to test for cointegration, like Engle-Granger and Johansen. While the Engle-Granger test is more simple, the Johansen test allows for multiple cointegrated relationships to be tested. Johansen is therefore the one that is used here.

In [None]:
from statsmodels.tsa.vector_ar.vecm import coint_johansen

def cointegration_test(data: pd.DataFrame, verbose = False) -> bool:
    results = pd.DataFrame()
    cointegration = True

    result = coint_johansen(
        data,
        det_order = -1, # No deterministic terms
        k_ar_diff = 5 # TODO not entirely sure what value we should use here
    )

    for name, trace, cv95 in zip(data.columns, result.trace_stat, result.cvt[:,1]):
        results[name] = {
            'Trace':                round(trace, 3),
            'Critical Value (95%)': round(cv95, 3),
            'Significant':          'Yes' if trace > cv95 else 'No'
        }

        if trace <= cv95:
            cointegration = False
    
    if verbose:
        display(results.transpose())

    return cointegration

for country, df in countries.items():
    display(Markdown(f'### {country}'))

    if cointegration_test(df['raw'], True):
        print('A significant relationship is present between all variables')
    else:
        print('A significant relationship is not present for all variables')

## Split
Now we split the data into a test, and training sample.

In [None]:
for country, df in countries.items():
    train_length = int(len(df['raw']) * 0.75)

    countries[country]['train_length'] = train_length
    countries[country]['train'] = df['raw'][:train_length]
    countries[country]['test'] = df['raw'][train_length:]

## Stationarity
Now a _Augmented Dickey-Fuller (ADF)_ test is performed to check if the data is stationary.

When time series data is stationary, its properties such as variance and mean, are constant over time. For VAR to forecast meaningful results, it is required that the data that is put in is stationary. 

There are also other methods of determining whether a series is stationary or not, but ADF seems to be the most popular one.

A significance level (alpha) of __5%__ is used to determine whether or not a series is stationary or not.

If any series is not stationary, all series are differenced again until they all are stationary.

In [None]:
from statsmodels.tsa.stattools import adfuller

def stationarity_test(data: pd.DataFrame, verbose = False) -> bool:
    maxlag = round(12 * pow(len(df) / 100, 1/4))
    alpha = 0.05
    results = pd.DataFrame()
    stationary = True

    for name, series in data.items():
        result = adfuller(
            series,
            maxlag     = maxlag,
            regression = 'c', # Constant regression
            autolag    = 'AIC', 
            store      = False,
            regresults = False
        )

        results[name] = {
            'P-Value':     round(result[1], 3),
            'Number lags': result[2],
            'Stationary':  'Yes' if result[1] <= alpha else 'No'
        }

        if result[1] > alpha:
            stationary = False

    if verbose:
        display(results.transpose())

    return stationary

for country, df in countries.items():
    display(Markdown(f'### {country}'))

    countries[country]['diff'] = df['train'].copy()
    countries[country]['diff_passes'] = 0

    while stationarity_test(df['diff'], True) == False:
        #if diff_pass >= 1:
        #    #raise Exception('Whoops', 'Unable to make all series stationary')
        #    break
        countries[country]['diff_passes'] += 1

        print(f'One or more series are non-stationary, differencing... (pass = {df["diff_passes"]})')

        countries[country]['diff'] = df['diff'].diff().dropna()

        if df['diff_passes'] > 2:
            raise Exception('Whoops', 'Unable to make all series stationary')
            break
    else:
        print(f'All series are stationary')

## Fitting and Lag Order Selection

The lag order refers to how far the VAR model looks back in the time series.

TODO pas aan
Determining the optimal lag order will greatly influence the results of the prediction. Besides manually selecting the lag order, there are several statistical methods that select the lag order, such as AIC, BIC, FPE and HQIC.

Here, the lag order is automatically chosen based on the HQIC method. QHIC is a combination of both AIC and BIC.
AIC is the most simple method, but is prone to overfitting.
BIC is more conservative, and is less likely to overfit.

In [None]:
from statsmodels.tsa.api import VAR
from VARParameterSelection import VARParameterSelection

for country, df in countries.items():
    display(Markdown(f'### {country}'))

    model = VAR(df['diff'], freq = df['diff'].index.inferred_freq)

    params = VARParameterSelection.var_parameter_search(
        model         = model,
        target_column = dep,
        df_train      = df['train'],
        df_diff       = df['diff'],
        df_test       = df['test'],
    )

    #countries[country]['maxlag'] = model.select_order().selected_orders['aic']
    countries[country]['maxlag'] = params[0].lag
    trend = params[0].trend

    print(f'Best found maxlag = {df['maxlag']}')
    print(f'Best found trend = {trend}')

    countries[country]['results'] = model.fit(
        maxlags = df['maxlag'],
        method  = 'ols',
        ic      = None,
        trend   = trend
    )

## Test Normality

TODO

In [None]:
for country, df in countries.items():
    display(Markdown(f'### {country}'))
    print(df['results'].test_normality())

## Check for Serial Correlation of Residuals

TODO explain

In [None]:
from statsmodels.stats.stattools import durbin_watson

for country, df in countries.items():
    display(Markdown(f'### {country}'))

    out = durbin_watson(df['results'].resid)

    for col, val in zip(df['raw'].columns, out):
        print(str(col).ljust(10), ':', round(val, 2))

## Forecasting
Now the model will make a prediction for the period of the test data based on the training data.

In [None]:
def diff_inv(forecast_diff, original, passes):
    df_temp = forecast_diff.copy()

    for i in range(passes, 0, -1):
        df_orig = original.iloc[-1]

        for j in range(2, i + 1):
            df_orig = df_orig - original.iloc[-j]

        df_temp = df_orig + df_temp.cumsum()

    return df_temp

df_forecast = {}
df_lower_error = {}
df_upper_error = {}

for country, df in countries.items():
    horizon = len(df['test'])
    alpha = 0.05

    # Do the forecast
    mid, lower, upper = df['results'].forecast_interval(df['diff'].values[-df['maxlag']:], steps = horizon, alpha = alpha)

    # Put it into a dataframe
    countries[country]['forecast'] = pd.DataFrame(mid, columns = df['diff'].columns, index = df['test'].iloc[:horizon].index)
    countries[country]['lower_error'] = pd.DataFrame(lower, columns = df['diff'].columns, index = df['test'].iloc[:horizon].index)
    countries[country]['upper_error'] = pd.DataFrame(upper, columns = df['diff'].columns, index = df['test'].iloc[:horizon].index)

    # Reverse the differencing
    if df['diff_passes'] > 0:
        original = df['train']
        passes = df['diff_passes']

        countries[country]['forecast'] = diff_inv(df['forecast'], original, passes)
        countries[country]['lower_error'] = diff_inv(df['lower_error'], original, passes)
        countries[country]['upper_error'] = diff_inv(df['upper_error'], original, passes)

## Forecast Plot

In [None]:
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
#from sklearn.model_selection import StandardScaler

for country, df in countries.items():
    rmse = mean_squared_error(df['forecast'][dep], df['test'][dep].values[:horizon], squared = False)

    fig, ax = plt.subplots(1, 1)

    ax.plot(df['train'][-train_length:][dep], color='black', label='Train')
    ax.plot(df['test'][:horizon][dep], color='tab:blue', label = 'Test')
    ax.plot(df['forecast'][dep], color = 'tab:orange', label = 'Forecast')

    ylims = ax.get_ylim()
    ax.fill_between(
        df['forecast'][dep].index,
        df['lower_error'][dep].values,
        df['upper_error'][dep].values,
        color = 'bisque',
        label = f'{int(100 - (alpha * 100))}% CI'
    )

    ax.set_ylim(ylims)

    ax.xaxis.set_ticks_position('none')
    ax.yaxis.set_ticks_position('none')
    ax.spines['top'].set_alpha(0)
    ax.tick_params(labelsize = 6)
    ax.set_title(f'{country} {dep_name}')
    ax.set_title(f'RMSE: {round(rmse, 3)}', loc = 'left', x = 0.04, y = 0.9, fontsize = 'small', color = 'white', backgroundcolor = 'gray')
    ax.legend(loc = 'lower right')
    ax.spines[['right', 'top']].set_visible(False)