In [None]:
import matplotlib.pyplot as plt
from load import load_2023_12_30
from tile import Tile
from track import Track
from signal_processing import maxIndex

a50_2023_12_30, a50_all_2023_12_30, f6p_2023_12_30, tile_2023_12_30_runs = load_2023_12_30()

## Plotting Euler Data with G-forces

G-forces should register at twice the frequency due to similar angular accelerations felt at every turn, while rolling should occur +ve, then -ve between turns. Assuming this, we can clearly calculate the number of turns and determine if they were left or right.

In [None]:
run1 = tile_2023_12_30_runs[0]
run2 = tile_2023_12_30_runs[1]
run1_euler6 = run1.imu6.euler
run1_euler9 = run1.imu9.euler
run2_euler6 = run2.imu6.euler
run2_euler9 = run2.imu9.euler

In [None]:
plt.rc('lines', linewidth=1)
fig, ax = plt.subplots(5, figsize=(15, 8))

ax[0].plot(run1.time, run1_euler6[:, 0], label='6dof')
ax[0].plot(run1.time, run1_euler9[:, 0], label='9dof')
ax[0].set_title('Run 1 Roll', wrap=True)
ax[0].legend()

ax[1].plot(run1.time, run1_euler6[:, 1], label='6dof')
ax[1].plot(run1.time, run1_euler9[:, 1], label='9dof')
ax[1].set_title('Run 1 Pitch', wrap=True)
ax[1].legend()

ax[2].plot(run1.time, run1_euler6[:, 2], label='6dof')
ax[2].plot(run1.time, run1_euler9[:, 2], label='9dof')
ax[2].set_title('Run 1 Yaw', wrap=True)
ax[2].legend()

ax[3].plot(run1.time, run1.mG_lpf())
ax[3].set_title('Run 1 Filtered mG-forces', wrap=True)

ax[4].plot(run1.time, run1.mG)
ax[4].set_title('Run 1 Unfiltered mG-forces', wrap=True)

plt.tight_layout()
plt.show()

## Identifying the Still Points

Maintain a causal design!

In [None]:
_, _, _, tile_2023_12_30 = load_2023_12_30(split_tile=False)
tile_2023_12_30_euler6 = tile_2023_12_30.imu6.euler
tile_2023_12_30_euler9 = tile_2023_12_30.imu9.euler

In [None]:
a50_lifts = [track for track in a50_all_2023_12_30 if track.track_type == "Lift"]

### Reprenting Lift and Ski Zones

In [None]:
def addRunAndLiftZones(ax, runs, lifts):
    [ax.axvspan(run.time[0], run.time[-1], color='green', alpha=0.25) for run in runs]
    [ax.axvspan(lift.time[0], lift.time[-1], color='red', alpha=0.25) for lift in lifts]

In [None]:
plt.rc('lines', linewidth=1)
fig, ax = plt.subplots(5, figsize=(15, 8))

ax[0].plot(tile_2023_12_30.time, tile_2023_12_30_euler6[:, 0], label='6dof')
ax[0].plot(tile_2023_12_30.time, tile_2023_12_30_euler9[:, 0], label='9dof')
addRunAndLiftZones(ax[0], tile_2023_12_30_runs, a50_lifts)
ax[0].set_title('All Tile Roll with Run & Lift Zones', wrap=True)
ax[0].legend()

ax[1].plot(tile_2023_12_30.time, tile_2023_12_30_euler6[:, 1], label='6dof')
ax[1].plot(tile_2023_12_30.time, tile_2023_12_30_euler9[:, 1], label='9dof')
addRunAndLiftZones(ax[1], tile_2023_12_30_runs, a50_lifts)
ax[1].set_title('All Tile Pitch with Run & Lift Zones', wrap=True)
ax[1].legend()

ax[2].plot(tile_2023_12_30.time, tile_2023_12_30_euler6[:, 2], label='6dof')
ax[2].plot(tile_2023_12_30.time, tile_2023_12_30_euler9[:, 2], label='9dof')
addRunAndLiftZones(ax[2], tile_2023_12_30_runs, a50_lifts)
ax[2].set_title('All Tile Yaw with Run & Lift Zones', wrap=True)
ax[2].legend()

ax[3].plot(tile_2023_12_30.time, tile_2023_12_30.mG_lpf())
addRunAndLiftZones(ax[3], tile_2023_12_30_runs, a50_lifts)
ax[3].set_title('All Tile Filtered mG-forces with Run & Lift Zones', wrap=True)

ax[4].plot(tile_2023_12_30.time, tile_2023_12_30.mG)
addRunAndLiftZones(ax[4], tile_2023_12_30_runs, a50_lifts)
ax[4].set_title('All Tile Unfiltered mG-forces with Run & Lift Zones', wrap=True)

plt.tight_layout()
plt.show()

### At the Point of Lift Liftoff

Taking a closer look at the start of the lift zone, starting with the first lift:

In [None]:
start = tile_2023_12_30.time.index(a50_lifts[0].time[0])
x1 = start - 2500 # sub time to window
x2 = start + 2500 # add time to window
print(x1, x2)

In [None]:
impact_idx = maxIndex(tile_2023_12_30.mG, [x1, x2])

In [None]:
def plotEulerAndGs(x1, x2):
    plt.rc('lines', linewidth=1)
    fig, ax = plt.subplots(6, figsize=(14, 9))

    ax[0].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30.corrected_alt[x1:x2], label='tile')
    ax[0].axvline(x=tile_2023_12_30.time[impact_idx], color='k')
    ax[0].set_title('Moment of Lift 1 Liftoff Tile Altitude (Correced from A50)', wrap=True)
    ax[0].legend()

    ax[1].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30_euler6[x1:x2, 0], label='6dof')
    ax[1].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30_euler9[x1:x2, 0], label='9dof')
    ax[1].axvline(x=tile_2023_12_30.time[impact_idx], color='k')
    ax[1].set_title('Moment of Lift 1 Liftoff Tile Roll', wrap=True)
    ax[1].legend()

    ax[2].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30_euler6[x1:x2, 1], label='6dof')
    ax[2].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30_euler9[x1:x2, 1], label='9dof')
    ax[2].axvline(x=tile_2023_12_30.time[impact_idx], color='k')
    ax[2].set_title('Moment of Lift 1 Liftoff Tile Pitch', wrap=True)
    ax[2].legend()

    ax[3].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30_euler6[x1:x2, 2], label='6dof')
    ax[3].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30_euler9[x1:x2, 2], label='9dof')
    ax[3].axvline(x=tile_2023_12_30.time[impact_idx], color='k')
    ax[3].set_title('Moment of Lift 1 Liftoff Tile Yaw', wrap=True)
    ax[3].legend()

    ax[4].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30.mG_lpf()[x1:x2])
    ax[4].axvline(x=tile_2023_12_30.time[impact_idx], color='k')
    ax[4].set_title('Moment of Lift 1 Liftoff Tile Filtered mG-forces', wrap=True)

    ax[5].plot(tile_2023_12_30.time[x1:x2], tile_2023_12_30.mG[x1:x2])
    ax[5].axvline(x=tile_2023_12_30.time[impact_idx], color='k')
    ax[5].set_title('Moment of Lift 1 Liftoff Tile Unfiltered mG-forces', wrap=True)

    plt.tight_layout()
    plt.show()

In [None]:
plotEulerAndGs(x1, x2)

#### Analysis

- the first flat period is waiting in line for the gate to open, not always available
- the second is standing waiting for the lift to hit the legs, seen by the impulse in the unfiltered mG-forces
  - motion during this time may happen, only correct for times it doesn't since this will always happen

### At the Point of Lift Landing

Taking a closer look at the end of the lift zone, starting with the first lift:

In [None]:
stop = tile_2023_12_30.time.index(a50_lifts[0].time[-1])
x1 = stop - 2500 # sub time to window
x2 = stop + 2500 # add time to window
print(x1, x2)
impact_idx = maxIndex(tile_2023_12_30.mG, [x1, x2])
plotEulerAndGs(x1, x2)

#### Analysis

- the main flat period is seen as the point where the chair is pushing you while your skis are flat on the platform
- results in a pretty good steady state for all signals!

#### Confirming this Occurs on the Next Chair Landing

In [None]:
stop = tile_2023_12_30.time.index(a50_lifts[1].time[-1])
x1 = stop - 2500 # sub time to window
x2 = stop + 2500 # add time to window
print(x1, x2)
impact_idx = maxIndex(tile_2023_12_30.mG, [x1, x2])
plotEulerAndGs(x1, x2)

## Identification Steps

### v1

0. must first be in lift state, identified by constant acc (& vel), and increasing altitude
1. triggered by altitude & when it begins decreasing from its max
2. search all signals in +t until a medium window has a low enough std dev
3. confirm an impact in the unfiltered mG was observed, to isolate the cases where the chair stopped
4. confirm altitude never increses, otherwise break
5. capture signals during a small window of low std dev, stretching to the length of time SS is observed

In [None]:
from numpy import std
# from signal_processing import std

def testMotionForStillness(tile: Tile, r):
    # alt_lpf_std_th = 3
    # roll_std_th = 5
    # pitch_std_th = 5
    # yaw_std_th = 5
    euler_std_th = 3
    # mG_lpf_std_th = 25
    mG_std_th = 50
    return [
        # std(tile.corrected_alt[r[0]:r[1]]) < alt_lpf_std_th,
        # std(tile_euler9[r[0]:r[1], 0]) < roll_std_th,
        # std(tile_euler9[r[0]:r[1], 1]) < pitch_std_th,
        # std(tile_euler9[r[0]:r[1], 2]) < yaw_std_th,
        std(tile.imu9.euler_norm[r[0]:r[1]]) < euler_std_th,
        # std(tile.mG_lpf()[r[0]:r[1]]) < mG_lpf_std_th,
        std(tile.mG[r[0]:r[1]]) < mG_std_th,
    ]

In [None]:
def identifyStillRanges(tile: Tile, min_s=6, r=[0, -1], print_out=False):
    """Identifies ranges of still motion with sampling length `min_s` in seconds
    and returns them as a list of lists representing their indices.
    """
    wsamples = min_s * 100
    search = (len(tile.time[r[0]:r[1]])) - wsamples
    coarse_mult = wsamples
    fine_mult = round(coarse_mult / 10)
    still_ranges = []
    prev_tail = r[0]
    print('search:', search)
    print('coarse_mult:', coarse_mult)
    print('fine_mult:', fine_mult)

    for i in range(search):
        if print_out: print('i iter:\t', i)
        for j in range(search):
            # coarse search every 10th idx
            head = prev_tail + coarse_mult * j
            tail = head + wsamples
            
            if tail >= search + r[0]:
                if print_out: print('hit the end of the line. tail >= search:', tail, '>=', search + r[0])
                return still_ranges
            
            trailing_tests = testMotionForStillness(tile, [head, tail])

            if print_out: print('j iter:\t', j)
            if print_out: print('head = prev_tail + coarse_mult * j:\t', head, '=', prev_tail, '+', coarse_mult * j)
            if print_out: print('tail = head + wsamples:\t', tail, '=', head, '+', wsamples)
            if print_out: print('still_tests:\t', trailing_tests)
            
            if sum(trailing_tests) == len(trailing_tests):
                if print_out: print('\tcoarse range found:\t', head, tail)
                refinedHead = None
                refinedTail = None
                for k in range(search - j):
                    # fine search every idx
                    fine_leading_head = head - fine_mult * (k + 1)
                    fine_leading_tail = tail - fine_mult * (k + 1)
                    fine_trailing_head = head + fine_mult * (k + 1)
                    fine_trailing_tail = tail + fine_mult * (k + 1)

                    if print_out: print('\tk iter:\t', k)
                    if print_out: print('\tfine_leading_head = head - fine_mult * k + 1:\t', fine_leading_head, '=', head, '-', fine_mult, '*', (k + 1))
                    if print_out: print('\tfine_leading_tail = tail - fine_mult * k + 1:\t', fine_leading_tail, '=', tail, '-', fine_mult, '*', (k + 1))
                    if print_out: print('\tfine_trailing_head = head + fine_mult * k + 1:\t', fine_trailing_head, '=', head, '+', fine_mult, '*', (k + 1))
                    if print_out: print('\tfine_trailing_tail = tail + fine_mult * k + 1:\t', fine_trailing_tail, '=', tail, '+', fine_mult, '*', (k + 1))

                    if refinedHead is None:
                        if fine_leading_head <= 0:
                            refinedHead = 0
                            print('\t\trefined head set:\t', refinedHead)
                        
                        leading_tests = testMotionForStillness(tile, [fine_leading_head, fine_leading_tail])
                        if print_out: print('\tleading_tests:\t', leading_tests)

                        if sum(leading_tests) < len(leading_tests):
                            refinedHead = fine_leading_head + fine_mult
                            print('\t\trefined head set:\t', refinedHead)

                    if refinedTail is None:
                        if fine_trailing_tail >= search:
                            refinedTail = fine_trailing_tail
                            print('\t\trefined tail set:\t', refinedTail)

                        trailing_tests = testMotionForStillness(tile, [fine_trailing_head, fine_trailing_tail])
                        if print_out: print('\ttrailing_tests:\t', trailing_tests)

                        if sum(trailing_tests) < len(trailing_tests):
                            refinedTail = fine_trailing_tail - fine_mult
                            if print_out: print('\t\trefined tail set:\t', refinedTail)

                    if refinedHead is not None and refinedTail is not None:
                        if refinedHead <= prev_tail:
                            if print_out: print('\t\tstitched to previous set:\t', [still_ranges[-1][0], refinedTail])
                            still_ranges[-1] = [still_ranges[-1][0], refinedTail]
                        else:
                            if print_out: print('\t\trefined still range set:\t', [refinedHead, refinedTail])
                            still_ranges.append([refinedHead, refinedTail])
                        prev_tail = refinedTail
                        break
                break

In [None]:
r = [0, len(tile_2023_12_30.time)]
# still_ranges = identifyStillRanges(tile_2023_12_30, r=r, print_out=True)

In [None]:
# print(len(still_ranges), "still points:", still_ranges)

In [None]:
def addStillZones(ax, t, ranges, r):
    for range in ranges:
        if range[0] < r[0] or range[1] > r[1]: 
            continue
        ax.axvspan(t[range[0]], t[range[1]], color='green', alpha=0.25)

In [None]:
def plotTileWithStillZones(tile, still_ranges, r=r):
    tile_euler6 = tile.imu6.euler
    tile_euler9 = tile.imu9.euler
    plt.rc('lines', linewidth=1)
    fig, ax = plt.subplots(6, figsize=(15, 10))

    ax[0].plot(tile.time[r[0]:r[1]], tile.corrected_alt[r[0]:r[1]])
    addStillZones(ax[0], tile.time, still_ranges, r)
    ax[0].set_title('All Tile Altitude with Still Zones', wrap=True)

    ax[1].plot(tile.time[r[0]:r[1]], tile_euler6[r[0]:r[1], 0], label='6dof')
    ax[1].plot(tile.time[r[0]:r[1]], tile_euler9[r[0]:r[1], 0], label='9dof')
    addStillZones(ax[1], tile.time, still_ranges, r)
    ax[1].set_title('All Tile Roll with Still Zones', wrap=True)
    ax[1].legend()

    ax[2].plot(tile.time[r[0]:r[1]], tile_euler6[r[0]:r[1], 1], label='6dof')
    ax[2].plot(tile.time[r[0]:r[1]], tile_euler9[r[0]:r[1], 1], label='9dof')
    addStillZones(ax[2], tile.time, still_ranges, r)
    ax[2].set_title('All Tile Pitch with Still Zones', wrap=True)
    ax[2].legend()

    ax[3].plot(tile.time[r[0]:r[1]], tile_euler6[r[0]:r[1], 2], label='6dof')
    ax[3].plot(tile.time[r[0]:r[1]], tile_euler9[r[0]:r[1], 2], label='9dof')
    addStillZones(ax[3], tile.time, still_ranges, r)
    ax[3].set_title('All Tile Yaw with Still Zones', wrap=True)
    ax[3].legend()

    ax[4].plot(tile.time[r[0]:r[1]], tile.mG_lpf()[r[0]:r[1]])
    addStillZones(ax[4], tile.time, still_ranges, r)
    ax[4].set_title('All Tile Filtered mG-forces with Still Zones', wrap=True)

    ax[5].plot(tile.time[r[0]:r[1]], tile.mG[r[0]:r[1]])
    addStillZones(ax[5], tile.time, still_ranges, r)
    ax[5].set_title('All Tile Unfiltered mG-forces with Still Zones', wrap=True)

    plt.tight_layout()
    plt.show()

In [None]:
# plotTileWithStillZones(tile_2023_12_30)
# print('Max alt occurs at:', tile_2023_12_30.corrected_alt.index(max(tile_2023_12_30.corrected_alt[90000:100000])))
# plotTileWithStillZones(tile_2023_12_30, still_ranges, [90000, 100000])
# print('Max alt occurs at:', tile_2023_12_30.corrected_alt.index(max(tile_2023_12_30.corrected_alt[150000:160000])))
# plotTileWithStillZones(tile_2023_12_30, still_ranges, [150000, 175000])

#### Check for Peak Altitude before `still_range`

- search back from `head` until `prev_tail` for an impulse(s) in mG and max in alt
  - confirm the alt at the current `head` is lower
- in a much larger range, confirm the the previous max alt seen before current `head` and `prev_tail` has been the max for a while
  - the "moving" max, account for lift stops

In [None]:
from stat_tests import StatTests as ST

def testContainsLocalPeak(x, r, print_out=False, header=""):
    """tests the input `x` signal for a peak inside the range `r`, excluding the endponits.
    
    if so, returns `True` else `False`
    """
    window, isValid = ST.assertWindow(x, r)
    if not isValid:
        return False

    max_window = max(window)
    head = x[r[0]]
    tail = x[r[1]]
    if print_out: print('\t', head, ' < ', max_window, ' < ', tail, '?', sep='')
    if head < max_window and max_window > tail:
        return True
    return False


def testLargeImpulse(x, r, th=None, print_out=False, header=""):
    """tests the input `x` signal whether a large impulse occured during the range `r`.
    
    `th` will override the threshold of the impulse magnitude test, defaults to 3 * stddev
    """
    window, isValid = ST.assertWindow(x, r)
    if not isValid:
        return False
    
    baseline = 3 * std(window) if th is None else th
    impulse = max(window)
    if print_out: print('\t', impulse, ' > ', baseline, '?', sep='')
    if impulse > baseline:
        return True
    return False

In [None]:
# # create IC
# prev_tail = 0

# for still_range in still_ranges:
#     testing_range = [prev_tail, still_range[0]]
#     print(testing_range)

#     if testContainsLocalPeak(tile_2023_12_30.corrected_alt, testing_range, print_out=True):
#         print('\tcontains alt peak?', True)
#     else:
#         print('\tcontains alt peak?', False)

#     if testLargeImpulse(tile_2023_12_30.corrected_alt, testing_range, th=2+tile_2023_12_30.corrected_alt[testing_range[1]], print_out=True):
#         print('\tcontains alt impulse?', True)
#     else:
#         print('\tcontains alt impulse?', False)

#     if testLargeImpulse(tile_2023_12_30.mG, testing_range, print_out=True):
#         print('\tcontains mG impulse?', True)
#     else:
#         print('\tcontains mG impulse?', False)

#     # update the tail for the next search
#     prev_tail = still_range[1]

### V2

0. run is triggered by parent device, store idx
1. search left from this point for the first max in alt
2. find the first still point after this max and before the lift start
3. use the avg inside this still point as the platform zone


In [None]:
# run1_tile_start_idx = tile_2023_12_30.time.index(a50_2023_12_30[0].time[0])
# print(run1_tile_start_idx)
# run2_tile_start_idx = tile_2023_12_30.time.index(a50_2023_12_30[1].time[0])
# print(run2_tile_start_idx)

In [None]:
def findFirstHistoricMax(x, end, th=2):
    x1 = 0
    for i in range(len(x[0:end])):
        if x[end - i] - x[end - (i + 1)] > th:
            x1 = end - (i + 1)
            break
    return maxIndex(x, [x1, end])

In [None]:
# liftoff1_tile_idx = findFirstHistoricMax(tile_2023_12_30.corrected_alt, run1_tile_start_idx)
# print(liftoff1_tile_idx)
# liftoff2_tile_idx = findFirstHistoricMax(tile_2023_12_30.corrected_alt, run2_tile_start_idx)
# print(liftoff2_tile_idx)

In [None]:
# platform1_range = identifyStillRanges(tile_2023_12_30, r=[liftoff1_tile_idx, run1_tile_start_idx], min_s=1, print_out=True)
# print('\n\nNumber of still ranges found:', len(platform1_range))

The `run1_tile_start_idx` isn't reliable and requires feedback from parent, non ideal...

### V3

0. Poll altitude at the start of a lift track
1. record when it starts decreasing past a certain threshold
2. wait for the first still point after this alt peak to define the platform zone
3. use the avg inside the platform zone

In [None]:
def getLiftPeakIdx(x, window_s=3*60, th=5, within_s=60):
    """Causal!
    
    Returns the index on input signal `x` where a max altitude is reached based on a
    drop detection within a certain period `within_s` seconds and threshold `th`.
    
    Confirms that this is a max over the last period `window_s` seconds."""
    fs = 100
    for i in range(len(x)):
        if i < window_s * fs:
            continue
        if x[i - (within_s * fs)] - x[i] > th and x[i - (within_s * fs)] == max(x[i - (window_s * fs):i]):
            return i - (within_s * fs)

In [None]:
max_lift1 = getLiftPeakIdx(tile_2023_12_30.corrected_alt)
max_lift2 = 100000 + getLiftPeakIdx(tile_2023_12_30.corrected_alt[100000:]) # represented as real-time

In [None]:
# wait for still point
# TODO write a more causal based algorithm to identify a rolling window, right now make an arbitrary window of + 10000 idx (10s)
platform1_ranges = identifyStillRanges(tile_2023_12_30, r=[max_lift1, max_lift1+10000], min_s=5, print_out=True)
platform2_ranges = identifyStillRanges(tile_2023_12_30, r=[max_lift2, max_lift2+10000], min_s=5, print_out=True)

platform1_lengths = [p[1] - p[0] for p in platform1_ranges]
still_idx_platform1 = maxIndex(platform1_lengths)
platform2_lengths = [p[1] - p[0] for p in platform2_ranges]
still_idx_platform2 = maxIndex(platform2_lengths)

print(still_idx_platform1)
print(still_idx_platform2)

platform1_range = platform1_ranges[still_idx_platform1]
platform2_range = platform2_ranges[still_idx_platform2]

## Calculating the Rotation Matrix Based on the Still Range Averages



In [None]:
plotTileWithStillZones(tile_2023_12_30, still_ranges=[platform1_range, platform2_range])
plotTileWithStillZones(tile_2023_12_30, r=[90000, 100000], still_ranges=[platform1_range])
plotTileWithStillZones(tile_2023_12_30, r=[150000, 175000], still_ranges=[platform2_range])

In [None]:
from signal_processing import mean

print('From static boot angles of [0, 0, 0] for NWU:\n')

r1_avg_roll = mean(tile_2023_12_30.imu9.computeEuler(cts_yaw=False)[platform1_range[0]:platform1_range[1], 0].tolist())
r1_avg_pitch = mean(tile_2023_12_30.imu9.computeEuler(cts_yaw=False)[platform1_range[0]:platform1_range[1], 1].tolist())
r1_avg_yaw = mean(tile_2023_12_30.imu9.computeEuler(cts_yaw=False)[platform1_range[0]:platform1_range[1], 2].tolist())
print('Clamped rotation offsets from top of lift 1:')
print('\t', r1_avg_roll, r1_avg_pitch, r1_avg_yaw)

r2_avg_roll = mean(tile_2023_12_30.imu9.computeEuler(cts_yaw=False)[platform2_range[0]:platform2_range[1], 0].tolist())
r2_avg_pitch = mean(tile_2023_12_30.imu9.computeEuler(cts_yaw=False)[platform2_range[0]:platform2_range[1], 1].tolist())
r2_avg_yaw = mean(tile_2023_12_30.imu9.computeEuler(cts_yaw=False)[platform2_range[0]:platform2_range[1], 2].tolist())
print('Clamped rotation offsets from top of lift 2:')
print('\t', r2_avg_roll, r2_avg_pitch, r2_avg_yaw)

In [None]:
import numpy as np
import math

# https://www.zacobria.com/universal-robots-knowledge-base-tech-support-forum-hints-tips-cb2-cb3/index.php/python-code-example-of-converting-rpyeuler-angles-to-rotation-vectorangle-axis-for-universal-robots/
def eulerToRotMatrix(roll, pitch, yaw):
    """zyx cardan sequence"""
    yawMatrix = np.matrix([
    [math.cos(yaw), -math.sin(yaw), 0],
    [math.sin(yaw), math.cos(yaw), 0],
    [0, 0, 1]
    ])

    pitchMatrix = np.matrix([
    [math.cos(pitch), 0, math.sin(pitch)],
    [0, 1, 0],
    [-math.sin(pitch), 0, math.cos(pitch)]
    ])

    rollMatrix = np.matrix([
    [1, 0, 0],
    [0, math.cos(roll), -math.sin(roll)],
    [0, math.sin(roll), math.cos(roll)]
    ])

    return yawMatrix * pitchMatrix * rollMatrix

    # for robotics and position:

    # theta = math.acos(((R[0, 0] + R[1, 1] + R[2, 2]) - 1) / 2)
    # multi = 1 / (2 * math.sin(theta))

    # rx = multi * (R[2, 1] - R[1, 2]) * theta
    # ry = multi * (R[0, 2] - R[2, 0]) * theta
    # rz = multi * (R[1, 0] - R[0, 1]) * theta

    # return rx, ry, rz

In [None]:
R_SB_1 = eulerToRotMatrix(r1_avg_roll, r1_avg_pitch, r1_avg_yaw)
R_SB_2 = eulerToRotMatrix(r2_avg_roll, r2_avg_pitch, r2_avg_yaw)

print('R_SB_1:', R_SB_1)
print('\nR_SB_2:', R_SB_2)

### Use Quaternions for Rotations...

Prefer an average quaternion, using a direct method that accounts for double crossovers:

In [None]:
# def avgQuat(quats, weights=None):
#     """Assumes equal weighting between all the quaternions, otherwise override `weights` list."""
#     ws = np.ones(len(quats)) if weights is None else weights
#     qavg = np.array([0., 0., 0., 0.])
#     quats_arr = [np.array([q.x, q.y, q.z, q.w]) for q in quats]
#     for i, quat in enumerate(quats_arr):
#         # ensure each quat is normalized
#         quat = quat / np.linalg.norm(quat)

#         # flip, account for double cross over
#         if i > 0 and quat.dot(quats_arr[0]) < 0.:
#             print('double cross over!')
#             ws[i] -= 1

#         qavg = np.add(qavg, quat * ws[i])
#     return qavg / np.linalg.norm(qavg)

def avgQuat(quats, weights=None):
    """Performs the calculation for the average quaternion, based on the range `r` provided.
    
    Assumes equal weighting between all the quaternions, otherwise override `weights` list.

    Returns
    -------
    Average orientation quaternion [w, x, y, z] over range `r`.
    """
    ws = np.ones(len(quats)) if weights is None else weights
    qavg = np.array([0., 0., 0., 0.])
    for i, quat in enumerate(quats):
        # ensure each quat is normalized
        quat = quat / np.linalg.norm(quat)

        # flip, account for double cross over
        if i > 0 and quat.dot(quats[0]) < 0.:
            print('double cross over!')
            ws[i] -= 1

        qavg = np.add(qavg, quat * ws[i])
    qavg /= np.linalg.norm(qavg)
    print(qavg)
    return qavg / np.linalg.norm(qavg)

In [None]:
import numpy.matlib as npm

# https://github.com/christophhagen/averaging-quaternions/blob/8e75707891d2ad2f1bdecc5dc7b9bdb81fa7935f/averageQuaternions.py#L66
def weightedAverageQuaternions(quats, weights=None):
    Q = quats # np.array([np.array([q.w, q.x, q.y, q.z]) for q in quats])
    w = np.ones(len(quats)) if weights is None else weights

    # Number of quaternions to average
    M = Q.shape[0]
    A = npm.zeros(shape=(4,4))
    weightSum = 0

    for i in range(0,M):
        q = Q[i,:]
        A = w[i] * np.outer(q,q) + A
        weightSum += w[i]

    # scale
    A = (1.0/weightSum) * A

    # compute eigenvalues and -vectors
    eigenValues, eigenVectors = np.linalg.eig(A)

    # Sort by largest eigenvalue
    eigenVectors = eigenVectors[:,eigenValues.argsort()[::-1]]

    # return the real part of the largest eigenvector (has only real part)
    return np.real(eigenVectors[:,0].A1)

In [None]:
from signal_processing import rmse

print('Simple Average:')
qSB_1 = avgQuat(tile_2023_12_30.imu9.quat[platform1_range[0]:platform1_range[1], :])
print('\tqSB_1 (x, y, z, w):', qSB_1)
qSB_2 = avgQuat(tile_2023_12_30.imu9.quat[platform2_range[0]:platform2_range[1], :]) * -1
print('\tqSB_2 (x, y, z, w):', qSB_2)
print('\tRMSE between runs:', rmse(qSB_1, qSB_2))

print('\nAccurate Average:')
qSB_1 = weightedAverageQuaternions(tile_2023_12_30.imu9.quat[platform1_range[0]:platform1_range[1], :])
print('\tqSB_1 (w, x, y, z):', qSB_1)
qSB_2 = weightedAverageQuaternions(tile_2023_12_30.imu9.quat[platform2_range[0]:platform2_range[1], :]) * -1
print('\tqSB_2 (w, x, y, z):', qSB_2)
print('\tRMSE between runs:', rmse(qSB_1, qSB_2))

`q` = `-q` represent the same rotation! It's addressed inside the function itself element-wise, but comparing outputs between runs may be the opposite signs.

## Results

Using the quaternion rotation, specifically on the split run data:

In [None]:
run1.setQsbFromRange(platform1_range)
run2.setQsbFromRange(platform2_range)

# re-compute the euler data after the tare quaternions are set
run1_euler6_b = run1.euler6_b
run1_euler9_b = run1.euler9_b
run2_euler6_b = run2.euler6_b
run2_euler9_b = run2.euler9_b

In [None]:
plt.rc('lines', linewidth=1)
fig, ax = plt.subplots(3, figsize=(15, 8))

ax[0].plot(run1.time, run1_euler6_b[:, 0], label='6dof_b')
ax[0].plot(run1.time, run1_euler9_b[:, 0], label='9dof_b')
ax[0].plot(run1.time, run1_euler6[:, 0], label='6dof')
ax[0].plot(run1.time, run1_euler9[:, 0], label='9dof')
ax[0].set_title('Run 1 Roll', wrap=True)
ax[0].legend()

ax[1].plot(run1.time, run1_euler6_b[:, 1], label='6dof_b')
ax[1].plot(run1.time, run1_euler9_b[:, 1], label='9dof_b')
ax[1].plot(run1.time, run1_euler6[:, 1], label='6dof')
ax[1].plot(run1.time, run1_euler9[:, 1], label='9dof')
ax[1].set_title('Run 1 Pitch', wrap=True)
ax[1].legend()

ax[2].plot(run1.time, run1_euler6_b[:, 2], label='6dof_b')
ax[2].plot(run1.time, run1_euler9_b[:, 2], label='9dof_b')
ax[2].plot(run1.time, run1_euler6[:, 2], label='6dof')
ax[2].plot(run1.time, run1_euler9[:, 2], label='9dof')
ax[2].set_title('Run 1 Yaw', wrap=True)
ax[2].legend()

plt.tight_layout()
plt.show()