In [None]:
# Code that tunes and trains the ARIMA model on all the exchange rates to perform forecasts and calculate the model uncertainty using the GARCH model.
# Author Samwel Portelli <samwel.portelli.18@um.edu.mt>
# Code to tune and update the ARIMA model adapted from http://alkaline-ml.com/2019-12-18-pmdarima-1-5-2/ (last accessed: 28/11/2024)

In [None]:
from deel.puncc.metrics import regression_ace
from deel.puncc.metrics import regression_mean_coverage
from deel.puncc.metrics import regression_sharpness
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
import numpy as np
import matplotlib.pyplot as plt
import pmdarima
import arch
import scipy.stats as stats
import csv
from pmdarima.arima import ndiffs
import pmdarima as pm
import warnings; 
warnings.filterwarnings('ignore')
import os
import csv

def direction_accuracy(actual, predicted):
    """Return the percentage of steps whose predicted direction was correct.

    For every position ``k >= 1`` the realised move ``actual[k] - actual[k-1]``
    is compared with the forecast move ``predicted[k] - actual[k-1]`` (both
    measured from the previous *actual* value). A step counts as a hit when
    the two moves have the same strict sign, i.e. their product is positive;
    a zero move on either side never counts.

    Args:
        actual: Indexable sequence of observed values.
        predicted: Indexable sequence of forecasts, same length as ``actual``.

    Returns:
        Hit rate as a float in the range 0.0-100.0.

    Raises:
        ValueError: If the two sequences differ in length.
        ZeroDivisionError: If fewer than two points are supplied, since there
            is then no step to score.
    """
    if len(actual) != len(predicted):
        raise ValueError("Input lists must have the same length.")

    # Positions 1..n-1: each one has a previous actual value to move from.
    steps = range(1, len(actual))

    hits = sum(
        ((actual[k] - actual[k - 1]) * (predicted[k] - actual[k - 1])) > 0
        for k in steps
    )

    return float((hits / len(steps)) * 100.0)

def create_unique_folder(folder_name):
    """Create a directory that does not exist yet and return its path.

    ``folder_name`` itself is tried first. If that path already exists the
    candidates ``folder_name_v1``, ``folder_name_v2``, ... are probed in order
    and the first free one is created with ``os.makedirs`` (so missing parent
    directories are created as well).

    Args:
        folder_name: Preferred directory path.

    Returns:
        The path of the directory that was actually created.
    """
    candidate = folder_name
    suffix = 0
    while os.path.exists(candidate):
        suffix += 1
        candidate = f"{folder_name}_v{suffix}"

    os.makedirs(candidate)
    return candidate

In [None]:
def forecast_one_step(exog_y_test_values, alpha):
    """Ask the module-level ``model`` for a single-period forecast.

    Args:
        exog_y_test_values: Exogenous regressors passed to ``model.predict``
            as ``X`` for the one period being forecast.
        alpha: Significance level forwarded to ``model.predict`` for the
            confidence interval.

    Returns:
        A ``(point, interval)`` tuple: the first element of the forecast and
        the first row of the confidence interval, both converted to plain
        Python objects via ``tolist()``.
    """
    # NOTE: relies on the global `model`, which must be fitted before calling.
    prediction, interval = model.predict(
        n_periods=1,
        X=exog_y_test_values,
        return_conf_int=True,
        alpha=alpha,
    )
    point = prediction.tolist()[0]
    bounds = np.asarray(interval).tolist()[0]
    return (point, bounds)

In [None]:
# Exchange-rate data, indexed by its parsed 'date' column
data = pd.read_csv(
    "C:/Users/porte/Downloads/dataset/exchange_rate/exchange_rate.csv",
    index_col='date',
    parse_dates=True,
)

# Series names used by the model, and the targets processed in this run
columns = ['0', '1', '2', '3', '4', '5', '6', 'OT']
targets_to_run = list(columns)

# Rolling-window length, split offsets (row positions in `data`) and the
# number of trailing ARIMA residuals the GARCH model is fitted on
window_size = 96
train_end = 5311
test_start = 5311
lags = 8

# The evaluation slice starts `window_size` rows before `test_start`
# (row 5215), so it overlaps the end of the training slice.
train_data = data.iloc[:train_end]
test_data = data.iloc[test_start - window_size:]

# Significance levels and their matching confidence levels (1 - alpha)
alphas = [0.01, 0.05, 0.1, 0.15]
confidence = 1.0 - np.asarray(alphas)

# Output folders, one per entry of `targets_to_run` (looked up by position).
# NOTE(review): only the first validation folder lives under
# 'arima_garch_8lags'; every other folder uses 'arima_garch_9lags' even
# though `lags` is 8 -- confirm whether this is intentional.
folder_names = ['.\\arima_garch_8lags\\val_arima_garch_model_data_0'] + [
    f'.\\arima_garch_9lags\\val_arima_garch_model_data_{name}'
    for name in targets_to_run[1:]
]
test_folder_names = [
    f'.\\arima_garch_9lags\\test_arima_garch_model_data_{name}'
    for name in targets_to_run
]

# Filled by the training loop: selected ARIMA orders and their
# '<target>_<alpha>' labels
orders = []
model_alpha = []

def _save_interval_predictions(csv_path, y_true, y_pred, y_lower, y_upper):
    """Write observed values, point forecasts and interval bounds to ``csv_path``."""
    frame = pd.DataFrame({
        'y_true': y_true,
        'y_pred_from_arimagarch': np.array(y_pred).flatten(),
        'y_lower': np.array(y_lower).flatten(),
        'y_upper': np.array(y_upper).flatten(),
    })
    frame.to_csv(csv_path, index=False)
    print(f"Data saved to {csv_path}")


for ind, target in enumerate(targets_to_run):

    folder_name = create_unique_folder(folder_names[ind])
    test_folder_name = create_unique_folder(test_folder_names[ind])

    # Create rolling windows for training data
    train_X, train_y = create_rolling_windows(train_data, window_size, target)

    # Create rolling windows for testing data
    test_X, test_y = create_rolling_windows(test_data, window_size, target)

    ace_list, mean_coverage_list, average_width_list, direction_accuracy_value_list = [], [], [], []

    # ---- Data preparation (independent of alpha, so done once per target) ----
    data_path = "C:/Users/porte/Downloads/dataset/exchange_rate/exchange_rate.csv"
    target_df = pd.read_csv(data_path, usecols=[target])

    # Exogenous regressors are all the *other* series. The previous
    # `targets.remove(target)` returned None (and `targets` was undefined), so
    # every column -- including the target itself -- was loaded.
    exog_columns = [col for col in columns if col != target]
    exog_df = pd.read_csv(data_path, usecols=exog_columns)

    # The target at row t is paired with the regressors at row t-1.
    y_train = target_df[1:train_end].values
    y_test = target_df[test_start:].values

    exog_y_train = exog_df[:train_end-1].values
    # Rows test_start-1 .. n-2 keep the same one-step lag on the evaluation
    # part. (Prepending exog_y_train[-1], i.e. row train_end-2, paired the
    # first evaluation point with a two-step lag and skipped row train_end-1.)
    exog_y_test = exog_df[test_start-1:-1].values

    # Differencing order: the larger of the KPSS and ADF suggestions
    kpss_diffs = ndiffs(y_train, alpha=0.05, test='kpss', max_d=6)
    adf_diffs = ndiffs(y_train, alpha=0.05, test='adf', max_d=6)
    n_diffs = max(adf_diffs, kpss_diffs)

    # Number of leading forecasts written to the validation folder; the
    # remainder goes to the test folder.
    val_size = 760

    for alpha in alphas:

        # The ARIMA model is re-fitted for every alpha because `model.update`
        # below mutates it while walking through the evaluation data.
        model = pm.auto_arima(y_train, exog_y_train, d=n_diffs, seasonal=False, stepwise=True,
                              suppress_warnings=True, max_p=6, trace=2)

        print(model.order)
        orders.append(model.order)
        model_alpha.append(str(target)+'_'+str(alpha))

        # Only the last `lags` in-sample residuals seed the GARCH model
        arima_residuals = model.resid()[-lags:]

        print('Testing alpha value: '+str(alpha))

        # Fitting the garch to the residuals of the arima model
        garch = arch.arch_model(arima_residuals, p=1, q=1, vol="Garch", dist="Normal")
        garch_fitted = garch.fit(update_freq=1)

        # Two-sided standard-normal quantile for this alpha
        z_score = stats.norm.ppf(1 - alpha / 2)

        # One-step-ahead forecasts over the whole evaluation period
        forecast_values, garch_upper, garch_lower = [], [], []
        updated_errors = arima_residuals

        # `step` (not `ind`) so the outer loop index is not overwritten
        for step, new_ob in enumerate(y_test):

            lagged_exog = np.array([exog_y_test[step]])

            fc, conf = forecast_one_step(lagged_exog, alpha)
            forecast_values.append(fc)

            # Updates the existing model with a small number of MLE steps
            model.update(new_ob, lagged_exog)

            # Prediction interval from the GARCH one-step variance forecast
            garch_forecast = garch_fitted.forecast(horizon=1)
            garch_variance = garch_forecast.residual_variance['h.1'].iloc[-1]
            half_width = z_score * np.sqrt(garch_variance)

            garch_upper.append(fc + half_width)
            garch_lower.append(fc - half_width)

            # Slide the residual window: drop the oldest error, append the newest
            updated_errors = np.concatenate((updated_errors[1:], (new_ob - fc)))

            # re-fit GARCH model
            garch = arch.arch_model(updated_errors, p=1, q=1, vol="Garch", dist="Normal")
            garch_fitted = garch.fit(update_freq=0, disp=False)

        # VALIDATION
        _save_interval_predictions(
            folder_name + '\\val_arimagarch_y_true_and_pred_alp_'+str(int((1-alpha)*100))+'.csv',
            test_y[:val_size],
            forecast_values[:val_size],
            garch_lower[:val_size],
            garch_upper[:val_size],
        )

        # TEST
        _save_interval_predictions(
            test_folder_name + '\\arimagarch_y_true_and_pred_alp_'+str(int((1-alpha)*100))+'.csv',
            test_y[val_size:],
            forecast_values[val_size:],
            garch_lower[val_size:],
            garch_upper[val_size:],
        )

    # NOTE(review): none of the four metric lists is appended to anywhere in
    # this loop, so the file written below only ever contains its header row.
    # Confirm whether the metrics are meant to be computed here or elsewhere.
    metric_data = list(zip(ace_list, mean_coverage_list, average_width_list, direction_accuracy_value_list))

    # Specify the CSV file name
    csv_file_name = folder_name + '\\val_arimagarch_multiple_alpha_metrics_data.csv'

    # Write to the CSV file
    with open(csv_file_name, mode='w', newline='') as file:
        writer = csv.writer(file)
        # Write the header
        writer.writerow(['ACE', 'Mean Coverage', 'Average Width', 'Direction Accuracy'])
        # Write the data
        writer.writerows(metric_data)

    print(f"Metrics saved to {csv_file_name}")
    
# Persist every selected ARIMA order next to its '<target>_<alpha>' label.
# `target` here is whatever value the training loop left behind, i.e. the
# last entry that was processed, even though the rows cover all targets.
orders_csv_path = '.\\arima_garch_8lags\\arima_orders_'+target+'.csv'

with open(orders_csv_path, 'w', newline='') as orders_file:
    orders_writer = csv.writer(orders_file)

    # Header row
    orders_writer.writerow(['target_alpha', 'order'])

    # One row per fitted model
    orders_writer.writerows(
        [label, order] for label, order in zip(model_alpha, orders)
    )