In [1]:
import ast
from typing import Union, Tuple, List
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math
from scipy.signal import butter, filtfilt

In [2]:
# Load datasets

df2 = pd.read_csv(r"C:\Users\pasupuleti\Desktop\group-project\experiments\drowsiness_data\lloyd_1\session_metrics.csv")
df1 = pd.read_csv(r"C:\Users\pasupuleti\Desktop\group-project\experiments\drowsiness_data\Prof_ignacio1\session_metrics.csv")
df3 = pd.read_csv(r"C:\Users\pasupuleti\Desktop\group-project\experiments\drowsiness_data\devano_palma\session_metrics.csv")

In [3]:
df3 = df3[1:-1]  # Remove first and last rows of devano_palma dataset

In [4]:
import numpy as np
from scipy.signal import butter, filtfilt

def steering_reversal_rate(steering_angle, gap_size=3.0, lowpass_cutoff=0.6, filter_order=2):
    """
    Calculate the Steering Wheel Reversal Rate (SRR) as defined by Markkula & Engström (2006).
    
    Parameters
    ----------
    steering_angle : np.ndarray
        Array of steering wheel angle values (degrees).
    fs : float
        Sampling frequency in Hz.
    gap_size : float, optional
        Minimum angular difference (degrees) between reversals (default 3.0° for visual load).
    lowpass_cutoff : float, optional
        Low-pass filter cutoff frequency (Hz) (default 0.6 Hz).
    filter_order : int, optional
        Butterworth filter order (default 2).
    
    Returns
    -------
    reversal_rate : float
        Steering wheel reversal rate (reversals per minute).
    """
    
    # 1 Low-pass filter to remove noise
    fs = len(steering_angle) / 60.0  # Assuming steering_angle is sampled over 60 seconds
    nyquist = 0.5 * fs
    normal_cutoff = lowpass_cutoff / nyquist
    b, a = butter(filter_order, normal_cutoff, btype='low', analog=False)
    filtered_angle = filtfilt(b, a, steering_angle)
    
    # 2️ Find stationary points (local minima and maxima)
    diff = np.diff(filtered_angle)
    sign_changes = np.sign(diff)
    stationary_idx = np.where(np.diff(sign_changes) != 0)[0] + 1
    
    # 3️ Compute reversals
    reversals = 0
    k = 0
    for l in range(1, len(stationary_idx)):
        theta_k = filtered_angle[stationary_idx[k]]
        theta_l = filtered_angle[stationary_idx[l]]
        delta_theta = theta_l - theta_k
        
        if abs(delta_theta) >= gap_size:
            reversals += 1
            k = l
        elif abs(theta_l) < abs(theta_k):
            k = l
    # 4️ Calculate rate per minute
    duration_min = len(steering_angle) / (fs * 60.0)
    reversal_rate = reversals / duration_min if duration_min > 0 else 0.0
    
    return reversal_rate



def calculate_perclos(
    ear_values: np.ndarray, ear_threshold: float, min_consec_frames: int
) -> float:
    """
    Calculate PERCLOS (Percentage of Eye Closure) from stored EAR values.

    Args:
        ear_values (np.ndarray): Array of EAR values sampled over time.
        ear_threshold (float): EAR threshold below which eyes are considered closed.
        min_consec_frames (int): Minimum consecutive frames below threshold to count as closed.

    Returns:
        float: PERCLOS percentage over the EAR values window.
    """
    closed_frames = 0
    consec_count = 0

    for ear in ear_values:
        if ear is not None:  # frames with no mediapipe mesh records aa none
            if ear < ear_threshold:  # eye closed
                consec_count += 1
            else:
                if consec_count >= min_consec_frames:  # eye opened
                    closed_frames += (
                        consec_count  # add the consecutive frames to closed frames
                    )
                consec_count = 0  # restart the count

    # Account for closing at the end of the window
    if consec_count >= min_consec_frames:
        closed_frames += consec_count

    total_frames = np.sum([ear is not None for ear in ear_values])

    if total_frames == 0:
        return 0.0

    perclos = (closed_frames / total_frames) * 100.0
    return perclos



def approx_entropy(time_series: np.array, run_length: int = 2) -> float:
    """Approximate entropy (2sec window) [https://www.mdpi.com/1424-8220/17/3/495]

    Args:
        time_series (np.array): steering movement data
        run_length (int): length of the run data (window with overlapping of the data,
        example x = [1,2,3], if runlength=2 then output will be [[1,2], [2,3]])

    Returns:
        float: regularity (close to 0 : no irregularity, close to 1: irregularity)
    """
    std_dev = np.std(time_series)
    filter_level = 0.2 * std_dev

    def _maxdist(x_i, x_j):
        return max(abs(ua - va) for ua, va in zip(x_i, x_j))

    def _phi(m):
        n = time_series_length - m + 1
        x = [
            [time_series[j] for j in range(i, i + m - 1 + 1)]
            for i in range(time_series_length - m + 1)
        ]
        counts = [
            sum(1 for x_j in x if _maxdist(x_i, x_j) <= filter_level) / n for x_i in x
        ]
        return sum(math.log(c) for c in counts) / n

    time_series_length = len(time_series)

    return abs(_phi(run_length + 1) - _phi(run_length))



def steering_reversal_rate_simple(
    steering_angle: np.ndarray,
    gap_size: float = 3.0,
    lowpass_cutoff: float = 0.6,
    filter_order: int = 2,
) -> float:
    """
    Simple and clear implementation of the Markkula & Engström (2006)
    Steering Wheel Reversal Rate (SRR) algorithm.

    Parameters
    ----------
    steering_angle : np.ndarray
        Steering wheel angle signal (degrees)
    gap_size : float, optional
        Minimum angular change to count as a reversal (default 3.0° for visual load)
    lowpass_cutoff : float, optional
        Low-pass filter cutoff frequency (Hz). Use 0.6 Hz for visual, 2 Hz for cognitive.
    filter_order : int, optional
        Butterworth filter order (default 2)

    Returns
    -------
    float
        Steering wheel reversal rate (reversals per minute)
    """

    # === 1. LOW-PASS FILTER ===
    fs = len(steering_angle) / 60.0  # Sampling frequency (Hz), assuming 60s window
    nyquist = 0.5 * fs
    normal_cutoff = lowpass_cutoff / nyquist
    b, a = butter(filter_order, normal_cutoff, btype="low", analog=False)
    filtered = filtfilt(b, a, steering_angle)

    # === 2. FIND STATIONARY POINTS (local minima & maxima) ===
    diff = np.diff(filtered)
    sign_diff = np.sign(diff)

    stationary_points = []
    for i in range(1, len(sign_diff)):
        if sign_diff[i] != sign_diff[i - 1]:  # sign change → extremum
            stationary_points.append(i)

    if len(stationary_points) < 2:
        return 0.0  # no reversals possible

    # === 3. COUNT REVERSALS (for both up and down directions) ===
    def count_reversals(theta_values, gap):
        count = 0
        k = 0
        for l in range(1, len(theta_values)):
            if theta_values[l] - theta_values[k] >= gap:
                count += 1
                k = l
            elif theta_values[l] < theta_values[k]:
                k = l
        return count

    # Extract steering values at stationary points
    theta_vals = filtered[stationary_points]

    # Count upward and downward reversals
    n_up = count_reversals(theta_vals, gap_size)
    n_down = count_reversals(-theta_vals, gap_size)

    total_reversals = n_up + n_down

    # === 4. CALCULATE REVERSALS PER MINUTE ===
    duration_min = len(filtered) / (fs * 60.0)
    reversal_rate = total_reversals / duration_min if duration_min > 0 else 0.0

    return reversal_rate



# create a new column label by checking karthik_drowsiness_level and Vanchha_drowsiness_level if they are same then keep that value otherwise remove that row
def checking_conflict(row: pd.Series) -> Union[str, np.nan]:
    """checking both annotators labelled same label or different.

    Args:
        row (pd.Series): data

    Returns:
        Union[str, np.nan]: if both annotators labelled same label and no nan values returns that label otherwise return nan
    """
    if (row['karthik_drowsiness_level'] == row['Vanchha_drowsiness_level']) and (not pd.isna(row['karthik_drowsiness_level']) and not pd.isna(row['Vanchha_drowsiness_level'])):
        return row['karthik_drowsiness_level']
    else:
        return np.nan

In [5]:
# Selecting relevant columns
columns = ['window_id', 'metric_PERCLOS', 'metric_BlinkRate',
       'metric_YawnRate', 'metric_Entropy', 'metric_SteeringRate',
       'metric_SDLP', 'raw_ear', 'raw_mar', 'raw_steering', 'raw_lane',
       'karthik_drowsiness_level', 'karthik_notes',
       'karthik_submission_type', 'Vanchha_drowsiness_level',
       'Vanchha_notes', 'Vanchha_submission_type',
      ]

In [6]:
df1_ = df1[columns]
df2_ = df2[columns]
df3_ = df3[columns]

In [7]:
# concating all dataframes
df = pd.concat([df1_, df2_, df3_])

In [8]:
df["raw_ear"].isnull().sum()

np.int64(0)

In [9]:
# converting the string representation of list to actual list
df["raw_ear"] = df["raw_ear"].apply(ast.literal_eval)
df["steering_angle"] = df["raw_steering"].apply(
    lambda x: [v * 70 for v in ast.literal_eval(x)]
)

In [10]:
df.columns

Index(['window_id', 'metric_PERCLOS', 'metric_BlinkRate', 'metric_YawnRate',
       'metric_Entropy', 'metric_SteeringRate', 'metric_SDLP', 'raw_ear',
       'raw_mar', 'raw_steering', 'raw_lane', 'karthik_drowsiness_level',
       'karthik_notes', 'karthik_submission_type', 'Vanchha_drowsiness_level',
       'Vanchha_notes', 'Vanchha_submission_type', 'steering_angle'],
      dtype='object')

In [11]:
# correcting metric_PERCLOS by chaninging min_consec_frames to 0
df["metric_PERCLOS"] = df["raw_ear"].apply(lambda x: calculate_perclos(np.array(x), ear_threshold=0.2, min_consec_frames=0))

In [12]:
df["metric_SteeringRate"] = df["steering_angle"].apply(lambda x: steering_reversal_rate(np.array(x)))

In [13]:
df["steering_reversals"] = df["steering_angle"].apply(lambda x: steering_reversal_rate_simple(
    steering_angle=np.array(x),
    gap_size = 3.0,
    lowpass_cutoff = 0.6,
    filter_order= 2,
))

In [None]:
df["metric_Entropy"] = df["steering_angle"].apply(lambda x: approx_entropy(np.array(x)))

In [None]:
# create a new column label by checking karthik_drowsiness_level and Vanchha_drowsiness_level if they are same then keep that value otherwise remove that row
df["label"] = df.apply(checking_conflict, axis=1)

In [None]:
df

In [None]:
plt.plot(df["steering_angle"].iloc[1])

In [None]:
df.columns

In [None]:
final_columns = [
    'window_id', 'metric_PERCLOS', 'metric_BlinkRate', "metric_YawnRate",
    'metric_Entropy', 'metric_SteeringRate', 'metric_SDLP', "label",
    'steering_reversals', 'Entropy'
]

In [None]:
df_ = df[final_columns]

In [None]:
df_

In [None]:
# checking for nan values in labels column
df_["label"].isnull().sum()

In [None]:
# dropping rows with nan values in label column
df_ = df_.dropna(subset=['label'])

In [None]:
df_.reset_index(inplace=True, drop=True)

In [None]:
sns.boxplot(df_["metric_PERCLOS"])

In [None]:
# Create subplots: 3 rows, 1 column (one plot per person)
fig, axes = plt.subplots(3, 1, figsize=(10, 8), sharex=True)

# Plot each dataset on its own subplot
axes[0].plot(df2["metric_SteeringRate"], color='b')
axes[0].set_title('Lloyd')

axes[1].plot(df1["metric_SteeringRate"], color='g')
axes[1].set_title('Prof. Ignacio')

axes[2].plot(df3["metric_SteeringRate"], color='r')
axes[2].set_title('Devano Palma')

# Add labels and formatting
for ax in axes:
    ax.set_ylabel('metric_SteeringRate')
    ax.grid(True)

axes[-1].set_xlabel('Frame / Time Index')

plt.tight_layout()
plt.show()


In [None]:
df_

In [None]:
df_["label"].value_counts()

In [None]:
df_['label'] = df_['label'].map({'Low': 1, 'Moderate': 2, 'High': 3})

In [None]:
df_

In [None]:

for column in df_.columns:
    plt.figure(figsize=(6, 4))
    plt.boxplot(df_[column].dropna())
    plt.title(f'Boxplot of {column}')
    plt.ylabel(column)
    plt.show()

In [None]:
df_['label'].value_counts()

In [None]:
df_

In [None]:
df_rename_columns = {
    'metric_PERCLOS': 'perclos',
    'metric_BlinkRate': 'eye_blink_rate',
    'metric_YawnRate': 'yawning_rate',
    'metric_Entropy': 'steering_entropy',
    'metric_SteeringRate': 'steering_reversal_rate',
    'metric_SDLP': 'std_lane_position',
    'label': 'drowsiness_level'
}

df_ = df_.rename(columns=df_rename_columns, errors='ignore')


In [None]:
df_

In [None]:
df_.to_csv(r"C:\Users\pasupuleti\Desktop\group-project\experiments\drowsiness_data\drowsiness_dataset_cleaned.csv", index=False)

In [None]:
df_["label"]

In [None]:
x = np.array([1,2,3,4,5, -5, -4, -3, -2, -1])

In [None]:
np.sign(x)