# Filtre de Kalman

## Cas linéaire

### Simulation trajectoire et observation

In [None]:
from dataclasses import dataclass, field
from typing import Callable

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm as normal


@dataclass
class ProcParams:
    """Stocke les parametres du processus aleatoire."""

    n_val: int
    delta_t: float
    p_0: np.ndarray
    m_0: np.ndarray = field(default_factory=lambda: np.array([]))
    f_mat: np.ndarray = field(default_factory=lambda: np.array([]))
    c_mat: np.ndarray = field(default_factory=lambda: np.array([]))
    g_mat: np.ndarray = field(default_factory=lambda: np.array([]))

    @property
    def x_size(self) -> int:
        return self.p_0.shape[0]


@dataclass
class ObsParams:
    """Stocke les parametres des observations."""

    r_mat: np.ndarray
    h_mat_or_fun: np.ndarray | Callable[[np.ndarray], np.ndarray] = field(
        default_factory=lambda: np.array([])
    )
    h_jacobian: Callable[[np.ndarray], np.ndarray] = lambda _: np.array([])

    @property
    def y_size(self) -> int:
        return self.r_mat.shape[0]


@dataclass
class PbValues:
    """Stocke les valeurs associes au probleme."""

    x_val: np.ndarray = field(default_factory=lambda: np.array([]))
    y_val: np.ndarray = field(default_factory=lambda: np.array([]))
    x_hat: np.ndarray = field(default_factory=lambda: np.array([]))
    p_hat: np.ndarray = field(default_factory=lambda: np.array([]))


proc_params = ProcParams(
    n_val=2000,
    delta_t=1,
    m_0=np.array([5 * 10**3, -20, 5, 20]),
    p_0=np.diag([(2 * 10**3) ** 2, 20**2, (2 * 10**3) ** 2, 20**2]),
    c_mat=np.diag([2**2, 2**2]),
)

rd = np.random.default_rng(10)

In [None]:
def simu_etat(params: ProcParams) -> tuple[ProcParams, PbValues]:
    values = PbValues(x_val=np.empty(shape=(params.n_val + 1, 4, 1)))

    values.x_val[0] = rd.multivariate_normal(mean=params.m_0, cov=params.p_0, size=1).T

    epsilon: np.ndarray = rd.multivariate_normal(
        mean=[0, 0], cov=params.c_mat, size=params.n_val
    ).reshape((params.n_val, 2, 1))

    params.f_mat = np.array(
        [
            [1, params.delta_t, 0, 0],
            [0, 1, 0, 0],
            [0, 0, 1, params.delta_t],
            [0, 0, 0, 1],
        ]
    )

    params.g_mat = np.array(
        [
            [params.delta_t**2 / 2, 0],
            [params.delta_t, 0],
            [0, params.delta_t**2 / 2],
            [0, params.delta_t],
        ]
    )

    for k in range(params.n_val):
        values.x_val[k + 1] = params.f_mat @ values.x_val[k] + params.g_mat @ epsilon[k]
    return params, values

In [None]:
proc_params, vals = simu_etat(proc_params)

In [None]:
obs_params = ObsParams(
    r_mat=np.diag([8**2, 8**2]), h_mat_or_fun=np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
)

h_mat = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
r_mat = np.diag([8**2, 8**2])


def simu_obs(params: ProcParams, values: PbValues, obs_par: ObsParams) -> PbValues:
    values.y_val = np.empty(shape=(params.n_val, obs_par.y_size, 1))
    v_val = rd.multivariate_normal(
        mean=np.zeros(shape=(obs_par.y_size,)), cov=obs_par.r_mat, size=params.n_val
    ).reshape((params.n_val, obs_par.y_size, 1))
    for k in range(params.n_val):
        values.y_val[k] = obs_par.h_mat_or_fun @ values.x_val[k] + v_val[k]
    return values

In [None]:
vals = simu_obs(params=proc_params, values=vals, obs_par=obs_params)

### Filtre de Kalman

In [None]:
def kalman_filter(
    params: ProcParams,
    values: PbValues,
    obs_par: ObsParams,
) -> PbValues:
    values.x_hat = np.empty(shape=(params.n_val, params.x_size, 1))
    values.p_hat = np.empty(shape=(params.n_val, params.x_size, params.x_size))
    k_mat = np.empty(shape=(params.n_val, params.x_size, obs_par.y_size))
    h_mat = obs_par.h_mat_or_fun
    assert isinstance(h_mat, np.ndarray)  # nosec
    id_n = np.eye(params.x_size)
    x_pred = params.m_0.reshape((params.x_size, 1))
    p_pred = params.p_0
    for k in range(params.n_val):
        # Correction de l'etape precedente
        k_mat[k] = (
            p_pred
            @ (h_mat.T)
            @ np.linalg.inv(h_mat @ p_pred @ (h_mat.T) + obs_par.r_mat)
        )
        values.x_hat[k] = x_pred + k_mat[k] @ (values.y_val[k] - h_mat @ x_pred)
        values.p_hat[k] = (id_n - k_mat[k] @ h_mat) @ p_pred
        # Prediction
        x_pred = params.f_mat @ values.x_hat[k]
        p_pred = params.f_mat @ values.p_hat[k] @ (
            params.f_mat.T
        ) + params.g_mat @ params.c_mat @ (params.g_mat.T)
    return values

In [None]:
vals = kalman_filter(
    params=proc_params,
    values=vals,
    obs_par=obs_params,
)

### Contrôle des résultats

In [None]:
def show_full_traj(values: PbValues, start: int, end: int) -> None:
    plt.figure(figsize=(15, 15))
    plt.plot(
        values.x_val[start:end, 0, 0],
        values.x_val[start:end, 2, 0],
        label="X",
        color="blue",
    )
    plt.plot(
        values.y_val[start:end, 0, 0],
        values.y_val[start:end, 1, 0],
        label="Y",
        color="orange",
    )
    plt.plot(
        values.x_hat[start:end, 0, 0],
        values.x_hat[start:end, 2, 0],
        label="$\\hat{X}$",
        color="green",
    )
    plt.legend(loc="upper center", shadow=True)
    plt.show()


def show_confidence_interval(
    params: ProcParams, values: PbValues, coord: int, start: int, end: int, name: str
) -> None:
    time = params.delta_t * np.array(range(params.n_val))
    std_975_perc = normal.ppf(0.975)
    std_dev = np.sqrt(values.p_hat[:, coord, coord])
    plt.figure(figsize=(15, 15))
    plt.plot(
        time[start:end],
        values.x_val[start:end, coord, 0],
        color="blue",
        label=f"${name}$",
    )
    plt.plot(
        time[start:end],
        values.x_val[start:end, coord, 0] - std_975_perc * std_dev[start:end],
        color="red",
        label=f"${name}^{{95 \\%}}$",
    )
    plt.plot(
        time[start:end],
        values.x_val[start:end, coord, 0] + std_975_perc * std_dev[start:end],
        color="red",
    )
    plt.plot(
        time[start:end],
        values.x_hat[start:end, coord, 0],
        color="green",
        label=f"$\\hat{{{name}}}$",
    )
    plt.legend(loc="upper center", shadow=True)
    plt.show()

In [None]:
%matplotlib inline
START = 20
END = 90

show_full_traj(values=vals, start=START, end=END)

In [None]:
show_confidence_interval(
    params=proc_params, values=vals, coord=0, start=START, end=END, name="X_0"
)

In [None]:
show_confidence_interval(
    params=proc_params, values=vals, coord=2, start=START, end=END, name="X_1"
)

## Cas non linéaire

### Filtre de Kalman étendu

In [None]:
def extended_kalman_filter(
    params: ProcParams,
    values: PbValues,
    obs_par: ObsParams,
) -> PbValues:
    values.x_hat = np.empty(shape=(params.n_val, params.x_size, 1))
    values.p_hat = np.empty(shape=(params.n_val, params.x_size, params.x_size))
    k_mat = np.empty(shape=(params.n_val, params.x_size, obs_par.y_size))
    id_n = np.eye(params.x_size)
    x_pred = params.m_0.reshape((params.x_size, 1))
    p_pred = params.p_0
    for k in range(params.n_val):
        # Correction de l'etape precedente
        h_mat = obs_par.h_jacobian(x_pred)
        k_mat[k] = (
            p_pred
            @ (h_mat.T)
            @ np.linalg.inv(h_mat @ p_pred @ (h_mat.T) + obs_par.r_mat)
        )
        values.x_hat[k] = x_pred + k_mat[k] @ (
            values.y_val[k] - obs_par.h_mat_or_fun(x_pred)
        )
        values.p_hat[k] = (id_n - k_mat[k] @ h_mat) @ p_pred
        # Prediction
        x_pred = params.f_mat @ values.x_hat[k]
        p_pred = params.f_mat @ values.p_hat[k] @ (
            params.f_mat.T
        ) + params.g_mat @ params.c_mat @ (params.g_mat.T)
    return values

### Simulation observation

In [None]:
def simu_obs2(params: ProcParams, values: PbValues, obs_par: ObsParams) -> PbValues:
    values.y_val = np.empty(shape=(params.n_val, obs_par.y_size, 1))
    v_val = rd.multivariate_normal(
        mean=np.zeros(shape=(obs_par.y_size,)), cov=obs_par.r_mat, size=params.n_val
    ).reshape((params.n_val, obs_par.y_size, 1))
    for k in range(params.n_val):
        values.y_val[k] = obs_par.h_mat_or_fun(values.x_val[k]) + v_val[k]
    return values

In [None]:
def h_fun(x_val: np.ndarray) -> np.ndarray:
    x, y = x_val[0, 0], x_val[2, 0]  # pylint: disable=invalid-name
    return np.array([[x**2 + y**2], [np.arctan2(y, x)]])


def h_jacobian(x_val: np.ndarray) -> np.ndarray:
    x, y = x_val[0, 0], x_val[2, 0]  # pylint: disable=invalid-name
    return np.array(
        [[2 * x, 0, 2 * y, 0], [-y / (x**2 + y**2), 0, x / (x**2 + y**2), 0]]
    )


var_coeff = (2 / np.pi) ** (1 / 2)
var_norm = (20**2 / var_coeff) ** 2
r_mat2 = np.diag([var_norm, (np.pi / 180) ** 2])

obs_params2 = ObsParams(r_mat=r_mat2, h_mat_or_fun=h_fun, h_jacobian=h_jacobian)
vals2 = PbValues(x_val=vals.x_val)

In [None]:
vals2 = simu_obs2(params=proc_params, values=vals2, obs_par=obs_params2)

vals2 = extended_kalman_filter(
    params=proc_params,
    values=vals2,
    obs_par=obs_params2,
)

### Contrôle des résultats

In [None]:
def show_full_traj2(values: PbValues, start: int, end: int) -> None:
    x_norm = np.sqrt(values.y_val[start:end, 0, 0])
    x1_obs = np.cos(values.y_val[start:end, 1, 0]) * x_norm
    x2_obs = np.sin(values.y_val[start:end, 1, 0]) * x_norm
    plt.figure(figsize=(15, 15))
    plt.plot(
        values.x_val[start:end, 0, 0],
        values.x_val[start:end, 2, 0],
        label="X",
        color="blue",
    )
    plt.plot(
        x1_obs,
        x2_obs,
        label="Y",
        color="orange",
    )
    plt.plot(
        values.x_hat[start:end, 0, 0],
        values.x_hat[start:end, 2, 0],
        label="$\\hat{X}$",
        color="green",
    )
    plt.legend(loc="upper center", shadow=True)
    plt.show()

In [None]:
show_full_traj2(values=vals2, start=START, end=END)

In [None]:
show_confidence_interval(
    params=proc_params, values=vals2, coord=0, start=START, end=END, name="X_0"
)

In [None]:
show_confidence_interval(
    params=proc_params, values=vals2, coord=2, start=START, end=END, name="X_1"
)