From 1a9a56fbd689e4fdd6da59d468a1b2890d01ca4c Mon Sep 17 00:00:00 2001 From: Kevin Sheppard Date: Wed, 22 Jun 2016 16:58:29 +0100 Subject: [PATCH] CLN/TST: Improve arima_process Clean and test arima process functions --- .coveragerc | 8 + .travis_coveragerc | 1 - statsmodels/sandbox/tsa/fftarma.py | 4 +- statsmodels/tsa/arima_model.py | 4 +- statsmodels/tsa/arima_process.py | 621 ++++++++------------ statsmodels/tsa/tests/test_arima_process.py | 299 ++++++++-- 6 files changed, 515 insertions(+), 422 deletions(-) diff --git a/.coveragerc b/.coveragerc index 5b264a626ab..a45ad38a075 100644 --- a/.coveragerc +++ b/.coveragerc @@ -20,6 +20,14 @@ exclude_lines = if 0: if __name__ == .__main__.: +omit = + # Test dirs + */tests/* + */test_* + + # print_version + */print_version.py + ignore_errors = False [html] diff --git a/.travis_coveragerc b/.travis_coveragerc index b68a8f421d3..20b69eccc0e 100644 --- a/.travis_coveragerc +++ b/.travis_coveragerc @@ -28,4 +28,3 @@ exclude_lines = # Don't complain if non-runnable code isn't run: if 0: if __name__ == .__main__.: - if name == .__main__.: \ No newline at end of file diff --git a/statsmodels/sandbox/tsa/fftarma.py b/statsmodels/sandbox/tsa/fftarma.py index 8bd2406f99a..bbce6f9aa9a 100644 --- a/statsmodels/sandbox/tsa/fftarma.py +++ b/statsmodels/sandbox/tsa/fftarma.py @@ -239,9 +239,9 @@ def spdroots(self, w): builds two arrays (number of roots, number of frequencies) ''' - return self.spdroots_(self.arroots, self.maroots, w) + return self._spdroots(self.arroots, self.maroots, w) - def spdroots_(self, arroots, maroots, w): + def _spdroots(self, arroots, maroots, w): '''spectral density for frequency using polynomial roots builds two arrays (number of roots, number of frequencies) diff --git a/statsmodels/tsa/arima_model.py b/statsmodels/tsa/arima_model.py index 85b716d45f7..2ca261e9892 100644 --- a/statsmodels/tsa/arima_model.py +++ b/statsmodels/tsa/arima_model.py @@ -1494,7 +1494,7 @@ def predict(self, start=None, end=None, exog=None, dynamic=False): def _forecast_error(self, steps): sigma2 = self.sigma2 ma_rep = arma2ma(np.r_[1, -self.arparams], - np.r_[1, self.maparams], nobs=steps) + np.r_[1, self.maparams], lags=steps) fcasterr = np.sqrt(sigma2 * np.cumsum(ma_rep**2)) return fcasterr @@ -1812,7 +1812,7 @@ def predict(self, start=None, end=None, exog=None, typ='linear', def _forecast_error(self, steps): sigma2 = self.sigma2 ma_rep = arma2ma(np.r_[1, -self.arparams], - np.r_[1, self.maparams], nobs=steps) + np.r_[1, self.maparams], lags=steps) fcerr = np.sqrt(np.cumsum(cumsum_n(ma_rep, self.k_diff)**2)*sigma2) return fcerr diff --git a/statsmodels/tsa/arima_process.py b/statsmodels/tsa/arima_process.py index bff43a2b8f7..ecd74d571e5 100644 --- a/statsmodels/tsa/arima_process.py +++ b/statsmodels/tsa/arima_process.py @@ -1,27 +1,7 @@ -'''ARMA process and estimation with scipy.signal.lfilter - -2009-09-06: copied from try_signal.py - reparameterized same as signal.lfilter (positive coefficients) - +"""ARMA process and estimation with scipy.signal.lfilter Notes ----- -* pretty fast -* checked with Monte Carlo and cross comparison with statsmodels yule_walker - for AR numbers are close but not identical to yule_walker - not compared to other statistics packages, no degrees of freedom correction -* ARMA(2,2) estimation (in Monte Carlo) requires longer time series to estimate parameters - without large variance. There might be different ARMA parameters - with similar impulse response function that cannot be well - distinguished with small samples (e.g. 100 observations) -* good for one time calculations for entire time series, not for recursive - prediction -* class structure not very clean yet -* many one-liners with scipy.signal, but takes time to figure out usage -* missing result statistics, e.g. t-values, but standard errors in examples -* no criteria for choice of number of lags -* no constant term in ARMA process -* no integration, differencing for ARIMA * written without textbook, works but not sure about everything briefly checked and it looks to be standard least squares, see below @@ -30,30 +10,15 @@ theoretical test cases, example file contains explicit formulas for acovf of MA(1), MA(2) and ARMA(1,1) -* two names for lag polynomials ar = rhoy, ma = rhoe ? - - Properties: Judge, ... (1985): The Theory and Practise of Econometrics -BigJudge p. 237ff: -If the time series process is a stationary ARMA(p,q), then -minimizing the sum of squares is asymptoticaly (as T-> inf) -equivalent to the exact Maximum Likelihood Estimator - -Because Least Squares conditional on the initial information -does not use all information, in small samples exact MLE can -be better. - -Without the normality assumption, the least squares estimator -is still consistent under suitable conditions, however not -efficient - Author: josefpktd License: BSD -''' +""" from __future__ import print_function from statsmodels.compat.python import range + import numpy as np from scipy import signal, optimize, linalg @@ -108,13 +73,14 @@ def arma_generate_sample(ar, ma, nsample, sigma=1, distrvs=np.random.randn, >>> model.params array([ 0.79044189, -0.23140636, 0.70072904, 0.40608028]) """ - #TODO: unify with ArmaProcess method - eta = sigma * distrvs(nsample+burnin) + # TODO: unify with ArmaProcess method + eta = sigma * distrvs(nsample + burnin) return signal.lfilter(ma, ar, eta)[burnin:] def arma_acovf(ar, ma, nobs=10): - '''theoretical autocovariance function of ARMA process + """ + Theoretical autocovariance function of ARMA process Parameters ---------- @@ -141,30 +107,33 @@ def arma_acovf(ar, ma, nobs=10): Tries to do some crude numerical speed improvements for cases with high persistance. However, this algorithm is slow if the process is highly persistent and only a few autocovariances are desired. - ''' - #increase length of impulse response for AR closer to 1 - #maybe cheap/fast enough to always keep nobs for ir large - if np.abs(np.sum(ar)-1) > 0.9: + """ + # increase length of impulse response for AR closer to 1 + # maybe cheap/fast enough to always keep nobs for ir large + + # TODO: This doesn't make sense should be analytical + if np.abs(np.sum(ar) - 1) > 0.9: nobs_ir = max(1000, 2 * nobs) # no idea right now how large is needed else: - nobs_ir = max(100, 2 * nobs) # no idea right now - ir = arma_impulse_response(ar, ma, nobs=nobs_ir) - #better save than sorry (?), I have no idea about the required precision - #only checked for AR(1) - while ir[-1] > 5*1e-5: + nobs_ir = max(100, 2 * nobs) # no idea right now + ir = arma_impulse_response(ar, ma, leads=nobs_ir) + # better save than sorry (?), I have no idea about the required precision + # only checked for AR(1) + while ir[-1] > 5 * 1e-5: nobs_ir *= 10 - ir = arma_impulse_response(ar, ma, nobs=nobs_ir) - #again no idea where the speed break points are: + ir = arma_impulse_response(ar, ma, leads=nobs_ir) + # again no idea where the speed break points are: if nobs_ir > 50000 and nobs < 1001: - acovf = np.array([np.dot(ir[:nobs-t], ir[t:nobs]) + acovf = np.array([np.dot(ir[:nobs - t], ir[t:nobs]) for t in range(nobs)]) else: - acovf = np.correlate(ir, ir, 'full')[len(ir)-1:] + acovf = np.correlate(ir, ir, 'full')[len(ir) - 1:] return acovf[:nobs] -def arma_acf(ar, ma, nobs=10): - '''theoretical autocorrelation function of an ARMA process +def arma_acf(ar, ma, lags=10, **kwargs): + """ + Theoretical autocorrelation function of an ARMA process Parameters ---------- @@ -172,7 +141,7 @@ def arma_acf(ar, ma, nobs=10): coefficient for autoregressive lag polynomial, including zero lag ma : array_like, 1d coefficient for moving-average lag polynomial, including zero lag - nobs : int + lags : int number of terms (lags plus zero lag) to include in returned acf Returns @@ -186,14 +155,19 @@ def arma_acf(ar, ma, nobs=10): arma_acovf acf acovf + """ + if 'nobs' in kwargs: + lags = kwargs['nobs'] + import warnings + warnings.warn('nobs is deprecated in favor of lags', DeprecationWarning) - ''' - acovf = arma_acovf(ar, ma, nobs) - return acovf/acovf[0] + acovf = arma_acovf(ar, ma, lags) + return acovf / acovf[0] -def arma_pacf(ar, ma, nobs=10): - '''partial autocorrelation function of an ARMA process +def arma_pacf(ar, ma, lags=10, **kwargs): + """ + Partial autocorrelation function of an ARMA process Parameters ---------- @@ -201,7 +175,7 @@ def arma_pacf(ar, ma, nobs=10): coefficient for autoregressive lag polynomial, including zero lag ma : array_like, 1d coefficient for moving-average lag polynomial, including zero lag - nobs : int + lags : int number of terms (lags plus zero lag) to include in returned pacf Returns @@ -214,19 +188,25 @@ def arma_pacf(ar, ma, nobs=10): solves yule-walker equation for each lag order up to nobs lags not tested/checked yet - ''' - apacf = np.zeros(nobs) - acov = arma_acf(ar, ma, nobs=nobs+1) + """ + if 'nobs' in kwargs: + lags = kwargs['nobs'] + import warnings + warnings.warn('nobs is deprecated in favor of lags', DeprecationWarning) + # TODO: Should use rank 1 inverse update + apacf = np.zeros(lags) + acov = arma_acf(ar, ma, lags=lags + 1) apacf[0] = 1. - for k in range(2, nobs+1): + for k in range(2, lags + 1): r = acov[:k] - apacf[k-1] = linalg.solve(linalg.toeplitz(r[:-1]), r[1:])[-1] + apacf[k - 1] = linalg.solve(linalg.toeplitz(r[:-1]), r[1:])[-1] return apacf def arma_periodogram(ar, ma, worN=None, whole=0): - '''periodogram for ARMA process given by lag-polynomials ar and ma + """ + Periodogram for ARMA process given by lag-polynomials ar and ma Parameters ---------- @@ -257,18 +237,18 @@ def arma_periodogram(ar, ma, worN=None, whole=0): This uses signal.freqz, which does not use fft. There is a fft version somewhere. - - ''' + """ w, h = signal.freqz(ma, ar, worN=worN, whole=whole) - sd = np.abs(h)**2/np.sqrt(2*np.pi) - if np.sum(np.isnan(h)) > 0: + sd = np.abs(h) ** 2 / np.sqrt(2 * np.pi) + if np.any(np.isnan(h)): # this happens with unit root or seasonal unit root' print('Warning: nan in frequency response h, maybe a unit root') return w, sd -def arma_impulse_response(ar, ma, nobs=100): - '''get the impulse response function (MA representation) for ARMA process +def arma_impulse_response(ar, ma, leads=100, **kwargs): + """ + Get the impulse response function (MA representation) for ARMA process Parameters ---------- @@ -276,7 +256,7 @@ def arma_impulse_response(ar, ma, nobs=100): moving average lag polynomial ar : array_like, 1d auto regressive lag polynomial - nobs : int + leads : int number of observations to calculate Returns @@ -290,8 +270,8 @@ def arma_impulse_response(ar, ma, nobs=100): By reversing the role of ar and ma in the function arguments, the returned result is the AR representation of an ARMA(p,q), i.e - ma_representation = arma_impulse_response(ar, ma, nobs=100) - ar_representation = arma_impulse_response(ma, ar, nobs=100) + ma_representation = arma_impulse_response(ar, ma, leads=100) + ar_representation = arma_impulse_response(ma, ar, leads=100) fully tested against matlab @@ -299,7 +279,7 @@ def arma_impulse_response(ar, ma, nobs=100): -------- AR(1) - >>> arma_impulse_response([1.0, -0.8], [1.], nobs=10) + >>> arma_impulse_response([1.0, -0.8], [1.], leads=10) array([ 1. , 0.8 , 0.64 , 0.512 , 0.4096 , 0.32768 , 0.262144 , 0.2097152 , 0.16777216, 0.13421773]) @@ -311,27 +291,61 @@ def arma_impulse_response(ar, ma, nobs=100): MA(2) - >>> arma_impulse_response([1.0], [1., 0.5, 0.2], nobs=10) + >>> arma_impulse_response([1.0], [1., 0.5, 0.2], leads=10) array([ 1. , 0.5, 0.2, 0. , 0. , 0. , 0. , 0. , 0. , 0. ]) ARMA(1,2) - >>> arma_impulse_response([1.0, -0.8], [1., 0.5, 0.2], nobs=10) + >>> arma_impulse_response([1.0, -0.8], [1., 0.5, 0.2], leads=10) array([ 1. , 1.3 , 1.24 , 0.992 , 0.7936 , 0.63488 , 0.507904 , 0.4063232 , 0.32505856, 0.26004685]) - ''' - impulse = np.zeros(nobs) + """ + if 'nobs' in kwargs: + leads = kwargs['nobs'] + import warnings + warnings.warn(DeprecationWarning, 'nobs is deprecated in favor of leads') + impulse = np.zeros(leads) impulse[0] = 1. return signal.lfilter(ma, ar, impulse) -#alias, easier to remember -arma2ma = arma_impulse_response +def arma2ma(ar, ma, lags=100, **kwargs): + """ + Get the MA representation of an ARMA process + Parameters + ---------- + ar : array_like, 1d + auto regressive lag polynomial + ma : array_like, 1d + moving average lag polynomial + lags : int + number of coefficients to calculate + + Returns + ------- + ar : array, 1d + coefficients of AR lag polynomial with nobs elements + + Notes + ----- + Equivalent to ``arma_impulse_response(ma, ar, leads=100)`` + + + Examples + -------- + """ + if 'nobs' in kwargs: + lags = kwargs['nobs'] + import warnings + warnings.warn('nobs is deprecated in favor of lags', DeprecationWarning) + + return arma_impulse_response(ar, ma, leads=lags) -#alias, easier to remember -def arma2ar(ar, ma, nobs=100): - '''get the AR representation of an ARMA process + +def arma2ar(ar, ma, lags=100, **kwargs): + """ + Get the AR representation of an ARMA process Parameters ---------- @@ -339,8 +353,8 @@ def arma2ar(ar, ma, nobs=100): auto regressive lag polynomial ma : array_like, 1d moving average lag polynomial - nobs : int - number of observations to calculate + lags : int + number of coefficients to calculate Returns ------- @@ -349,20 +363,23 @@ def arma2ar(ar, ma, nobs=100): Notes ----- - This is just an alias for - ``ar_representation = arma_impulse_response(ma, ar, nobs=100)`` which has - been fully tested against MATLAB. + Equivalent to ``arma_impulse_response(ma, ar, leads=100)`` + Examples -------- - - ''' - return arma_impulse_response(ma, ar, nobs=nobs) + """ + if 'nobs' in kwargs: + lags = kwargs['nobs'] + import warnings + warnings.warn('nobs is deprecated in favor of lags', DeprecationWarning) + return arma_impulse_response(ma, ar, leads=lags) -#moved from sandbox.tsa.try_fi +# moved from sandbox.tsa.try_fi def ar2arma(ar_des, p, q, n=20, mse='ar', start=None): - '''find arma approximation to ar process + """ + Find arma approximation to ar process This finds the ARMA(p,q) coefficients that minimize the integrated squared difference between the impulse_response functions @@ -375,10 +392,12 @@ def ar2arma(ar_des, p, q, n=20, mse='ar', start=None): ---------- ar_des : array_like coefficients of original AR lag polynomial, including lag zero - p, q : int - length of desired ARMA lag polynomials + p : int + length of desired AR lag polynomials + q : int + length of desired MA lag polynomials n : int - number of terms of the impuls_response function to include in the + number of terms of the impulse_response function to include in the objective function for the approximation mse : string, 'ar' not used yet, @@ -394,35 +413,32 @@ def ar2arma(ar_des, p, q, n=20, mse='ar', start=None): ----- Extension is possible if we want to match autocovariance instead of impulse response function. + """ - TODO: convert MA lag polynomial, ma_app, to be invertible, by mirroring - roots outside the unit intervall to ones that are inside. How do we do - this? + # TODO: convert MA lag polynomial, ma_app, to be invertible, by mirroring + # TODO: roots outside the unit intervall to ones that are inside. How do we do + # TODO: this? - ''' - #p,q = pq + # p,q = pq def msear_err(arma, ar_des): - ar, ma = np.r_[1, arma[:p-1]], np.r_[1, arma[p-1:]] + ar, ma = np.r_[1, arma[:p - 1]], np.r_[1, arma[p - 1:]] ar_approx = arma_impulse_response(ma, ar, n) -## print(ar,ma) -## print(ar_des.shape, ar_approx.shape) -## print(ar_des) -## print(ar_approx) return (ar_des - ar_approx) # ((ar - ar_approx)**2).sum() + if start is None: - arma0 = np.r_[-0.9 * np.ones(p-1), np.zeros(q-1)] + arma0 = np.r_[-0.9 * np.ones(p - 1), np.zeros(q - 1)] else: arma0 = start res = optimize.leastsq(msear_err, arma0, ar_des, maxfev=5000) - #print(res) arma_app = np.atleast_1d(res[0]) - ar_app = np.r_[1, arma_app[:p-1]], - ma_app = np.r_[1, arma_app[p-1:]] + ar_app = np.r_[1, arma_app[:p - 1]], + ma_app = np.r_[1, arma_app[p - 1:]] return ar_app, ma_app, res def lpol2index(ar): - '''remove zeros from lagpolynomial, squeezed representation with index + """ + Remove zeros from lagpolynomial Parameters ---------- @@ -435,7 +451,7 @@ def lpol2index(ar): non-zero coefficients of lag polynomial index : array index (lags) of lagpolynomial with non-zero elements - ''' + """ ar = np.asarray(ar) index = np.nonzero(ar)[0] coeffs = ar[index] @@ -443,7 +459,8 @@ def lpol2index(ar): def index2lpol(coeffs, index): - '''expand coefficients to lag poly + """ + Expand coefficients to lag poly Parameters ---------- @@ -451,24 +468,20 @@ def index2lpol(coeffs, index): non-zero coefficients of lag polynomial index : array index (lags) of lagpolynomial with non-zero elements - ar : array_like - coefficients of lag polynomial Returns ------- ar : array_like coefficients of lag polynomial - - ''' + """ n = max(index) - ar = np.zeros(n) + ar = np.zeros(n + 1) ar[index] = coeffs return ar -#moved from sandbox.tsa.try_fi def lpol_fima(d, n=20): - '''MA representation of fractional integration + """MA representation of fractional integration .. math:: (1-L)^{-d} for |d|<0.5 or |d|<1 (?) @@ -484,16 +497,16 @@ def lpol_fima(d, n=20): ma : array coefficients of lag polynomial - ''' - #hide import inside function until we use this heavily + """ + # hide import inside function until we use this heavily from scipy.special import gammaln j = np.arange(n) - return np.exp(gammaln(d+j) - gammaln(j+1) - gammaln(d)) + return np.exp(gammaln(d + j) - gammaln(j + 1) - gammaln(d)) -#moved from sandbox.tsa.try_fi +# moved from sandbox.tsa.try_fi def lpol_fiar(d, n=20): - '''AR representation of fractional integration + """AR representation of fractional integration .. math:: (1-L)^{d} for |d|<0.5 or |d|<1 (?) @@ -512,18 +525,18 @@ def lpol_fiar(d, n=20): Notes: first coefficient is 1, negative signs except for first term, ar(L)*x_t - ''' - #hide import inside function until we use this heavily + """ + # hide import inside function until we use this heavily from scipy.special import gammaln j = np.arange(n) - ar = - np.exp(gammaln(-d+j) - gammaln(j+1) - gammaln(-d)) + ar = - np.exp(gammaln(-d + j) - gammaln(j + 1) - gammaln(-d)) ar[0] = 1 return ar -#moved from sandbox.tsa.try_fi +# moved from sandbox.tsa.try_fi def lpol_sdiff(s): - '''return coefficients for seasonal difference (1-L^s) + """return coefficients for seasonal difference (1-L^s) just a trivial convenience function @@ -536,8 +549,8 @@ def lpol_sdiff(s): ------- sdiff : list, length s+1 - ''' - return [1] + [0]*(s-1) + [-1] + """ + return [1] + [0] * (s - 1) + [-1] def deconvolve(num, den, n=None): @@ -580,30 +593,27 @@ def deconvolve(num, den, n=None): rem = num else: if n is None: - n = N-D+1 + n = N - D + 1 input = np.zeros(n, float) input[0] = 1 quot = signal.lfilter(num, den, input) num_approx = signal.convolve(den, quot, mode='full') if len(num) < len(num_approx): # 1d only ? - num = np.concatenate((num, np.zeros(len(num_approx)-len(num)))) + num = np.concatenate((num, np.zeros(len(num_approx) - len(num)))) rem = num - num_approx return quot, rem class ArmaProcess(object): - """ - Represent an ARMA process for given lag-polynomials - - This is a class to bring together properties of the process. - It does not do any estimation or statistical analysis. + r""" + Theoretical properties of an ARMA process for specified lag-polynomials Parameters ---------- - ar : array_like, 1d + ar : array_like, 1d, optional Coefficient for autoregressive lag polynomial, including zero lag. See the notes for some information about the sign. - ma : array_like, 1d + ma : array_like, 1d, optional Coefficient for moving-average lag polynomial, including zero lag nobs : int, optional Length of simulated time series. Used, for example, if a sample is @@ -611,11 +621,26 @@ class ArmaProcess(object): Notes ----- - As mentioned above, both the AR and MA components should include the - coefficient on the zero-lag. This is typically 1. Further, due to the - conventions used in signal processing used in signal.lfilter vs. - conventions in statistics for ARMA processes, the AR paramters should - have the opposite sign of what you might expect. See the examples below. + Both the AR and MA components must include the coefficient on the + zero-lag. In almost all cases these values should be 1. Further, due to + using the lag-polynomial representation, the AR paramters should + have the opposite sign of what one would write in the ARMA representaion. + See the examples below. + + The ARMA(p,q) process is described by + + .. math:: + + y_{t}=\phi_{1}y_{t-1}+\ldots+\phi_{p}y_{t-p}+\theta_{1}\epsilon_{t-1} + +\ldots+\theta_{q}\epsilon_{t-q}+\epsilon_{t} + + and the parameterization used in this function uses the lag-polynomial + representation, + + .. math:: + + \left(1-\phi_{1}L-\ldots-\phi_{p}L^{p}\right)y_{t} = + \left(1-\theta_{1}L-\ldots-\theta_{q}L^{q}\right) Examples -------- @@ -635,8 +660,13 @@ class ArmaProcess(object): >>> model.params array([ 0.79044189, -0.23140636, 0.70072904, 0.40608028]) """ - # maybe needs special handling for unit roots - def __init__(self, ar, ma, nobs=100): + + # TODO: Check unit root behavior + def __init__(self, ar=None, ma=None, nobs=100): + if ar is None: + ar = np.array([1.]) + if ma is None: + ma = np.array([1.]) self.ar = np.asarray(ar) self.ma = np.asarray(ma) self.arcoefs = -self.ar[1:] @@ -646,29 +676,44 @@ def __init__(self, ar, ma, nobs=100): self.nobs = nobs @classmethod - def from_coeffs(cls, arcoefs, macoefs, nobs=100): + def from_coeffs(cls, arcoefs=None, macoefs=None, nobs=100): """ - Create ArmaProcess instance from coefficients of the lag-polynomials + Convenience function to create ArmaProcess from ARMA representation Parameters ---------- - arcoefs : array-like + arcoefs : array-like, optional Coefficient for autoregressive lag polynomial, not including zero lag. The sign is inverted to conform to the usual time series representation of an ARMA process in statistics. See the class docstring for more information. - macoefs : array-like - Coefficient for moving-average lag polynomial, including zero lag + macoefs : array-like, optional + Coefficient for moving-average lag polynomial, excluding zero lag nobs : int, optional Length of simulated time series. Used, for example, if a sample is generated. + + Examples + -------- + >>> arparams = [.75, -.25] + >>> maparams = [.65, .35] + >>> arma_process = sm.tsa.ArmaProcess.from_coeffs(ar, ma) + >>> arma_process.isstationary + True + >>> arma_process.isinvertible + True """ - return cls(np.r_[1, -arcoefs], np.r_[1, macoefs], nobs=nobs) + arcoefs = [] if arcoefs is None else arcoefs + macoefs = [] if macoefs is None else macoefs + return cls(np.r_[1, -np.asarray(arcoefs)], + np.r_[1, np.asarray(macoefs)], + nobs=nobs) @classmethod def from_estimation(cls, model_results, nobs=None): """ - Create ArmaProcess instance from ARMA estimation results + Convenience function to create an ArmaProcess from the results + of an ARMA estimation Parameters ---------- @@ -694,18 +739,18 @@ def __mul__(self, oth): ar = (self.arpoly * arpolyoth).coef ma = (self.mapoly * mapolyoth).coef except: - print('other is not a valid type') - raise + raise TypeError('Other type is not a valid type') return self.__class__(ar, ma, nobs=self.nobs) def __repr__(self): - return 'ArmaProcess(%r, %r, nobs=%d)' % (self.ar.tolist(), - self.ma.tolist(), - self.nobs) + return 'ArmaProcess({0}, {1}, nobs={2}) at {3}'.format(self.ar.tolist(), + self.ma.tolist(), + self.nobs, + hex(id(self))) def __str__(self): - return 'ArmaProcess\nAR: %r\nMA: %r' % (self.ar.tolist(), - self.ma.tolist()) + return 'ArmaProcess\nAR: {0}\nMA: {1}'.format(self.ar.tolist(), + self.ma.tolist()) def acovf(self, nobs=None): nobs = nobs or self.nobs @@ -713,15 +758,15 @@ def acovf(self, nobs=None): acovf.__doc__ = arma_acovf.__doc__ - def acf(self, nobs=None): - nobs = nobs or self.nobs - return arma_acf(self.ar, self.ma, nobs=nobs) + def acf(self, lags=None): + lags = lags or self.nobs + return arma_acf(self.ar, self.ma, lags=lags) acf.__doc__ = arma_acf.__doc__ - def pacf(self, nobs=None): - nobs = nobs or self.nobs - return arma_pacf(self.ar, self.ma, nobs=nobs) + def pacf(self, lags=None): + lags = lags or self.nobs + return arma_pacf(self.ar, self.ma, lags=lags) pacf.__doc__ = arma_pacf.__doc__ @@ -731,68 +776,67 @@ def periodogram(self, nobs=None): periodogram.__doc__ = arma_periodogram.__doc__ - def impulse_response(self, nobs=None): - nobs = nobs or self.nobs - return arma_impulse_response(self.ar, self.ma, worN=nobs) + def impulse_response(self, leads=None): + leads = leads or self.nobs + return arma_impulse_response(self.ar, self.ma, leads=leads) impulse_response.__doc__ = arma_impulse_response.__doc__ - def arma2ma(self, nobs=None): - nobs = nobs or self.nobs - return arma2ma(self.ar, self.ma, nobs=nobs) + def arma2ma(self, lags=None): + lags = lags or self.lags + return arma2ma(self.ar, self.ma, lags=lags) arma2ma.__doc__ = arma2ma.__doc__ - def arma2ar(self, nobs=None): - nobs = nobs or self.nobs - return arma2ar(self.ar, self.ma, nobs=nobs) + def arma2ar(self, lags=None): + lags = lags or self.lags + return arma2ar(self.ar, self.ma, lags=lags) arma2ar.__doc__ = arma2ar.__doc__ @property def arroots(self): - """ - Roots of autoregressive lag-polynomial - """ + """Roots of autoregressive lag-polynomial""" return self.arpoly.roots() @property def maroots(self): - """ - Roots of moving average lag-polynomial - """ + """Roots of moving average lag-polynomial""" return self.mapoly.roots() @property def isstationary(self): - '''Arma process is stationary if AR roots are outside unit circle + """ + Arma process is stationary if AR roots are outside unit circle Returns ------- isstationary : boolean True if autoregressive roots are outside unit circle - ''' - if np.all(np.abs(self.arroots) > 1): + """ + if np.all(np.abs(self.arroots) > 1.0): return True else: return False @property def isinvertible(self): - '''Arma process is invertible if MA roots are outside unit circle + """ + Arma process is invertible if MA roots are outside unit circle Returns ------- isinvertible : boolean True if moving average roots are outside unit circle - ''' + """ if np.all(np.abs(self.maroots) > 1): return True else: return False def invertroots(self, retnew=False): - '''make MA polynomial invertible by inverting roots inside unit circle + """ + Make MA polynomial invertible by inverting roots inside unit circle Parameters ---------- @@ -810,33 +854,34 @@ def invertroots(self, retnew=False): armaprocess : new instance of class If retnew is true, then return a new instance with invertible MA-polynomial - ''' - #TODO: variable returns like this? - pr = self.ma_roots() - insideroots = np.abs(pr) < 1 - if insideroots.any(): - pr[np.abs(pr) < 1] = 1./pr[np.abs(pr) < 1] + """ + # TODO: variable returns like this? + pr = self.maroots + mainv = self.ma + invertible = self.isinvertible + if not invertible: + pr[np.abs(pr) < 1] = 1. / pr[np.abs(pr) < 1] pnew = np.polynomial.Polynomial.fromroots(pr) - mainv = pnew.coef/pnew.coef[0] - wasinvertible = False - else: - mainv = self.ma - wasinvertible = True + mainv = pnew.coef / pnew.coef[0] + if retnew: return self.__class__(self.ar, mainv, nobs=self.nobs) else: - return mainv, wasinvertible + return mainv, invertible def generate_sample(self, nsample=100, scale=1., distrvs=None, axis=0, burnin=0): - '''generate ARMA samples + """ + Simulate an ARMA Parameters ---------- nsample : int or tuple of ints If nsample is an integer, then this creates a 1d timeseries of - length size. If nsample is a tuple, then the timeseries is along - axis. All other axis have independent arma samples. + length size. If nsample is a tuple, creates a len(nsample) + dimensional time series where time is indexed along the input + variable ``axis``. All series are unless ``distrvs`` generates + dependent data. scale : float standard deviation of noise distrvs : function, random number generator @@ -859,24 +904,23 @@ def generate_sample(self, nsample=100, scale=1., distrvs=None, axis=0, ----- Should work for n-dimensional with time series along axis, but not tested yet. Processes are sampled independently. - ''' + """ if distrvs is None: distrvs = np.random.normal if np.ndim(nsample) == 0: - nsample = [nsample] + nsample = [nsample] if burnin: - #handle burin time for nd arrays - #maybe there is a better trick in scipy.fft code + # handle burin time for nd arrays + # maybe there is a better trick in scipy.fft code newsize = list(nsample) newsize[axis] += burnin newsize = tuple(newsize) - fslice = [slice(None)]*len(newsize) + fslice = [slice(None)] * len(newsize) fslice[axis] = slice(burnin, None, None) fslice = tuple(fslice) else: newsize = tuple(nsample) - fslice = tuple([slice(None)]*np.ndim(newsize)) - + fslice = tuple([slice(None)] * np.ndim(newsize)) eta = scale * distrvs(size=newsize) return signal.lfilter(self.ma, self.ar, eta, axis=axis)[fslice] @@ -884,148 +928,3 @@ def generate_sample(self, nsample=100, scale=1., distrvs=None, axis=0, __all__ = ['arma_acf', 'arma_acovf', 'arma_generate_sample', 'arma_impulse_response', 'arma2ar', 'arma2ma', 'deconvolve', 'lpol2index', 'index2lpol'] - - -if __name__ == '__main__': - - - # Simulate AR(1) - #-------------- - # ar * y = ma * eta - ar = [1, -0.8] - ma = [1.0] - - # generate AR data - eta = 0.1 * np.random.randn(1000) - yar1 = signal.lfilter(ar, ma, eta) - - print("\nExample 0") - arest = ARIMAProcess(yar1) - rhohat, cov_x, infodict, mesg, ier = arest.fit((1,0,1)) - print(rhohat) - print(cov_x) - - print("\nExample 1") - ar = [1.0, -0.8] - ma = [1.0, 0.5] - y1 = arest.generate_sample(ar,ma,1000,0.1) - arest = ARIMAProcess(y1) - rhohat1, cov_x1, infodict, mesg, ier = arest.fit((1,0,1)) - print(rhohat1) - print(cov_x1) - err1 = arest.errfn(x=y1) - print(np.var(err1)) - import statsmodels.api as sm - print(sm.regression.yule_walker(y1, order=2, inv=True)) - - print("\nExample 2") - nsample = 1000 - ar = [1.0, -0.6, -0.1] - ma = [1.0, 0.3, 0.2] - y2 = ARIMA.generate_sample(ar,ma,nsample,0.1) - arest2 = ARIMAProcess(y2) - rhohat2, cov_x2, infodict, mesg, ier = arest2.fit((1,0,2)) - print(rhohat2) - print(cov_x2) - err2 = arest.errfn(x=y2) - print(np.var(err2)) - print(arest2.rhoy) - print(arest2.rhoe) - print("true") - print(ar) - print(ma) - rhohat2a, cov_x2a, infodict, mesg, ier = arest2.fit((2,0,2)) - print(rhohat2a) - print(cov_x2a) - err2a = arest.errfn(x=y2) - print(np.var(err2a)) - print(arest2.rhoy) - print(arest2.rhoe) - print("true") - print(ar) - print(ma) - - print(sm.regression.yule_walker(y2, order=2, inv=True)) - - print("\nExample 20") - nsample = 1000 - ar = [1.0]#, -0.8, -0.4] - ma = [1.0, 0.5, 0.2] - y3 = ARIMA.generate_sample(ar,ma,nsample,0.01) - arest20 = ARIMAProcess(y3) - rhohat3, cov_x3, infodict, mesg, ier = arest20.fit((2,0,0)) - print(rhohat3) - print(cov_x3) - err3 = arest20.errfn(x=y3) - print(np.var(err3)) - print(np.sqrt(np.dot(err3,err3)/nsample)) - print(arest20.rhoy) - print(arest20.rhoe) - print("true") - print(ar) - print(ma) - - rhohat3a, cov_x3a, infodict, mesg, ier = arest20.fit((0,0,2)) - print(rhohat3a) - print(cov_x3a) - err3a = arest20.errfn(x=y3) - print(np.var(err3a)) - print(np.sqrt(np.dot(err3a,err3a)/nsample)) - print(arest20.rhoy) - print(arest20.rhoe) - print("true") - print(ar) - print(ma) - - print(sm.regression.yule_walker(y3, order=2, inv=True)) - - print("\nExample 02") - nsample = 1000 - ar = [1.0, -0.8, 0.4] #-0.8, -0.4] - ma = [1.0]#, 0.8, 0.4] - y4 = ARIMA.generate_sample(ar,ma,nsample) - arest02 = ARIMAProcess(y4) - rhohat4, cov_x4, infodict, mesg, ier = arest02.fit((2,0,0)) - print(rhohat4) - print(cov_x4) - err4 = arest02.errfn(x=y4) - print(np.var(err4)) - sige = np.sqrt(np.dot(err4,err4)/nsample) - print(sige) - print(sige * np.sqrt(np.diag(cov_x4))) - print(np.sqrt(np.diag(cov_x4))) - print(arest02.rhoy) - print(arest02.rhoe) - print("true") - print(ar) - print(ma) - - rhohat4a, cov_x4a, infodict, mesg, ier = arest02.fit((0,0,2)) - print(rhohat4a) - print(cov_x4a) - err4a = arest02.errfn(x=y4) - print(np.var(err4a)) - sige = np.sqrt(np.dot(err4a,err4a)/nsample) - print(sige) - print(sige * np.sqrt(np.diag(cov_x4a))) - print(np.sqrt(np.diag(cov_x4a))) - print(arest02.rhoy) - print(arest02.rhoe) - print("true") - print(ar) - print(ma) - import statsmodels.api as sm - print(sm.regression.yule_walker(y4, order=2, method='mle', inv=True)) - - - import matplotlib.pyplot as plt - plt.plot(arest2.forecast()[-100:]) - #plt.show() - - ar1, ar2 = ([1, -0.4], [1, 0.5]) - ar2 = [1, -1] - lagpolyproduct = np.convolve(ar1, ar2) - print(deconvolve(lagpolyproduct, ar2, n=None)) - print(signal.deconvolve(lagpolyproduct, ar2)) - print(deconvolve(lagpolyproduct, ar2, n=10)) - diff --git a/statsmodels/tsa/tests/test_arima_process.py b/statsmodels/tsa/tests/test_arima_process.py index 881f1ba7c24..dda707e8265 100644 --- a/statsmodels/tsa/tests/test_arima_process.py +++ b/statsmodels/tsa/tests/test_arima_process.py @@ -1,100 +1,109 @@ - from statsmodels.compat.python import range + +from distutils.version import LooseVersion + +from statsmodels.tsa.arima_model import ARMA +from unittest import TestCase + import numpy as np from numpy.testing import (assert_array_almost_equal, assert_almost_equal, - assert_equal) - + assert_equal, assert_raises, assert_, dec) from statsmodels.tsa.arima_process import (arma_generate_sample, arma_acovf, - arma_acf, arma_impulse_response, lpol_fiar, lpol_fima) + arma_acf, arma_impulse_response, lpol_fiar, lpol_fima, + ArmaProcess, lpol2index, index2lpol) from statsmodels.sandbox.tsa.fftarma import ArmaFft -from .results.results_process import armarep #benchmarkdata +from statsmodels.tsa.tests.results.results_process import armarep # benchmarkdata arlist = [[1.], - [1, -0.9], #ma representation will need many terms to get high precision - [1, 0.9], + [1, -0.9], # ma representation will need many terms to get high precision + [1, 0.9], [1, -0.9, 0.3]] malist = [[1.], - [1, 0.9], + [1, 0.9], [1, -0.9], - [1, 0.9, -0.3]] + [1, 0.9, -0.3]] + +DECIMAL_4 = 4 +NP16 = LooseVersion(np.__version__) < '1.7' def test_arma_acovf(): # Check for specific AR(1) - N = 20; - phi = 0.9; - sigma = 1; + N = 20 + phi = 0.9 + sigma = 1 # rep 1: from module function - rep1 = arma_acovf([1, -phi], [1], N); + rep1 = arma_acovf([1, -phi], [1], N) # rep 2: manually - rep2 = [1.*sigma*phi**i/(1-phi**2) for i in range(N)]; - assert_almost_equal(rep1, rep2, 7); # 7 is max precision here + rep2 = [1. * sigma * phi ** i / (1 - phi ** 2) for i in range(N)] + assert_almost_equal(rep1, rep2, 7) # 7 is max precision here def test_arma_acf(): # Check for specific AR(1) - N = 20; - phi = 0.9; - sigma = 1; + N = 20 + phi = 0.9 + sigma = 1 # rep 1: from module function - rep1 = arma_acf([1, -phi], [1], N); + rep1 = arma_acf([1, -phi], [1], N) # rep 2: manually - acovf = np.array([1.*sigma*phi**i/(1-phi**2) for i in range(N)]) - rep2 = acovf / (1./(1-phi**2)); - assert_almost_equal(rep1, rep2, 8); # 8 is max precision here + acovf = np.array([1. * sigma * phi ** i / (1 - phi ** 2) for i in range(N)]) + rep2 = acovf / (1. / (1 - phi ** 2)) + assert_almost_equal(rep1, rep2, 8) # 8 is max precision here def _manual_arma_generate_sample(ar, ma, eta): - T = len(eta); - ar = ar[::-1]; - ma = ma[::-1]; - p,q = len(ar), len(ma); - rep2 = [0]*max(p,q); # initialize with zeroes + T = len(eta) + ar = ar[::-1] + ma = ma[::-1] + p, q = len(ar), len(ma) + rep2 = [0] * max(p, q) # initialize with zeroes for t in range(T): - yt = eta[t]; + yt = eta[t] if p: - yt += np.dot(rep2[-p:], ar); + yt += np.dot(rep2[-p:], ar) if q: # left pad shocks with zeros - yt += np.dot([0]*(q-t) + list(eta[max(0,t-q):t]), ma); - rep2.append(yt); - return np.array(rep2[max(p,q):]); + yt += np.dot([0] * (q - t) + list(eta[max(0, t - q):t]), ma) + rep2.append(yt) + return np.array(rep2[max(p, q):]) + def test_arma_generate_sample(): # Test that this generates a true ARMA process # (amounts to just a test that scipy.signal.lfilter does what we want) - T = 100; + T = 100 dists = [np.random.randn] for dist in dists: - np.random.seed(1234); - eta = dist(T); + np.random.seed(1234) + eta = dist(T) for ar in arlist: for ma in malist: # rep1: from module function - np.random.seed(1234); - rep1 = arma_generate_sample(ar, ma, T, distrvs=dist); + np.random.seed(1234) + rep1 = arma_generate_sample(ar, ma, T, distrvs=dist) # rep2: "manually" create the ARMA process - ar_params = -1*np.array(ar[1:]); - ma_params = np.array(ma[1:]); + ar_params = -1 * np.array(ar[1:]) + ma_params = np.array(ma[1:]) rep2 = _manual_arma_generate_sample(ar_params, ma_params, eta) - assert_array_almost_equal(rep1, rep2, 13); + assert_array_almost_equal(rep1, rep2, 13) def test_fi(): - #test identity of ma and ar representation of fi lag polynomial + # test identity of ma and ar representation of fi lag polynomial n = 100 mafromar = arma_impulse_response(lpol_fiar(0.4, n=n), [1], n) assert_array_almost_equal(mafromar, lpol_fima(0.4, n=n), 13) def test_arma_impulse_response(): - arrep = arma_impulse_response(armarep.ma, armarep.ar, nobs=21)[1:] - marep = arma_impulse_response(armarep.ar, armarep.ma, nobs=21)[1:] + arrep = arma_impulse_response(armarep.ma, armarep.ar, leads=21)[1:] + marep = arma_impulse_response(armarep.ar, armarep.ma, leads=21)[1:] assert_array_almost_equal(armarep.marep.ravel(), marep, 14) - #difference in sign convention to matlab for AR term + # difference in sign convention to matlab for AR term assert_array_almost_equal(-armarep.arrep.ravel(), arrep, 14) @@ -106,17 +115,18 @@ def test_spectrum(): arma = ArmaFft(ar, ma, 20) spdr, wr = arma.spdroots(w) spdp, wp = arma.spdpoly(w, 200) - spdd, wd = arma.spddirect(nfreq*2) + spdd, wd = arma.spddirect(nfreq * 2) assert_equal(w, wr) assert_equal(w, wp) assert_almost_equal(w, wd[:nfreq], decimal=14) - assert_almost_equal(spdr, spdp, decimal=7, - err_msg='spdr spdp not equal for %s, %s' % (ar, ma)) assert_almost_equal(spdr, spdd[:nfreq], decimal=7, err_msg='spdr spdd not equal for %s, %s' % (ar, ma)) + assert_almost_equal(spdr, spdp, decimal=7, + err_msg='spdr spdp not equal for %s, %s' % (ar, ma)) + def test_armafft(): - #test other methods + # test other methods nfreq = 20 w = np.linspace(0, np.pi, nfreq, endpoint=False) for ar in arlist: @@ -127,12 +137,189 @@ def test_armafft(): assert_almost_equal(ac1, ac2, decimal=7, err_msg='acovf not equal for %s, %s' % (ar, ma)) +def test_lpol2index_index2lpol(): + process = ArmaProcess([1, 0, 0, -0.8]) + coefs, locs = lpol2index(process.arcoefs) + assert_almost_equal(coefs, [0.8]) + assert_equal(locs, [2]) + + process = ArmaProcess([1, .1, .1, -0.8]) + coefs, locs = lpol2index(process.arcoefs) + assert_almost_equal(coefs, [-.1, -.1, 0.8]) + assert_equal(locs, [0, 1, 2]) + ar = index2lpol(coefs, locs) + assert_equal(process.arcoefs, ar) + + +class TestArmaProcess(TestCase): + def test_empty_coeff(self): + process = ArmaProcess() + assert_equal(process.arcoefs, np.array([])) + assert_equal(process.macoefs, np.array([])) + + process = ArmaProcess([1, -0.8]) + assert_equal(process.arcoefs, np.array([0.8])) + assert_equal(process.macoefs, np.array([])) + + process = ArmaProcess(ma=[1, -0.8]) + assert_equal(process.arcoefs, np.array([])) + assert_equal(process.macoefs, np.array([-0.8])) + + def test_from_coeff(self): + ar = [1.8, -0.9] + ma = [0.3] + process = ArmaProcess.from_coeffs(np.array(ar), np.array(ma)) + + ar.insert(0, -1) + ma.insert(0, -1) + ar_p = -1 * np.array(ar) + ma_p = ma + process_direct = ArmaProcess(ar_p, ma_p) + + assert_equal(process.arcoefs, process_direct.arcoefs) + assert_equal(process.macoefs, process_direct.macoefs) + assert_equal(process.nobs, process_direct.nobs) + assert_equal(process.isinvertible, process_direct.isinvertible) + assert_equal(process.isstationary, process_direct.isstationary) + + def test_from_model(self): + process = ArmaProcess([1, -.8], [1, .3], 1000) + t = 1000 + rs = np.random.RandomState(12345) + y = process.generate_sample(t, burnin=100, distrvs=rs.standard_normal) + res = ARMA(y, (1, 1)).fit(disp=False) + process_model = ArmaProcess.from_estimation(res) + process_coef = ArmaProcess.from_coeffs(res.arparams, res.maparams, t) + + assert_equal(process_model.arcoefs, process_coef.arcoefs) + assert_equal(process_model.macoefs, process_coef.macoefs) + assert_equal(process_model.nobs, process_coef.nobs) + assert_equal(process_model.isinvertible, process_coef.isinvertible) + assert_equal(process_model.isstationary, process_coef.isstationary) + + def test_process_multiplication(self): + process1 = ArmaProcess.from_coeffs([.9]) + process2 = ArmaProcess.from_coeffs([.7]) + process3 = process1 * process2 + assert_equal(process3.arcoefs, np.array([1.6, -0.7 * 0.9])) + assert_equal(process3.macoefs, np.array([])) + + process1 = ArmaProcess.from_coeffs([.9], [.2]) + process2 = ArmaProcess.from_coeffs([.7]) + process3 = process1 * process2 + + assert_equal(process3.arcoefs, np.array([1.6, -0.7 * 0.9])) + assert_equal(process3.macoefs, np.array([0.2])) + + process1 = ArmaProcess.from_coeffs([.9], [.2]) + process2 = process1 * (np.array([1.0, -0.7]), np.array([1.0])) + assert_equal(process2.arcoefs, np.array([1.6, -0.7 * 0.9])) + + assert_raises(TypeError, process1.__mul__, [3]) + + @dec.skipif(NP16) + def test_str_repr(self): + process1 = ArmaProcess.from_coeffs([.9], [.2]) + out = process1.__str__() + print(out) + assert_(out.find('AR: [1.0, -0.9]') != -1) + assert_(out.find('MA: [1.0, 0.2]') != -1) + + out = process1.__repr__() + assert_(out.find('nobs=100') != -1) + assert_(out.find('at ' + str(hex(id(process1)))) != -1) + + def test_acf(self): + process1 = ArmaProcess.from_coeffs([.9]) + acf = process1.acf(10) + expected = np.array(0.9) ** np.arange(10.0) + assert_array_almost_equal(acf, expected) + + acf = process1.acf() + assert_(acf.shape[0] == process1.nobs) + + def test_pacf(self): + process1 = ArmaProcess.from_coeffs([.9]) + pacf = process1.pacf(10) + expected = np.array([1, 0.9] + [0] * 8) + assert_array_almost_equal(pacf, expected) + + pacf = process1.pacf() + assert_(pacf.shape[0] == process1.nobs) + + def test_isstationary(self): + process1 = ArmaProcess.from_coeffs([1.1]) + assert_equal(process1.isstationary, False) + + process1 = ArmaProcess.from_coeffs([1.8, -0.9]) + assert_equal(process1.isstationary, True) + + process1 = ArmaProcess.from_coeffs([1.5, -0.5]) + print(np.abs(process1.arroots)) + assert_equal(process1.isstationary, False) + + def test_arma2ar(self): + process1 = ArmaProcess.from_coeffs([], [0.8]) + vals = process1.arma2ar(100) + assert_almost_equal(vals, (-0.8) ** np.arange(100.0)) + + def test_invertroots(self): + process1 = ArmaProcess.from_coeffs([], [2.5]) + process2 = process1.invertroots(True) + assert_almost_equal(process2.ma, np.array([1.0, 0.4])) + + process1 = ArmaProcess.from_coeffs([], [0.4]) + process2 = process1.invertroots(True) + assert_almost_equal(process2.ma, np.array([1.0, 0.4])) + + process1 = ArmaProcess.from_coeffs([], [2.5]) + roots, invertable = process1.invertroots(False) + assert_equal(invertable, False) + assert_almost_equal(roots, np.array([1, 0.4])) + + def test_generate_sample(self): + process = ArmaProcess.from_coeffs([0.9]) + np.random.seed(12345) + sample = process.generate_sample() + np.random.seed(12345) + expected = np.random.randn(100) + for i in range(1, 100): + expected[i] = 0.9 * expected[i - 1] + expected[i] + assert_almost_equal(sample, expected) + + process = ArmaProcess.from_coeffs([1.6, -0.9]) + np.random.seed(12345) + sample = process.generate_sample() + np.random.seed(12345) + expected = np.random.randn(100) + expected[1] = 1.6 * expected[0] + expected[1] + for i in range(2, 100): + expected[i] = 1.6 * expected[i - 1] - 0.9 * expected[i - 2] + expected[i] + assert_almost_equal(sample, expected) + + process = ArmaProcess.from_coeffs([1.6, -0.9]) + np.random.seed(12345) + sample = process.generate_sample(burnin=100) + np.random.seed(12345) + expected = np.random.randn(200) + expected[1] = 1.6 * expected[0] + expected[1] + for i in range(2, 200): + expected[i] = 1.6 * expected[i - 1] - 0.9 * expected[i - 2] + expected[i] + assert_almost_equal(sample, expected[100:]) + + + np.random.seed(12345) + sample = process.generate_sample(nsample=(100,5)) + assert_equal(sample.shape, (100,5)) + + def test_impulse_response(self): + process = ArmaProcess.from_coeffs([0.9]) + ir = process.impulse_response(10) + assert_almost_equal(ir, 0.9 ** np.arange(10)) + + def test_periodogram(self): + process = ArmaProcess() + pg = process.periodogram() + assert_almost_equal(pg[0], np.linspace(0,np.pi,100,False)) + assert_almost_equal(pg[1], np.sqrt(2 / np.pi) / 2 * np.ones(100)) -if __name__ == '__main__': - test_arma_acovf() - test_arma_acf() - test_arma_generate_sample() - test_fi() - test_arma_impulse_response() - test_spectrum() - test_armafft()