# (6) Factor Augmented Vector Autoregressive (FAVAR) Model

A factor augmented vector autoregressive (FAVAR) model with $p$ lags is defined by 

$$
Y_{t} = c + \Phi_{1}Y_{t-1} + ... + \Phi_{p}Y_{t-p} + e_{t} 
$$

where $Y_{t}$ is an $8 \times 1$ vector of endogenous variables, $c$ defines a vector of constant terms, $\Phi_{i}$ depicts our $8 \times 8$ packed coefficient matricies for each $i$ lag to be determined during model estimation, and $e_{t}$ represents the vector of errors. The endogenous variables include one target series to be forecasted and seven mutually orthogonal principal components extracted from a large information set. Principal component analysis is a dimensionality reduction technique that is designed to return mutually orthogonal latent factors that maximize the variability within the original information set.

Estimation is carried out by minimizing the forecast errors via an ordinary least squares (OLS)

$$
L(a_{1},...,a_{p}) = \sum_{t}(Y_{t+1} - f_{t+1|t})^{2}.
$$

The optimal lag length of $p$ is set by minimizing the root mean squared error (RMSE) over the validation set. The following code reestimates the FAVAR model each period using walk foreword cross-validation on the validation set with a fixed lag length (p). Additionally, to avoid data leakage, principal component weights are updated each period after every observation becomes known to reflect the release of the most recent information. The FAVAR model is constructed with 8 predictors (one target and seven principal components). Therefore, the vectors $(Y_{t},c,e_{t})$ are $8 \times 1$ and the matricies $\Phi_{i}$ is $8 \times 8$. Model validation is carried out using an 80-20 split. The initial training model is estimated on the first 80% of the training data. The training model weights are updated after each period. In other words, the training set expands with a window size of one-period. Therefore, model weights are always updated to reflect the most recent information. Walk foreword cross-validation carried out over the remaining 20% of the in-sample set. Each $h$-step ahead forecast is produced using linear model iteration. In the codes below, the phrase "test" actually references the “validation” set AND NOT an out-of-sample test set. 

The first block of code defines a function (MODEL) that takes in five arguments. The series to be forecasted is defined using the target argument. The information set is defined using feature_space. The p defines the number of autoregressive lags (AR_Lags) to set in the VAR model. The trend argument determines whether the model is estimated with a constant (c) or not (n) via the Const command. The number of multistep ahead forecasts are set using the step_size argument through horizons. The output of MODEL allows the researcher to analyze the number of observations in the training set (train_size), the training set predictions (train_pred), the test set predictions (test_pred), the training root mean squared error value (train_RMSE), the test set root mean squared error value (test_RMSE), the AIC, and BIC values. The evaluation metrics are stored in the Results DataFrame, which is used to determine the top-performing model via lag order selection. Therefore, the first block carries out our grid search methodology using walk-foreward cross-validation with the optimal lag length determined by validation set RMSE minimization. 

In [None]:
# Load Library:
from pandas import read_csv
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from statsmodels.tsa.api import VAR
from sklearn.metrics import mean_squared_error
from matplotlib import pyplot
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
# Function to Fit Model using Walk Foreward Cross-Validation:
def MODEL(target, feature_space, p = 1, factors = 1, trend = 'n', step_size = 1):
    # Extracting Data:
    index_values = target.index.values
    # Inital Training & Test Set Sizes:
    train_size = int(len(target)*0.8)
    test_size = len(target) - train_size
    # Storage & Model Estimation:
    test_pred = []
    name = 'FAVAR('+str(p)+') Model'
    print('-'*len(name))
    print(name)
    print('-'*len(name))
    for t in range(test_size - step_size + 1):
        # Tracking Convergence:
        print('Test Set Walk Foreward: Iteration '+str(t+1))
        # Define Walk Foreward Training Set:
        target_train = target.values[:train_size+t]
        feature_space_train = feature_space.values[:train_size+t, :]
        # Define Walk Foreward Test Set:
        target_test = target.values[train_size+t:]
        feature_space_test = feature_space.values[train_size+t:, :]
        # Define Normalization Functions:
        feature_space_transform = StandardScaler().fit(feature_space_train)
        # Fit Normalization to Training Sets:
        feature_space_train = feature_space_transform.transform(feature_space_train)
        # Apply Normalization to Test Set:
        feature_space_test = feature_space_transform.transform(feature_space_test)
        # Define Principal Component Analysis Functions:
        feature_space_pca = PCA(n_components = factors, random_state = 1).fit(feature_space_train)
        # Fit Principal Component Analysis to Training Set:
        feature_space_train = feature_space_pca.transform(feature_space_train)
        # Apply Principal Component Analysis to Test Set:
        feature_space_test = feature_space_pca.transform(feature_space_test)
        # Compile Data For FAVAR:
        train_data = np.concatenate((target_train, feature_space_train), axis = 1)
        test_data = np.concatenate((target_test, feature_space_test), axis = 1)
        # Fit FAVAR(p) to Training Set:
        model = VAR(train_data)
        model_fit = model.fit(maxlags = p, trend = trend)
        if t == 0:
            train_pred = model_fit.fittedvalues[:,0]
            AIC = model_fit.aic
            BIC = model_fit.bic
        # N-Step Ahead Forecast:
        test_yhat = model_fit.forecast(y = train_data, steps = step_size)
        test_pred = np.append(test_pred, test_yhat[step_size-1,0])
    # Model Evaluation:
    train_RMSE = np.sqrt(mean_squared_error(target.values[p:train_size], train_pred))
    test_RMSE = np.sqrt(mean_squared_error(target.values[train_size+step_size-1:], test_pred))
    return train_RMSE, test_RMSE, AIC, BIC
# Setting Seed:
np.random.seed(12345)
# Load Data:
All_Data = read_csv('Compiled_Data.csv', header = 0, index_col = 0, parse_dates = True)
All_Data.index = pd.DatetimeIndex(All_Data.index.values, freq = "MS")
# Seperate Target From Feature Block:
housing_price = All_Data[['RHP']]
All_Data = All_Data.drop('RHP', axis = 1)
# Storage for Results & Hyperparameters:
Results = pd.DataFrame(columns = ['FAVAR(p)', 'p', 'Factors', 'Train_RMSE', 'Test_RMSE', 'AIC', 'BIC'])
# Setting Hyperparameters:
AR_Lags = range(1,37,1)
Factors = 7
Const = 'c'
horizons = 1
for p in AR_Lags:
    try:
        train_RMSE, test_RMSE, AIC, BIC = MODEL(target = housing_price, feature_space = All_Data, p = p, factors = Factors, trend = Const, step_size = horizons)
        model_performance = {'FAVAR(p)':'FAVAR('+str(p)+')', 'p':p, 'Factors':Factors, 'Train_RMSE':train_RMSE, 'Test_RMSE':test_RMSE, 'AIC':AIC, 'BIC':BIC}
        Results = Results.append(model_performance, ignore_index = True)            
    except:
        continue 

The second block of code reestimates the top performing model after determining the optimal lag length (p).

In [None]:
# Load Library:
from pandas import read_csv
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from statsmodels.tsa.api import VAR
from sklearn.metrics import mean_squared_error
from matplotlib import pyplot
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
# Function to Fit Model using Walk Foreward Cross-Validation:
def MODEL(target, feature_space, p = 1, factors = 1, trend = 'n', step_size = 1):
    # Extracting Data:
    index_values = target.index.values
    # Inital Training & Test Set Sizes:
    train_size = int(len(target)*0.8)
    test_size = len(target) - train_size
    # Storage & Model Estimation:
    test_pred = []
    name = 'FAVAR('+str(p)+') Model'
    print('-'*len(name))
    print(name)
    print('-'*len(name))
    for t in range(test_size - step_size + 1):
        # Tracking Convergence:
        print('Test Set Walk Foreward: Iteration '+str(t+1))
        # Define Walk Foreward Training Set:
        target_train = target.values[:train_size+t]
        feature_space_train = feature_space.values[:train_size+t, :]
        # Define Walk Foreward Test Set:
        target_test = target.values[train_size+t:]
        feature_space_test = feature_space.values[train_size+t:, :]
        # Define Normalization Functions:
        feature_space_transform = StandardScaler().fit(feature_space_train)
        # Fit Normalization to Training Sets:
        feature_space_train = feature_space_transform.transform(feature_space_train)
        # Apply Normalization to Test Set:
        feature_space_test = feature_space_transform.transform(feature_space_test)
        # Define Block Principal Component Analysis Functions:
        feature_space_pca = PCA(n_components = factors, random_state = 1).fit(feature_space_train)
        # Fit Principal Component Analysis to Training Set:
        feature_space_train = feature_space_pca.transform(feature_space_train)
        # Apply Principal Component Analysis to Test Set:
        feature_space_test = feature_space_pca.transform(feature_space_test)
        # Compile Data For FAVAR:
        train_data = np.concatenate((target_train, feature_space_train), axis = 1)
        test_data = np.concatenate((target_test, feature_space_test), axis = 1)
        # Fit FAVAR(p) to Training Set:
        model = VAR(train_data)
        model_fit = model.fit(maxlags = p, trend = trend)
        if t == 0:
            train_pred = model_fit.fittedvalues[:,0]
            AIC = model_fit.aic
            BIC = model_fit.bic
        # N-Step Ahead Forecast:
        test_yhat = model_fit.forecast(y = train_data, steps = step_size)
        test_pred = np.append(test_pred, test_yhat[step_size-1,0])
    # Model Evaluation:
    train_RMSE = np.sqrt(mean_squared_error(target.values[p:train_size], train_pred))
    test_RMSE = np.sqrt(mean_squared_error(target.values[train_size+step_size-1:], test_pred))
    # Convert Data to DataFrame:
    train_pred = pd.DataFrame(train_pred, index = index_values[p:train_size], columns = ['train_pred'])
    test_pred = pd.DataFrame(test_pred, index = index_values[train_size + step_size - 1:], columns = ['test_pred'])
    return train_size, train_pred, test_pred, train_RMSE, test_RMSE, AIC, BIC
# Setting Seed:
np.random.seed(12345)
# Load Data:
All_Data = read_csv('Compiled_Data.csv', header = 0, index_col = 0, parse_dates = True)
All_Data.index = pd.DatetimeIndex(All_Data.index.values, freq = "MS")
# Seperate Target From Feature Block:
housing_price = All_Data[['RHP']]
All_Data = All_Data.drop('RHP', axis = 1)
# Setting Hyperparameters:
AR_Lags = Results.sort_values(by = 'Test_RMSE', ascending = True).iloc[0,1]
Factors = Results.sort_values(by = 'Test_RMSE', ascending = True).iloc[0,2]
Const = 'c'
horizons = 1
train_size, train_pred, test_pred, train_RMSE, test_RMSE, AIC, BIC = MODEL(target = housing_price, feature_space = All_Data, p = AR_Lags, factors = Factors, trend = Const, step_size = horizons)

The third block presents and graphs the stored output from the MODEL function. The MODEL above is fit to housing price data in order to forecast real housing price growth rates at the U.S. national level.

In [None]:
# Evaluate Autoregressive Moving Average Model: Growth Rate
print('-----------------------------')
print('National Housing Price Series')
print('-----------------------------')
print('Data Type: Growth Rates')
print('Model Type: FAVAR('+str(AR_Lags)+')')
print('Number of Factors Extracted: '+str(Factors))
print('Train RMSE: %.3f' % (train_RMSE))
print('Test RMSE: %.3f' % (test_RMSE))
print('AIC: %.3f' % (AIC))
print('BIC: %.3f' % (BIC))
# Plot Forecast: Growth Rate
sns.set_theme(style = 'whitegrid')
pyplot.figure(figsize = (12,6))
pyplot.plot(housing_price, label = 'Observed')
pyplot.plot(train_pred, label = 'FAVAR('+str(AR_Lags)+': Train')
pyplot.plot(test_pred, label = 'FAVAR('+str(AR_Lags)+'): Test')
pyplot.xlabel('Date')
pyplot.ylabel('Growth Rate')
pyplot.title('Real Housing Price Series (National)')
pyplot.legend()
pyplot.show()

The fourth block of code is used to analyze the forecast errors for stationarity. The forecast errors are computed, plotted, and distributed. Lastly, the autocorrelation function (ACF) is plotted and the Augmented Dickey-Fuller (ADF) unit root test is carried out.

In [None]:
# Load Library:
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf
# Define Residuals:
resids = housing_price[AR_Lags:train_size] - train_pred.values
# Plot Residuals:
sns.set_theme(style = 'whitegrid')
pyplot.figure(figsize = (16,4))
pyplot.subplot(1,2,1)
pyplot.plot(resids)
pyplot.xlabel('Date')
pyplot.title('Residual Series')
pyplot.subplot(1,2,2)
pyplot.hist(resids, bins = 20)
pyplot.title('Residual Distribution')
pyplot.tight_layout()
pyplot.show()
# Plot Autocorelation Function (ACF):
sns.set_theme(style = 'whitegrid')
fig, ax = pyplot.subplots(figsize=(8,4))
plot_acf(resids, title = 'Residual ACF', lags = 36, ax = ax)
pyplot.show()
# ADF Test: Non-Stationary v. Stationary
ADF_Test = adfuller(resids)
print('----------------------')
print('  ADF Unit-Root Test  ')
print('----------------------')
print('Test Statistic: %.3f' % (ADF_Test[0]))
print('P-Value: %.3f' % (ADF_Test[1]))
print('Critical Values:')
for key, value in ADF_Test[4].items():
    print('%s: %.3f' % (key, value))

The last block of code loads in the previous .csv files "National_Train_Growth_One" and "National_Test_Growth_One" that contain the stored forecasted values. The storage files are then augmented to include the predicted values from the current algorithm in order to estimate the forecast combinations, produce the final "top performing" model plots, and carry out the final comparison tests for predictive accuracy.

In [None]:
# Load Forecast Tables: 
train_forecasts = read_csv('National_Train_Growth_One.csv', header = 0, index_col = 0, parse_dates = True)
train_forecasts.index = pd.DatetimeIndex(train_forecasts.index.values, freq = "MS")
test_forecasts = read_csv('National_Test_Growth_One.csv', header = 0, index_col = 0, parse_dates = True)
test_forecasts.index = pd.DatetimeIndex(test_forecasts.index.values, freq = "MS")
# Add New Forecast Model:
train_forecasts['FAVAR'] = train_pred
test_forecasts['FAVAR'] = test_pred
# Save Forecast:
pd.DataFrame(train_forecasts).to_csv('National_Train_Growth_One.csv')
pd.DataFrame(test_forecasts).to_csv('National_Test_Growth_One.csv')