In [None]:
import numpy as np
import matplotlib.pyplot as plt
from modules._import_helper import safe_import_from

# Import filters
KalmanFilter, constant_velocity_model, position_observation_model = safe_import_from(
    '04_time_series_state_space.src.kalman',
    'KalmanFilter', 'constant_velocity_model', 'position_observation_model'
)

ExtendedKalmanFilter, pendulum_dynamics, angle_observation_model = safe_import_from(
    '04_time_series_state_space.src.ekf',
    'ExtendedKalmanFilter', 'pendulum_dynamics', 'angle_observation_model'
)

ParticleFilter, gaussian_likelihood, create_process_noise_wrapper = safe_import_from(
    '04_time_series_state_space.src.particle_filter',
    'ParticleFilter', 'gaussian_likelihood', 'create_process_noise_wrapper'
)

set_seed = safe_import_from('00_repo_standards.src.mlphys_core', 'set_seed')

set_seed(42)
plt.style.use('seaborn-v0_8-darkgrid')
%matplotlib inline

## 1. Constant Velocity Tracking with Kalman Filter

Track an object moving with constant velocity (linear dynamics).

In [None]:
# Setup
dt = 0.1
n_steps = 100
process_noise = 0.05
obs_noise = 0.5

# Ground truth simulation
F, Q = constant_velocity_model(dt, process_noise)
H, R = position_observation_model(obs_noise)

rng = np.random.default_rng(42)
x_true = np.array([0.0, 1.0])  # Initial: pos=0, vel=1
true_states = []
observations = []

for _ in range(n_steps):
    # True dynamics with process noise
    w = rng.multivariate_normal(np.zeros(2), Q)
    x_true = F @ x_true + w
    true_states.append(x_true.copy())
    
    # Noisy observation
    v = rng.normal(0, obs_noise)
    z = H @ x_true + v
    observations.append(z[0])

true_states = np.array(true_states)
observations = np.array(observations)

print(f"Simulated {n_steps} timesteps")
print(f"Final true position: {true_states[-1, 0]:.2f}")
print(f"Final true velocity: {true_states[-1, 1]:.2f}")

In [None]:
# Run Kalman Filter
kf = KalmanFilter(F, H, Q, R)
kf.initialize(x0=np.array([0, 0]), P0=np.eye(2)*10)  # Uncertain initial guess

estimated_states = []
uncertainties = []

for z in observations:
    kf.predict()
    kf.update(np.array([z]))
    x, P = kf.get_state()
    estimated_states.append(x)
    uncertainties.append(np.sqrt(np.diag(P)))  # Standard deviations

estimated_states = np.array(estimated_states)
uncertainties = np.array(uncertainties)

# Compute RMSE
rmse = np.sqrt(np.mean((true_states - estimated_states)**2))
print(f"\nKalman Filter RMSE: {rmse:.4f}")

In [None]:
# Visualization
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
times = np.arange(n_steps) * dt

# Position
axes[0].plot(times, true_states[:, 0], 'k-', label='True', linewidth=2)
axes[0].plot(times, estimated_states[:, 0], 'r--', label='Estimated (KF)', linewidth=2)
axes[0].scatter(times, observations, c='blue', s=20, alpha=0.3, label='Observations')
axes[0].fill_between(
    times,
    estimated_states[:, 0] - 2*uncertainties[:, 0],
    estimated_states[:, 0] + 2*uncertainties[:, 0],
    alpha=0.2, color='red', label='95% confidence'
)
axes[0].set_ylabel('Position', fontsize=12)
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)
axes[0].set_title('Constant Velocity Tracking with Kalman Filter', fontsize=14)

# Velocity
axes[1].plot(times, true_states[:, 1], 'k-', label='True', linewidth=2)
axes[1].plot(times, estimated_states[:, 1], 'r--', label='Estimated (KF)', linewidth=2)
axes[1].fill_between(
    times,
    estimated_states[:, 1] - 2*uncertainties[:, 1],
    estimated_states[:, 1] + 2*uncertainties[:, 1],
    alpha=0.2, color='red'
)
axes[1].set_xlabel('Time (s)', fontsize=12)
axes[1].set_ylabel('Velocity', fontsize=12)
axes[1].legend(fontsize=10)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n✓ Kalman Filter successfully tracks constant velocity motion!")

## 2. Pendulum Tracking with Extended Kalman Filter

Track a nonlinear pendulum system using EKF.

In [None]:
# Setup pendulum
dt = 0.05
n_steps = 150
g, L = 9.81, 1.0
process_noise = 0.05
obs_noise = 0.1

f, F_jac = pendulum_dynamics(dt, g, L)
h, H_jac = angle_observation_model()
Q = np.eye(2) * process_noise**2
R = np.array([[obs_noise**2]])

# Simulate pendulum
rng = np.random.default_rng(42)
x_true = np.array([0.5, 0.0])  # Start at 0.5 rad, zero velocity
true_states_pend = []
observations_pend = []

for _ in range(n_steps):
    w = rng.multivariate_normal(np.zeros(2), Q)
    x_true = f(x_true, None) + w
    true_states_pend.append(x_true.copy())
    
    v = rng.normal(0, obs_noise)
    z = h(x_true) + v
    observations_pend.append(z[0])

true_states_pend = np.array(true_states_pend)
observations_pend = np.array(observations_pend)

print(f"Simulated pendulum for {n_steps} timesteps")
print(f"Period ≈ {2*np.pi*np.sqrt(L/g):.2f}s (small angle approximation)")

In [None]:
# Run EKF
ekf = ExtendedKalmanFilter(f, h, F_jac, H_jac, Q, R)
ekf.initialize(x0=np.array([0, 0]), P0=np.eye(2)*0.5)  # Start with uncertainty

estimated_states_ekf = []
for z in observations_pend:
    ekf.predict()
    ekf.update(np.array([z]))
    x, _ = ekf.get_state()
    estimated_states_ekf.append(x)

estimated_states_ekf = np.array(estimated_states_ekf)

# Compute RMSE
rmse_ekf = np.sqrt(np.mean((true_states_pend - estimated_states_ekf)**2))
print(f"EKF RMSE: {rmse_ekf:.4f}")

In [None]:
# Visualization
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
times_pend = np.arange(n_steps) * dt

# Angle
axes[0].plot(times_pend, true_states_pend[:, 0], 'k-', label='True', linewidth=2)
axes[0].plot(times_pend, estimated_states_ekf[:, 0], 'r--', label='EKF Estimate', linewidth=2)
axes[0].scatter(times_pend, observations_pend, c='blue', s=15, alpha=0.3, label='Observations')
axes[0].set_ylabel('Angle (rad)', fontsize=12)
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)
axes[0].set_title('Pendulum Tracking with Extended Kalman Filter', fontsize=14)

# Angular velocity
axes[1].plot(times_pend, true_states_pend[:, 1], 'k-', label='True', linewidth=2)
axes[1].plot(times_pend, estimated_states_ekf[:, 1], 'r--', label='EKF Estimate', linewidth=2)
axes[1].set_xlabel('Time (s)', fontsize=12)
axes[1].set_ylabel('Angular Velocity (rad/s)', fontsize=12)
axes[1].legend(fontsize=10)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n✓ EKF successfully tracks nonlinear pendulum dynamics!")

## Key Takeaways

1. **Kalman Filter**: Optimal for linear Gaussian systems (constant velocity)
2. **Extended Kalman Filter**: Handles weak nonlinearity via linearization (pendulum)
3. **Uncertainty Quantification**: Both provide covariance estimates for uncertainty
4. **Fusion**: Filters optimally combine model predictions with noisy measurements

**When to use which**:
- KF: Linear dynamics, Gaussian noise (optimal)
- EKF: Weakly nonlinear, small noise
- PF: Highly nonlinear, non-Gaussian, multimodal (see other notebooks)