In [None]:
import os
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.statespace import mlemodel
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
# conda update statsmodels

In [2]:
outpath = 'D:\\OneDrive - University of Missouri\\transfer_desktop\MU\\project 3\\1_try\\output'

In [None]:
df = pd.read_csv(os.path.join(outpath, 'x_basis_topdegree.csv'), index_col=None)
Xb = df[['phi1', 'phi2', 'phi4', 'phi5', 'phi6', 'phi7']].to_numpy(dtype=float)
y = df['logy'].to_numpy(dtype=float)

In [None]:
# Load the PLS/PCA transformed data 
# ----- Not used in later steps. Can be ignored -----
yx_pls = np.load(os.path.join(outpath, 'yx_pls.npy'), allow_pickle=True)
y_a = yx_pls[:, 3]
x_a = yx_pls[:, 0:3]
# Model (2)
xa1 = sm.add_constant(x_a)
model_a = sm.OLS(y_a, xa1).fit() 
# Model (1)
Xb1 = sm.add_constant(Xb)
model = sm.OLS(y, Xb1).fit()

# Save the model results into a text file
summary_1 = model.summary().as_text()
summary_2 = model_a.summary().as_text()
with open(os.path.join(outpath, 'summary_simplemodel.txt'), 'w') as f:
    f.write('Regressors: Selected basis polynomials \n')
    f.write(summary_1)
    f.write('\n Regressors: PLS/PCA transformed \n')
    f.write(summary_2)

In [None]:
#############################
####  State Space Model  ####
#############################
# y_t=d_t+x'_t\beta_t+\epsilon_t
# d_t=d_{t-1}+\eta_t
# \beta_t=\beta_{t-1}+u_t, (mx1) Gaussian innovations

In [12]:
# Define the model class
class TVCModel(mlemodel.MLEModel):
    def __init__(self, endog, exog):
        self.k_exog = exog.shape[1]
        k_states = 1+self.k_exog
        super().__init__(endog, k_states=k_states, exog=exog, initialization='approximate_diffuse')

        self.ssm.transition = np.eye(k_states)  # T: both alpha and beta follow random walks
        self.ssm.selection = np.eye(k_states)   # R
        
        self.ssm.design = np.zeros((1, k_states, self.nobs))  # Z_t: [1, x'_t]
        self.ssm.design[0, 0, :] = 1.0
        self.ssm.design[0, 1:, :] = exog.T

    def update(self, params, **kwargs):
        params = np.array(params) 
        sigma2_eps = np.exp(params[0])  # var of observation noise
        sigma2_eta = np.exp(params[1])  # var of state noise for d_t
        sigma2_u = np.exp(params[2:])   # var of state noise for beta_t

        self.ssm['obs_cov', 0, 0] = sigma2_eps       # H: observation covariance
        self.ssm['state_cov'] = np.diag(
            np.concatenate(([sigma2_eta], sigma2_u))  # Q: state covariance
        )

    @property
    def start_params(self):
        # Initial guesses (log-scale)
        # params = [log_sigma_eps, log_sigma_eta, log_sigma_u's...]
        sigma2_eps0 = np.log(np.var(self.endog))
        sigma2_eta0 = np.log(0.01)
        sigma2_u0   = np.log(0.01*np.ones(self.k_exog))
        return np.r_[sigma2_eps0, sigma2_eta0, sigma2_u0]

In [68]:
# Fit the model
Xb = df[['phi1', 'phi2', 'phi4', 'phi5', 'phi6']].to_numpy(dtype=float)
y_std = (y - np.mean(y))/np.std(y)
mod = TVCModel(y_std, Xb)
res = mod.fit()

In [46]:
# Save the summary into a text file
summary = res.summary().as_text()
with open(os.path.join(outpath, 'summary_tvc_mod1.txt'), 'w') as f:
    f.write('State Space Model: Time-Varying Coefficients \n')
    f.write(summary)

In [None]:
# Some diagnostics
one_rsdl = res.filter_results.standardized_forecasts_error[0]
one_rsdl = one_rsdl.flatten()

# Plot residuals over time
plt.figure(figsize=(12, 4))
plt.plot(one_rsdl)
plt.xlabel("Period")
plt.grid(True)
plt.savefig(os.path.join(outpath, 'std_residual_1stepahead_mod1.png'))

# Plot ACF and PACF
fig, axes = plt.subplots(1, 2, figsize=(14, 4)) # 1 row, 2 columns
plot_acf(one_rsdl, lags=40, marker='', zero=False, ax=axes[0])
axes[0].set_ylim(-0.25, 0.25)
axes[0].set_title("ACF")
plot_pacf(one_rsdl, lags=40, marker='', zero=False, ax=axes[1])
axes[1].set_ylim(-0.25, 0.25)
axes[1].set_title("PACF")
plt.savefig(os.path.join(outpath, 'PACF_stdresid1s_mod1.png'))

# Q–Q plot # normal quantile vs residual quantile
fig = plt.figure(figsize=(6, 5))
sm.qqplot(one_rsdl, line='45')
plt.savefig(os.path.join(outpath, 'QQp_stdresid1s_mod1.png'))

In [None]:
# Ljung–Box Q statistic
# Aggregate autocorrelation up to the specified lags p
# null: residuals are uncorrelated up to lag p vs alternative: residuals are autocorrelated at some lag<=p
lb_test = acorr_ljungbox(one_rsdl, lags=[1], return_df=True)
print(lb_test)

In [77]:
# Smoothed states
smoothed = res.smoothed_state
smoothed_org = smoothed*np.std(y)
smoothed_org[0, :] = smoothed[0, :]*np.std(y) + np.mean(y) # back to original scale
# Smoothed state variances
smoothed_var = res.smoothed_state_cov
smoothed_var_org = smoothed_var*(np.std(y)**2)

In [None]:
# d_t
plt.figure(figsize=(14, 6))
plt.plot(np.arange(len(y)), smoothed_org[0, :], label='local trend', lw=1, ls='--')
plt.plot(np.arange(len(y)), y, label='observed load', lw=1)
plt.fill_between(np.arange(len(y)),
                 smoothed_org[0, :] - 2*np.sqrt(smoothed_var_org[0, 0, :]),
                 smoothed_org[0, :] + 2*np.sqrt(smoothed_var_org[0, 0, :]),
                 color='blue', alpha=0.2)
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(outpath, 'smoothedstate_mod1_dt.png'))

In [None]:
# beta_kt for k=1, 2
fig, axes = plt.subplots(2, 1, figsize=(12, 8)) # 2 row, 1 columns
for i in range(2):
    axi = axes[i]
    axi.set_ylabel(f"beta_{i+1}")
    axi.plot(np.arange(len(y)), smoothed_org[1+i, :], label=f'beta_{i+1}') # 1, 2
    axi.fill_between(np.arange(len(y)),
                     smoothed_org[1+i, :] - 2*np.sqrt(smoothed_var_org[1+i, 1+i, :]),
                     smoothed_org[1+i, :] + 2*np.sqrt(smoothed_var_org[1+i, 1+i, :]),
                     alpha=0.2)
    axi.set_ylim(0.1, 0.3)
plt.savefig(os.path.join(outpath, 'smoothedstate_mod1_beta12t.png'))

In [None]:
# beta_kt for k=4, 5, 6
fig, axes = plt.subplots(3, 1, figsize=(12, 12)) # 3 row, 1 columns
for i in range(3):
    k = i+3
    axi = axes[i]
    axi.set_ylabel(f"beta_{k+1}")
    axi.plot(np.arange(len(y)), smoothed_org[k, :], label=f'beta_{k+1}') # 3, 4, 5
    axi.fill_between(np.arange(len(y)),
                     smoothed_org[k, :] - 2*np.sqrt(smoothed_var_org[k, k, :]),
                     smoothed_org[k, :] + 2*np.sqrt(smoothed_var_org[k, k, :]),
                     alpha=0.2)
    axi.set_ylim(-0.08, 0.07)
plt.savefig(os.path.join(outpath, 'smoothedstate_mod1_beta456t.png'))