# Libraries necesary

%pip install keras

%pip install numpy

%pip install pandas

%pip install pmdarima

%pip install statsmodels

%pip install matplotlib

In [1]:
from keras.models import model_from_json
import numpy as np
import pandas as pd
from pmdarima import auto_arima
from pmdarima.arima import auto_arima
import sys
from contextlib import redirect_stdout
import warnings
from pmdarima.arima import ARIMA
from statsmodels.tsa.stattools import acf, pacf
import matplotlib.pyplot as plt
import random
import logging
import os

# Suppress specific TensorFlow retracing warnings
logging.getLogger('tensorflow').setLevel(logging.ERROR)


def load_model(architecture_path, weights_path):
    try:
        # Load the model architecture from JSON
        with open(architecture_path, 'r') as json_file:
            loaded_model_json = json_file.read()
        
        # Reconstruct the model from the architecture
        model = model_from_json(loaded_model_json)
        
        # Load the model weights into the reconstructed model
        model.load_weights(weights_path)
        
        return model
    except Exception as e:
        print(f"Error: {e}")
        print("The model architecture or weights could not be loaded. Check if the files exist and are in the correct format.")
        sys.exit(1)

def prepare_data_for_lstm(acf_vals, pacf_vals):
    acf_list = list(acf_vals[1:])
    pacf_list = list(pacf_vals[1:])
    X_test = [acf_list, pacf_list]
    X_test = np.array(X_test)
    X_test = np.expand_dims(X_test, axis=0)
    return X_test


def freitas_gouveia(ts_data, opt=1, acf_pacf = False, alpha = 0.01, supress_general_warnings = True, hynd_start_p = 0, hynd_d = None, hynd_start_q = 0, hynd_max_p = 5, hynd_max_d = 2, hynd_max_q = 5, 
                    hynd_start_P = 0, hynd_D = None, hynd_start_Q = 1, hynd_max_P = 0, hynd_max_D = 1, hynd_max_Q = 2, hynd_max_order = 5, hynd_m = 1, hynd_seasonal = True, hynd_stationary = False, 
                    hynd_information_criterion = 'aic', hynd_alpha = 0.05, hynd_test = 'kpss', hynd_seasonal_test = 'ocsb', hynd_stepwise = True, hynd_n_jobs = 1, hynd_start_params = None, 
                    hynd_trend = None, hynd_method = 'lbfgs', hynd_maxiter = 50, hynd_offset_test_args = None, hynd_seasonal_test_args = None, hynd_suppress_warnings = True, hynd_error_action = 'trace',
                      hynd_trace = False, hynd_random = False, hynd_random_state = None, hynd_n_fits = 10, hynd_out_of_sample_size = 0, hynd_scoring = 'mse', hynd_scoring_args = None, 
                      hynd_with_intercept = 'auto'):
    
    # ts_data = time series data
    
    # opt = 1: return only the Freitas-Gouveia model
    # opt = 2: return the LSTM model and the improved Hyndman-Khandakar algorithm
    # opt = 3: return the LSTM model, the Hyndman-Khandakar algorithm and the improved Hyndman-Khandakar algorithm
    # opt = 4: return only the improved Hyndman-Khandakar algorithm
    # opt = 5: return only the Hyndman-Khandakar algorithm

    # acf_pacf = True: plot the ACF and PACF.

    # alpha = significance level for the p-values of the coefficients. Default is 0.01.

    # supress_general_warnings = True: suppress general warnings.

    # For Hyndman arguments check: https://alkaline-ml.com/pmdarima/modules/generated/pmdarima.arima.AutoARIMA.html

    # Suppress general warnings
    if supress_general_warnings == True:
        warnings.filterwarnings("ignore")
        
    # try to convert the input data to a list
    try:
        input_data = list(ts_data)
        # convert to float
        input_data = [float(i) for i in input_data]
    except:
        print("Error: the input cannot be converted to a list. Try using a list or an array.")
        sys.exit(1)

    ############################################################### Freitas-Gouveia model
    # get order of differencing
    hyndman = auto_arima(input_data, start_p=0, start_q=0, max_p=0, max_q=0)
    non_seasonal_order = hyndman.order
    temp1, hynd_d, temp2 = non_seasonal_order

    # if hyndman_d is > 0, perform d-differencing
    if hynd_d > 0:
        data = np.diff(input_data, hynd_d)
    else:
        data = input_data

    # Calculate the ACF and PACF
    acf_vals = acf(data, nlags=20)
    pacf_vals = pacf(data, nlags=20)

    if acf_pacf == True:
        # Create two subplots and unpack the output array immediately
        f, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

        # Plot the ACF
        ax1.axhline(y=0, linestyle='-', color='black')
        ax1.axhline(y=-1.96 / np.sqrt(len(data)), linestyle='--', color='blue')
        ax1.axhline(y=1.96 / np.sqrt(len(data)), linestyle='--', color='blue')
        ax1.plot(acf_vals, color='red')
        if hynd_d > 0:
            ax1.set_title(f'Autocorrelation Function (differenced {hynd_d} times)')
        else:
            ax1.set_title('Autocorrelation Function')

        # Plot the PACF
        ax2.axhline(y=0, linestyle='-', color='black')
        ax2.axhline(y=-1.96 / np.sqrt(len(data)), linestyle='--', color='blue')
        ax2.axhline(y=1.96 / np.sqrt(len(data)), linestyle='--', color='blue')
        ax2.plot(pacf_vals, color='red')
        if hynd_d > 0:
            ax2.set_title(f'Partial Autocorrelation Function (differenced {hynd_d} times)')
        else:
            ax2.set_title('Partial Autocorrelation Function')


    if opt == 1 or opt == 2 or opt == 3:

        model = load_model('freitas-gouveia.json', 'freitas-gouveia.weights.h5')
        X_test = prepare_data_for_lstm(acf_vals, pacf_vals)

        predictions = model.predict(X_test, verbose=0)
        predicted_order = abs(np.round(predictions))
        predicted_order = predicted_order.astype(int)
        predicted_order

        lstm_p = predicted_order[0][0]
        lstm_q = predicted_order[0][1]
        lstm_d = hynd_d

    if opt == 1:
        print('Warning: Make sure to check data for Seasonality as Freitas-Gouveia model is not able to handle SARIMA models....')
        print(f'Freitas-Gouveia model returned ARIMA({lstm_p}, {lstm_d}, {lstm_q}).')
        return [lstm_p, hynd_d, lstm_q]

    ############################################################### Hyndman-Khandakar algorithm     
    data = input_data
    #hyndman = auto_arima(data, start_p=0, start_q=0, start_P=0, start_Q=0)

    hyndman = auto_arima(data, start_p=hynd_start_p, d = hynd_d, start_q=hynd_start_q, max_p=hynd_max_p, max_d=hynd_max_d, max_q=hynd_max_q, start_P=hynd_start_P, D=hynd_D, start_Q=hynd_start_Q, 
                         max_P=hynd_max_P, max_D=hynd_max_D, max_Q=hynd_max_Q, max_order=hynd_max_order, m=hynd_m, seasonal=hynd_seasonal, stationary=hynd_stationary, 
                         information_criterion=hynd_information_criterion, alpha=hynd_alpha, test=hynd_test, seasonal_test=hynd_seasonal_test, stepwise=hynd_stepwise, 
                         n_jobs=hynd_n_jobs, start_params=hynd_start_params, trend=hynd_trend, method=hynd_method, maxiter=hynd_maxiter, offset_test_args=hynd_offset_test_args, 
                         seasonal_test_args=hynd_seasonal_test_args, suppress_warnings=hynd_suppress_warnings, error_action=hynd_error_action, trace=hynd_trace, random=hynd_random, 
                         random_state=hynd_random_state, n_fits=hynd_n_fits, out_of_sample_size=hynd_out_of_sample_size, scoring=hynd_scoring, scoring_args=hynd_scoring_args, 
                         with_intercept=hynd_with_intercept)


    non_seasonal_order = hyndman.order
    seasonal_order = hyndman.seasonal_order

    hynd_p = non_seasonal_order[0]
    hynd_d = non_seasonal_order[1]
    hynd_q = non_seasonal_order[2]

    if seasonal_order != (0, 0, 0, 0):
        print('Seasonal order found. Using only the Hyndman-Khandakar algorithm for the final model.')
        print(f'Hyndman-Khandakar algorithm returned SARIMA{non_seasonal_order}{seasonal_order[:-1]}{seasonal_order[-1]}')
        opt = 5
        
    ############################################################### Improved Hyndman-Khandkar algorithm
    data = input_data
    if opt==2 or opt == 3 or opt == 4:
        imp_hynd_p, imp_hynd_d, imp_hynd_q = non_seasonal_order
        remove = True

        iter = 0
        while remove == True:
            # Fit the ARIMA model
            arima_model = ARIMA(order=(imp_hynd_p, imp_hynd_d, imp_hynd_q), suppress_warnings=True)
            arima_model.fit(data)

            # Get the p-values of the coefficients
            p_values = arima_model.pvalues()
            # remove the first and last p-value number
            p_values = p_values[1:-1]

            # Get the coefficients
            coefficients = arima_model.params()
            # remove the first and last coefficient
            coefficients = coefficients[1:-1]

            if imp_hynd_p > 0:
                p_index = imp_hynd_p
                pp_value = p_values[p_index-1]
                pp_coeff = coefficients[p_index-1]
            else:
                pp_value = alpha
                pp_coeff = 100000

            if imp_hynd_q > 0:
                q_index = imp_hynd_p + imp_hynd_q
                qq_value = p_values[q_index-1]
                qq_coeff = coefficients[q_index-1]
            else:
                qq_value = alpha
                qq_coeff = 100000

            if (pp_value <= alpha and qq_value <= alpha):
                remove = False
                if iter == 0:
                    imp_print = 1
                break

            else:
                iter += 1
                imp_print = 2
                if pp_coeff < qq_coeff:
                    imp_hynd_p -= 1
                else:
                    imp_hynd_q -= 1


    if opt == 2:
        print(f'Freitas-Gouveia model returned ARIMA({lstm_p}, {lstm_d}, {lstm_q}).')
        if imp_print == 1:
            print(f'All coefficients are significant. Final order of the Hyndman-Khandkar algorithm is ARIMA({imp_hynd_p}, {imp_hynd_d}, {imp_hynd_q}).')
        else:
            print(f'The Improved Hyndman-Khandakar algorithm returned ARIMA({imp_hynd_p}, {imp_hynd_d}, {imp_hynd_q}).')
        return [imp_hynd_p, hynd_d, imp_hynd_q, lstm_p, hynd_d, lstm_q]
    
    elif opt == 3:
        print(f'Freitas-Gouveia model returned ARIMA({lstm_p}, {lstm_d}, {lstm_q}).')
        print(f'Hyndman-Khandakar algorithm returned ARIMA({hynd_p}, {hynd_d}, {hynd_q}).')
        if imp_print == 1:
            print(f'All coefficients are significant. Final order of the Hyndman-Khandkar algorithm is ARIMA({imp_hynd_p}, {imp_hynd_d}, {imp_hynd_q}).')
        else:
            print(f'The Improved Hyndman-Khandakar algorithm returned ARIMA({imp_hynd_p}, {imp_hynd_d}, {imp_hynd_q}).')
        return [hynd_p, hynd_d, hynd_q, imp_hynd_p, hynd_d, imp_hynd_q, lstm_p, hynd_d, lstm_q]
    elif opt == 4:
        if imp_print == 1:
            print(f'All coefficients are significant. Final order of the Hyndman-Khandkar algorithm is ARIMA({imp_hynd_p}, {imp_hynd_d}, {imp_hynd_q}).')
        else:
            print(f'The Improved Hyndman-Khandakar algorithm returned ARIMA({imp_hynd_p}, {imp_hynd_d}, {imp_hynd_q}).')
        return [imp_hynd_p, hynd_d, imp_hynd_q]

    elif opt == 5:
        print(f'Hyndman-Khandakar algorithm returned ARIMA({hynd_p}, {hynd_d}, {hynd_q}).')
        return [hynd_p, hynd_d, hynd_q]
        

In [13]:
p = random.randint(0, 5)
q = random.randint(0, 5)
i = random.randint(0, 500)
data_folder = 'C:\\Users\\Omen\\Documents\\GitHub\\Dissertation Code\\LSTM\\Data\\'
file_path = data_folder + f'Test\\TimeSeries_{p}_{q}\\ts_{i}.txt'
with open(file_path, 'r') as file:
    data = file.read().strip().split()  # Split the string into individual values
    ts_data = [float(val) for val in data]

print(f'Right answer: ARIMA({p}, 0, {q})')
print(f'Using time series {i}')

Right answer: ARIMA(5, 0, 0)
Using time series 296


In [14]:
freitas_gouveia(ts_data, opt=3)

Freitas-Gouveia model returned ARIMA(5, 0, 1).
Hyndman-Khandakar algorithm returned ARIMA(5, 0, 1).
The Improved Hyndman-Khandakar algorithm returned ARIMA(5, 0, 0).


[5, 0, 1, 5, 0, 0, 5, 0, 1]

In [8]:
# folder = 'C:\\Users\\Omen\\Documents\\GitHub\\Dissertation Code\\LSTM\\Data\\'
# for p in [0,1,2,3,4,5]:
#    for q in [0,1,2,3,4,5]:
#        for i in range(1,500+1,1):
#            file_path = folder + f'Test\\TimeSeries_{p}_{q}\\ts_{i}.txt'
#            with open(file_path, 'r') as file:
#                data = file.read().strip().split()  # Split the string into individual values
#                ts_data = [float(val) for val in data]
           
#            freitas_gouveia(ts_data, opt=3)