## Wear Data Preprocessing notebook
### Step 0. Setup the paths and env variables

In [1]:
from pathlib import Path
import json, sys, re
from typing import List, Union

import numpy as np
import pandas as pd
from tqdm import tqdm

ROOT = Path("/home/aidan/IMU_LM_Data")
sys.path.insert(0, str(ROOT))
from UTILS.helpers import (
    resample_df,            # decimate FIR-based resampling (df, target_cols, factor)
    convert_unit,           # unit conversion; here used for accel "acc"
    normalize_str, keyize, _keyize
)

BASE    = ROOT / "data"
RAW     = BASE / "raw_data" / "Wear" / "50hz"    
CLEANED = BASE / "cleaned_premerge"
MERGED  = BASE / "merged_dataset"

SCHEMA_PATH       = ROOT / "Unification" / "schemas" / "continuous_stream_schema.json"
ACTIVITY_MAP_PATH = ROOT / "Unification" / "schemas" / "activity_mapping.json"

SCHEMA       = json.loads(SCHEMA_PATH.read_text())
ACT_MAP_FULL = json.loads(ACTIVITY_MAP_PATH.read_text())

UNKNOWN_ID = int(ACT_MAP_FULL.get("unknown_activity_id", 9000))
ID2NAME    = {int(x["id"]): x["name"] for x in ACT_MAP_FULL["label_set"]}
RAW2ID     = {_keyize(k): int(v) for k, v in ACT_MAP_FULL.get("mapping", {}).items()}

print("Paths & contracts ready.")
print(f"Schema keys: {list(SCHEMA.keys())}")
print("RAW dir :", RAW)
print("CLEANED :", CLEANED)
print("MERGED  :", MERGED)

Paths & contracts ready.
Schema keys: ['name', 'version', 'primary_index', 'description', 'columns', 'rate_hz', 'axis_frame', 'unit_contract', 'unknown_activity_id', 'expectations']
RAW dir : /home/aidan/IMU_LM_Data/data/raw_data/Wear/50hz
CLEANED : /home/aidan/IMU_LM_Data/data/cleaned_premerge
MERGED  : /home/aidan/IMU_LM_Data/data/merged_dataset


### Step 1. Ingest, preporccess and map the data 

In [2]:
# ================================
# STEP 1 — Load WEAR (right wrist)
# ================================
def _collect_wear_files(raw_dir: Path) -> List[Path]:
    files = sorted(raw_dir.glob("sbj_*.csv"))
    print(f"[WEAR] Found {len(files)} subject CSVs under {raw_dir}")
    return files

def _estimate_hz_from_index(n_rows: int, assumed_hz: float = 50.0) -> float:
    # With equally spaced index-derived timestamps, the median Hz is the assumed_hz.
    # This is a placeholder for future actual-timestamp variants.
    return float(assumed_hz) if n_rows >= 3 else np.nan

def load_wear_raw(
    raw_dir: Path,
    wrist: str = "right",               # "right" | "left"
    downsample_to_50hz_if_needed: bool = False,  # WEAR/50hz already at 50 Hz; keep False by default
) -> pd.DataFrame:
    """
    Returns wrist-only (accelerometer) frame with:
      subject_id (str), session_id (str), timestamp_s (float),
      acc_x/y/z (m/s^2), gyro_x/y/z (NaN),
      dataset_activity_label (str)
    """
    wrist = wrist.lower()
    assert wrist in {"right","left"}, "wrist must be 'right' or 'left'"

    files = _collect_wear_files(raw_dir)
    if not files:
        print("No WEAR subject CSVs found.")
        return pd.DataFrame()

    # Column templates
    col_prefix = f"{wrist}_arm_acc_"
    acc_cols = [f"{col_prefix}x", f"{col_prefix}y", f"{col_prefix}z"]

    # Some distributions include other sensors; we ignore legs & the other wrist here
    all_rows = []
    dataset_name = "wear"
    sampling_rate_hz = 50.0  # WEAR 50Hz split

    for f in tqdm(files, desc="WEAR files"):
        try:
            df = pd.read_csv(f)
        except Exception as e:
            print(f"[WARN] Failed to read {f.name}: {e}")
            continue

        # Subject ID: prefer 'sbj_id' column; otherwise parse from filename
        if "sbj_id" in df.columns:
            subj_raw = str(df["sbj_id"].iloc[0])
        else:
            m = re.match(r"sbj_(\d+)\.csv$", f.name)
            subj_raw = m.group(1) if m else f.stem.replace("sbj_","")
        subject_id = f"S{int(subj_raw):02d}" if str(subj_raw).isdigit() else f"S{subj_raw}"

        # Treat each file as a single session to preserve monotonic timestamps cleanly
        session_id = "ses01"

        # Required columns sanity check
        missing = [c for c in acc_cols if c not in df.columns]
        if missing:
            print(f"[WARN] {f.name} missing columns: {missing} — skipping.")
            continue

        # Label column presence; keep even if NaN (we'll fill with 'unknown')
        label_col = "label" if "label" in df.columns else None
        if label_col is None:
            # create a placeholder label column
            df["label"] = np.nan
            label_col = "label"

        # Build synthetic timestamp at 50 Hz from row index (0,1,2,...)/50
        # Keep float seconds; later we convert to ns.
        # Do NOT drop rows based on labels; retain all data.
        n = len(df)
        timestamps_s = np.arange(n, dtype=np.float64) / sampling_rate_hz

        # Units: WEAR files store accel in g; convert to m/s^2.
        # convert_unit(..., kind="acc") is assumed to do g→m/s^2 if input is in g.
        acc_x = convert_unit(df[acc_cols[0]].astype(np.float64).to_numpy(), kind="acc")
        acc_y = convert_unit(df[acc_cols[1]].astype(np.float64).to_numpy(), kind="acc")
        acc_z = convert_unit(df[acc_cols[2]].astype(np.float64).to_numpy(), kind="acc")

        # Gyro is absent → fill NaN float32 arrays
        gyro_x = np.full(n, np.nan, dtype=np.float32)
        gyro_y = np.full(n, np.nan, dtype=np.float32)
        gyro_z = np.full(n, np.nan, dtype=np.float32)

        # Native labels (string); we won't drop NaNs — fill to 'unknown'
        native_lbl = df[label_col].astype("string").fillna("unknown")

        # Assemble minimal raw frame
        out = pd.DataFrame({
            "subject_id": subject_id,
            "session_id": session_id,
            "timestamp_s": timestamps_s,

            "acc_x": acc_x.astype(np.float32),
            "acc_y": acc_y.astype(np.float32),
            "acc_z": acc_z.astype(np.float32),

            "gyro_x": gyro_x, "gyro_y": gyro_y, "gyro_z": gyro_z,
            "activity_label_raw": native_lbl
        })

        # Optional: if provided WEAR files ever drifted from 50 Hz, resample here.
        if downsample_to_50hz_if_needed:
            # Guard: nothing to do since we already synthesized 50 Hz timestamps.
            pass

        out["subject_id"] = out["subject_id"].astype("string")
        out["session_id"] = out["session_id"].astype("string")

        all_rows.append(out)

    if not all_rows:
        return pd.DataFrame()

    raw = pd.concat(all_rows, ignore_index=True)

    # Quick RAW SUMMARY
    print("\n=== RAW SUMMARY (WEAR right wrist) ===")
    print(f"Rows: {raw.shape[0]:,}")
    # Estimate Hz: derived from index; report assumed 50.
    print(f"Estimated Hz: ~{_estimate_hz_from_index(len(raw)):0.2f}")
    print("Top native labels (verbatim):")
    print(raw["activity_label_raw"].value_counts(dropna=False).head(20))

    return raw

raw_wear = load_wear_raw(RAW, wrist="right", downsample_to_50hz_if_needed=False)
raw_wear.head(3)

[WEAR] Found 24 subject CSVs under /home/aidan/IMU_LM_Data/data/raw_data/Wear/50hz


  df = pd.read_csv(f)
WEAR files: 100%|██████████| 24/24 [00:05<00:00,  4.49it/s]



=== RAW SUMMARY (WEAR right wrist) ===
Rows: 3,466,400
Estimated Hz: ~50.00
Top native labels (verbatim):
activity_label_raw
unknown                         1377362
jogging                          125158
jogging (sidesteps)              123374
stretching (lunging)             122021
lunges (complex)                 121820
sit-ups (complex)                120555
sit-ups                          119535
lunges                           118323
burpees                          116681
stretching (triceps)             116485
stretching (lumbar rotation)     116101
push-ups (complex)               115867
stretching (hamstrings)          115116
jogging (skipping)               114037
stretching (shoulders)           112913
jogging (rotating arms)          109596
push-ups                         108250
jogging (butt-kicks)             107619
bench-dips                       105587
Name: count, dtype: Int64


Unnamed: 0,subject_id,session_id,timestamp_s,acc_x,acc_y,acc_z,gyro_x,gyro_y,gyro_z,activity_label_raw
0,S00,ses01,0.0,10.894856,0.63259,0.417429,,,,unknown
1,S00,ses01,0.02,11.536308,2.272242,-0.03143,,,,unknown
2,S00,ses01,0.04,11.017628,2.593674,-0.269356,,,,unknown


### Step 2. Map the data and audit the mapping

In [4]:
# ============================================
# STEP 2 — Quick audit: raw_label → mapped_id
# ============================================
if raw_wear.empty:
    raise SystemExit("No WEAR rows after loading. Check RAW path/layout.")

raw_counts = (
    raw_wear["activity_label_raw"]
        .astype(str).map(_keyize)
        .value_counts()
        .rename_axis("raw_label")
        .reset_index(name="count")
)

# Stable native ID map for this dataset (alphabetical + unknown = -1)
# Keep a reserved -1 for unknown to be explicit.
labels_sorted = sorted([rl for rl in raw_counts["raw_label"].unique() if rl != _keyize("unknown")])
NATIVE_LBL2ID = {_keyize("unknown"): UNKNOWN_ID}
NATIVE_LBL2ID.update({lbl: i for i, lbl in enumerate(labels_sorted, start=0)})

raw_counts["native_id"] = raw_counts["raw_label"].map(NATIVE_LBL2ID).astype(int)
raw_counts["mapped_gid"] = raw_counts["raw_label"].map(RAW2ID).fillna(UNKNOWN_ID).astype(int)
raw_counts["mapped_nm"]  = raw_counts["mapped_gid"].map(lambda x: ID2NAME.get(int(x), "other"))

unmapped = raw_counts.loc[raw_counts["mapped_gid"] == UNKNOWN_ID]
print(f"Raw label unique: {len(raw_counts)} | Unmapped→global: {len(unmapped)}")
print("Unmapped (top-10):")
print(unmapped.nlargest(10, "count")[["raw_label","count"]].to_string(index=False))
raw_counts.head(10)


Raw label unique: 19 | Unmapped→global: 1
Unmapped (top-10):
raw_label   count
  unknown 1377362


Unnamed: 0,raw_label,count,native_id,mapped_gid,mapped_nm
0,unknown,1377362,9000,9000,other
1,jogging,125158,2,3,run_jog
2,jogging (sidesteps),123374,5,3,run_jog
3,stretching (lunging),122021,15,12,stretching
4,lunges (complex),121820,8,7,exercise_lower
5,sit ups (complex),120555,12,8,exercise_core
6,sit ups,119535,11,8,exercise_core
7,lunges,118323,7,7,exercise_lower
8,burpees,116681,1,9,exercise_plyometric
9,stretching (triceps),116485,17,12,stretching


### Step 3. Build and clean dataset in stream json fromat

In [6]:
# =========================================================
# STEP 3 — Build schema-ordered continuous_stream (v3) df
# =========================================================
def to_continuous_stream_wear(df_raw: pd.DataFrame, dataset_name: str = "wear") -> pd.DataFrame:
    if df_raw.empty:
        return pd.DataFrame(columns=[c["name"] for c in SCHEMA["columns"]])

    # Native (dataset-level) ID/label: use stable map with 'unknown' → -1
    raw_key  = df_raw["activity_label_raw"].astype(str).map(_keyize)
    native_id  = raw_key.map(NATIVE_LBL2ID).astype(np.int16)
    native_lbl = df_raw["activity_label_raw"].astype("string")

    # Global map via activity_mapping.json
    gid    = raw_key.map(RAW2ID).fillna(UNKNOWN_ID).astype(np.int16)
    glabel = gid.map(lambda x: ID2NAME.get(int(x), "other")).astype("string")

    out = pd.DataFrame({
        "dataset":                dataset_name,
        "subject_id":             df_raw["subject_id"].astype("string"),
        "session_id":             df_raw["session_id"].astype("string"),
        "timestamp_ns":           (df_raw["timestamp_s"].astype(np.float64) * 1e9).round().astype("int64"),

        "acc_x": df_raw["acc_x"].astype("float32"),
        "acc_y": df_raw["acc_y"].astype("float32"),
        "acc_z": df_raw["acc_z"].astype("float32"),
        "gyro_x": df_raw["gyro_x"].astype("float32"),
        "gyro_y": df_raw["gyro_y"].astype("float32"),
        "gyro_z": df_raw["gyro_z"].astype("float32"),

        "global_activity_id":     gid,
        "global_activity_label":  glabel,

        "dataset_activity_id":    native_id,
        "dataset_activity_label": native_lbl,
    })

    # Enforce schema column order
    order = [c["name"] for c in SCHEMA["columns"]]
    return out[order]

wear_df = to_continuous_stream_wear(raw_wear, dataset_name="wear")
print("UNIFIED rows:", len(wear_df))
wear_df.head(3)

UNIFIED rows: 3466400


Unnamed: 0,dataset,subject_id,session_id,timestamp_ns,acc_x,acc_y,acc_z,gyro_x,gyro_y,gyro_z,global_activity_id,global_activity_label,dataset_activity_id,dataset_activity_label
0,wear,S00,ses01,0,10.894856,0.63259,0.417429,,,,9000,other,9000,unknown
1,wear,S00,ses01,20000000,11.536308,2.272242,-0.03143,,,,9000,other,9000,unknown
2,wear,S00,ses01,40000000,11.017628,2.593674,-0.269356,,,,9000,other,9000,unknown


### Step 4. Audit check the unified frame

In [7]:
# ==========================================
# STEP 4 — Contract checks & quick QA
# ==========================================
print("Subjects:", wear_df["subject_id"].nunique(),
      "| Sessions:", wear_df["session_id"].nunique())

# Monotonic timestamp per (subject, session)
viol = 0
for (_sid, _sess), g in wear_df.groupby(["subject_id","session_id"], sort=False):
    ts = g["timestamp_ns"].to_numpy()
    if ts.size and not np.all(np.diff(ts) >= 0):
        viol += 1
print("Monotonic violations (groups):", viol)

# Approx Hz from ns timestamps
def est_hz_ns(ts_ns: pd.Series):
    arr = ts_ns.to_numpy()
    if arr.size < 3: return np.nan
    dt = np.diff(arr) / 1e9  # ns → s
    dt = dt[(dt > 0) & np.isfinite(dt)]
    return float(np.median(1.0 / dt)) if dt.size else np.nan

hz = wear_df.groupby(["subject_id","session_id"])["timestamp_ns"].apply(est_hz_ns)
print(f"Median Hz: {np.nanmedian(hz.values):.2f} (target={SCHEMA['rate_hz']})")

# Required-not-null coverage
req = SCHEMA["expectations"]["required_not_null"]
pct = wear_df[req].notnull().all(axis=1).mean() * 100
print(f"Rows meeting required-not-null: {pct:.2f}%")

# Global mapping coverage
cov = (wear_df["global_activity_id"] != UNKNOWN_ID).mean() * 100
print(f"Global mapping coverage: {cov:.1f}% (unknown={UNKNOWN_ID})")

# Quick canonical label distribution
print("\nTop-15 canonical labels:")
print(wear_df["global_activity_label"].value_counts().head(15))

Subjects: 24 | Sessions: 1
Monotonic violations (groups): 0
Median Hz: 50.00 (target=50)
Rows meeting required-not-null: 100.00%
Global mapping coverage: 60.3% (unknown=9000)

Top-15 canonical labels:
global_activity_label
other                  1377362
stretching              582636
run_jog                 579784
exercise_upper          329704
exercise_lower          240143
exercise_core           240090
exercise_plyometric     116681
Name: count, dtype: Int64


### Step 5. Save outputs

In [8]:
# Persist
CLEANED.mkdir(parents=True, exist_ok=True)
out_path = CLEANED / "wear_clean_data.parquet"
wear_df.to_parquet(out_path, index=False)
print("Saved:", out_path)

Saved: /home/aidan/IMU_LM_Data/data/cleaned_premerge/wear_clean_data.parquet
