## Applying  Vector auto-regressive multivariate

<u>**Vector autoregression (VAR)**</u> is a statistical model for multivariate time series analysis, especially in a time series where the variables have a relationship that affects each other to time. VAR models are different from univariate autoregressive models because they allow analysis and make predictions on multivariate time series data. VAR models are often used in economics and weather forecasting.
In autoregression,  the time series is modeled as a linear combination of it’s own lags. That is, the past values of the series are used to forecast the current and future.

Vector AutoRegression (VAR)
In this notebook we fit a VAR model to revenue data collected in EDR between 2016 and 2023 for BRs Revenue. Specifically, we will

- Analyze the time series, clean them and remove possible outliers.
- check for stationary and differencing the variables that have unit root.
- fit a model and chose it using a multi-variate version of AIC.
- check the residuals (correlations, autocorrelations, normality).
- check causality to better understand the model (Granger-causality, impulse response functions).
- use the model to forcast revenue for next periods.

In [0]:
! pip install statsmodels -q

In [0]:
import datetime
from matplotlib import rcParams
from cycler import cycler
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels
from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.tsa.seasonal import seasonal_decompose
import seaborn as sns
from statsmodels.tsa.stattools import adfuller
from pandas.tseries.offsets import DateOffset
from statsmodels.tsa.vector_ar.var_model import VAR

## 1. Uploading Data, Index the Dataframe with TimeDate Column

In [0]:
def upload_file(path):
    time_column_name = "Timestamp"
    df = pd.read_csv(path, delimiter=";")
    df['Timestamp'] = df['SNAPSHOT_PERIOD_END'].apply(lambda x: datetime.datetime.strptime(x, "%d%b%Y:%H:%M:%S").date())
    df.sort_values(by=['Timestamp'],inplace=True)
    df.set_index(df.Timestamp, inplace=True)
    df= df.drop(['FY','SNAPSHOT_PERIOD_END','POST_PERIOD', 'Timestamp'], axis=1)
    df= df.rename(columns={"SUM_of_BR_BILLED_AMT": "Revenue"})
    df['Revenue']=df['Revenue']/1000000
 
    return (df)

df= upload_file("./dataset2.csv")
df.head(20)

## 2.  Visualize the Time Series

In [0]:
sns.set(style="whitegrid")
fig, axes = plt.subplots(1, 3, figsize=(16,6),
                      sharex=True,
                      )
sns.lineplot (ax=axes[0],data=df, x='Timestamp', y='Revenue' ,color='orange',linewidth=2)
sns.lineplot (ax=axes[1],data=df, x='Timestamp',y='COUNT_BR_NMBR', linewidth=2)
sns.lineplot (ax=axes[2],data=df, x='Timestamp',y='COUNT_GC_ORG',color='black', linewidth=2)

## 3. ADF Test for Stationarity

Since the VAR model requires the time series you want to forecast to be stationary, it is customary to check all the time series in the system for stationarity.

The ADF test is one of the most popular statistical tests. It can be used to help us understand whether the time series is stationary or not.

Null hypothesis: If failed to be rejected, it suggests the time series is not stationarity.

Alternative hypothesis: The null hypothesis is rejected, it suggests the time series is stationary.

A stationary time series is one whose characteristics like mean and variance does not change over time. If you have clear trend and seasonality in your time series, then model these components, remove them from observations by difference, then train models on the residuals.

#### a) Stationarity

In [0]:
# Plot the acf and the Dickley-Fuller test for the columns of the passed dataframe
def stationary_test(df,columns,treshold = 0.05):

    for quant in columns:
        
        # Remove null values (useful when plotting diff series)
        time_series = df[quant][df[quant].notnull()]

        # Autocorrelation function
        plot_acf(time_series,title='Autocorrelation function - {}'.format(quant))
        plt.xlabel('lags')
        plt.show()

        # Dickely-Fuller test statistics
        result = adfuller(time_series, autolag= 'AIC')
        pvalue = result[1]
        print(result)
        print(f'ADF Statistic: {result[0]}')
        print(f'n_lags: {result[1]}')
        print(f'p-value: {result[1]}')
        for key, value in result[4].items():
            print('Critial Values:')
            print(f'   {key}, {value}')   

        if pvalue < treshold:
            print('Time series "{}" is stationay (NULL HP rejected - pvalue = {:.4f})'.format(quant,pvalue))
        else:
            print('Time series "{}" might have unit root (NULL HP cannot be rejected - pvalue = {:.4f})'.format(quant,pvalue))
stationary_test(df, df.columns)

####  Test again the stationarity  with ADF

In [0]:
df_log = np.log(df['Revenue']) 
temp_df = df[['COUNT_BR_NMBR','COUNT_GC_ORG']]
df_differenced = temp_df.diff().dropna()
transformed_df = pd.concat([df_log, df_differenced], axis = 1)
transformed_df= transformed_df.dropna()

In [0]:
sns.set(style="whitegrid")
fig, axes = plt.subplots(1, 3, figsize=(16,6),
                      sharex=True,
                      )
sns.lineplot (ax=axes[0],data=transformed_df, x='Timestamp', y='Revenue' ,color='orange',linewidth=2)
sns.lineplot (ax=axes[1],data=transformed_df, x='Timestamp',y='COUNT_BR_NMBR', linewidth=2)
sns.lineplot (ax=axes[2],data=transformed_df, x='Timestamp',y='COUNT_GC_ORG',color='black', linewidth=2)

In [0]:
transformed_df.head()

In [0]:
diff_columns = ['Revenue','COUNT_BR_NMBR','COUNT_GC_ORG']
stationary_test(transformed_df,diff_columns )

### b) Testing Causation using Granger’s Causality Test
The Granger causality test is a statistical hypothesis test for determining whether one time series is a factor and offer useful information in forecasting another time series

In [0]:
from statsmodels.tsa.stattools import grangercausalitytests
maxlag=12
test = 'ssr_chi2test'
def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False):    
    """Check Granger Causality of all possible combinations of the Time series.
    The rows are the response variable, columns are predictors. The values in the table 
    are the P-Values. P-Values lesser than the significance level (0.05), implies 
    the Null Hypothesis that the coefficients of the corresponding past values is 
    zero, that is, the X does not cause Y can be rejected.

    data      : pandas dataframe containing the time series variables
    variables : list containing names of the time series variables.
    """
    df = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables)
    for c in df.columns:
        for r in df.index:
            test_result = grangercausalitytests(data[[r, c]], maxlag=maxlag, verbose=False)
            p_values = [round(test_result[i+1][0][test][1],4) for i in range(maxlag)]
            if verbose: print(f'Y = {r}, X = {c}, P Values = {p_values}')
            min_p_value = np.min(p_values)
            df.loc[r, c] = min_p_value
    df.columns = [var + '_x' for var in variables]
    df.index = [var + '_y' for var in variables]
    return df

grangers_causation_matrix(transformed_df, variables = transformed_df.columns)  

The row are the Response (Y) and the columns are the predictor series (X). If a given p-value is < significance level (0.05), for example, when we take the value 0.0089 in (row 1, column 2), we can reject the null hypothesis and conclude that COUNT_BR_NMBR_x Granger causing Revenue_y. Likewise, the 0.0 in (row 2, column 1) refers to COUNT_BR_NMBR_y Granger causing Revenue_x.

All the time series in the above data are interchangeably Granger causing each other.

### c) Cointegration Test

In [0]:
from statsmodels.tsa.vector_ar.vecm import coint_johansen

def cointegration_test(df, alpha=0.05): 
    """Perform Johanson's Cointegration Test and Report Summary"""
    out = coint_johansen(df,-1,5)
    d = {'0.90':0, '0.95':1, '0.99':2}
    traces = out.lr1
    cvts = out.cvt[:, d[str(1-alpha)]]
    def adjust(val, length= 6): return str(val).ljust(length)

    # Summary
    print('Name   ::  Test Stat > C(95%)    =>   Signif  \n', '--'*20)
    for col, trace, cvt in zip(df.columns, traces, cvts):
        print(adjust(col), ':: ', adjust(round(trace,2), 9), ">", adjust(cvt, 8), ' =>  ' , trace > cvt)

cointegration_test(transformed_df)

### 4.  Split the Series into Training and Testing Data

### Note
- Try using the df_differenced instead of df

In [0]:
df

In [0]:
transformed_df

In [0]:
# Check size
nobs = 12
transformed_df.drop(['COUNT_BR_NMBR'], axis =1, inplace = True)
df_train_diff, df_test_diff = transformed_df[0:-nobs], transformed_df[-nobs:]
#df_train_diff= df_train.diff().dropna()


In [0]:
df_test_diff

In [0]:
model = VAR(df_train_diff)
x = model.select_order()
x.summary()

In [0]:
model_fitted = model.fit(11)
model_fitted.summary()

The biggest correlation is 0.66 (Revenue & Br Numbers)
Correlation matrix of residuals

## Check for Serial Correlation of Residuals (Errors) using Durbin Watson Statistic
Serial correlation of residuals is used to check if there is any leftover pattern in the residuals (errors).The value of this statistic can vary between 0 and 4. The closer it is to the value 2, then there is no significant serial correlation. The closer to 0, there is a positive serial correlation, and the closer it is to 4 implies negative serial correlation.

In [0]:
from statsmodels.stats.stattools import durbin_watson
out = durbin_watson(model_fitted.resid)

for col, val in zip(df.columns, out):
    print(col, ':', round(val, 2))

A value close to 2.0 means that there is no autocorrelation detected in the residuals.

## Forecast VAR model using statsmodels

In [0]:
# Forecast
# Get the lag order
lag_order = model_fitted.k_ar
print(lag_order)  #> 4
df_train_diff

In [0]:
df_train_diff['Revenue'].iloc[-1]

In [0]:
def invert_transformation(df, fc):
    cols = ['COUNT_GC_ORG']
    x = []
    for col in cols:
        diff_results = df[col].iloc[-1] + fc[col]
        x.append(diff_results)
    diff_df_inverted = pd.concat(x, axis=1)
    return diff_df_inverted


In [0]:
#def invert_transformation(df_train, df_forecast):
#    forecast = df_forecast.copy()
#    columns = df_train.columns
#    for col in columns:
#        forecast[str(col)+'_pred'] = df_train[col].iloc[-1] + forecast[str(col)+'_pred'].cumsum()
#    return forecast


In [0]:
# Input data for forecasting
df_input = df_train_diff.values[-lag_order:] # making prediction using last 13 previous value to predict 12 future values in our case
nobs=12
df_forecast  = model_fitted.forecast(y=df_input, steps=nobs)
df_forecast

In [0]:
df_forecast = (pd.DataFrame(df_forecast , index=df_test_diff.index, columns=df_test_diff.columns))
df_forecast.head(12)

In [0]:
df.head(12)

In [0]:
y = df_forecast["Revenue"]
df_forecast["Revenue"] = np.exp(y)
result = invert_transformation(df_train_diff, df_forecast)

In [0]:
df_forecast

In [0]:
# df_results = invert_transformation(df_train_diff, df_forecast)
# df_results.head(12)

In [0]:
fig, axes = plt.subplots(nrows=int(len(df_train_diff.columns)/2), ncols=3, dpi=150, figsize=(10,5))
for i, (col,ax) in enumerate(zip(df_train.columns, axes.flatten())):
    df_results[col+'_pred'].plot(legend=True, ax=ax).autoscale(axis='x',tight=True)
    df_test[col][-nobs:].plot(legend=True, ax=ax);
    ax.set_title(col + ": Forecast vs Actuals")
    ax.xaxis.set_ticks_position('none')
    ax.yaxis.set_ticks_position('none')
    ax.spines["top"].set_alpha(0)
    ax.tick_params(labelsize=6)

plt.tight_layout();