In [1]:
import numpy as np
import numpy.linalg

Following needs pip install ipympl

In [2]:
%matplotlib widget
np.set_printoptions(precision=8, suppress=True)

In [3]:
import data_receiver
from mathlib import *
from plotlib import *

In [4]:
# sampling rate
dt = 0.01    # s

# the initialization interval
ts = 1    # s

# Pull Data From Phone
data order: gyroscorpe, accelerometer, magnetometer
Phone is using the HYPERIMU app.

In [5]:
r = data_receiver.Receiver()

data = []

for line in r.receive():
    data.append(line.split(','))

data = np.array(data, dtype = np.float)

KeyboardInterrupt: 

# Initialization

In [None]:
# discard the first and last few readings
# for some reason they fluctuate a lot
w = data[10:-10, 0:3]
a = data[10:-10, 3:6]
m = data[10:-10, 6:9]

if(np.shape(w)[0] < ts/dt):
    print("not enough data for intialization!")

# gravity
gn = a[:int(ts/dt)].mean(axis = 0)
gn = -gn[:, np.newaxis]
g0 = np.linalg.norm(gn)  # save the initial magnitude of gravity

# magnetic field
mn = m[:int(ts/dt)].mean(axis = 0)
mn = Normalized(mn)[:, np.newaxis]  # magnitude is not important

avar = a[:int(ts/dt)].var(axis=0)
wvar = w[:int(ts/dt)].var(axis=0)
mvar = m[:int(ts/dt)].var(axis=0)
print('acc var: ', avar, ', ', np.linalg.norm(avar))
print('ang var: ', wvar, ', ', np.linalg.norm(wvar))
print('mag var: ', mvar, ', ', np.linalg.norm(mvar))

# cut the initialization data
w = w[int(ts/dt) - 1:] - w[:int(ts/dt)].mean(axis=0)
a = a[int(ts/dt):]
m = m[int(ts/dt):]

sample_number = np.shape(a)[0]

In [None]:
a_filtered, w_filtered = Filt_signal((a, w), dt=dt, wn=10, btype='lowpass')
plot_signal([a, a_filtered], [w, w_filtered], [m])

# Kalman Filter

In [None]:
gyro_noise = 10 * np.linalg.norm(wvar)
acc_noise = 10 * np.linalg.norm(avar)
mag_noise = 10 * np.linalg.norm(mvar)

P = 1e-10 * I(4)

In [None]:
a_nav = []
orientations = []

q = np.array([[1., 0., 0., 0.]]).T
orin = -gn / np.linalg.norm(gn)

t = 0
while t < sample_number:
    wt = w[t, np.newaxis].T
    at = a[t, np.newaxis].T
    mt = m[t, np.newaxis].T 
    mt = Normalized(mt)

    # Propagation
    Ft = F(q, wt, dt)
    Gt = G(q)
    Q = (gyro_noise * dt)**2 * Gt @ Gt.T
    
    q = Ft @ q
    q = Normalized(q)
    P = Ft @ P @ Ft.T + Q    

    # Measurement Update
    # Use only normalized measurements to reduce error!
    
    # acc and mag prediction
    pa = Normalized(-Rotate(q) @ gn)
    pm = Normalized(Rotate(q) @ mn)

    # Residual
    Eps = np.vstack((Normalized(at), mt)) - np.vstack((pa, pm))
    
    # internal error + external error
    Ra = [(acc_noise / np.linalg.norm(at))**2 + (1 - g0 / np.linalg.norm(at))**2] * 3
    Rm = [mag_noise**2] * 3
    R = np.diag(Ra + Rm)
    
    Ht = H(q, gn, mn)

    S = Ht @ P @ Ht.T + R
    K = P @ Ht.T @ np.linalg.inv(S)
    q = q + K @ Eps
    P = P - K @ Ht @ P
    
    # Post Correction
    q = Normalized(q)
    P = 0.5 * (P + P.T)  # make sure P is symmertical
    
    conj = -I(4)
    conj[0, 0] = 1
    an = Rotate(conj @ q) @ at + gn
    ori = Rotate(conj @ q) @ orin

    a_nav.append(an.T[0])
    orientations.append(ori.T[0])

    t += 1

a_nav = np.array(a_nav)
orientations = np.array(orientations)

# Accelerometer Bias/Error Correction

In [None]:
a_threshold = 0.2

In [None]:
t_start = 0
for t in range(sample_number):
    at = a_nav[t]
    if np.linalg.norm(at) > a_threshold:
        t_start = t
        break

t_end = 0
for t in range(sample_number - 1, -1,-1):
    at = a_nav[t]
    if np.linalg.norm(at - a_nav[-1]) > a_threshold:
        t_end = t
        break

In [None]:
print('motion starts at: ', t_start)
print('motion ends at: ', t_end)

In [None]:
an_drift = a_nav[t_end:].mean(axis=0)
an_drift_rate = an_drift / (t_end - t_start)

for i in range(t_end - t_start):
    a_nav[t_start + i] -= (i+1) * an_drift_rate

for i in range(sample_number - t_end):
    a_nav[t_end + i] -= an_drift

In [None]:
filtered_a_nav, = Filt_signal([a_nav], dt=dt, wn=(0.01, 15), btype='bandpass')
plot_3([a_nav, filtered_a_nav])
# plot_3([a_nav])

# Zero Velocity Update

In [None]:
velocities = []
prevt = -1
still_phase = False

v = np.zeros((3, 1))
t = 0
while t < sample_number:
    at = filtered_a_nav[t, np.newaxis].T

    if np.linalg.norm(at) < a_threshold:
        if not still_phase:
            predict_v = v + at * dt

            v_drift_rate = predict_v / (t - prevt)
            for i in range(t - prevt - 1):
                velocities[prevt + 1 + i] -= (i + 1) * v_drift_rate.T[0]

        v = np.zeros((3, 1))
        prevt = t
        still_phase = True
    else:
        v = v + at * dt
        still_phase = False
    
    t += 1
    velocities.append(v.T[0])
velocities = np.array(velocities)

In [None]:
plot_3([velocities])

# Integration To Get Position

In [None]:
positions = []
p = np.array([[0, 0, 0]]).T

t = 0
while t < sample_number:
    at = filtered_a_nav[t, np.newaxis].T
    vt = velocities[t, np.newaxis].T

    p = p + vt * dt + 0.5 * at * dt**2
    positions.append(p.T[0])

    t += 1

positions = np.array(positions)

In [None]:
plot_3D([[positions, 'position']])

In [None]:
plot_3([positions])

# Close All Graphs

In [None]:
plt.close('all')