In [None]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
import numpy as np

import matplotlib.pyplot as plt

# reduce display precision on numpy arrays
np.set_printoptions(precision=5)

df = pd.read_csv('C:/Users/Computer/Documents/bachelor/dataset.csv')
pd.options.mode.chained_assignment=None
df['csp'][637:]  = df['csp'][0:638].mean()

df['Index'] = pd.to_numeric(df['Index'], errors='coerce')

In [None]:
df.head(865)

In [None]:
# check missing values
print(f"Missing values: {df.isna().sum().sum()}")

print(f"Invalid values: {df.isnull().sum().sum()}")

In [None]:
X = df.iloc[:, 1:-30].values
Y = df.iloc[:, -30:].values

In [None]:
feature_names = df.columns[1:18]
target_names = df.columns[-30:]  

from sklearn.preprocessing import StandardScaler

X_df = df[feature_names]
Y_df = df[target_names]

scaler = StandardScaler()
X_df = pd.DataFrame(scaler.fit_transform(X_df), columns=X_df.columns)

data = pd.concat([Y_df, X_df], axis=1)

In [None]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LassoCV, LinearRegression
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error

In [None]:
def lasso_feature_selection(train, test, y_key, model_config, jobs=24):
    time_series_cv = TimeSeriesSplit(n_splits=model_config["tscv_splits"])
    lasso = LassoCV(n_alphas=model_config["n_alphas"], cv=time_series_cv, n_jobs=jobs, random_state=42, max_iter=20000)
    lasso.fit(train.drop(y_key, axis=1), train[y_key])
    
    selected_features = train.drop(y_key, axis=1).columns[lasso.coef_ != 0]
    return selected_features

In [None]:
def ols(train, test, y_key, selected_features):
    X_train = train[selected_features]
    X_test = test[selected_features]
    y_train = train[y_key]
    
    ols_model = LinearRegression().fit(X_train, y_train)
    y_hat = pd.Series(ols_model.predict(X_test), index=test.index).rename("ols_post_lasso_y_hat")
    
    return y_hat

In [None]:
def multioutput_ols_post_lasso(train, test, y_keys, model_config, jobs=24):
    preds = {}
    for y_key in y_keys:
        selected_features = lasso_feature_selection(train, test, y_key, model_config, jobs)
        preds[y_key] = ols(train, test, y_key, selected_features)
    return preds

In [None]:
model_config = {
    "n_alphas": 100,
    "tscv_splits": 20
}

In [None]:
def print_current_values_and_errors():
    for col in Y_df.columns:
        actuals_no_nan = Actuals[col][train_start_size:].dropna()
        forecasts_no_nan = Forecasts[col][train_start_size:].dropna()
        forecast_errors_no_nan = Forecast_Errors[col][train_start_size:].dropna()
        
        print(f"Actuals for {col}:\n{actuals_no_nan}")
        print(f"Forecasts for {col}:\n{forecasts_no_nan}")
        print(f"Forecast_Errors for {col}:\n{forecast_errors_no_nan}")
        
        mae = mean_absolute_error(actuals_no_nan, forecasts_no_nan)
        mse = mean_squared_error(actuals_no_nan, forecasts_no_nan)
        
        print(f"Current MAE for {col}: {mae}")
        print(f"Current MSE for {col}: {mse}")

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

train_start_size = 200

# Initialize empty dictionaries to store results
Actuals = {col: pd.Series(dtype=float, name=col) for col in Y_df.columns}
Forecasts = {col: pd.Series(dtype=float, name=col) for col in Y_df.columns}
Forecast_Errors = {col: pd.Series(dtype=float, name=col) for col in Y_df.columns}
MAE = {}
MSE = {}
R2 = {}

for t in range(train_start_size, len(data) - 1):
    train = data.iloc[:t]
    test = data.iloc[t : t + 1]

    preds = multioutput_ols_post_lasso(train, test, Y_df.columns, model_config, jobs=10)

    for col in Y_df.columns:
        Forecasts[col] = pd.concat([Forecasts[col], preds[col]])
        Actuals[col] = pd.concat([Actuals[col], test[col]])
        Forecast_Errors[col] = pd.concat([Forecast_Errors[col], test[col] - preds[col]])
        
    # Print progress every 25 data points
    if t % 25 == 0:
        print(f"Progress: {t}/{len(data) - 1}")

# Calculate MAE and MSE for each target variable
for col in Y_df.columns:
    MAE[col] = mean_absolute_error(Actuals[col], Forecasts[col])
    MSE[col] = mean_squared_error(Actuals[col], Forecasts[col])
    R2[col] = r2_score(Actuals[col], Forecasts[col])


for col in Y_df.columns:
    print(f"Actuals for {col}:")
    print(Actuals[col])
    print(f"Forecasts for {col}:")
    print(Forecasts[col])
    print(f"Forecast Errors for {col}:")
    print(Forecast_Errors[col])
    print(f"Mean Absolute Error for {col}: {MAE[col]}")
    print(f"Mean Squared Error for {col}: {MSE[col]}")
    print(f"R-squared for {col}: {R2[col]}")

In [None]:
print("NaN values in Actuals:")
print(Actuals[col].isna().sum())

print("NaN values in Forecasts:")
print(Forecasts[col].isna().sum())

In [None]:
import matplotlib.pyplot as plt

In [None]:
residuals = {col: Actuals[col] - Forecasts[col] for col in Y_df.columns}

fig, axes = plt.subplots(nrows=6, ncols=5, figsize=(25, 30))

for i, col in enumerate(Y_df.columns):
    row_idx = i // 5
    col_idx = i % 5
    axes[row_idx, col_idx].scatter(Forecasts[col], residuals[col])
    axes[row_idx, col_idx].axhline(y=0, color='r', linestyle='--')
    axes[row_idx, col_idx].set_xlabel('Predicted Values')
    axes[row_idx, col_idx].set_ylabel('Residuals')
    axes[row_idx, col_idx].set_title(f'Residuals vs Predicted Values for {col}')

plt.tight_layout()
plt.show()

In [None]:
correlation_matrix = data.drop(Y_df.columns, axis=1).corr()

# Create a heatmap
fig, ax = plt.subplots()
cax = ax.imshow(correlation_matrix, cmap='coolwarm', vmin=-1, vmax=1)

# Set axis labels
ax.set_xticks(np.arange(len(correlation_matrix.columns)))
ax.set_yticks(np.arange(len(correlation_matrix.index)))
ax.set_xticklabels(correlation_matrix.columns)
ax.set_yticklabels(correlation_matrix.index)

# Rotate the x-axis labels
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

cbar = fig.colorbar(cax, ax=ax)
cbar.ax.set_ylabel("Correlation", rotation=-90, va="bottom")

plt.title("Heatmap of Predictor Correlations")

plt.show()

In [None]:
from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.tools.tools import add_constant

X = add_constant(data.drop(Y_df.columns, axis=1))

vif_data = pd.DataFrame()
vif_data["Feature"] = X.columns
vif_data["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]

print(vif_data)

In [None]:
from statsmodels.stats.stattools import durbin_watson

# Calculate the Durbin-Watson statistic for each target variable
for col in Y_df.columns:
    dw_stat = durbin_watson(Forecast_Errors[col])
    print(f"Durbin-Watson statistic for {col}: {dw_stat:.2f}")

In [None]:
from statsmodels.stats.diagnostic import acorr_ljungbox

significance_level = 0.05

train = data.iloc[:train_start_size]

# Calculate in-sample actuals and forecasts
in_sample_preds = {}
for col in Y_df.columns:
    selected_features = lasso_feature_selection(train, train, col, model_config)
    in_sample_preds[col] = ols(train, train, col, selected_features)
    
autocorrelation_results = pd.DataFrame(columns=['Industry', 'Lag', 'Test Statistic', 'p-value', 'Evidence'])

for col in Y_df.columns:
    actuals = train[col]
    residuals = actuals - in_sample_preds[col]
    lb_test_result = acorr_ljungbox(residuals, lags=60, return_df=True)

    for i, row in lb_test_result.iterrows():
        if i + 1 in [2, 12, 24, 36, 48, 60]:  # Only check for lags 2 and 24
            lb_stat, p_value = row['lb_stat'], row['lb_pvalue']
            evidence = "Yes" if p_value < significance_level else "No"
            result_row = pd.DataFrame(
                {
                    'Industry': [col],
                    'Lag': [i + 1],
                    'Test Statistic': [lb_stat],
                    'p-value': [p_value],
                    'Evidence': [evidence]
                }
            )
            autocorrelation_results = pd.concat([autocorrelation_results, result_row], ignore_index=True)

In [None]:
def highlight_significant(val):
    if val < 0.05:
        return 'background-color: green'
    else:
        return ''

# Turn the DataFrame to have industries as rows and lags as columns
reshaped_autocorrelation_results = autocorrelation_results.pivot_table(
    index='Industry',
    columns='Lag',
    values=['Test Statistic', 'p-value']
)

# Apply the custom formatting
highlighted_pvalues = reshaped_autocorrelation_results['p-value'].style.applymap(highlight_significant)

display(highlighted_pvalues)

In [None]:
from statsmodels.stats.diagnostic import het_breuschpagan
import statsmodels.api as sm

#Breusch-pagan test

bp_test_results = {}

for col in Y_df.columns:
    selected_features = lasso_feature_selection(train, train, col, model_config)
    in_sample_forecasts = ols(train, train, col, selected_features)  # Modify this line
    
    actuals = train[col]
    residuals = actuals - in_sample_forecasts
    
    X_train = train[selected_features]
    ols_model = sm.OLS(actuals, sm.add_constant(X_train)).fit()
    bp_test_result = het_breuschpagan(ols_model.resid, sm.add_constant(X_train))
    bp_test_results[col] = bp_test_result

In [None]:
bp_test_df = pd.DataFrame(columns=["Industry", "LM Statistic", "LM-Test p-value", "F-Statistic", "F-Test p-value"])

for col, bp_result in bp_test_results.items():
    row_df = pd.DataFrame({"Industry": [col],
                           "LM Statistic": [bp_result[0]],
                           "LM-Test p-value": [bp_result[1]],
                           "F-Statistic": [bp_result[2]],
                           "F-Test p-value": [bp_result[3]]})
    bp_test_df = pd.concat([bp_test_df, row_df], ignore_index=True)

bp_test_df.set_index("Industry", inplace=True)

print("Breusch-Pagan Test Results:")
print("----------------------------")
print(bp_test_df)

In [None]:
def highlight_bp(val):
    if val < 0.05:
        return 'background-color: green'
    else:
        return ''

styled_bp_test_df = bp_test_df.style.applymap(highlight_bp, subset=['LM-Test p-value', 'F-Test p-value'])

styled_bp_test_df

In [None]:
# Extract p-values from the results
lm_p_values = [result[1] for result in bp_test_results.values()]
f_p_values = [result[3] for result in bp_test_results.values()]

# Set up the bar chart
bar_width = 0.35
indices = list(range(len(Y_df.columns)))

fig, ax = plt.subplots(figsize=(10, 6))
bar1 = ax.bar(indices, lm_p_values, bar_width, label="LM-Test p-value")
bar2 = ax.bar([i + bar_width for i in indices], f_p_values, bar_width, label="F-Test p-value")

ax.axhline(y=0.05, color='r', linestyle='--')

ax.set_xlabel("Industries")
ax.set_ylabel("p-values")
ax.set_title("Breusch-Pagan Test p-values")
ax.set_xticks([i + bar_width / 2 for i in indices])
ax.set_xticklabels([str(i) for i in range(1, 31)])
ax.legend()

plt.show()

In [None]:
performance_metrics = pd.DataFrame(columns=['Industry', 'MAE', 'MSE', 'R2'])

# Populate the DataFrame with the existing performance metrics
for i, col in enumerate(Y_df.columns):
    performance_metrics.loc[i, 'Industry'] = col
    performance_metrics.loc[i, 'MAE'] = MAE[col]
    performance_metrics.loc[i, 'MSE'] = MSE[col]
    performance_metrics.loc[i, 'R2'] = R2[col]

# Calculate the mean across all industries
mean_mae = performance_metrics['MAE'].mean()
mean_mse = performance_metrics['MSE'].mean()
mean_r2 = performance_metrics['R2'].mean()

performance_metrics.loc[len(Y_df.columns), ['Industry', 'MAE', 'MSE', 'R2']] = ['Mean', mean_mae, mean_mse, mean_r2]

print(performance_metrics)

In [None]:
Historical_Mean_In_Sample_MSE = {}

# Calculate in-sample historical mean forecasts and MSE for each target variable
for col in Y_df.columns:
    in_sample_data = Y_df.iloc[:train_start_size]
    in_sample_historical_mean = in_sample_data[col].mean()
    in_sample_historical_mean_forecast = pd.Series([in_sample_historical_mean] * len(in_sample_data), index=in_sample_data[col].index)
    
    Historical_Mean_In_Sample_MSE[col] = mean_squared_error(in_sample_data[col], in_sample_historical_mean_forecast)
    
mean_in_sample_mse = sum(Historical_Mean_In_Sample_MSE.values()) / len(Historical_Mean_In_Sample_MSE)

In [None]:
# Create a DataFrame with columns for each performance metric
results_df = pd.DataFrame(columns=["Historical_Mean_In_Sample_MSE"])

# Put in in-sample MSE results
for col in Y_df.columns:
    results_df.loc[col] = [Historical_Mean_In_Sample_MSE[col]]

# Calculate and add the mean in-sample MSE to the DataFrame
results_df.loc["Mean"] = [mean_in_sample_mse]

In [None]:
R2_OS_OLS_Post_Lasso = {}
for col in Y_df.columns:
    R2_OS_OLS_Post_Lasso[col] = 1 - (performance_metrics.set_index("Industry").loc[col, "MSE"] / results_df.loc[col, "Historical_Mean_In_Sample_MSE"])

# Create a DataFrame to store the out-of-sample R-squared results
out_of_sample_predictability_df = pd.DataFrame(columns=["Industry", "R2_OS_OLS_Post_Lasso"])

# Put in the out-of-sample R-squared values
for i, col in enumerate(Y_df.columns):
    out_of_sample_predictability_df.loc[i, "Industry"] = col
    out_of_sample_predictability_df.loc[i, "R2_OS_OLS_Post_Lasso"] = R2_OS_OLS_Post_Lasso[col]

# Calculate the mean out-of-sample R-squared across all industries and add it to the DataFrame
mean_r2_os_post_lasso = out_of_sample_predictability_df["R2_OS_OLS_Post_Lasso"].mean()
out_of_sample_predictability_df.loc[len(Y_df.columns), ["Industry", "R2_OS_OLS_Post_Lasso"]] = ["Mean", mean_r2_os_post_lasso]

print(out_of_sample_predictability_df)

In [None]:
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

train_start_size = 200

# Initialize empty dictionaries to store results
Actuals = {col: pd.Series(dtype=float, name=col) for col in Y_df.columns}
Historical_Mean_Forecasts = {col: pd.Series(dtype=float, name=col) for col in Y_df.columns}
Historical_Mean_Forecast_Errors = {col: pd.Series(dtype=float, name=col) for col in Y_df.columns}
Historical_Mean_MAE = {}
Historical_Mean_MSE = {}
Historical_Mean_R2 = {}

# Expanding window loop
for t in range(train_start_size, len(Y_df) - 1):
    train = Y_df.iloc[:t]
    test = Y_df.iloc[t : t + 1]

    for col in Y_df.columns:
        historical_mean = train[col].mean()
        historical_mean_forecast = pd.Series([historical_mean], index=test[col].index)
        
        Historical_Mean_Forecasts[col] = pd.concat([Historical_Mean_Forecasts[col], historical_mean_forecast])
        Actuals[col] = pd.concat([Actuals[col], test[col]])
        Historical_Mean_Forecast_Errors[col] = pd.concat([Historical_Mean_Forecast_Errors[col], test[col] - historical_mean_forecast])
        
    # Print progress every 25 data points
    if t % 25 == 0:
        print(f"Progress: {t}/{len(Y_df) - 1}")

# Calculate MAE, MSE, and R-squared for each target variable
for col in Y_df.columns:
    Historical_Mean_MAE[col] = mean_absolute_error(Actuals[col], Historical_Mean_Forecasts[col])
    Historical_Mean_MSE[col] = mean_squared_error(Actuals[col], Historical_Mean_Forecasts[col])
    Historical_Mean_R2[col] = r2_score(Actuals[col], Historical_Mean_Forecasts[col])

In [None]:
def ols(train, test, y_key, selected_features):
    X_train = train[selected_features]
    X_test = test[selected_features]
    y_train = train[y_key]
    
    ols_model = LinearRegression().fit(X_train, y_train)
    y_hat = pd.Series(ols_model.predict(X_test), index=test.index).rename("ols_y_hat")
    
    return y_hat

def multioutput_ols(train, test, y_keys):
    preds = {}
    for y_key in y_keys:
        selected_features = train.drop(y_key, axis=1).columns
        preds[y_key] = ols(train, test, y_key, selected_features)
    return preds

Y_diff = Y_df.diff().dropna()

from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

train_start_size = 200

# Initialize empty dictionaries to store results
Actuals_ols = {col: pd.Series(dtype=float, name=col) for col in Y_diff.columns}
Forecasts_ols = {col: pd.Series(dtype=float, name=col) for col in Y_diff.columns}
Forecast_Errors_ols = {col: pd.Series(dtype=float, name=col) for col in Y_diff.columns}
MAE_ols = {}
MSE_ols = {}
R2_ols = {}

# Expanding window loop
for t in range(train_start_size, len(data) - 1):
    train = data.iloc[:t]
    test = data.iloc[t : t + 1]

    preds_ols = multioutput_ols(train, test, Y_diff.columns)

    for col in Y_diff.columns:
        Forecasts_ols[col] = pd.concat([Forecasts_ols[col], preds_ols[col]])
        Actuals_ols[col] = pd.concat([Actuals_ols[col], test[col]])
        Forecast_Errors_ols[col] = pd.concat([Forecast_Errors_ols[col], test[col] - preds_ols[col]])
        
    # Print progress every 25 data points
    if t % 25 == 0:
        print(f"Progress: {t}/{len(data) - 1}")

# Calculate MAE and MSE for each target variable
for col in Y_diff.columns:
    MAE_ols[col] = mean_absolute_error(Actuals_ols[col], Forecasts_ols[col])
    MSE_ols[col] = mean_squared_error(Actuals_ols[col], Forecasts_ols[col])
    R2_ols[col] = r2_score(Actuals_ols[col], Forecasts_ols[col])

In [None]:
import numpy as np
import scipy.stats as stats

def diebold_mariano_test(errors1, errors2, h=1, alternative='two_sided'):
    """
    Perform Diebold-Mariano test for equal forecast accuracy.
    
    errors1 : array_like
        Forecast errors from the first model.
    errors2 : array_like
        Forecast errors from the second model.
    h : int, optional
        Forecast horizon, default is 1.
    alternative : str, optional
        Alternative hypothesis, one of 'two_sided', 'greater', 'less', default is 'two_sided'.
    
    Returns
    -------
    dm_stat : float
        Diebold-Mariano test statistic.
    p_value : float
        p-value for the Diebold-Mariano test.
    """
    assert len(errors1) == len(errors2), "Error series must have the same length"
    assert alternative in ['two_sided', 'greater', 'less'], "Invalid alternative hypothesis"
    
    d = errors1**2 - errors2**2
    T = len(d)
    d_bar = np.mean(d)
    gamma0 = np.var(d)
    
    # Calculate autocovariance of d for lag j
    autocov_d = [np.cov(d[:-j], d[j:])[0, 1] for j in range(1, h)]
    
    # Calculate variance of d_bar
    variance_d_bar = (1 / T) * (gamma0 + 2 * sum([((T - j) / T) * autocov_d[j - 1] for j in range(1, h)]))
    dm_stat = d_bar / np.sqrt(variance_d_bar)
    
    if alternative == 'two_sided':
        p_value = 2 * (1 - stats.norm.cdf(abs(dm_stat)))
    elif alternative == 'greater':
        p_value = 1 - stats.norm.cdf(dm_stat)
    elif alternative == 'less':
        p_value = stats.norm.cdf(dm_stat)
        
    return dm_stat, p_value

# Compute forecast errors for each model and target variable
errors_ols_post_lasso = {col: Forecast_Errors[col].values for col in Y_df.columns}
errors_ols = {col: Forecast_Errors_ols[col].values for col in Y_df.columns}

# Perform Diebold-Mariano test
for col in Y_df.columns:
    dm_stat, p_value = diebold_mariano_test(errors_ols[col], errors_ols_post_lasso[col])
    print(f"Diebold-Mariano test for {col}:")
    print(f"Test statistic: {dm_stat:.5f}")
    print(f"p-value: {p_value:.8f}")