<a href="https://colab.research.google.com/github/SchmetterlingIII/D.T./blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This is an improvement from the initial `spline-interp.ipynb` with a focus on a clearer preceding pseudocode as well as code structure, with classes and helper functions being developed first.

### Next Steps

3. calibration phase (extenstion is storing of the data at the same frequency for now in a database)
4. the other spineclass functions (deviation handling and all that, adding other ones for prolonged static posture as well)
5. animate plottin
6. extension (visual cues in plotting, more detail on calibration stats)

# Checklist
**27/11/25**
- write the full spine analysis function (in whatever the simplest form looks like)
- get started on the animate function

**28/11/25**
- finish the animate function and run the whole thing

**29/11 -> interview**
- debug and *get some useful output from this that can be shown on a graph (like the graph on this paper `file:///C:/Users/25okotir/Downloads/2018_ICCSA_PostureSensei_Preprint.pdf`)


# Pseudocode

- Read in the data from the serial and unpack it so that it can be interpreted
- Clean up this data further such that the accelerometer data is cleaned (`madgwick-filter(accelerometer_data)`) and the output of this are the direction vectors of the tilt. *Further improvements to this function could also input the gyroscope and magnetometer data and have a more comprehensive input--the output will still be the direction vector of the tilt for each IMU so it doesn't change much*.
- Interpolate this appropriately (`forward-kinematics` then `cubic-spline-interp` is the current approach)
- Curvature analysis class that -- using whatever interpolated functions -- creates a list of the curvatures at each input
    - at first, the calibration phase happens where a distribution of the inputted curvatures (wrt their indices and time) are plotted for that duration (i.e. the state `calibration_duration`). This distribution is stored and then checked against for each instance outside of this test phase. I intend to use one-class support vector machine (SVM) but not sure currently. With enough tests (and ML that happens offline) I would like to produce a very inexpensive and accurate method of interpreting the data in the calibration phase but I am not sure currently and need guidance on how to interpret this data (in semi-real time).
    - After this, if an inputted curvature (i.e. instance of the digital twin of the user's spine) has deviated beyond a defined boundary (**how should this be defined?**) then the segment of the spine that this data falls within (which has a corresponding haptic motor) will be alerted through the serial and vibrate on the area of deviation. On the updated plot, there will be a red dots along the arclength of deviation for an update.
        - (for future proofing, I would also like for if there are high instances of static posture, but this feels like just another function and having the if condition be `if deviated or prolonged-static` where both of those are states within the class)
    - Otherwise, there is just a plot of the digital twin on the screen (what other metrics would be cool to have later?)
- plot the interpolated spline at a lower frequency than data is being interpreted and stored in database
- all of this data needs to be stored in a database for ML later and a clearer interpretation of what is happening at specific timesteps and to isolate errors that can be debugged

# Main

### Module Imports

In [None]:
import serial.tools.list_ports
import string
import serial

import matplotlib.pyplot as plt
from  mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation

import time
from collections import deque
import numpy as np
from scipy.interpolate import CubicSpline
import scipy
import sys # specifically for debugging

### Serial Reading

In [None]:
BAUDRATE = 115200
try:
    '''
    Basic setup for port communication
    '''

    ports = serial.tools.list_ports.comports()
    serialInst = serial.Serial()
    portList = [str(i) for i in ports]
    print(portList)

    com = input("Select COM PORT for Arduino: ")

    for i in range(len(portList)):
        if portList[i].startswith("COM" + str(com)):
            SERIAL_PORT = "COM" + str(com)
            print(SERIAL_PORT)

    serialInst.baudrate = BAUDRATE
    serialInst.port = SERIAL_PORT
    serialInst.open()
    print(f"Connected to {SERIAL_PORT} at {BAUDRATE} baud.")

    '''
    Initial data initialisation.
    This is a complementary function to the c++ code which says how many sensors are connected and which positions they are in.
    '''
    while True:
        line = serialInst.readline().decode('utf-8') # each line is the decoded form of the serial
        if line: # if there is data in the readline i.e if line == True
            print(f"Arduino: {line}")
        if "Available channels: " in line:
            channels_part = line.split(":")[-1].strip() # extract all data after colon
            # parse the csv into on stringed list
            IMU_ID_LIST = [id.strip() for id in channels_part.split(",") if id.strip()]
        if "Number of sensors: " in line:
           ID_NUM = int(line.strip(":")[-3]) # the number of read sensors, last instance is "\n" and so index = -3 is the appropriate index
           IMU_DEQUES = [deque(maxlen=50) for i in range(ID_NUM)]
        if "Waiting for 'begin program' command" in line:
            break

    '''
    Input of linear distances for the forward kinematics chain (and subsequent calculations)
    '''
    print("INSTRUCTIONS:\nInput the linear distances between your sensors in metres.\nMeasure from lowest to highest.\nI would recommend using a high resolution ruler to reduce drift.\n")
    linear_distances = []
    for i in range(ID_NUM - 1):
        value = float(input(f"{i + 1}: "))
        linear_distances.append(value)

    print("Sending 'begin' command to Arduino")
    serialInst.write(b'begin program') # sent in bytes rather than a high level string since it is sent to back to the compiler

    time.sleep(2)

### Helper Functions
- data filtering (madgwick filter)
- forward kinematics function
- cubic spline (outputting discretised function)

In [None]:
def angle_tilt_filter(IMU_data, dt):
    '''
    https://www.youtube.com/watch?v=7VW_XVbtu9k
    Use the above video to extract the angle between each of the IMUs.

    This is less susceptible to gyro tilt over time

    Return: normalised matrix containing the vector values of each of the filtered IMUs using this angle extraction method.
    '''

    '''
    each data is streamed one at a time.
    find the local pitch and yaw
    from these angles, get the appropriate direction vector
    store into the matrix

    return np.array([(), (), (), ..., ()])
    '''

    accel_angle_x = np.arctan2(ay, az) # about the x-axis
    accel_angle_y = np.arctan2(ax, az) # about the y-axis

    gyro_x += gyro_x * dt
    gyro_y += gyro_y * dt

    # rather than a filter yet, I will just have a bias towards angular tilt
    fused_angle_x = 0.98 * (gyro_x) + 0.02 * accel_angle_x # roll
    fused_angle_y = 0.98 * (gyro_y) + 0.02 * accel_angle_y # pitch

def angles_to_direction_vector(roll, pitch):
    '''
    Calculates normal vector and direction vector in direction of y-axis.

    The y-axis direction vector will be used for the forward kinematics; the normal vector will reinforce calculations of curvature (and be the precursor to having a 3D understanding of curvature along the spine).
    '''
    # normal
    normal = np.array([
        np.sin(pitch) * np.cos(roll),
        -np.sin(roll),
        np.cos(roll) * np.cos(pitch)
    ])

    # y direction
    y_direction = np.array([
        np.sin(pitch) * np.sin(roll),
        np.cos(roll),
        np.cos(pitch) * np.sin(roll)
    ])

    normal = normal / np.linalg.norm(normal)
    y_direction = y_direction / np.linalg.norm(y_direction)

    return normal, y_direction

def kalman_filter(angle_filtered_data):
    '''
    https://www.youtube.com/watch?v=5HuN9iL-zxU&list=PLeuMA6tJBPKsAfRfFuGrEljpBow5hPVD4&index=18
    Another tutorial that will help me actually have an accurate input of the data, without resorting to l'IA or learning the maths behind this.

    I will look into how Kalman filters work from a high level but if there is a python module for it then I will be happy to just copy it.
    '''
    return filtered_data

def forward_kinematics(filtered_data, linear_distances):
    '''
    Computes positions using the IMU data and distances, assuming that the base IMU is at the origin.
    Returns: list of 3D positions where the IMUs are in an arbitrary & scaled 3D space

    For later improvements, I will use quaternions to handle the tilt as done in the CHARM Lab device.
    '''

    # if no data, return nothing important (in the same format which can be unpacked but not causing a crash)
    if not filtered_data:
        return np.array([]), np.array([])

    origin = np.array([0,0,0])
    p_n = [origin]
    cumulative_distance = 0
    t_values = [0]

    for i in range(len(filtered_data)):
        v_n = filtered_data[i] # the direction vector at this point
        l_n = linear_distances[i] if i != (len(filtered_data) - 1) else 0.1 # the scalar distance between the upcoming sensor and the current
        # I have added the 0.1 since the final IMU will not have a subsequent sensor to work towards so this tilt will approximate what's happening up to the neck area

        p_n_plus_1 = p_n[-1] + (v_n * l_n) # the next position vector along the chain
        p_n.append(p_n_plus_1)

        cumulative_distance += l_n
        t_values.append(cumulative_distance)

    return p_n, t_values # returns vector position & t_values for interpolation

def cubic_spline_interpolation(IMU_positions, t_values):
    '''
    Interpolates the function using the formed kinematic chain in a parametrised format.

    Returns the plotting values
    '''
    x = IMU_positions[:, 0]
    y = IMU_positions[:, 1]
    z = IMU_positions[:, 2]

    xc = CubicSpline(t_values, x)
    yc = CubicSpline(t_values, y)
    zc = CubicSpline(t_values, z)

    # this variable stores the t_values that are along this interpolated spline in a discrete package i.e. "plotting_t_values"
    discrete_points = 250
    plot_t = np.linspace(min(t_values), max(t_values), discrete points)

    return xc, yc, zc, plot_t

def curvature_list(plot_t):
    '''
    This function returns an array of scalar curvatures for each point on the interpolated spline function.

    The issue with this (for future reference) is that these points only show a scalar and so improvements of this could be to interpolate the normal values of the vectors along the function to have a better understanding of curvature in 3D space.
    '''
    r = (xc(plot_t, 0), yc(plot_t, 0), zc(plot_t, 0))
    r_prime = (xc(plot_t, 1), yc(plot_t, 1), zc(plot_t, 1))
    r_double_prime = (xc(plot_t, 2), yc(plot_t, 2), zc(plot_t, 2))

    kappa = (np.linalg.norm(np.cross(r_prime, r_double_prime)))/(np.linalg.norm(r_prime)**3)
    return kappa # an array of scalar values for the curvature along this interpolated spline


### IMU Class
This just holds the state for each of the IMUs (their direction vectors) and the `dt` for each of it. Further, it will hold how filtered the data is as a quality.

In [None]:
class IMU_data:
    def __init__(self):
        self.direction_vector = None
        self.time = time.time() # used for calculating the `dt` of the sensors

### Spine Analysis Class
This is the class that effectively analyses the instance of the curvature and, by comparing this instance with the distribution of curvatures stored during the calibration mode, outputs whether there is (a) a deviation from "good" posture* or (b) prolonged static posture.

*The feeback mechanism of this is inherently flawed given there shows diminishing links to this effect. However, working towards the distribution is still a good reason to learn ML (and with enough data, this awareness and deviation from "current posture" will help to inform more complex decisions of when alerts should go off).

In [None]:
class SpineAnalysis:
    def __init__(self):
        self.is_calibrating = True
        self.baseline_curvature = None
        self.calibration_duration = 30
        self.calibration_dataset = None


        self.poor_posture_timer = 5
        self.poor_posture_start = None # using time.time() (or more efficient/local clocks for performance reasons)

    def calibration_phase(self, curvature_instance):
        '''
        This calibrates, using statistical distributions, what the user's baseline posture is in a semi-static way.

        Result: storing values to know what a good posture looks like (learning one-class SVM is not worth it currently but good to consider for later)
        '''
        # exit if this function was mistakenly called
        if self.is_calibrating = False:
            # exit this function

        # initially the data is being collected so this should be stored in the calibration-dataset
        self.calibration_dataset.append(curvature_instance)

        elapsed_time = time.time() - self.calibration_duration
        if elapsed_time > self.calibration_duration:
            # now there will be attempts at understanding how this data is distributed and what a "deviation" looks like

        self.is_calibrating = False
        print("\nNOW EXITING CALIBRATION PHASE\n\n")

    def posture_deviance(self, curvature_instance):
        '''
        Records any instance of posture deviation anywhere. If it goes on for a time greater than the threshold then another function will be called to handle this.
        '''

    def sustained_posture_deviance(self, curvature_instance):
        '''
        1. Find where along the spline this has occurred
        2. Map this with the segments from the interpolation and output that
        '''


### Database Logging
Rather than scaffolding with a `.csv` file first, going straight to a database will be just more useful for real data analysis (and storing such large data).

Also, pretty complex scripts would be necessary to label the specific times when certain tags are necessary (e.g. "calibrating" or "deviated") so this is just better practice.

Further, looking at how to properly thread this (potentially at a higher frequency) would be more beneficial.

### Plot Setup
Quite simple and self-explanatory.

In [None]:
    fig = plt.figure(figsize=(15,9))
    ax = fig.add_subplot(121, projection='3d')
    ax2 = fig.add_subplot(122)

    ## set the points for the animate(i) function
    ## these variables are what are updated (and saves clearing the plot each time)
    scatter = ax.scatter([], [], [], s=50)
    line, = ax.plot([], [], [])
    scatter_hotspot = ax.scatter([], [], [], color='red', s=50)

    ax.set_title("IMU Positions")
    ax.set_xlabel("X (m)")
    ax.set_ylabel("Y (m)")
    ax.set_zlabel("Z (m)")

### Animate Function
The function that brings together all the helper functions and it written closer to the style of the earlier pseudocode without the initial pitfalls of being completely unable to debug this monolithic block of code.

In [None]:
def animate(i):
    return scatter, line, scatter_hotspot

anim = FuncAnimation(fig, animate, cache_frame_data=False, interval=100, blit=True) # blitting only draws the dynamic aspects of the plot

ax.set_box_aspect([1,1,1])
ax.set_proj_type('ortho')
plt.tight_layout()
plt.show() # is plt.show() the most efficient?

### Exceptions & Error Handling
This is the error handling for the entire system.

Sometimes, for some previous tests, the system freezes and I kind of don't want that to happen so looking at ways of stopping the program without the risk of it fully crashing would be nice.