Skip to content

Commit

Permalink
Merge branch 'update_kalman'
Browse files Browse the repository at this point in the history
  • Loading branch information
jstac committed Apr 17, 2015
2 parents 049aa0f + 59a5094 commit 833198d
Show file tree
Hide file tree
Showing 4 changed files with 1,676 additions and 349 deletions.
2 changes: 1 addition & 1 deletion quantecon.egg-info/PKG-INFO
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: quantecon
Version: 0.1.7.post2
Version: 0.1.8
Summary: QuantEcon is a package to support all forms of quantitative economic modelling.
Home-page: https://github.com/QuantEcon/QuantEcon.py
Author: Thomas J. Sargent and John Stachurski (Project coordinators)
Expand Down
145 changes: 60 additions & 85 deletions quantecon/kalman.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,38 @@
import numpy as np
from numpy import dot
from scipy.linalg import inv
from .matrix_eqn import solve_discrete_riccati
from quantecon.matrix_eqn import solve_discrete_riccati

class Kalman(object):
r"""
Implements the Kalman filter for the Gaussian state space model
x_{t+1} = A x_t + w_{t+1}
y_t = G x_t + v_t.
x_{t+1} = A x_t + C w_{t+1}
y_t = G x_t + H v_t.
Here x_t is the hidden state and y_t is the
measurement. The shocks w_t and v_t are iid zero
mean Gaussians with covariance matrices Q and R respectively.
Here x_t is the hidden state and y_t is the measurement. The shocks
w_t and v_t are iid standard normals. Below we use the notation
Q := CC'
R := HH'
Parameters
-----------
A : array_like or scalar(float)
The n x n matrix A
Q : array_like or scalar(float)
Q is n x n, symmetric and nonnegative definite
G : array_like or scalar(float)
G is k x n
R : array_like or scalar(float)
R is k x k, symmetric and nonnegative definite
ss : instance of LinearStateSpace
An instance of the quantecon.lss.LinearStateSpace class
x_hat : scalar(float) or array_like(float), optional(default=None)
An n x 1 array representing the mean x_hat and covariance
matrix Sigma of the prior/predictive density. Set to zero if
not supplied.
Sigma : scalar(float) or array_like(float), optional(default=None)
An n x n array representing the covariance matrix Sigma of
the prior/predictive density. Must be positive definite.
Set to the identity if not supplied.
Attributes
----------
A, Q, G, R : see Parameters
k : scalar(int)
Number of rows of G
n : scalar(int)
Number of columns of G
current_Sigma : array_like or scalar(float)
The n x n covariance matrix
current_x_hat : array_like or scalar(float)
The mean of the state
Sigma, x_hat : as above
Sigma_infinity : array_like or scalar(float)
The infinite limit of Sigma_t
K_infinity : array_like or scalar(float)
Expand All @@ -57,12 +54,23 @@ class Kalman(object):
"""

def __init__(self, A, G, Q, R):
self.A, self.G, self.Q, self.R = list(map(self.convert, (A, G, Q, R)))
self.k, self.n = self.G.shape
def __init__(self, ss, x_hat=None, Sigma=None):
self.ss = ss
self.set_state(x_hat, Sigma)
self.K_infinity = None
self.Sigma_infinity = None

def set_state(self, x_hat, Sigma):
if Sigma == None:
Sigma = np.identity(self.ss.n)
else:
self.Sigma = np.atleast_2d(Sigma)
if x_hat == None:
x_hat = np.zeros((self.ss.n, 1))
else:
self.x_hat = np.atleast_2d(x_hat)
self.x_hat.shape = self.ss.n, 1

def __repr__(self):
return self.__str__()

Expand All @@ -72,57 +80,20 @@ def __str__(self):
- dimension of state space : {n}
- dimension of observation equation : {k}
"""
return dedent(m.format(n=self.n, k=self.k))

def convert(self, x):
"""
Convert array_like objects (lists of lists, floats, etc.) into
well formed 2D NumPy arrays
Parameters
----------
x : scalar or array_like(float)
Argument to be converted into a 2D NumPy array
Returns
-------
array_like(float)
A 2D NumPy array
"""
return np.atleast_2d(np.asarray(x, dtype='float32'))

def set_state(self, x_hat, Sigma):
"""
Set the state of the filter (mean and variance of prior
density).
Parameters
----------
x_hat : scalar(float) or array_like(float)
An n x 1 array representing the mean x_hat and covariance
matrix Sigma of the prior/predictive density.
Sigma : scalar(float) or array_like(float)
An n x n array representing the covariance matrix Sigma of
the prior/predictive density. Must be positive definite.
return dedent(m.format(n=self.ss.n, k=self.ss.k))

"""
self.current_Sigma = self.convert(Sigma)
self.current_x_hat = self.convert(x_hat)
self.current_x_hat.shape = self.n, 1

def prior_to_filtered(self, y):
r"""
Updates the moments (x_hat, Sigma) of the time t prior to the
time t filtering distribution, using current measurement y_t.
The updates are according to
x_{hat}^F = x_{hat} + Sigma G' (G Sigma G' + R)^{-1}
(y - G x_{hat})
(y - G x_{hat})
Sigma^F = Sigma - Sigma G' (G Sigma G' + R)^{-1} G
Sigma
Sigma
Parameters
----------
Expand All @@ -131,17 +102,17 @@ def prior_to_filtered(self, y):
"""
# === simplify notation === #
G, R = self.G, self.R
x_hat, Sigma = self.current_x_hat, self.current_Sigma
G, H = self.ss.G, self.ss.H
R = np.dot(H, H.T)

# === and then update === #
y = self.convert(y)
y.shape = self.k, 1
A = dot(Sigma, G.T)
B = dot(dot(G, Sigma), G.T) + R
M = dot(A, inv(B))
self.current_x_hat = x_hat + dot(M, (y - dot(G, x_hat)))
self.current_Sigma = Sigma - dot(M, dot(G, Sigma))
y = np.atleast_2d(y)
y.shape = self.ss.k, 1
E = dot(self.Sigma, G.T)
F = dot(dot(G, self.Sigma), G.T) + R
M = dot(E, inv(F))
self.x_hat = self.x_hat + dot(M, (y - dot(G, self.x_hat)))
self.Sigma = self.Sigma - dot(M, dot(G, self.Sigma))

def filtered_to_forecast(self):
"""
Expand All @@ -151,12 +122,12 @@ def filtered_to_forecast(self):
"""
# === simplify notation === #
A, Q = self.A, self.Q
x_hat, Sigma = self.current_x_hat, self.current_Sigma
A, C = self.ss.A, self.ss.C
Q = np.dot(C, C.T)

# === and then update === #
self.current_x_hat = dot(A, x_hat)
self.current_Sigma = dot(A, dot(Sigma, A.T)) + Q
self.x_hat = dot(A, self.x_hat)
self.Sigma = dot(A, dot(self.Sigma, A.T)) + Q

def update(self, y):
"""
Expand Down Expand Up @@ -188,15 +159,17 @@ def stationary_values(self):
"""
# === simplify notation === #
A, Q, G, R = self.A, self.Q, self.G, self.R
A, C, G, H = self.ss.A, self.ss.C, self.ss.G, self.ss.H
Q, R = np.dot(C, C.T), np.dot(H, H.T)

# === solve Riccati equation, obtain Kalman gain === #
Sigma_infinity = solve_discrete_riccati(A.T, G.T, Q, R)
temp1 = dot(dot(A, Sigma_infinity), G.T)
temp2 = inv(dot(G, dot(Sigma_infinity, G.T)) + R)
K_infinity = dot(temp1, temp2)

# == record as attributes and return == #
self.Sigma_infinity, self.K_infinity = Sigma_infinity, K_infinity

return Sigma_infinity, K_infinity

def stationary_coefficients(self, j, coeff_type='ma'):
Expand All @@ -213,13 +186,13 @@ def stationary_coefficients(self, j, coeff_type='ma'):
moving average or 'var' for VAR.
"""
# == simplify notation == #
A, G = self.A, self.G
A, G = self.ss.A, self.ss.G
K_infinity = self.K_infinity
# == make sure that K_infinity has actually been computed == #
if K_infinity is None:
S, K_infinity = self.stationary_values()
# == compute and return coefficients == #
coeffs = [np.identity(self.k)]
coeffs = [np.identity(self.ss.k)]
i = 1
if coeff_type == 'ma':
P = A
Expand All @@ -235,9 +208,11 @@ def stationary_coefficients(self, j, coeff_type='ma'):

def stationary_innovation_covar(self):
# == simplify notation == #
R, G = self.R, self.G
H, G = self.ss.H, self.ss.G
R = np.dot(H, H.T)
Sigma_infinity = self.Sigma_infinity
# == Make sure that Sigma_infinity has been computed == #

# == make sure that Sigma_infinity has been computed == #
if Sigma_infinity is None:
Sigma_infinity, K = self.stationary_values()
return dot(G, dot(Sigma_infinity, G.T)) + R
19 changes: 12 additions & 7 deletions quantecon/tests/test_kalman.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import unittest
import numpy as np
from numpy.testing import assert_allclose
from quantecon.lss import LinearStateSpace
from quantecon.kalman import Kalman


Expand All @@ -18,12 +19,16 @@ class TestKalman(unittest.TestCase):
def setUp(self):
# Initial Values
self.A = np.array([[.95, 0], [0., .95]])
self.Q = np.eye(2) * 0.5
self.C = np.eye(2) * np.sqrt(0.5)
self.G = np.eye(2) * .5
self.R = np.eye(2) * 0.2
self.H = np.eye(2) * np.sqrt(0.2)

self.kf = Kalman(self.A, self.G, self.Q, self.R)
self.Q = np.dot(self.C, self.C.T)
self.R = np.dot(self.H, self.H.T)

ss = LinearStateSpace(self.A, self.C, self.G, self.H)

self.kf = Kalman(ss)


def tearDown(self):
Expand Down Expand Up @@ -58,8 +63,8 @@ def test_update_using_stationary(self):

kf.update(np.zeros((2, 1)))

assert_allclose(kf.current_Sigma, sig_inf, rtol=1e-4, atol=1e-2)
assert_allclose(kf.current_x_hat.squeeze(), np.zeros(2), rtol=1e-4, atol=1e-2)
assert_allclose(kf.Sigma, sig_inf, rtol=1e-4, atol=1e-2)
assert_allclose(kf.x_hat.squeeze(), np.zeros(2), rtol=1e-4, atol=1e-2)


def test_update_nonstationary(self):
Expand All @@ -79,8 +84,8 @@ def test_update_nonstationary(self):

new_xhat = A.dot(curr_x) + curr_k.dot(y_observed - G.dot(curr_x))

assert_allclose(kf.current_Sigma, new_sigma, rtol=1e-4, atol=1e-2)
assert_allclose(kf.current_x_hat, new_xhat, rtol=1e-4, atol=1e-2)
assert_allclose(kf.Sigma, new_sigma, rtol=1e-4, atol=1e-2)
assert_allclose(kf.x_hat, new_xhat, rtol=1e-4, atol=1e-2)

if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(TestKalman)
Expand Down

0 comments on commit 833198d

Please sign in to comment.