In [1]:
import logging
logging.basicConfig (
    format = '%(asctime)s [%(levelname)s] %(message)s',
    level = logging.DEBUG
)

from init import *

In [2]:
import numpy as np

def CreateMatrix (rows, cols, value=0):
    return np.matrix([[value]*cols]*rows, dtype=np.float64)

def PrintThis (vec_matrix, prefix=''):
    delimiter = '------------------'
    if prefix:
        print(prefix)
    print(delimiter)
    for p in vec_matrix:
        print(p)
        print(delimiter)

class UnscentedKalmanFilter:
    
    def __init__ (self):
        logging.debug('UnscentedKalmanFilter.__init__')
        self.L = 0
        self.M = 0
        self.alpha = 0.5
        self.beta  = 2
    
    def Init (self, L, M):
        logging.debug('UnscentedKalmanFilter.Init')
        assert L>0 and M>0
        assert self.L==0 and self.M==0
        self.L = L
        self.M = M
        self.kappa = 3-L
        self._lambda = self.alpha**2 * (self.L+self.kappa) - self.L;
        w = 1.0/(2*(self.L+self._lambda))

        self.Wm = [w] * (2*self.L+1)
        self.Wc = [w] * (2*self.L+1)
        
        self.Wm[0] = self._lambda / (self.L+self._lambda)
        self.Wc[0] = self.Wm[0] + (1 - self.alpha**2 + self.beta)
        
    def RunAllIterations (self, iteration_start:int, iteration_last:int):
        iteration = iteration_start
        while iteration<=iteration_last:
            self.RunSingleIteration (iteration)
            iteration += 1
        
    def RunSingleIteration (self, iteration):
        debug = True
#         logging.debug('UnscentedKalmanFilter.RunSingleIteration')
#         logging.debug(f'{self.x.shape} {self.P.shape}')
        assert self.x.shape == (self.L,1)
        assert self.P.shape == (self.L,self.L)
        
        sigma_points_generated = self.GenerateSigmaPoints()
        
        sigma_points_predicted = self.PropogateSigmaPoints(sigma_points_generated, iteration)
        
        PrintThis(sigma_points_predicted,'predicted sigma points:')
        
        # Eq (12)
        predicted_state_estimate = CreateMatrix(self.L,1)
        for i in range(self.L*2+1):
            predicted_state_estimate += self.Wm[i] * sigma_points_predicted[i]

        if debug:
            PrintThis([predicted_state_estimate],'predicted_state_estimate:')
        
        # Eq (13)
        diff_predicted_sigma_points = []
        error_cov = CreateMatrix(self.L,self.L)
        for i in range(self.L*2+1):
            diff = sigma_points_predicted[i] - predicted_state_estimate
            diff_predicted_sigma_points.append(diff)
            error_cov += self.Wc[i] * diff * diff.getT()
        
        if debug:
            PrintThis([error_cov],'error_cov:')
        
        # Eq (14)
        predicted_measurement  = CreateMatrix(self.M,1)
        predicted_measurements_for_sigma_points = []
        for i in range(self.L*2+1):
            measurement = self.GetPredictedMeasurement (sigma_points_predicted[i], iteration)
            predicted_measurements_for_sigma_points.append(measurement)
            predicted_measurement += self.Wm[i] * measurement
        
        if debug:
            PrintThis([predicted_measurement],'predicted_measurement:')
        
        # Eq (15), Eq (16)
        
        measurement_tuple = self.GetMeasurement (iteration)
        if debug:
            PrintThis([measurement_tuple],'actual measurement:')
        measurement_value = measurement_tuple[0]
        measurement_error = measurement_tuple[1]
        
        predicted_covariance = CreateMatrix(self.M, self.M)
        predicted_covariance += measurement_error
        cross_predicted_covariance = CreateMatrix (self.L, self.M)
        
        diff_measurements = []
        for i in range(self.L*2+1):
            diff = predicted_measurements_for_sigma_points[i] - predicted_measurement
            diff_measurements.append(diff)
            predicted_covariance += self.Wc[i] * diff * diff.getT()
#             PrintThis([
#                 diff_predicted_sigma_points[i],
#                 diff.transpose(),
#                 cross_predicted_covariance
#             ],'xxxxxxxxxxxxx')
            cross_predicted_covariance += self.Wc[i] * diff_predicted_sigma_points[i] * diff.transpose()
        
        if debug:
            PrintThis([cross_predicted_covariance],'[1] cross_predicted_covariance')
            PrintThis([predicted_covariance],'[1] predicted_covariance')
        
        # Eq (17)
        residual = predicted_measurement - measurement_value
        if debug:
            PrintThis ([residual],'residual')

        # Eq (18)
        kalman_gain = cross_predicted_covariance * predicted_covariance.getI()
        if debug:
            PrintThis ([kalman_gain],f'iteration {iteration} kalman_gain')
        
        updated_state_estimate = predicted_state_estimate - kalman_gain * residual
        
        updated_cov_matrix = error_cov - kalman_gain * predicted_covariance * kalman_gain.getT()

        if debug:
            PrintThis([error_cov],f'iteration {iteration} old error_cov')
            PrintThis([kalman_gain * predicted_covariance * kalman_gain.getT()],f'iteration {iteration} K*Innovations*Ktransposed')

        self.x = updated_state_estimate
        self.P = updated_cov_matrix
        
        if debug:
            PrintThis([updated_state_estimate,updated_cov_matrix],f'iteration {iteration} new state and its error matrix')
        
    def GenerateSigmaPoints (self):
        '''This is a const method'''
#         logging.debug('UnscentedKalmanFilter.GenerateSigmaPoints')
        
        try:

            sigma_points = [self.x]

            Psqrt = np.linalg.cholesky(self.P) * np.sqrt(self.L+self._lambda)

            for i in range(self.L):
                sigma_points.append(self.x+Psqrt[:,i])
            for i in range(self.L):
                sigma_points.append(self.x-Psqrt[:,i])

            return sigma_points
        except:
            PrintThis([self.P],'error matrix P:')
            raise
        
    def PropogateSigmaPoints (self, sigma_points, iteration, debug=0):
        '''Propogate sigma points (from one iteration to another)
        
        We don't modify sigma points anything by default.
        
        This is a const method
        '''
        return sigma_points
        
    def GetPredictedMeasurement (self, state, iteration):
        raise NotImplementedError('This method must be implemented in a derived class.')
        
    def GetMeasurement (self, iteration):
        raise NotImplementedError('This method must be implemented in a derived class.')
    



In [3]:
class Example1UKF (UnscentedKalmanFilter):
    
    def __init__ (self):
        super().__init__()

        # the true state (used in this test)
        self.__true_state = np.matrix([
            [11],
            [22]
        ], dtype=np.float64)
        
        # initial state estimate
        self.x = np.matrix([
            [-100],
            [-100]
        ], dtype=np.float64)

        # initial state error matrix
        big = 10
        self.P = np.matrix([
            [big, 0 ],
            [ 0, big]
        ], dtype=np.float64)
        
        super().Init(2,1)

    def GetIterationTime (self,iteration):
        return float(iteration)
        
    def GetPredictedMeasurement (self, state, iteration):
        '''override'''
        assert state.shape == (2,1)
        value = state[0,0] + state[1,0]*self.GetIterationTime(iteration)
        return np.matrix([[value]], dtype=np.float64)
        
    def GetMeasurement (self, iteration):
        '''override'''
        scaling = 1
        measurement_error = scaling * np.random.default_rng().normal(0,1)
        error_matrix = np.matrix([[scaling]], dtype=np.float64)
        measurement_value = self.GetPredictedMeasurement(self.__true_state, iteration) + measurement_error
        return (measurement_value, error_matrix)

In [4]:
e = Example1UKF()
e.RunAllIterations(1,10)

2023-01-01 00:54:11,659 [DEBUG] UnscentedKalmanFilter.__init__
2023-01-01 00:54:11,661 [DEBUG] UnscentedKalmanFilter.Init


predicted sigma points:
------------------
[[-100.]
 [-100.]]
------------------
[[ -97.26138721]
 [-100.        ]]
------------------
[[-100.        ]
 [ -97.26138721]]
------------------
[[-102.73861279]
 [-100.        ]]
------------------
[[-100.        ]
 [-102.73861279]]
------------------
predicted_state_estimate:
------------------
[[-100.]
 [-100.]]
------------------
error_cov:
------------------
[[1.00000000e+01 6.81575822e-27]
 [6.81575822e-27 1.00000000e+01]]
------------------
predicted_measurement:
------------------
[[-200.]]
------------------
actual measurement:
------------------
(matrix([[32.41282822]]), matrix([[1.]]))
------------------
[1] cross_predicted_covariance
------------------
[[10.]
 [10.]]
------------------
[1] predicted_covariance
------------------
[[21.]]
------------------
residual
------------------
[[-232.41282822]]
------------------
iteration 1 kalman_gain
------------------
[[0.47619048]
 [0.47619048]]
------------------
iteration 1 old error_