In [1]:
'''
‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾
Notebook for sensor fusion on 6-axis IMU (accelerometer + gyroscope) data.
_______________________________________________________________________________
''';

Import libraries:

In [10]:
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import ipywidgets as widgets
import os
import time
from IPython.display import display
from IPython.display import clear_output
from IPython.display import HTML
from IPython import get_ipython

def plot_path(df):
    """
    Plots the 3D path of a point over time using Plotly.
    
    Parameters:
    df (pd.DataFrame): DataFrame containing
    x, y, and z positions in the first three columns.
    """
    fig = go.Figure()
    fig.data = []

    # Add trace for the path
    fig.add_trace(go.Scatter3d(
        x=df.iloc[:, 0],  # x positions
        y=df.iloc[:, 1],  # y positions
        z=df.iloc[:, 2],  # z positions
        mode='lines',
        name='Path'
    ))

    # Add markers for start and end points
    fig.add_trace(go.Scatter3d(
        x=[df.iloc[0, 0]],  # start x position
        y=[df.iloc[0, 1]],  # start y position
        z=[df.iloc[0, 2]],  # start z position
        mode='markers',
        marker=dict(size=10, color='green'),
        name='Start'
    ))

    fig.add_trace(go.Scatter3d(
        x=[df.iloc[-1, 0]],  # end x position
        y=[df.iloc[-1, 1]],  # end y position
        z=[df.iloc[-1, 2]],  # end z position
        mode='markers',
        marker=dict(size=10, color='red'),
        name='End'
    ))

    # Update layout
    fig.update_layout(
        title='3D Path Over Time',
        scene=dict(
            xaxis_title='X Position',
            yaxis_title='Y Position',
            zaxis_title='Z Position'
        ),
        showlegend=True
    )

    fig.show()

class imuData:
    def __init__(self, name, df, sensor_num):
        self.name = name
        self.indices = [row for row in df.index]

        self.start_row_index = self.indices.index(
            'DelsysTrignoBase 1: Sensor '+str(sensor_num)+'IM ACC Pitch')

        self.all_data = df.iloc[
            self.start_row_index : self.start_row_index+6]

        acc_data = self.all_data.iloc[0:3]
        self.a_local = acc_data.transpose()
        self.a_local.columns.values[0:3] = ['a_pitch', 'a_roll', 'a_yaw']

        sqrt_acc = np.square(self.a_local)
        net_acc_sq = sqrt_acc.apply(np.sum, axis=1, raw=True)
        self.net_acc = np.sqrt(net_acc_sq)

        gyr_data = self.all_data.iloc[3:7]
        self.omega_local = gyr_data.transpose()
        self.omega_local.columns.values[0:3] = ['omega_pitch', 'omega_roll',
                                                'omega_yaw']

        self.measurements = self.a_local.join(self.omega_local)

        self.frames = len(acc_data.columns)

    def __str__(self):
        return f"ImuData object.\nname:  '{self.name}'\nframes: {self.frames}"

# Filter

In [11]:
import numpy as np
#12345678|212345678|312345678|412345678|512345678|612345678|712345678|
class KF6:
    def __init__(self, dt, Q, R, x_0, P_0):
        
        self.dt = float(dt)
        
        Q = np.array(Q)
        if Q.shape != (16, 16):
            raise ValueError(
                '"Q" shape error: expected (16, 16), '
                f'got {Q.shape} instead'
            )
        self.Q = Q
        
        R = np.array(R)
        if R.shape != (6, 6):
            raise ValueError(
                '"R" shape error: expected (6, 6), '
                f'got {R.shape} instead'
            )
        self.R = R
        
        x_0 = np.array(x_0)
        if x_0.ndim != 2 or x_0.shape != (16, 1):
            raise ValueError(
                '"x_0" shape error: '
                'expected 2D column vector (16, 1), '
                f'got {x_0.shape} instead'
            )
        self.x = x_0
        
        P_0 = np.array(P_0)
        if P_0.shape != (16, 16):
            raise ValueError(
                '"P_0" shape error: expected (16, 16), '
                f'got {P_0.shape} instead'
            )
        self.P = P_0

    def get_A(self, x, dt):
        wN, wE, wD = [float(x[i, 0]) for i in range(13, 16)]
        A = np.diag(16*[1])
        for i in range(6):
            A[i,i+3] = dt
        bottom = np.array(
            [[1, -dt*wN/2, -dt*wE/2, -dt*wD/2],
            [dt*wN/2, 1, dt*wD/2, -dt*wE/2],
            [dt*wE/2, -dt*wD/2, 1, dt*wN/2],
            [dt*wD/2, dt*wE/2, -dt*wN/2, 1]],
            dtype=float
            )
        A[9:13,9:13] = bottom
        return A

    def quat2matrix(self, q):
        q0, q1, q2, q3 = [float(q[i,0]) for i in range(4)]
        C = np.array(
            [[1 - 2*(q2**2 + q3**2), 2*(q1*q2 - q0*q3),
              2*(q1*q3 + q0*q2)]
            ,[2*(q1*q2 + q0*q3), 1 - 2*(q1**2 + q3**2),
              2*(q2*q3 - q0*q1)]
            ,[2*(q1*q3 - q0*q2), 2*(q2*q3 + q0*q1),
              1 - 2*(q1**2 + q2**2)]]
            )
        return C.reshape(3,3)
    
    def get_H(self, x):
        C = self.quat2matrix(x[9:13])
        H = np.zeros((6, 16))
        H[0:3, 6:9] = C.T
        H[3:6, 13:16] = C.T
        return H
    
    def quat_norm(self, q):
        norm = np.linalg.norm(q)
        if norm == 0:
            raise ValueError('Cannon normalize a zero vector')
        return q/norm

    def predict(self):
        self.A = self.get_A(self.x, self.dt)
        self.H = self.get_H(self.x)
        self.xp = self.A @ self.x
        self.xp[5] -= 9.8*self.dt
        self.xp[9:13] = self.quat_norm(self.xp[9:13])
        self.Pp = self.A @ self.P @ self.A.T + self.Q

    def update(self, z):

        z = np.array(z).reshape(-1, 1)

        self.y = z - self.H @ self.xp
        self.K = self.Pp @ self.H.T @ np.linalg.inv(
            self.H @ self.Pp @ self.H.T + self.R)
        self.x = self.xp + self.K @ (self.y)
        self.P = (np.eye(16) - self.K @ self.H) @ self.Pp
        return self.x

# Example usage
def testKF6():
    pvar, vvar, avar, qvar, wvar = 0.01, 0.01, 0.01, 0.01, 0.01
    testQ = np.diag([pvar]*3 + [vvar]*3 + [avar]*3 + [qvar]*4
                    + [wvar]*3)
    [a_pitch_var, a_roll_var, a_yaw_var, w_pitch_var, w_roll_var,
     w_yaw_var] = [1, 1, 1, 1, 1, 1]
    testR = np.diag([a_pitch_var, a_roll_var, a_yaw_var, w_pitch_var,
                    w_roll_var, w_yaw_var])
    testx = np.zeros(16).reshape(-1, 1)
    testx[9:13] = np.array([0.70710678,-0.70710678,0.,0.]).reshape(-1, 1)
    testP = np.diag([pvar]*3 + [vvar]*3 + [avar]*3 + [qvar]*4
                    + [wvar]*3)
    return KF6(dt=1., Q=testQ, R=testR, x_0=testx, P_0=testP)

testKF_ = testKF6()

testKF_.predict()


# Menu

In [13]:
# Get the list of subdirectories
participants = [d for d in os.listdir('trialData')]

# Create the participant dropdown widget with subdirectories as options
selected_participant = widgets.Dropdown(
    options=participants,
    value=participants[0] if participants else None,
    description='Participant:',
    disabled=False,
)

# Display the participant dropdown
display(selected_participant)

# Function to handle participant changes
def on_participant_change(change):
    if change['type'] == 'change' and change['name'] == 'value':
        selected_participant = change['new']
        # print(f'Selected participant: {selected_participant}')
        
        # Update the trial dropdown based on the selected participant
        trials = [d for d in os.listdir(f'trialData/{selected_participant}')]
        selected_trial.options = trials
        selected_trial.value = trials[0] if trials else None

# Attach the handler to the participant dropdown
selected_participant.observe(on_participant_change, names='value')

# Initial list of trials for the default participant
trials = [d for d in os.listdir(f'trialData/{selected_participant.value}')]

# Create the trial dropdown widget with subdirectories as options
selected_trial = widgets.Dropdown(
    options=trials,
    value=trials[0] if trials else None,
    description='Trial:',
    disabled=False,
)

output = widgets.Output()
# Display the trial dropdown
display(selected_trial)
display(output)

# Function to handle trial changes
def on_trial_change(change):
    global eg_measurements
    global filtered_df
    if change['type'] == 'change' and change['name'] == 'value':
        with output:
            clear_output(wait=True)
            display(HTML(f'<b>Getting measurements from "trialData/{selected_participant.value}/{change["new"]}":</b>'))
            # print('\n')
            trial_path = f'trialData/{selected_participant.value}/{change["new"]}'
            eg_df = pd.read_csv(trial_path, index_col=0)
            eg_imu = imuData('egIMU', eg_df, 4)
            eg_measurements = eg_imu.measurements
            display(eg_measurements)
            testKF = testKF6()
            filtered_data = []
            gravity = 9.8
            deg2rad = math.pi/180
            zconvesion = np.diag([gravity]*3 + [deg2rad]*3)
            for k, z in eg_measurements.iterrows():
                z = z.to_numpy().reshape(-1, 1)
                z = zconvesion @ z
                testKF.predict()
                filtered_state = testKF.update(z)
                filtered_data.append(filtered_state)
                
                if int(k) > 1000:
                    break

            # pd.DataFrame(filtered_data)
            filtered_data = np.array(filtered_data)
            filtered_data = filtered_data.reshape(filtered_data.shape[0], 16)
            filtered_df = pd.DataFrame(filtered_data)
            filtered_df.columns = ['pN', 'pE', 'pD', 'vN', 'vE', 'vD', 'aN',
                                    'aE', 'aD', 'q0', 'q1', 'q2', 'q3', 'wN', 'wE', 'wD']
        plot_path(filtered_df)

        # print(f'Participant: {selected_participant.value}    Trial: {selected_trial.value}', end='\r')

# Attach the handler to the trial dropdown
selected_trial.observe(on_trial_change, names='value')

Dropdown(description='Participant:', options=('A01', 'A02', 'A03', 'A04', 'A05', 'A06', 'C03', 'C04', 'C05', '…

Dropdown(description='Trial:', options=('A01_Fast_01.csv', 'A01_Fast_02.csv', 'A01_Fast_03.csv', 'A01_Fast_04.…

Output()

# Test

In [7]:
print(selected_participant.value)
print(selected_trial.value)
testKF = testKF6()
filtered_data = []
gravity = 9.8
deg2rad = math.pi/180
zconvesion = np.diag([gravity]*3 + [deg2rad]*3)
for k, z in eg_measurements.iterrows():
    z = z.to_numpy().reshape(-1, 1)
    z = zconvesion @ z
    testKF.predict()
    filtered_state = testKF.update(z)
    filtered_data.append(filtered_state)
    
    if int(k) > 1000:
        break

# pd.DataFrame(filtered_data)
filtered_data = np.array(filtered_data)
filtered_data = filtered_data.reshape(filtered_data.shape[0], 16)
filtered_df = pd.DataFrame(filtered_data)
filtered_df.columns = ['pN', 'pE', 'pD', 'vN', 'vE', 'vD', 'aN',
                        'aE', 'aD', 'q0', 'q1', 'q2', 'q3', 'wN', 'wE', 'wD']
filtered_df
plot_path(filtered_df)

A01
A01_Fast_04.csv


In [None]:
if eg_measurements.isnull().values.any():
    print('nan')
else:
    print('no nan')

no nan


# /

In [None]:
from IPython.display import display
from ipywidgets import interact

# Define a function dependent on a variable
def my_function(x):
    print(f"x is now: {x}")

# Create an interactive widget
interact(my_function, x=10)


interactive(children=(IntSlider(value=10, description='x', max=30, min=-10), Output()), _dom_classes=('widget-…

<function __main__.my_function(x)>

In [16]:
', '.join(['pN', 'pE', 'pD', 'vN', 'vE', 'vD', 'aN', 'aE', 'aD', 'q0', 'q1', 'q2', 'q3', 'wN', 'wE', 'wD'])

'pN, pE, pD, vN, vE, vD, aN, aE, aD, q0, q1, q2, q3, wN, wE, wD'