Added simple python kalman filter
asterixds authored and asterixds committed Jun 13, 2017
1 parent 2db7788 commit 177a792
Showing 12 changed files with 13,611 additions and 0 deletions.

Binary file added python/KalmanCycle.png
Binary file added python/SensorFusion.png
12,102 changes: 12,102 additions & 0 deletions python/SensorFusion_Kalman_Extended_Filter.html

400 changes: 400 additions & 0 deletions python/SensorFusion_Kalman_Extended_Filter.ipynb

Binary file added python/Sensorfusion.zip
Binary file added python/__pycache__/matrix.cpython-35.pyc
Binary file added python/input2.png
Binary file added python/kalman.png
80 changes: 80 additions & 0 deletions python/kalman.py
@@ -0,0 +1,80 @@
%matplotlib inline
from matrix import *
import numpy as np

"""prediction step"""
def predict(x, P):
x = (F * x) + u
P = F * P * F.transpose()
return x, P

"""measurement update step"""
def update(x, P,z):
# measurement update
Z = matrix([z])
y = Z.transpose() - (H * x)
S = H * P * H.transpose() + R
K = P * H.transpose() * S.inverse()
x = x + (K * y)
P = (I - (K * H)) * P
return x, P

from filterpy.stats import plot_covariance_ellipse
from numpy.random import randn
import math

def plot_position_variance(x, P, edgecolor='r'):
    """Plot the (x, y) position estimate with its 2x2 covariance ellipse."""
    x1 = np.array([x.value[0][0], x.value[1][0]])
    P1 = np.array([[P.value[0][0], P.value[0][1]], [P.value[1][0], P.value[1][1]]])
    plot_covariance_ellipse(x1, P1, edgecolor=edgecolor)


def generate_experiment_data(z_var, process_var, count=1, dt=1.):
    """Return the true track and noisy measurements as lists of [x, y] pairs."""
    x, y, vel = 1., 1., 1.
    z_std = math.sqrt(z_var)
    p_std = math.sqrt(process_var)
    xs, zs = [], []
    for _ in range(count):
        v = vel + (randn() * p_std)
        x += v * dt
        y += v * dt
        xs.append([x, y])
        zs.append([x + randn() * z_std, y + randn() * z_std])  # noisy measurement of (x, y)
    return xs, zs

"""simulate a series of measurements in (x,y) pairs"""
measurements = [[2., 10.], [4., 8.], [6., 6.], [8., 5.], [10., 4.], [12., 2.]] #set of x,y measurements

initial_xy = [2., 9.] #initial belief about position
dt = .1 #time interval between predictions
#truth, measurements = generate_experiment_data( 0., 0.1,count=50,dt=dt)
#print(measurements[49])


"""create the initial state vector"""
x = matrix([[initial_xy[0]], [initial_xy[1]], [0.], [0.]]) # initial state
u = matrix([[0.], [0.], [0.], [0.]]) # no external motion

"""model assumptions"""
# initial uncertainty: 10 for positions x and y, 1000 for the two velocities
P = matrix([[10.,0.,0.,0.],[0.,10.,0.,0.],[0.,0.,1000.,0.],[0.,0.,0.,1000.]])
# state transition function
F = matrix([[1., 0.,dt,0.], [0., 1.,0.,dt],[0.,0.,1.,0.],[0.,0.,0.,1.]])
# measurement function which converts predicted state to measurement space
H = matrix([[1., 0.,0.,0.],[0.,1.,0.,0.]])
R = matrix([[.1,0.],[0.,0.1]]) # measurement noise
I = matrix([[1., 0.,0.,0.], [0., 1.,0.,0.],[0.,0.,1.,0.],[0.,0.,0.,1.]])

"""iterate through the measurements with a series of prediction and update steps"""
plot_position_variance(x,P,edgecolor='r') #plot initial position and covariance in red
for z in measurements:
x,P = predict(x, P)
x,P = update(x, P,z)
plot_position_variance(x,P,edgecolor='b') #plot updates in blue
print(x)
print(P)


"""Final state vector: [[11.965167916928769], [2.0760203171503386], [19.84771525945677], [-15.01535052717562]]
Final state covariance: [[0.05206973927575543, 0.0, 0.14139384664494653, 0.0], [0.0, 0.05206973927575543, 0.0, 0.14139384664494653], [0.14139384664494653, 0.0, 0.5642609683506797, 0.0], [0.0, 0.14139384664494653, 0.0, 0.5642609683506797]]"""
136 changes: 136 additions & 0 deletions python/matrix.py
@@ -0,0 +1,136 @@
"""matrix class from Udacity robotics course"""

from math import *

class matrix:

    # implements basic operations of a matrix class

    def __init__(self, value):
        self.value = value
        self.dimx = len(value)
        self.dimy = len(value[0])
        if value == [[]]:
            self.dimx = 0

    def zero(self, dimx, dimy):
        # check if valid dimensions
        if dimx < 1 or dimy < 1:
            raise ValueError("Invalid size of matrix")
        else:
            self.dimx = dimx
            self.dimy = dimy
            self.value = [[0 for row in range(dimy)] for col in range(dimx)]

    def identity(self, dim):
        # check if valid dimension
        if dim < 1:
            raise ValueError("Invalid size of matrix")
        else:
            self.dimx = dim
            self.dimy = dim
            self.value = [[0 for row in range(dim)] for col in range(dim)]
            for i in range(dim):
                self.value[i][i] = 1

    def show(self):
        for i in range(self.dimx):
            print(self.value[i])
        print(' ')


    def __add__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise ValueError("Matrices must be of equal dimensions to add")
        else:
            # add if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] + other.value[i][j]
            return res

    def __sub__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise ValueError("Matrices must be of equal dimensions to subtract")
        else:
            # subtract if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] - other.value[i][j]
            return res

    def __mul__(self, other):
        # check if correct dimensions
        if self.dimy != other.dimx:
            raise ValueError("Matrices must be m*n and n*p to multiply")
        else:
            # multiply if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, other.dimy)
            for i in range(self.dimx):
                for j in range(other.dimy):
                    for k in range(self.dimy):
                        res.value[i][j] += self.value[i][k] * other.value[k][j]
            return res

    def transpose(self):
        # compute transpose
        res = matrix([[]])
        res.zero(self.dimy, self.dimx)
        for i in range(self.dimx):
            for j in range(self.dimy):
                res.value[j][i] = self.value[i][j]
        return res

    # Thanks to Ernesto P. Adorio for use of Cholesky and CholeskyInverse functions

    def Cholesky(self, ztol=1.0e-5):
        # Computes the upper triangular Cholesky factorization of
        # a positive definite matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)

        for i in range(self.dimx):
            S = sum([(res.value[k][i])**2 for k in range(i)])
            d = self.value[i][i] - S
            if abs(d) < ztol:
                res.value[i][i] = 0.0
            else:
                if d < 0.0:
                    raise ValueError("Matrix not positive-definite")
                res.value[i][i] = sqrt(d)
            for j in range(i+1, self.dimx):
                S = sum([res.value[k][i] * res.value[k][j] for k in range(self.dimx)])
                if abs(S) < ztol:
                    S = 0.0
                res.value[i][j] = (self.value[i][j] - S)/res.value[i][i]
        return res

    def CholeskyInverse(self):
        # Computes inverse of matrix given its Cholesky upper Triangular
        # decomposition of matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)

        # Backward step for inverse.
        for j in reversed(range(self.dimx)):
            tjj = self.value[j][j]
            S = sum([self.value[j][k]*res.value[j][k] for k in range(j+1, self.dimx)])
            res.value[j][j] = 1.0/tjj**2 - S/tjj
            for i in reversed(range(j)):
                res.value[j][i] = res.value[i][j] = -sum([self.value[i][k]*res.value[k][j] for k in range(i+1, self.dimx)])/self.value[i][i]
        return res

    def inverse(self):
        aux = self.Cholesky()
        res = aux.CholeskyInverse()
        return res

    def __repr__(self):
        return repr(self.value)
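
As a quick, illustrative check (not part of the commit), the class can be exercised on a small symmetric positive-definite matrix; inverse() goes through the Cholesky routines above:

from matrix import matrix

A = matrix([[4., 2.], [2., 3.]])   # symmetric positive-definite, so Cholesky applies
b = matrix([[1.], [2.]])

A.show()                           # prints each row of A
x = A.inverse() * b                # solve A * x = b
print(x)                           # approximately [[-0.125], [0.75]]
print(A * x)                       # approximately b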
