# Classical techniques for forecasting 

We try to find the best-fit combination of the parameters by Auto-ARIMA and we use those parameters in the VARMA model for predicting the forecast values. 

In [1]:
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.arima_process import ArmaProcess
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.statespace.varmax import VARMAX, VARMAXResults
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import sklearn.metrics
from sklearn.preprocessing import MinMaxScaler
from pmdarima import auto_arima
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import warnings
warnings.filterwarnings("ignore")
import random
import os

In [2]:
BASE_DIR = 'data_feed/'
# Set the hyperparameters
HIDDEN_SIZE = 32
NUM_EPOCHS = 100
LAG = 10
N_STOCK = 20
BATCH_SIZE = 32

In [3]:
# check if data_feed directory exist if not 
assert os.path.exists(BASE_DIR)

In [4]:
# list all the files in the data_feed directory
files = os.listdir('data_feed/')

## Data preprocessing

In [5]:
def rename_columns(df: pd.DataFrame, file_name: str) -> None:
    """
    modify columns names (exept date) into the following structure: 
        *column_name*_*stock_name* 
        
    Example: Open_AAPL
    """
    df.rename(columns=dict(zip(df.columns[1:], df.columns[1:] + '_' + file_name[:-4])), inplace=True)

In [6]:
# Read First File into a pandas dataframe
df = pd.read_csv(BASE_DIR+files[0])
rename_columns(df, files[0])

# Merge Everything into a unique DataFrame
for fn in files[1:]:
    df2 = pd.read_csv(BASE_DIR+fn)
    rename_columns(df2, fn)
    result = df.merge(df2, on='Date')
    df = result

df.set_index('Date',inplace=True)
df.index = pd.to_datetime(df.index)

In [7]:
cols = [col for col in df.columns if "Adj Close" in col]
df = df[cols]

df_copy = df.copy(deep=True)
for col in df.columns:
    df_copy[f'Ret_{col}'] = df_copy[col] / df_copy[col].shift(1)

In [8]:
split_date = df.index[0] + pd.offsets.DateOffset(years=8)
train_data = df[df.index <= split_date]
test_data = df[df.index > split_date]
#test_data.to_csv('TEST.csv')
#train_data.to_csv('TRAIN.csv')

In [9]:
device = torch.device("cpu")
if torch.cuda.is_available():
    device = torch.device("cuda")

In [10]:
scaler = MinMaxScaler()
scaled_train = scaler.fit_transform(train_data.values)
scaled_test = scaler.transform(test_data.values)

In [11]:
class TimeSeriesdataset(Dataset):
    def __init__(self, lag: int, data: np.ndarray, device: torch.device):
        self.lag = lag
        self.data = data
        self.device = device

    def __len__(self):
        lenght = len(self.data)
        return lenght - (self.lag + 1)

    def __getitem__(self, idx):
        X = self.data[idx:idx+self.lag, :].flatten()
        Y = self.data[idx+self.lag, :]
        return torch.Tensor(X, device=self.device), torch.Tensor(Y, device=self.device)

In [12]:
train_dataset = TimeSeriesdataset(lag=LAG, data=scaled_train, device=device)
test_dataset = TimeSeriesdataset(lag=LAG, data=scaled_test, device=device)

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Application of Auto_ARIMA and VARMA

`auto_arima` is a function used in time series analysis to automatically determine the optimal parameters for an ARIMA (AutoRegressive Integrated MovingAverage) model. ARIMA models are commonly used to forecast future values based on past observations.
The `auto_arima` function uses an iterative approach to search for the best combination of parameters (p, d, q) for the ARIMA model.
- `p` represents the order of the AutoRegressive (AR) component, which captures the linear relationship between an observation and some number of lagged observations.
- `d` represents the order of differencing required to make the time series stationary. Differencing removes trends and seasonality from the data.
- `q` represents the order of the Moving Average (MA) component, which models the dependency between an observation and a residual error from a moving average of lagged observations.

By automatically determining the optimal parameters, `auto_arima` simplifies the process of fitting an ARIMA model to time series data and helps in generating more accurate forecasts.

`VARMA` is an acronym for Vector Autoregressive Moving Average. It is a statistical model used for analyzing and forecasting multivariate time series data. The advantage is that VARMA models can handle multiple time series variables simultaneously.
VARMA models combine autoregressive (AR) and moving average (MA) components to capture the linear relationships and dependencies among the variables. The "vector" in VARMA refers to the fact that each variable is modeled as a linear combination of lagged values of all variables in the system.

VARMA models require specifying two sets of parameters:
- `p` and `q` represent the orders of the autoregressive and moving average components, respectively, for each variable in the system.
- Additionally, VARMA models also involve specifying the lag order for the error terms of each equation.
VARMA models are useful for analyzing the dynamic relationships between multiple time series variables and can provide forecasts for each variable in the system based on historical data.

In our project, we have used `auto_arima` to automatically select the optimal parameters for an ARIMA model, and then used VARMA with the same parameters to analyze and perform forecasting on multiple time series variables simultaneously.

In [13]:
def timeseries_evaluation_metrics_func(y_true, y_pred):
    
    def mean_absolute_percentage_error(y_true, y_pred): 
        y_true, y_pred = np.array(y_true), np.array(y_pred)
        return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    print('Evaluation metric results:-')
    print(f'MSE is : {metrics.mean_squared_error(y_true, y_pred)}')
    print(f'MAE is : {metrics.mean_absolute_error(y_true, y_pred)}')
    print(f'RMSE is : {np.sqrt(metrics.mean_squared_error(y_true, y_pred))}')
    print(f'MAPE is : {mean_absolute_percentage_error(y_true, y_pred)}')
    print(f'R2 is : {metrics.r2_score(y_true, y_pred)}',end='\n\n')

In [14]:
def Augmented_Dickey_Fuller_Test_func(series , column_name):
    print (f'Results of Dickey-Fuller Test for column: {column_name}')
    dftest = adfuller(series, autolag='AIC')
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic','p-value','No Lags Used','Number of Observations Used'])
    for key,value in dftest[4].items():
       dfoutput['Critical Value (%s)'%key] = value
    print (dfoutput)
    if dftest[1] <= 0.05:
        print("Conclusion:====> Reject the null hypothesis: Data is stationary")
    else:
        print("Conclusion:====> Fail to reject the null hypothesis: Data is non-stationary")

In [16]:
df_stat = pd.DataFrame(df)

for column in df.columns:
    Augmented_Dickey_Fuller_Test_func(df[column].values,column)
    #df_stat[column] = stationarity_test(df[column].values)

Results of Dickey-Fuller Test for column: Adj Close_AAPL
Test Statistic                    0.410100
p-value                           0.981869
No Lags Used                     22.000000
Number of Observations Used    2589.000000
Critical Value (1%)              -3.432878
Critical Value (5%)              -2.862657
Critical Value (10%)             -2.567365
dtype: float64
Conclusion:====> Fail to reject the null hypothesis: Data is non-stationary
Results of Dickey-Fuller Test for column: Adj Close_AMZN
Test Statistic                   -1.066050
p-value                           0.728419
No Lags Used                     27.000000
Number of Observations Used    2584.000000
Critical Value (1%)              -3.432883
Critical Value (5%)              -2.862659
Critical Value (10%)             -2.567366
dtype: float64
Conclusion:====> Fail to reject the null hypothesis: Data is non-stationary
Results of Dickey-Fuller Test for column: Adj Close_BRK-A
Test Statistic                   -0.180345
p

In [17]:
train_data = train_data.diff()
train_data.dropna(inplace=True)

In [18]:
for column in df.columns:
    Augmented_Dickey_Fuller_Test_func(train_data[column].values,column)
    #df_stat[column] = stationarity_test(df[column].values)

Results of Dickey-Fuller Test for column: Adj Close_AAPL
Test Statistic                -8.868298e+00
p-value                        1.420554e-14
No Lags Used                   2.600000e+01
Number of Observations Used    1.987000e+03
Critical Value (1%)           -3.433645e+00
Critical Value (5%)           -2.862996e+00
Critical Value (10%)          -2.567545e+00
dtype: float64
Conclusion:====> Reject the null hypothesis: Data is stationary
Results of Dickey-Fuller Test for column: Adj Close_AMZN
Test Statistic                -9.626434e+00
p-value                        1.655871e-16
No Lags Used                   2.600000e+01
Number of Observations Used    1.987000e+03
Critical Value (1%)           -3.433645e+00
Critical Value (5%)           -2.862996e+00
Critical Value (10%)          -2.567545e+00
dtype: float64
Conclusion:====> Reject the null hypothesis: Data is stationary
Results of Dickey-Fuller Test for column: Adj Close_BRK-A
Test Statistic                -1.095695e+01
p-value   

Find the best orders using auto_arima

In [19]:
pq = []
max_pdq = 3
for column in df.columns:
    print(f'Searching order of p and q for : {column}')
    #parameter = find_best_order_ARIMA(train_data[column].values, max_pdq)
    stepwise_model = auto_arima(train_data[column].values, start_p=1, start_q=1, max_p=max_pdq, max_q=max_pdq, seasonal=False,
        error_action='ignore',suppress_warnings=True, stepwise=True,maxiter=1000)
    parameter = stepwise_model.get_params().get('order')
    print(f'optimal order for:{column} is: {parameter} \n\n')
    pq.append(parameter)

Searching order of p and q for : Adj Close_AAPL
optimal order for:Adj Close_AAPL is: (3, 1, 0) 


Searching order of p and q for : Adj Close_AMZN
optimal order for:Adj Close_AMZN is: (3, 0, 0) 


Searching order of p and q for : Adj Close_BRK-A
optimal order for:Adj Close_BRK-A is: (3, 0, 2) 


Searching order of p and q for : Adj Close_DIS
optimal order for:Adj Close_DIS is: (2, 0, 1) 


Searching order of p and q for : Adj Close_GOOGL
optimal order for:Adj Close_GOOGL is: (1, 0, 0) 


Searching order of p and q for : Adj Close_HD
optimal order for:Adj Close_HD is: (3, 0, 2) 


Searching order of p and q for : Adj Close_JNJ
optimal order for:Adj Close_JNJ is: (2, 0, 3) 


Searching order of p and q for : Adj Close_JPM
optimal order for:Adj Close_JPM is: (2, 0, 0) 


Searching order of p and q for : Adj Close_KO
optimal order for:Adj Close_KO is: (2, 0, 3) 


Searching order of p and q for : Adj Close_MA
optimal order for:Adj Close_MA is: (3, 0, 1) 


Searching order of p and q for : A

In [20]:
from collections import Counter

def valore_piu_frequente(lista):
    conteggio = Counter(lista)
    valore_piu_comune = conteggio.most_common(1)[0][0]
    return valore_piu_comune

best_order_pq = valore_piu_frequente(pq)
print(best_order_pq)  
print(best_order_pq[0], best_order_pq[2])

(3, 0, 2)
3 2


In [24]:
print(train_data.shape)
print(test_data.shape)

(2014, 20)
(597, 20)


Now we can apply VARMAX using the most frequent 'best-order' pair of order.

In [25]:
# Apply VARMAX
model = VARMAX(train_data.values, exog=None, order=(best_order_pq[0], best_order_pq[2]))
model_fit = model.fit(maxiter=50, disp=True)
print(model)
print(model_fit)
print(model_fit.params)
print('\n\nCov matrix')
cov_matrix = model_fit.cov_params()
print(cov_matrix)

parameters = model_fit.params

KeyboardInterrupt: 

In [None]:
predictions = model_fit.predict(start=len(train_data), end=len(train_data) + len(test_data) - 1)

In [None]:
# Save the model
model_fit.save('varmax_model.pkl')

# Reload the model
#loaded_results = VARMAXResults.load('varmax_model.pkl')

In [None]:
def get_realized_vol(dataset, time):
    #dataset['returns'] = np.log(dataset["Adj Close"]/dataset["Adj Close"].shift(1))
    #dataset.fillna(0, inplace = True)
    #window/time tells us how many days out vol you want. ~21 = 1 month out vol (~21 trading days in a month)
    #we do this so we can match up with the vix which is the 30 day out (~21 trading day) calculated vol
    volatility = dataset.returns.rolling(window=time).std(ddof=0)*np.sqrt(252)
    return volatility
    
def get_realized_spx_vol(dataset, time):
    #dataset['returns'] = np.log(dataset["Close"]/dataset["Close"].shift(1))
    #dataset.fillna(0, inplace = True)
    spx_volatility = dataset.returns.rolling(window=time).std(ddof=0)*np.sqrt(252)
    return spx_volatility

In [None]:
# Forecast on the test set
predictions2 = model_fit.forecast(len(test_data))

In [None]:
# Extract the target variable from the test set
for column in train_data.columns:
    actual_values = test_data[column]  # Adjust this based on your data

    # Calculate Mean Absolute Error (MAE)
    mae = mean_squared_error(actual_values, predictions2[column])  # Adjust the variable name based on your data

    # Print the MAE
    print("Mean Absolute Error (MAE):", mae)