1. Extract
2. Transform
3. Split
4. Outlier removal (training set only), then scaling
5. Load

In [1]:
# IMPORTS
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import GroupShuffleSplit
import re
import warnings

In [2]:
# OPTIONS
# Silences every Python warning for the rest of the kernel session.
# NOTE(review): a blanket "ignore" also hides library deprecation and
# chained-assignment warnings raised by the cells below; consider narrowing it.
warnings.filterwarnings("ignore")

In [3]:
# Extract
# Directory scanned for the per-user raw files (HR_<id>.csv, pvt_<id>.csv,
# sleep_<id>.csv).
RAW_DATA_PATH = Path("../data/raw/")
# Minute offsets subtracted from each PVT test start before it is matched to a
# heart-rate row; the per-user pipeline produces one merged copy per offset.
OFFSETS = [0, 1, 2, 3, 4]  # Augmentation

In [4]:
# Feature Definitions
# Columns that go into the model
FEATURE_COLS = [
    "mean_hr_5min",
    "hr_volatility_5min",
    "hr_jumpiness_5min",
    "hr_mean_total",
    "hr_std_total",
    "stress_cv",
    "hours_awake",
    "cum_sleep_debt",
    "sleep_inertia_idx",
    "circadian_sin",
    "circadian_cos",
    "hr_zscore",
    "user_id",  # Needed for splitting, will be dropped before training
]

# Columns that require scaling (Subset of above)
SCALE_COLS = [
    "mean_hr_5min",
    "hr_volatility_5min",
    "hr_jumpiness_5min",
    "hr_mean_total",
    "hr_std_total",
    "stress_cv",
    "sleep_inertia_idx",
    "hours_awake",
    "cum_sleep_debt",
]

# The two lists are maintained by hand; fail fast if they drift apart instead
# of hitting a KeyError deep inside the scaling step.
_unknown_scale_cols = sorted(set(SCALE_COLS) - set(FEATURE_COLS))
assert not _unknown_scale_cols, f"SCALE_COLS not in FEATURE_COLS: {_unknown_scale_cols}"
assert "user_id" not in SCALE_COLS, "user_id is a grouping key and must not be scaled"

In [5]:
# Transform
# NOTE(review): TARGET_COL is not referenced anywhere else in the visible
# notebook; split_and_scale_dataset builds its target under the name
# "fatigue_score" instead. Confirm which name is intended.
TARGET_COL = "is_fatigued"

In [6]:
# Split
# Passed to GroupShuffleSplit as test_size (the split is grouped by user_id).
TEST_SIZE = 0.2
# Seed shared by the group split and the debug-sample draw.
RANDOM_STATE = 42

In [7]:
# Sample
# Upper bound on the number of training rows copied into the debug sample
# (the sample is capped at len(X_train) when the training set is smaller).
SAMPLE_SIZE = 2000

In [8]:
# Load
# Output directory for every artifact written by this notebook.
PROCESSED_PATH = Path("../data/processed/")
# Scaled model inputs and their targets, one file per split.
INPUTS_TRAIN_FILE = PROCESSED_PATH / "inputs_train.parquet"
INPUTS_TEST_FILE = PROCESSED_PATH / "inputs_test.parquet"
TARGETS_TRAIN_FILE = PROCESSED_PATH / "targets_train.parquet"
TARGETS_TEST_FILE = PROCESSED_PATH / "targets_test.parquet"
# Debug sample drawn from the training split (inputs and index-aligned targets).
INPUTS_SAMPLE_FILE = PROCESSED_PATH / "inputs_sample.parquet"
TARGETS_SAMPLE_FILE = PROCESSED_PATH / "targets_sample.parquet"

In [9]:
def get_all_user_ids(data_path):
    """
    Discover the user ids that have a heart-rate file directly in ``data_path``.

    A file contributes an id only when its *entire* name is ``HR_<digits>.csv``,
    which is exactly the name process_single_user_pipeline rebuilds from the id.
    Names such as ``HR_5.csv.csv`` or ``HR_7xcsv.csv`` match the glob but are
    rejected, so they can no longer produce a phantom id.

    Parameters
    ----------
    data_path : pathlib.Path
        Directory to scan (non-recursive).

    Returns
    -------
    list[str]
        Unique ids as strings, sorted lexicographically (so "38213" sorts
        before "4692").
    """
    # Escaped dot + fullmatch: the original pattern let "." match any character
    # and accepted partial matches.
    pattern = re.compile(r"HR_(\d+)\.csv")

    ids = set()
    for f in data_path.glob("HR_*.csv"):
        match = pattern.fullmatch(f.name)
        if match:
            ids.add(match.group(1))
    return sorted(ids)

Per-user cleaning and feature engineering for heart-rate, PVT, and sleep data

In [10]:
def clean_hr_data(file_path):
    """
    STEP 1: CLEANING (Per User)

    Read one heart-rate CSV (columns ``HRTIME`` and ``HR``) and return a
    chronologically ordered frame with one row per timestamp.
    - Timestamps are parsed with pd.to_datetime.
    - Rows without an HR value, or with HR outside the open interval
      (40, 180), are discarded.
    - Rows sharing a timestamp are collapsed into their mean HR.
    - No feature engineering happens here.
    """
    # Load, parse timestamps, order chronologically
    raw = pd.read_csv(file_path)
    raw["HRTIME"] = pd.to_datetime(raw["HRTIME"])
    ordered = raw.sort_values("HRTIME")

    # Keep only present, physiologically plausible readings (strict bounds)
    with_hr = ordered.dropna(subset=["HR"])
    plausible = with_hr[with_hr["HR"].gt(40) & with_hr["HR"].lt(180)]

    # Duplicate timestamps are merged by averaging their HR values
    per_timestamp = plausible.groupby("HRTIME", as_index=False)["HR"].mean()
    return per_timestamp.sort_values("HRTIME")

In [11]:
def engineer_hr_features(df):
    """
    STEP 2: ENGINEERING (Per User)
    - Rolling windows & expanding baselines over one user's cleaned HR series.
    - Requires a continuous time series (cannot be done after the split).

    Rows whose timestamp year is 2021 or earlier (the "Year 2000 bug") are
    removed BEFORE any feature is computed. Previously they were dropped only
    at the end, so readings that were thrown away still fed the expanding
    mean/std baseline and therefore the z-score of every valid row.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a datetime ``HRTIME`` column and a numeric ``HR`` column.
        The caller's frame is not modified.

    Returns
    -------
    pandas.DataFrame or None
        ``HRTIME``, ``HR`` and the engineered columns (``mean_hr_5min``,
        ``hr_volatility_5min``, ``hr_mean_total``, ``hr_std_total``,
        ``hr_zscore``, ``hr_diff``, ``hr_jumpiness_5min``, ``stress_cv``),
        restricted to rows with a defined 5-minute mean. ``None`` when no row
        survives.
    """

    # Set index for time-aware rolling (set_index returns a new frame)
    df = df.set_index("HRTIME").sort_index()

    # Drop bad dates (Year 2000 bug) up front so they cannot leak into the
    # cumulative baseline below.
    df = df[df.index.year > 2021].copy()
    if df.empty:
        return None

    # Rolling Features (Short-term State - 5 min window)
    # min_periods=30 requires at least 30 samples inside the window
    # (a sample count, not a duration).
    df["mean_hr_5min"] = df["HR"].rolling("5min", min_periods=30).mean()
    df["hr_volatility_5min"] = df["HR"].rolling("5min", min_periods=30).std()

    # --- Cumulative Features (Long-term Baseline) ---
    df["hr_mean_total"] = df["HR"].expanding(min_periods=30).mean()
    df["hr_std_total"] = df["HR"].expanding(min_periods=30).std()
    df["hr_std_total"] = df["hr_std_total"].fillna(1)

    # --- Z-Score (Standardized Severity) ---
    # +0.1 keeps the division finite when the baseline std is zero.
    df["hr_zscore"] = (df["HR"] - df["hr_mean_total"]) / (df["hr_std_total"] + 0.1)

    # Jumpiness (Short Term Variability): RMS of successive differences
    time_gap = df.index.to_series().diff().dt.total_seconds()
    df["hr_diff"] = df["HR"].diff().abs()
    df.loc[time_gap > 5, "hr_diff"] = np.nan  # Discard diffs across gaps longer than 5 s

    df["hr_jumpiness_5min"] = np.sqrt((df["hr_diff"] ** 2).rolling("5min", min_periods=30).mean())

    # Stress Ratio (Coefficient of Variation)
    df["stress_cv"] = df["hr_volatility_5min"] / (df["mean_hr_5min"] + 0.1)

    # Final Clean: Remove Warm-up NaNs
    df = df.dropna(subset=["mean_hr_5min"])

    if df.empty:
        return None

    return df.reset_index()

In [12]:
def clean_sleep_data(file_path):
    """
    CLEANING (Per User)

    Read one sleep CSV (columns ``START`` and ``END``) and return the valid
    sessions ordered by wake-up time.
    - Both timestamps are parsed with pd.to_datetime.
    - Sessions missing either timestamp, and exact START/END duplicates,
      are removed.
    - ``duration_hours`` is added and only sessions strictly longer than
      15 minutes and strictly shorter than 36 hours are kept.
    """
    sessions = pd.read_csv(file_path)
    for column in ("START", "END"):
        sessions[column] = pd.to_datetime(sessions[column])

    # Drop incomplete sessions, then exact duplicates
    sessions = sessions.dropna(subset=["START", "END"])
    sessions = sessions.drop_duplicates(subset=["START", "END"])

    # Session length in hours (needed for the plausibility filter)
    elapsed = sessions["END"] - sessions["START"]
    sessions["duration_hours"] = elapsed.dt.total_seconds() / 3600

    # Plausible length only: 15 min < duration < 36 h
    hours = sessions["duration_hours"]
    plausible = sessions[(hours > 0.25) & (hours < 36)]

    # Ordered by END because later rolling calculations depend on it
    return plausible.sort_values("END")

In [13]:
def engineer_sleep_features(df, target_hours=8.0, window=3):
    """
    ENGINEERING (Per User)
    - Responsibilities: Calculate Sleep Debt and Cumulative Debt.

    Per-session debt is ``target_hours - duration_hours``; the cumulative debt
    is the rolling sum of that debt over the last ``window`` sessions, in the
    row order of ``df`` (at least one session is enough to produce a value).

    The input frame is left untouched. The previous version added
    ``sleep_debt``, ``_temp_group`` and ``cum_sleep_debt`` to the caller's
    frame and used a dummy groupby that a plain rolling sum makes unnecessary.

    Parameters
    ----------
    df : pandas.DataFrame
        One user's sessions with ``START``, ``END`` and ``duration_hours``.
    target_hours : float, default 8.0
        Sleep need each session is measured against.
    window : int, default 3
        Number of most recent sessions summed into the cumulative debt.

    Returns
    -------
    pandas.DataFrame
        Columns ``START``, ``END``, ``duration_hours``, ``cum_sleep_debt``,
        carrying the same index as ``df``.
    """
    sleep_debt = target_hours - df["duration_hours"]
    cum_sleep_debt = sleep_debt.rolling(window, min_periods=1).sum()

    # assign() builds a new frame, so the caller's DataFrame is not mutated.
    return df[["START", "END", "duration_hours"]].assign(cum_sleep_debt=cum_sleep_debt)

In [14]:
def process_pvt_data(file_path):
    """
    Load one PVT CSV and reduce it to one row per test.

    Trials with ``TAPTIME`` of 100 or less are treated as artifacts (false
    starts) and dropped; the remaining trials are averaged per
    (``TESTID``, ``TESTSTART``) into ``pvt_mean_rt``.

    Returns a DataFrame with columns ``TESTID``, ``TESTSTART``, ``pvt_mean_rt``.
    """
    trials = pd.read_csv(file_path)
    trials["TESTSTART"] = pd.to_datetime(trials["TESTSTART"])

    # Filter Artifacts (False starts < 100ms)
    valid_trials = trials[trials["TAPTIME"] > 100]

    # Aggregate per Test
    per_test = (
        valid_trials.groupby(["TESTID", "TESTSTART"])
        .agg({"TAPTIME": "mean"})
        .reset_index()
        .rename(columns={"TAPTIME": "pvt_mean_rt"})
    )
    return per_test

Target Logic

In [15]:
def create_classes(rt):
    """
    Map PVT reaction time to continuous fatigue risk [0, 1]

    - below 300 ms        -> 0.0
    - 300 ms up to 400 ms -> linear ramp from 0.0 to 1.0
    - 400 ms and above    -> 1.0
    - missing (NaN/None)  -> NaN

    A missing reaction time used to fall through both comparisons and come
    back as 1.0 (fully fatigued); it is now propagated as NaN so a missing
    measurement is never turned into a label.
    """
    if pd.isna(rt):
        return np.nan
    if rt < 300:
        return 0.0
    if rt < 400:
        # Linear interpolation between 300 and 400 ms
        return (rt - 300) / 100.0
    return 1.0

In [16]:
def process_single_user_pipeline(uid, raw_path, offsets):
    """
    The Master Function.
    Coordinates loading, cleaning, engineering, and merging for ONE user.

    Parameters
    ----------
    uid : str
        User id used to build the file names HR_<uid>.csv, pvt_<uid>.csv and
        sleep_<uid>.csv.
    raw_path : pathlib.Path
        Directory containing those three files.
    offsets : iterable of int
        Minute offsets; each PVT test start is shifted back by every offset
        and matched to the latest heart-rate row at or before that time
        (within 10 minutes), yielding one candidate row per test and offset.

    Returns
    -------
    pandas.DataFrame or None
        One row per matched (test, offset) pair with HR, sleep and circadian
        features plus ``user_id``; the frame may have zero rows. ``None`` when
        a source file is missing, a processing step yields nothing, or any
        exception is raised (the error is printed and the user is skipped).
    """
    try:
        # 1. Define Paths
        hr_file = raw_path / f"HR_{uid}.csv"
        pvt_file = raw_path / f"pvt_{uid}.csv"
        sleep_file = raw_path / f"sleep_{uid}.csv"

        # 2. Check Existence (all three sources are required)
        if not (hr_file.exists() and pvt_file.exists() and sleep_file.exists()):
            return None

        # --- HR PROCESSING ---
        df_hr_clean = clean_hr_data(hr_file)
        if df_hr_clean is None:
            return None

        df_hr_eng = engineer_hr_features(df_hr_clean)
        if df_hr_eng is None:
            return None

        # --- PVT PROCESSING ---
        df_pvt = process_pvt_data(pvt_file)
        if df_pvt is None:
            return None

        # --- SLEEP PROCESSING (Split) ---
        df_sleep_clean = clean_sleep_data(sleep_file)
        if df_sleep_clean is None:
            return None

        df_sleep_eng = engineer_sleep_features(df_sleep_clean)
        if df_sleep_eng is None:
            return None

        # 4. Augmentation (Merge HR + PVT)
        # The HR side is identical for every offset, so sort it once.
        hr_sorted = df_hr_eng.sort_values("HRTIME")

        augmented_dfs = []
        for offset in offsets:
            targets_shifted = df_pvt.copy()
            targets_shifted["MATCH_TIME"] = targets_shifted["TESTSTART"] - pd.Timedelta(
                minutes=offset
            )

            merged = pd.merge_asof(
                targets_shifted.sort_values("MATCH_TIME"),
                hr_sorted,
                left_on="MATCH_TIME",
                right_on="HRTIME",
                direction="backward",
                tolerance=pd.Timedelta("10 minutes"),
            )
            augmented_dfs.append(merged)

        df_aug = pd.concat(augmented_dfs)
        # Tests without a heart-rate row inside the tolerance carry no features
        df_aug = df_aug.dropna(subset=["mean_hr_5min"])

        # 5. Merge Sleep (latest session that ended at or before the test)
        df_sleep_merge = df_sleep_eng.rename(columns={"END": "last_sleep_end"})
        df_final = pd.merge_asof(
            df_aug.sort_values("TESTSTART"),
            df_sleep_merge.sort_values("last_sleep_end"),
            left_on="TESTSTART",
            right_on="last_sleep_end",
            direction="backward",
        )

        # 6. Context Features & Imputation
        df_final["hours_awake"] = (
            df_final["TESTSTART"] - df_final["last_sleep_end"]
        ).dt.total_seconds() / 3600

        # Impute Missing Sleep: user median, or 8.0 h when the user has no
        # usable value at all; the matching sleep debt is set to 0.
        if df_final["hours_awake"].isna().any():
            user_median = df_final["hours_awake"].median()
            fill_val = user_median if pd.notna(user_median) else 8.0
            df_final["hours_awake"] = df_final["hours_awake"].fillna(fill_val)
            if "cum_sleep_debt" in df_final.columns:
                df_final["cum_sleep_debt"] = df_final["cum_sleep_debt"].fillna(0.0)

        # Sleep inertia (computed once, after imputation) & circadian encoding
        df_final["sleep_inertia_idx"] = 1 / (df_final["hours_awake"] + 0.1)
        test_hour = df_final["TESTSTART"].dt.hour + (df_final["TESTSTART"].dt.minute / 60)
        df_final["circadian_sin"] = np.sin(2 * np.pi * test_hour / 24)
        df_final["circadian_cos"] = np.cos(2 * np.pi * test_hour / 24)

        # 7. Final Clean
        # Explicit copy so adding user_id is not a chained assignment on a
        # derived frame.
        df_final_user = df_final.dropna(subset=["hours_awake", "mean_hr_5min"]).copy()
        df_final_user["user_id"] = uid

        return df_final_user

    except Exception as e:
        # Deliberate best-effort: report the failing user and let the caller skip it.
        print(f"Error processing {uid}: {e}")
        return None

In [27]:
# Exploratory schema check of one raw PVT file; not part of the pipeline.
# Dedicated names keep this ad-hoc cell from binding generic globals such as
# `df` when it is run out of order.
PVT_PREVIEW_UID = "3369"
pvt_preview = pd.read_csv(RAW_DATA_PATH / f"pvt_{PVT_PREVIEW_UID}.csv")
pvt_preview.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2099 entries, 0 to 2098
Data columns (total 7 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   id          2099 non-null   int64 
 1   TESTID      2099 non-null   int64 
 2   TESTSTART   2099 non-null   object
 3   TRIALID     2099 non-null   int64 
 4   TRIALNAME   2099 non-null   object
 5   TRIALSTART  2099 non-null   object
 6   TAPTIME     2099 non-null   int64 
dtypes: int64(4), object(3)
memory usage: 114.9+ KB


In [17]:
# --- 2. DATASET LEVEL PIPELINE ---


def split_and_scale_dataset(df, feature_cols, scale_cols, target_func):
    """
    Takes the raw combined dataframe and returns train/test DataFrames.
    Performs: Target Creation -> Feature Select -> Split -> Scale

    The input frame is not modified: the target is built as a separate frame
    instead of being written back into ``df`` as a new column.

    Parameters
    ----------
    df : pandas.DataFrame
        Combined per-user rows; must contain ``pvt_mean_rt`` and every column
        in ``feature_cols``.
    feature_cols : list of str
        Columns copied into the inputs. Must include ``user_id``, which is
        the grouping key for the split and is still present in the returned
        inputs.
    scale_cols : list of str
        Subset of ``feature_cols`` to scale.
    target_func : callable
        Applied element-wise to ``pvt_mean_rt`` to produce ``fatigue_score``.

    Returns
    -------
    X_train, X_test, y_train, y_test : pandas.DataFrame
        Inputs with ``scale_cols`` scaled, and single-column
        (``fatigue_score``) targets sharing the inputs' index.

    Notes
    -----
    Split proportion and seed come from the notebook-level TEST_SIZE and
    RANDOM_STATE constants.
    """
    # 1. Apply Target Logic (kept out of `df` so the caller's frame is unchanged)
    targets = df["pvt_mean_rt"].apply(target_func).to_frame("fatigue_score")

    # 2. Select Features
    inputs = df[feature_cols].copy()

    # 3. Split (GroupShuffleSplit)
    # Grouping by user_id keeps each user entirely in train or entirely in test
    splitter = GroupShuffleSplit(n_splits=1, test_size=TEST_SIZE, random_state=RANDOM_STATE)
    train_idx, test_idx = next(splitter.split(inputs, targets, groups=inputs["user_id"]))

    X_train = inputs.iloc[train_idx].copy()
    X_test = inputs.iloc[test_idx].copy()
    y_train = targets.iloc[train_idx].copy()
    y_test = targets.iloc[test_idx].copy()

    # 4. Scale (Fit on Train, Transform Test) so test statistics never leak
    print(f"Scaling {len(scale_cols)} columns...")
    scaler = RobustScaler()

    X_train[scale_cols] = scaler.fit_transform(X_train[scale_cols])
    X_test[scale_cols] = scaler.transform(X_test[scale_cols])

    return X_train, X_test, y_train, y_test

Main Execution

In [18]:
# --- EXTRACT ---
user_ids = get_all_user_ids(RAW_DATA_PATH)
print(f"Found {len(user_ids)} users.")

print("Starting Phase 1: Per-User Processing...")
all_user_data = []
for uid in user_ids:
    user_df = process_single_user_pipeline(uid, RAW_DATA_PATH, OFFSETS)

    if user_df is None:
        print(f"User {uid}: Skipped.")
        continue

    print(f"User {uid}: Added {len(user_df)} rows.")
    # Zero-row frames add nothing, and pandas has deprecated the way concat
    # treats empty entries when inferring dtypes, so keep them out of the list.
    if not user_df.empty:
        all_user_data.append(user_df)

# Aggregate
if not all_user_data:
    # pd.concat([]) would fail with an opaque "No objects to concatenate".
    raise RuntimeError(f"No usable rows were extracted from {RAW_DATA_PATH}")

df_total = pd.concat(all_user_data, ignore_index=True)
print(f"Total Extraction: {df_total.shape}")

Found 29 users.
Starting Phase 1: Per-User Processing...
User 3369: Added 125 rows.
User 3372: Added 0 rows.
User 3394: Added 5 rows.
User 3566: Added 25 rows.
User 38213: Added 10 rows.
User 4692: Added 20 rows.
User 5135: Added 5 rows.
User 54720: Added 50 rows.
User 5876: Added 20 rows.
User 6073: Added 0 rows.
User 6321: Added 0 rows.
User 6366: Added 0 rows.
User 6391: Added 25 rows.
User 6561: Added 0 rows.
User 6724: Added 5 rows.
User 7591: Added 85 rows.
User 7982680: Added 40 rows.
User 7982818: Added 25 rows.
User 8068: Added 0 rows.
User 8340242: Added 35 rows.
User 8394: Added 26 rows.
User 8489754: Added 96 rows.
User 8489783: Added 70 rows.
User 8489813: Added 55 rows.
User 8490189: Added 55 rows.
User 8562291: Added 40 rows.
User 8562423: Added 31 rows.
User 8562476: Added 10 rows.
User 8632: Added 480 rows.
Total Extraction: (1338, 23)


In [19]:
# Persist the merged, pre-split dataset for inspection.
# The output directory is only created by the later save cell, so on a fresh
# checkout this write would fail; make sure it exists first.
PROCESSED_PATH.mkdir(parents=True, exist_ok=True)
df_total.to_csv(PROCESSED_PATH / "full_dataset_preprocess.csv", index=False)

In [20]:
# Phase 2 (dataset level): build the target, split by user, scale the features.
print("Starting Phase 2: Splitting and Scaling...")

X_train, X_test, y_train, y_test = split_and_scale_dataset(
    df_total, FEATURE_COLS, SCALE_COLS, create_classes
)

Starting Phase 2: Splitting and Scaling...
Scaling 9 columns...


LOAD

In [21]:
# Phase 3: Save Artifacts
print(f"Train Shape: {X_train.shape}, Test Shape: {X_test.shape}")

# Create Debug Sample (Aligned)
X_sample = X_train.sample(n=min(SAMPLE_SIZE, len(X_train)), random_state=RANDOM_STATE)
y_sample = y_train.loc[X_sample.index]

PROCESSED_PATH.mkdir(parents=True, exist_ok=True)

X_train.to_parquet(INPUTS_TRAIN_FILE)
X_test.to_parquet(INPUTS_TEST_FILE)
y_train.to_parquet(TARGETS_TRAIN_FILE)
y_test.to_parquet(TARGETS_TEST_FILE)
X_sample.to_parquet(INPUTS_SAMPLE_FILE)
y_sample.to_parquet(TARGETS_SAMPLE_FILE)

print("Data successfully saved to:", PROCESSED_PATH)

Train Shape: (1032, 13), Test Shape: (306, 13)
Data successfully saved to: ../data/processed
