# Thetis Calibration Script

## Section 0: Imports and global function definitions

In [78]:
"""
ChatGPT Conversation:   https://chat.openai.com/c/ef545f4e-6aa6-4598-8c37-371934ac2cd9
                        https://chat.openai.com/c/55ac6d8d-8591-43f0-b7a5-d36e088d83aa
"""

import os
import matplotlib.pyplot as plt
import pandas as pd
import math
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages  # Add this import
from scipy.stats import pearsonr 

# Define the input and output directories
CALIBRATION_DATA_ROOT_DIR   = "C:/Users/bduffy2018/OneDrive - Florida Institute of Technology/School/Thesis/Experiments/Calibration/x-IMU3"
RAW_OUTPUT_PATH             = 'output/raw/'
PRELIMINARY_OUTPUT_PATH     = 'output/prelim/'
INTERMEDIATE_OUTPUT_PATH    = 'output/intermediate/'
FINAL_OUTPUT_PATH           = 'output/final/'

# Define the size of series windows
SAMPLE_RATE         = 50                            # Data captured at ~F Hz sample rate
WINDOW_SIZE_SEC     = 10                            # Capture ~N-seconds of data
WINDOW_SIZE = SAMPLE_RATE * WINDOW_SIZE_SEC # Calculate number of indexes required for the window


def RMSE(time_series: np.ndarray, ground_truth: float) -> float:
    """Returns the root-mean-square-error between a time series and a ground truth over the entire series

    Args:
        time_series (np.ndarray): The measurement time series
        ground_truth (float): The ground truth, "real" value that is compared against

    Returns:
        float: the RMSE between the time series and the ground truth
    """
    return np.sqrt(np.mean((time_series - ground_truth) ** 2))


def RMSE(arr1: np.ndarray, arr2: np.ndarray) -> float:
    """Returns the root-mean-square-error between two arrays

    Args:
        arr1 (np.ndarray): Input array 
        arr2 (np.ndarray): Input array

    Returns:
        float: the RMSE between the two arrays
    """
    return np.sqrt(np.mean((arr1 - arr2) ** 2))


def reject_outliers(data, timestamps, m = 2.) -> np.ndarray:
    """Removes outliers from a data time series based on distance from the standard deviation of the entire dataset

    Args:
        data (np.ndarray): the data series to be evaluated
        timestamps (np.ndarray): the timestamps the correlate with the data series
        m (int, optional): The threshold in standard deviations for removing outliers. Defaults to 2..

    Returns:
        np.ndarray, np.ndarray: The original data time series sans any outliers
    """
    d = np.abs(data - np.median(data))
    mdev = np.median(d)
    s = d/mdev if mdev else np.zeros(len(d))
    return data[s<m], timestamps[s<m]


def down_sample_average(data: np.array, n: int=2) -> np.array:
    """Down samples a data array by a factor.
    For example, if data is a time series sampled at 100 Hz, down_sample_average(data)
    will return the array averaged to 50 Hz.

    Args:
        data (np.array): The time series data to be modified
        n (int): the factor of reduction. 2 for half rate, 3 for third, etc. Defaults to 2

    Returns:
        np.array: The down sampled array
    """
    end = n * int(len(data)/n)
    return np.mean(data[:end].reshape(-1, n), axis=1)


# Initialize an empty list to store the sensor information
calibration_data = []


# Iterate through the directories
for sensor_dir in os.listdir(CALIBRATION_DATA_ROOT_DIR):
    sensor_name, axis, ground_truth_name = sensor_dir.split("_")
    
    # Process sensor names and units into a more ingestible format
    if sensor_name == "accel":
        sensor_name = "Accelerometer"
        y_label = "Acceleration"
        unit = "g"
        
        if axis == "x-axis":
            ground_truth_value = -1 * math.cos(math.radians(float(ground_truth_name)))
        elif axis == "y-axis":
            ground_truth_value = 1 * math.cos(math.radians(float(ground_truth_name)))
        elif axis == "z-axis":
            ground_truth_value = -1 * math.cos(math.radians(float(ground_truth_name)))
            
        ground_truth_name += " degrees"
            
    elif sensor_name == "gyro":
        sensor_name = "Gyroscope"
        y_label = "Rotation Rate"
        unit = "deg/s"
        ground_truth_value = float(ground_truth_name)
        ground_truth_name = f"{abs(ground_truth_value)} {unit} CCW" if ground_truth_value < 0 else f"{abs(ground_truth_value)} {unit} CW"
        
        
    axis = axis[0].capitalize()
    
    # Initialize a dictionary to store the sensor information
    calibration_dataset = {
        "sensor_name": sensor_name,
        "axis": axis,
        "ground_truth_name": ground_truth_name,
        "ground_truth_value": ground_truth_value,
        "devices": [],
        "device_deviation": None,
        "y_label": y_label,
        "unit": unit,
        "output_name": sensor_dir
    }

    # Iterate through the device directories
    device_dir = os.path.join(CALIBRATION_DATA_ROOT_DIR, sensor_dir)
    for device_dir_name in os.listdir(device_dir):
        # print(device_dir_name.split())
        device_name = device_dir_name.split()[0]
        inertial_csv_path = os.path.join(device_dir, device_dir_name, "Inertial.csv")
        
        # Append the device information to the sensor dictionary
        calibration_dataset["devices"].append({
            "device_name": device_name,
            "inertial_csv_path": inertial_csv_path,
            "cleaned_time": None,
            "cleaned_data": None,
            "window_start": None,
            "window_end": None,
            "windowed_time": None,
            "windowed_data": None,
            "ground_truth_deviation": None
        })

    # Append the sensor dictionary to the list
    calibration_data.append(calibration_dataset)

## Section 1: Windowing the Raw Data
Here, we will process the raw data into windows that are more usable for future calculations. We will also create plots of the raw data with the windows represented.

In [79]:
for calibration_dataset in calibration_data:
    fig, axes = plt.subplots(1, 2, figsize=(12, 4), layout='constrained', sharey=True)
    
    for i, device in enumerate(calibration_dataset["devices"]):
        
        # ========================================
        # == Clean The Raw Data For Each Device ==
        # ========================================
    
        # Load the CSV data into a DataFrame
        df = pd.read_csv(device["inertial_csv_path"])
        
        timestamps = df["Timestamp (us)"].to_numpy() / 1e6
        data = df[f"{calibration_dataset['sensor_name']} {calibration_dataset['axis']} ({calibration_dataset['unit']})"].to_numpy()
        
        if device['device_name'] == "x-IMU3": # Downsample x-IMU3 data to 100 Hz
            data = down_sample_average(data)
            timestamps = down_sample_average(timestamps)
        
        data, timestamps = reject_outliers(data, timestamps)
        
        device["cleaned_time"] = timestamps
        device["cleaned_data"] = data
        
        device["window_start"]  = int(len(data)/2) - int(WINDOW_SIZE/2)
        device["window_end"]    = device["window_start"] + WINDOW_SIZE
        device["windowed_data"] = data[device["window_start"]:device["window_end"]]        
    
        # ======================================================
        # == Create Plots of the Raw Data with Windows Marked ==
        # ======================================================
        
        ax = axes[i]
    
        # Plot the data on the respective subplot
        ax.plot(timestamps, data)

        # Plot the ground truth value
        ax.axhline(y=calibration_dataset['ground_truth_value'], color='r', linestyle='--')
        
        # Plot the window bounds
        ax.axvline(x=timestamps[device["window_start"]], color='g', linestyle='--')
        ax.axvline(x=timestamps[device["window_end"]], color='g', linestyle='--')
        
        # Add a label for the RMSE
        device['ground_truth_deviation'] = RMSE(data, calibration_dataset['ground_truth_value'])
        fig.text(0.09+0.78*i, 0.9, f"RMSE: {device['ground_truth_deviation']:0.3f} {calibration_dataset['unit']}", fontsize=12, color='blue', va='center', ha='left')
        
        # Set individual plot title
        ax.set_title(f"{device['device_name']}")
    
    # Calculate deviation of Thetis from x-IMU3 window data
    calibration_dataset["device_deviation"] = RMSE(calibration_dataset["devices"][0]["windowed_data"], calibration_dataset["devices"][1]["windowed_data"])
        
    # =====================================================
    # == Finish Creating The Raw Figure for Each Dataset ==
    # =====================================================      
        
    # Set the title and labels for the whole figure
    fig.suptitle(f"{calibration_dataset['sensor_name']} {calibration_dataset['axis']} | {calibration_dataset['ground_truth_name']}")
    fig.text(0.5, 0.9, f"Thetis Deviation from x-IMU3: {calibration_dataset['device_deviation']:0.3f} {calibration_dataset['unit']}", ha='center')
    fig.supxlabel("Timestamp (s)")
    fig.supylabel(f"{calibration_dataset['y_label']} ({calibration_dataset['unit']})")
    
    # Save figure to files
    FIG_OUTPUT_PATH = RAW_OUTPUT_PATH + f"raw_{calibration_dataset['output_name']}.png"
    if not os.path.exists(FIG_OUTPUT_PATH): # Check if output already exists to save execution time
        fig.savefig(FIG_OUTPUT_PATH)
    
    # Close the figure to save memory
    plt.close(fig)

## Section 2: Plot MSE as a Function of Ground Truth
Create two figures with three plots each. One figure for accelerometer, one for gyroscope. Each plot will be MSE as a function of the test value. One plot will be x-IMU3 wrt ground truth, the second plot will be Thetis wrt x-IMU3, and the final plot will be Thetis wrt the ground truth.

In [111]:
# Create figure for accelerometer error
fig_accel, axes_accel = plt.subplots(1, 3, figsize=(18, 4), layout='constrained', sharey=True, sharex=True)

# Create figure for gyroscope error
fig_gyro, axes_gyro = plt.subplots(1, 3, figsize=(18, 4), layout='constrained', sharey=True, sharex=True)

calibration_errors = [{
    "sensor_name": dataset["sensor_name"],
    "axis": dataset["axis"],
    "ground_truth_value": float(dataset["ground_truth_name"].split(" ")[0]) if dataset["sensor_name"] == "Accelerometer" else dataset["ground_truth_value"], 
    "thetis_ximu3_deviation": dataset["device_deviation"], 
    "thetis_ground_deviation": dataset["devices"][0]["ground_truth_deviation"], 
    "ximu3_ground_deviation": dataset["devices"][1]["ground_truth_deviation"]}
    for dataset in calibration_data]
print(calibration_errors)

def sort_ascending(x_values, y_values):
    combined = list(zip(x_values, y_values))
    sorted_combined = sorted(combined, key=lambda x: x[0])
    return zip(*sorted_combined)

# ===============================
# == Plot Accelerometer Errors ==
# ===============================

for i, ax in enumerate(axes_accel):
    if i == 0: # x-IMU3 w.r.t Ground Truth
        for axis in ["X", "Y", "Z"]:
            x_values = [item["ground_truth_value"] for item in calibration_errors if item["sensor_name"] == "Accelerometer" and item["axis"] == axis]
            errors = [item["ximu3_ground_deviation"] * 1e3 for item in calibration_errors if item["sensor_name"] == "Accelerometer" and item["axis"] == axis]
            x_sorted, errors_sorted = sort_ascending(x_values, errors)
            ax.plot(x_sorted, errors_sorted, label=axis)
        ax.set_title("x-IMU3 w.r.t Ground Truth")

    if i == 1: # Thetis w.r.t x-IMU3
        for axis in ["X", "Y", "Z"]:
            x_values = [item["ground_truth_value"] for item in calibration_errors if item["sensor_name"] == "Accelerometer" and item["axis"] == axis]
            errors = [item["thetis_ximu3_deviation"] * 1e3 for item in calibration_errors if item["sensor_name"] == "Accelerometer" and item["axis"] == axis]
            x_sorted, errors_sorted = sort_ascending(x_values, errors)
            ax.plot(x_sorted, errors_sorted, label=axis)
        ax.set_title("Thetis w.r.t x-IMU3")
            
    if i == 2: # Thetis w.r.t Ground Truth
        for axis in ["X", "Y", "Z"]:
            x_values = [item["ground_truth_value"] for item in calibration_errors if item["sensor_name"] == "Accelerometer" and item["axis"] == axis]
            errors = [item["thetis_ground_deviation"] * 1e3 for item in calibration_errors if item["sensor_name"] == "Accelerometer" and item["axis"] == axis]
            x_sorted, errors_sorted = sort_ascending(x_values, errors)
            ax.plot(x_sorted, errors_sorted, label=axis)
        ax.set_title("Thetis w.r.t Ground Truth")
    ax.grid()
    ax.legend()
    
fig_accel.suptitle("Accelerometer Error")
fig_accel.supylabel("Error (mg)")
fig_accel.supxlabel("Angle (deg)")

# ===========================
# == Plot Gyroscope Errors ==
# ===========================

for i, ax in enumerate(axes_gyro):
    if i == 0: # x-IMU3 w.r.t Ground Truth
        for axis in ["X", "Y", "Z"]:
            x_values = [item["ground_truth_value"] for item in calibration_errors if item["sensor_name"] == "Gyroscope" and item["axis"] == axis]
            errors = [item["ximu3_ground_deviation"] for item in calibration_errors if item["sensor_name"] == "Gyroscope" and item["axis"] == axis]
            x_sorted, errors_sorted = sort_ascending(x_values, errors)
            ax.plot(x_sorted, errors_sorted, label=axis)
        ax.set_title("x-IMU3 w.r.t Ground Truth")

    if i == 1: # Thetis w.r.t x-IMU3
        for axis in ["X", "Y", "Z"]:
            x_values = [item["ground_truth_value"] for item in calibration_errors if item["sensor_name"] == "Gyroscope" and item["axis"] == axis]
            errors = [item["thetis_ximu3_deviation"] for item in calibration_errors if item["sensor_name"] == "Gyroscope" and item["axis"] == axis]
            x_sorted, errors_sorted = sort_ascending(x_values, errors)
            ax.plot(x_sorted, errors_sorted, label=axis)
        ax.set_title("Thetis w.r.t x-IMU3")
            
    if i == 2: # Thetis w.r.t Ground Truth
        for axis in ["X", "Y", "Z"]:
            x_values = [item["ground_truth_value"] for item in calibration_errors if item["sensor_name"] == "Gyroscope" and item["axis"] == axis]
            errors = [item["thetis_ground_deviation"] for item in calibration_errors if item["sensor_name"] == "Gyroscope" and item["axis"] == axis]
            x_sorted, errors_sorted = sort_ascending(x_values, errors)
            ax.plot(x_sorted, errors_sorted, label=axis)
        ax.set_title("Thetis w.r.t Ground Truth")
    ax.tick_params(axis='y')
    ax.grid()
    ax.legend()
    
fig_gyro.suptitle("Gyroscope Error")
fig_gyro.supylabel("Error (deg/sec)")
fig_gyro.supxlabel("Angular Velocity (deg/sec)")

# ===========================
# == Save Figures to Files ==
# ===========================

# plt.show() # Debug

FIG_OUTPUT_PATH = PRELIMINARY_OUTPUT_PATH + "prelim_accel.png"
if not os.path.exists(FIG_OUTPUT_PATH): # Check if output already exists to save execution time
    fig_accel.savefig(FIG_OUTPUT_PATH)
    
FIG_OUTPUT_PATH = PRELIMINARY_OUTPUT_PATH + "prelim_gyro.png"
if not os.path.exists(FIG_OUTPUT_PATH): # Check if output already exists to save execution time
    fig_gyro.savefig(FIG_OUTPUT_PATH)

# Close the figures to save memory
plt.close(fig_accel)
plt.close(fig_gyro)

[{'sensor_name': 'Accelerometer', 'axis': 'X', 'ground_truth_value': 0.0, 'thetis_ximu3_deviation': 0.020568558677481516, 'thetis_ground_deviation': 0.01539627267939551, 'ximu3_ground_deviation': 0.005184918366764168}, {'sensor_name': 'Accelerometer', 'axis': 'X', 'ground_truth_value': 135.0, 'thetis_ximu3_deviation': 0.006747883905677397, 'thetis_ground_deviation': 0.017301438004962293, 'ximu3_ground_deviation': 0.010667658367905104}, {'sensor_name': 'Accelerometer', 'axis': 'X', 'ground_truth_value': 180.0, 'thetis_ximu3_deviation': 0.011786402305601991, 'thetis_ground_deviation': 0.011618283150577944, 'ximu3_ground_deviation': 0.0007637990568019164}, {'sensor_name': 'Accelerometer', 'axis': 'X', 'ground_truth_value': 225.0, 'thetis_ximu3_deviation': 0.016942349755878012, 'thetis_ground_deviation': 0.0075385287008642105, 'ximu3_ground_deviation': 0.00938623150678163}, {'sensor_name': 'Accelerometer', 'axis': 'X', 'ground_truth_value': 270.0, 'thetis_ximu3_deviation': 0.01401774865798