# EXERCISE 3:

***Neural Networks and Gaussian process:***
Predict the SP500 with the  nancial indicators assigned to your team in the google spreadsheet (ep, dp, de, dy, dfy, bm, svar, ntis, in, tbl , see RLab3 2 GWcausalSP500.R), some lagged series of these indicators and lags of the target using a Neural Network and a GP regression with your desired kernel. Predict return, or price, or direction (up or down). For which target works best? Do some feature selection to disregard some variables, select appropriate lags: causality, (distance) correlation, VAR-test, Lasso ... (The script RLab5 GausProc.R can be of help. The dataset is goyal-welch2022Monthly.csv and work within the period 1927/2021.)

In our case, we have been assigned variables dp, de and ep.

# INDEX:

0. [DATA AND LIBRARY IMPORTS](#0.-DATA-AND-LIBRARY-IMPORTS)

1. [PREPROCESSING AND FEATURE ENGINEERING](#1.-PREPROCESSING-AND-FEATURE-ENGINEERING)

2. [LAG CREATION AND SELECTION](#2.-LAG-CREATION-AND-SELECTION)

3. [ARIMA BASELINE MODEL](#3.-ARIMA-BASELINE-MODEL)

4. [NEURAL NETWORK](#4.-NEURAL-NETWORK)

5. [GAUSSIAN PROCESS](#5.-GAUSSIAN-PROCESS)

# 0. DATA AND LIBRARY IMPORTS

[Back to Index](#INDEX)

## Libraries

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import grangercausalitytests, adfuller, acf, pacf
from sklearn.preprocessing import StandardScaler, RobustScaler
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import pmdarima as pm
from pmdarima.model_selection import train_test_split
from scipy.stats import chi2
from scipy.spatial.distance import correlation
import scipy.stats
import dcor
from sklearn.linear_model import Lasso
from keras.models import Sequential
from keras.layers import Dense
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C

## Classes Definition

In [None]:
class PerformanceMetrics:
    def __init__(self):
        self.metrics = {'R2': [], 'MSE': [], 'RMSE': [], 'MAE': []}
        self.set_names = []

    def add_metrics(self, y_true, y_pred, name=None):
        """Add a new set of metrics to the storage with an optional name."""
        r2 = r2_score(y_true, y_pred)
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_true, y_pred)
        
        if r2 >= 0:
            self.metrics['R2'].append(r2)
        else:
            self.metrics['R2'].append(0)
        self.metrics['MSE'].append(mse)
        self.metrics['RMSE'].append(rmse)
        self.metrics['MAE'].append(mae)
        
        if name is None:
            name = f"Set {len(self.set_names) + 1}"
        self.set_names.append(name)
        
        print(f"Metrics for {name}:")
        print("R2 Score:", r2)
        print("Mean Squared Error (MSE):", mse)
        print("Root Mean Squared Error (RMSE):", rmse)
        print("Mean Absolute Error (MAE):", mae)

    def plot_metrics(self):
        """Plot all metrics in separate bar charts for comparison using set names for labels."""
        import matplotlib.pyplot as plt
        
        n = len(self.metrics['MSE'])
        index = np.arange(n)
        bar_width = 0.35 
        color = 'skyblue'
        titles = {'R2': 'R2 Score', 'MSE': 'Mean Squared Error', 'RMSE': 'Root Mean Squared Error', 'MAE': 'Mean Absolute Error'}
        fig, axes = plt.subplots(nrows=4, ncols=1, figsize=(6, 18), sharey=True)
        for i, (metric_name, values) in enumerate(self.metrics.items()):
            axes[i].bar(index, values, bar_width, color=color, label=metric_name)
            # axes[i].set_xlabel('Model Set')
            axes[i].set_title(titles[metric_name])
            axes[i].set_xticks(index)
            axes[i].set_xticklabels(self.set_names)
            axes[i].legend()
        plt.suptitle('Comparison of Performance Metrics')
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        plt.show()

In [None]:
class ModelPredictions:
    def __init__(self, actual_data):
        """
        Initializes the ModelPredictions class with actual test data.
        
        Parameters:
            actual_data (pd.Series or pd.DataFrame): The actual data points from the test set.
        """
        self.actual_data = actual_data
        self.predictions = {}
        self.colors = ['red', 'blue', 'magenta', 'orange', 'purple', 'brown', 'pink', 'olive', 'cyan']
    
    def add_predictions(self, prediction_data, label):
        """
        Stores the prediction data under a specified label.
        
        Parameters:
            prediction_data (pd.Series or pd.DataFrame): The predicted data points.
            label (str): The label for the prediction to be used in the plot legend.
        """
        self.predictions[label] = prediction_data
    
    def plot_predictions(self):
        """
        Plots all stored predictions and the actual data on the same plot.
        """
        plt.figure(figsize=(10, 5))
        plt.plot(self.actual_data, label='Actual Returns', color='black')
        colors = iter(self.colors)
        for label, prediction in self.predictions.items():
            plt.plot(prediction.index, prediction, label=label, color=next(colors))
        plt.legend()
        plt.title('Comparison of Model Predictions and Actual Data')
        plt.show()

## Data Import

After some further steps on the way down, we decided to keep the last period from 1921 for now, as it will be useful for havig a first value for the differenced variables straight from the beginning. We will also use the previous periods in order to be able to have some lags for the variables straight from the beginning of 1927.

We also decided to keep the data up until the whole 2022 because we will use it to test our models on completely new data.

In [None]:
snp = pd.read_csv('goyal-welch2022Monthly.csv')

snp['yyyymm'] = snp['yyyymm'].astype(str)
snp['yyyymm'] = pd.to_datetime(snp['yyyymm'], format='%Y%m')
snp = snp.loc[(snp['yyyymm'] >= '1921-01-01')]# & (snp['yyyymm'] < '2022-01-01')].reset_index(drop=True)
snp['Index'] = snp['Index'].str.replace(',', '').astype(float)

display(snp)

[Back to Index](#INDEX)

# 1. PREPROCESSING AND FEATURE ENGINEERING

[Back to Index](#INDEX)

## Missing Values and Corrupted Data Check

In [None]:
snp.info()

We can observe that THERE ARE NO MISSING VALUES FOR OUR VARIABLES OF INTEREST (Index, D12 and E12).

## Feature Creation

We compute the new features as we need to compute:

- Dividend Price Ratio (DP)
- Dividend Earnings Ratio (DE)
- Earnings Price Ratio (EP)

In [None]:
snp = snp[['yyyymm', 'Index', 'D12', 'E12']]

snp['LogReturns'] = np.log(snp['Index']).diff()
snp['PriceDiv'] = snp['Index'] + snp['D12']
snp['LogReturnsDiv'] = np.log(snp['PriceDiv']).diff()

# We need to fill the NaN values with 0 because the ADF test doesn't tolerate NaN values and we might still need to further differentiate the series.
snp.fillna({'LogReturns': 0}, inplace=True)
snp.fillna({'LogReturnsDiv': 0}, inplace=True)

In [None]:
snp['DP'] = np.log(snp['D12']) - np.log(snp['Index'])
snp['DE'] = np.log(snp['D12']) - np.log(snp['E12'])
snp['EP'] = np.log(snp['E12']) - np.log(snp['Index'])

display(snp.head())

## Stationarity and Outlier Treatment

In [None]:
fig, axes = plt.subplots(nrows=9, ncols=1, figsize=(10, 18))

columns_to_plot = ['Index', 'PriceDiv', 'D12', 'E12', 'LogReturns', 'LogReturnsDiv', 'DP', 'DE', 'EP']
for i, col in enumerate(columns_to_plot):
    axes[i].plot(snp['yyyymm'], snp[col], marker='', linestyle='-')
    axes[i].set_title(col)
    axes[i].set_xlabel('Date')
    axes[i].set_ylabel(col)

plt.tight_layout()
plt.show()

In [None]:
# We test for stationarity in the data

for col in columns_to_plot:
    adf_result = adfuller(snp[col])
    print(f'ADF Statistic for {col}: {adf_result[0]}')
    print(f'p-value for {col}: {adf_result[1]}\n')

The variables we are interested in are mostly stationary but there are a couple that should be further differenced in order to make them stationary.

That being, said, log returns and log returns + dividends are already quite surely stationary so we're not going to bother with their stationarity anymore.

Additionally, in further boxplots we have been able to see that variables DE and EP are quite skewed so, even if they are already decently stationary, we will also difference them in order to center them a bit more.

In [None]:
snp['DP'] = snp['DP'].diff()
snp['DE'] = snp['DE'].diff()
snp['EP'] = snp['EP'].diff()

display(snp.head())

# We can safely remove the first row now instead of filling it with 0
snp.dropna(axis = 0, inplace = True)

display(snp.head())

In [None]:
fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(10, 6))

columns_to_plot = ['DP', 'DE', 'EP']
for i, col in enumerate(columns_to_plot):
    axes[i].plot(snp['yyyymm'], snp[col], marker='', linestyle='-')
    axes[i].set_title(col)
    axes[i].set_xlabel('Date')
    axes[i].set_ylabel(col)

plt.tight_layout()
plt.show()

# We test again for stationarity in the data

for col in columns_to_plot:
    adf_result = adfuller(snp[col])
    print(f'ADF Statistic for {col}: {adf_result[0]}')
    print(f'p-value for {col}: {adf_result[1]}\n')

We can see how now our data is completely stationary and we may proceed. Careful attention to the outliers will be needed, though, as they are very specific to the 2008 crisis. We will first standardize the data and afterwards, if there are still outliers, we will treat them or maybe consider scaling with robust scaling (i.e. taking away the median instead of the mean).

In [None]:
columns_to_plot = ['LogReturns', 'LogReturnsDiv', 'DP', 'DE', 'EP']

fig, axes = plt.subplots(nrows=1, ncols=5, figsize=(15, 10))
for i, col in enumerate(columns_to_plot):
    sns.boxplot(y=col, data=snp, ax=axes[i])
    axes[i].set_title(f'Boxplot of {col}', fontsize=12)
    axes[i].tick_params(axis='y', labelsize=10)

plt.tight_layout()
plt.show()

In [None]:
std_scaler = StandardScaler()
robust_scaler = RobustScaler()

snp_std = snp.copy()

for col in columns_to_plot:
    snp_std[col] = robust_scaler.fit_transform(snp_std[[col]])

fig, axes = plt.subplots(nrows=1, ncols=5, figsize=(15, 10))
for i, col in enumerate(columns_to_plot):
    sns.boxplot(y=col, data=snp, ax=axes[i])
    axes[i].set_title(f'Boxplot of {col}', fontsize=12)
    axes[i].tick_params(axis='y', labelsize=10)

plt.tight_layout()
plt.show()

As we can observe, the scaled version, even with the robust scaler, returns ranges which are still bigger than the range from the previous version. Even with the outliers we had, as we had already applied a logarithmic transformation to the data, the outliers were not excessively far away from the center, with the exception of the DE and EP ratios.

What we will do is "manually" transform the data which is bigger than 0.3 in the five columns we are considering. We will first try to simply perform a sort of winsorization on those values to the nearest value that we establish as "the maximum" we allow. As such, we simply set the values exceeding ±0.3, to ±0.3.

In [None]:
for col in columns_to_plot:
    snp[col] = snp[col].clip(upper=0.3, lower=-0.3)

fig, axes = plt.subplots(nrows=1, ncols=5, figsize=(15, 10))
for i, col in enumerate(columns_to_plot):
    sns.boxplot(y=col, data=snp, ax=axes[i])
    axes[i].set_title(f'Boxplot of {col}', fontsize=12)
    axes[i].tick_params(axis='y', labelsize=10)

plt.tight_layout()
plt.show()

In [None]:
fig, axes = plt.subplots(nrows=5, ncols=1, figsize=(10, 10))

columns_to_plot = ['LogReturns', 'LogReturnsDiv', 'DP', 'DE', 'EP']
for i, col in enumerate(columns_to_plot):
    axes[i].plot(snp['yyyymm'], snp[col], marker='', linestyle='-')
    axes[i].set_title(col)
    axes[i].set_xlabel('Date')
    axes[i].set_ylabel(col)

plt.tight_layout()
plt.show()

[Back to Index](#INDEX)

# 2. LAG CREATION AND SELECTION

[Back to Index](#INDEX)

## ACF and PACF

In [None]:
columns_to_plot = ['LogReturns', 'LogReturnsDiv', 'DP', 'DE', 'EP']

fig, axes = plt.subplots(10, 1, figsize=(10, 20))

for i, col in enumerate(columns_to_plot):
    acf_index = 2 * i
    plot_acf(snp[col], lags = 100, alpha = 0.05, ax = axes[acf_index], title = f'ACF for {col}')
    axes[acf_index].grid(True)
    
    pacf_index = 2 * i + 1
    plot_pacf(snp[col], lags = 100, alpha = 0.05, ax = axes[pacf_index], title = f'PACF for {col}')
    axes[pacf_index].grid(True)

plt.tight_layout()
plt.show()

We can observe how there is no clear patter in the ACF and PACF plots for most of the variables, so we will just start using an arbitrary number of lags and try different options from there. That being said, there is some lag correlation for DE and EP so those lags are probably going to be the most relevant, although with 5-10 lags we will already cover these correlations.

Let's perform some causality tests to see how many lags we should consider for each variable. As we have already seen from the ACF and PACF that the maximum amount of relevant lags is less than 10, we will consider only 5 lags for each variable.

## Granger Causality

In [None]:
snp_lags = snp.copy()
snp_lags.drop(['Index', 'D12', 'E12', 'PriceDiv'], axis = 1, inplace = True)
display(snp_lags)

We had to go with intervals longer than only a year for this exercise because the yearly rolling window would not allow us to have enough observations to perform some of the tests (for example the Granger causality test already only allows us to test up to 2 lags behind with the 12 observations of a yearly window).

As a result, we decided to use windows of different eras more than just one year at a time.

In [None]:
intervals = [   ["1927-01-01", "1932-12-01"],
                ["1933-01-01", "1970-12-01"],
                ["1971-01-01", "1997-12-01"],
                ["1998-01-01", "2005-12-01"],
                ["2006-01-01", "2021-11-01"]]

causality_tests = []

for interval in intervals:
    snp_temp = snp_lags[(snp_lags['yyyymm'] >= interval[0]) & (snp_lags['yyyymm'] <= interval[1])].dropna().reset_index(drop=True)
    snp_temp = snp_temp.drop('yyyymm', axis = 1)
    if interval == intervals[0]:
        print(interval[0],' , ',interval[1],'\n p = 5 \n')
    else:
        print('\n\n',interval[0],' , ',interval[1],'\n p = 5 \n')
    
    print("\n\nFor lagged LogReturns\n", '#'*20)
    result = grangercausalitytests(snp_temp[["LogReturns", "LogReturns"]], 5)
    causality_tests.append(result)
    
    print("\n\nFor lagged DP\n", '#'*20)
    result = grangercausalitytests(snp_temp[["LogReturns", "DP"]], 5)
    causality_tests.append(result)
    
    print("\n\nFor lagged DE\n", '#'*20)
    result = grangercausalitytests(snp_temp[["LogReturns", "DE"]], 5)
    causality_tests.append(result)
    
    print("\n\nFor lagged EP\n", '#'*20)
    result = grangercausalitytests(snp_temp[["LogReturns", "EP"]], 5)
    causality_tests.append(result)

We can observe how it appears that the EP ratio tends to be the most causal for LogReturns, especially in some periods more than others. Then we can also see that some lags of the DE ratio also pass the test (although only at a 10% confidence interval). As a result we are going to keep evaluating the lags to use, but we can probably safely try to use more than just 5 lags for DE and EP ratios.

That being said, the results are not too surprising because the Dividends are just one component of the price, but a lot of people value more highly a stock which keeps its value and grows in price more than another stock which returns dividends but doesn't grow as much, because you can always resell the stock if needed.
As a result, a company which is having consistent earnings is more probably higher valued than another with lower earnings and, as such, its price will probably be higher as well, hence this result. If a company does well one month, probably a lot of people will be at least a little bit more interested in buying their stock. The same can be said about an index like the S&P500. It's better to know that they are getting consistent and significant earnings more than knowing whether they paid dividends or not.

## Distance Correlation and Correlation among Factors

In [None]:
for interval in intervals:
    snp_temp = snp_lags[(snp_lags['yyyymm'] >= interval[0]) & (snp_lags['yyyymm'] <= interval[1])].dropna()
    snp_temp = snp_temp.drop(['yyyymm', 'LogReturns', 'LogReturnsDiv'], axis = 1)
    if interval == intervals[0]:
        print(interval[0],' , ',interval[1], '\n', '#'*20)
    else:
        print('\n\n',interval[0],' , ',interval[1], '\n', '#'*20)
    
    dp_de_correlation = dcor.distance_correlation(snp_temp['DP'], snp_temp['DE'])
    dp_ep_correlation = dcor.distance_correlation(snp_temp['DP'], snp_temp['EP'])
    de_ep_correlation = dcor.distance_correlation(snp_temp['DE'], snp_temp['EP'])
    
    print("Distance correlation between DP and DE:", dp_de_correlation)
    print("Distance correlation between DP and EP:", dp_ep_correlation)
    print("Distance correlation between DE and EP:", de_ep_correlation)

In [None]:
for interval in intervals:
    snp_temp = snp_lags[(snp_lags['yyyymm'] >= interval[0]) & (snp_lags['yyyymm'] <= interval[1])].dropna()
    snp_temp = snp_temp.drop(['yyyymm', 'LogReturns', 'LogReturnsDiv'], axis = 1)
    corr = snp_temp.corr()
    display(corr.style.background_gradient(cmap='coolwarm', axis=None).format("{:.2f}"))

## Lag Columns Creation

In order to conduct LASSO analysis, we have to already have the variables created. We have to create the lags for the variables so we will analyze up until 20 lags for each variable.

In [None]:
cols_corr_matrix = columns_to_plot.copy()

for i in range(1, 11):
    snp[f'Return_lag_{i}'] = snp['LogReturns'].shift(i)
    # snp[f'ReturnsDiv_lag_{i}'] = snp['LogReturnsDiv'].shift(i)
    snp[f'DP_lag_{i}'] = snp['DP'].shift(i)
    snp[f'DE_lag_{i}'] = snp['DE'].shift(i)
    snp[f'EP_lag_{i}'] = snp['EP'].shift(i)
    
    cols_corr_matrix.append(f'Return_lag_{i}')
    # cols_corr_matrix.append(f'ReturnsDiv_lag_{i}')
    cols_corr_matrix.append(f'DP_lag_{i}')
    cols_corr_matrix.append(f'DE_lag_{i}')
    cols_corr_matrix.append(f'EP_lag_{i}')

display(snp)

In [None]:
snp_corr = snp.copy()

snp_corr = snp_corr.loc[(snp_corr['yyyymm'] >= '1927-01-01') & (snp_corr['yyyymm'] < '2022-01-01')].reset_index(drop=True)

In [None]:
corr = snp_corr[cols_corr_matrix].corr()
corr.style.background_gradient(cmap='coolwarm', axis=None).format("{:.2f}")

We can see how the Log Returns are not super correlated with anything but are actually a little bit correlated (less than 10%) with some things like some of its own lags and some lags of the DP and EP ratios as well. That being said, obviously we cannot really use the most correlated variables, which are the lag 0 ratios because they are directly computed using the Price so we would not have them in the actual period we will be trying to forecast.

## LASSO Analysis

In [None]:
snp_lasso = snp.copy()

snp_lasso.drop(['LogReturnsDiv', 'DP', 'DE', 'EP'], axis = 1, inplace = True)

display(snp_lasso)

for interval in intervals:
    snp_temp = snp_lasso[(snp_lasso['yyyymm'] >= interval[0]) & (snp_lasso['yyyymm'] <= interval[1])].dropna().copy()
    snp_temp = snp_temp.drop(['yyyymm'], axis = 1)
    if interval == intervals[0]:
        print(interval[0],' , ',interval[1], '\n', '#'*20)
    else:
        print('\n\n',interval[0],' , ',interval[1], '\n', '#'*20)
    
    y = snp_temp['LogReturns'].copy()
    X = snp_temp.drop(['LogReturns'], axis = 1)
    
    lasso_model = Lasso(alpha=0.01)
    lasso_model.fit(X, y)
    print("Lasso coefficients:", lasso_model.coef_)
    print("Lasso intercept:", lasso_model.intercept_)

[Back to Index](#INDEX)

## Teras Virta Test

In [None]:
def terasvirta_test(x, y):
    model = Sequential()
    input_dim = 1 if len(x.shape) == 1 else x.shape[1]
    model.add(Dense(2, activation='relu', input_dim=input_dim))
    model.add(Dense(1, activation='relu'))
    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(x, y, epochs=50, verbose=False)
    
    linear_model = Sequential()
    linear_model.add(Dense(1, activation='linear'))
    linear_model.compile(loss='mse', optimizer='sgd')
    linear_model.fit(x, y, epochs=50, verbose=False)
    pred_orig = linear_model.predict(x)
    resid_orig = y.values - pred_orig
    pred_nn = model.predict(x)
    pred_nn = pred_nn.reshape(-1)
    resid_nn = y.values - pred_nn
    test_stat = np.mean(resid_orig**2 - resid_nn**2)
    crit_val = chi2.ppf(0.95, 2)
    if test_stat > crit_val:
        print("The null hypothesis of linearity is rejected")
    else:
        print("The null hypothesis of linearity is not rejected")

In [None]:
snp_teras = snp.copy()

snp_teras = snp_teras.loc[(snp_teras['yyyymm'] >= '1927-01-01') & (snp_teras['yyyymm'] < '2022-01-01')].reset_index(drop=True)

for interval in intervals:
    snp_temp = snp_teras[(snp_teras['yyyymm'] >= interval[0]) & (snp_teras['yyyymm'] <= interval[1])].dropna().copy()
    snp_temp = snp_temp.drop(['yyyymm', 'LogReturnsDiv'], axis = 1)
    if interval == intervals[0]:
        print(interval[0],' , ',interval[1], '\n', '#'*20)
    else:
        print('\n\n',interval[0],' , ',interval[1], '\n', '#'*20)
    
    y = snp_temp['LogReturns'].copy()
    X = snp_temp['DP'].copy()
    
    terasvirta_test(X, y)

In [None]:
for interval in intervals:
    snp_temp = snp_teras[(snp_teras['yyyymm'] >= interval[0]) & (snp_teras['yyyymm'] <= interval[1])].dropna().copy()
    snp_temp = snp_temp.drop(['yyyymm', 'LogReturnsDiv'], axis = 1)
    if interval == intervals[0]:
        print(interval[0],' , ',interval[1], '\n', '#'*20)
    else:
        print('\n\n',interval[0],' , ',interval[1], '\n', '#'*20)
    
    y = snp_temp['LogReturns'].copy()
    X = snp_temp['DE'].copy()
    
    terasvirta_test(X, y)

In [None]:
for interval in intervals:
    snp_temp = snp_teras[(snp_teras['yyyymm'] >= interval[0]) & (snp_teras['yyyymm'] <= interval[1])].dropna().copy()
    snp_temp = snp_temp.drop(['yyyymm', 'LogReturnsDiv'], axis = 1)
    if interval == intervals[0]:
        print(interval[0],' , ',interval[1], '\n', '#'*20)
    else:
        print('\n\n',interval[0],' , ',interval[1], '\n', '#'*20)
    
    y = snp_temp['LogReturns'].copy()
    X = snp_temp['EP'].copy()
    
    terasvirta_test(X, y)

In [None]:
for i in ['DP', 'DE', 'EP']:
    snp_temp = snp_teras.dropna().copy()
    if i == ['DP']:
        print(i, '\n', '#'*20)
    else:
        print('\n\n', i, '\n', '#'*20)
    
    y = snp_temp['LogReturns'].copy()
    X = snp_temp[i].copy()
    
    terasvirta_test(X, y)

[Back to Index](#INDEX)

# 3. ARIMA Baseline Model

[Back to Index](#INDEX)

## Data Preparation

In [None]:
snp['yyyymm'] = pd.to_datetime(snp['yyyymm'])
snp.set_index('yyyymm', inplace=True)
snp.drop(['Index', 'D12', 'E12', 'PriceDiv', 'LogReturnsDiv'], axis = 1, inplace = True)

snp_train = snp['1927-01-01':'2021-12-31']
snp_test = snp['2022-01-01':]

display(snp_train)
display(snp_test)

## ARIMA Model

In [None]:
snp_train_arima = snp_train.copy()
snp_train_arima = snp_train_arima[['LogReturns', 'DP', 'DE', 'EP']]

snp_test_arima = snp_test.copy()
snp_test_arima = snp_test_arima[['LogReturns', 'DP', 'DE', 'EP']]

model_auto_arima = pm.auto_arima(   snp_train_arima['LogReturns'], start_p=0, start_q=0,
                                    test='adf',   
                                    max_p=10, max_q=10,
                                    m=1,         
                                    d=None,      
                                    seasonal=False, 
                                    start_P=0, 
                                    D=0, 
                                    trace=True,
                                    error_action='ignore',  
                                    suppress_warnings=True, 
                                    stepwise=True)

print(model_auto_arima.summary())

The auto arima function suggests a Moving Average model with 1 lag. We will instead try to fit an ARMA 1,1 model to the data as it is not entirely clear that the MA1 model really works better than the ARMA 11.

In [None]:
model_arima = ARIMA(snp_train_arima['LogReturns'], order=(1, 0, 1))
model_arima_fit = model_arima.fit()

arima_pred = model_arima_fit.get_forecast(steps=len(snp_test_arima))
arima_pred_mean = arima_pred.predicted_mean

# plt.figure(figsize=(10, 5))
# # plt.plot(snp_train_arima['LogReturns'], label='Training Data')
# plt.plot(snp_test_arima['LogReturns'], label='Actual Returns')
# plt.plot(arima_pred_mean.index, arima_pred_mean, label='ARIMA Predictions')
# plt.legend()
# plt.title('ARIMA Forecast versus Actuals')
# plt.show()

preds_plots = ModelPredictions(snp_test_arima['LogReturns'])
preds_plots.add_predictions(arima_pred_mean, 'ARIMA')
preds_plots.plot_predictions()

In [None]:
metrics = PerformanceMetrics()
metrics.add_metrics(snp_test_arima['LogReturns'], arima_pred_mean, name="ARIMA")

In [None]:
metrics.plot_metrics()

[Back to Index](#INDEX)

# 4. NEURAL NETWORK

[Back to Index](#INDEX)

In [None]:
y_tr = snp_train['LogReturns'].copy()
X_tr = snp_train.drop(['LogReturns'], axis=1).copy()
y_te = snp_test['LogReturns'].copy()
X_te = snp_test.drop(['LogReturns'], axis=1).copy()

words_to_search = ['Return', 'DE', 'lag'] 
# also tried with keeping all the columns and with only the returns but the results were worse.
# best predictions were achieved when completely excluding DP and EP columns.
# difference in performance when only taking lagged returns and when taking lagged returns with DE
# with DE lags is not too significant.
cols_to_use = [col for col in X_tr.columns if any(word in col for word in words_to_search)]

X_tr = X_tr[cols_to_use]
X_te = X_te[cols_to_use]

y_valid = y_tr['2021-01-01':'2021-12-31'].copy()
X_valid = X_tr['2021-01-01':'2021-12-31'].copy()
y_train = y_tr['1927-01-01':'2020-12-31'].copy()
X_train = X_tr['1927-01-01':'2020-12-31'].copy()

display(y_train.head())
display(X_train)

In [None]:
X_train = X_train.values.reshape((X_train.shape[0], 1, X_train.shape[1]))
X_test = X_te.values.reshape((X_te.shape[0], 1, X_te.shape[1]))
X_valid = X_valid.values.reshape((X_valid.shape[0], 1, X_valid.shape[1]))

display(X_train.shape)
# display(X_train_reshaped)

In [None]:
nn_model = Sequential([
    Input(shape=(X_train.shape[1], X_train.shape[2])),
    LSTM(64), # Tried 16, 50, 128
    Dense(64, activation='relu'), # Tried 16, 32, 64
    Dense(1)
])

nn_model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
nn_model.summary()

early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

history = nn_model.fit(X_train, y_train, epochs=50, batch_size=2, validation_data=(X_valid, y_valid))#, callbacks=[early_stopping])
# Tried batch size 2, 64, 128
test_loss = nn_model.evaluate(X_test, y_te)
print(f'Test Loss: {test_loss}')

In [None]:
train_loss = history.history['loss']
val_loss = history.history['val_loss']

plt.figure(figsize=(10, 5))
plt.plot(train_loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.title('Train and Valid Loss Over Epochs')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
y_pred_nn = nn_model.predict(X_test)

y_pred_nn_series = pd.Series(y_pred_nn.flatten(), index=y_te.index)
preds_plots.add_predictions(y_pred_nn_series, 'LSTM')
preds_plots.plot_predictions()

In [None]:
metrics.add_metrics(y_te, y_pred_nn_series, name="LSTM")

In [None]:
metrics.plot_metrics()

[Back to Index](#INDEX)

# 5. GAUSSIAN PROCESS

[Back to Index](#INDEX)

In [None]:
X_trainGP = X_tr.copy()
y_trainGP = y_tr.copy()
X_testGP = X_te.copy()
y_testGP = y_te.copy()

In [None]:
gp_model = GaussianProcessRegressor(
    kernel = C(), # Tried RBF and C kernels, both worked not so good.
    alpha = 0.01 # Tried 0.001, 0.01. 0.1, 1.0, none of them worked well
)

In [None]:
gp_model.fit(X_trainGP, y_trainGP)

In [None]:
y_pred_gp = gp_model.predict(X_testGP)
y_pred_gp_series = pd.Series(y_pred_gp[0], index=y_testGP.index)

In [None]:
preds_plots.add_predictions(y_pred_gp_series, 'Gaussian Process')
preds_plots.plot_predictions()

In [None]:
metrics.add_metrics(y_testGP, y_pred_gp, name="Gaussian Process")

In [None]:
metrics.plot_metrics()

[Back to Index](#INDEX)

# NOT IN USE, SAFEKEEPING

In [None]:
# def distance_matrix(vector):
#     return np.abs(vector[:, None] - vector[None, :])

# def distance_covariance(X, Y):
#     n = X.shape[0]
#     A = distance_matrix(X)
#     B = distance_matrix(Y)
#     A_mean = A.mean()
#     B_mean = B.mean()
#     A_centered = A - A_mean
#     B_centered = B - B_mean
#     dcov = np.sqrt((A_centered * B_centered).sum() / (n * n))
#     return dcov

# def distance_correlation(X, Y):
#     dcov_XY = distance_covariance(X, Y)
#     dcov_XX = distance_covariance(X, X)
#     dcov_YY = distance_covariance(Y, Y)
#     if dcov_XX * dcov_YY == 0:
#         return 0
#     else:
#         return dcov_XY / np.sqrt(dcov_XX * dcov_YY)

# for interval in intervals:
#     snp_temp = snp_lags[(snp_lags['yyyymm'] >= interval[0]) & (snp_lags['yyyymm'] <= interval[1])].dropna()
#     snp_temp = snp_temp.drop(['yyyymm', 'LogReturns', 'LogReturnsDiv'], axis = 1)
#     print('\n\n',interval[0],' , ',interval[1],'\n', '#'*20)
    
#     dp_de_corr = distance_correlation(snp_temp['DP'].values, snp_temp['DE'].values)
#     dp_ep_corr = distance_correlation(snp_temp['DP'].values, snp_temp['EP'].values)
#     de_ep_corr = distance_correlation(snp_temp['DE'].values, snp_temp['EP'].values)
    
#     print("Distance correlation between DP and DE:", dp_de_corr)
#     print("Distance correlation between DP and EP:", dp_ep_corr)
#     print("Distance correlation between DE and EP:", de_ep_corr)

In [None]:
# for interval in intervals:
#     snp_temp = snp_lags[(snp_lags['yyyymm'] >= interval[0]) & (snp_lags['yyyymm'] <= interval[1])].dropna()
#     snp_temp = snp_temp.drop(['yyyymm', 'LogReturns', 'LogReturnsDiv'], axis = 1)
#     if interval == intervals[0]:
#         print(interval[0],' , ',interval[1],'\n', '#'*20)
#     else:
#         print('\n\n',interval[0],' , ',interval[1],'\n', '#'*20)
    
#     dp_de_corr_dist = correlation(snp_temp['DP'].values, snp_temp['DE'].values)
#     dp_ep_corr_dist = correlation(snp_temp['DP'].values, snp_temp['EP'].values)
#     de_ep_corr_dist = correlation(snp_temp['DE'].values, snp_temp['EP'].values)
    
#     print(f"Correlation distance between DP and DE: {dp_de_corr_dist}")
#     print(f"Correlation distance between DP and EP: {dp_ep_corr_dist}")
#     print(f"Correlation distance between DE and EP: {de_ep_corr_dist}")

In [None]:
# class PerformanceMetrics:
#     def __init__(self):
#         self.metrics = {'R2': [], 'MSE': [], 'RMSE': [], 'MAE': []}
#         self.set_names = []

#     def add_metrics(self, r2, mse, rmse, mae, name=None):
#         """Add a new set of metrics to the storage with an optional name."""
#         if r2 >= 0:
#             self.metrics['R2'].append(r2)
#         else:
#             self.metrics['R2'].append(0)
#         self.metrics['MSE'].append(mse)
#         self.metrics['RMSE'].append(rmse)
#         self.metrics['MAE'].append(mae)
#         if name is None:
#             name = f"Set {len(self.set_names) + 1}"
#         self.set_names.append(name)

#     def plot_metrics(self):
#         """Plot all metrics in separate bar charts for comparison using set names for labels."""
#         n = len(self.metrics['MSE'])
#         index = np.arange(n)
#         bar_width = 0.35 
#         color = 'skyblue'
#         titles = {'R2': 'R2 Score', 'MSE': 'Mean Squared Error', 'RMSE': 'Root Mean Squared Error', 'MAE': 'Mean Absolute Error'}
#         fig, axes = plt.subplots(nrows=4, ncols=1, figsize=(18, 18), sharey=True)
#         for i, (metric_name, values) in enumerate(self.metrics.items()):
#             axes[i].bar(index, values, bar_width, color=color, label=metric_name)
#             axes[i].set_xlabel('Model Set')
#             axes[i].set_title(titles[metric_name])
#             axes[i].set_xticks(index)
#             axes[i].set_xticklabels(self.set_names)
#             axes[i].legend()
#         plt.suptitle('Comparison of Performance Metrics')
#         plt.tight_layout(rect=[0, 0.03, 1, 0.95])
#         plt.show()