### Adaptive Fault Detection with VAR Models

##### Import libraries needed

In [15]:
# importing packages and libraries
from pandas import read_csv
import pandas as pd
import numpy as np
import pickle
import os
from statsmodels.tsa.api import VAR
import matplotlib.pyplot as plt

# Ignore harmless warnings
import warnings
warnings.filterwarnings("ignore")

# Define the plot size default
from pylab import rcParams
rcParams['figure.figsize'] = (12, 5)

### Defining methods or functions

In [16]:
# Plotting multiple series
def plot_multiple_series(actual, pred, attr):
    """Show one actual-versus-forecast figure per attribute.

    Args:
        actual: Table of observed values; column ``i`` is read with
            ``.iloc[:, i]`` and belongs to ``attr[i]``.
        pred: Table of forecast values, read column-wise in the same way.
        attr: Sequence of attribute names, used for the figure titles.
    """
    for column, name in enumerate(attr):
        plt.title(f"Prediction of {name}")
        plt.xlabel("Timestep")
        plt.ylabel("Values")
        # Draw both curves on the same axes so they can be compared directly.
        for frame, legend_text in ((actual, "actual"), (pred, "forecast")):
            plt.plot(frame.iloc[:, column], label=legend_text)
        plt.legend()
        plt.show()


# Root mean squared error
def root_mse(x, y):
    """Return the root mean squared error between ``x`` and ``y``.

    Args:
        x: Sequence or array of values (e.g. the observations).
        y: Sequence or array of values (e.g. the predictions); must have
            the same length as ``x``.

    Returns:
        The square root of the mean of the element-wise squared
        differences, as computed by the ``.mean()`` of the squared-error
        array.

    Raises:
        ValueError: If ``x`` and ``y`` do not have the same length.
    """
    if len(x) != len(y):
        # Raise rather than return the message: a string result would
        # otherwise flow silently into later numeric code or reports.
        raise ValueError("Error: The two arguments must have the same length")
    squared_error = np.square(np.subtract(x, y))
    return np.sqrt(squared_error.mean())

# Plotting series
def plot_series(series, attr):
    """Show one figure per attribute, plotting its column of ``series``.

    Args:
        series: Table of values; column ``i`` is read with ``.iloc[:, i]``
            and belongs to ``attr[i]``.
        attr: Sequence of attribute names, used for the title and y-label.
    """
    for column, name in enumerate(attr):
        heading = f"Plot of {name!s}"
        values = series.iloc[:, column]
        plt.title(heading)
        plt.xlabel("Timestep")
        plt.ylabel(name)
        plt.plot(values)
        plt.show()

# Normalisation of time series
def normalise_timeseries(data):
    """Standardise each feature to zero mean and unit standard deviation.

    The mean and standard deviation are taken along axis 0, i.e. one
    statistic per column of ``data``.

    Args:
        data: Table of observations with one column per feature
            (for example a ``pd.DataFrame``).

    Returns:
        pd.DataFrame holding ``(data - mean) / std`` for every column.
        A constant column (standard deviation 0) is returned as all
        zeros instead of NaN.

    Note:
        The means and standard deviations are not returned; a caller that
        later wants to undo the scaling has to compute them separately.
    """
    # Calculate the mean and standard deviation for each feature
    means = np.mean(data, axis=0)
    stds = np.std(data, axis=0)

    # A constant feature has zero spread and dividing by it yields NaN/inf.
    # Dividing by 1 instead maps such a feature to 0 (its centred value).
    safe_stds = np.where(stds == 0, 1.0, stds)

    # Normalise each feature using standard deviation
    normalised_data = (data - means) / safe_stds
    return pd.DataFrame(normalised_data)


# Denormalisation of time series
def denormalise_timeseries(data, means, stds):
    """Undo a standard-score scaling.

    Args:
        data: Normalised values.
        means: Mean(s) to add back after rescaling.
        stds: Standard deviation(s) to multiply the values by.

    Returns:
        pd.DataFrame holding ``data * stds + means``.
    """
    rescaled = data * stds
    restored = rescaled + means
    return pd.DataFrame(restored)


# Augmented Dickey-Fuller Test
def adf_test(series, title=''):
    '''
    Hypothesis Test for Stationarity
    Pass in a time series and an optional title; an ADF report is printed.

    series: time series to test; missing values are dropped before testing
    title:  optional text appended to the report heading

    Nothing is returned. The report lists the test statistic, p-value,
    number of lags and number of observations, followed by the critical
    values and a verdict based on a 0.05 significance threshold.
    '''
    # Imported locally: the notebook's import cell does not bring adfuller in,
    # so without this the call below fails with a NameError.
    from statsmodels.tsa.stattools import adfuller

    print(f'Augmented Dickey-Fuller Test: {title}')
    result = adfuller(series.dropna(), autolag='AIC')
    # use help(adfuller) to understand why these labels are chosen
    labels = ['ADF test statistics', 'p-value', '#lags', '#observations']

    outcome = pd.Series(result[0:4], index=labels)

    # result[4] maps each significance level to its critical value
    for key, val in result[4].items():
        outcome[f'critical value ({key})'] = val

    print(outcome.to_string())  # this will not print the line 'dtype:float64'

    if result[1] <= 0.05:
        # Ho is Data is not stationary, check help(adfuller)
        print('Strong evidence against the null hypothesis')
        print('Reject the null hypothesis')
        print('Data is Stationary')
    else:
        print('Weak evidence against the Null hypothesis')
        print('Fail to reject the null hypothesis')
        print('Data has a unit root and is non stationary')


# Loading expert models in a dictionary
def load_expert_models(expert_path):
    """Load every ``.pkl`` file found in ``expert_path`` into a dictionary.

    Args:
        expert_path: Directory that holds the pickled expert models.

    Returns:
        dict mapping each file name (the part before its first ``.``) to
        the object unpickled from that file.
    """
    models = {}

    # sorted() keeps the insertion order independent of the listing order
    # the operating system happens to return.
    for file_name in sorted(os.listdir(expert_path)):
        if not file_name.endswith('.pkl'):
            continue
        # os.listdir yields bare names, so the directory has to be prefixed;
        # opening the bare name only works when the current working
        # directory happens to be expert_path.
        model_path = os.path.join(expert_path, file_name)
        # NOTE: pickle.load can execute arbitrary code - only point this at
        # directories whose contents are trusted.
        with open(model_path, 'rb') as f:
            models[file_name.split('.')[0]] = pickle.load(f)

    return models


In [None]:
# Assigning variables
file = 'test_series_reduced.csv'
expert_path = 'expert_models'

# The first CSV column becomes the index; the remaining columns are the
# attributes of the series.
df_raw = read_csv(file, header=0, index_col=0)
# Take the attribute names from the frame that was just parsed instead of
# reading the CSV a second time only to get its header.
attr = list(df_raw.columns)
# Plot at most the first 40000 rows of every attribute
series = df_raw.iloc[:40000, :]
plot_series(series, attr)
nobs = 3000
steps = 15


In [20]:
# Load every pickled expert model found under expert_path; the resulting
# dictionary is keyed by file name (the part before the first '.')
my_experts = load_expert_models(expert_path)
# Print the mapping to confirm which experts were loaded
print(my_experts)


{'var_ctrl_stuckat0_perm_reduced': <statsmodels.tsa.vector_ar.var_model.VARResultsWrapper object at 0x0000021B87512260>, 'var_ctrl_stuckat1_perm_reduced': <statsmodels.tsa.vector_ar.var_model.VARResultsWrapper object at 0x0000021B87644760>, 'var_ctrl_valueFlip_perm_reduced': <statsmodels.tsa.vector_ar.var_model.VARResultsWrapper object at 0x0000021B876D2E60>, 'var_golden_model_reduced': <statsmodels.tsa.vector_ar.var_model.VARResultsWrapper object at 0x0000021B8CDC0C10>}


In [24]:
# Standardise every column of the raw data
# NOTE(review): the statistics are computed over all rows of df_raw, which
# includes the rows held out below as `test` - confirm this is intended.
normalised_data = normalise_timeseries(df_raw)
# Work on a copy so that normalised_data itself is left untouched
testData = normalised_data.copy()
# Hold out the last nobs rows for testing; every row before them is training data
train = testData.iloc[:-nobs]
test = testData.iloc[-nobs:]
# Last expression of the cell: the notebook displays the size of both splits
len(train), len(test)

(17001, 3000)