# TIØ4317 Empirical Mini Project
## Introduction
Hva er dette prosjektet, formål osv. forklare litt

## Data Preprocessing

In [None]:
# Imports
import pandas as pd
import numpy as np
import os
from matplotlib import pyplot as plt
import seaborn as sns
from statsmodels.tsa.stattools import adfuller
import statsmodels.api as sm
from statsmodels.stats.diagnostic import acorr_breusch_godfrey, het_arch
from statsmodels.graphics.tsaplots import plot_pacf
from sklearn.metrics import mean_absolute_error, mean_squared_error
from tabulate import tabulate

We start by reading the different csv-files that contain the data.

In [None]:
# Define the folder path
data_path = "data/"

# Load datasets
zero_coupon = pd.read_csv(os.path.join(data_path, "zero_coupon_rates.csv"), delimiter=";")
exchange_rates = pd.read_csv(os.path.join(data_path, "usd_nok.csv"), delimiter=";")
inflation = pd.read_csv(os.path.join(data_path, "kpi.csv"), delimiter=";")
osebx = pd.read_csv(os.path.join(data_path, "osebx_prices.csv"), delimiter=";")

# Replace commas with dots in the KPI column and convert to float
inflation["kpi"] = inflation["kpi"].str.replace(",", ".").astype(float)


##### Missing Values
Then, we check for missing values in the data.

In [None]:
datasets = {
    "OSEBX": osebx,
    "Zero Coupon": zero_coupon,
    "Exchange Rates": exchange_rates,
    "Inflation": inflation
}

for name, df in datasets.items():
    missing = df.isnull().sum()
    missing_vars = missing[missing > 0]

    if missing_vars.empty: 
        print(f"{name}: No missing values.")
    else: # If missing values are present, print in which variables it is found. 
        print(f"{name}: Missing values in {', '.join(missing_vars.index)}.")

From the output, we can see that the data does not contain any missing values.

##### Convert to Datetime format
Next, we convert the dates to Datetime format.

Since the inflation data is originally available only on a monthly basis, we apply linear interpolation to estimate daily values.

In [None]:
# Convert Date to Datetime format
osebx["Date"] = pd.to_datetime(osebx["Date"])
zero_coupon["TIME_PERIOD"] = pd.to_datetime(zero_coupon["TIME_PERIOD"])
exchange_rates["TIME_PERIOD"] = pd.to_datetime(exchange_rates["TIME_PERIOD"])
inflation["Date"] = pd.to_datetime(inflation["Date"], format="%YM%m")

# Create a full date range from the first to last available date in your dataset
full_date_range = pd.date_range(start=inflation["Date"].min(), end=inflation["Date"].max(), freq="D")

# Create a DataFrame with daily dates
inflation_daily = pd.DataFrame({"Date": full_date_range})

# Merge with the original inflation data (left join) and forward-fill missing values
inflation_daily = inflation_daily.merge(inflation, on="Date", how="left")
inflation_daily["kpi"] = inflation_daily["kpi"].interpolate(method="linear")

##### Compute log returns for OSEBX
Also, rename columns to ensure that all dataframes have a column named "Date", so that we can merge all datasets on "Date". After having merged all the datasets to one dataframe, we drop all the columns we are not interested in. Consequently, the columns we are left with are "Date", "kpi", "zero_coupon_rate", "usd_nok_exchange_rate", and "log_return".

In [None]:
# Compute log returns
osebx["log_return"] = np.log(osebx["Close"] / osebx["Close"].shift(1))
osebx.dropna(inplace=True)  # Drop the first row where return cannot be calculated

# Rename columns to "Date"
zero_coupon.rename(columns={"TIME_PERIOD": "Date"}, inplace=True)
exchange_rates.rename(columns={"TIME_PERIOD": "Date"}, inplace=True)

# Merge all datasets on 'Date'
df = inflation_daily.merge(zero_coupon, on="Date", how="inner")
df = df.merge(exchange_rates, on="Date", how="inner")
df = df.merge(osebx, on="Date", how="inner") 

df = df.drop(columns=["Close", "High", "Low", "Open", "Volume",
       "FREQ_x", "Frequency_x", "TENOR_x", "Tenor_x", "DECIMALS_x",
       "FREQ_y", "Frequency_y", "BASE_CUR", "Base Currency",
       "QUOTE_CUR", "Quote Currency", "TENOR_y", "Tenor_y", "DECIMALS_y",
       "CALCULATED", "UNIT_MULT", "Unit Multiplier", "COLLECTION",
       "Collection Indicator"])

df.rename(columns={"OBS_VALUE_x": "zero_coupon_rate"}, inplace=True)
df.rename(columns={"OBS_VALUE_y": "usd_nok_exchange_rate"}, inplace=True)

print(df.head())

##### Multicollinearity
Check for multicollinearity by computing the correlation matrix.

In [None]:
# Compute the correlation matrix
selected_columns = ["kpi", "usd_nok_exchange_rate", "zero_coupon_rate"]
corr_matrix = df[selected_columns].corr()

# Visualize the correlation matrix as a heatmap
plt.figure(figsize=(10, 6))
sns.heatmap(corr_matrix, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5)
plt.title("Correlation Matrix")
plt.show()

We define highly correlated values to have a correlation coefficient >0.9. Because all variables have a correlation coefficient below 0.9, we decide to keep all of the explanatory variables. However, we notice that especially between usd_nok_exchange_rate and kpi and between zero_coupon_rate and kpi the correlation coefficient is close to the boundary. 

##### Stationarity
After having checked for correlation, we check for stationarity by applying the Augmented Dickey-Fuller (ADF) test.

In [None]:
from statsmodels.tsa.stattools import adfuller


# Check for stationarity 
def adf_test(series, var_name):
    result = adfuller(series.dropna())  
    status = "Stationary" if result[1] < 0.05 else "Non-stationary"

    return [var_name, f"{result[0]:.4f}", f"{result[1]:.4f}", status]

# Run ADF tests on all relevant variables
stationarity = [
    adf_test(df["log_return"], "Log Return"),
    adf_test(df["zero_coupon_rate"], "Zero Coupon Rate"),
    adf_test(df["usd_nok_exchange_rate"], "USD/NOK Exchange Rate"),
    adf_test(df["kpi"], "CPI")
]

# Print results in a formatted table
headers = ["Variable", "ADF Statistic", "p-value", "Result"]
print(tabulate(stationarity, headers=headers, tablefmt="pretty"))


The output indicates that the variables "zero coupon rate," "USD/NOK exchange rate," and "CPI" are non-stationary. To address this, we apply first differences to these columns, ensuring stationarity in our analysis.

In [None]:
# Transform to stationary variables by applying first differences
df["d_kpi"] = df["kpi"].diff()
df["d_zero_coupon_rate"] = df["zero_coupon_rate"].diff()
df["d_usd_nok_exchange_rate"] = df["usd_nok_exchange_rate"].diff()

df = df.dropna()

# Run ADF tests after applying first differences on all relevant variables
post_diff_stationarity = [
    adf_test(df["d_zero_coupon_rate"], "Zero Coupon Rate"),
    adf_test(df["d_usd_nok_exchange_rate"], "USD/NOK Exchange Rate"),
    adf_test(df["d_kpi"], "CPI")
]

# Print results in a table format
headers = ["Variable", "ADF Statistic", "p-value", "Stationarity"]
print(tabulate(post_diff_stationarity, headers=headers, tablefmt="pretty"))


From the output, we can see that all the variables now are stationary.

#### Split into training and testing data

In [None]:
# Define the split percentage (e.g., 80% train, 20% test)
split_idx = int(len(df) * 0.8)

# Split the data
train_df = df.iloc[:split_idx].copy()  # First 80% for training
test_df = df.iloc[split_idx:].copy()   # Last 20% for testing

# Define training and testing sets
X_train, y_train = train_df[['d_kpi', 'd_zero_coupon_rate', 'd_usd_nok_exchange_rate']], train_df['log_return']
X_test, y_test = test_df[['d_kpi', 'd_zero_coupon_rate', 'd_usd_nok_exchange_rate']], test_df['log_return']


## Multiple Linear Regression (MLR)
### Autocorrelation
Start by fitting a model to the training set, and then proceed to check for autocorrelation.

In [None]:
# Add constant for intercept
X_train_ols = sm.add_constant(X_train)

# Train OLS model
ols_model = sm.OLS(y_train, X_train_ols).fit()

Define a function to test for autocorrelation using Breusch-Godfrey test.

In [None]:
def breusch_godfrey_test(model, nlags=1):
    # Perform the Breusch-Godfrey test
    lm_stat, p_value, f_stat, f_p_value = acorr_breusch_godfrey(model, nlags=nlags)

    # Print the test name and results
    print("Breusch-Godfrey Test for Autocorrelation")
    print("="*40)
    print(f"LM Statistic: {lm_stat:.4f}")
    print(f"P-Value: {p_value:.4f}")
    print(f"F-Statistic: {f_stat:.4f}")
    print(f"F-Test P-Value: {f_p_value:.4f}")
    print("="*40)

    # Interpretation of the p-value
    if p_value < 0.05:
        print("Autocorrelation detected in residuals (reject H0).")
    else:
        print("No significant autocorrelation detected (fail to reject H0).")


In [None]:
# Test if autocorrelation is present in our model         
breusch_godfrey_test(ols_model, nlags=1)

Since the p-value of the Lagrange Multiplier statistic (LM) is \<0.05 we reject H0. Since the p-value of the F-statistic is also \< 0.05, it suggest that adding lagged variables could improve the model by removing autocorrelation. 

To get an impression of how many lags to use we visualize the partial autocorrelation function (PACF). 

In [None]:
# Check PACF for 'log_return'
plt.figure(figsize=(8, 5))
plot_pacf(df['log_return'], lags=21, method='ywm')
plt.title("Partial Autocorrelation Function (PACF)")
plt.show()

The figure above shows the partial autocorrelation coefficient for each lag, with the blue dashed lines representing the 95% confidence interval. Any bar extending this interval indicates statistical significance. From the figure, we observe that none of the lags appear to be highly significant. Therefore, we will continue adding lags until we no longer detect autocorrelation.

In [None]:
# Added lags
lags = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
variables = ['d_kpi', 'd_zero_coupon_rate', 'd_usd_nok_exchange_rate']

# Create lagged variables using a loop
for var in variables:
    for lag in lags:
        train_df[f'{var}_lag{lag}'] = train_df[var].shift(lag)

# Drop NaN values introduced by lagging
train_df.dropna(inplace=True)

# Define independent variables dynamically
X = train_df[[col for col in train_df.columns if col.startswith('d_kpi') or 
               col.startswith('d_zero_coupon_rate') or 
               col.startswith('d_usd_nok_exchange_rate')]]


y_train = train_df['log_return']

# Add a constant for intercept
X = sm.add_constant(X)

# Fit the new OLS model
lagged_model = sm.OLS(y_train, X).fit()

# Check for autocorrelation again
breusch_godfrey_test(lagged_model, nlags=1)


We had to add 28 lags to remove autocorrelation. However, including lagged variables to address autocorrelation introduces some issues. It violates the assumption that the explanatory variables are non-stochastic, and while it address autocorrelation, it also makes the model harder to interpret (SOURCE? Brooks, page 284-285) 

Given these problems, we revisited the PACF and observed that lag 7 and lag 18 appeared to be the most significant. Therefore, we decided to proceed with a model that includes only lag 7 and lag 18, even though autocorrelation is present. While ignoring autocorrelation keeps the coefficient estimates unbiased, the standard error estimates could be wrong. This could lead to unrealiable significance tests and potentially misleading conclusions (SOURCE?, Brooks, p. 276). 

In [None]:
# Define the lags we want to keep
keep_lags = [7, 18]

# Keep only the original variables and the selected lagged variables
columns_to_keep = ['d_kpi', 'd_zero_coupon_rate', 'd_usd_nok_exchange_rate']  # Original columns
columns_to_keep += [f'{var}_lag{lag}' for var in variables for lag in keep_lags]  # Only lags 7 & 18

# Select only the necessary columns
train_df = train_df[columns_to_keep + ['log_return']]

# Drop NaN values introduced by lagging
train_df.dropna(inplace=True)

# Define independent and dependent variables
X = train_df.drop(columns=['log_return'])
y_train = train_df['log_return']

# Add a constant for intercept
X = sm.add_constant(X)

# Fit the new OLS model
keep_lags_model = sm.OLS(y_train, X).fit()

breusch_godfrey_test(keep_lags_model, nlags=1)

### Heteroscedasticity
We test for heteroscedasticity using the ARCH test, as it is well-suited for time-series data.

In [None]:
# Extract residuals
residuals = keep_lags_model.resid

# Perform ARCH test
arch_test = het_arch(residuals)

# Print results
print(f"ARCH Test Statistic: {arch_test[0]}")
print(f"p-value: {arch_test[1]}")

To deal with heteroscedasticity we use heteroscedasticity-consistent standard error estimates.
HC3 is conservative and recommended for larger sample sizes. Since our sample size is approximately 2000 observations it can be considered medium to large, and HC3 could be a good choice. 


In [None]:
# Compute robust standard errors (Huber-White/HC3 for larger samples)
model_robust = keep_lags_model.get_robustcov_results(cov_type='HC3')

# Print summary with robust standard errors
print(model_robust.summary())

### Performance Evaluation 

In [None]:
X_test = X_test.copy()

# Ensure that lags 7 and 18 exist in the test set
X_test['d_kpi_lag7'] = X_test['d_kpi'].shift(7)
X_test['d_zero_coupon_rate_lag7'] = X_test['d_zero_coupon_rate'].shift(7)
X_test['d_usd_nok_exchange_rate_lag7'] = X_test['d_usd_nok_exchange_rate'].shift(7)

X_test['d_kpi_lag18'] = X_test['d_kpi'].shift(18)
X_test['d_zero_coupon_rate_lag18'] = X_test['d_zero_coupon_rate'].shift(18)
X_test['d_usd_nok_exchange_rate_lag18'] = X_test['d_usd_nok_exchange_rate'].shift(18)

# Drop rows with NaN values (first 18 rows will have NaNs due to lags)
X_test = X_test.dropna()
y_test = y_test.loc[X_test.index]  # Ensure y_test aligns with new X_test

# Add constant for intercept
X_test = sm.add_constant(X_test)

# Make predictions
y_pred = model_robust.predict(X_test)

# Compute performance metrics
mape = np.mean(np.abs((y_test - y_pred) / y_test)) * 100  # Mean Absolute Percentage Error
mse = mean_squared_error(y_test, y_pred)  # Mean Squared Error
mae = mean_absolute_error(y_test, y_pred)  # Mean Absolute Error

# Print results
print(f"MAPE: {mape:.4f}%")
print(f"MSE: {mse:.4f}")
print(f"MAE: {mae:.4f}")

## ARIMAX

In [None]:
# TODO: Elias og Erik