In [None]:
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn import set_config
set_config(transform_output="pandas")
plt.rcParams["figure.figsize"] = (12, 4)
sns.set_context("talk", font_scale=0.9)



In [None]:
​def regression_metrics(y_true, y_pred, label='', verbose = True, output_dict=False):
  # Get metrics
  mae = mean_absolute_error(y_true, y_pred)
  mse = mean_squared_error(y_true, y_pred)
  rmse = mean_squared_error(y_true, y_pred, squared=False) 
  r_squared = r2_score(y_true, y_pred)
  if verbose == True:
    # Print Result with Label and Header
    header = "-"*60
    print(header, f"Regression Metrics: {label}", header, sep='\n')
    print(f"- MAE = {mae:,.3f}")
    print(f"- MSE = {mse:,.3f}")
    print(f"- RMSE = {rmse:,.3f}")
    print(f"- R^2 = {r_squared:,.3f}")
  if output_dict == True:
      metrics = {'Label':label, 'MAE':mae,
                 'MSE':mse, 'RMSE':rmse, 'R^2':r_squared}
      return metrics


In [None]:
def evaluate_regression(reg, X_train, y_train, X_test, y_test, verbose = True,
                        output_frame=False):
  # Get predictions for training data
  y_train_pred = reg.predict(X_train)
 
  # Call the helper function to obtain regression metrics for training data
  results_train = regression_metrics(y_train, y_train_pred, verbose = verbose,
                                     output_dict=output_frame,
                                     label='Training Data')
  print()
  # Get predictions for test data
  y_test_pred = reg.predict(X_test)
  # Call the helper function to obtain regression metrics for test data
  results_test = regression_metrics(y_test, y_test_pred, verbose = verbose,
                                  output_dict=output_frame,
                                    label='Test Data' )
  
  # Store results in a dataframe if ouput_frame is True
  if output_frame:
    results_df = pd.DataFrame([results_train,results_test])
    # Set the label as the index 
    results_df = results_df.set_index('Label')
    # Set index.name to none to get a cleaner looking result
    results_df.index.name=None
    # Return the dataframe
    return results_df.round(3)



In [None]:
# import statsmodels base api module for the data
import statsmodels.api as sm
co2_data = sm.datasets.co2.load_pandas()
df = co2_data.data
df.head()



In [None]:
# Visualize the data
ax = df.plot()
ax.set(ylabel="CO2 (PPM)", xlabel="Week", title="CO2 Levels Over Time");



In [None]:
# Check for null values
df.isna().sum()



In [None]:
# Impute null values
df = df.interpolate()
df.isna().sum()



In [None]:
# Let's add a column with the values from the previous week (one lag)
df["t-1"] = df["co2"].shift(1)
df.head()



In [None]:
# Loop to add columns with lags 1-4
for i in range(1, 5):
    df[f"t-{i}"] = df["co2"].shift(i)
df



In [None]:
# Dropping early rows with NA values
df_model = df.dropna()
df_model



In [None]:
# Our target is co2, the actual value for that rows' date
y = df_model["co2"]
X = df_model.drop(columns="co2")
X


In [None]:
# calculating integer index for 75%/25% split
idx_split = round(len(X) * 0.75)
idx_split



In [None]:
# What date corresponds to 75%?
split_date = X.index[idx_split]
split_date



In [None]:
# Time Series train-test-split
# All data before split date is training
X_train = X.loc[:split_date]
y_train = y.loc[:split_date]
# All data after split date is testing
X_test = X.loc[split_date:]
y_test = y.loc[split_date:]



In [None]:
# Plotting the training and test data
ax = y_train.plot(label="Training Data")
y_test.plot(ax=ax, label="Test Data")
# Saving the date as a string for matplotlib
split_date_str = split_date.strftime("%Y-%m-%d")
# Annotating the split data
ax.axvline(
    split_date_str, color="black", ls="--", lw=1.0, label=f"Split Date:{split_date_str}"
)
ax.set(ylabel="CO2 (PPM)", xlabel="Week", title="CO2 Levels Over Time")
ax.legend();


In [None]:
# Instantiate a Linear Regression Model
lin_reg = LinearRegression()
# Fit on the training data
lin_reg.fit(X_train, y_train)



In [None]:
# Making a Series of our test predictions
y_pred_test = lin_reg.predict(X_test)
y_pred_test = pd.Series(y_pred_test, index=y_test.index)
y_pred_test



In [None]:
# Plotting the training and test data and predictions for test
ax = y_train.plot(label="Training Data")
y_test.plot(ax=ax, label="Test Data")
y_pred_test.plot(ax=ax, label="Predicted", color="green")
# Saving the date as a string for matplotlib
split_date_str = split_date.strftime("%Y-%m-%d")
# Annotating the split data
ax.axvline(
    split_date_str, color="black", ls="--", lw=1.0, label=f"Split Date:{split_date_str}"
)
ax.set(ylabel="CO2 (PPM)", xlabel="Week", title="CO2 Levels Over Time")
ax.legend();



In [None]:
# Getting predictions and metrics
evaluate_regression(lin_reg, X_train, y_train, X_test, y_test)



# Stationarity

In [None]:
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import  statsmodels.tsa.api as tsa # new import
from sklearn import set_config
#set_config(transform_output="pandas")
plt.rcParams["figure.figsize"] = (12, 4)
sns.set_context("talk", font_scale=0.9)
# set random seed
SEED = 321
np.random.seed(SEED)



In [None]:
# Calculating a simulated white noise time series (one value)
c = 49
noise_t = np.random.normal()
# Add the random value to the mean to get the duration of the lap for one day
y_t= c + noise_t
y_t



In [None]:
# Calculating a simulated white noise time series for 120 days
c = 49
n_lags = 120
y = []
for t in range(n_lags):
    
    noise_t = np.random.normal(size=1)
    y_t = c + noise_t[0] # slicing 0 to get value instead of arrays
    y.append(y_t)



In [None]:
# Convet list to a Pandas Series
ts_white_noise = pd.Series(y, name='Simulated White Noise')     
ts_white_noise



In [None]:
# Plotting the white noise series with annotated mean
ax = ts_white_noise.plot()
ax.set(title='Bus Route Duration\n(Simulated White Noise Series',
       ylabel='$Y_t', xlabel='observation/time');
ax.axhline( ts_white_noise.mean(),  color='k', ls='--',
           label=f'mean: {ts_white_noise.mean():.2f}');
ax.legend();



In [None]:
# Running the adfuller test to demonstrate return
tsa.adfuller(ts_white_noise)



In [None]:
# Saving each output separately
(test_stat, pval, nlags, nobs, crit_vals_d, icbest) = tsa.adfuller(ts_white_noise)



In [None]:
# Saving the most important results as a dictionary
adfuller_results = {'Test Statistic': test_stat,
                    "# of Lags Used":nlags, 
                   '# of Observations':nobs,
                    'p-value': round(pval,6)}



In [None]:
# Adding interpretation of p-value to dictionary
alpha =.05
adfuller_results['sig/stationary?'] = pval < alpha
adfuller_results



In [None]:
# Convert dictionary of results to a dataframe
adfuller_df = pd.DataFrame(adfuller_results, index=['AD Fuller Test'])
adfuller_df



In [None]:
# Custom function for Ad Fuller Test
def get_adfuller_results(ts, alpha=.05, label='adfuller', **kwargs): #kwargs for adfuller()
    # Saving each output
    (test_stat, pval, nlags, nobs, crit_vals_d, 
    icbest ) = tsa.adfuller(ts, **kwargs)
    # Converting output to a dictionary with the interpretation of p
    adfuller_results = {'Test Statistic': test_stat,
                        "# of Lags Used":nlags, 
                       '# of Observations':nobs,
                        'p-value': round(pval,6),
                        'alpha': alpha,
                       'sig/stationary?': pval < alpha}
    return pd.DataFrame(adfuller_results, index =[label])



In [None]:
# Testing the function
adfuller_results = get_adfuller_results(ts_white_noise, label='White Noise')
adfuller_results



# 
Autocorrelation

In [None]:
# Shift the data one step 
ts_lag1 = ts_white_noise.shift(1)
ts_lag1 = ts_lag1.rename('Lag 1')
ts_lag1.head()



In [None]:
# Combine original ts + with lag 1
ts_lagged = pd.concat([ts_white_noise,ts_lag1], axis=1)
ts_lagged.head()



In [None]:
# Checking for correlation
ts_lagged.corr()



In [None]:
# Generate 20 time-shifted columns
ts_lagged = pd.DataFrame()
total_shifts = 20
for t in range(0,total_shifts+1):    
    ts_lagged[f"Lag {t}"] =  ts_white_noise.shift(t)
ts_lagged.head()



In [None]:
# Calculate correlations for all values
corr = ts_lagged.corr()
corr.head()



In [None]:
# Slice out the original ts (lag 0)
auto_corr = corr['Lag 0']
auto_corr



In [None]:
# Plot the calculated correlations
ax = auto_corr.plot(style='o:')
ax.axhline(0, color='k', ls='--', zorder=-1);
ax.set(ylabel='Autocorrelation', xlabel='Time Lag');


In [None]:
# Plotting autocorrelation with built in function
fig = tsa.graphics.plot_acf(ts_white_noise);



In [None]:
# Calculating a simulated random walk 
first_t = 150
y_walk = [first_t]
n_lags=120
for t in range(1,n_lags):
    # get the previous time lag's value
    y_prev_t = y_walk[t-1]
    
    # Get new noise
    noise_t = np.random.normal(size=1)
    
    # Add noise on to previous value
    y_t = y_prev_t + noise_t[0] # slicing 0 to get value instead of arrays
    y_walk.append(y_t)
    
ts_rand_walk = pd.Series(y_walk, name='Simulated Random Walk')    
ts_rand_walk



In [None]:
# Visualize the random walk time series
ax = ts_rand_walk.plot(label='Random Walk')
ax.set(ylabel='$Y_t', title='Simulated Random Walk Time Series');
ax.axhline(ts_rand_walk.mean(), color='black', ls=':', label='mean');
ax.legend();



In [None]:
# Test random walk forstationarity
get_adfuller_results(ts_rand_walk)



In [None]:
# Check for autocorrelation
tsa.graphics.plot_acf(ts_rand_walk);



In [None]:
# Orignal random walk
ts_rand_walk.head()



In [None]:
# Differenced random walk
ts_rand_walk.diff().head()



In [None]:
# Plot the differenced random walk
ts_rand_walk_diff = ts_rand_walk.diff().dropna()
ts_rand_walk_diff.plot(title='Random Walk - Differenced');



In [None]:
# Test differenced random walk for stationarity
get_adfuller_results(ts_rand_walk_diff)



In [None]:
# Check differenced random walk for autocorrelation
tsa.graphics.plot_acf(ts_rand_walk_diff)



In [None]:
# Creating simulated random walk with a drift
first_t = 150
c = .3
y_walk_drift = [first_t]
for t in range(1,n_lags):
    # get the previous time lag's value
    y_prev_t = y_walk_drift[t-1]
    
    # Get new noise
    noise_t = np.random.normal(size=1)
    # Add noise on to previous value
    y_t = c + y_prev_t + noise_t[0]# slicing 0 to get value instead of arrays
    y_walk_drift.append(y_t)
    
ts_rand_walk_drift = pd.Series(y_walk_drift, name = "Simulated Random walk (+dift)")    
ax = ts_rand_walk_drift.plot()
ax.set(ylabel='$Y_t, title='Simulated Random Walk with a Drift Time Series');
ax.axhline(ts_rand_walk_drift.mean(), color='black', ls=':');



In [None]:
# compare random walk with a drift vs without
ax = ts_rand_walk_drift.plot(label='Random Walk with a Drift')
ts_rand_walk.plot(ax=ax, label='Random Walk (no drift)')
ax.legend();



In [None]:
# Test random walk with a drift for stationarity
get_adfuller_results(ts_rand_walk_drift)



In [None]:
# Test random walk a with a drift for autocorrelation
tsa.graphics.plot_acf(ts_rand_walk_drift);



In [None]:
# Difference the random walk with a drift
ts_rand_walk_drift_diff = ts_rand_walk_drift.diff().dropna()
# Visualize the differenced random walk with a drift
ts_rand_walk_drift_diff.plot(title='Random Walk - Differenced');



In [None]:
# Confirm that a differenced random walk with drift is stationary
get_adfuller_results(ts_rand_walk_drift_diff)



In [None]:
# Check the differenced random walk with drift for autocorrelation
tsa.graphics.plot_acf(ts_rand_walk_drift_diff)



# Autoregressive (AR) Models

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.tsa.api as tsa
from pmdarima.model_selection import train_test_split
from pmdarima.arima.utils import ndiffs


In [None]:
# Custom function for Ad Fuller Test
def get_adfuller_results(ts, alpha=.05, label='adfuller', **kwargs): #kwargs for adfuller()
    # Saving each output
    (test_stat, pval, nlags, nobs, crit_vals_d, 
    icbest ) = tsa.adfuller(ts, **kwargs)
    # Converting output to a dictionary with the interpretation of p
    adfuller_results = {'Test Statistic': test_stat,
                        "# of Lags Used":nlags, 
                       '# of Observations':nobs,
                        'p-value': round(pval,6),
                        'alpha': alpha,
                       'sig/stationary?': pval < alpha}
    return pd.DataFrame(adfuller_results, index =[label])


In [None]:
# Load in data
ts = pd.read_csv('AR_lesson_ts.csv', index_col = 'date')
ts.head()



In [None]:
# Make index datetime
ts.index = pd.to_datetime(ts.index)
# We have weekly data so we will set our frequency to W
ts.index.freq= "W"
# Make a series
ts = ts['dollars']



In [None]:
# Check for nulls
ts.isna().sum().sum()


In [None]:
# Plot the time series
ts.plot();



In [None]:
# Make acf plot of raw data to look for seasonality
tsa.graphics.plot_acf(ts);



In [None]:
# Call custom function to check to see if data is stationary
get_adfuller_results(ts)



In [None]:
from pmdarima.arima.utils import ndiffs
# use ndiffs to determine differencing
d = ndiffs(ts)
print(f'd is {d}')



In [None]:
# Make acf plot to determine if this is AR or MA 
tsa.graphics.plot_acf(ts);



In [None]:
# Examine the PACF plot to determine order of AR model
tsa.graphics.plot_pacf(ts);



In [None]:
from pmdarima.model_selection import train_test_split
# tts
train, test = train_test_split(ts, test_size=.20)
# Visualize the train and test data
ax = train.plot(label='Train')
test.plot(ax=ax, label='Test')
ax.legend();



In [None]:
# proper way to import tsa submodule
import statsmodels.tsa.api as tsa



We will discuss ARIMA models in depth in an upcoming lesson, but for now, the main thing to know is that an ARIMA model's order is defined by 3 parameters (p,d,q):
- 
AR(p): an auto-regressive component, building coefficients for previous lagged value- s
Integration(d): applying differencing to achieve stationari- ty
MA(q): a moving average component, building coefficients based on the models' errors at previous time lags.

In [None]:
# First define the orders (p,d,q)
p = 1 # AR(1) model based on significant lags in PACF
d = 0 # No differcing needed to make stationary
q = 0 # q will be used for MA models (set to 0 for an AR only model)
# Now instantiate the model with the data and fit
ar_1_model = tsa.ARIMA(train, order = (p,d,q)).fit()
ar_1_model



In [None]:
# Obtain the parameters of the fit model
ar_1_model.params



In [None]:
# Obtain forecast as a dataframe with confidence intervals
forecast_df = ar_1_model.get_forecast(steps=len(test)).summary_frame()
forecast_df.head()



In [None]:
def plot_forecast(ts_train, ts_test, forecast_df, n_train_lags=None, 
                  figsize=(10,4), title='Comparing Forecast vs. True Data'):
    ### PLot training data, and forecast (with upper/,lower ci)
    fig, ax = plt.subplots(figsize=figsize)
    # setting the number of train lags to plot if not specified
    if n_train_lags==None:
        n_train_lags = len(ts_train)
            
    # Plotting Training  and test data
    ts_train.iloc[-n_train_lags:].plot(ax=ax, label="train")
    ts_test.plot(label="test", ax=ax)
    # Plot forecast
    forecast_df['mean'].plot(ax=ax, color='green', label="forecast")
    # Add the shaded confidence interval
    ax.fill_between(forecast_df.index, 
                    forecast_df['mean_ci_lower'],
                   forecast_df['mean_ci_upper'],
                   color='green', alpha=0.3,  lw=2)
    # set the title and add legend
    ax.set_title(title)
    ax.legend();
    
    return fig, ax



In [None]:
# Call the custom function to plot the forecasts with confidence intervals and true values
plot_forecast(train, test, forecast_df);


Add MAPE(Mean Absolute Percentage Error) to function

# MA Models

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error

def regression_metrics_ts(ts_true, ts_pred, label="", verbose=True, output_dict=False,):
    # Get metrics
    mae = mean_absolute_error(ts_true, ts_pred)
    mse = mean_squared_error(ts_true, ts_pred)
    rmse = mean_squared_error(ts_true, ts_pred, squared=False)
    r_squared = r2_score(ts_true, ts_pred)
    mae_perc = mean_absolute_percentage_error(ts_true, ts_pred) * 100

    if verbose == True:
        # Print Result with label
        header = "---" * 20
        print(header, f"Regression Metrics: {label}", header, sep="\n")
        print(f"- MAE = {mae:,.3f}")
        print(f"- MSE = {mse:,.3f}")
        print(f"- RMSE = {rmse:,.3f}")
        print(f"- R^2 = {r_squared:,.3f}")
        print(f"- MAPE = {mae_perc:,.2f}%")

    if output_dict == True:
        metrics = {
            "Label": label,
            "MAE": mae,
            "MSE": mse,
            "RMSE": rmse,
            "R^2": r_squared,
            "MAPE(%)": mae_perc,
        }
        return metrics



In [None]:
# Obtain metrics
regression_metrics_ts(test, forecast_df['mean'])



In [None]:
# Custom function for Ad Fuller Test
def get_adfuller_results(ts, alpha=.05, label='adfuller', **kwargs): #kwargs for adfuller()
    # Saving each output
    (test_stat, pval, nlags, nobs, crit_vals_d, 
    icbest ) = tsa.adfuller(ts, **kwargs)
    # Converting output to a dictionary with the interpretation of p
    adfuller_results = {'Test Statistic': test_stat,
                        "# of Lags Used":nlags, 
                       '# of Observations':nobs,
                        'p-value': round(pval,6),
                        'alpha': alpha,
                       'sig/stationary?': pval < alpha}
    return pd.DataFrame(adfuller_results, index =[label])



In [None]:
# Load in data
ts = pd.read_csv('MA_lesson_ts.csv', index_col = 'date')
ts.head()



In [None]:
# We have monthly data so we will set our frequency to M
ts.index.freq= "M"
# Define the series
ts = ts['value']


In [None]:
# Check for nulls
ts.isna().sum()



In [None]:
# Plot the time series
ts.plot();



In [None]:
# Make acf plot of raw data to look for seasonality
tsa.graphics.plot_acf(ts);



In [None]:
# Call custom function to check to see if data is stationary
get_adfuller_results(ts)



In [None]:
# check differencing with ndiffs
d = ndiffs(ts)
print (f'd = {d}')



In [None]:
### NEW FUNCTION FOR COMBINED ACF/PACF WITH ANNOTATIONS
def plot_acf_pacf(ts, nlags=40, figsize=(10, 5), 
                  annotate_sig=False, alpha=.05,
                 acf_kws={}, pacf_kws={},  
                  annotate_seas=False, m = None,
                 seas_color='black'):
    
    fig, axes = plt.subplots(nrows=2, figsize=figsize)
    
    # Sig lags line style
    sig_vline_kwargs = dict( ls=':', lw=1, zorder=0, color='red')
    # ACF
    tsa.graphics.plot_acf(ts, ax=axes[0], lags=nlags, **acf_kws)
    
    ## Annotating sig acf lags
    if annotate_sig == True:
        sig_acf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='ACF')
        for lag in sig_acf_lags:
            axes[0].axvline(lag,label='sig', **sig_vline_kwargs )
    # PACF
    tsa.graphics.plot_pacf(ts,ax=axes[1], lags=nlags, **pacf_kws)
    
    ## Annotating sig pacf lags
    if annotate_sig == True:
        ## ANNOTATING SIG LAGS
        sig_pacf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='PACF')
        for lag in sig_pacf_lags:
            axes[1].axvline(lag, label='sig', **sig_vline_kwargs)
    
    ### ANNOTATE SEASONS
    if annotate_seas == True:
        # Ensure m was defined
        if m is None:
            raise Exception("Must define value of m if annotate_seas=True.")
        ## Calculate number of complete seasons to annotate
        n_seasons = nlags//m 
        # Seasonal Lines style
        seas_vline_kwargs = dict( ls='--',lw=1, alpha=.7, color=seas_color, zorder=-1)
        
        ## for each season, add a line
        for i in range(1, n_seasons+1):
            axes[0].axvline(m*i, **seas_vline_kwargs, label="season")
            axes[1].axvline(m*i, **seas_vline_kwargs, label="season")
    fig.tight_layout()
    
    return fig



In [None]:
# Let's call our custom acf/pacf plot on our stationary (d = 0) data
plot_acf_pacf(ts);



In [None]:
# tts
train, test = train_test_split(ts, test_size=.20)
# Visualize the train and test data
ax = train.plot(label='Train')
test.plot(ax=ax, label='Test')
ax.legend();


In [None]:
# First define the orders (p,d,q)
p = 0 # p is us ed for AR models (set to 0 for an MA only model)
d = 0 # no differcing was required to make the data stationary
q = 2 # q based on significant lags in ACF
# Now instantiate the model with the data and fit
ma_2_model = tsa.ARIMA(train, order = (p,d,q)).fit()
ma_2_model



In [None]:
# Obtain the parameters of the fit model
ma_2_model.params



In [None]:
# Obtain forecast as a dataframe with confidence intervals
forecast_df = ma_2_model.get_forecast(steps=len(test)).summary_frame()
# Call the custom function to plot the forecasts with confidence intervals and true values
plot_forecast(train, test, forecast_df);



In [None]:
# Load in data
ts = pd.read_csv('AR_MA_lesson_ts.csv', parse_dates = ['date'], index_col = 'date')
ts.head()


In [None]:
# We have daily data so we will set our frequency to D
ts.index.freq= "D"
ts.index



In [None]:
# Check for nulls
ts.isna().sum().sum()



In [None]:
# Plot the time series
ts.plot();



In [None]:
# Call custom function to check to see if data is stationary
get_adfuller_results(ts)



In [None]:
# Check to see how many differencing are needed
ndiffs(ts)


In [None]:
# We need to apply differencing one time
ts_diff = ts.diff().dropna()
# Confirm stationarity with adfuller test
get_adfuller_results(ts_diff)


In [None]:
# Let's call our custom acf/pacf plot on our stationary (d = 1) data
plot_acf_pacf(ts_diff);



In [None]:
# First define the orders (p,d,q)
p = 1 # p is used for AR component of AR-MA model
d = 1 # 1 differcing needed to make stationary
q = 1 # q is used for MA component of AR-MA model
# Now instantiate the model with the data and fit
ar_ma_model = tsa.ARIMA(train, order = (p,d,q)).fit()
ar_ma_model



In [None]:
# Obtain the parameters of the fit model
ar_ma_model.params



# Comparing Models

In [None]:
# Obtain summary of model
ma_2_model.summary()



In [None]:
# Obtain diagnostic plots
fig = ma_2_model.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()



In [None]:
# First define the orders (p,d,q)
p = 0 
d = 0 
q = 1 

# Now instantiate the model with the data and fit
ma_1_model = tsa.ARIMA(train, order = (p,d,q)).fit()



In [None]:
# Obtain model summary
ma_1_model.summary()



In [None]:
# Obtain diagnostic plots
fig = ma_1_model.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()



In [None]:
# First define the orders (p,d,q)
p = 1 
d = 0 
q = 2  

# Now instantiate the model with the data and fit
ar_1_ma_2_model = tsa.ARIMA(train, order = (p,d,q)).fit()



In [None]:
# Obtain diagnostic plots
fig = ar_1_ma_2_model.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()



In [None]:
# Looking at AIC of all three models
print(f'MA(2) had an AIC of {ma_2_model.aic.round(2)}')
print(f'MA(1) had an AIC of {ma_1_model.aic.round(2)}')
print(f'AR(1)MA(2) had an AIC of {ar_1_ma_2_model.aic.round(2)}')



In [None]:
​# Looking at BIC of all three models
print(f'MA(2) had a BIC of {ma_2_model.bic.round(2)}')
print(f'MA(1) had a BIC of {ma_1_model.bic.round(2)}')
print(f'AR(1)MA(2) had a BIC of {ar_1_ma_2_model.bic.round(2)}')


# ARIMA Models

In [None]:
​import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import os
import statsmodels.tsa.api as tsa
from pmdarima.model_selection import train_test_split
from pmdarima.arima.utils import ndiffs

# Set wide fig size for plots
plt.rcParams['figure.figsize']=(12,3)


In [None]:
def plot_forecast(ts_train, ts_test, forecast_df, n_train_lags=None, 
                  figsize=(10,4), title='Comparing Forecast vs. True Data'):
    ### PLot training data, and forecast (with upper/,lower ci)
    fig, ax = plt.subplots(figsize=figsize)

    # setting the number of train lags to plot if not specified
    if n_train_lags==None:
        n_train_lags = len(ts_train)
            
    # Plotting Training  and test data
    ts_train.iloc[-n_train_lags:].plot(ax=ax, label="train")
    ts_test.plot(label="test", ax=ax)

    # Plot forecast
    forecast_df['mean'].plot(ax=ax, color='green', label="forecast")

    # Add the shaded confidence interval
    ax.fill_between(forecast_df.index, 
                    forecast_df['mean_ci_lower'],
                   forecast_df['mean_ci_upper'],
                   color='green', alpha=0.3,  lw=2)

    # set the title and add legend
    ax.set_title(title)
    ax.legend();
    
    return fig, ax



In [None]:
# Custom function for Ad Fuller Test
def get_adfuller_results(ts, alpha=.05, label='adfuller', **kwargs): #kwargs for adfuller()
    # Saving each output
    (test_stat, pval, nlags, nobs, crit_vals_d, 
    icbest ) = tsa.adfuller(ts, **kwargs)
    # Converting output to a dictionary with the interpretation of p
    adfuller_results = {'Test Statistic': test_stat,
                        "# of Lags Used":nlags, 
                       '# of Observations':nobs,
                        'p-value': round(pval,6),
                        'alpha': alpha,
                       'sig/stationary?': pval < alpha}
    return pd.DataFrame(adfuller_results, index =[label])



In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error

def regression_metrics_ts(ts_true, ts_pred, label="", verbose=True, output_dict=False,):
    # Get metrics
    mae = mean_absolute_error(ts_true, ts_pred)
    mse = mean_squared_error(ts_true, ts_pred)
    rmse = mean_squared_error(ts_true, ts_pred, squared=False)
    r_squared = r2_score(ts_true, ts_pred)
    mae_perc = mean_absolute_percentage_error(ts_true, ts_pred) * 100

    if verbose == True:
        # Print Result with label
        header = "---" * 20
        print(header, f"Regression Metrics: {label}", header, sep="\n")
        print(f"- MAE = {mae:,.3f}")
        print(f"- MSE = {mse:,.3f}")
        print(f"- RMSE = {rmse:,.3f}")
        print(f"- R^2 = {r_squared:,.3f}")
        print(f"- MAPE = {mae_perc:,.2f}%")

    if output_dict == True:
        metrics = {
            "Label": label,
            "MAE": mae,
            "MSE": mse,
            "RMSE": rmse,
            "R^2": r_squared,
            "MAPE(%)": mae_perc,
        }
        return metrics



In [None]:
def plot_acf_pacf(ts, nlags=40, figsize=(10, 5), 
                  annotate_sig=False, alpha=.05,
                 acf_kws={}, pacf_kws={},  
                  annotate_seas=False, m = None,
                 seas_color='black'):
    
    fig, axes = plt.subplots(nrows=2, figsize=figsize)

    
    # Sig lags line style
    sig_vline_kwargs = dict( ls=':', lw=1, zorder=0, color='red')

    # ACF
    tsa.graphics.plot_acf(ts, ax=axes[0], lags=nlags, **acf_kws)
    
    ## Annotating sig acf lags
    if annotate_sig == True:
        sig_acf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='ACF')
        for lag in sig_acf_lags:
            axes[0].axvline(lag,label='sig', **sig_vline_kwargs )

    # PACF
    tsa.graphics.plot_pacf(ts,ax=axes[1], lags=nlags, **pacf_kws)
    
    ## Annotating sig pacf lags
    if annotate_sig == True:
        ## ANNOTATING SIG LAGS
        sig_pacf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='PACF')
        for lag in sig_pacf_lags:
            axes[1].axvline(lag, label='sig', **sig_vline_kwargs)



    
    ### ANNOTATE SEASONS
    if annotate_seas == True:
        # Ensure m was defined
        if m is None:
            raise Exception("Must define value of m if annotate_seas=True.")

        ## Calculate number of complete seasons to annotate
        n_seasons = nlags//m 

        # Seasonal Lines style
        seas_vline_kwargs = dict( ls='--',lw=1, alpha=.7, color=seas_color, zorder=-1)
        
        ## for each season, add a line
        for i in range(1, n_seasons+1):
            axes[0].axvline(m*i, **seas_vline_kwargs, label="season")
            axes[1].axvline(m*i, **seas_vline_kwargs, label="season")

    fig.tight_layout()
    
    return fig



In [None]:
# Load in stock data
ts = pd.read_csv("Data/msft-daily-closing-price.csv", 
                  parse_dates=['Date'], index_col='Date')
ts.head()



In [None]:
# Filter for 2015-2019 and only the adj close value
ts = ts.loc['2015':'2019', 'Adj Close']
ts.plot();​


In [None]:
# Preview the index
ts.index



In [None]:
# Resample for business day with 'B'
ts = ts.resample('B').asfreq()
ts



In [None]:
# Check for nulls
ts.isna().sum()



In [None]:
# Inspect null values
null = ts.isna()
ts[null].head(20)



In [None]:
# Fill missing values with previous value
ts = ts.fillna(method='ffill')
ts.plot();



In [None]:
# Testing the raw data for stationarity
get_adfuller_results(ts)



In [None]:
# Differencing the data once
ts_diff = ts.diff().dropna()
ts_diff.plot()
# Checking for stationarity
get_adfuller_results(ts_diff)



In [None]:
# use ndiffs to determine differencing
d = ndiffs(ts)
print(f'd is {d}')


In [None]:
# Difference twice (d = 2)
ts_diff2 =ts.diff().diff().dropna()



In [None]:
plot_acf_pacf(ts_diff2);


In [None]:
# Calculating number of test lags
n_test_lags = 5*26

# Modeling to predict 6 months into the future
train, test = train_test_split(ts, test_size=n_test_lags)
ax = train.plot(label='Train')
test.plot(ax=ax, label='Test')
ax.legend();



In [None]:
# Build the model 
p = 0  # AR component 

d = 2  # Number of differencing required to make stationary

q =  1 # MA component 

# Define and fit the model
arima_model = tsa.ARIMA(train, order=(p,d,q)).fit()



In [None]:
# Obtain summary of forecast as dataframe
forecast_df = arima_model.get_forecast(len(test)).summary_frame()

# Plot the forecast with true values
plot_forecast(train, test, forecast_df)

# Obtain metrics
regression_metrics_ts(test, forecast_df['mean'])



In [None]:
# Obtain summary of model
arima_model.summary()



In [None]:
# Obtain diagnostic plots
fig = arima_model.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()



In [None]:
import itertools

# Define the value or range of values for p, d, q
p_values = range(0, 4)  
d_values = [2]          
q_values = range(0, 4)  

# Create combinations of pdq to test
pdq_to_try = list(itertools.product(p_values, d_values, q_values))
                                            
pdq_to_try



In [None]:
# define starting aic as infinity
best_aic = float("inf")  

# define baseline for pdq
best_pdq = (0,0,0)

# Loop through each combination
for pdq in pdq_to_try:
    
    model = tsa.ARIMA(train, order=pdq)
                              
    result = model.fit()
    
    print(pdq, result.aic)      
    
    # If lower, replace best AIC with new value
    if result.aic < best_aic:
        
        best_aic = result.aic
        best_pdq = pdq

# Print the best orders and AIC score
print("Best AIC:", best_aic)
print("Best pdq:", best_pdq)



In [None]:
# Build the model with the best AIC
p = 2  # AR component 

d = 2  # Number of differencing required to make stationary

q =  3 # MA component 

# Define and fit the model
ar_2_ma_3_model = tsa.ARIMA(train, order=(p,d,q)).fit()



In [None]:
# Obtain diagnostic plots
fig = ar_2_ma_3_model.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()


In [None]:
# Obtain summary of forecast as dataframe
forecast_df = ar_2_ma_3_model.get_forecast(len(test)).summary_frame()
# Plot the forecast with true values
plot_forecast(train, test, forecast_df)
# Obtain metrics
regression_metrics_ts(test, forecast_df['mean'])



# Seasonality in Modeling (SARIMA)

In [None]:
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import os
import statsmodels.tsa.api as tsa
from pmdarima.model_selection import train_test_split
from pmdarima.arima.utils import ndiffs, nsdiffs
# set random seed
SEED = 321
np.random.seed(SEED)

sns.set_context('notebook', font_scale=1.2)
plt.rcParams['figure.figsize']=(12,3)



In [None]:
def plot_forecast(ts_train, ts_test, forecast_df, n_train_lags=None, 
                  figsize=(10,4), title='Comparing Forecast vs. True Data'):
    ### PLot training data, and forecast (with upper/,lower ci)
    fig, ax = plt.subplots(figsize=figsize)

    # setting the number of train lags to plot if not specified
    if n_train_lags==None:
        n_train_lags = len(ts_train)
            
    # Plotting Training  and test data
    ts_train.iloc[-n_train_lags:].plot(ax=ax, label="train")
    ts_test.plot(label="test", ax=ax)

    # Plot forecast
    forecast_df['mean'].plot(ax=ax, color='green', label="forecast")

    # Add the shaded confidence interval
    ax.fill_between(forecast_df.index, 
                    forecast_df['mean_ci_lower'],
                   forecast_df['mean_ci_upper'],
                   color='green', alpha=0.3,  lw=2)

    # set the title and add legend
    ax.set_title(title)
    ax.legend();
    
    return fig, ax



In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error

def regression_metrics_ts(ts_true, ts_pred, label="", verbose=True, output_dict=False,):
    # Get metrics
    mae = mean_absolute_error(ts_true, ts_pred)
    mse = mean_squared_error(ts_true, ts_pred)
    rmse = mean_squared_error(ts_true, ts_pred, squared=False)
    r_squared = r2_score(ts_true, ts_pred)
    mae_perc = mean_absolute_percentage_error(ts_true, ts_pred) * 100

    if verbose == True:
        # Print Result with label
        header = "---" * 20
        print(header, f"Regression Metrics: {label}", header, sep="\n")
        print(f"- MAE = {mae:,.3f}")
        print(f"- MSE = {mse:,.3f}")
        print(f"- RMSE = {rmse:,.3f}")
        print(f"- R^2 = {r_squared:,.3f}")
        print(f"- MAPE = {mae_perc:,.2f}%")

    if output_dict == True:
        metrics = {
            "Label": label,
            "MAE": mae,
            "MSE": mse,
            "RMSE": rmse,
            "R^2": r_squared,
            "MAPE(%)": mae_perc,
        }
        return metrics



In [None]:
# Custom function for Ad Fuller Test
def get_adfuller_results(ts, alpha=.05, label='adfuller', **kwargs): #kwargs for adfuller()
    # Saving each output
    (test_stat, pval, nlags, nobs, crit_vals_d, 
    icbest ) = tsa.adfuller(ts, **kwargs)
    # Converting output to a dictionary with the interpretation of p
    adfuller_results = {'Test Statistic': test_stat,
                        "# of Lags Used":nlags, 
                       '# of Observations':nobs,
                        'p-value': round(pval,6),
                        'alpha': alpha,
                       'sig/stationary?': pval < alpha}
    return pd.DataFrame(adfuller_results, index =[label])



In [None]:
def plot_acf_pacf(ts, nlags=40, figsize=(10, 5), 
                  annotate_sig=False, alpha=.05,
                 acf_kws={}, pacf_kws={},  
                  annotate_seas=False, m = None,
                 seas_color='black'):
    
    fig, axes = plt.subplots(nrows=2, figsize=figsize)

    
    # Sig lags line style
    sig_vline_kwargs = dict( ls=':', lw=1, zorder=0, color='red')

    # ACF
    tsa.graphics.plot_acf(ts, ax=axes[0], lags=nlags, **acf_kws)
    
    ## Annotating sig acf lags
    if annotate_sig == True:
        sig_acf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='ACF')
        for lag in sig_acf_lags:
            axes[0].axvline(lag,label='sig', **sig_vline_kwargs )

    # PACF
    tsa.graphics.plot_pacf(ts,ax=axes[1], lags=nlags, **pacf_kws)
    
    ## Annotating sig pacf lags
    if annotate_sig == True:
        ## ANNOTATING SIG LAGS
        sig_pacf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='PACF')
        for lag in sig_pacf_lags:
            axes[1].axvline(lag, label='sig', **sig_vline_kwargs)



    
    ### ANNOTATE SEASONS
    if annotate_seas == True:
        # Ensure m was defined
        if m is None:
            raise Exception("Must define value of m if annotate_seas=True.")

        ## Calculate number of complete seasons to annotate
        n_seasons = nlags//m 

        # Seasonal Lines style
        seas_vline_kwargs = dict( ls='--',lw=1, alpha=.7, color=seas_color, zorder=-1)
        
        ## for each season, add a line
        for i in range(1, n_seasons+1):
            axes[0].axvline(m*i, **seas_vline_kwargs, label="season")
            axes[1].axvline(m*i, **seas_vline_kwargs, label="season")

    fig.tight_layout()
    
    return fig



In [None]:
# Load data from statsmodels
​​import statsmodels.api as sm
co2_data = sm.datasets.co2.load_pandas()
df = co2_data.data
df.head()


In [None]:
# Impute null values
df['co2'] = df['co2'].interpolate()
df.isna().sum().sum()



In [None]:
# Resample to monthly
ts = df.resample("M").mean()
ts.head()



In [None]:
# Define the series
ts = ts['co2']
# Plot
ts.plot();



In [None]:
# We see a repeating pattern that is likely seasonal
# Apply seasonal decomposition
decomp = tsa.seasonal_decompose(ts)
fig = decomp.plot()
fig.set_size_inches(12,5)
fig.tight_layout()



In [None]:
# How big is the seasonal component
seasonal_delta = decomp.seasonal.max() - decomp.seasonal.min()

# How big is the seasonal component relative to the time series?
print(f"The seasonal component is {seasonal_delta} which is ~{seasonal_delta/(ts.max()-ts.min()) * 100 :.2f}% of the variation in time series.")



In [None]:
# Narrow down the date range of the plot
seasonal = decomp.seasonal
ax = seasonal.loc['1975': '1978'].plot(marker = 'o')



In [None]:
# determine d
d = ndiffs(ts)
print (f'd = {d}')



In [None]:
# Determine D
D = nsdiffs(ts, m =12)
print(f'D = {D}')



In [None]:
​​# Difference the data
ts_diff = ts.diff().dropna()


In [None]:
# We can use our function to highlight the seasonal lags by adding the arguments
plot_acf_pacf(ts_diff, annotate_seas=True, m = 12);



In [None]:
# tts
train, test = train_test_split(ts, test_size=.25)
ax = train.plot(label='Train')
test.plot(ax=ax, label='Test')
ax.legend();



In [None]:
# Orders for non seasonal components
p = 1  # nonseasonal AR
d = 1  # nonseasonal differencing
q = 1  # nonseasonal MA

# Orders for seasonal components
P = 1  # Seasonal AR
D = 0  # Seasonal differencing
Q = 1  # Seasonal MA
m = 12 # Seasonal period

sarima = tsa.ARIMA(train, order = (p,d,q), seasonal_order=(P,D,Q,m)).fit()



In [None]:
# Obtain summary of forecast as dataframe
forecast_df = sarima.get_forecast(len(test)).summary_frame()
# Plot the forecast with true values
plot_forecast(train, test, forecast_df)
# Obtain metrics
regression_metrics_ts(test, forecast_df["mean"])



In [None]:
# Obtain summary
sarima.summary()



In [None]:
# Obtain diagnostic plots
fig = sarima.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()



In [None]:
import itertools

# Define the value or range of values for p, d, q
p_values = range(0, 3)  
d_values = [1]          
q_values = range(0, 3)  
P_values = range (0, 3)
D_values = [0]
Q_values = range (0,3)
m = [12]

# Create combinations of pdq to test
pdqPDQm_to_try = list(itertools.product(p_values, d_values, q_values, P_values, D_values, Q_values, m))

# Display first 10 combinations
pdqPDQm_to_try[:10]



In [None]:
# define starting aic as infinity
best_aic = float("inf")  

# define baseline for pdq
best_pdqPDQm = (0,0,0,0,0,0,0)

# Loop through each combination
for pdqPDQm in pdqPDQm_to_try:
    order = pdqPDQm[:3] # first three values are non seasonal (p,d,q)
    seasonal_order = pdqPDQm[3:] # Remaining values for seasonal (P,D,Q,m)
    
    model = tsa.ARIMA(train, order=order, seasonal_order = seasonal_order)
    try:                         
        result = model.fit()
        print(pdqPDQm, result.aic)      
   
    except:
        print(f'{pdqPDQm}: caused an error')
    
    # If lower, replace best AIC with new value
    if result.aic < best_aic:
        
        best_aic = result.aic
        best_pdqPDQm = pdqPDQm

# Print the best orders and AIC score
print("Best AIC:", best_aic)
print("Best pdqPDQm:", best_pdqPDQm)



--------------------------------------

# GridSearch

In [None]:
import pmdarima as pm


In [None]:
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import os
import statsmodels.tsa.api as tsa
from pmdarima.model_selection import train_test_split
from pmdarima.arima.utils import ndiffs, nsdiffs

# Set wide fig size for plots
plt.rcParams['figure.figsize']=(12,3)



In [None]:
def plot_forecast(ts_train, ts_test, forecast_df, n_train_lags=None, 
                  figsize=(10,4), title='Comparing Forecast vs. True Data'):
    ### PLot training data, and forecast (with upper/,lower ci)
    fig, ax = plt.subplots(figsize=figsize)

    # setting the number of train lags to plot if not specified
    if n_train_lags==None:
        n_train_lags = len(ts_train)
            
    # Plotting Training  and test data
    ts_train.iloc[-n_train_lags:].plot(ax=ax, label="train")
    ts_test.plot(label="test", ax=ax)

    # Plot forecast
    forecast_df['mean'].plot(ax=ax, color='green', label="forecast")

    # Add the shaded confidence interval
    ax.fill_between(forecast_df.index, 
                    forecast_df['mean_ci_lower'],
                   forecast_df['mean_ci_upper'],
                   color='green', alpha=0.3,  lw=2)

    # set the title and add legend
    ax.set_title(title)
    ax.legend();
    
    return fig, ax



In [None]:
# Custom function for Ad Fuller Test
def get_adfuller_results(ts, alpha=.05, label='adfuller', **kwargs): #kwargs for adfuller()
    # Saving each output
    (test_stat, pval, nlags, nobs, crit_vals_d, 
    icbest ) = tsa.adfuller(ts, **kwargs)
    # Converting output to a dictionary with the interpretation of p
    adfuller_results = {'Test Statistic': test_stat,
                        "# of Lags Used":nlags, 
                       '# of Observations':nobs,
                        'p-value': round(pval,6),
                        'alpha': alpha,
                       'sig/stationary?': pval < alpha}
    return pd.DataFrame(adfuller_results, index =[label])



In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error

def regression_metrics_ts(ts_true, ts_pred, label="", verbose=True, output_dict=False,):
    # Get metrics
    mae = mean_absolute_error(ts_true, ts_pred)
    mse = mean_squared_error(ts_true, ts_pred)
    rmse = mean_squared_error(ts_true, ts_pred, squared=False)
    r_squared = r2_score(ts_true, ts_pred)
    mae_perc = mean_absolute_percentage_error(ts_true, ts_pred) * 100

    if verbose == True:
        # Print Result with label
        header = "---" * 20
        print(header, f"Regression Metrics: {label}", header, sep="\n")
        print(f"- MAE = {mae:,.3f}")
        print(f"- MSE = {mse:,.3f}")
        print(f"- RMSE = {rmse:,.3f}")
        print(f"- R^2 = {r_squared:,.3f}")
        print(f"- MAPE = {mae_perc:,.2f}%")

    if output_dict == True:
        metrics = {
            "Label": label,
            "MAE": mae,
            "MSE": mse,
            "RMSE": rmse,
            "R^2": r_squared,
            "MAPE(%)": mae_perc,
        }
        return metrics



In [None]:
def plot_acf_pacf(ts, nlags=40, figsize=(10, 5), 
                  annotate_sig=False, alpha=.05,
                 acf_kws={}, pacf_kws={},  
                  annotate_seas=False, m = None,
                 seas_color='black'):
    
    fig, axes = plt.subplots(nrows=2, figsize=figsize)

    
    # Sig lags line style
    sig_vline_kwargs = dict( ls=':', lw=1, zorder=0, color='red')

    # ACF
    tsa.graphics.plot_acf(ts, ax=axes[0], lags=nlags, **acf_kws)
    
    ## Annotating sig acf lags
    if annotate_sig == True:
        sig_acf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='ACF')
        for lag in sig_acf_lags:
            axes[0].axvline(lag,label='sig', **sig_vline_kwargs )

    # PACF
    tsa.graphics.plot_pacf(ts,ax=axes[1], lags=nlags, **pacf_kws)
    
    ## Annotating sig pacf lags
    if annotate_sig == True:
        ## ANNOTATING SIG LAGS
        sig_pacf_lags = get_sig_lags(ts,nlags=nlags,alpha=alpha, type='PACF')
        for lag in sig_pacf_lags:
            axes[1].axvline(lag, label='sig', **sig_vline_kwargs)



    
    ### ANNOTATE SEASONS
    if annotate_seas == True:
        # Ensure m was defined
        if m is None:
            raise Exception("Must define value of m if annotate_seas=True.")

        ## Calculate number of complete seasons to annotate
        n_seasons = nlags//m 

        # Seasonal Lines style
        seas_vline_kwargs = dict( ls='--',lw=1, alpha=.7, color=seas_color, zorder=-1)
        
        ## for each season, add a line
        for i in range(1, n_seasons+1):
            axes[0].axvline(m*i, **seas_vline_kwargs, label="season")
            axes[1].axvline(m*i, **seas_vline_kwargs, label="season")

    fig.tight_layout()
    
    return fig



In [None]:
df = pd.read_csv('Data/AirPassengers.csv')
df.head()



In [None]:
# Change the Month column to datetime
df['Month'] = pd.to_datetime(df['Month'])
# Set the Month column to the index
df = df.set_index('Month')
df.head()



In [None]:
# Set frequency to first day of the month
df = df.asfreq('MS')
# Define the time series
ts = df['#Passengers']
ts.plot();


In [None]:
# Apply seasonal decomposition
decomp = tsa.seasonal_decompose(ts)
fig = decomp.plot()
fig.set_size_inches(12,5)
fig.tight_layout()



In [None]:
# How big is the seasonal component
seasonal_delta = decomp.seasonal.max() - decomp.seasonal.min()

# How big is the seasonal component relative to the time series?
print(f"The seasonal component is {seasonal_delta: .2f} which is ~{seasonal_delta/(ts.max()-ts.min()) * 100 :.2f}% of the variation in time series.")



In [None]:
# Narrow down the date range of the plot
seasonal = decomp.seasonal
ax = seasonal.loc['1954': '1956'].plot(marker = 'o')



In [None]:
# Check for stationarity
get_adfuller_results(ts)



In [None]:
# determine d
d = ndiffs(ts)
print (f'd = {d}')
# determine D
D = nsdiffs(ts, m = 12)
print (f'D = {D}')



In [None]:
# apply both differencings
ts_diff = ts.diff().diff(12).dropna()
ts_diff.plot();



In [None]:
# now look at the acf/pacf of the stationary data
plot_acf_pacf(ts_diff, annotate_seas = True, m = 12, nlags = 60);



In [None]:
# tts
train, test = train_test_split(ts, test_size=.25)
ax = train.plot(label='Train')
test.plot(ax=ax, label='Test')
ax.legend();



In [None]:
# Orders for non seasonal components
p = 1  # nonseasonal AR
d = 1  # nonseasonal differencing
q = 0  # nonseasonal MA

# Orders for seasonal components
P = 1  # Seasonal AR
D = 1  # Seasonal differencing
Q = 1  # Seasonal MA
m = 12 # Seasonal period

sarima = tsa.ARIMA(train, order = (p,d,q), seasonal_order=(P,D,Q,m)).fit()

# Obtain summary
sarima.summary()



In [None]:
# Obtain diagnostic plots
fig = sarima.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()



In [None]:
# Obtain forecast as a dataframe with confidence intervals
forecast_df = sarima.get_forecast(steps=len(test)).summary_frame()
# Call the custom function to plot the forecasts with confidence intervals and true values
plot_forecast(train, test, forecast_df);
# Obtain metrics
regression_metrics_ts(test, forecast_df['mean'])



## Auto_Arima

In [None]:
import pmdarima as pm

# Default auto_arima will select model based on AIC score
auto_model = pm.auto_arima(
    train,
    seasonal=True,  
    m=12,
    trace=True
)



In [None]:
# the auto_arima will store our best nonseasonal and seasonal orders separtely
print(auto_model.order)
print(auto_model.seasonal_order)



In [None]:
# Obtain summary of the best model from auto_arima
auto_model.summary()



In [None]:
# Obtain diagnostic plots
fig = auto_model.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()



In [None]:
# Use auto_arima parameters to fit an ARIMA
auto_model = tsa.ARIMA(
    train, order=auto_model.order, seasonal_order=auto_model.seasonal_order
).fit()


# Obtain forecast as a dataframe with confidence intervals
forecast_df = auto_model.get_forecast(steps=len(test)).summary_frame()
# Call the custom function to plot the forecasts with confidence intervals and true values
plot_forecast(train, test, forecast_df);
# Obtain metrics
regression_metrics_ts(test, forecast_df['mean'])



# 
Extracting Future Forecasts

In [None]:
# These are the parameters of our final model
# Orders for non seasonal components
p = 1  # nonseasonal AR
d = 1  # nonseasonal differencing
q = 0  # nonseasonal MA

# Orders for seasonal components
P = 0  # Seasonal AR
D = 1  # Seasonal differencing
Q = 0  # Seasonal MA
m = 12 # Seasonal period

final_model = tsa.ARIMA(ts, order = (p,d,q), seasonal_order=(P,D,Q,m)).fit()



In [None]:
len(test)



In [None]:
# Obtain future forecasts beyond test data
forecast_df  = final_model.get_forecast(len(test)).summary_frame()
plot_forecast(train,test,forecast_df);



In [None]:
forecast_df.index[0],forecast_df.index[-1]


In [None]:
starting_value = forecast_df['mean'].iloc[0]
starting_value



In [None]:
final_value = forecast_df['mean'].iloc[-1]
final_value



In [None]:
change = final_value - starting_value
change



In [None]:
perc_change = (change / starting_value) * 100
perc_change



# Overall Workflow

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
import statsmodels.tsa.api as tsa
​import pmdarima as pm
from pmdarima.arima.utils import ndiffs, nsdiffs
​from pmdarima.model_selection import train_test_split
import pmdarima as pm
​plt.rcParams['figure.figsize']=(12,3)



In [None]:
## Load the data
fname =''
df = pd.read_csv(fname,
                 # use args below if know datetime column already
#                 parse_dates=[''], index_col="" 
                )
df



In [None]:
# make sure index is datetime index



In [None]:
​# Select time series to model.
col = '' # if a dataframe
ts = df[col]
ts


In [None]:
​# check frequency
ts.index


In [None]:
# resample to desired frequency
# ts = ts.resample(...)..agg()
# ts



In [None]:
# Visualize selected time series
ax = ts.plot()



In [None]:
# Check for null values
ts.isna().sum()



In [None]:
## Impute null values (if any)
# ts = ts.fillna(0)
# ts = ts.interpolate()
# ts = ts.fillna(method='ffill')
# ts = ts.fillna(method='bfill')



In [None]:
## Use Seasonal Decompose to check for seasonality
decomp = tsa.seasonal_decompose(ts)
fig = decomp.plot()
fig.set_size_inches(9, 5)
fig.tight_layout()



In [None]:
# How big is the seasonal component
seasonal_delta = decomp.seasonal.max() - decomp.seasonal.min()

# How big is the seasonal component relative to the time series?
print(f"The seasonal component is {seasonal_delta} which is ~{seasonal_delta/(ts.max()-ts.min()) * 100 :.2f}% of the variation in time series.")



In [None]:
# zooming in on smaller time period to see length of season
# decomp.seasonal.loc["...":].plot();


In [None]:
# Check for stationarity
get_adfuller_results(ts)


In [None]:
# Determine differencing
d = ndiffs(ts)
print(f'd is {d}')
D = nsdiffs(ts, m = _)
print(f'D is {D}')



In [None]:
# For example, one non seasonal differencing
​ts_diff = ts.diff().dropna()


In [None]:
​plot_acf_pacf(ts_diff, annotate_seas=True, m = _);


In [None]:
​from pmdarima.model_selection import train_test_split
train, test = train_test_split(ts, test_size=___)
​
​## Visualize train-test-split
ax = train.plot(label="train")
test.plot(label="test")
ax.legend();



In [None]:
# Orders for non seasonal components
p = _  # nonseasonal AR
d = _  # nonseasonal differencing
q = _ # nonseasonal MA

# Orders for seasonal components (if seasonal model)
P = _  # Seasonal AR
D = _  # Seasonal differencing
Q = _  # Seasonal MA
m = _ # Seasonal period

sarima = tsa.ARIMA(train, order = (p,d,q), seasonal_order=(P,D,Q,m)).fit()





In [None]:
# Obtain summary
sarima.summary()



In [None]:
# Obtain diagnostic plots
fig = sarima.plot_diagnostics()
fig.set_size_inches(10,6)
fig.tight_layout()



In [None]:
# Obtain summary of forecast as dataframe
forecast_df = sarima.get_forecast(len(test)).summary_frame()
# Plot the forecast with true values
plot_forecast(train, test, forecast_df, n_train_lags = 50)



In [None]:
regression_metrics_ts(test, forecast_df["forecast"])


In [None]:
import pmdarima as pm
# Default auto_arima will select model based on AIC score
auto_model = pm.auto_arima(
    train,
    seasonal=___,  # True or False
    m=____,  # if seasonal
    trace=True
)



In [None]:
# Try auto_arima orders
sarima = tsa.ARIMA(train, order = auto_model.order, seasonal_order=auto_model.seasonal_order).fit()

# Obtain summary
sarima.summary()



In [None]:
final_p = "?"
final_q = "?"
final_d = "?"
final_P = "?"
final_Q = "?"
final_D = "?"
​
​final_model = tsa.ARIMA(
    ts,
    order=(final_p, final_d, final_q),
    seasonal_order=(final_P, final_D, final_Q, m),
).fit()



In [None]:
# Ger forecast into true future (fit on entrie time series)
forecast_df = final_model.get_forecast(len(test)).summary_frame()

plot_forecast(train, test, forecast_df, n_train_lags = 20);



In [None]:
# Define starting and final values
starting_value = forecast_df['mean'].iloc[0]
final_value = forecast_df['mean'].iloc[-1]
# Change in x
delta = final_value - starting_value
print(f'The change in X over the forecast is {delta: .2f}.')
perc_change = (delta/starting_value) *100
print (f'The percentage change is {perc_change :.2f}%.')

