# **ARMA Model**

## 1. Import libraries

In [None]:
# Library imports
import numpy as np
import pandas as pd
import os
from config.paths import dir_input_cleaned
import matplotlib.pyplot as plt
import statsmodels.api as sm
plt.style.use('ggplot')
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

In [None]:
import numpy as np
import statsmodels.api as sm

def automatic_arima(time_series, max_ar=3, max_ma=3, d=0, criterion="AIC", verbose=False):
    """
    Finds the best ARIMA model for a given time series based on the specified criterion (AIC or BIC).
    
    Parameters:
    - time_series (pd.Series): The time series data.
    - max_ar (int): Maximum value for the AR component.
    - max_ma (int): Maximum value for the MA component.
    - d (int): The order of differencing. Default is 0.
    - criterion (str): Criterion for model selection ('AIC' or 'BIC'). Default is 'AIC'.
    - verbose (bool): If True, prints each model's (p, q) order and criterion value.

    Returns:
    - best_model (ARIMAResultsWrapper): The fitted ARIMA model with the lowest criterion value.
    - best_order (tuple): The order of the ARIMA model (p, d, q) with the lowest criterion value.
    - best_criterion (float): The minimum criterion value (AIC or BIC).
    """
    best_criterion = np.inf
    best_order     = None
    best_model     = None

    # Check if criterion is valid
    if criterion not in ["AIC", "BIC"]:
        raise ValueError("Criterion must be 'AIC' or 'BIC'")

    # Loop through possible values of p and q
    for p in range(max_ar + 1):
        for q in range(max_ma + 1):
            try:
                # Define and fit the ARIMA model
                model = sm.tsa.ARIMA(time_series, order=(p, d, q))
                fitted_model = model.fit()

                # Select criterion
                model_criterion = fitted_model.aic if criterion == "AIC" else fitted_model.bic
                
                # Display each model's criterion if verbose is enabled
                if verbose:
                    print(f"ARIMA({p},{d},{q}) - {criterion}: {model_criterion:.2f}")

                # Compare criterion value with the best one found so far
                if model_criterion < best_criterion:
                    best_criterion = model_criterion
                    best_order = (p, d, q)
                    best_model = fitted_model

            except Exception as e:
                # Handle exceptions, such as convergence errors
                if verbose:
                    print(f"ARIMA({p},{d},{q}) failed to converge.")
                continue

    return best_model, best_order, best_criterion

## 2. Data set up

In [None]:
# Params
# ==============================================================================
target = "gdpc1"
gdp_lag = 1
lags_to_test = list(range(-2, 3))  # Lags of -2, -1, 0, 1, 2 months
start_train = "1947-01-01" 
start_val   = "2005-03-01"
start_test  = "2010-04-01"

# Data read and preprocessing
# ==============================================================================
data = pd.read_csv(os.path.join(dir_input_cleaned, "data_tf.csv"), parse_dates=["date"])
data = data.set_index("date")[[target]].asfreq("MS")

# Split data into training, validation, and test sets
data_train = data[start_train: "2005-02-01"]
data_val   = data[start_val: "2010-03-01"]
data_test  = data[start_test:]

# Plot training, validation, and test data
# ==============================================================================
plt.figure(figsize=(10, 6))
plt.plot(data_train[target].dropna(),label="Training")
plt.plot(data_val[target].dropna(),label="Validation")
plt.title('GDP growth for different sets', fontsize=16)
plt.xlabel('Time', fontsize=14)
plt.ylabel('GDP Growth', fontsize=14)
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

## 3. Training the model

In [None]:
# Fit ARIMA model and get AR, MA order
# ==============================================================================
best_model, best_order, _ = automatic_arima(data_train.dropna(), max_ar=3, max_ma=3, criterion="AIC", verbose=True)
print(f"Best ARIMA order: {best_order}")

## 4. Rolling nowcast on artificial data vintages

In [None]:
# Forecasting with lags
# ==============================================================================
val_range     = data_val.dropna().index                # Quarterly dates
actual_values = data_val.dropna().values               # Quarterly GDP values
data_train_val = pd.concat([data_train, data_val])     # Monthly data for training and validation

# Initialize a list to store results
results = []

# Prediction loop for each lag
for lag in lags_to_test:
    for prediction_date in val_range:
        # Prepare the test series up to the current prediction date
        test_series = data_train_val.loc[data_train_val.index <= prediction_date, target].copy().to_frame()

        # Simulate vintage by setting future values to NaN
        test_series.iloc[len(test_series) + lag - 1 - gdp_lag:, 0] = np.nan
        test_series.loc[test_series.index == prediction_date, target] = np.nan
        
        # Filter time series for ARIMA model fitting
        time_series = test_series.dropna().reset_index()
        ts_dates = time_series["date"]
        ts_values = time_series[target]

        # Fit the ARIMA model on the available time series
        arima_model = sm.tsa.ARIMA(ts_values, order = best_order)
        arima_fit = arima_model.fit()
        predictions = arima_fit.forecast(steps=10)

        # Create a complete date range for the predictions
        complete_dates = pd.date_range(ts_dates.iloc[0], periods=len(ts_dates) + 10, freq="3MS")
        final_predictions = list(ts_values) + list(predictions)
        
        # Compile predictions into a DataFrame
        prediction_results = pd.DataFrame({"date": complete_dates, "predicted_values": final_predictions})
        predicted_value = prediction_results.loc[prediction_results.date == prediction_date, "predicted_values"].values[0]
        
        # Append the results as a new row (including the date and predicted value for the current lag)
        results.append({"date": prediction_date, "predicted_value": predicted_value, "lag": lag})

# Create a DataFrame from the results list
predictions_df = pd.DataFrame(results)
predictions_by_lag_df = predictions_df.pivot(index='date', columns='lag', values='predicted_value')
predictions_by_lag_df.columns = ["Lag " + str(lag) + " Month(s)" for lag in predictions_by_lag_df.columns]
predictions_by_lag_df

## 5. Assess and visualize model performance

In [None]:
# Create a DataFrame for actual values
final_df = pd.merge(data_val.dropna(), predictions_by_lag_df, left_index=True, right_index=True)
final_df.rename(columns={target: "Actual GDP"}, inplace=True)

# RMSE Calculation
rmse_results = {}
for lag in predictions_by_lag_df.columns:
    # Calculate RMSE
    rmse = np.sqrt(((final_df[lag] - final_df["Actual GDP"]) ** 2).mean())
    rmse_results[lag] = rmse
    
rmse_df = pd.DataFrame(list(rmse_results.items()), columns=['Vintage', 'RMSE'])
rmse_df

In [None]:
plt.figure(figsize=(15, 10))

# Plot each lag series using a loop
for label in final_df.columns:
    linestyle = '-' if label == "Actual GDP" else '--'  # Solid line for actual, dashed for lags
    plt.plot(final_df[label], label=label, marker='o', markersize=10 ,linestyle=linestyle, linewidth=3)

# Add labels and title
plt.title('Predictions vs Actual GDP', fontsize=16)
plt.xlabel('Time', fontsize=14)
plt.ylabel('GDP Growth', fontsize=14)
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()