In [19]:
import pandas as pd
import numpy as np
from datetime import datetime

from sklearn.linear_model import HuberRegressor, ElasticNet


In [20]:
# Locations of the two input files (feature matrix and dependent variable).
file_path1, fie_path2 = "Features_X.csv", "Dependet_y.csv"

# Read both CSV files into DataFrames, features first.
Predictors, Dependent = (pd.read_csv(path) for path in (file_path1, fie_path2))

In [21]:
# Give both frames a datetime index built from their 'Date' column.
# The frames are modified in place, features first and target second.
for _frame in (Predictors, Dependent):
    _frame.set_index('Date', inplace=True)
    _frame.index = pd.to_datetime(_frame.index)

In [22]:
# Distinct calendar years present in the dependent variable's index.
# (A first assignment from Predictors was dropped: it was overwritten
# immediately and never read.)
yrs = Dependent.index.year.unique()

# Replace missing values by 0 in both frames, in place.
Dependent.fillna(0, inplace=True)
Predictors.fillna(0, inplace=True)

# Validation function (reused from Marco's earlier code)

In [27]:
from sklearn.model_selection import ParameterGrid

def R_oos(actual, predicted):
    """Out-of-sample R-squared with an uncentered denominator.

    Computes ``1 - sum((actual - predicted)**2) / sum(actual**2)`` after
    replacing every negative prediction by 0.

    Parameters
    ----------
    actual : array-like
        Realised values. Flattened to 1-D, so a single-column DataFrame or an
        (n, 1) array is accepted.
    predicted : array-like
        Predicted values with the same number of elements as ``actual``.

    Returns
    -------
    float
        The R-squared value. If ``actual`` is all zeros the denominator is 0
        and the result is not finite.
    """
    # Flatten BOTH inputs: with only `predicted` flattened, an (n, 1) target
    # broadcasts against an (n,) prediction into an (n, n) matrix.
    actual = np.asarray(actual).ravel()
    predicted = np.asarray(predicted).ravel()
    predicted = np.where(predicted < 0, 0, predicted)
    residual = actual - predicted
    return 1 - np.dot(residual, residual) / np.dot(actual, actual)

def val_fun(model, params: dict, X_trn, y_trn, X_vld, y_vld):
    """Pick the parameter combination with the highest validation ``R_oos``.

    Every combination produced by ``ParameterGrid(params)`` is fitted on the
    training data and scored on the validation data.

    Parameters
    ----------
    model : callable
        Zero-argument factory (e.g. an estimator class) whose result supports
        ``set_params(**kwargs)``, ``fit(X, y)`` and ``predict(X)``.
    params : dict
        Grid specification passed to ``ParameterGrid``.
    X_trn, y_trn
        Training features and targets.
    X_vld, y_vld
        Validation features and targets.

    Returns
    -------
    dict
        The best-scoring parameter combination. On ties the earliest
        combination in grid order is kept.

    Raises
    ------
    ValueError
        If the grid contains no combination at all.
    """
    best_ros = None
    best_param = None
    for param in ParameterGrid(params):
        mod = model().set_params(**param).fit(X_trn, y_trn)
        ros = R_oos(y_vld, mod.predict(X_vld))
        # The first candidate is always accepted; later ones must be strictly better.
        if best_ros is None or ros > best_ros:
            best_ros = ros
            best_param = param
    if best_param is None:
        raise ValueError("params produced an empty parameter grid")
    return best_param


# ELASTIC NET:

In [None]:
from sklearn.metrics import r2_score
def ENet(Dependent, Predictors, initial_train_years=18, validation_years=12, test_years=1):
    """Walk-forward evaluation of an Elastic Net with ``l1_ratio = 0.5``.

    Each split uses an expanding training window (all years before
    ``end_train_year``), the following ``validation_years`` years to select
    the hyper-parameters via ``val_fun``, and the ``test_years`` years after
    that to score the selected model with ``r2_score``.

    Parameters
    ----------
    Dependent : pandas.DataFrame
        Target values. Its index must expose ``.year`` (a ``DatetimeIndex``).
    Predictors : pandas.DataFrame
        Features, with an index that also exposes ``.year``.
    initial_train_years : int, default 18
        Length in years of the first training window.
    validation_years : int, default 12
        Length in years of the validation window.
    test_years : int, default 1
        Length in years of the test window.

    Returns
    -------
    list of float
        Test-window ``r2_score`` of every split, in loop order.

    Notes
    -----
    Window boundaries are obtained by adding year counts to ``yrs[i]``, so the
    years in the index are assumed to be consecutive -- TODO confirm.
    """
    # Year labels are loop-invariant, so compute them once per frame.
    dep_years = Dependent.index.year
    pred_years = Predictors.index.year
    yrs = dep_years.unique()
    r_squared_scores = []
    tuning_par = {
        "alpha": [1e-1, 1e-4],
        "l1_ratio": [0.5],
        "tol": [1e-2],
    }

    # One split per start year; the training window grows by one year each time.
    for i in range(len(yrs) - initial_train_years - validation_years):
        end_train_year = yrs[i] + initial_train_years
        end_validation_year = end_train_year + validation_years
        end_test_year = end_validation_year + test_years

        # Training, validation and test sets (upper bounds are exclusive).
        X_train = Predictors[pred_years < end_train_year]
        X_val = Predictors[(pred_years >= end_train_year) & (pred_years < end_validation_year)]
        X_test = Predictors[(pred_years >= end_validation_year) & (pred_years < end_test_year)]
        y_train = Dependent[dep_years < end_train_year].values.ravel()
        y_val = Dependent[(dep_years >= end_train_year) & (dep_years < end_validation_year)].values.ravel()
        y_test = Dependent[(dep_years >= end_validation_year) & (dep_years < end_test_year)].values.ravel()

        # Tune the hyper-parameters for this split on the validation window.
        best_par = val_fun(ElasticNet, params=tuning_par, X_trn=X_train, y_trn=y_train, X_vld=X_val, y_vld=y_val)

        # Refit with EVERY selected hyper-parameter. Passing only alpha and
        # l1_ratio dropped the tuned `tol`, so the tested model differed from
        # the one that was validated.
        model = ElasticNet(**best_par).fit(X_train, y_train)

        # Score the selected model on the test window.
        y_test_pred = model.predict(X_test)
        r_squared_scores.append(r2_score(y_test, y_test_pred))

    return r_squared_scores
    
# Store the scores under their own name: rebinding ``ENet`` to the returned
# list would shadow the function and make any later ``ENet(...)`` call fail
# with "'list' object is not callable".
ENet_r2_scores = ENet(Dependent=Dependent, Predictors=Predictors)
ENet_r2_scores

### When l1_ratio is 1, Elastic Net behaves like LASSO regression, and when l1_ratio is 0, it behaves like RIDGE regression.



# LASSO

In [None]:
# Set l1_ratio = 1 -> Lasso
def Lasso(Dependent, Predictors, initial_train_years=18, validation_years=12, test_years=1):
    """Walk-forward evaluation of a Lasso (``ElasticNet`` with ``l1_ratio = 1``).

    Each split uses an expanding training window (all years before
    ``end_train_year``), the following ``validation_years`` years to select
    the hyper-parameters via ``val_fun``, and the ``test_years`` years after
    that to score the selected model with ``r2_score``.

    Parameters
    ----------
    Dependent : pandas.DataFrame
        Target values. Its index must expose ``.year`` (a ``DatetimeIndex``).
    Predictors : pandas.DataFrame
        Features, with an index that also exposes ``.year``.
    initial_train_years : int, default 18
        Length in years of the first training window.
    validation_years : int, default 12
        Length in years of the validation window.
    test_years : int, default 1
        Length in years of the test window.

    Returns
    -------
    list of float
        Test-window ``r2_score`` of every split, in loop order.

    Notes
    -----
    Window boundaries are obtained by adding year counts to ``yrs[i]``, so the
    years in the index are assumed to be consecutive -- TODO confirm.
    """
    # Year labels are loop-invariant, so compute them once per frame.
    dep_years = Dependent.index.year
    pred_years = Predictors.index.year
    yrs = dep_years.unique()
    r_squared_scores = []
    tuning_par = {
        "alpha": [1e-1, 1e-4],
        "l1_ratio": [1],
        "tol": [1e-2],
    }

    # One split per start year; the training window grows by one year each time.
    for i in range(len(yrs) - initial_train_years - validation_years):
        end_train_year = yrs[i] + initial_train_years
        end_validation_year = end_train_year + validation_years
        end_test_year = end_validation_year + test_years

        # Training, validation and test sets (upper bounds are exclusive).
        X_train = Predictors[pred_years < end_train_year]
        X_val = Predictors[(pred_years >= end_train_year) & (pred_years < end_validation_year)]
        X_test = Predictors[(pred_years >= end_validation_year) & (pred_years < end_test_year)]
        y_train = Dependent[dep_years < end_train_year].values.ravel()
        y_val = Dependent[(dep_years >= end_train_year) & (dep_years < end_validation_year)].values.ravel()
        y_test = Dependent[(dep_years >= end_validation_year) & (dep_years < end_test_year)].values.ravel()

        # Tune the hyper-parameters for this split on the validation window.
        best_par = val_fun(ElasticNet, params=tuning_par, X_trn=X_train, y_trn=y_train, X_vld=X_val, y_vld=y_val)

        # Refit with EVERY selected hyper-parameter. Passing only alpha and
        # l1_ratio dropped the tuned `tol`, so the tested model differed from
        # the one that was validated.
        model = ElasticNet(**best_par).fit(X_train, y_train)

        # Score the selected model on the test window.
        y_test_pred = model.predict(X_test)
        r_squared_scores.append(r2_score(y_test, y_test_pred))

    return r_squared_scores
    
# Evaluate the Lasso specification defined in this cell. The previous call
# targeted ``ENet`` and therefore never ran the Lasso model.
Lasso_r2_scores = Lasso(Dependent=Dependent, Predictors=Predictors)
Lasso_r2_scores

# RIDGE

In [None]:
# Set l1_ratio = 0 -> RIDGE

def Ridge(Dependent, Predictors, initial_train_years=18, validation_years=12, test_years=1):
    """Walk-forward evaluation of a Ridge model (``ElasticNet`` with ``l1_ratio = 0``).

    This function was previously defined under the name ``Lasso``, which
    shadowed the actual Lasso specification (``l1_ratio = 1``).

    Each split uses an expanding training window (all years before
    ``end_train_year``), the following ``validation_years`` years to select
    the hyper-parameters via ``val_fun``, and the ``test_years`` years after
    that to score the selected model with ``r2_score``.

    Parameters
    ----------
    Dependent : pandas.DataFrame
        Target values. Its index must expose ``.year`` (a ``DatetimeIndex``).
    Predictors : pandas.DataFrame
        Features, with an index that also exposes ``.year``.
    initial_train_years : int, default 18
        Length in years of the first training window.
    validation_years : int, default 12
        Length in years of the validation window.
    test_years : int, default 1
        Length in years of the test window.

    Returns
    -------
    list of float
        Test-window ``r2_score`` of every split, in loop order.

    Notes
    -----
    Window boundaries are obtained by adding year counts to ``yrs[i]``, so the
    years in the index are assumed to be consecutive -- TODO confirm.
    """
    # Year labels are loop-invariant, so compute them once per frame.
    dep_years = Dependent.index.year
    pred_years = Predictors.index.year
    yrs = dep_years.unique()
    r_squared_scores = []
    tuning_par = {
        "alpha": [1e-1, 1e-4],
        "l1_ratio": [0],
        "tol": [1e-2],
    }

    # One split per start year; the training window grows by one year each time.
    for i in range(len(yrs) - initial_train_years - validation_years):
        end_train_year = yrs[i] + initial_train_years
        end_validation_year = end_train_year + validation_years
        end_test_year = end_validation_year + test_years

        # Training, validation and test sets (upper bounds are exclusive).
        X_train = Predictors[pred_years < end_train_year]
        X_val = Predictors[(pred_years >= end_train_year) & (pred_years < end_validation_year)]
        X_test = Predictors[(pred_years >= end_validation_year) & (pred_years < end_test_year)]
        y_train = Dependent[dep_years < end_train_year].values.ravel()
        y_val = Dependent[(dep_years >= end_train_year) & (dep_years < end_validation_year)].values.ravel()
        y_test = Dependent[(dep_years >= end_validation_year) & (dep_years < end_test_year)].values.ravel()

        # Tune the hyper-parameters for this split on the validation window.
        best_par = val_fun(ElasticNet, params=tuning_par, X_trn=X_train, y_trn=y_train, X_vld=X_val, y_vld=y_val)

        # Refit with EVERY selected hyper-parameter. Passing only alpha and
        # l1_ratio dropped the tuned `tol`, so the tested model differed from
        # the one that was validated.
        model = ElasticNet(**best_par).fit(X_train, y_train)

        # Score the selected model on the test window.
        y_test_pred = model.predict(X_test)
        r_squared_scores.append(r2_score(y_test, y_test_pred))

    return r_squared_scores

# Evaluate the Ridge specification defined above. The previous call targeted
# ``ENet`` and therefore never ran the Ridge model.
Ridge_r2_scores = Ridge(Dependent=Dependent, Predictors=Predictors)
Ridge_r2_scores

# Implementation of Huber-Loss

# Huber-Loss-function

In [23]:
from sklearn.linear_model import ElasticNet
from sklearn.metrics import r2_score
from sklearn.model_selection import ParameterGrid
import numpy as np



In [24]:
def huber_loss(y_val, y_pred, delta):
    """Element-wise Huber loss between ``y_val`` and ``y_pred``.

    Residuals whose absolute value is at most ``delta`` are penalised with
    ``0.5 * residual**2``; larger ones with
    ``delta * (|residual| - 0.5 * delta)``.

    Returns the per-element losses (no aggregation is performed).
    """
    residual = y_val - y_pred
    magnitude = np.abs(residual)
    quadratic_branch = 0.5 * (residual ** 2)
    linear_branch = delta * (magnitude - 0.5 * delta)
    return np.where(magnitude <= delta, quadratic_branch, linear_branch)

In [None]:
from sklearn.model_selection import ParameterGrid


def R_oos(actual, predicted):
    """Out-of-sample R-squared with an uncentered denominator.

    Computes ``1 - sum((actual - predicted)**2) / sum(actual**2)`` after
    replacing every negative prediction by 0.

    Parameters
    ----------
    actual : array-like
        Realised values. Flattened to 1-D, so a single-column DataFrame or an
        (n, 1) array is accepted.
    predicted : array-like
        Predicted values with the same number of elements as ``actual``.

    Returns
    -------
    float
        The R-squared value. If ``actual`` is all zeros the denominator is 0
        and the result is not finite.
    """
    # Flatten BOTH inputs: with only `predicted` flattened, an (n, 1) target
    # broadcasts against an (n,) prediction into an (n, n) matrix.
    actual = np.asarray(actual).ravel()
    predicted = np.asarray(predicted).ravel()
    predicted = np.where(predicted < 0, 0, predicted)
    residual = actual - predicted
    return 1 - np.dot(residual, residual) / np.dot(actual, actual)

def val_fun_with_huber(model, params: dict, X_trn, y_trn, X_vld, y_vld, delta=99.9):
    """Pick the parameter combination with the smallest mean validation Huber loss.

    Every combination produced by ``ParameterGrid(params)`` is fitted on the
    training data; its validation predictions are scored with ``huber_loss``
    and the per-observation losses are averaged into one number.

    Parameters
    ----------
    model : callable
        Zero-argument factory (e.g. an estimator class) whose result supports
        ``set_params(**kwargs)``, ``fit(X, y)`` and ``predict(X)``.
    params : dict
        Grid specification passed to ``ParameterGrid``.
    X_trn, y_trn
        Training features and targets.
    X_vld, y_vld
        Validation features and targets.
    delta : float, default 99.9
        Threshold passed to ``huber_loss``; residuals with absolute value up
        to ``delta`` are penalised quadratically.
        NOTE(review): whether 99.9 suits the scale of the targets cannot be
        told from this code -- confirm.

    Returns
    -------
    dict
        The combination with the lowest mean loss. On ties the earliest
        combination in grid order is kept.

    Raises
    ------
    ValueError
        If the grid contains no combination at all.
    """
    # Flatten once so that an (n, 1) target cannot broadcast against (n,) predictions.
    y_true = np.asarray(y_vld).ravel()

    smallest_loss = None
    best_param = None
    for param in ParameterGrid(params):
        mod = model().set_params(**param).fit(X_trn, y_trn)
        y_pred = np.asarray(mod.predict(X_vld)).ravel()
        # huber_loss is element-wise; reduce to a scalar so candidates can be compared.
        loss = float(np.mean(huber_loss(y_true, y_pred, delta=delta)))
        if smallest_loss is None or loss < smallest_loss:
            smallest_loss = loss
            best_param = param
    if best_param is None:
        raise ValueError("params produced an empty parameter grid")
    return best_param

# Elastic Net with Huber

In [26]:
def ENet_with_huber(Dependent, Predictors, initial_train_years=18, validation_years=12, test_years=1):
    """Walk-forward Elastic Net (``l1_ratio = 0.5``) tuned with a Huber validation loss.

    Each split uses an expanding training window (all years before
    ``end_train_year``), the following ``validation_years`` years to select
    the hyper-parameters via ``val_fun_with_huber``, and the ``test_years``
    years after that to score the selected model with ``r2_score``.

    Parameters
    ----------
    Dependent : pandas.DataFrame
        Target values. Its index must expose ``.year`` (a ``DatetimeIndex``).
    Predictors : pandas.DataFrame
        Features, with an index that also exposes ``.year``.
    initial_train_years : int, default 18
        Length in years of the first training window.
    validation_years : int, default 12
        Length in years of the validation window.
    test_years : int, default 1
        Length in years of the test window.

    Returns
    -------
    list of float
        Test-window ``r2_score`` of every split, in loop order.

    Notes
    -----
    Window boundaries are obtained by adding year counts to ``yrs[i]``, so the
    years in the index are assumed to be consecutive -- TODO confirm.
    """
    # Year labels are loop-invariant, so compute them once per frame.
    dep_years = Dependent.index.year
    pred_years = Predictors.index.year
    yrs = dep_years.unique()
    r_squared_scores = []
    tuning_par = {
        "alpha": [1e-1, 1e-4],
        "l1_ratio": [0.5],
        "tol": [1e-2],
    }

    # One split per start year; the training window grows by one year each time.
    for i in range(len(yrs) - initial_train_years - validation_years):
        end_train_year = yrs[i] + initial_train_years
        end_validation_year = end_train_year + validation_years
        end_test_year = end_validation_year + test_years

        # Training, validation and test sets (upper bounds are exclusive).
        X_train = Predictors[pred_years < end_train_year]
        X_val = Predictors[(pred_years >= end_train_year) & (pred_years < end_validation_year)]
        X_test = Predictors[(pred_years >= end_validation_year) & (pred_years < end_test_year)]
        y_train = Dependent[dep_years < end_train_year].values.ravel()
        y_val = Dependent[(dep_years >= end_train_year) & (dep_years < end_validation_year)].values.ravel()
        y_test = Dependent[(dep_years >= end_validation_year) & (dep_years < end_test_year)].values.ravel()

        # Tune the hyper-parameters for this split on the validation window.
        best_par = val_fun_with_huber(ElasticNet, params=tuning_par, X_trn=X_train, y_trn=y_train, X_vld=X_val, y_vld=y_val)

        # Refit with EVERY selected hyper-parameter. Passing only alpha and
        # l1_ratio dropped the tuned `tol`, so the tested model differed from
        # the one that was validated.
        model = ElasticNet(**best_par).fit(X_train, y_train)

        # Score the selected model on the test window.
        y_test_pred = model.predict(X_test)
        r_squared_scores.append(r2_score(y_test, y_test_pred))

    return r_squared_scores

# Run the walk-forward evaluation of the Huber-validated Elastic Net.
ENet_scores = ENet_with_huber(
    Dependent=Dependent,
    Predictors=Predictors,
)


NameError: name 'y_val' is not defined

# Lasso with Huber

In [None]:
#Set l1_ratio = 1 -> Lasso
def Lasso_with_huber(Dependent, Predictors, initial_train_years=18, validation_years=12, test_years=1):
    """Walk-forward Lasso (``ElasticNet`` with ``l1_ratio = 1``) tuned with a Huber validation loss.

    Each split uses an expanding training window (all years before
    ``end_train_year``), the following ``validation_years`` years to select
    the hyper-parameters via ``val_fun_with_huber``, and the ``test_years``
    years after that to score the selected model with ``r2_score``.

    Parameters
    ----------
    Dependent : pandas.DataFrame
        Target values. Its index must expose ``.year`` (a ``DatetimeIndex``).
    Predictors : pandas.DataFrame
        Features, with an index that also exposes ``.year``.
    initial_train_years : int, default 18
        Length in years of the first training window.
    validation_years : int, default 12
        Length in years of the validation window.
    test_years : int, default 1
        Length in years of the test window.

    Returns
    -------
    list of float
        Test-window ``r2_score`` of every split, in loop order.

    Notes
    -----
    Window boundaries are obtained by adding year counts to ``yrs[i]``, so the
    years in the index are assumed to be consecutive -- TODO confirm.
    """
    # Year labels are loop-invariant, so compute them once per frame.
    dep_years = Dependent.index.year
    pred_years = Predictors.index.year
    yrs = dep_years.unique()
    r_squared_scores = []
    tuning_par = {
        "alpha": [1e-1, 1e-4],
        "l1_ratio": [1],
        "tol": [1e-2],
    }

    # One split per start year; the training window grows by one year each time.
    for i in range(len(yrs) - initial_train_years - validation_years):
        end_train_year = yrs[i] + initial_train_years
        end_validation_year = end_train_year + validation_years
        end_test_year = end_validation_year + test_years

        # Training, validation and test sets (upper bounds are exclusive).
        X_train = Predictors[pred_years < end_train_year]
        X_val = Predictors[(pred_years >= end_train_year) & (pred_years < end_validation_year)]
        X_test = Predictors[(pred_years >= end_validation_year) & (pred_years < end_test_year)]
        y_train = Dependent[dep_years < end_train_year].values.ravel()
        y_val = Dependent[(dep_years >= end_train_year) & (dep_years < end_validation_year)].values.ravel()
        y_test = Dependent[(dep_years >= end_validation_year) & (dep_years < end_test_year)].values.ravel()

        # Tune the hyper-parameters for this split on the validation window.
        best_par = val_fun_with_huber(ElasticNet, params=tuning_par, X_trn=X_train, y_trn=y_train, X_vld=X_val, y_vld=y_val)

        # Refit with EVERY selected hyper-parameter. Passing only alpha and
        # l1_ratio dropped the tuned `tol`, so the tested model differed from
        # the one that was validated.
        model = ElasticNet(**best_par).fit(X_train, y_train)

        # Score the selected model on the test window.
        y_test_pred = model.predict(X_test)
        r_squared_scores.append(r2_score(y_test, y_test_pred))

    return r_squared_scores

# Evaluate the Huber-validated Lasso defined in this cell. The previous call
# re-ran ``ENet_with_huber`` and overwrote ``ENet_scores`` instead.
Lasso_huber_scores = Lasso_with_huber(Dependent=Dependent, Predictors=Predictors)


# Ridge with Huber

In [None]:
#Set l1_ratio = 0 -> Ridge
def Ridge_with_huber(Dependent, Predictors, initial_train_years=18, validation_years=12, test_years=1):
    """Walk-forward Ridge (``ElasticNet`` with ``l1_ratio = 0``) tuned with a Huber validation loss.

    This function was previously defined under the name ``Lasso_with_huber``,
    which shadowed the actual Lasso specification (``l1_ratio = 1``).

    Each split uses an expanding training window (all years before
    ``end_train_year``), the following ``validation_years`` years to select
    the hyper-parameters via ``val_fun_with_huber``, and the ``test_years``
    years after that to score the selected model with ``r2_score``.

    Parameters
    ----------
    Dependent : pandas.DataFrame
        Target values. Its index must expose ``.year`` (a ``DatetimeIndex``).
    Predictors : pandas.DataFrame
        Features, with an index that also exposes ``.year``.
    initial_train_years : int, default 18
        Length in years of the first training window.
    validation_years : int, default 12
        Length in years of the validation window.
    test_years : int, default 1
        Length in years of the test window.

    Returns
    -------
    list of float
        Test-window ``r2_score`` of every split, in loop order.

    Notes
    -----
    Window boundaries are obtained by adding year counts to ``yrs[i]``, so the
    years in the index are assumed to be consecutive -- TODO confirm.
    """
    # Year labels are loop-invariant, so compute them once per frame.
    dep_years = Dependent.index.year
    pred_years = Predictors.index.year
    yrs = dep_years.unique()
    r_squared_scores = []
    tuning_par = {
        "alpha": [1e-1, 1e-4],
        "l1_ratio": [0],
        "tol": [1e-2],
    }

    # One split per start year; the training window grows by one year each time.
    for i in range(len(yrs) - initial_train_years - validation_years):
        end_train_year = yrs[i] + initial_train_years
        end_validation_year = end_train_year + validation_years
        end_test_year = end_validation_year + test_years

        # Training, validation and test sets (upper bounds are exclusive).
        X_train = Predictors[pred_years < end_train_year]
        X_val = Predictors[(pred_years >= end_train_year) & (pred_years < end_validation_year)]
        X_test = Predictors[(pred_years >= end_validation_year) & (pred_years < end_test_year)]
        y_train = Dependent[dep_years < end_train_year].values.ravel()
        y_val = Dependent[(dep_years >= end_train_year) & (dep_years < end_validation_year)].values.ravel()
        y_test = Dependent[(dep_years >= end_validation_year) & (dep_years < end_test_year)].values.ravel()

        # Tune the hyper-parameters for this split on the validation window.
        best_par = val_fun_with_huber(ElasticNet, params=tuning_par, X_trn=X_train, y_trn=y_train, X_vld=X_val, y_vld=y_val)

        # Refit with EVERY selected hyper-parameter. Passing only alpha and
        # l1_ratio dropped the tuned `tol`, so the tested model differed from
        # the one that was validated.
        model = ElasticNet(**best_par).fit(X_train, y_train)

        # Score the selected model on the test window.
        y_test_pred = model.predict(X_test)
        r_squared_scores.append(r2_score(y_test, y_test_pred))

    return r_squared_scores

# Evaluate the Huber-validated Ridge defined above. The previous call re-ran
# ``ENet_with_huber`` and overwrote ``ENet_scores`` instead.
Ridge_huber_scores = Ridge_with_huber(Dependent=Dependent, Predictors=Predictors)
