# <center><u> ARIMA Models (Core)
* Authored By: Eric N. Valdez
* Date 2/12/2024

## `ARIMA Models Core Assignment`
* Load [this Walmart stock data](https://drive.google.com/file/d/1KKR8TZbkixVN2NundM2mEVv5AhjHhs9a/view).     ([source](https://www.kaggle.com/datasets/meetnagadia/walmart-stock-price-from-19722022))
* We will use data from 2010 to 2020 to predict the Adjusted Close values for the next quarter

## `Imports:`

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pmdarima.arima.utils import ndiffs
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
from datetime import timedelta
import statsmodels.tsa.api as tsa
from pmdarima.model_selection import train_test_split
import itertools

In [None]:
def plot_forecast(ts_train, ts_test, forecast_df, n_train_lags=None, 
                  figsize=(10,4), title='Comparing Forecast vs. True Data'):
    ### PLot training data, and forecast (with upper/,lower ci)
    fig, ax = plt.subplots(figsize=figsize)

    # setting the number of train lags to plot if not specified
    if n_train_lags==None:
        n_train_lags = len(ts_train)
            
    # Plotting Training  and test data
    ts_train.iloc[-n_train_lags:].plot(ax=ax, label="train")
    ts_test.plot(label="test", ax=ax)

    # Plot forecast
    forecast_df['mean'].plot(ax=ax, color='green', label="forecast")

    # Add the shaded confidence interval
    ax.fill_between(forecast_df.index, 
                    forecast_df['mean_ci_lower'],
                   forecast_df['mean_ci_upper'],
                   color='green', alpha=0.3,  lw=2)

    # set the title and add legend
    ax.set_title(title)
    ax.legend();
    
    return fig, ax

In [None]:
# Custom function for Ad Fuller Test
def get_adfuller_results(ts, alpha=.05, label='adfuller', **kwargs): #kwargs for adfuller()
    # Saving each output
    (test_stat, pval, nlags, nobs, crit_vals_d, 
    icbest ) = tsa.adfuller(ts, **kwargs)
    # Converting output to a dictionary with the interpretation of p
    adfuller_results = {'Test Statistic': test_stat,
                        "# of Lags Used":nlags, 
                       '# of Observations':nobs,
                        'p-value': round(pval,6),
                        'alpha': alpha,
                       'sig/stationary?': pval < alpha}
    return pd.DataFrame(adfuller_results, index =[label])

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error

def regression_metrics_ts(ts_true, ts_pred, label="", verbose=True, output_dict=False,):
    # Get metrics
    mae = mean_absolute_error(ts_true, ts_pred)
    mse = mean_squared_error(ts_true, ts_pred)
    rmse = mean_squared_error(ts_true, ts_pred, squared=False)
    r_squared = r2_score(ts_true, ts_pred)
    mae_perc = mean_absolute_percentage_error(ts_true, ts_pred) * 100

    if verbose == True:
        # Print Result with label
        header = "---" * 20
        print(header, f"Regression Metrics: {label}", header, sep="\n")
        print(f"- MAE = {mae:,.3f}")
        print(f"- MSE = {mse:,.3f}")
        print(f"- RMSE = {rmse:,.3f}")
        print(f"- R^2 = {r_squared:,.3f}")
        print(f"- MAPE = {mae_perc:,.2f}%")

    if output_dict == True:
        metrics = {
            "Label": label,
            "MAE": mae,
            "MSE": mse,
            "RMSE": rmse,
            "R^2": r_squared,
            "MAPE(%)": mae_perc,
        }
        return metrics

In [None]:
def plot_acf_pacf(ts, nlags=40, figsize=(10, 5), 
                  annotate_sig=False, alpha=.05,
                 acf_kws={}, pacf_kws={},  
                  annotate_seas=False, m = None,
                 seas_color='black'):
    
    fig, axes = plt.subplots(nrows=2, figsize=figsize)

    
    # Sig lags line style
    sig_vline_kwargs = dict( ls=':', lw=1, zorder=0, color='red')

    # ACF
    tsa.graphics.plot_acf(ts, ax=axes[0], lags=nlags, **acf_kws)
    
    ## Annotating sig acf lags
    if annotate_sig == True:
        sig_acf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='ACF')
        for lag in sig_acf_lags:
            axes[0].axvline(lag,label='sig', **sig_vline_kwargs )

    # PACF
    tsa.graphics.plot_pacf(ts,ax=axes[1], lags=nlags, **pacf_kws)
    
    ## Annotating sig pacf lags
    if annotate_sig == True:
        ## ANNOTATING SIG LAGS
        sig_pacf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='PACF')
        for lag in sig_pacf_lags:
            axes[1].axvline(lag, label='sig', **sig_vline_kwargs)



    
    ### ANNOTATE SEASONS
    if annotate_seas == True:
        # Ensure m was defined
        if m is None:
            raise Exception("Must define value of m if annotate_seas=True.")

        ## Calculate number of complete seasons to annotate
        n_seasons = nlags//m 

        # Seasonal Lines style
        seas_vline_kwargs = dict( ls='--',lw=1, alpha=.7, color=seas_color, zorder=-1)
        
        ## for each season, add a line
        for i in range(1, n_seasons+1):
            axes[0].axvline(m*i, **seas_vline_kwargs, label="season")
            axes[1].axvline(m*i, **seas_vline_kwargs, label="season")

    fig.tight_layout()
    
    return fig

## `Load Data:`

In [None]:
# Loading Dataframe
df = pd.read_csv('Data/WMT.csv')
df.head()

In [None]:
# Looking at Info
df.info()

* ### `Make a datetime index using the Date Column with a business day frequency ('B')`

In [None]:
# Make a datetime index using the Date column with a business day frequency ('B')
df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
df.set_index('Date', inplace=True)
df = df.asfreq('B')
df.info()

* ### `Check for and address null values`

In [None]:
# Checking for nulls
df.isnull().sum()

In [None]:
# Filling forward to address null values
df['Adj Close'] = df['Adj Close'].ffill()

In [None]:
# Rechecking my null values
df.isnull().sum()

In [None]:
# Rechecking my Information
df.info()

In [None]:
# Looking at the first 10 of the dataframe
df.head(10)

* ### `Check the time series for staionarity`
    * Determine the number of differencing (d) needed to make the data stationary. (We recommend using pmdarima's ndiffs)

In [None]:
# ploting raw data
plt.plot(ts, label='Walmart Stock Price')
plt.title('Walmart Stock Price')
plt.xlabel('Date')
plt.ylabel('Stock Price')
plt.show()

In [None]:
# Testing the raw data for stationarity
get_adfuller_results(df)

In [None]:
# Applying diffenrencing to raw data
d = ndiffs(df)
print(d)

In [None]:
# # raw data is not stationary, so we will apply differencing
# df_diff = df.diff().dropna()
# df_diff.plot()

# # Checking for stationarity
# get_adfuller_results(df_diff)

In [None]:
# use ndiffs to determine differencing
d = ndiffs(ts)
print(f'd is {d}')

In [None]:
# # Plot differenced data
# df_diff.plot();

* ### `Use ACF/PACF plots of the stationary data to estimate initial time series model orders (p, d, q)`

In [None]:
# Use differenced (stationary) data to plot ACF and PACF
plot_acf_pacf(ts_diff2);

* ### `Split the time series into training and test data.` <u>Use a test_size of one quarter (13 weeks X 5 business days)

In [None]:
# Calculating number of test lags
# Split the time series into training and test data
n_test_lags = 5*13

# Modeling to predict 6 months into the future
train, test = train_test_split(df, test_size=n_test_lags)
ax = train.plot(label='Train')
test.plot(ax=ax, label='Test')
ax.legend();

In [None]:
# from pmdarima.model_selection import train_test_split
# # Splitting the ts into ttd
# train, test = train_test_split(df, test_size = .25)
# ax = train.plot(label='Train')
# test.plot(ax=ax, label='Test')
# ax.legend();

* ### `Fit an ARIMA model based on the orders determined during your exploration.`
    * Make forecasts with your model
    * Plot the forecasts versus the test data.
    * Obtain metrics for evaluation.

In [None]:
# Build the model 
p = 0  # AR component 

d = 2  # Number of differencing required to make stationary

q =  1 # MA component 

# Define and fit the model
arima_model = tsa.ARIMA(train, order=(p,d,q)).fit()

In [None]:
# Obtain summary of forecast as dataframe
forecast_df = arima_model.get_forecast(len(test)).summary_frame()

# Plot the forecast with true values
plot_forecast(train, test, forecast_df)

# Obtain metrics
regression_metrics_ts(test, forecast_df['mean'])

In [None]:
# Obtain summary of model
arima_model.summary()

`Warnings:`
[1] Covariance matrix calculated using the outer products of gradients (complex-step).

In [None]:
# Obtain diagnostic plots
fig = arima_model.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()

* ### `Try` <u>`at least one variation`</u> `of the model for comparison (we recommend using a loop to try combinations of model orders)`

In [None]:
# Define the value or range of values for p, d, q
p_values = range(0, 7)  
d_values = [2]          
q_values = range(0, 7)  

# Create combinations of pdq to test
pdq_to_try = list(itertools.product(p_values, d_values, q_values))
                                            
pdq_to_try

In [None]:
# define starting aic as infinity
best_aic = float("inf")  

# define baseline for pdq
best_pdq = (0,0,0)

# Loop through each combination
for pdq in pdq_to_try:
    
    model = tsa.ARIMA(train, order=pdq)
                              
    result = model.fit()
    
    print(pdq, result.aic)      
    
    # If lower, replace best AIC with new value
    if result.aic < best_aic:
        
        best_aic = result.aic
        best_pdq = pdq

# Print the best orders and AIC score
print("Best AIC:", best_aic)
print("Best pdq:", best_pdq)

In [None]:
# Build the model with the best AIC
# Try at least one variation of the model for comparison (we recommend using a loop to try combinations of model orders)
p = 6  # AR component 

d = 2  # Number of differencing required to make stationary

q =  3 # MA component 

# Define and fit the model
tuned_model = tsa.ARIMA(train, order=(p,d,q)).fit()

* ### `A MAPE of less than 2% on hte test data is achievable.`

In [None]:
# Obtain summary of forecast as dataframe
forecast_df = tuned_model.get_forecast(len(test)).summary_frame()
# Plot the forecast with true values
plot_forecast(train, test, forecast_df)
# Obtain metrics
regression_metrics_ts(test, forecast_df['mean'])

In [None]:
# Obtain summary of forecast as dataframe
forecast_df = arima_model.get_forecast(len(test)).summary_frame()

# Plot the forecast with true values
plot_forecast(train, test, forecast_df)

# Obtain metrics
regression_metrics_ts(test, forecast_df['mean'])

* ### `Choose a final model and explain:`
  * How good was your model, according to your Mean Absolute Percentage Error?
  * How good was your model in terms of how well the forecast seems to follow the test data, visually? ***`(Hint: You may want to plot fewer training data lags to see this)`***