# Filter implementations

This file contains implementations of the methods described in my matura thesis. Please see the matura thesis for more
information concerning the theory behind these implementations.

In [None]:
import math
import numpy

# Complementary Filter

In [None]:
class ComplementaryFilter():
    """Implementation of a complementary filter"""
    def __init__(self, x, t, data, hp_weight=0.98, lp_weight=0.02):
        """Init function
        
        Keyword arguments:
        x  --  state of the system (angle)
        t  --  sampling interval (time between samples in seconds)
        data  --  array of data to process
                format: (accelerometer or magnetometer angle, gyro rate)
        hp_weight  --  High pass filter weight; optional but recommended
        lp_weight  --  Low pass filter weight; optional but recommended
        """
        self.x = x  # the state of the system (angle)
        self.t = t
        self.data = data  # (accelerometer/magnetometer angle, gyro rate)
        self.hp = hp_weight
        self.lp = lp_weight
        
        self.k = 0
        self.x_results = [self.x]
    
    def iterate(self):
        while self.k < len(self.data[0]):
            self.timestep()
    
    def timestep(self):
        angle = self.data[0][self.k]
        rate = math.degrees(self.data[1][self.k])
        
        self.x = self.hp * (self.x + rate * self.t) + self.lp * angle
        
        self.k = self.k + 1
        self.x_results.append(self.x)
        
    def set_angle(self, angle):
        self.x = angle
    
    def get_angle(self):
        return self.x
        
    def set_data(self, data):
        self.data = data
        
    def get_results(self):
        return self.x_results

# Complementary Filter using the Central Limit Theorem

In [2]:
class CLTFilter():
    """Implementation of a Sensor Fusion method using the central limit theorem"""
    def __init__(self, x , t, data, sigmas):
        """
        x  --  state of the system (angle)
        t  --  sampling interval (time between samples)
        data -- see above
        sigmas  --  tuple / array containing the variances of the measurements
            (sigma 1st measurement, sigma 2nd m., sigma combined)
        """
        self.x = x
        self.t = t  
        self.data = data
        self.sigmas = sigmas
        
        self.k = 0
        self.x_results = [self.x]
        
    def iterate(self):
        while self.k < len(self.data[0]):
            self.timestep()
    
    def timestep(self):
        angle = self.data[0][self.k]
        rate = self.data[1][self.k]
        
        self.x = ((self.sigmas[0] * angle) + (self.sigma[1] * (self.x + (rate * self.t)))) * self.sigmas[2]
        
        self.k = self.k + 1
        self.x_results.append(self.x)
        
    def set_angle(self, angle):
        self.x = angle
    
    def get_angle(self):
        return self.x
        
    def set_data(self, data):
        self.data = data
        
    def get_results(self):
        return self.x_results

# Kalman Filter

In [5]:
"""
This program is based on

https://github.com/TKJElectronics/KalmanFilter

by Kristian Lauszus, TKJ Electronis 2012

(available under the GNU General Public License)
"""
import math
import numpy as np
import scipy as sp


class KalmanFilter():
    """Implementation of the Kalman Filter"""
    def __init__(self, angle, t, data, Q_angle, Q_bias, R_measure):
        self.x = [angle, 0]  # state vector
        self.P = np.zeros((2, 2))
        
        self.Q_angle = Q_angle
        self.Q_bias = Q_bias
        self.R_measure = R_measure
        
        self.t = t
        self.data = data  # has the format (accelerometer/magnetometer angle, gyro rate)
        self.rate = 0
        
        self.k = 0
        self.x_results = [(self.x[0], self.x[1])]
        
    def iterate(self):
        while self.k < len(self.data[0]):
            self.timestep()
        print(self.x_results)
        return self.x_results
        
    def timestep(self):
        new_angle = self.data[0][self.k]
        new_rate = math.degrees(self.data[1][self.k])

        # (1) State Projetion
        self.rate = new_rate - self.x[1]
        self.x[0] += (self.t * self.rate)
        
        # (2) Error Covariance Estimation
        self.P[0][0] += self.t * (self.t * self.P[1][1] - self.P[0][1] - self.P[1][0] + self.Q_angle)
        self.P[0][1] -= self.t * self.P[1][1]
        self.P[1][0] -= self.t * self.P[1][1]
        self.P[1][1] += self.Q_bias * self.t
        
        # (3) Angle difference
        y = new_angle - self.x[0]
        
        # (4)
        S = self.P[0][0] + self.R_measure
        
        # (5) Calculate Kalman gain
        K = np.zeros((2,1))
        K[0] = self.P[0][0] / S
        K[1] = self.P[1][0] / S
        
        # (6) Calculate angle and bias
        self.x[0] += (K[0] * y)
        self.x[1] += (K[1] * y)
        
        # (7) Calculate error covariance
        _P00 = self.P[0][0]
        _P01 = self.P[0][1]
        
        self.P[0][0] -= (K[0] * _P00)
        self.P[0][1] -= (K[0] * _P01)
        self.P[1][0] -= (K[1] * _P00)
        self.P[1][1] -= (K[1] * _P01)
        
        # increase counter; push result
        self.k = self.k + 1
        angle = float(self.x[0])
        rate = float(self.x[1])
        self.x_results.append([angle, rate])

    
    
    def set_angle(self, angle):
        self.x[0] = angle
    
    def get_angle(self):
        return self.x[0]
    
    def set_data(self, data):
        self.data = data
    
    def set_Q_angle(self, q):
        self.Q_angle = q
    
    def set_Q_bias(self, q):
        self.Q_bias = q
        
    def set_R_measure(self, r):
        self.R_measure = r
        
    def get_Q_angle(self, ):
        return self.Q_angle
        
    def get_Q_bias(self):
        return self.Q_bias
        
    def get_R_measure(self):
        return self.R_measure

    def get_results(self):
        return self.x_results
   

# Madgwick Filter

In [None]:
"""
This program is based on

https://github.com/arduino-libraries/MadgwickAHRS

by Arduino LLC

(available under the GNU Lesser General Public License)

"""
class MadgwickFilter():
    """Implementation of the Madgwick Filter"""
    def __init__(self, t, data, beta=0.1, q0=1.0, q1=0.0, q2=0.0, q3=0.0, sixdof=False):
        self.beta = beta
        self.q0 = q0
        self.q1 = q1
        self.q2 = q2
        self.q3 = q3
        self.roll = 0
        self.pitch = 0
        self.yaw = 0
        
        self.data = data  # row format: [ax ay az gx gy gz mx my mz] (g in radians/s)
        
        self.k = 0
        self.results = [(self.q0, self.q1, self.q2, self.q3)]
        self.results_euler = []
        
        self.angles_computed = False
        
        self.t =  t  #  sample interval in seconds
        
        self.sixdof = sixdof
        
    def begin(self):
        pass
    
    def compute_angles(self):
        q0 = self.q0
        q1 = self.q1
        q2 = self.q2
        q3 = self.q3
        self.roll = math.atan2(q0 * q1 + q2 * q3, 0.5 - q1 * q1 - q2 * q2)
        self.pitch = math.asin(-2.0 * (q1 * q3 - q0 * q2))
        self.yaw = math.atan2(q1 * q2 + q0 * q3, 0.5 - q2 * q2 - q3 * q3)
        self.angles_computed = True
        
    def inv_sqrt(self, x):
        """
        Python implementation of the inverse square root method
    
        This function was taken from 
        
        https://github.com/ajcr/ajcr.github.io/blob/master/_posts/2016-04-01-fast-inverse-square-root-python.md
    
        by Alex Riley
        
        (available under the MIT License)""" 
        x05 = 0.5 * x
        y = np.float32(x)
        
        i = y.view(np.int32)
        i = np.int32(0x5f3759df) - np.int32(i >> 1)
        y = i.view(np.float32)
        
        y = y * (1.5 - (x05 * y * y))
        return float(y)
    
    def iterate(self):
        while self.k < len(self.data):
            self.timestep()
            self.compute_angles()
            self.results_euler.append([self.roll, self.pitch, self.yaw])
    
    def timestep(self):
        if self.sixdof:
            self.timestepIMU()
            return
        
        #  Use 6 DOF algorithm if the magnetometer measurements cannot be used
        if self.data[self.k][6] == 0 and self.data[self.k][7] == 0 and self.data[self.k][8] == 0:
            self.timestepIMU()
            return
            
        ax, ay, az, gx, gy, gz, mx, my, mz = self.data[self.k]
        
        q0 = self.q0
        q1 = self.q1
        q2 = self.q2
        q3 = self.q3
        
        qDot1 = 0.5 * (-q1 * gx - q2 * gy - q3 * gz)
        qDot2 = 0.5 * (q0 * gx + q2 * gz - q3 * gy)
        qDot3 = 0.5 * (q0 * gy - q1 * gz + q3 * gx)
        qDot4 = 0.5 * (q0 * gz + q1 * gy - q2 * gx)
       
        rec_norm = self.inv_sqrt(ax * ax + ay * ay + az * az)
        
        ax = ax * rec_norm
        ay = ay * rec_norm
        az = az * rec_norm
        
        rec_norm = self.inv_sqrt(mx * mx + my * my + mz * mz)
        mx = mx * rec_norm
        my = my * rec_norm
        mz = mz * rec_norm
        
        _2q0mx = 2.0 * q0 * mx
        _2q0my = 2.0 * q0 * my
        _2q0mz = 2.0 * q0 * mz
        _2q1mx = 2.0 * q1 * mx
        _2q0 = 2.0 * q0
        _2q1 = 2.0 * q1
        _2q2 = 2.0 * q2
        _2q3 = 2.0 * q3
        _2q0q2 = 2.0 * q0 * q2
        _2q2q3 = 2.0 * q2 * q3
        q0q0 = q0 * q0
        q0q1 = q0 * q1
        q0q2 = q0 * q2
        q0q3 = q0 * q3
        q1q1 = q1 * q1
        q1q2 = q1 * q2
        q1q3 = q1 * q3
        q2q2 = q2 * q2
        q2q3 = q2 * q3
        q3q3 = q3 * q3

        hx = mx * q0q0 - _2q0my * q3 + _2q0mz * q2 + mx * q1q1 + _2q1 * my * q2 + _2q1 * mz * q3 - mx * q2q2 - mx * q3q3
        hy = _2q0mx * q3 + my * q0q0 - _2q0mz * q1 + _2q1mx * q2 - my * q1q1 + my * q2q2 + _2q2 * mz * q3 - my * q3q3
        _2bx = math.sqrt(hx * hx + hy * hy)
        _2bz = -_2q0mx * q2 + _2q0my * q1 + mz * q0q0 + _2q1mx * q3 - mz * q1q1 + _2q2 * my * q3 - mz * q2q2 + mz * q3q3
        _4bx = 2.0 * _2bx
        _4bz = 2.0 * _2bz

        s0 = -_2q2 * (2.0 * q1q3 - _2q0q2 - ax) + _2q1 * (2.0 * q0q1 + _2q2q3 - ay) - _2bz * q2 * (_2bx * (0.5 - q2q2 - q3q3) + _2bz * (q1q3 - q0q2) - mx) + (-_2bx * q3 + _2bz * q1) * (_2bx * (q1q2 - q0q3) + _2bz * (q0q1 + q2q3) - my) + _2bx * q2 * (_2bx * (q0q2 + q1q3) + _2bz * (0.5 - q1q1 - q2q2) - mz)
        s1 = _2q3 * (2.0 * q1q3 - _2q0q2 - ax) + _2q0 * (2.0 * q0q1 + _2q2q3 - ay) - 4.0 * q1 * (1 - 2.0 * q1q1 - 2.0 * q2q2 - az) + _2bz * q3 * (_2bx * (0.5 - q2q2 - q3q3) + _2bz * (q1q3 - q0q2) - mx) + (_2bx * q2 + _2bz * q0) * (_2bx * (q1q2 - q0q3) + _2bz * (q0q1 + q2q3) - my) + (_2bx * q3 - _4bz * q1) * (_2bx * (q0q2 + q1q3) + _2bz * (0.5 - q1q1 - q2q2) - mz)
        s2 = -_2q0 * (2.0 * q1q3 - _2q0q2 - ax) + _2q3 * (2.0 * q0q1 + _2q2q3 - ay) - 4.0 * q2 * (1 - 2.0 * q1q1 - 2.0 * q2q2 - az) + (-_4bx * q2 - _2bz * q0) * (_2bx * (0.5 - q2q2 - q3q3) + _2bz * (q1q3 - q0q2) - mx) + (_2bx * q1 + _2bz * q3) * (_2bx * (q1q2 - q0q3) + _2bz * (q0q1 + q2q3) - my) + (_2bx * q0 - _4bz * q2) * (_2bx * (q0q2 + q1q3) + _2bz * (0.5 - q1q1 - q2q2) - mz)
        s3 = _2q1 * (2.0 * q1q3 - _2q0q2 - ax) + _2q2 * (2.0 * q0q1 + _2q2q3 - ay) + (-_4bx * q3 + _2bz * q1) * (_2bx * (0.5 - q2q2 - q3q3) + _2bz * (q1q3 - q0q2) - mx) + (-_2bx * q0 + _2bz * q2) * (_2bx * (q1q2 - q0q3) + _2bz * (q0q1 + q2q3) - my) + _2bx * q1 * (_2bx * (q0q2 + q1q3) + _2bz * (0.5 - q1q1 - q2q2) - mz)
        rec_norm = self.inv_sqrt(s0 * s0 + s1 * s1 + s2 * s2 + s3 * s3)
        s0 *= rec_norm
        s1 *= rec_norm
        s2 *= rec_norm
        s3 *= rec_norm

        qDot1 -= self.beta * s0
        qDot2 -= self.beta * s1
        qDot3 -= self.beta * s2
        qDot4 -= self.beta * s3
        
        q0 += qDot1 * self.t
        q1 += qDot2 * self.t
        q2 += qDot3 * self.t
        q3 += qDot4 * self.t

        rec_norm = self.inv_sqrt(q0 * q0 + q1 * q1 + q2 * q2 + q3 * q3)
        q0 *= rec_norm
        q1 *= rec_norm
        q2 *= rec_norm
        q3 *= rec_norm
        self.angles_computed = False
        
        self.q0 = q0
        self.q1 = q1
        self.q2 = q2
        self.q3 = q3
        
        self.k += 1
        self.results.append([q0, q1, q2, q3])
        
    def timestepIMU(self):
        
        ax, ay, az, gx, gy, gz, mx, my, mz = self.data[self.k]
        
        q0 = self.q0
        q1 = self.q1
        q2 = self.q2
        q3 = self.q3
        
        qDot1 = 0.5 * (-q1 * gx - q2 * gy - q3 * gz)
        qDot2 = 0.5 * (q0 * gx + q2 * gz - q3 * gy)
        qDot3 = 0.5 * (q0 * gy - q1 * gz + q3 * gx)
        qDot4 = 0.5 * (q0 * gz + q1 * gy - q2 * gx)
        
        rec_norm = self.inv_sqrt(ax * ax + ay * ay + az * az)
        ax = ax * rec_norm
        ay = ay * rec_norm
        az = az * rec_norm
        
        _2q0 = 2.0 * q0
        _2q1 = 2.0 * q1
        _2q2 = 2.0 * q2
        _2q3 = 2.0 * q3
        _4q0 = 4.0 * q0
        _4q1 = 4.0 * q1
        _4q2 = 4.0 * q2
        _8q1 = 8.0 * q1
        _8q2 = 8.0 * q2
        q0q0 = q0 * q0
        q1q1 = q1 * q1
        q2q2 = q2 * q2
        q3q3 = q3 * q3
        
        # Gradient decent algorithm corrective step
        s0 = _4q0 * q2q2 + _2q2 * ax + _4q0 * q1q1 - _2q1 * ay
        s1 = _4q1 * q3q3 - _2q3 * ax + 4.0 * q0q0 * q1 - _2q0 * ay - _4q1 + _8q1 * q1q1 + _8q1 * q2q2 + _4q1 * az
        s2 = 4.0 * q0q0 * q2 + _2q0 * ax + _4q2 * q3q3 - _2q3 * ay - _4q2 + _8q2 * q1q1 + _8q2 * q2q2 + _4q2 * az
        s3 = 4.0 * q1q1 * q3 - _2q1 * ax + 4.0 * q2q2 * q3 - _2q2 * ay
        rec_norm = self.inv_sqrt(s0 * s0 + s1 * s1 + s2 * s2 + s3 * s3)
        s0 *= rec_norm
        s1 *= rec_norm
        s2 *= rec_norm
        s3 *= rec_norm
        
        qDot1 -= self.beta * s0
        qDot2 -= self.beta * s1
        qDot3 -= self.beta * s2
        qDot4 -= self.beta * s3
        
        q0 += qDot1 * self.t
        q1 += qDot2 * self.t
        q2 += qDot3 * self.t
        q3 += qDot4 * self.t

        recipNorm = self.inv_sqrt(q0 * q0 + q1 * q1 + q2 * q2 + q3 * q3)
        q0 *= rec_norm
        q1 *= rec_norm
        q2 *= rec_norm
        q3 *= rec_norm
        
        self.angles_computed = False
        
        self.k += 1
        self.results.append([q0, q1, q2, q3])
        
        self.q0 = q0
        self.q1 = q1
        self.q2 = q2
        self.q3 = q3

    def get_results(self):
        return self.results
        
    def get_results_euler(self):
        return self.results_euler
     
    def get_roll(self):
        if not self.angles_computed:
            self.compute_angles()
        return self.roll
        
    def get_pitch(self):
        if not self.angles_computed:
            self.compute_angles()
        return self.pitch
        
    def get_yaw(self):
        if not self.angles_computed:
            self.compute_angles()
        return self.yaw
    