# Kalman filters

### Measurement
* Bayes' rule
* Belief updated with a product
* Gaussian

### Prediction (as before with movement)
* Total probability
* Updated with a convolution (where the probability went when the robot moved)
* Gaussian

In [14]:
import numpy as np
from math import *

In [15]:
# Gaussian probability
def g(mean, var, x):
    return (
        np.exp(-(1/2) * (x - mean)**2 / var) * (1 / np.sqrt(2 * np.pi * var))
        )

In [16]:
g(10, 4, 8)

0.12098536225957168

## Measurement update step

In [17]:
# Mean and variance for the product of two Gaussians
def update(mean1, var1, mean2, var2):
    new_mean = float(var2 * mean1 + var1 * mean2) / (var1 + var2)
    new_var = 1./(1./var1 + 1./var2)
    return [new_mean, new_var]

In [18]:
update(10, 8, 13, 2)

[12.4, 1.6]

## Motion prediction step
Add the means to get the resulting mean.  
Add the variances to get the resulting variance.

In [19]:
def predict(mean1, var1, mean2, var2):
    new_mean = mean1 + mean2
    new_var = var1 + var2
    return [new_mean, new_var]

In [20]:
predict(10, 4, 13, 2)

[23, 6]

## Kalman filter (1D)

In [21]:
# test data
measurements = [5., 6., 7., 9., 10.]
motion = [1., 1., 2., 1., 1.]
measurement_sig = 4.
motion_sig = 2.
mu = 0.
sig = 10000.

In [22]:
def kalman(mu, sig, measurements, measurement_sig, motion, motion_sig):
    """
    Kalman filter the initial probability distribution, 
    as defined by mean `mu` and variance `sig`, by updating
    over measurements and motion prediction steps.
    
    Args:
        mu (float or int): mean of the probability distribution
        sig (float or int): variance of the probability distribution
        measurements (array of float or int): measurement means
        measurement_sig (float or int): measurement variance
        motion (array of float or int): motion means
        motion_sig (float or int): motion variance
    
    Returns:
        list: mean and variance of resulting Gaussian
    """
    for ind in range(max(len(measurements), len(motion))):
        # update if there is a measurement
        try:
            [mu, sig] = update(mu, sig, measurements[ind], measurement_sig)
        except IndexError:
            pass
        
        # predict if there is a motion
        try:
            [mu, sig] = predict(mu, sig, motion[ind], motion_sig)
        except IndexError:
            pass
    return [mu, sig]

In [23]:
[mu, sig] = kalman(mu, sig, measurements, measurement_sig, motion, motion_sig)
print([mu, sig])

[10.999906177177365, 4.005861580844194]


## Multidimensional Kalman filter
Making use of matrix class from the course.

In [51]:
class matrix:
    """
    Matrix class provided as course materials.
    Implements basic matrix operations.
    """
    
    def __init__(self, value):
        self.value = value
        self.dimx = len(value)
        self.dimy = len(value[0])
        if value == [[]]:
            self.dimx = 0
    
    def zero(self, dimx, dimy):
        # check if valid dimensions
        if dimx < 1 or dimy < 1:
            raise ValueError("Invalid size of matrix")
        else:
            self.dimx = dimx
            self.dimy = dimy
            self.value = [[0 for row in range(dimy)] for col in range(dimx)]
    
    def identity(self, dim):
        # check if valid dimension
        if dim < 1:
            raise ValueError("Invalid size of matrix")
        else:
            self.dimx = dim
            self.dimy = dim
            self.value = [[0 for row in range(dim)] for col in range(dim)]
            for i in range(dim):
                self.value[i][i] = 1
    
    def show(self):
        for i in range(self.dimx):
            print(self.value[i])
        print(' ')
    
    def __add__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise ValueError("Matrices must be of equal dimensions to add")
        else:
            # add if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] + other.value[i][j]
            return res
    
    def __sub__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise ValueError("Matrices must be of equal dimensions to subtract")
        else:
            # subtract if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] - other.value[i][j]
            return res
    
    def __mul__(self, other):
        # check if correct dimensions
        if self.dimy != other.dimx:
            raise ValueError("Matrices must be m*n and n*p to multiply")
        else:
            # multiply if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, other.dimy)
            for i in range(self.dimx):
                for j in range(other.dimy):
                    for k in range(self.dimy):
                        res.value[i][j] += self.value[i][k] * other.value[k][j]
            return res
    
    def transpose(self):
        # compute transpose
        res = matrix([[]])
        res.zero(self.dimy, self.dimx)
        for i in range(self.dimx):
            for j in range(self.dimy):
                res.value[j][i] = self.value[i][j]
        return res
    
    # Thanks to Ernesto P. Adorio for use of Cholesky and CholeskyInverse functions
    
    def Cholesky(self, ztol=1.0e-5):
        # Computes the upper triangular Cholesky factorization of
        # a positive definite matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)
        
        for i in range(self.dimx):
            S = sum([(res.value[k][i])**2 for k in range(i)])
            d = self.value[i][i] - S
            if abs(d) < ztol:
                res.value[i][i] = 0.0
            else:
                if d < 0.0:
                    raise ValueError("Matrix not positive-definite")
                res.value[i][i] = sqrt(d)
            for j in range(i+1, self.dimx):
                S = sum([res.value[k][i] * res.value[k][j] for k in range(self.dimx)])
                if abs(S) < ztol:
                    S = 0.0
                try:
                   res.value[i][j] = (self.value[i][j] - S)/res.value[i][i]
                except:
                   raise ValueError("Zero diagonal")
        return res
    
    def CholeskyInverse(self):
        # Computes inverse of matrix given its Cholesky upper Triangular
        # decomposition of matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)
        
        # Backward step for inverse.
        for j in reversed(range(self.dimx)):
            tjj = self.value[j][j]
            S = sum([self.value[j][k]*res.value[j][k] for k in range(j+1, self.dimx)])
            res.value[j][j] = 1.0/tjj**2 - S/tjj
            for i in reversed(range(j)):
                res.value[j][i] = res.value[i][j] = -sum([
                    self.value[i][k]*res.value[k][j] for k in range(i+1, self.dimx)
                    ])/self.value[i][i]
        return res
    
    def inverse(self):
        aux = self.Cholesky()
        res = aux.CholeskyInverse()
        return res
    
    def __repr__(self):
        return repr(self.value)

In [165]:
# test data
measurements = [1, 2, 3]

x = matrix([[0.], [0.]]) # initial state (location and velocity)
P = matrix([[1000., 0.], [0., 1000.]]) # initial uncertainty
u = matrix([[0.], [0.]]) # external motion
F = matrix([[1., 1.], [0, 1.]]) # next state function
H = matrix([[1., 0.]]) # measurement function
R = matrix([[1.]]) # measurement uncertainty
I = matrix([[1., 0.], [0., 1.]]) # identity matrix

In [166]:
def kalman_filter(x, P, H, R, F, u, I, measurements):

    for measurement in measurements:
        Z = matrix([[measurement]])

        # update the estimate after an observation
        y = Z - (H * x)
        S = H * P * H.transpose() + R
        K = P * H.transpose() * S.inverse()
        x = x + (K * y)
        P = (I - (K * H)) * P

        # forward projection of the estimate to next iteration
        x = (F * x) + u
        P = F * P * F.transpose()

    return x, P

In [167]:
kalman_filter(x, P, H, R, F, u, I, measurements)

([[3.9996664447958645], [0.9999998335552873]],
 [[2.3318904241194827, 0.9991676099921091], [0.9991676099921067, 0.49950058263974184]])