In [3]:
import numpy as np
from scipy.spatial.transform import Rotation as R
import torch
import torch.nn.functional as F
from scipy.signal import butter, filtfilt, find_peaks
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from ahrs.filters import Madgwick, Mahony, EKF
from ahrs.utils.metrics import *
import spkit as sp

In [4]:
df = pd.read_csv("train.csv")

In [7]:
def quaternion_to_rotation_matrix(q: torch.Tensor) -> torch.Tensor:
    """
    Convierte un cuaternión q = [w, x, y, z] en una matriz de rotación 3x3 (R).
    Esta matriz puede aplicarse para rotar vectores o puntos en el espacio 3D.
    """

    # It decomposes the tensor q into 4 components:
    # w -> scalar component of the quaternion
    # x, y, z -> vector components
    # The ellipsis (...) allows it to work with any number of leading dimensions (e.g. batches)
    w, x, y, z = q[..., 0], q[..., 1], q[..., 2], q[..., 3]

    # Get how many dimensions (batch, time, etc.) are present
    B, T = w.shape

    # Create the output tensor R, which will contain 3x3 rotation matrices
    R = torch.zeros((B, T, 3, 3), device=q.device)

    # Apply the standard quaternion-to-rotation-matrix formulas
    R[..., 0, 0] = 1 - 2 * (y**2 + z**2)
    R[..., 0, 1] = 2 * (x*y - z*w)
    R[..., 0, 2] = 2 * (x*z + y*w)  

    R[..., 1, 0] = 2 * (x*y + z*w)
    R[..., 1, 1] = 1 - 2 * (x**2 + z**2)
    R[..., 1, 2] = 2 * (y*z - x*w)

    R[..., 2, 0] = 2 * (x*z - y*w)  
    R[..., 2, 1] = 2 * (y*z + x*w)
    R[..., 2, 2] = 1 - 2 * (x**2 + y**2)

    # Return a 3x3 matrix representing the same rotation as the quaternion
    return R




In [11]:
def rotate_acc_to_world(acc : torch.Tensor, q:torch.Tensor) -> torch.Tensor:
    """Usa los cuaterniones para generar la matriz de rotación y 
    aplica esa rotación a los vectores de aceleración, 
    devolviendo un vector 3D (en coordenadas del mundo).
    """
    
    R = quaternion_to_rotation_matrix(q) # (B,T,3,3)
    acc_world = torch.matmul(R, acc.unsqueeze(-1)).squeeze(-1) #(B,T,3)
    return acc_world

In [12]:
def calculate_angular_distance(dt):
    if isinstance(dt, pd.DataFrame):
        #quartetion
        quat_values = df[['rot_x', 'rot_y', 'rot_z', 'rot_w']].values
    else:
        quat_values = rot_data

    """
    transforma los datos de cuaternión en ángulos de rotación (distancia angular).

    Lo que obtienes no es un cuaternión nuevo, sino una medida escalar por paso de cuánto cambió la orientación: útil para detectar movimientos, velocidad angular o anomalías en la rotación.
    """
    num_samples_in_the_quat = quat_values.shape[0]
    angular_distance = np.zeros(num_samples_in_the_quat) #Starting in random seed
    for i in range(num_samples_in_the_quat - 1):
        quat1 = quat_values[i]
        quat2 = quat_values[i+1]

        if np.all(np.isnan(quat1)) or np.all(np.isclose(quat1,0)) or np.all(np.isnan(quat2)) or np.all(np.isclose(quat2, 0)):
            angular_dist[i] = 0
            continue
        try:
            r1 = R.from_quat(quat1)
            r2 = R.from_quat(quat2)

            relative_rotation = r1.inv() * 2

            angle = np.linalg.norm(relative_rotation.as_rotvect())
            angular_dist[i] = angle


        except ValueError:
            angular_dist[i] = 0
            pass
    return angular_dist



    """
Este bloque de código calcula la distancia angular entre orientaciones consecutivas usando los cuaterniones w, x, y, z. Cada cuaternión representa la orientación 3D de un sensor en un instante de tiempo. La función compara cuaterniones consecutivos, obtiene la rotación relativa que lleva de uno al siguiente y la convierte en un ángulo escalar mediante el vector de rotación. Este valor indica cuánto giró el sensor entre dos pasos consecutivos. Así, no se generan nuevos cuaterniones, sino una medida escalar por paso que refleja la magnitud del movimiento o la velocidad angular, útil para detectar movimientos, anomalías o crear features para modelos de machine learning.
    """

In [14]:
def calc_angular_distance(df):
    """
    crea el DataFrame con una misma columna con todos los cálculos
    """
    res = calculate_angular_distance(df[['rot_x', 'rot_y', 'rot_z', 'rot_w']])
    return pd.DataFrame(res, columns=['angular_distance'], index=df.index)


In [15]:
def calculate_angular_velocity_from_quat(df, time_delta=(1/200)): #Assuming 200Hz sampling rate
    #Extrae los quat de las columnas
    quat_values = df[['rot_x', 'rot_y', 'rot_z', 'rot_w']].values
    #Crea el shaoe 0 de la rotacion x para saber el numero de examples que hay
    num_samples = quat_values.shape[0]

    #Crea una matriz de (m x 3) 
    angular_velocity = np.zeros((num_samples, 3))

    for i in range(num_samples - 1):
        q_t = quat_values[i]
        q_t_plus_dt = quat_values[i + 1]
        if np.all(np.isnan(q_t)) or np.all(np.isclose(q_t, 0)) or \
                np.all(np.isnan(q_t_plus_dt)) or np.all(np.isclose(q_t_plus_dt, 0)):
            continue
        try:
            rot_t = R.from_quat(q_t)
            rot_t_plus_dt = R.from_quat(q_t_plus_dt)

            #Calculate the relative rotation of the quartetion
            delta_rot = rot_t.inv() * rot_t_plus_dt

            #Convert delta rotation to angular velocity vector
            # The rotation vector(Euler axis * angle) scaled by 1 / dt
            # Is a good approximation for small delta_rot
            angular_vel[i, :] = delta_rot.as_rotvec() / time_delta
        except ValueError:
            # If quaternion is invalid, angular velocity remains zero
            pass

        df['angular_vel_x'] = angular_vel[:, 0]
        df['angular_vel_y'] = angular_vel[:, 1]
        df['angular_vel_z'] = angular_vel[:, 2]
        return df

In [None]:
def add_temp_feat(df):
    for i in range(1,6): #For the 5 thermopiles
        df[f"temp_diff_{i}"] = df[f"thm_{i}"].diff().fillna(0)
        df[f"temp_acc_{i}"] = df[f"temp_diff_{i}"].diff().fillna(0)
        baseline_temp = df[f"thm_{i}"].rolling(1000, min_periods=1).mean()
        df[f"temp_dev_{i}"] = df[f"thm_{i}"] - baseline_temp
    return df    


    """
    Transforma los datos crudos de temperatura en features de velocidad, aceleración y anomalía.

    Facilita que el modelo aprenda patrones temporales y picos importantes.

    No normaliza estadísticamente, pero prepara los datos para que sean más interpretables y útiles para ML.
    """
    
    

In [16]:
#It gets the dataframe, an object of configuration, and its col of what to filter (acceleration|quat)
def filter_signals(df, cfg, col = 'acc'):
    axes = ['w', 'x', 'y', 'z'] if col == 'quat' else ['x','y','z']
    for axis in axes:
        #It lapses with the column that you are choosing
        x = df[f'{col}_{axis}'].values
        #Its using the filter ATAR(Adaptive Thresholding Algorithm for Reconstruction")
        #This eliminates the noise with a descomposition of wavelet of the signal
        x = sp.eeg.ATAR(x, wv='db3', winsize=cfg.win_len,
                        thr_method='ipr', beta=cfg.beta, k1=10, k2=50, OptMode='soft', verbose=0)
        df[f'{col}_{axis}'] = x
    return df

In [17]:
def apply_ahrs_filters(df, dt = 1/200):
    #The values of the accelertion columns
    acc = df[['acc_x', 'acc_y', 'acc_z']].values
    #The value of the gyroscopio columns
    gyro = df[['angular_vel_x', 'angular_vel_y', 'angular_vel_z']].values
    N = acc.shape[0] # Number of values

    #Intialize filter objects
    madgwick = Madgwick(sampleperiod=dt)
    mahony = Mahony(sampleperiod=dt)
    ekf = EKF(dt=dt)

    #Output quaternions

    q_madgwick = np.zeros((N, 4))
    q_mahony = np.zeros((N,4))
    q_ekf = np.zeros((N,4))

    q_madgwick[0] = np.array([1.0, 0.0, 0.0, 0.0])
    q_mahony[0] = np.array([1.0, 0.0, 0.0, 0.0])
    q_ekf[0] = np.array([1.0, 0.0, 0.0, 0.0])

    for t in range(1,N):
        q_madgwick[t] = madgwick.updateIMU(q_madgwick[t - 1], gyr=gyro[t], acc=acc[t])
        q_mahony[t] = mahony.updateIMU(q_mahony[t - 1], gyr=gyro[t], acc=acc[t])
        q_ekf[t] = ekf.update(q_ekf[t - 1], gyr=gyro[t], acc=acc[t])

    for i, ax in enumerate(['w', 'x', 'y', 'z']):
        df[f'q_madgwick_{ax}'] = q_madgwick[:, i]
        df[f'q_mahony_{ax}'] = q_mahony[:, i]
        df[f'q_ekf_{ax}'] = q_ekf[:, i]
    return df
        

In [18]:
def compute_combined_features(df, cfg):
    df = filter_signals(df, cfg=cfg)
    df = calculate_angular_velocity_from_quat(df, time_delta=1/cfg.hz)
    angular_dist = calc_angular_distance(df)
    df['angular_distance'] = angular_dist['angular_distance']
    acc = torch.tensor(df[['acc_x', 'acc_y', 'acc_z']].values, dtype=torch.float32)
    quat = F.normalize(torch.tensor(df[['rot_w', 'rot_x', 'rot_y', 'rot_z']].values, dtype=torch.float32), dim=-1)

    acc = acc.unsqueeze(0) #(1, T, 3)
    quat = quat.unsqueeze(0) #(1, T, 4)
    #world-frame acceleration
    acc_world = rotate_acc_to_world(acc, quat).numpy()
    df['acc_world_x'] = acc_world[0, ..., 0]
    df['acc_world_y'] = acc_world[0, ..., 1]
    df['acc_world_z'] = acc_world[0, ..., 2]

    # Tilt angles (from quaternion)
    w, x, y, z = quat[..., 0], quat[..., 1], quat[..., 2], quat[..., 3]
    sinr_cosp = 2 * (w * x + y * z)
    cosr_cosp = 1 - 2 * (x**2 + y**2)
    roll = torch.atan2(sinr_cosp, cosr_cosp)

    sinp = 2 * (w * y - z * x)
    pitch = torch.asin(torch.clamp(sinp, -1.0, 1.0))

    siny_cosp = 2 * (w * z + x * y)
    cosy_cosp = 1 - 2 * (y**2 + z**2)
    yaw = torch.atan2(siny_cosp, cosy_cosp)

    df['roll'] = roll[0].numpy()      # (B, T)
    df['pitch'] = pitch[0].numpy()    # (B, T)
    df['yaw'] = yaw[0].numpy()        # (B, T)

    # Jerk (acceleration derivative)
    jerk = acc[:, 1:] - acc[:, :-1]
    jerk = F.pad(jerk, (0, 0, 1, 0)).numpy()  # (B, T, 3)
    df['jerk_x'] = jerk[0, ... ,0]
    df['jerk_y'] = jerk[0, ... ,1]
    df['jerk_z'] = jerk[0, ... ,2]
    df = add_temp_feat(df)
    
    for i in range(1, 6):
        pixel_cols = [f"tof_{i}_v{p}" for p in range(64)]; tof_data = df[pixel_cols].replace(-1, np.nan)
        df[f'tof_{i}_mean'], df[f'tof_{i}_std'], df[f'tof_{i}_min'], df[f'tof_{i}_max'] = tof_data.mean(axis=1), tof_data.std(axis=1), tof_data.min(axis=1), tof_data.max(axis=1)
    df['angular_jerk_x'] = df['angular_vel_x'].diff().fillna(0)
    df['angular_jerk_y'] = df['angular_vel_y'].diff().fillna(0)
    df['angular_jerk_z'] = df['angular_vel_z'].diff().fillna(0)

    df['angular_snap_x'] = df['angular_jerk_x'].diff().fillna(0)
    df['angular_snap_y'] = df['angular_jerk_y'].diff().fillna(0)
    df['angular_snap_z'] = df['angular_jerk_z'].diff().fillna(0)
    return df