## Подготовка к работе

In [None]:
# подключаем необходимые библиотеки
import numpy as np

from scipy import stats
from tqdm import tqdm

## Модель

In [None]:
class ToolTissueModel():
    def __init__(self, \
                 m = 0.04, k0 = 970, b0 = 0.4, \
                 u_params = (0.1, 30), \
                 T = 0.5, num_obs = 1000, pred_per_obs = 10, \
                 init_state = np.array([0, 5, 500, 15]), \
                 noise_params = (1e-2, 5e-1)):
        # параметры системы
        self.m, self.k0, self.b0 = m, k0, b0

        # параметры внешнего воздействия
        assert len(u_params) == 2, '2 params: A, w'
        self.A, self.w = u_params

        # шаги по времени
        self.T = T
        self.num_obs = num_obs
        self.pred_per_obs = pred_per_obs
        self.cont_step = T / (num_obs * pred_per_obs ** 2)
        self.pred_step = T / (num_obs * pred_per_obs)
        self.obs_step = T / num_obs

        # начальное состояние
        self.init_state = init_state

        # параметры шума
        assert len(noise_params) == 2, '2 params: Q, R'
        self.Q, self.R = noise_params

        self.cont_states = None    # скрытые значения с шагом delta1 (для траектории)
        self.filt_states = None    # скрытые значения с шагом delta2 (для фильтра)
        self.observ = None         # наблюдения с шагом delta3

    def u(self, t):
        return self.A * np.sin(self.w * t)

    def f_func(self, x, t, time_step):
        return np.array([
            x[0] + time_step * x[1],
            x[1] + time_step / self.m * \
                (self.k0 * (self.u(t) - x[0]) + \
                 self.b0 * ((self.u(t) - self.u(t - time_step)) / time_step - x[1]) - \
                 x[2] * x[0] - x[3] * x[1]),
            x[2],
            x[3]
        ])

    def h_func(self, x, t):
        return self.k0 * (x[0] - self.u(t))

    def generate_states(self):
        states = [self.init_state]
        state_noises = stats.norm(loc=0, scale=self.Q / self.m * (self.cont_step) ** 0.5).rvs(size=self.num_obs * self.pred_per_obs ** 2)
        for cur_time, noise in tqdm(zip(np.linspace(0, self.T, self.num_obs * self.pred_per_obs ** 2 + 1)[:-1], state_noises)):
            states.append(self.f_func(states[-1], cur_time, self.cont_step) + np.array([0, noise, 0, 0]))
        self.cont_states = np.array(states)
        self.filt_states = np.array(states[::int(self.pred_step / self.cont_step)])

    def generate_states_for_filt(self):
        states = [self.init_state]
        state_noises = stats.norm(loc=0, scale=self.Q / self.m * self.pred_step ** 0.5).rvs(size=self.num_obs * self.pred_per_obs)
        for cur_time, noise in zip(np.linspace(0, self.T, self.num_obs * self.pred_per_obs + 1)[:-1], state_noises):
            states.append(self.f_func(states[-1], cur_time, self.pred_step) + np.array([0, noise, 0, 0]))
        self.filt_states = np.array(states)

    def generate_obs(self):
        assert not (self.filt_states is None), 'No states?'
        obs_noises = stats.norm(loc=0, scale=self.R).rvs(size=self.num_obs)
        obs = []
        for cur_time, noise, state in zip(np.linspace(self.obs_step, self.T, self.num_obs), obs_noises, self.filt_states[self.pred_per_obs::self.pred_per_obs]):
            obs.append(self.h_func(state, cur_time) + noise)
        self.observ = np.array(obs)

In [None]:
model = ToolTissueModel()
model.generate_states()
model.generate_obs()

100000it [00:00, 104880.25it/s]


In [None]:
# для вывода графиков
cont_time_grid = np.linspace(0, model.T, model.num_obs * model.pred_per_obs ** 2 + 1)
filt_time_grid = np.linspace(0, model.T, model.num_obs * model.pred_per_obs + 1)
obs_time_grid = np.linspace(model.obs_step, model.T, model.num_obs)

## Тривиальная оценка

In [None]:
class TrivialFilter():
    def __init__(self, dyn_system, init_mean = np.array([0, 4, 450, 10])):
        self.all_states = [init_mean]
        self.system = dyn_system
        self.T = self.system.T
        self.time_step = self.system.pred_step

        # self.all_cov = [init_cov]

    def step(self, cur_time):
        self.all_states.append(self.system.f_func(self.all_states[-1], cur_time, self.time_step))
        # self.all_cov.append()

    def train(self):
        for cur_time in np.linspace(0, self.T, self.system.num_obs * self.system.pred_per_obs + 1)[:-1]:
            self.step(cur_time)
        self.all_states = np.array(self.all_states)
        # self.all_cov = np.array(self.all_cov)

In [None]:
%time
triv = TrivialFilter(model)
triv.train()

CPU times: user 3 µs, sys: 1e+03 ns, total: 4 µs
Wall time: 6.91 µs


## CKF

In [None]:
class CubatureKalmanFilter():
    def __init__(self, dyn_system, init_state, init_cov):
        self.dyn_system = dyn_system
        self.time_step = self.dyn_system.pred_step
        self.all_states = [init_state]
        self.all_cov = [init_cov]
        self.all_obs = self.dyn_system.observ
        self.coef = -self.time_step / self.dyn_system.m
        self.state_second_deriv = self.coef * np.array([[0, 0, 1, 0],
                                                        [0, 0, 0, 1],
                                                        [1, 0, 0, 0],
                                                        [0, 1, 0, 0]])

    def state_first_deriv(self, x):
        return np.array([
            [1, self.time_step, 0, 0],
            [self.coef * (self.dyn_system.k0 + x[2]), 1 + self.coef * (self.dyn_system.b0 + x[3]), self.coef * x[0], self.coef * x[1]],
            [0, 0, 1, 0],
            [0, 0, 0, 1]
        ])

    def predict(self, cur_time):
        pred_state = self.dyn_system.f_func(self.all_states[-1], cur_time, self.time_step)
        pred_cov = self.state_first_deriv(self.all_states[-1]).dot(self.all_cov[-1]).dot(self.state_first_deriv(self.all_states[-1]).T) \
            + np.diag([0, self.time_step * (self.dyn_system.Q  / self.dyn_system.m) ** 2, 0, 0])
        return pred_state, pred_cov

    def correct(self, cur_obs, cur_time):
        pred_Y = self.dyn_system.h_func(self.all_states[-1], cur_time)
        pred_R = self.dyn_system.k0 ** 2 * self.all_cov[-1][0, 0] + self.dyn_system.R ** 2
        temp_matr = self.dyn_system.k0 * self.all_cov[-1][0]
        corr_state = self.all_states[-1] + temp_matr / pred_R * (cur_obs - pred_Y)
        corr_cov = self.all_cov[-1] - temp_matr[:, np.newaxis] * temp_matr[np.newaxis, :] / pred_R
        return corr_state, corr_cov

    def step(self, cur_time, ind):
        cur_state, cur_cov = self.predict(cur_time - self.time_step)
        self.all_states.append(cur_state)
        self.all_cov.append(cur_cov)
        if not ind % self.dyn_system.pred_per_obs:
            corr_state, corr_cov = self.correct(self.all_obs[ind // self.dyn_system.pred_per_obs - 1], cur_time)
            self.all_states[-1] = corr_state
            self.all_cov[-1] = corr_cov

    def reset(self):
        init_state, init_cov = self.all_states[0], self.all_cov[0]
        self.all_states.clear()
        self.all_cov.clear()
        self.all_states.append(init_state)
        self.all_cov.append(init_cov)

    def train(self):
        for ind, cur_time in enumerate(np.linspace(self.time_step, self.dyn_system.T, self.dyn_system.num_obs * self.dyn_system.pred_per_obs)):
            self.step(cur_time, ind + 1)
        self.all_states = np.array(self.all_states)
        self.all_cov = np.array(self.all_cov)

In [None]:
init_state = np.array([0, 5, 500, 15])
init_mean = np.array([0, 4, 450, 10])
init_cov = np.diag([0.001, 1, 50, 5]) ** 2
init_st_for_filt = stats.multivariate_normal(mean=init_mean, cov=init_cov).rvs(size=1, random_state=2)

In [None]:
%time
ckf = CubatureKalmanFilter(model, init_st_for_filt, init_cov=init_cov)
ckf.train()

CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 6.2 µs


## PF (boot)

In [None]:
class ParticleFilter_boot():
    def __init__(self, dyn_system, \
                 init_mean = np.array([0, 4, 450, 10]), init_cov = np.diag([0.001, 1, 50, 5]) ** 2, n_particles = 1000):
        self.system = dyn_system
        self.time_step = self.system.pred_step
        self.state_noise_matr = np.diag([self.system.Q / self.system.m * self.time_step ** 0.5, init_cov[2, 2] ** 0.5 / 100, init_cov[3, 3] ** 0.5 / 100])
        self.all_obs = self.system.observ
        self.n_particles = n_particles
        self.weights = np.repeat(1 / n_particles, n_particles)
        self.particles = stats.multivariate_normal(mean=init_mean, cov=init_cov).rvs(size=self.n_particles)
        self.all_states = [np.sum(self.weights[:, np.newaxis] * self.particles, axis=0)]

    def generate_particles(self, cur_time):
        particles_mean = np.apply_along_axis(self.system.f_func, axis=1, arr=self.particles, t=cur_time, time_step=self.time_step)
        particles_noises = np.hstack((np.zeros(shape=(self.n_particles, 1)), stats.multivariate_normal(mean=np.zeros(shape=(self.state_noise_matr.shape[0], )), cov=self.state_noise_matr ** 2).rvs(size=self.n_particles)))
        self.particles = particles_mean + particles_noises

    def update_weights(self, cur_obs, cur_time):
        obs_func_res = np.apply_along_axis(self.system.h_func, axis=1, arr=self.particles, t=cur_time)
        self.weights = stats.norm(loc=cur_obs, scale=self.system.R).pdf(obs_func_res) * self.weights
        self.weights /= np.sum(self.weights)

    def check_eff(self):
        return 1 / np.sum(self.weights ** 2) > self.n_particles / 10

    def particles_step(self, cur_time):
        self.particles = np.apply_along_axis(self.system.f_func, axis=1, arr=self.particles, t=cur_time, time_step=self.time_step)
        self.all_states.append(np.sum(self.weights[:, np.newaxis] * self.particles, axis=0))

    def correct(self, cur_obs, cur_time):
        self.generate_particles(cur_time - self.time_step)
        self.update_weights(cur_obs, cur_time)
        self.all_states.append(np.sum(self.weights[:, np.newaxis] * self.particles, axis=0))
        if not self.check_eff():
            self.particles = self.particles[np.random.choice(a=self.n_particles, size=self.n_particles, p=self.weights)]
            self.weights = np.repeat(1 / self.n_particles, self.n_particles)

    def step(self, cur_time, ind):
        if ind % self.system.pred_per_obs:
            self.particles_step(cur_time - self.time_step)
        else:
            self.correct(self.all_obs[ind // self.system.pred_per_obs - 1], cur_time)

    def train(self):
        for ind, cur_time in enumerate(np.linspace(self.time_step, self.system.T, self.system.num_obs * self.system.pred_per_obs)):
            self.step(cur_time, ind + 1)
        self.all_states = np.array(self.all_states)

In [None]:
%time
pf = ParticleFilter_boot(model)
pf.train()

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 6.91 µs
