Skip to content

Commit

Permalink
CLN: Final cleanup
Browse files Browse the repository at this point in the history
Clean and standardize contribution
  • Loading branch information
bashtage committed Jul 25, 2019
1 parent 2e17bfc commit b20a635
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 325 deletions.
4 changes: 2 additions & 2 deletions statsmodels/tools/validation/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ def string_like(value, name, optional=False, options=None, lower=True):
if lower:
value = value.lower()
if options is not None and value not in options:
extra_text = 'If not None,' if optional else ''
extra_text = 'If not None, ' if optional else ''
options_text = "'" + '\', \''.join(options) + "'"
msg = '{0}{1} must be one of: {2}'.format(extra_text,
name, options_text)
Expand Down Expand Up @@ -386,7 +386,7 @@ def dict_like(value, name, optional=False, strict=True):
return None
if (not isinstance(value, Mapping) or
(strict and not(isinstance(value, dict)))):
extra_text = 'If not None,' if optional else ''
extra_text = 'If not None, ' if optional else ''
strict_text = ' or dict_like (i.e., a Mapping)' if strict else ''
msg = '{0}{1} must be a dict{2}'.format(extra_text, name, strict_text)
raise TypeError(msg)
Expand Down
232 changes: 32 additions & 200 deletions statsmodels/tsa/stattools.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from statsmodels.compat.scipy import _next_regular

import numpy as np
import pandas as pd
from numpy.linalg import LinAlgError
from scipy import stats
import pandas as pd
Expand All @@ -21,19 +20,15 @@
int_like, dict_like, float_like)
from statsmodels.tsa._bds import bds
from statsmodels.tsa._innovations import innovations_filter, innovations_algo
from statsmodels.tsa._bds import bds
from statsmodels.tsa.adfvalues import mackinnonp, mackinnoncrit
from statsmodels.tsa.arima_model import ARMA
from statsmodels.tsa.tsatools import lagmat, lagmat2ds, add_trend

__all__ = ['acovf', 'acf', 'pacf', 'pacf_yw', 'pacf_ols', 'ccovf', 'ccf',
'periodogram', 'q_stat', 'coint', 'arma_order_select_ic',
'adfuller', 'kpss', 'zivot_andrews', 'bds', 'pacf_burg',
'innovations_algo', 'innovations_filter', 'levinson_durbin_pacf',
'levinson_durbin', 'adfuller', 'kpss', 'bds', 'levinson_durbin',
'adfuller', 'kpss', 'bds', 'pacf_burg', 'innovations_algo',
'adfuller', 'kpss', 'zivot_andrews', 'bds', 'pacf_burg', 'innovations_algo',
'innovations_filter', 'levinson_durbin_pacf', 'levinson_durbin']
'innovations_filter', 'levinson_durbin_pacf', 'levinson_durbin',
'zivot_andrews']

SQRTEPS = np.sqrt(np.finfo(np.double).eps)

Expand Down Expand Up @@ -1094,152 +1089,6 @@ def levinson_durbin_pacf(pacf, nlags=None):
return arcoefs, acf


def innovations_algo(acov, nobs=None, rtol=None):
    """
    Innovations algorithm to convert autocovariances to MA parameters

    Parameters
    ----------
    acov : array-like
        Array containing autocovariances including lag 0
    nobs : int, optional
        Number of periods to run the algorithm. If not provided, nobs is
        equal to the length of acov
    rtol : float, optional
        Tolerance used to check for convergence. Default value is 0 which will
        never prematurely end the algorithm. Checks after 10 iterations and
        stops if sigma2[i] - sigma2[i - 10] < rtol * sigma2[0]. When the
        stopping condition is met, the remaining values in theta and sigma2
        are forward filled using the value of the final iteration.

    Returns
    -------
    theta : ndarray
        Innovation coefficients of MA representation. Array is (nobs, q) where
        q is the largest index of a non-zero autocovariance. theta
        corresponds to the first q columns of the coefficient matrix in the
        common description of the innovation algorithm.
    sigma2 : ndarray
        The prediction error variance (nobs,).

    Examples
    --------
    >>> import statsmodels.api as sm
    >>> data = sm.datasets.macrodata.load_pandas()
    >>> rgdpg = data.data['realgdp'].pct_change().dropna()
    >>> acov = sm.tsa.acovf(rgdpg)
    >>> nobs = rgdpg.shape[0]
    >>> theta, sigma2 = innovations_algo(acov[:4], nobs=nobs)

    See also
    --------
    innovations_filter

    References
    ----------
    Brockwell, P.J. and Davis, R.A., 2016. Introduction to time series and
    forecasting. Springer.
    """
    acov = np.squeeze(np.asarray(acov))
    if acov.ndim != 1:
        raise ValueError('acov must be 1-d or squeezable to 1-d.')
    rtol = 0.0 if rtol is None else rtol
    # Enforce the documented contract: the error text already promised a
    # non-negative float, but negative values previously slipped through
    # and would trigger spurious "convergence" at iteration 10.
    if not isinstance(rtol, float) or rtol < 0:
        raise ValueError('rtol must be a non-negative float or None.')
    if nobs is not None and (nobs != int(nobs) or nobs < 1):
        raise ValueError('nobs must be a positive integer')
    n = acov.shape[0] if nobs is None else int(nobs)
    # q in the MA representation: largest lag with a non-zero autocovariance
    max_lag = int(np.max(np.argwhere(acov != 0)))

    v = np.zeros(n + 1)
    v[0] = acov[0]
    # Retain only the relevant columns of theta
    theta = np.zeros((n + 1, max_lag + 1))
    for i in range(1, n):
        for k in range(max(i - max_lag, 0), i):
            sub = 0
            for j in range(max(i - max_lag, 0), k):
                sub += theta[k, k - j] * theta[i, i - j] * v[j]
            theta[i, i - k] = 1. / v[k] * (acov[i - k] - sub)
        v[i] = acov[0]
        for j in range(max(i - max_lag, 0), i):
            v[i] -= theta[i, i - j] ** 2 * v[j]
        # Break if v has converged
        if i >= 10:
            if v[i - 10] - v[i] < v[0] * rtol:
                # Forward fill all remaining values
                v[i + 1:] = v[i]
                theta[i + 1:] = theta[i]
                break

    # Drop the scratch row and the unused lag-0 column before returning
    theta = theta[:-1, 1:]
    v = v[:-1]
    return theta, v


def innovations_filter(endog, theta):
    """
    Filter observations using the innovations algorithm

    Parameters
    ----------
    endog : array-like
        The time series to filter (nobs,). Should be demeaned if not mean 0.
    theta : ndarray
        Innovation coefficients of MA representation. Array must be (nobs, q)
        where q is the order of the MA.

    Returns
    -------
    resid : ndarray
        Array of filtered innovations

    Examples
    --------
    >>> import statsmodels.api as sm
    >>> data = sm.datasets.macrodata.load_pandas()
    >>> rgdpg = data.data['realgdp'].pct_change().dropna()
    >>> acov = sm.tsa.acovf(rgdpg)
    >>> nobs = rgdpg.shape[0]
    >>> theta, sigma2 = innovations_algo(acov[:4], nobs=nobs)
    >>> resid = innovations_filter(rgdpg, theta)

    See also
    --------
    innovations_algo

    References
    ----------
    Brockwell, P.J. and Davis, R.A., 2016. Introduction to time series and
    forecasting. Springer.
    """
    # Keep the original so a pandas index can be restored on the result
    orig_endog = endog
    endog = np.squeeze(np.asarray(endog))
    if endog.ndim != 1:
        raise ValueError('endog must be 1-d or squeezable to 1-d.')
    nobs = endog.shape[0]
    n_theta, k = theta.shape
    if nobs != n_theta:
        # Fixed typo in the original message: "moder order" -> "model order"
        raise ValueError('theta must be (nobs, q) where q is the model order')
    is_pandas = isinstance(orig_endog, (pd.DataFrame, pd.Series))
    if is_pandas:
        if len(orig_endog.index) != nobs:
            msg = 'If endog is a Series or DataFrame, the index must ' \
                  'correspond to the number of time series observations.'
            raise ValueError(msg)
    u = np.empty(nobs)
    u[0] = endog[0]
    for i in range(1, nobs):
        # One-step-ahead prediction from at most the last k innovations
        if i < k:
            hat = (theta[i, :i] * u[:i][::-1]).sum()
        else:
            hat = (theta[i] * u[i - k:i][::-1]).sum()
        u[i] = endog[i] - hat
    if is_pandas:
        u = pd.Series(u, index=orig_endog.index.copy())
    return u


def grangercausalitytests(x, maxlag, addconst=True, verbose=True):
"""four tests for granger non causality of 2 timeseries
Expand Down Expand Up @@ -1983,14 +1832,14 @@ def _format_regression_data(self, series, nobs, const, trend, cols, lags):
from the original (standardized) series under test.
"""
# first-diff y and standardize for numerical stability
endog = np.diff(series, axis=0)[:, 0]
endog = np.diff(series, axis=0)
endog /= np.sqrt(endog.T.dot(endog))
series /= np.sqrt(series.T.dot(series))
# reserve exog space
exog = np.zeros((endog[lags:].shape[0], cols + lags))
exog[:, 0] = const
# lagged y and dy
exog[:, cols - 1] = series[lags:(nobs - 1), 0]
exog[:, cols - 1] = series[lags:(nobs - 1)]
exog[:, cols:] = lagmat(
endog, lags, trim='none')[lags:exog.shape[0] + lags]
return endog, exog
Expand All @@ -2017,9 +1866,9 @@ def _update_regression_exog(self, exog, regression, period, nobs, const,
def run(self, x, trim=0.15, maxlag=None, regression='c', autolag='AIC'):
"""
Zivot-Andrews structural-break unit-root test
The Zivot-Andrews test can be used to test for a unit root in a
univariate process in the presence of serial correlation and a
single structural break.
The Zivot-Andrews test tests for a unit root in a univariate process
in the presence of serial correlation and a single structural break.
Parameters
----------
Expand Down Expand Up @@ -2075,34 +1924,33 @@ def run(self, x, trim=0.15, maxlag=None, regression='c', autolag='AIC'):
References
----------
Baum, C.F. (2004). ZANDREWS: Stata module to calculate Zivot-Andrews
unit root test in presence of structural break," Statistical Software
Components S437301, Boston College Department of Economics, revised
2015.
Schwert, G.W. (1989). Tests for unit roots: A Monte Carlo
investigation. Journal of Business & Economic Statistics, 7: 147-159.
Zivot, E., and Andrews, D.W.K. (1992). Further evidence on the great
crash, the oil-price shock, and the unit-root hypothesis. Journal of
Business & Economic Studies, 10: 251-270.
.. [*] Baum, C.F. (2004). ZANDREWS: Stata module to calculate
Zivot-Andrews unit root test in presence of structural break,"
Statistical Software Components S437301, Boston College Department
of Economics, revised 2015.
.. [*] Schwert, G.W. (1989). Tests for unit roots: A Monte Carlo
investigation. Journal of Business & Economic Statistics, 7:
147-159.
.. [*] Zivot, E., and Andrews, D.W.K. (1992). Further evidence on the
great crash, the oil-price shock, and the unit-root hypothesis.
Journal of Business & Economic Studies, 10: 251-270.
"""
if regression not in ['c', 't', 'ct']:
raise ValueError(
'ZA: regression option \'{}\' not understood'.format(
regression))
if not isinstance(trim, float) or trim < 0 or trim > (1. / 3.):
raise ValueError(
'ZA: trim value must be a float in range [0, 0.333]')
x = np.asarray(x)
if x.ndim > 2 or (x.ndim == 2 and x.shape[1] != 1):
raise ValueError(
'ZA: x must be a 1d array or a 2d array with a single column')
x = np.reshape(x, (-1, 1))
x = array_like(x, 'x')
trim = float_like(trim, 'trim')
maxlag = int_like(maxlag, 'maxlag', optional=True)
regression = string_like(regression, 'regression',
options=('c', 't', 'ct'))
autolag = string_like(autolag, 'autolag',
options=('AIC', 'BIC', 't-stat'), optional=True)
if trim < 0 or trim > (1. / 3.):
raise ValueError('trim value must be a float in range [0, 1/3)')
nobs = x.shape[0]
if autolag:
baselags = adfuller(x[:, 0], maxlag=maxlag, regression='ct',
autolag=autolag)[2]
adf_res = adfuller(x, maxlag=maxlag, regression='ct',
autolag=autolag)
baselags = adf_res[2]
elif maxlag:
baselags = maxlag
else:
Expand All @@ -2114,25 +1962,8 @@ def run(self, x, trim=0.15, maxlag=None, regression='c', autolag='AIC'):
basecols = 5
else:
basecols = 4
<<<<<<< HEAD
# first-diff y and standardize for numerical stability
dy = np.diff(x, axis=0)[:, 0]
dy /= np.sqrt(dy.T.dot(dy))
x = x / np.sqrt(x.T.dot(x))
# reserve exog space
exog = np.zeros((dy[baselags:].shape[0], basecols + baselags))
# normalize constant for stability in long time series
c_const = 1 / np.sqrt(nobs) # Normalize
exog[:, 0] = c_const
# lagged y and dy
exog[:, basecols - 1] = x[baselags:(nobs - 1), 0]
ex_cols = lagmat(dy, baselags, trim='none')
exog[:, basecols:] = ex_cols[baselags:exog.shape[0] + baselags]
# better time trend: t_const @ t_const = 1 for large nobs
=======
# normalize constant and trend terms for stability
c_const = 1 / np.sqrt(nobs)
>>>>>>> Style corrections (PR 5455)
t_const = np.arange(1.0, nobs + 2)
t_const *= np.sqrt(3) / nobs ** (3 / 2)
# format the auxiliary regression data
Expand Down Expand Up @@ -2170,5 +2001,6 @@ def __call__(self, x, trim=0.15, maxlag=None, regression='c',
return self.run(x, trim=trim, maxlag=maxlag, regression=regression,
autolag=autolag)


zivot_andrews = ZivotAndrewsUnitRoot()
zivot_andrews.__doc__ = zivot_andrews.run.__doc__
Loading

0 comments on commit b20a635

Please sign in to comment.