# egoEMOTION Dataset Exploration

*egoEMOTION* is the first dataset that couples egocentric visual and physiological signals with dense self-reports of emotion and personality across controlled and real-world scenarios. Participants completed emotion-elicitation tasks and naturalistic activities while self-reporting their affective state using the Circumplex Model and Mikels’ Wheel as well as their personality via the Big Five model.

In this notebook, we (i) load a selected participant’s data, (ii) compute quick summary statistics to sanity-check shapes, sampling rates, and missing values, then (iii) visualize the sensor streams and task segments, (iv) inspect the corresponding POV/webcam video frames alongside eye-tracking frames, and finally (v) review the participant’s self-report annotations for each activity.

Please refer to the `README.md` file in the [egoEMOTION](https://github.com/eth-siplab/egoEMOTION) repository for detailed information on each sensor stream (i.e., channel definition, sampling frequency, ...).

Important: Please change the variable "original_data_path" to the path where you saved the downloaded egoEMOTION dataset.

## Import relevant libraries

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import cv2
import json
import subprocess
import pandas as pd

## Define participant and paths

In [None]:
participant = "037"
original_data_path = 'INSERT_PATH_TO_DOWNLOADED_DATASET'

## Load files from participant

In [None]:
sensor_streams = [
    "ecg_90fps", "eda_90fps", "et",
    "gaze_90fps", "imu_right_90fps",
    "intensity_90fps",
    "ppg_ear_90fps", "ppg_nose_90fps",
    "pupils_90fps", "rr_90fps", 
    "gaze", "pupils", 
    "intensity" 
]
video_streams = ["pov", "webcam"]

def load_npy(path: str):
    """Robust .npy loader: returns ndarray OR python object (dict) if stored that way."""
    obj = np.load(path, allow_pickle=True)
    # dict saved via np.save loads as 0-d object array
    if isinstance(obj, np.ndarray) and obj.dtype == object and obj.shape == ():
        try:
            obj = obj.item()
        except Exception:
            pass
    return obj

sensor_dict: dict[str, object] = {}
video_dict: dict[str, str] = {}

# --- Sensors (.npy) ---
for stream in sensor_streams:
    path = os.path.join(original_data_path, participant, f"{stream}.npy")
    if not os.path.exists(path):
        print(f"[WARN] missing sensor stream: {path}")
        continue
    sensor_dict[stream] = load_npy(path)

# --- Videos (.mp4) ---
for stream in video_streams:
    path = os.path.join(original_data_path, participant, f"{stream}.mp4")
    if not os.path.exists(path):
        print(f"[WARN] missing video stream: {path}")
        continue
    video_dict[stream] = path

# --- Task Times (.npy) ---
task_times = load_npy(os.path.join(original_data_path,'task_times.npy'))

# --- Self-reports (valence, arousal, dominance, discrete emotions) --- #
session_A_path = os.path.join(original_data_path, participant, f"Session_A_{participant}.csv")
session_B_path = os.path.join(original_data_path, participant, f"Session_B_{participant}.csv")

session_A_df = pd.read_csv(session_A_path) if os.path.exists(session_A_path) else None
if session_A_df is None:
    print(f"[WARN] missing session CSV: {session_A_path}")

session_B_df = pd.read_csv(session_B_path) if os.path.exists(session_B_path) else None
if session_B_df is None:
    print(f"[WARN] missing session CSV: {session_B_path}")

# --- Personality Questionnaire --- #
personality_df = pd.read_csv(os.path.join(original_data_path, "personality_questionnaire_results.csv"), sep=';')
if personality_df is None:
    print(f"[WARN] missing session CSV: {session_A_path}")


print("Loaded sensors:", list(sensor_dict.keys()))
print("Found videos:", list(video_dict.keys()))
print("Task times", list(task_times.keys()))
print("Session A loaded:", session_A_df is not None)
print("Session B loaded:", session_B_df is not None)
print("Personality questionnaire loaded:", personality_df is not None)

## Synchronization checks

In [None]:
def probe_video(path):
    # ---------- Best: ffprobe ----------
    try:
        cmd = [
            "ffprobe", "-v", "error",
            "-print_format", "json",
            "-show_format",
            "-show_streams",
            path
        ]
        out = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8")
        meta = json.loads(out)

        # pick the first video stream
        vstreams = [s for s in meta.get("streams", []) if s.get("codec_type") == "video"]
        vs = vstreams[0] if vstreams else {}

        # duration: container format duration is usually most reliable
        dur = meta.get("format", {}).get("duration", None)
        dur = float(dur) if dur is not None else None

        # fps: try avg_frame_rate first
        def _parse_rate(r):
            if not r or r == "0/0": return None
            num, den = r.split("/")
            return float(num) / float(den) if float(den) != 0 else None

        fps = _parse_rate(vs.get("avg_frame_rate")) or _parse_rate(vs.get("r_frame_rate"))

        # nb_frames might be missing for some files
        nb_frames = vs.get("nb_frames", None)
        nb_frames = int(nb_frames) if nb_frames is not None and str(nb_frames).isdigit() else None

        w = vs.get("width", None)
        h = vs.get("height", None)

        return {
            "path": path,
            "readable": True,
            "fps": fps,
            "n_frames": nb_frames,
            "duration_s": dur,
            "size": (w, h),
            "source": "ffprobe",
        }

    except Exception:
        pass

    # ---------- Fallback: OpenCV (less reliable) ----------
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        return {"path": path, "readable": False, "source": "opencv"}

    fps = cap.get(cv2.CAP_PROP_FPS)
    n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    ok, _ = cap.read()
    cap.release()

    dur = (n_frames / fps) if fps and fps > 0 and n_frames > 0 else None
    return {
        "path": path,
        "readable": bool(ok),
        "fps": float(fps) if fps and fps > 0 else None,
        "n_frames": n_frames if n_frames > 0 else None,
        "duration_s": float(dur) if dur is not None else None,
        "size": (w, h),
        "source": "opencv",
    }


# -------------------- SENSOR QC --------------------

def _unwrap_stream(x):
    """Handle dict-like saved npy; otherwise return as-is."""
    if isinstance(x, dict):
        for k in ("data", "values", "samples", "signal", "x", "frames"):
            if k in x:
                return x[k]
    return x

def _to_2d(arr):
    """(N,) -> (N,1); (N,...) -> (N,C)"""
    arr = np.asarray(arr)
    if arr.ndim == 1:
        return arr.reshape(-1, 1)
    if arr.ndim == 2:
        return arr
    # e.g., ET frames (N,H,W) -> keep as 3D elsewhere; caller decides
    return arr

def infer_fs(name, default_fs=None):
    """Simple inference. Extend if you have known native rates."""
    n = name.lower()
    if "_90fps" in n or n == "et":
        return 90.0
    return default_fs

def sensor_summary(stream_dict, default_fs=None):
    """
    Prints fs + key QC stats per stream:
    - shape
    - n_samples, n_channels (where applicable)
    - inferred duration if fs known
    - NaN/Inf fraction
    """
    print("\n=== SENSOR STREAM QC ===")
    for name in sorted(stream_dict.keys()):
        raw = stream_dict[name]
        raw = _unwrap_stream(raw)
        arr = np.asarray(raw)

        fs = infer_fs(name, default_fs=default_fs)

        # ET / image-like (N,H,W)
        if arr.ndim == 3:
            N, H, W = arr.shape
            nan_frac = float(np.isnan(arr).mean()) if np.issubdtype(arr.dtype, np.floating) else 0.0
            inf_frac = float(np.isinf(arr).mean()) if np.issubdtype(arr.dtype, np.floating) else 0.0
            dur = (N / fs) if fs else None
            print(f"- {name:18s} shape={arr.shape} dtype={arr.dtype}  fs≈{fs}  dur≈{dur:.2f}s  NaN={nan_frac:.4f} Inf={inf_frac:.4f}")
            continue

        # Regular signals (N,C)
        arr2 = _to_2d(arr)
        if arr2.ndim != 2:
            print(f"- {name:18s} shape={arr.shape} dtype={arr.dtype}  (unhandled dims)")
            continue

        N, C = arr2.shape
        if np.issubdtype(arr2.dtype, np.floating):
            nan_frac = float(np.isnan(arr2).mean())
            inf_frac = float(np.isinf(arr2).mean())
        else:
            nan_frac = inf_frac = 0.0

        dur = (N / fs) if fs else None
        dur_str = f"{dur:.2f}s" if dur is not None else "n/a"
        fs_str = f"{fs:.3f}Hz" if fs is not None else "n/a"
        print(f"- {name:18s} shape={arr2.shape!s:12s} dtype={arr2.dtype}  fs≈{fs_str:>8s}  dur≈{dur_str:>8s}  NaN={nan_frac:.1f} Inf={inf_frac:.1f}")

# -------------------- FULL QC REPORT --------------------

def qc_report(participant_dir, sensor_dict, video_names=("pov.mp4", "webcam.mp4"), default_sensor_fs=None):
    # Sensors
    sensor_summary(sensor_dict, default_fs=default_sensor_fs)

    # Videos
    print("\n=== VIDEO QC ===")
    for vid in video_names:
        vp = os.path.join(participant_dir, vid)
        if not os.path.exists(vp):
            print(f"- {vid:10s} MISSING")
            continue
        info = probe_video(vp)
        print(f"- {vid:10s} {info}")

    # Compare ET duration to videos
    if "et" in sensor_dict:
        et = np.asarray(_unwrap_stream(sensor_dict["et"]))
        if et.ndim == 3:
            et_dur = et.shape[0] / 90.0
            print(f"\n=== ALIGNMENT QUICK CHECK ===")
            print(f"ET duration: {et_dur:.2f}s")
            for vid in video_names:
                vp = os.path.join(participant_dir, vid)
                if not os.path.exists(vp):
                    continue
                info = probe_video(vp)
                if info.get("duration_s") is not None:
                    diff = info["duration_s"] - et_dur
                    print(f"{vid}: duration {info['duration_s']:.2f}s  (video - ET = {diff:.2f}s)")

qc_report(os.path.join(original_data_path, participant), sensor_dict, default_sensor_fs=None)


## Summary statistics per task

In [None]:
# ---- config ----
EMO_COLS = ["Amused","Content","Excited","Awe","Neutral","Fear","Sad","Disgust","Anger"]

def _norm(s: str) -> str:
    # "TryNotToLaugh" == "try_not_to_laugh" == "try not to laugh"
    return str(s).strip().lower().replace(" ", "").replace("_", "")

def _match_self_row(df: pd.DataFrame, task: str):
    """
    Session A: task 'video_<Emotion>' -> match df['Video Emotion']
    Session B: task '<activity>' -> match df['Activity Name']
    Fallback: try df['Video Name'] / df['Activity Name'] exact (normalized) match.
    Returns pd.Series or None.
    """
    if df is None or df.empty:
        return None

    t = str(task)

    # Session A: video_* tasks
    if t.lower().startswith("video_") and "Video Emotion" in df.columns:
        target = _norm(t.split("_", 1)[1])
        hit = df[df["Video Emotion"].astype(str).map(_norm) == target]
        return hit.iloc[0] if len(hit) else None

    # Session B: activity tasks
    if "Activity Name" in df.columns:
        target = _norm(t)
        hit = df[df["Activity Name"].astype(str).map(_norm) == target]
        return hit.iloc[0] if len(hit) else None

    # Fallback: try Video Name if present
    if "Video Name" in df.columns:
        target = _norm(t)
        hit = df[df["Video Name"].astype(str).map(_norm) == target]
        return hit.iloc[0] if len(hit) else None

    return None

def _asarray(x):
    if isinstance(x, dict):
        for k in ("data","values","samples","signal","x","frames"):
            if k in x:
                x = x[k]; break
    return np.asarray(x)

def _slice_1d_or_2d(arr, s, e):
    arr = _asarray(arr)
    N = arr.shape[0]
    s = max(0, min(N, int(s)))
    e = max(0, min(N, int(e)))
    if e <= s:
        return None
    return arr[s:e]

def _mean_std(arr):
    if arr is None:
        return None
    a = np.asarray(arr)
    if a.ndim == 1:
        a = a.astype(np.float64)
        return float(np.nanmean(a)), float(np.nanstd(a))
    if a.ndim == 2:
        a = a.astype(np.float64)
        return (np.nanmean(a, axis=0), np.nanstd(a, axis=0))  # per-channel
    return None

def print_task_block(task, s, e, sensor_dict, self_row=None, fs=90.0):
    dur_s = max(0, e - s) / fs
    print(f"\n------------------------------- Task: {task} -------------------------------")
    print(f"duration: {dur_s:.2f}s  (idx {s} → {e})")

    # sensors: mean/std
    for stream, obj in sensor_dict.items():
        # skip non-timeseries here (ET is image frames)
        if stream == "et":
            continue
        chunk = _slice_1d_or_2d(obj, s, e)
        ms = _mean_std(chunk)
        if ms is None:
            print(f"{stream}: n/a")
            continue

        mean, std = ms
        if isinstance(mean, np.ndarray):  # multi-channel
            # print compactly: mean/std per channel
            mean_str = np.array2string(mean, precision=3, separator=", ", max_line_width=140)
            std_str  = np.array2string(std,  precision=3, separator=", ", max_line_width=140)
            print(f"{stream}: mean={mean_str}  std={std_str}")
        else:
            print(f"{stream}: mean={mean:.4f}  std={std:.4f}")

    # self-reports
    print("\nself-reports:")
    if self_row is None:
        print("valence: n/a")
        print("arousal: n/a")
        print("dominance: n/a")
        print("\nemotions: n/a")
        return

    # VAD
    v = self_row.get("Valence", None)
    a = self_row.get("Arousal", None)
    d = self_row.get("Dominance", None)
    print(f"valence: {v if pd.notna(v) else 'n/a'}")
    print(f"arousal: {a if pd.notna(a) else 'n/a'}")
    print(f"dominance: {d if pd.notna(d) else 'n/a'}")

    # discrete emotions (use the emotion columns as given)
    print("\nemotions:")
    for c in EMO_COLS:
        if c in self_row.index:
            val = self_row[c]
            # keep original scale (often 0..1)
            print(f"{c.lower()}: {float(val):.1f}" if pd.notna(val) else f"{c.lower()}: n/a")

def print_session_summary(participant, task_times, sensor_dict, session_df, session_name, fs=90.0):
    segs = task_times[participant]
    shift = int(segs["session_A"][0])

    # decide which tasks belong to this session by intersection with [session_start, session_end]
    sess_key = "session_A" if session_name.lower().endswith("a") else "session_B"
    if sess_key not in segs:
        raise KeyError(f"{sess_key} not found in task_times[{participant}]")
    sess_s, sess_e = map(int, segs[sess_key])
    sess_s -= shift
    sess_e -= shift

    # build task list (exclude session_A/B)
    tasks = []
    for k,(s,e) in segs.items():
        if k in ("session_A","session_B"):
            continue
        s0, e0 = int(s)-shift, int(e)-shift
        # keep tasks that overlap the chosen session window
        if e0 > sess_s and s0 < sess_e:
            tasks.append((k, s0, e0))
    tasks.sort(key=lambda x: x[1])

    # prep session df
    df = session_df.copy() if session_df is not None else None
    if df is not None:
        df.columns = [c.strip() for c in df.columns]
        for c in ["Valence","Arousal","Dominance"] + [c for c in EMO_COLS if c in df.columns]:
            if c in df.columns:
                df[c] = pd.to_numeric(df[c], errors="coerce")
        if "Video Emotion" in df.columns:
            df["Video Emotion"] = df["Video Emotion"].astype(str).str.strip()

    print(f"\n================ {session_name} ==================")

    for task, s, e in tasks:
        row = _match_self_row(df, task)
        print_task_block(task, s, e, sensor_dict, self_row=row, fs=fs)


print_session_summary(participant, task_times, sensor_dict, session_A_df, "Session A", fs=90.0)
print_session_summary(participant, task_times, sensor_dict, session_B_df, "Session B", fs=90.0)


## Sensor streams visualization

Important: These are unfiltered plots of the physiological data. Please filter them and zoom in accordingly to analyze them.

In [None]:
def plot_stream(stream_dict, name, n_channels, title=None, task_times=task_times, participant=participant, alpha=0.3):
    """
    Plot the first n_channels of a stream on the same axes.
    Optionally overlays activity segments from task_times[participant] as colored backgrounds.
    X-axis = sample index.
    """
    if name not in stream_dict:
        raise KeyError(f"{name} not in stream_dict. Available: {list(stream_dict.keys())}")

    data = stream_dict[name]
    if isinstance(data, dict):
        for k in ("data", "values", "samples", "signal", "x"):
            if k in data:
                data = data[k]
                break
        else:
            raise ValueError(f"{name} is a dict but no data key found.")

    arr = np.asarray(data)
    if arr.ndim == 1:
        arr = arr.reshape(-1, 1)
    elif arr.ndim > 2:
        arr = arr.reshape(arr.shape[0], -1)

    N, C = arr.shape
    use_c = min(n_channels, C)

    fig, ax = plt.subplots(figsize=(12, 4))

    # --- Activity backgrounds ---
    if task_times is not None and participant is not None and participant in task_times:
        segs = task_times[participant]
        if "session_A" not in segs or segs["session_A"] is None:
            raise KeyError(f"task_times['{participant}'] must contain 'session_A'=[start,end] to shift indices.")
        sessionA0 = int(segs["session_A"][0])

        # labels excluding sessions
        labels = sorted([k for k in segs.keys() if k not in ("session_A", "session_B")])

        cmap = plt.get_cmap("tab20")
        color_map = {lbl: cmap(i % cmap.N) for i, lbl in enumerate(labels)}
        task_handles = [Patch(facecolor=color_map[lbl], edgecolor="none", alpha=alpha, label=lbl) for lbl in labels]

        # draw spans in start-time order (after shift)
        spans = []
        for lbl in labels:
            s, e = segs[lbl]
            s = int(s) - sessionA0
            e = int(e) - sessionA0
            spans.append((lbl, s, e))
        spans.sort(key=lambda x: x[1])

        for lbl, s, e in spans:
            if e <= 0 or s >= N:
                continue
            s = max(0, s)
            e = min(N - 1, e)
            col = color_map[lbl]
            ax.axvspan(s, e, color=col, alpha=alpha, linewidth=0)
            ax.axvline(s, color=col, alpha=min(alpha * 4, 0.6), linewidth=1)
            ax.axvline(e, color=col, alpha=min(alpha * 4, 0.6), linewidth=1)

    # --- Signals ---
    for ch in range(use_c):
        ax.plot(arr[:, ch], label=f"ch{ch}")

    ax.set_title(title or f"{name} | N={N} (plotted {use_c}/{C} channels)")
    ax.set_xlabel("sample index")
    ax.set_ylabel("value")

    # Channel legend (top-right) if few channels
    if use_c <= 12:
        ax.legend(ncols=min(use_c, 6), fontsize=8, loc="upper right")

    # Task legend (right side)
    if task_handles:
        ax.legend(
            handles=task_handles,
            fontsize=7,
            loc="upper left",
            bbox_to_anchor=(1.01, 1.0),
            borderaxespad=0.0,
            title="Tasks",
            title_fontsize=8,
        )

    plt.tight_layout()
    plt.show()


# ECG: 1 signal
plot_stream(sensor_dict, "ecg_90fps", n_channels=1, title="ECG (1 channel)")

# EDA: 1 signal
plot_stream(sensor_dict, "eda_90fps", n_channels=1, title="EDA (1 channel)")

# GAZE: 2 signals
# plot_stream(sensor_dict, "gaze_90fps", n_channels=2, title="GAZE (2 channels)")

# IMU: 6 signals
plot_stream(sensor_dict, "imu_right_90fps", n_channels=6, title="IMU (6 channels)")

# Intensity: 1 signal
plot_stream(sensor_dict, "intensity_90fps", n_channels=1, title="Intensity (1 channel)")

# PPG EAR: 1 signal
plot_stream(sensor_dict, "ppg_ear_90fps", n_channels=1, title="PPG EAR (1 channel)")

# PPG NOSE: 1 signal
plot_stream(sensor_dict, "ppg_nose_90fps", n_channels=1, title="PPG NOSE (1 channel)")

# PUPILS: 2 signal
plot_stream(sensor_dict, "pupils_90fps", n_channels=2, title="PUPILS (2 channels)")

# RR: 1 signal
plot_stream(sensor_dict, "rr_90fps", n_channels=1, title="RR (1 channel)")


## Eye-tracking, POV and face webcam videos visualization

In [None]:

def _sharpness_score(img_2d: np.ndarray) -> float:
    img = img_2d.astype(np.float32)
    gy, gx = np.gradient(img)
    lap = np.gradient(gx, axis=1) + np.gradient(gy, axis=0)
    return float(np.var(lap))

def _read_frame_by_index(cap: cv2.VideoCapture, frame_idx: int):
    cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
    ok, frame = cap.read()
    if not ok:
        return None
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

def show_et_with_videos_by_task(
    stream_dict: dict,
    task_times: dict,
    participant: str,
    pov_path: str,
    webcam_path: str,
    et_fps: float = 90.0,
    pov_fps: float = 10.0,
    webcam_fps: float = 60.0,
    exclude=("session_A", "session_B"),
    stride: int = 20,
    figsize_row: float = 2.8,
):
    """
    For each task:
      - pick a representative ET frame (sharpest within segment)
      - show ET + closest POV frame + closest Webcam frame side-by-side

    Index mapping:
      t = et_idx / et_fps + offset_s
      pov_frame   = round(t * pov_fps)
      webcam_frame= round(t * webcam_fps)

    Also applies shift task indices by session_A[0].
    """

    # --- get ET frames from stream_dict['et'] ---
    if "et" not in stream_dict:
        raise KeyError(f"'et' not in stream_dict. Available: {list(stream_dict.keys())}")

    et = stream_dict["et"]
    if isinstance(et, dict):
        for k in ("frames", "data", "values", "samples", "x"):
            if k in et:
                et = et[k]
                break

    frames = np.asarray(et)
    if frames.ndim != 3:
        raise ValueError(f"Expected et frames with shape (N,H,W), got {frames.shape}")

    segs = task_times[participant]
    sessionA0 = int(segs["session_A"][0])

    tasks = [k for k in segs.keys() if k not in exclude]
    tasks.sort(key=lambda k: segs[k][0])

    # open videos once
    cap_pov = cv2.VideoCapture(pov_path)
    cap_web = cv2.VideoCapture(webcam_path)
    if not cap_pov.isOpened():
        raise RuntimeError(f"Could not open POV video: {pov_path}")
    if not cap_web.isOpened():
        raise RuntimeError(f"Could not open webcam video: {webcam_path}")

    pov_n = int(cap_pov.get(cv2.CAP_PROP_FRAME_COUNT)) or None
    web_n = int(cap_web.get(cv2.CAP_PROP_FRAME_COUNT)) or None

    picks = []
    N = frames.shape[0]

    for t in tasks:
        s, e = map(int, segs[t])
        s = s - sessionA0
        e = e - sessionA0
        s = max(0, s)
        e = min(N - 1, e)
        if e <= s:
            continue

        best_i, best_sc = s, -1.0
        for i in range(s, e + 1, stride):
            sc = _sharpness_score(frames[i])
            if sc > best_sc:
                best_sc, best_i = sc, i

        # map ET index -> video frame indices via time
        t_sec = (best_i / et_fps)
        pov_idx = int(round(t_sec * pov_fps))
        web_idx = int(round(t_sec * webcam_fps))

        # clamp (if frame counts are available)
        if pov_n is not None:
            pov_idx = max(0, min(pov_n - 1, pov_idx))
        else:
            pov_idx = max(0, pov_idx)

        if web_n is not None:
            web_idx = max(0, min(web_n - 1, web_idx))
        else:
            web_idx = max(0, web_idx)

        picks.append((t, best_i, pov_idx, web_idx))

    # plot: one row per task, 3 columns (ET / POV / Webcam)
    rows = len(picks)
    fig, axes = plt.subplots(rows, 3, figsize=(12, max(2.5, rows * figsize_row)))

    if rows == 1:
        axes = np.array([axes])

    for r, (t, et_i, pov_i, web_i) in enumerate(picks):
        et_img = frames[et_i]
        pov_img = _read_frame_by_index(cap_pov, pov_i)
        web_img = _read_frame_by_index(cap_web, web_i)

        ax0, ax1, ax2 = axes[r]

        ax0.imshow(et_img, cmap="gray", interpolation="nearest")
        ax0.set_title(f"{t}\nET idx={et_i}", fontsize=9)
        ax0.axis("off")

        if pov_img is None:
            ax1.text(0.5, 0.5, "POV read failed", ha="center", va="center")
        else:
            ax1.imshow(pov_img)
        ax1.set_title(f"POV idx={pov_i}", fontsize=9)
        ax1.axis("off")

        if web_img is None:
            ax2.text(0.5, 0.5, "Webcam read failed", ha="center", va="center")
        else:
            ax2.imshow(web_img)
        ax2.set_title(f"Webcam idx={web_i}", fontsize=9)
        ax2.axis("off")

    plt.subplots_adjust(hspace=0.10, wspace=0.02)
    plt.show()

    cap_pov.release()
    cap_web.release()

show_et_with_videos_by_task(
    sensor_dict, task_times, participant=participant,
    pov_path=f"{original_data_path}/{participant}/pov.mp4", webcam_path=f"{original_data_path}/{participant}/webcam.mp4",
)

## Self-reports Visualization

In [None]:
# ----------------------------
# Config
# ----------------------------
EMO_COLS = ["Amused","Content","Excited","Awe","Neutral","Fear","Sad","Disgust","Anger"]
VAD_COLS = ["Valence","Arousal","Dominance"]
BIG5 = ["Extraversion", "Agreeableness", "Conscientiousness", "Negative Emotionality", "Open-Mindedness"]

def _norm(s: str) -> str:
    return str(s).strip().lower().replace(" ", "").replace("_", "")

def _to_float(x):
    if pd.isna(x):
        return np.nan
    s = str(x).strip()
    if s == "" or s.lower() == "nan":
        return np.nan
    return float(s.replace(",", "."))

# ----------------------------
# Task list from task_times (shift by session_A[0], skip session_A/B)
# ----------------------------
def get_session_tasks(task_times, participant, session_key):
    segs = task_times[participant]
    shift = int(segs["session_A"][0])

    sess_s, sess_e = map(int, segs[session_key])
    sess_s -= shift
    sess_e -= shift

    tasks = []
    for k, (s, e) in segs.items():
        if k in ("session_A", "session_B"):
            continue
        s0, e0 = int(s) - shift, int(e) - shift
        if e0 > sess_s and s0 < sess_e:
            tasks.append((k, s0, e0))
    tasks.sort(key=lambda x: x[1])
    return tasks  # list of (task, start, end)

# ----------------------------
# Self-report aggregation
# ----------------------------
def prepare_self_df(df):
    if df is None:
        return None
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]

    for c in VAD_COLS + [c for c in EMO_COLS if c in df.columns]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    for c in ("Video Emotion", "Video Name", "Activity Name"):
        if c in df.columns:
            df[c] = df[c].astype(str).str.strip()
            df[f"_{c}_norm"] = df[c].map(_norm)

    return df

def aggregate_self_report_for_task(df, task):
    """
    Returns dict with keys VAD + EMO_COLS aggregated for the task.
    - Session A: task 'video_<Emotion>' matches Video Emotion (can aggregate multiple rows).
    - Session B: task '<activity>' matches Activity Name (can aggregate duplicates).
    """
    if df is None or df.empty:
        return None

    task = str(task)

    if task.lower().startswith("video_") and "_Video Emotion_norm" in df.columns:
        target = _norm(task.split("_", 1)[1])
        sub = df[df["_Video Emotion_norm"] == target]
    elif "_Activity Name_norm" in df.columns:
        target = _norm(task)
        sub = df[df["_Activity Name_norm"] == target]
    elif "_Video Name_norm" in df.columns:
        target = _norm(task)
        sub = df[df["_Video Name_norm"] == target]
    else:
        sub = df.iloc[0:0]

    if sub.empty:
        return None

    out = {}
    for c in VAD_COLS:
        out[c] = float(sub[c].mean()) if c in sub.columns else np.nan
    for c in EMO_COLS:
        out[c] = float(sub[c].mean()) if c in sub.columns else np.nan
    return out

# ----------------------------
# Plotting helpers
# ----------------------------
def radar_plot(values, title, labels, ax=None, rlim=None, fill_alpha=0.12):
    """
    Radar plot that can draw into an existing polar axis (ax=...) or create a new fig if ax is None.
    values: list/array numeric (len == len(labels))
    """
    values = np.array(values, dtype=float)
    values = np.nan_to_num(values, nan=0.0)

    angles = np.linspace(0, 2*np.pi, len(labels), endpoint=False)
    values = np.r_[values, values[0]]
    angles = np.r_[angles, angles[0]]

    created = False
    if ax is None:
        fig, ax = plt.subplots(figsize=(5.4, 5.4), subplot_kw={"polar": True})
        created = True

    ax.plot(angles, values)
    ax.fill(angles, values, alpha=fill_alpha)
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(labels, fontsize=10)
    ax.set_title(title, fontsize=12)

    if rlim is not None:
        ax.set_ylim(*rlim)

    if created:
        plt.tight_layout()
        plt.show()

def plot_task_self_report(task, agg):
    """One task: spider for emotions + bar for V/A/D."""
    if agg is None:
        print(f"[WARN] No self-report found for task: {task}")
        return

    fig = plt.figure(figsize=(10, 4))

    ax1 = fig.add_subplot(1, 2, 1, polar=True)
    emo_vals = [agg.get(c, np.nan) for c in EMO_COLS]
    radar_plot(emo_vals, f"{task} - Discrete Emotions", EMO_COLS, ax=ax1, rlim=(0, 1))

    ax2 = fig.add_subplot(1, 2, 2)
    vad_vals = [agg.get(c, np.nan) for c in VAD_COLS]
    ax2.bar(VAD_COLS, [0 if np.isnan(v) else v for v in vad_vals])
    ax2.set_ylim(1, 7)
    ax2.set_title(f"{task} - Continuous Affect", fontsize=11)
    ax2.set_ylabel("rating")

    plt.tight_layout()
    plt.show()

def plot_session_self_reports(session_name, session_df, tasks):
    """
    tasks: list of (task, start, end) - we only use task name for self-report viz.
    """
    df = prepare_self_df(session_df)
    print(f"\n================ {session_name} — Self-report visualizations ================")
    for task, _, _ in tasks:
        agg = aggregate_self_report_for_task(df, task)
        plot_task_self_report(task, agg)

# ----------------------------
# Personality
# ----------------------------
def personality_tables_from_wide(personality_df):
    df = personality_df.copy()

    # participant IDs are in row 0, from col 1 onward
    id_row = df.iloc[0]
    value_cols = df.columns[1:]
    participant_ids = [str(id_row[c]).strip() for c in value_cols]
    col_map = {col: pid for col, pid in zip(value_cols, participant_ids)}

    # find separator row "T-score" in first column
    sep_idx = df.index[df.iloc[:, 0].astype(str).str.strip().str.lower() == "t-score"]
    if len(sep_idx) == 0:
        raise ValueError("Could not find the 'T-score' separator row.")
    sep = int(sep_idx[0])

    def build_section(start, end):
        block = df.iloc[start:end].copy()
        trait = block.iloc[:, 0].astype(str).str.strip()
        block = block[trait.notna() & (trait != "")].copy()
        block.iloc[:, 0] = block.iloc[:, 0].astype(str).str.strip()

        vals = block.loc[:, value_cols].rename(columns=col_map)
        vals = vals.map(_to_float)
        vals.index = block.iloc[:, 0].values
        return vals

    mean_df = build_section(start=1, end=sep)
    tscore_df = build_section(start=sep+1, end=len(df))
    return mean_df, tscore_df

def plot_personality_from_wide(personality_df, participant="005", use="T-score"):
    mean_df, t_df = personality_tables_from_wide(personality_df)
    pid = str(participant).strip()
    use_l = use.strip().lower()

    if use_l.startswith("t"):
        table = t_df
        rlim = (20, 80)
        name = "T-score"
    else:
        table = mean_df
        rlim = None
        name = "Mean"

    if pid not in table.columns:
        raise KeyError(f"Participant '{pid}' not found. Example columns: {list(table.columns)[:10]}")

    vals = [table.loc[t, pid] if t in table.index else np.nan for t in BIG5]
    radar_plot(vals, f"Personality ({name}) — {pid}", BIG5, rlim=rlim)


# Personality
plot_personality_from_wide(personality_df, participant=participant, use="T-score")
plot_personality_from_wide(personality_df, participant=participant, use="Mean")

# Self-reports by task
tasks_A = get_session_tasks(task_times, participant, "session_A")
tasks_B = get_session_tasks(task_times, participant, "session_B")

plot_session_self_reports("Session A", session_A_df, tasks_A)
plot_session_self_reports("Session B", session_B_df, tasks_B)