In [4]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.integrate as integrate

In [23]:
#Define Gaussian Distribution Struct
class GaussianDistribution:
    def __init__(self, mean, covariance):
        self.mean = mean
        self.covariance = covariance

    #return sampled distribution
    def sample(self, x):
        #compute scaling factor
        a = 2 * np.pi * np.sqrt(np.linalg.det(self.covariance))

        return (1/a) * np.exp((x -  self.mean).T * np.linalg.inv(self.covariance) * (x - self.mean))
    
    #get ellipse parameters for 1 std level set
    #assumes: x/a + y/b = 1 at angle theta in rads
    #returns: height, width, theta (radians)
    def levelSet(self, confidence=0.95):
        #only for 2D gaussians
        assert self.mean.shape[1] == 2, "Ellipse function only works for d=2"

        #chi squared table [confidence - param]
        confidences = np.array([
            [0.995, 0.99, 0.975, 0.95, 0.90, 0.10, 0.05, 0.025, 0.01, 0.005],
            [0.010, 0.020, 0.051, 0.103, 0.211, 4.605, 5.991, 7.378, 9.210, 10.597]
        ]).T

        #check if confidence is within LUT and if not interpolate
        diff = np.array([np.abs(confidence - confidences[idx,0]) for idx in np.ndindex(confidences.shape[0])])
        idx = np.argmin(diff)
        chi = confidences[idx,1]
        if not diff[idx] <= 0.01:
            #interpolate chi squared value
            confidences[idx,1]
            p0 = np.array([0.0,0.0])
            p1 = np.array([0.0,0.0])
            if confidence - confidences[idx,1] <= 0:
                p0 = confidences[idx]
                if not idx == confidences.shape[0]:
                    p1 = confidences[idx+1]
            else:
                p1 = confidences[idx]
                if not idx == confidences.shape[0]:
                    p0 = confidences[idx-1]

            chi = p1[1] + (confidence - p1[0]) * ((p1[1] - p0[1]) / (p1[0] - p0[0]))

        #find eigenvalues and get rotation angle
        vals, vecs = np.linalg.eigs(self.covariance)
        idx = np.argmin(vals)

        #compute angle and convert from: [-pi, pi] -> [0, pi]
        theta = np.arctan(vecs[idx,1], vecs[idx,0])
        if theta  < 0:
            theta += 2 * np.pi

        #compute height and width
        height = chi * np.sqrt(vals[1])
        width = chi * np.sqrt(vals[0])

        return height, width, theta

In [25]:
#define template dynamics class
class Dynamics_Model:
    def __init__(self, linear, noise, dt=None):
        self.linear = linear
        self.R = noise

    #return linearity
    def isLinear(self):
        return self.linear

    #return noise
    def noise(self):
        return self.R

    #linear state space model
    def ss(self, state, control):
        pass

    #nonlinear model
    def f(self, state, control):
        pass

    #For simulating dynamics from nonlinear system
    def step(self, state, control):
        pass

#Define random walk model
class WeinerDynamics(Dynamics_Model):
    def __init__(self, dt, noise):
        #init params
        super.__init__(self, True, noise)
        self.linear = True

        #define state matrix
        self.A = np.array([
            [1, dt, 0, 0, 0, 0],
            [0, 1, dt, 0, 0, 0],
            [0, 0, 1, dt, 0, 0],
            [0, 0, 0, 1, dt, 0],
            [0, 0, 0, 0, 1, dt],
            [0, 0, 0, 0, 0, 1]
        ])

    #return state space
    def ss(self, state=None, control=None):
        return self.A, 0.0, 0.0
    
#Define 2D drone dynamics
class Drone2D(Dynamics_Model):
    def __init__(self, linear, m, l, noise, dt=None):
        #define model params
        super.__init__(self, linear, noise, dt)
        self.m - m
        self.l = l

    #Return jacobians of A and B
    #   -For kalman filtering input state
    #   -For linearized simulation input [dx_k, du_k]:
    #        dx_{k+1} = (x_k - x_{k-1}) and du_{k+1} = (u_k - u_{k-1})
    #   -Note: ALREADY INCLUDES GRAVITY EQUILIBRIUM
    def ss(self, state, control):
        #get params
        mass = 1/self.m
        c = np.cos(state[4])
        s = np.sin(state[4])
        fl = control[0] - .5 * self.m * g
        fr = control[1] - .5 * self.m * g

        #get jacobians
        Jx = np.array([
            [0, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, (mass * (fl * c - fr * c)), 0],
            [0, 0, 0, 0, 0],
            [0, 0, 0, 0, (mass * (fl * s - fr * s)), 0],
            [0, 0, 0, 0, 1],
            [0, 0, 0, 0, 0]  
        ])

        Ju = mass * np.array([
            [0, 0],
            [-c, s],
            [0, 0],
            [s, -s],
            [0, 0],
            [(1/self.l), -(1/self.l)]
        ])

        return Jx, Ju, np.array([0, 0, 0, -g, 0, 0]).T

    #Nonlinear dynamics
    def f(self, state, control):
        #define params
        g = 9.81
        mass = 1/self.m
        inertia = 1/(self.mass * self.l)
        c = np.cos(state[4])
        s = np.sin(state[4])
        fl = control[0]
        fr = control[1]

        #define nonlinear dynamics
        new = np.zeros(())
        new[0] = state[0]
        new[1] = -mass * (fl * s + fr * s)
        new[2] = state[2]
        new[3] = mass * (fl * c + fr * c - self.m * g)
        new[4] = state[4]
        new[5] = -inertia * (fl + fr)

        return new
    
    #Step using RK45
    def step(self, state, control):
        #Init RK45
        def f(self, x):
            return self.f(x, control)
        
        self.integrator = integrate.solve_ivp(
            f, 
            [0, 10], 
            y0=state, 
            method="RK45", 
            max_step=self.dt, 
            atol = 1, 
            rtol = 1
        )

        #runge kutta step
        self.integrator.step()
        
        return self.integrator.y[0]

#Define 2D missile dynamics
class Missile2D(Dynamics_Model):
    def __init__(self, linear, m, J, r, d, noise, dt=None):
        #define model params
        super.__init__(self, linear, noise, dt)
        self.m - m #mass
        self.J = J #moment of inertia
        self.r = r #thrust offset 
        self.d = d #damping

    #Return jacobians of A and B
    #   -For kalman filtering input state
    #   -For linearized simulation input [dx_k, du_k]:
    #        dx_{k+1} = (x_k - x_{k-1}) and du_{k+1} = (u_k - u_{k-1})
    #   -Note: ALREADY INCLUDES GRAVITY EQUILIBRIUM
    def ss(self, state, control):
        #get params
        g = 9.81
        mass = 1/self.m
        damping = -self.c/self.m
        c = np.cos(state[4])
        s = np.sin(state[4])
        fx = control[0]
        fy = control[1] - self.m * g

        #get jacobians
        Jx = np.array([
            [0, 1, 0, 0, 0, 0],
            [0, damping, 0, 0, -(mass * (fx * s + fy * c)), 0],
            [0, 0, 0, 0, 0],
            [0, 0, 0, damping, (mass * (fx * c - fy * s)), 0],
            [0, 0, 0, 0, 1],
            [0, 0, 0, 0, 0]  
        ])

        Ju = mass * np.array([
            [0, 0],
            [c, -s],
            [0, 0],
            [s, -c],
            [0, 0],
            [-self.r/self.J, 0]
        ])

        return Jx, Ju

    #Nonlinear dynamics
    def f(self, state, control):
        #define params
        g = 9.81
        mass = 1/self.m
        c = np.cos(state[4])
        s = np.sin(state[4])
        fx = control[0] 
        fy = control[1]

        #define nonlinear dynamics
        new = np.zeros(state.shape)
        new[0] = state[0]
        new[1] = -mass * (fx * c - fy * s - self.c * state[0])
        new[2] = state[2]
        new[3] = mass * (fx * c + fy * c - self.m * g - self.c * state[2])
        new[4] = state[4]
        new[5] = (self.r / self.J) * fx

        return new
    
    #Step using RK45
    def step(self, state, control):
        #Init RK45
        def f(self, x):
            return self.f(x, control)
        
        self.integrator = integrate.solve_ivp(
            f, 
            [0, 10], 
            y0=state, 
            method="RK45", 
            max_step=self.dt, 
            atol = 1, 
            rtol = 1
        )

        #runge kutta step
        self.integrator.step()
        
        return self.integrator.y[0]

In [None]:
'''
Defines template dynamics class:


'''

class Sensor_Model:
    def __init__(self, linear, noise):
        self.linear = linear
        self.Q = noise

    #return linearity
    def isLinear(self):
        return self.linear

    #return noise
    def noise(self):
        return self.Q

    #linear state space model
    def ss(self, state):
        pass

    #nonlinear model
    def g(self, state):
        pass

    #step function for consistancy
    def step(self, state):
        return self.g(state)

#Define GPS and IMU linear sensor model
class LinearSensors(Sensor_Model):
    def __init__(self, noise):
        super.__init__(self, True, noise)
        self.A = np.array([
            [1, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0],
            [0, 0, 0, 0, 1, 0]
        ])

    #Sensor model
    def ss(self, state):
        return self.A

#Define GPS, IMU, and Proximity sensor nonlinear sensor model
class NonlinearSensors(Sensor_Model):
    def __init__(self, linear, l, noise):
        super.__init__(self, linear, noise)

        #define parameters
        self.l = l

    #Sensor model
    def ss(self, state):
        #define theta function and numerical derivative
        s = np.sin(state[4])
        dp = self.l * s
        theta_x = lambda x: (-s / np.abs(s)) * np.arcsin((x - np.abs(dp)) / self.l)
        theta_th = lambda th: (-np.sin(th) / np.abs(np.sin(th))) * np.arcsin((state[2] - np.abs(self.l * np.sin(th))) / self.l)
        dth_x = np.gradient(theta_x(state[0]))
        dth_th = np.gradient(theta_th(state[4]))

        #define jacobian
        JayZ = np.array([
            [1, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0],
            [0, 0, 1, 0, 0, 0],
            [dth_x, 0, 0, 0, dth_th, 0],
            [0, 0, 0, 0, 1, 0]
        ])

        return JayZ
    
    #define nonlinear sensor model
    def g(self, state):
        #define params
        s = np.sin(state[4])
        dp = self.l * s

        #define nonlinear dynamics
        new = np.zeros(state.shape)
        new[0] = state[0]
        new[1] = state[2]
        new[2] = state[2]
        new[3] = (-s / np.abs(s)) * np.arcsin((state[2] - np.abs(dp)) / self.l)
        new[4] = state[4]

        return new

In [None]:
#Set Estimator Class
class Estimator:
    def __init__(self):
        pass

    #Initialize
    def initialize(self, state, control=None):
        pass

    #step function for consistancy
    def step(self, measurement, state=None, control=None):
        pass

    #Get trajectory 
    def getTrajectory(self):
        pass

# Kalman Filter (linear or extended)
class ExtendedKalmanFilter(Estimator):
    def __init__(self, dynamics=None, sensors=None):
        #define modeltype and system
        self.dynamics = dynamics
        self.sensors = sensors
        self.linear = {
            "dynamics": self.dynamics.isLinear(),
            "sensors": self.sensors.isLinear()
        }

        #define noise
        self.R = self.dynamics.noise()
        self.Q = self.sensors.noise()

        #Define states
        self.dims = self.Q.shape[0]
        self.beliefs = None

    #Initialize filter with 
    def initialize(self, state):
        self.beliefs.append(GaussianDistribution(state, self.dynamics.R))

    #Prediction: compute model timestep
    def _predict(self, belief, control):
        #get every loop in case they're jacobians
        A, B = self.dynamics.ss(belief.mean, control)

        #compute prediction mean
        mean = None
        if self.linear['dynamics']:
            mean = A * belief.mean + B * control
        else:
            mean = self.dynamics.step(belief.mean, control)
        
        #compute prediction covariance
        covariance = A * belief.covariance * A.T + self.R

        return GaussianDistribution(mean, covariance)
    
    #Update step: compute kalman gain and incorporate measurement data
    def _update(self, belief, measurement):
        #get sensor model
        C = self.sensors.ss(belief.mean)

        #compute kalman gain
        K = belief.covariance * C.T * np.linalg.inv(C * belief.covariance * C.T + self.Q)

        #compute mean update
        mean = None
        if self.linear['sensors']:
            mean = belief.mean + K * (measurement - C * belief.mean)
        else:
            mean = belief.mean + K * (measurement - self.sensors.step(belief.mean))

        #compute covariance update
        covariance = (np.eye(self.dims) - K * C) * belief.covariance

        return GaussianDistribution(mean, covariance)

    #Run a single Kalman Filter Step
    def step(self, measurement, control):
        #get belief
        belief0 = self.beliefs[-1]

        #Step #1: Prediction
        prediction = self._predict(belief0, control)

        #Step #2: Update Step
        belief = self._update(prediction, measurement)
        self.beliefs.append(belief)

        #return belief distribution for filtering step
        return belief
    
    #return trajectory of distributions
    def getTrajectory(self):
        return self.beliefs

In [None]:
#Generate Trajectory samples and run through sensor model


#Run Kalman Filter on noisy data


#Plot results
