In [1]:
import numpy as np


class KalmanFilter(object):
    def __init__(self, initial_state, initial_covariance, motion_model, motion_error_model,
                 measurement_model, measurement_error_model):
        self._previous_covariance = initial_covariance
        self._previous_state = initial_state
        self._motion_model = motion_model
        self._motion_error_model = motion_error_model
        self._measurement_model = measurement_model
        self._measurement_error_model = measurement_error_model
        self._prediction_state = initial_state
        self._prediction_covariance = initial_covariance

    def filter(self, measurement):
        # Prediction
        state = self._motion_model.dot(self._previous_state)
        covariance = self._motion_model.dot(self._previous_covariance).dot(
            self._motion_model.T) + self._motion_error_model
        self._prediction_covariance = covariance
        self._prediction_state = state

        # Filtration
        error_covariance = self._measurement_model.dot(covariance).dot(
            self._measurement_model.T) + self._measurement_error_model
        kalman_gain = covariance.dot(self._measurement_model.T.dot(np.linalg.inv(error_covariance)))
        state = state + kalman_gain.dot(measurement - self._measurement_model.dot(state))
        covariance = (np.eye(covariance.shape[0]) - kalman_gain.dot(self._measurement_model)).dot(covariance)
        self._previous_state = state
        self._previous_covariance = covariance
        return state

    def log_likelihood(self, measurement):
        if len(np.array(measurement).shape) == 0:
            measurement = np.array([measurement])
        innovation_value = measurement - self._measurement_model.dot(self._prediction_state)
        innovation_covariance = self._measurement_model.dot(self._prediction_covariance).dot(
            self._measurement_model.T) + self._measurement_error_model
        result = 0.5 * np.log(np.linalg.det(innovation_covariance)) + 0.5 * innovation_value.T.dot(
            np.linalg.inv(innovation_covariance)).dot(innovation_value)
        return result

    @property
    def previous_covariance(self):
        return self._previous_covariance

    @property
    def previous_state(self):
        return self._previous_state
    

class RandomAccelerationKalmanFilter(KalmanFilter):
    def __init__(self, motion_error, measurement_error, initial_covariance_value=1e4,
                 initial_state=None, time_delta=None):
        self._time_delta = time_delta
        self._previous_time = None
        self._motion_error = motion_error
        measurement_model = np.array([[1, 0]])
        measurement_error_model = np.array([[measurement_error ** 2]])
        initial_covariance = np.eye(2) * initial_covariance_value
        motion_model, motion_error_model = self.calculate_motion_model(time_delta)
        super(RandomAccelerationKalmanFilter, self).__init__(initial_state, initial_covariance, motion_model,
                                                             motion_error_model, measurement_model,
                                                             measurement_error_model)

    def calculate_motion_model(self, time_delta=None):
        if time_delta is None:
            time_delta = 1.
        state_motion_error_model = np.array([[time_delta ** 2 / 2], [time_delta]])
        motion_model = np.array([[1, time_delta], [0, 1]])
        motion_error_model = state_motion_error_model.dot(state_motion_error_model.T) * self._motion_error ** 2
        return motion_model, motion_error_model

    def filter(self, measurement, timestamp=None):
        if self._time_delta is None and timestamp is None:
            raise ValueError("[ERROR][RandomAccelerationKalmanFilter] - for filter either time_delta or "
                             "timestamp need to be defined")
        elif self._time_delta is None:
            if self._previous_time is None:
                self._previous_time = timestamp
            time_delta = timestamp - self._previous_time
            self._previous_time = timestamp
            self._motion_model, self._motion_error_model = self.calculate_motion_model(time_delta)
        if self._previous_state is None:
            self._previous_state = np.array([measurement, 0])
        return super(RandomAccelerationKalmanFilter, self).filter(measurement)


In [2]:
filter_object = RandomAccelerationKalmanFilter(1, 1, time_delta=0.1)

In [3]:
print(filter_object._measurement_model)
print(filter_object._measurement_error_model)
print(filter_object._motion_model)
print(filter_object._motion_error_model)

[[1 0]]
[[1]]
[[1.  0.1]
 [0.  1. ]]
[[2.5e-05 5.0e-04]
 [5.0e-04 1.0e-02]]
