-
Notifications
You must be signed in to change notification settings - Fork 108
/
var.py
124 lines (119 loc) · 6.34 KB
/
var.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from typing import Optional
import numpy as np # type: ignore
import pandas as pd # type: ignore
import itertools
import operator
import copy
import matplotlib.pyplot as plt # type: ignore
import seaborn as sns # type: ignore
# This gives an error when running from a python script.
# Maybe, this should be set in the jupyter notebook directly.
# get_ipython().magic('matplotlib inline')
sns.set(style="white", color_codes=True)
# imported VARMAX from statsmodels pkg
from statsmodels.tsa.statespace.varmax import VARMAX # type: ignore
# helper functions
from ...utils import print_dynamic_rmse
from ...models.ar_based.param_finder import find_lowest_pq
class BuildVAR():
def __init__(self, criteria, forecast_period=2, p_max=3, q_max=3, verbose=0):
"""
Automatically build a VAR Model
"""
self.criteria = criteria
self.forecast_period = forecast_period
self.p_max = p_max
self.q_max = q_max
self.verbose = verbose
self.model = None
#def build_var_model(df, criteria, forecast_period=2, p_max=3, q_max=3, verbose=0):
def fit(self, ts_df): #, criteria, forecast_period=2, p_max=3, q_max=3, verbose=0):
"""
This builds a VAR model given a multivariate time series data frame with time as the Index.
Note that the input "y_train" can be a data frame with one column or multiple cols or a
multivariate array. However, the first column must be the target variable. The others are added.
You must include only Time Series data in it. DO NOT include "Non-Stationary" or "Trendy" data.
Make sure your Time Series is "Stationary" before you send it in!! If not, this will give spurious
results. Since it automatically builds a VAR model, you need to give it a Criteria to optimize on.
You can give it any of the following metrics as criteria: AIC, BIC, Deviance, Log-likelihood.
You can give the highest order values for p and q. Default is set to 3 for both.
"""
ts_df = ts_df[:]
#### dmax here means the column number of the data frame: it serves as a placeholder for columns
dmax = ts_df.shape[1]
###############################################################################################
cols = ts_df.columns.tolist()
ts_train = ts_df[:-self.forecast_period]
ts_test = ts_df[-self.forecast_period:]
if self.verbose == 1:
print('Data Set split into train %s and test %s for Cross Validation Purposes'
% (ts_train.shape, ts_test.shape))
# It is assumed that the first column of the dataframe is the target variable ####
### make sure that is the case before doing this program ####################
i = 1
results_dict = {}
for d_val in range(1, dmax):
y_train = ts_train.iloc[:, [0, d_val]]
print('\nAdditional Variable in VAR model = %s' % cols[d_val])
info_criteria = pd.DataFrame(
index=['AR{}'.format(i) for i in range(0, self.p_max+1)],
columns=['MA{}'.format(i) for i in range(0, self.q_max+1)]
)
for p_val, q_val in itertools.product(range(0, self.p_max+1), range(0, self.q_max+1)):
if p_val == 0 and q_val == 0:
info_criteria.loc['AR{}'.format(p_val), 'MA{}'.format(q_val)] = np.nan
print(' Iteration %d completed' % i)
i += 1
else:
try:
model = VARMAX(y_train, order=(p_val, q_val), trend='c')
model = model.fit(max_iter=1000, displ=False)
info_criteria.loc['AR{}'.format(p_val), 'MA{}'.format(q_val)] = eval('model.' + self.criteria)
print(' Iteration %d completed' % i)
i += 1
except:
i += 1
print(' Iteration %d completed' % i)
info_criteria = info_criteria[info_criteria.columns].astype(float)
interim_d = copy.deepcopy(d_val)
interim_p, interim_q, interim_bic = find_lowest_pq(info_criteria)
if self.verbose == 1:
_, ax = plt.subplots(figsize=(20, 10))
ax = sns.heatmap(info_criteria,
mask=info_criteria.isnull(),
ax=ax,
annot=True,
fmt='.0f'
)
ax.set_title(self.criteria)
results_dict[str(interim_p) + ' ' + str(interim_d) + ' ' + str(interim_q)] = interim_bic
best_bic = min(results_dict.items(), key=operator.itemgetter(1))[1]
best_pdq = min(results_dict.items(), key=operator.itemgetter(1))[0]
best_p = int(best_pdq.split(' ')[0])
best_d = int(best_pdq.split(' ')[1])
best_q = int(best_pdq.split(' ')[2])
print('Best variable selected for VAR: %s' % ts_train.columns.tolist()[best_d])
y_train = ts_train.iloc[:, [0, best_d]]
self.model = VARMAX(y_train, order=(best_p, best_q), trend='c')
self.model = self.model.fit()
if self.verbose == 1:
self.model.plot_diagnostics(figsize=(16, 12))
ax = self.model.impulse_responses(12, orthogonalized=True).plot(figsize=(12, 4))
ax.set(xlabel='Time Steps', title='Impulse Response Functions')
res2 = self.model.get_forecast(self.forecast_period)
res2_df = res2.summary_frame()
y_forecasted = res2_df['mean'].values
rmse, norm_rmse = print_dynamic_rmse(ts_test.iloc[:,0], y_forecasted, ts_train.iloc[:,0])
return self.model, res2_df, rmse, norm_rmse
def predict(self, forecast_period: Optional[int] = None):
"""
Return the predictions
"""
# Extract the dynamic predicted and true values of our time series
if forecast_period is None:
# use the forecast period used during training
y_forecasted = self.model.forecast(self.forecast_period)
else:
# use the forecast period provided by the user
y_forecasted = self.model.forecast(forecast_period)
return y_forecasted