In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Build a dataset

In [None]:
def draw_states_and_observations(rng_seed: int = 123):
    """Truck example from Wikipedia"""
    
    # Parameters
    sigma_a = 0.01  # Stddev of acceleration
    sigma_obs = 0.1  # Stddev of position observation
    n_steps = 1000
    
    # RNG setup
    rng = np.random.RandomState(rng_seed)
    
    def get_noisy_obs(state):
        v = rng.normal(loc=0.0, scale=sigma_obs, size=())
        obs = state[0] + v
        return obs
    
    # Initial state
    state_history = [np.array([0.0, 0.0])]
    obs_history = [get_noisy_obs(state_history[0])]
    
    for k in range(n_steps):
        old_state = state_history[-1]
        w = rng.normal(loc=0.0, scale=sigma_a, size=()) * np.array([0.5, 1.0])
        new_state = np.array([[1, 1], [0, 0.9]]) @ old_state + w
        state_history.append(new_state)
        obs_history.append(get_noisy_obs(new_state))
    
    return state_history, obs_history

In [None]:
state_history, obs_history = draw_states_and_observations(rng_seed=123)

In [None]:
plt.plot(obs_history, label='Observations')
plt.plot([x for x, _ in state_history], label='Position')
plt.legend()

# Implement the Kalman Filter

In [None]:
class KalmanFilter:
    
    def __init__(
        self,
        initial_state_mean: np.array,
        initial_covariance: np.array,
        state_transition_model: np.array,
        process_noise_covariance: np.array,
        observation_model: np.array,
        observation_noise_covariance: np.array
    ):
        # Test that shapes of arrays are self-consistent
        (d_state,) = initial_state_mean.shape
        assert initial_covariance.shape == (d_state, d_state)
        assert state_transition_model.shape == (d_state, d_state)
        assert process_noise_covariance.shape == (d_state, d_state)
        assert observation_model.shape[1] == d_state
        d_obs = observation_model.shape[0]
        assert observation_noise_covariance.shape == (d_obs, d_obs)
        self._d_obs = d_obs
        self._d_state = d_state
        
        # Use Wikipedia notation internally
        self._F = state_transition_model
        self._H = observation_model
        self._Q = process_noise_covariance
        self._R = observation_noise_covariance
        
        self._x = initial_state_mean
        self._P = initial_covariance
    
    def predict(self) -> np.array:
        """Run a single timestep of the model and return the a priori estimates"""
        self._x = self._F @ self._x
        self._P = self._F @ self._P @ self._F.T + self._Q
        return self._x, self._P
    
    def update(self, observation: np.array) -> np.array:
        """Incorporate the observations and return the a posteriori estimates"""
        assert observation.shape == (self._d_obs,)
        z = observation  # Match Wikipedia notation
        y = z - self._H @ self._x  # Innovation
        S = self._H @ self._P @ self._H.T + self._R  # Innovation covariance
        K = self._P @ self._H.T @ np.linalg.inv(S)  # Optimal Kalman gain
        self._x = self._x + K @ y  # Update the state estimate
        self._P = (np.eye(self._d_state) - K @ self._H) @ self._P  # Update the covariance estimate
        return self._x, self._P

# Use Kalman filter on the 'truck' problem

In [None]:
kf = KalmanFilter(
    initial_state_mean=np.array([0.0, 0.0]),
    initial_covariance=np.zeros((2, 2)),
    state_transition_model=np.array([[1, 1], [0, 0.9]]),
    process_noise_covariance=(0.01**2)*np.array([[0.25, 0.5], [0.5, 1.0]]),
    observation_model=np.array([[1.0, 0.0]]),
    observation_noise_covariance=np.array([[0.1**2]])
)

In [None]:
x_apriori_estimate_history = [np.array([0.0, 0.0])]
x_estimate_history = [np.array([0.0, 0.0])]


for obs in obs_history[1:]:
    x_prior, _ = kf.predict()
    x_apriori_estimate_history.append(x_prior)
    x_post, _ = kf.update(np.array([obs]))
    x_estimate_history.append(x_post)

In [None]:
plt.plot([x for x, _ in state_history], label='True position')
plt.plot([x for x, _ in x_estimate_history], label='Position (posterior) estimate')
plt.legend()

In [None]:
plt.plot([x for x, _ in state_history], '.-', label='True position')
plt.plot([x for x, _ in x_estimate_history], '.-', label='Posterior estimate')
plt.plot([x for x, _ in x_apriori_estimate_history], '.-', label='A priori estimate')
plt.xlim(200, 220)
plt.ylim(-2.5, -1.5)
plt.legend()
plt.show()

In [None]:
def make_moving_avg(xs, n):
    ys = np.array(xs).copy()
    for i in range(1, n):
        ys[i:] += xs[:-i]
    ys[n:] *= (1/n)
    for i in range(n):
        ys[i] *= (1/(i+1))
    return ys

smoothed_obs_2_history = make_moving_avg(obs_history, n=2)
smoothed_obs_3_history = make_moving_avg(obs_history, n=3)

In [None]:
mse_obs = np.array([(t[0] - t[1][0])**2 for t in zip(obs_history, state_history)]).mean()
mse_prior = np.array([(t[0] - t[1][0])**2 for t in zip(obs_history, x_apriori_estimate_history)]).mean()
mse_post = np.array([(t[0] - t[1][0])**2 for t in zip(obs_history, x_estimate_history)]).mean()

print('MSEs:')
print(f'Observations:\t\t{mse_obs:.6f}')
print(f'A priori estimates:\t{mse_prior:.6f} ({(mse_obs - mse_prior)/mse_obs:+.2%} reduction)')
print(f'A posteriori estimates:\t{mse_post:.6f} ({(mse_obs - mse_post)/mse_obs:+.2%} reduction)')

mse_smoothing_2 = np.array([(t[0] - t[1])**2 for t in zip(obs_history, smoothed_obs_2_history)]).mean()
mse_smoothing_3 = np.array([(t[0] - t[1])**2 for t in zip(obs_history, smoothed_obs_3_history)]).mean()

print(f'Moving avg (n=2):\t{mse_smoothing_2:.6f} ({(mse_obs - mse_smoothing_2)/mse_obs:+.2%} reduction)')
print(f'Moving avg (n=3):\t{mse_smoothing_3:.6f} ({(mse_obs - mse_smoothing_3)/mse_obs:+.2%} reduction)')