## Loading the data and making the association between the name of the Txt file and the position of the sensor

In [None]:
import os
import pandas as pd

# === Directory Setup ===
data_dir = "/Users/afonsoamaral/Desktop/data"

if not os.path.exists(data_dir):
    raise FileNotFoundError(f"Directory not found: {data_dir}")

# === Map of Device ID to Sensor Position ===
sensor_map = {
    "00B417C1": "head",
    "00B417C7": "torso",
    "00B417CE": "humerus_r",
    "00B417EB": "humerus_l",
    "00B41841": "ulna_r",
    "00B4184F": "ulna_l",
    "00B418AE": "hand_r",
    "00B44027": "hand_l",
    "00B45B76": "pelvis",
    "00B45B77": "femur_r",
    "00B45B79": "femur_l",
    "00B45B7A": "tibia_r",
    "00B45B7C": "tibia_l",
    "00B45B83": "calcn_r",
    "00B45B88": "calcn_l"
}

# === Find All TXT Files ===
txt_files = [f for f in os.listdir(data_dir) if f.endswith(".txt")]
if not txt_files:
    raise FileNotFoundError("No TXT files found in the directory.")

print(f"Found {len(txt_files)} TXT files: {txt_files}\n")

# === Loop Over Files ===
for filename in txt_files:
    file_path = os.path.join(data_dir, filename)

    # Attempt to identify the sensor based on file name
    matched_key = next((key for key in sensor_map if key in filename), None)
    if matched_key is None:
        print(f" Could not match device ID in file: {filename}")
        continue

    sensor_name = sensor_map[matched_key]
    print(f" Processing file: {filename} as '{sensor_name}'")

    # === Preview the file 
    with open(file_path, "r") as f:
        lines = f.readlines()

    print(f"First 3 lines from {sensor_name}:")
    for i, line in enumerate(lines[:3]):
        print(f"Line {i+1}: {line.strip()}")

## Make the file accessible for work

In [None]:
# === Define column names based on the IMU data structure ===
columns = ["PacketCounter", "Roll", "Pitch", "Yaw", "AccX", "AccY", "AccZ", "GyrX", "GyrY", "GyrZ", "MagX", "MagY", "MagZ"]

# === Map IMU identifiers to body segment names ===
sensor_map = {
    "00B417C1": "head",
    "00B417C7": "torso",
    "00B417CE": "humerus_r",
    "00B417EB": "humerus_l",
    "00B41841": "ulna_r",
    "00B4184F": "ulna_l",
    "00B418AE": "hand_r",
    "00B44027": "hand_l",
    "00B45B76": "pelvis",
    "00B45B77": "femur_r",
    "00B45B79": "femur_l",
    "00B45B7A": "tibia_r",
    "00B45B7C": "tibia_l",
    "00B45B83": "calcn_r",
    "00B45B88": "calcn_l"
}

# === Loop through and clean all TXT files ===
for file in txt_files:
    file_path = os.path.join(data_dir, file)

    # Match IMU identifier to a body part
    matched_key = next((key for key in sensor_map if key in file), None)
    sensor_name = sensor_map.get(matched_key, "unknown")

    # Load the file
    df = pd.read_csv(file_path, delim_whitespace=True, skiprows=7, header=None, names=columns)

    print(f"\n Loaded data for sensor: {sensor_name}")
    print(df.head(2))

    # Save a cleaned CSV with a descriptive name
    output_file = os.path.join(data_dir, f"cleaned_data_{sensor_name}.csv")
    df.to_csv(output_file, index=False)
    print(f" Saved cleaned file: {output_file}")


## Converting to the right units

In [None]:
import numpy as np
import pandas as pd
import os

# Define your cleaned files directory again
data_dir = "/Users/afonsoamaral/Desktop/data"
cleaned_files = [f for f in os.listdir(data_dir) if f.startswith("cleaned_data_") and f.endswith(".csv")]

# Function to convert units
def convert_to_correct_units(df):
    """Convert gyroscope data from deg/s to rad/s and magnetometer data from Gauss to Teslas."""
    gyro_columns = ["GyrX", "GyrY", "GyrZ"]
    mag_columns = ["MagX", "MagY", "MagZ"]
    df[gyro_columns] = df[gyro_columns] * (np.pi / 180)  # Convert deg/s to rad/s
    df[mag_columns] = df[mag_columns] * 1e-4  # Convert Gauss to Teslas
    # Accelerometer is already in m/s², no changes needed
    return df

# Process each cleaned CSV file
for file in cleaned_files:
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)

    # Apply conversion
    df_converted = convert_to_correct_units(df)

    # Define output filename
    sensor_name = file.replace("cleaned_data_", "").replace(".csv", "")
    converted_filename = f"cleaned_data_converted_{sensor_name}.csv"
    converted_filepath = os.path.join(data_dir, converted_filename)

    # Save converted file
    df_converted.to_csv(converted_filepath, index=False)
    print(f"Converted and saved: {converted_filename}")


# Pre-Processing

# GYROSCOPE

## Recognizing which type of bias drift we have in the gyroscope 

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt

# === Define your directory and find converted files ===
data_dir = "/Users/afonsoamaral/Desktop/data"
converted_files = [f for f in os.listdir(data_dir) if f.startswith("cleaned_data_converted_") and f.endswith(".csv")]

# === Process each IMU file ===
for file in converted_files:
    sensor_name = file.replace("cleaned_data_converted_", "").replace(".csv", "")
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)

    # === Plot all gyroscope axes together ===
    plt.figure(figsize=(12, 6))
    plt.plot(df["GyrX"], label="GyrX", alpha=0.8)
    plt.plot(df["GyrY"], label="GyrY", alpha=0.8)
    plt.plot(df["GyrZ"], label="GyrZ", alpha=0.8)
    plt.axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    plt.xlabel("Time (samples)")
    plt.ylabel("Gyroscope (rad/s)")
    plt.title(f"Raw Gyroscope Data - {sensor_name}")
    plt.legend()
    plt.tight_layout()
    plt.show()

    # === Plot each gyroscope axis separately ===
    fig, axes = plt.subplots(3, 1, figsize=(12, 12))

    axes[0].plot(df["GyrX"], label="GyrX", color="blue", alpha=0.8)
    axes[0].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[0].set_ylabel("GyrX (rad/s)")
    axes[0].set_title(f"Gyroscope X - {sensor_name}")
    axes[0].legend()

    axes[1].plot(df["GyrY"], label="GyrY", color="green", alpha=0.8)
    axes[1].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[1].set_ylabel("GyrY (rad/s)")
    axes[1].set_title(f"Gyroscope Y - {sensor_name}")
    axes[1].legend()

    axes[2].plot(df["GyrZ"], label="GyrZ", color="red", alpha=0.8)
    axes[2].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[2].set_xlabel("Time (samples)")
    axes[2].set_ylabel("GyrZ (rad/s)")
    axes[2].set_title(f"Gyroscope Z - {sensor_name}")
    axes[2].legend()

    plt.tight_layout()
    plt.show()


## Processing Pipeline

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt
import pywt
from pykalman import KalmanFilter

# === Parameters ===
data_dir = "/Users/afonsoamaral/Desktop/data"
converted_files = [f for f in os.listdir(data_dir) if f.startswith("cleaned_data_converted_") and f.endswith(".csv")]
fs = 40  # Sampling frequency

# === Processing Functions ===

def band_pass_filter(data, low_cutoff=0.1, high_cutoff=5, fs=40, order=2):
    nyquist = 0.5 * fs
    low = low_cutoff / nyquist
    high = high_cutoff / nyquist
    b, a = butter(order, [low, high], btype='band', analog=False)
    return filtfilt(b, a, data)

def wavelet_denoise(data, wavelet="db4", level=2):
    coeffs = pywt.wavedec(data, wavelet, mode="per")
    sigma = np.median(np.abs(coeffs[-level])) / 0.6745
    threshold = sigma * np.sqrt(2 * np.log(len(data)))
    coeffs[1:] = [pywt.threshold(c, threshold, mode="soft") for c in coeffs[1:]]
    return pywt.waverec(coeffs, wavelet, mode="per")[:len(data)]

def kalman_filter(data):
    kf = KalmanFilter(
        transition_matrices=[1],
        observation_matrices=[1],
        initial_state_mean=0,
        initial_state_covariance=1e-2,
        transition_covariance=1e-3,
        observation_covariance=1e-2
    )
    filtered_state_means, _ = kf.smooth(data)
    return filtered_state_means.flatten()

# === Process each IMU file ===
for file in converted_files:
    sensor_name = file.replace("cleaned_data_converted_", "").replace(".csv", "")
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)
    #Define gyroscope columns
    gyro_columns = ["GyrX", "GyrY", "GyrZ"]

    # Apply Processing Pipeline
    for axis in gyro_columns:
        df[axis + "_filtered"] = band_pass_filter(df[axis], fs=fs)
        df[axis + "_denoised"] = wavelet_denoise(df[axis + "_filtered"])
        df[axis + "_kalman"] = kalman_filter(df[axis + "_denoised"])

    # Save processed file
    output_file = os.path.join(data_dir, f"final_corrected_gyroscope_data_{sensor_name}.csv")
    df.to_csv(output_file, index=False)
    print(f"Processed gyroscope data saved: {output_file}")

    # Plot Results
    fig, axes = plt.subplots(3, 1, figsize=(12, 12))
    colors = {"GyrX": "blue", "GyrY": "green", "GyrZ": "red"}

    for i, axis in enumerate(gyro_columns):
        corrected_axis = axis + "_kalman"
        axes[i].plot(df.index, df[axis], label=f"Raw {axis}", color=colors[axis], alpha=0.5)
        axes[i].plot(df.index, df[corrected_axis], label=f"Corrected {axis}", linestyle="dashed", color="black")
        axes[i].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
        axes[i].set_xlabel("Time (samples)")
        axes[i].set_ylabel(f"{axis} (rad/s)")
        axes[i].set_title(f"Gyroscope Correction - {axis} - {sensor_name}")
        axes[i].legend()

    plt.tight_layout()
    plt.show()


## Plot processed gyroscope data

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt

# === Setup Directory ===
data_dir = "/Users/afonsoamaral/Desktop/data"
gyro_files = [f for f in os.listdir(data_dir) if f.startswith("final_corrected_gyroscope_data_") and f.endswith(".csv")]

# === Plot Processed Gyroscope Data ===
for file in gyro_files:
    sensor_name = file.replace("final_corrected_gyroscope_data_", "").replace(".csv", "")
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)

    plt.figure(figsize=(12, 6))
    plt.plot(df["GyrX_kalman"], label="GyrX_kalman", alpha=0.8)
    plt.plot(df["GyrY_kalman"], label="GyrY_kalman", alpha=0.8)
    plt.plot(df["GyrZ_kalman"], label="GyrZ_kalman", alpha=0.8)
    plt.axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    plt.xlabel("Time (samples)")
    plt.ylabel("Gyroscope (rad/s)")
    plt.title(f"Processed Gyroscope Data - {sensor_name}")
    plt.legend()
    plt.tight_layout()
    plt.show()

# ACCELEROMETER

## Recognizing which type of drift we have in the Accelerometer

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt

# === Setup Directory ===
data_dir = "/Users/afonsoamaral/Desktop/data"
acc_files = [f for f in os.listdir(data_dir) if f.startswith("cleaned_data_converted_") and f.endswith(".csv")]

# === Loop Through Each Sensor's Cleaned Data ===
for file in acc_files:
    sensor_name = file.replace("cleaned_data_converted_", "").replace(".csv", "")
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)

    # --- Overview Plot (All Axes) ---
    plt.figure(figsize=(12, 6))
    plt.plot(df["AccX"], label="AccX", alpha=0.8)
    plt.plot(df["AccY"], label="AccY", alpha=0.8)
    plt.plot(df["AccZ"], label="AccZ", alpha=0.8)
    plt.axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    plt.xlabel("Time (samples)")
    plt.ylabel("Accelerometer (m/s²)")
    plt.title(f"Raw Accelerometer Data - {sensor_name}")
    plt.legend()
    plt.tight_layout()
    plt.show()

    # --- Individual Axis Subplots ---
    fig, axes = plt.subplots(3, 1, figsize=(12, 12))
    axes[0].plot(df["AccX"], label="AccX", color="blue", alpha=0.8)
    axes[0].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[0].set_ylabel("AccX (m/s²)")
    axes[0].set_title(f"{sensor_name} - AccX")
    axes[0].legend()

    axes[1].plot(df["AccY"], label="AccY", color="green", alpha=0.8)
    axes[1].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[1].set_ylabel("AccY (m/s²)")
    axes[1].set_title(f"{sensor_name} - AccY")
    axes[1].legend()

    axes[2].plot(df["AccZ"], label="AccZ", color="red", alpha=0.8)
    axes[2].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[2].set_xlabel("Time (samples)")
    axes[2].set_ylabel("AccZ (m/s²)")
    axes[2].set_title(f"{sensor_name} - AccZ")
    axes[2].legend()

    plt.tight_layout()
    plt.show()


## Processing Pipeline

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, savgol_filter, medfilt, sosfilt

# === Parameters ===
data_dir = "/Users/afonsoamaral/Desktop/data"
fs = 40  # Sampling rate

# === Preprocessing Functions ===
def median_and_lowpass_filter(sensor_data, fs, medfilt_window_length=11):
    
    """
    Applies a median filter followed by a butterworth lowpass filter. The lowpass filter is 3rd order with a cutoff
    frequency of 20 Hz . The processing scheme is based on:
    "A Public Domain Dataset for Human Activity Recognition Using Smartphones"
    https://www.esann.org/sites/default/files/proceedings/legacy/es2013-84.pdf

    :param sensor_data: a 1-D or (MxN) array, where M is the signal length in samples and
                        N is the number of signals / channels.
    :param fs: the sampling frequency of the acc data.
    :param medfilt_window_length: the length of the median filter (has to be odd). Default: 11
    :return: the filtered data
    """

    #define the filter
    order = 3
    f_c = 18
    sos = butter(order, f_c, fs=fs, output='sos')
    
    #copy the array
    filtered_data = sensor_data.copy()
    
    #check the dimensionality of the input
    if filtered_data.ndim > 1:
        #cycle of the channels contained in data
        for channel in range(filtered_data.shape[1]):
            #get the channel and apply the median filter
            sig = medfilt(sensor_data[:, channel], medfilt_window_length)
            #apply butterworth filter
            filtered_data[:, channel] = sosfilt(sos, sig)
    else:
        #apply median filter
        med_filt = medfilt(sensor_data, medfilt_window_length)
        #apply butterworth filter
        filtered_data = sosfilt(sos, med_filt)
        
    return filtered_data

def gravitational_filter(acc_data, fs):
    
    """
    Function to filter out the gravitational component of ACC signals using a 3rd order butterworth lowpass filter with
    a cuttoff frequency of 0.3 Hz
    The implementation is based on:
    "A Public Domain Dataset for Human Activity Recognition Using Smartphones"
    https://www.esann.org/sites/default/files/proceedings/legacy/es2013-84.pdf
    :param acc_data: a 1-D or (MxN) array, where where M is the signal length in samples and
                 N is the number of signals / channels.
    :param fs: the sampling frequency of the acc data.
    :return: the gravitational component of each signal/channel contained in acc_data
    """
    
    #define the filter
    order = 3
    f_c = 0.5
    sos = butter(order, f_c, fs=fs, output='sos')
    
    #copy the array
    gravity_data = acc_data.copy()
    
    #check the dimensionality of the input
    if gravity_data.ndim > 1:
        #cycle of the channels contained in data
        for channel in range(gravity_data.shape[1]):
            #get the channel & apply butterworth filter
            gravity_data[:, channel] = sosfilt(sos, acc_data[:, channel])
    else:
        gravity_data = sosfilt(sos, acc_data)
        
    return gravity_data

def pre_process_inertial_data(sensor_data, is_acc=True, fs=40):
    
    """
    Applies the pre-processing pipeline of "A Public Domain Dataset for Human Activity Recognition Using Smartphones"
    (https://www.esann.org/sites/default/files/proceedings/legacy/es2013-84.pdf). The pipeline consists of:
    (1) applying a median filter
    (2) applying a 3rd order low-pass filter with a cut-off at 18 Hz
    (3) applying a 3rd order low-pass filter with a cut-off at 0.3 Hz to obtain gravitational component
    (4) subtract gravitational component from ACC signal

    :param sensor_data: the sensor data.
    :param is_acc: boolean indicating whether the sensor is an accelerometer.
    :param fs: the sampling frequency of the sensor data (in Hz).
    :return: numpy.array containing the pre-processed data.
    """
    
    #apply median and lowpass filter
    filtered_data = median_and_lowpass_filter(sensor_data, fs=fs)
    #check if sensor is ACC
    if is_acc:
        #get the gravitacional component
        gravitational_component = gravitational_filter(filtered_data, fs=fs)
        #subtract the gravitacional component
        filtered_data -= gravitational_component
    return filtered_data

def band_pass_filter(data, low_cutoff=0.1, high_cutoff=15, fs=40, order=3):
    nyquist = 0.5 * fs
    low = low_cutoff / nyquist
    high = high_cutoff / nyquist
    b, a = butter(order, [low, high], btype="band", analog=False)
    return filtfilt(b, a, data)

def adaptive_baseline_correction(data, alpha=0.08):
    baseline = np.zeros_like(data)
    estimate = data[0]
    for i in range(len(data)):
        estimate = alpha * data[i] + (1 - alpha) * estimate
        baseline[i] = data[i] - estimate
    return baseline

def savitzky_golay_smooth(data, window_size=5, poly_order=2):
    return savgol_filter(data, window_size, poly_order)

# === Process Each Sensor File ===
converted_files = [f for f in os.listdir(data_dir) if f.startswith("cleaned_data_converted_") and f.endswith(".csv")]

for file in converted_files:
    sensor_name = file.replace("cleaned_data_converted_", "").replace(".csv", "")
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)

    print(f"Processing accelerometer for sensor: {sensor_name}")

    # === Step 1: Remove Gravity ===
    accel_data = df[["AccX", "AccY", "AccZ"]].values
    accel_no_gravity = pre_process_inertial_data(accel_data, is_acc=True, fs=fs)

    # === Step 2: Band-pass Filter ===
    accel_bandpassed = np.apply_along_axis(band_pass_filter, 0, accel_no_gravity)

    # === Step 3: Baseline Correction ===
    accel_corrected = np.apply_along_axis(adaptive_baseline_correction, 0, accel_bandpassed)

    # === Step 4: Smoothing ===
    accel_smooth = np.apply_along_axis(savitzky_golay_smooth, 0, accel_corrected)

    # === Step 5: Save Processed Data ===
    df[["AccX_processed", "AccY_processed", "AccZ_processed"]] = accel_smooth
    processed_path = os.path.join(data_dir, f"final_corrected_accelerometer_data_{sensor_name}.csv")
    df.to_csv(processed_path, index=False)
    print(f" Saved processed accelerometer data: {processed_path}")

    # === Step 6: Plot for Visual Inspection ===
    fig, axes = plt.subplots(3, 1, figsize=(12, 12))
    colors = {"AccX": "blue", "AccY": "green", "AccZ": "red"}

    for i, axis in enumerate(["AccX", "AccY", "AccZ"]):
        corrected_axis = axis + "_processed"
        axes[i].plot(df.index, df[axis], label=f"Raw {axis}", color=colors[axis], alpha=0.5)
        axes[i].plot(df.index, df[corrected_axis], label=f"Processed {axis}", linestyle="dashed", color="black")
        axes[i].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
        axes[i].set_xlabel("Time (samples)")
        axes[i].set_ylabel(f"{axis} (m/s²)")
        axes[i].set_title(f"{sensor_name} - Raw vs Processed {axis}")
        axes[i].legend()

    plt.tight_layout()
    plt.show()


## Plot processed accelerometer data

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt

# Define the directory containing processed accelerometer files
data_dir = "/Users/afonsoamaral/Desktop/data"

# Get all processed accelerometer files
processed_files = [f for f in os.listdir(data_dir)
                   if f.startswith("final_corrected_accelerometer_data_") and f.endswith(".csv")]

# Plot each file
for file in processed_files:
    sensor_name = file.replace("final_corrected_accelerometer_data_", "").replace(".csv", "")
    file_path = os.path.join(data_dir, file)

    # Load the data
    df = pd.read_csv(file_path)

    # Identify processed accelerometer columns
    acc_columns = [col for col in df.columns if "Acc" in col and "processed" in col]

    # Plotting
    plt.figure(figsize=(12, 6))
    for col in acc_columns:
        plt.plot(df[col], label=col, alpha=0.8)

    plt.axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    plt.xlabel("Time (samples)")
    plt.ylabel("Accelerometer (m/s²)")
    plt.title(f"Processed Accelerometer Data – {sensor_name}")
    plt.legend()
    plt.tight_layout()
    plt.show()

# MAGNETOMETER

## Recognizing which type of  drift we have in the magnetometer 

In [None]:
import os
import matplotlib.pyplot as plt
import pandas as pd

# Directory with processed files
data_dir = "/Users/afonsoamaral/Desktop/data"
converted_files = [f for f in os.listdir(data_dir) if f.startswith("cleaned_data_converted_") and f.endswith(".csv")]

if not converted_files:
    raise FileNotFoundError("No cleaned_data_converted_*.csv files found.")

for file in converted_files:
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)
    
    sensor_name = file.split("converted_")[1].replace(".csv", "")
    
    # Plot raw magnetometer data
    plt.figure(figsize=(12, 6))
    plt.plot(df["MagX"], label="MagX", alpha=0.8)
    plt.plot(df["MagY"], label="MagY", alpha=0.8)
    plt.plot(df["MagZ"], label="MagZ", alpha=0.8)
    plt.axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    plt.xlabel("Time (samples)")
    plt.ylabel("Magnetometer (T)")
    plt.title(f"Raw Magnetometer Data - Identifying Drift ({sensor_name})")
    plt.legend()
    plt.tight_layout()
    plt.show()

    # Individual axis subplots
    fig, axes = plt.subplots(3, 1, figsize=(12, 12))
    axes[0].plot(df["MagX"], label="MagX", color="blue", alpha=0.8)
    axes[0].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[0].set_ylabel("Mag X (T)")
    axes[0].set_title(f"{sensor_name} - MagX")
    axes[0].legend()

    axes[1].plot(df["MagY"], label="MagY", color="green", alpha=0.8)
    axes[1].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[1].set_ylabel("Mag Y (T)")
    axes[1].set_title(f"{sensor_name} - MagY")
    axes[1].legend()

    axes[2].plot(df["MagZ"], label="MagZ", color="red", alpha=0.8)
    axes[2].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    axes[2].set_xlabel("Time (samples)")
    axes[2].set_ylabel("Mag Z (T)")
    axes[2].set_title(f"{sensor_name} - MagZ")
    axes[2].legend()

    plt.tight_layout()
    plt.show()


## Processing pipeline

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, savgol_filter
from scipy.optimize import least_squares

# === Functions for Magnetometer Processing ===
def hard_iron_correction(mag_data):
    """Apply hard-iron correction using median instead of mean to remove constant bias."""
    bias = np.median(mag_data, axis=0)
    return mag_data - bias

def ellipsoid_fit(mag_data):
    """Estimate soft-iron correction using ellipsoid fitting."""
    def cost_function(params, x, y, z):
        A, B, C, D, E, F, G, H, I, J = params
        return (A*x**2 + B*y**2 + C*z**2 + 2*D*x*y + 2*E*x*z + 2*F*y*z +
                2*G*x + 2*H*y + 2*I*z + J)
    
    X, Y, Z = mag_data[:, 0], mag_data[:, 1], mag_data[:, 2]
    initial_guess = np.array([1, 1, 1, 0, 0, 0, 0, 0, 0, -1])
    result = least_squares(cost_function, initial_guess, args=(X, Y, Z))

    # Extract transformation matrix parameters
    A, B, C, D, E, F, G, H, I, J = result.x
    M_inv_raw = np.array([[A, D, E], [D, B, F], [E, F, C]])
    
    # Normalize matrix using Singular Value Decomposition (SVD)
    U, S, Vt = np.linalg.svd(M_inv_raw)
    scale_factor = np.median(S)
    M_inv = np.linalg.inv(M_inv_raw / scale_factor)
    
    # Extract offset (soft-iron bias)
    soft_iron_bias = np.array([G, H, I])
    return M_inv, soft_iron_bias

def band_pass_filter(data, low_cutoff=0.5, high_cutoff=10, fs=40, order=2):
    """Apply a Butterworth band-pass filter to remove both low-frequency drift and high-frequency noise."""
    nyquist = 0.5 * fs
    low = low_cutoff / nyquist
    high = high_cutoff / nyquist
    b, a = butter(order, [low, high], btype="band", analog=False)
    return filtfilt(b, a, data)

def savitzky_golay_smooth(data, window_size=9, poly_order=2):
    """Apply Savitzky-Golay smoothing to reduce noise while preserving trends."""
    return savgol_filter(data, window_size, poly_order)

# === Process All Cleaned Files ===
data_dir = "/Users/afonsoamaral/Desktop/data"
cleaned_files = [f for f in os.listdir(data_dir) if f.startswith("cleaned_data_converted_") and f.endswith(".csv")]

if not cleaned_files:
    raise FileNotFoundError("No cleaned_data_converted_<sensor>.csv files found.")

for file in cleaned_files:
    sensor_path = os.path.join(data_dir, file)
    df = pd.read_csv(sensor_path)
    sensor_name = file.split("cleaned_data_converted_")[1].replace(".csv", "")
    df_raw = df.copy()  # For plotting later

    # Extract and process magnetometer data
    mag_columns = ["MagX", "MagY", "MagZ"]
    mag_data = df[mag_columns].values

    mag_hard_iron = hard_iron_correction(mag_data)
    M_inv, soft_iron_bias = ellipsoid_fit(mag_hard_iron)
    mag_soft_iron = np.dot(M_inv, (mag_hard_iron - soft_iron_bias).T).T
    mag_filtered = np.apply_along_axis(band_pass_filter, 0, mag_soft_iron)
    mag_smooth = np.apply_along_axis(savitzky_golay_smooth, 0, mag_filtered)

    # Store in DataFrame
    df[["MagX_processed", "MagY_processed", "MagZ_processed"]] = mag_smooth

    # Save file
    processed_path = os.path.join(data_dir, f"final_corrected_magnetometer_data_{sensor_name}.csv")
    df.to_csv(processed_path, index=False)
    print(f"Saved magnetometer data for sensor: {sensor_name}")

# === Plot 4 Graphs Per Sensor ===
for file in cleaned_files:
    sensor_name = file.split("cleaned_data_converted_")[1].replace(".csv", "")
    raw_path = os.path.join(data_dir, f"cleaned_data_converted_{sensor_name}.csv")
    processed_path = os.path.join(data_dir, f"final_corrected_magnetometer_data_{sensor_name}.csv")
    
    if not os.path.exists(raw_path) or not os.path.exists(processed_path):
        continue

    df_raw = pd.read_csv(raw_path)
    df_proc = pd.read_csv(processed_path)

    # === Combined MagX, MagY, MagZ Raw vs Processed ===
    plt.figure(figsize=(12, 5))
    plt.plot(df_raw["MagX"], label="Raw MagX", color="blue", alpha=0.3)
    plt.plot(df_raw["MagY"], label="Raw MagY", color="green", alpha=0.3)
    plt.plot(df_raw["MagZ"], label="Raw MagZ", color="red", alpha=0.3)

    plt.plot(df_proc["MagX_processed"], label="Processed MagX", color="blue", linestyle="--")
    plt.plot(df_proc["MagY_processed"], label="Processed MagY", color="green", linestyle="--")
    plt.plot(df_proc["MagZ_processed"], label="Processed MagZ", color="red", linestyle="--")

    plt.axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    plt.title(f"{sensor_name} - Magnetometer Raw vs Processed (Combined)")
    plt.xlabel("Time (samples)")
    plt.ylabel("Magnetic Field (T)")
    plt.legend()
    plt.tight_layout()
    plt.show()

    # === Separate Axes Plots: Raw vs Processed ===
    fig, axes = plt.subplots(3, 1, figsize=(12, 10))
    colors = ["blue", "green", "red"]
    mag_axes = ["MagX", "MagY", "MagZ"]

    for i, axis in enumerate(mag_axes):
        axes[i].plot(df_raw[axis], label=f"Raw {axis}", color=colors[i], alpha=0.3)
        axes[i].plot(df_proc[f"{axis}_processed"], label=f"Processed {axis}", linestyle="--", color="black")
        axes[i].set_title(f"{sensor_name} - {axis} Raw vs Processed")
        axes[i].set_ylabel("Magnetic Field (T)")
        axes[i].set_xlabel("Time (samples)")
        axes[i].axhline(y=0, color='black', linestyle='--', linewidth=0.8)
        axes[i].legend()

    plt.tight_layout()
    plt.show()


## Plot processed magnetometer data

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt

# === Loop through each processed file ===
for file in cleaned_files:
    sensor_name = file.split("cleaned_data_converted_")[1].replace(".csv", "")
    processed_path = os.path.join(data_dir, f"final_corrected_magnetometer_data_{sensor_name}.csv")

    if not os.path.exists(processed_path):
        print(f"Processed file not found for sensor: {sensor_name}")
        continue

    # Load the processed DataFrame
    df = pd.read_csv(processed_path)

    # Print column names to verify structure (optional debug)
    print(f"\nProcessed columns for {sensor_name}:")
    print(df.columns)

    # Identify processed magnetometer columns
    mag_columns = [col for col in df.columns if "Mag" in col and "processed" in col]
    print(f"Processed magnetometer columns for {sensor_name}:", mag_columns)

    # Plot processed magnetometer data
    plt.figure(figsize=(12, 6))
    for col in mag_columns:
        plt.plot(df[col], label=col, alpha=0.8)

    plt.axhline(y=0, color='black', linestyle='--', linewidth=0.8)
    plt.xlabel("Time (samples)")
    plt.ylabel("Magnetometer (T)")
    plt.title(f"{sensor_name} - Processed Magnetometer Data")
    plt.legend()
    plt.tight_layout()
    plt.show()

## Creating the combined final data file for the sensor fusion operation

In [None]:
import os
import pandas as pd

# Define data directory
data_dir = "/Users/afonsoamaral/Desktop/data"

# Get all processed accelerometer files
accel_files = [f for f in os.listdir(data_dir) if f.startswith("final_corrected_accelerometer_data_") and f.endswith(".csv")]

for accel_file in accel_files:
    # Extract sensor name
    sensor_name = accel_file.replace("final_corrected_accelerometer_data_", "").replace(".csv", "")

    # Construct paths for all 3 processed files
    accel_path = os.path.join(data_dir, f"final_corrected_accelerometer_data_{sensor_name}.csv")
    gyro_path = os.path.join(data_dir, f"final_corrected_gyroscope_data_{sensor_name}.csv")
    mag_path = os.path.join(data_dir, f"final_corrected_magnetometer_data_{sensor_name}.csv")

    # Check if all 3 files exist
    if not os.path.exists(accel_path) or not os.path.exists(gyro_path) or not os.path.exists(mag_path):
        print(f"Skipping {sensor_name}: One or more processed files are missing.")
        continue

    # Load data
    accel_df = pd.read_csv(accel_path)
    gyro_df = pd.read_csv(gyro_path)
    mag_df = pd.read_csv(mag_path)

    # Required columns
    accel_columns = ["AccX_processed", "AccY_processed", "AccZ_processed"]
    gyro_columns = ["GyrX_kalman", "GyrY_kalman", "GyrZ_kalman"]
    mag_columns = ["MagX_processed", "MagY_processed", "MagZ_processed", "PacketCounter", "Roll", "Pitch", "Yaw"]

    # Check column availability
    for col in accel_columns:
        if col not in accel_df.columns:
            raise ValueError(f"Missing column in {accel_file}: {col}")
    for col in gyro_columns:
        if col not in gyro_df.columns:
            raise ValueError(f"Missing column in {gyro_file}: {col}")
    for col in mag_columns:
        if col not in mag_df.columns:
            raise ValueError(f"Missing column in {mag_file}: {col}")

    # Concatenate the selected columns
    combined_df = pd.concat([
        accel_df[accel_columns].reset_index(drop=True),
        gyro_df[gyro_columns].reset_index(drop=True),
        mag_df[mag_columns].reset_index(drop=True)
    ], axis=1)

    # Save the combined dataset
    combined_path = os.path.join(data_dir, f"final_combined_sensor_data_{sensor_name}.csv")
    combined_df.to_csv(combined_path, index=False)
    print(f"Saved final combined data for sensor: {sensor_name}")


## Estimating the Roll, Pitch and Yaw values based on the measured pre-processing sensor data through Sensor Fusion

In [None]:
import os
import numpy as np
import pandas as pd
from ahrs.filters import Mahony
from ahrs.common.orientation import q2euler

# === Settings ===
data_dir = "/Users/afonsoamaral/Desktop/data"
fs = 40  # Sampling rate (Hz)
dt = 1 / fs

# === Find All Sensor Files ===
combined_files = [f for f in os.listdir(data_dir)
                  if f.startswith("final_combined_sensor_data_") and f.endswith(".csv")]

# === Process Each Sensor File ===
for file in combined_files:
    sensor_name = file.replace("final_combined_sensor_data_", "").replace(".csv", "")
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)

    print(f"\nProcessing {sensor_name} with Mahony filter...")

    # Required sensor columns
    required_cols = ["AccX_processed", "AccY_processed", "AccZ_processed",
                     "GyrX_kalman", "GyrY_kalman", "GyrZ_kalman",
                     "MagX_processed", "MagY_processed", "MagZ_processed"]
    
    if not all(col in df.columns for col in required_cols):
        print(f"Skipping {sensor_name}: Missing required columns.")
        continue

    # === Extract Sensor Data ===
    acc = df[["AccX_processed", "AccY_processed", "AccZ_processed"]].values
    gyr = df[["GyrX_kalman", "GyrY_kalman", "GyrZ_kalman"]].values
    mag = df[["MagX_processed", "MagY_processed", "MagZ_processed"]].values

    # === Initialize Mahony Filter ===
    mahony = Mahony(sampleperiod=dt)
    q = np.zeros((len(df), 4))
    q[0] = np.array([1.0, 0.0, 0.0, 0.0])  # initial quaternion

    # === Apply Mahony Filter ===
    for i in range(1, len(df)):
        q[i] = mahony.updateIMU(q[i-1], gyr=gyr[i], acc=acc[i])
        # To include magnetometer: use mahony.update() if supported by the ahrs version
        # q[i] = mahony.update(q[i-1], gyr=gyr[i], acc=acc[i], mag=mag[i])

    # === Convert Quaternion to Euler Angles ===
    euler_rad = np.array([q2euler(qi) for qi in q])
    euler_deg = np.rad2deg(euler_rad)

    # === Store Orientation Estimations ===
    df["Roll_Estimated"] = euler_deg[:, 0]
    df["Pitch_Estimated"] = euler_deg[:, 1]
    df["Yaw_Estimated"] = euler_deg[:, 2]

    # === Save Output ===
    output_path = os.path.join(data_dir, f"final_orientation_estimation_{sensor_name}.csv")
    df.to_csv(output_path, index=False)
    print(f"Saved Mahony-estimated orientation for {sensor_name} to {output_path}")


## Comparing the calculated orientation values in the previous step with the measured Roll, Pitch and Yaw by the IMUs 

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt

# Directory with processed orientation files
data_dir = "/Users/afonsoamaral/Desktop/data"

# Find all orientation files
orientation_files = [f for f in os.listdir(data_dir) if f.startswith("final_orientation_estimation_") and f.endswith(".csv")]

if not orientation_files:
    raise FileNotFoundError("No orientation estimation files found.")

for file in orientation_files:
    sensor_name = file.replace("final_orientation_estimation_", "").replace(".csv", "")
    file_path = os.path.join(data_dir, file)
    df = pd.read_csv(file_path)

    print(f"\n Plotting Measured vs Estimated Orientation for: {sensor_name}")

    # === Plot Roll, Pitch, Yaw Comparison ===
    fig, axes = plt.subplots(3, 1, figsize=(12, 12))

    axes[0].plot(df["Roll"], label="Measured Roll", color='blue', alpha=0.6)
    axes[0].plot(df["Roll_Estimated"], label="Estimated Roll", linestyle="dashed", color='black')
    axes[0].set_ylabel("Angle (°)")
    axes[0].set_title(f"{sensor_name} - Roll: Measured vs Estimated")
    axes[0].legend()
    axes[0].axhline(0, color='gray', linestyle='--', linewidth=0.8)

    axes[1].plot(df["Pitch"], label="Measured Pitch", color='green', alpha=0.6)
    axes[1].plot(df["Pitch_Estimated"], label="Estimated Pitch", linestyle="dashed", color='black')
    axes[1].set_ylabel("Angle (°)")
    axes[1].set_title(f"{sensor_name} - Pitch: Measured vs Estimated")
    axes[1].legend()
    axes[1].axhline(0, color='gray', linestyle='--', linewidth=0.8)

    axes[2].plot(df["Yaw"], label="Measured Yaw", color='red', alpha=0.6)
    axes[2].plot(df["Yaw_Estimated"], label="Estimated Yaw", linestyle="dashed", color='black')
    axes[2].set_xlabel("Time (samples)")
    axes[2].set_ylabel("Angle (°)")
    axes[2].set_title(f"{sensor_name} - Yaw: Measured vs Estimated")
    axes[2].legend()
    axes[2].axhline(0, color='gray', linestyle='--', linewidth=0.8)

    plt.tight_layout()
    plt.show()


## Convert Processed IMU Data to OpenSim Format

In [None]:
"""import os
import numpy as np
import pandas as pd

# === Configuration ===
data_dir = "/Users/afonsoamaral/Desktop/data"
sampling_rate = 40  # Hz
output_mot_path = os.path.join(data_dir, "opensim_combined_motion.mot")

# === Sensor to OpenSim coordinate mapping ===
sensor_to_opensim_mapping = {
    "chest": {
        "thorax_tilt": "Pitch_Estimated",
        "thorax_list": "Roll_Estimated",
        "thorax_rotation": "Yaw_Estimated"
    },
    "right-thigh": {
        "hip_flexion_r": "Pitch_Estimated",
        "hip_adduction_r": "Roll_Estimated",
        "hip_rotation_r": "Yaw_Estimated"
    },
    "right-knee": {
        "knee_angle_r": "Pitch_Estimated"
    },
    "right-foot": {
        "mtp_angle_r": "Pitch_Estimated"
    },
    "left-thigh": {
        "hip_flexion_l": "Pitch_Estimated",
        "hip_adduction_l": "Roll_Estimated",
        "hip_rotation_l": "Yaw_Estimated"
    },
    "left-knee": {
        "knee_angle_l": "Pitch_Estimated"
    },
    "left-foot": {
        "mtp_angle_l": "Pitch_Estimated"
    },
    "lumbar": {
        "lumbar_extension": "Pitch_Estimated",
        "lumbar_bending": "Roll_Estimated",
        "lumbar_rotation": "Yaw_Estimated"
    },
    "right-arm": {
        "arm_flex_r": "Pitch_Estimated",
        "arm_add_r": "Roll_Estimated",
        "arm_rot_r": "Yaw_Estimated"
    },
    "right-wrist": {
        "wrist_flex_r": "Pitch_Estimated",
        "wrist_dev_r": "Roll_Estimated"
    },
    "right-hand": {
        "pro_sup_r": "Yaw_Estimated"
    },
    "left-arm": {
        "arm_flex_l": "Pitch_Estimated",
        "arm_add_l": "Roll_Estimated",
        "arm_rot_l": "Yaw_Estimated"
    },
    "left-wrist": {
        "wrist_flex_l": "Pitch_Estimated",
        "wrist_dev_l": "Roll_Estimated"
    },
    "left-hand": {
        "pro_sup_l": "Yaw_Estimated"
    },
    "head": {
        "head_flexion": "Pitch_Estimated",
        "head_rotation": "Yaw_Estimated",
        "head_bending": "Roll_Estimated"
    }
}

# === Load and combine data ===
combined_df = None
time_column = None
used_columns = ["time"]  # Start with time

for sensor_name, joint_mapping in sensor_to_opensim_mapping.items():
    file_path = os.path.join(data_dir, f"final_orientation_estimation_{sensor_name}.csv")
    if not os.path.exists(file_path):
        print(f"Warning: File not found for sensor '{sensor_name}' at {file_path}")
        continue

    df = pd.read_csv(file_path)
    
# Ensure time column
    if "PacketCounter" in df.columns:
        df["time"] = df["PacketCounter"] / sampling_rate
        df["time"] = df["time"] - df["time"].iloc[0]
    elif "time" not in df.columns:
        dt = 1 / sampling_rate
        df["time"] = np.arange(0, len(df)*dt, dt)
        df["time"] = df["time"] - df["time"].iloc[0]

    if time_column is None:
        time_column = df["time"].values
        combined_df = pd.DataFrame({"time": time_column})

    for joint_name, source_column in joint_mapping.items():
        if source_column not in df.columns:
            print(f"Warning: Missing {source_column} in {sensor_name}. Skipping {joint_name}.")
            continue
        combined_df[joint_name] = df[source_column].values
        used_columns.append(joint_name)
 
# === Create OpenSim Header ===
header = [
    "name Combined_Motion",
    f"datacolumns {len(used_columns)}",
    f"datarows {len(combined_df)}",
    f"range {combined_df['time'].iloc[0]} {combined_df['time'].iloc[-1]}",
    "endheader"
]

# === Save to .mot ===
with open(output_mot_path, "w") as f:
    f.write("\n".join(header) + "\n")
    f.write("\t".join(used_columns) + "\n")
    combined_df.to_csv(f, sep="\t", columns=used_columns, index=False, header=False, float_format="%.6f")

print(f" Combined OpenSim .mot file created at:\n{output_mot_path}") """

## Creating the orientation files for OpenSim Inverse Kinematics

In [None]:
import pandas as pd
import numpy as np
import os
import glob

# Configuration
data_dir = "/Users/afonsoamaral/Desktop/data"
sampling_rate = 40  # Hz
input_pattern = os.path.join(data_dir, "final_orientation_estimation_*.csv")

# Find all relevant CSV files
files = glob.glob(input_pattern)

for file_path in files:
    try:
        # Extract body segment name from filename
        base_name = os.path.basename(file_path)
        segment = base_name.replace("final_orientation_estimation_", "").replace(".csv", "")

        # Load required columns only
        use_columns = ["PacketCounter", "Roll_Estimated", "Pitch_Estimated", "Yaw_Estimated"]
        df = pd.read_csv(file_path, usecols=lambda c: c in use_columns or c == "time")

        # Ensure time column
        if "PacketCounter" in df.columns:
            df["time"] = df["PacketCounter"] / sampling_rate
            df["time"] = df["time"] - df["time"].iloc[0]
        elif "time" not in df.columns:
            dt = 1 / sampling_rate
            df["time"] = np.arange(0, len(df) * dt, dt)
            df["time"] = df["time"] - df["time"].iloc[0]

        # Select and rename columns
        sto_df = df[["time", "Roll_Estimated", "Pitch_Estimated", "Yaw_Estimated"]].copy()
        sto_df.columns = [
            "time",
            f"{segment}_roll",
            f"{segment}_pitch",
            f"{segment}_yaw"
        ]

        # Save to .sto 
        sto_filename = f"{segment}_orientation.sto"
        sto_path = os.path.join(data_dir, sto_filename)
        sto_df.to_csv(sto_path, sep="\t", index=False, header=True)

        print(f"STO created: {sto_filename}")

    except Exception as e:
        print(f"Failed to process {file_path}: {e}")

## Creating the orientation file at placement pose for IMU placement in OpenSim

In [None]:
import pandas as pd
import os
import glob

# --- Configuration ---
data_dir = "/Users/afonsoamaral/Desktop/data"
input_pattern = os.path.join(data_dir, "*_orientation.sto")
output_file = os.path.join(data_dir, "full_placement_orientation.sto")

# Collect one-row DataFrames from each file
placement_rows = []

for file_path in glob.glob(input_pattern):
    try:
        df = pd.read_csv(file_path, sep="\t")
        if df.empty:
            continue

        # Take only first row
        row = df.iloc[[0]].copy()

        # Ensure column names are already in OpenSim format
        placement_rows.append(row)
        print(f" Added: {os.path.basename(file_path)}")

    except Exception as e:
        print(f" Failed to process {file_path}: {e}")

# Merge all into single-row DataFrame
if placement_rows:
    full_df = placement_rows[0]
    for next_df in placement_rows[1:]:
        full_df = pd.merge(full_df, next_df.drop(columns=["time"]), left_index=True, right_index=True)

    # Save to .sto file
    full_df.to_csv(output_file, sep="\t", index=False)
    print(f"\n Placement orientation file created:\n{output_file}")
else:
    print(" No placement data assembled.")

df_preview.iloc[:, :10]  # Show first 10 columns