# Setup

## Function to read iMotions sensor file

In [2]:
import os
from pathlib import Path
import pandas as pd
import numpy as np

from wbdlib.io import safe_write_csv

def _extract_imotions_metadata(path, metadata=None):
    """Read leading metadata lines from an iMotions CSV without loading the data."""
    metadata = metadata or []
    requested = set(metadata) if metadata else None
    meta_lines = []
    header_rows = 0
    with open(path, "r", encoding="latin1") as file:
        while True:
            line = file.readline()
            if not line:
                break
            first_cell = line.split(",", 1)[0]
            if "#" in first_cell:
                meta_lines.append(line)
                header_rows += 1
            else:
                break
    meta_dict = {}
    for raw_line in meta_lines:
        segments = raw_line.strip().split("#", 1)
        if len(segments) < 2:
            continue
        cleaned = segments[1]
        parts = cleaned.split(",")
        if len(parts) > 1:
            key = parts[0].strip()
            value = ",".join(parts[1:]).strip()
            if requested is None or key in requested:
                meta_dict[key] = value
    return meta_dict, header_rows

def read_imotions_metadata(path, metadata=None):
    """Return only the requested metadata from an iMotions CSV."""
    meta_dict, _ = _extract_imotions_metadata(path, metadata)
    return meta_dict

def read_imotions(path, metadata=None):
    """
    Reads an iMotions CSV file while extracting optional metadata fields.

    Parameters:
        path (str): Path to the iMotions CSV file.
        metadata (list[str], optional): List of metadata keys to extract.

    Returns:
        df (pd.DataFrame): The data as a DataFrame.
        meta_dict (dict): Dictionary containing requested metadata fields.
    """
    meta_dict, header_rows = _extract_imotions_metadata(path, metadata)
    df = pd.read_csv(path, header=header_rows, low_memory=True)
    return df, meta_dict

def get_files(folder, tags=['',]):
    return [f for f in os.listdir(folder) if not f.startswith('.') and all(x in f for x in tags)] 


def get_biometric_data(in_folder, results_folder):

    ######## Define ########
    # Define paths
    out_path = f"{results_folder}/"
    os.makedirs(out_path, exist_ok=True)

    respondents = [1,2,3] #define list of respondent ids

    # Define signal columns
    cols_afdex = [
                "Anger", "Contempt", "Disgust", "Fear", "Joy", "Sadness",
                "Surprise", "Engagement", "Valence", "Sentimentality",
                "Confusion", "Neutral"
        ]
    cols_eeg = ['High Engagement',
        'Low Engagement',
        'Distraction',
        'Drowsy',
        'Workload Average',
        'Frontal Asymmetry Alpha',
        ]


    #Define window lengths in seconds
    window_lengths = [3,]

    ######## Read Inputs #######
    #Get input files
    sensor_files = get_files(f'{in_folder}/Sensors/',tags=['.csv',])

    ### Begin ###

    results = []
    errors = []
    for respondent in respondents:
        error = {'respondent':respondent, 'FAC':None, 'EEG':None, 'GSR':None, 'Blinks':None, 'ET':None}
        interaction = {'respondent':respondent}
        try:
            file = [f for f in sensor_files if respondent in f][0] #may need adjustment
            df_sens_resp,_ = read_imotions(f'{in_folder}/Sensors/{file}')

            # Get sensor data per stimulus
            for task in df_sens_resp['SourceStimuliName'].unique():
                df_sens_task = df_sens_resp.loc[(df_sens_resp['SourceStimuliName']==task)]
                window = task

                # Get facial coding data
                for a in cols_afdex:
                    try:
                        interaction[f'sens_{window}_FAC_{a}_mean']=df_sens_task[a].dropna().mean()
                        auc_data = df_sens_task[['Timestamp',a]].dropna()
                        interaction[f'sens_{window}_FAC_{a}_AUC']=np.trapz(auc_data[a],x=auc_data['Timestamp'])/1000
                        interaction[f'sens_{window}_FAC_{a}_Binary']=df_sens_task[a].dropna().max()>= 50
                    except:
                        error['FAC']='Missing'

                for e in cols_eeg:
                    try:
                        interaction[f'sens_{window}_EEG_{e}_mean']=df_sens_task[e].dropna()[df_sens_int[e] > -9000].mean()
                        auc_data = df_sens_task.loc[df_sens_task[e].notna() & (df_sens_task[e] > -9000), ['Timestamp', e]]
                        interaction[f'sens_{window}_EEG_{e}_AUC']=np.trapz(auc_data[e],x=auc_data['Timestamp'])/1000
                    except:
                        error['EEG']='Missing'

                try:
                    interaction[f'sens_{window}_GSR_PeakDetected_Binary'] =1 if df_sens_task['Peak Detected'].sum()>0 else 0
                    gsr_data = df_sens_task[['Timestamp','Peak Detected']].dropna()
                    mask = gsr_data['Peak Detected'] == 1
                    segments = (mask != mask.shift()).cumsum()  # Assign unique numbers to patches
                    count_patches = gsr_data.loc[mask, 'Peak Detected'].groupby(segments).ngroup().nunique()
                    interaction[f'sens_{window}_GSR_Peaks_Count'] =count_patches
                except:
                    error['GSR']='Missing'

                try:
                    blink_data = df_sens_task[['Timestamp','Blink Detected']].dropna()
                    mask = blink_data['Blink Detected'] == 1
                    segments = (mask != mask.shift()).cumsum()  # Assign unique numbers to patches
                    count_patches = blink_data.loc[mask, 'Blink Detected'].groupby(segments).ngroup().nunique()
                    interaction[f'sens_{window}_ET_Blink_Count'] =count_patches
                    interaction[f'sens_{window}_ET_Blink_Rate'] =count_patches/((df_sens_task['Timestamp'].values[-1]-df_sens_task['Timestamp'].values[0])/(1000 * 60))
                except:
                    error['ET']='Missing'

            # TODO Get sensor data for non-interaction
            ##################################### Add this in
            results.append(interaction)
            errors.append(error)

            pass
        except IndexError:
            print(f'>>> Could not find {respondent} sensor data')
        except:
            print(f'>>> Failed {respondent}')

    results = pd.DataFrame(results)
    results.to_csv(f'{out_path}biometric_results.csv')

    errors = pd.DataFrame(errors)
    errors.to_csv(f'{out_path}errors_biometric.csv')

project_root = Path.cwd().parent
data_export_dir = project_root / "data" / "Export"

## Explanation of functions

The above functions are used to read in the sesor data files, one csv at a time, and extract single features per stimulus, and write these features to a simple results file.

The functions must be adjusted to:
- Discern between long form and short form
- Isolate key moments from timings file provided by client
- Extract time series
- Compute group-wide features such as inter-subject correlation

# Preparation
- Create naming dictionary for all stims
- Get total times of all stims
- Prepare key_moments

In [3]:
# Locate one sensor export per group for duration scanning


group_sensor_files = {}
for group_dir in sorted(data_export_dir.glob("Group *")):
    if not group_dir.is_dir():
        continue
    sensor_dirs = sorted(group_dir.glob("Analyses/*/Sensor Data"))
    csv_candidates = []
    for sensor_dir in sensor_dirs:
        csv_candidates.extend(sorted(sensor_dir.glob("*.csv")))
    group_sensor_files[group_dir.name] = csv_candidates[0] if csv_candidates else None

sensor_selection = pd.DataFrame([
    {
        "group": group,
        "sensor_file": path.name if path else None
    }
    for group, path in group_sensor_files.items()
]).sort_values("group").reset_index(drop=True)

sensor_selection

Unnamed: 0,group,sensor_file
0,Group A,001_116.csv
1,Group B,001_58.csv
2,Group C,001_114.csv
3,Group D,001_102.csv
4,Group E,001_108.csv
5,Group F,001_107.csv


In [4]:
# Collect per-group stimulus durations without aggregating across groups
duration_tables = []
issues = {}

for group, path in group_sensor_files.items():
    if path is None:
        issues[group] = "No sensor CSV found"
        continue
    try:
        df_group, _ = read_imotions(path)
    except Exception as exc:
        issues[group] = f"read_imotions failed: {exc}"
        continue

    required_cols = {"SourceStimuliName", "Timestamp"}
    if not required_cols.issubset(df_group.columns):
        issues[group] = "Missing SourceStimuliName or Timestamp"
        continue
    df_clean = df_group[["SourceStimuliName", "Timestamp"]].copy()
    df_clean = df_clean.dropna(subset=["SourceStimuliName"])
    df_clean["Timestamp"] = pd.to_numeric(df_clean["Timestamp"], errors="coerce")
    df_clean = df_clean.dropna(subset=["Timestamp"])
    if df_clean.empty:
        issues[group] = "No valid timestamp data"
        continue

    group_duration = (
        df_clean.groupby("SourceStimuliName")["Timestamp"]
        .apply(lambda s: s.max() - s.min())
        .reset_index(name="duration_ms")
    )

    if group_duration.empty:
        issues[group] = "No stimuli with duration"
        continue

    group_duration["duration_seconds"] = group_duration["duration_ms"] / 1000.0
    group_duration["duration_minutes"] = group_duration["duration_seconds"] / 60.0
    group_duration.insert(0, "group", group)
    group_duration.rename(columns={"SourceStimuliName": "stimulus_name"}, inplace=True)
    duration_tables.append(group_duration[["group", "stimulus_name", "duration_seconds", "duration_minutes"]])

if duration_tables:
    stimulus_summary = pd.concat(duration_tables, ignore_index=True)
    stimulus_summary.sort_values(["group", "stimulus_name"], inplace=True)
    stimulus_summary["duration_seconds"] = stimulus_summary["duration_seconds"].round(2)
    stimulus_summary["duration_minutes"] = stimulus_summary["duration_minutes"].round(2)
    stimulus_summary.reset_index(drop=True, inplace=True)
    stimulus_summary
else:
    print("No duration records computed.")

if issues:
    pd.DataFrame(
        {"group": list(issues.keys()), "issue": list(issues.values())}
    ).sort_values("group").reset_index(drop=True)

  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)


In [5]:
stimulus_summary.head()

Unnamed: 0,group,stimulus_name,duration_seconds,duration_minutes
0,Group A,A STAR IS BORN,248.67,4.14
1,Group A,HOME ALONE,115.12,1.92
2,Group A,MAD MAX FURY ROAD,226.47,3.77
3,Group A,THE CONJURING,171.31,2.86
4,Group A,THE TOWN,1744.51,29.08


In [6]:
issues

{}

In [7]:
stimulus_summary.shape

(36, 4)

In [8]:
stimulus_summary['duration_seconds'].agg(['min','max']).round(2)

min      59.82
max    1811.54
Name: duration_seconds, dtype: float64

In [9]:
stimuli_per_group = stimulus_summary.groupby('group')['stimulus_name'].nunique().reset_index(name='unique_stimuli')
stimuli_per_group

Unnamed: 0,group,unique_stimuli
0,Group A,6
1,Group B,6
2,Group C,6
3,Group D,6
4,Group E,6
5,Group F,6


In [10]:
stimulus_summary_path = project_root / "results" / "stimulus_summary_biometric.csv"
safe_write_csv(stimulus_summary, stimulus_summary_path)
stimulus_summary_path

WindowsPath('c:/Users/ashra/Documents/NeuralSense/NeuralData/clients/544_WBD_CXCU/results/stimulus_summary_biometric.csv')

# Feature Extraction

## Stimulus Annotation Overview
- `stimulus_rename` links each group-specific stimulus from `stimulus_summary` to a clean `title` and its presentation `Form` (`Long` or `Short`).
- Some titles appear in both forms; the long cut (≈30 min) includes the short-form key moment as an embedded segment.
- `key_moments` pinpoints, for every long-form title, when the key moment begins (`Lead-up Duration`) and how long it lasts (`Key moment Duration_LF`).
- These tables let us align short-form clips with the corresponding segment inside the long-form presentation for downstream comparisons.

## Stage 1: Demographics
We reuse the vetted Stage 1 export produced in `analysis/assemble_uv.ipynb`.
Loading the respondent roster directly from `results/uv_stage1.csv` keeps the biometric pipeline aligned with the self-report workflow.

In [11]:
from wbdlib import uv as uv_utils

uv_stage1_path = project_root / "results" / "uv_stage1.csv"
if not uv_stage1_path.exists():
    raise FileNotFoundError(f"Stage 1 export not found at {uv_stage1_path}")

uv_stage1 = pd.read_csv(uv_stage1_path)
uv_stage1["respondent"] = (
    uv_stage1["respondent"].apply(uv_utils.first_segment).fillna("").astype(str).str.strip()
)
uv_stage1 = uv_stage1.loc[uv_stage1["respondent"] != ""].copy()
uv_stage1["group"] = (
    uv_stage1["group"].apply(uv_utils.first_segment).fillna("").astype(str).str.strip()
)
uv_stage1["source_file"] = uv_stage1["source_file"].apply(uv_utils.first_segment)
uv_stage1["source_file"] = uv_stage1["source_file"].apply(
    lambda value: value.strip() if isinstance(value, str) and value.strip() else None
)

uv = uv_stage1.copy()
uv_stage1.head()

Unnamed: 0,source_file,group,respondent,date_study,time_study,age,age_group,gender,ethnicity,income_group,content_consumption,content_consumption_movies,content_consumption_series,content_consumption_short,grid_comments,Short Form,Long Form
0,003_104.csv,A,104,10/16/2025,18:09:03,59,44-59,Male,White,"$60,000 or more per year",More than 24 hours per week,10,90,0,,Mad Max,The Town
1,002_106.csv,A,106,10/16/2025,19:35:05,30,28-43,Male,White,"$60,000 or more per year",3 to 12 hours per week,25,50,25,,Mad Max,The Town
2,001_116.csv,A,116,10/18/2025,12:37:40,19,18-27,Male,White,"$35,000  $60,000 per year",3 to 12 hours per week,25,50,25,,Mad Max,The Town
3,006_14.csv,A,14,10/11/2025,09:32:42,33,28-43,Male,Hispanic/Latino/Latina/Latinx,"$60,000 or more per year",More than 24 hours per week,20,40,40,No EEG.,Mad Max,The Town
4,007_3.csv,A,3,10/10/2025,09:19:22,34,28-43,Female,White,"$60,000 or more per year",12 to 24 hours per week,10,70,20,No EEG.,Mad Max,The Town


In [12]:
stage1_expected_columns = [
    "source_file","group","respondent","Short Form","Long Form",
    "age","gender","age_group","ethnicity","income_group",
    "content_consumption","content_consumption_movies",
    "content_consumption_series","content_consumption_short"
]
missing_stage1_columns = [col for col in stage1_expected_columns if col not in uv_stage1.columns]

if missing_stage1_columns:
    print(f"Warning: Missing expected Stage 1 columns: {missing_stage1_columns}")
else:
    print("All expected Stage 1 columns present.")

stage1_group_counts = (
    uv_stage1.groupby("group")["respondent"].nunique()
    .reset_index(name="respondent_count")
    .sort_values("group")
)

stage1_group_counts

All expected Stage 1 columns present.


Unnamed: 0,group,respondent_count
0,A,12
1,B,10
2,C,16
3,D,17
4,E,14
5,F,14


In [13]:
short_long_pairs = (
    uv_stage1.loc[:, ["group", "respondent", "Short Form", "Long Form"]]
    .drop_duplicates(subset=["group", "Short Form", "Long Form"])
    .sort_values(["group", "Short Form", "Long Form"])
)

short_long_pairs.head(12)

Unnamed: 0,group,respondent,Short Form,Long Form
0,A,104,Mad Max,The Town
12,B,10,The Town,Mad Max
22,C,1,The Town,Abbot Elementary
38,D,102,Abbot Elementary,The Town
55,E,100,Abbot Elementary,Mad Max
69,F,107,Mad Max,Abbot Elementary


In [14]:
duplicate_respondents = uv_stage1[uv_stage1.duplicated(subset="respondent", keep=False)]
if duplicate_respondents.empty:
    print("No duplicate respondents detected.")
else:
    duplicate_respondents.sort_values("respondent")

No duplicate respondents detected.


In [15]:
stage1_summary = {
    "rows": len(uv_stage1),
    "columns": len(uv_stage1.columns),
    "stage1_source": uv_stage1_path.as_posix(),
}
stage1_summary

{'rows': 83,
 'columns': 17,
 'stage1_source': 'c:/Users/ashra/Documents/NeuralSense/NeuralData/clients/544_WBD_CXCU/results/uv_stage1.csv'}

## Stage 2: Sensor Data
We retain the Stage 1 roster as the authoritative respondent list and validate sensor coverage before computing biometric features. After confirming file availability and mappings, we run the full feature extraction pipeline with the finalized windowing and naming conventions.

In [16]:
available_sensor_files = {
    path.name
    for path in (project_root / "data" / "Export").glob("Group */Analyses/*/Sensor Data/*.csv")
}
stage1_sensor_reference = uv_stage1.loc[:, ["respondent", "group", "source_file"]].copy()
stage1_sensor_reference["sensor_file_found"] = stage1_sensor_reference["source_file"].apply(
    lambda fname: fname in available_sensor_files if fname else False
)
coverage_rate = stage1_sensor_reference["sensor_file_found"].mean()
print(f"Sensor file coverage: {coverage_rate:.2%}")
missing_sensor_records = stage1_sensor_reference.loc[
    ~stage1_sensor_reference["sensor_file_found"]
].sort_values(["group", "respondent"])
print(f"Stage 1 respondents with missing sensor exports: {len(missing_sensor_records)}")
missing_sensor_records.head(10)

Sensor file coverage: 100.00%
Stage 1 respondents with missing sensor exports: 0


Unnamed: 0,respondent,group,source_file,sensor_file_found


In [17]:
# Load stimulus annotations and key-moment timing tables
stimulus_map = pd.read_csv(project_root / "data" / "stimulus_rename.csv")
stimulus_map["group_letter"] = stimulus_map["group"].str.extract(r"Group\s*([A-F])", expand=False).str.upper()
stimulus_map_lookup = stimulus_map.set_index(["group_letter", "stimulus_name"]).sort_index()

key_moments_raw = pd.read_csv(project_root / "data" / "key_moments.csv")
time_columns = ["Lead-up Duration", "Key moment Duration_LF"]
key_moments = key_moments_raw[["title", *time_columns]].dropna(subset=["title"]).copy()

def hhmmss_to_ms(value):
    """Convert hh:mm:ss strings to integer milliseconds (None on failure)."""
    if pd.isna(value):
        return None
    text = str(value).strip()
    if not text:
        return None
    try:
        duration = pd.to_timedelta(text)
    except ValueError:
        # Handle mm:ss formatted entries by padding hours when possible
        parts = text.split(":")
        if len(parts) == 2:
            try:
                duration = pd.to_timedelta(f"00:{text}")
            except ValueError:
                return None
        else:
            return None
    return int(duration.total_seconds() * 1000)

key_moments["lead_up_ms"] = key_moments["Lead-up Duration"].apply(hhmmss_to_ms)
key_moments["key_moment_ms"] = key_moments["Key moment Duration_LF"].apply(hhmmss_to_ms)
key_moment_lookup = key_moments.set_index("title")["lead_up_ms"].to_dict()
key_duration_lookup = key_moments.set_index("title")["key_moment_ms"].to_dict()

stimulus_map_lookup.head()

# Clear in-memory sensor dataframes to free memory
import gc
_sensor_df_names = []
for _name, _obj in list(globals().items()):
    if isinstance(_obj, pd.DataFrame) and "SourceStimuliName" in _obj.columns:
        _sensor_df_names.append(_name)
        del globals()[_name]
gc.collect()
print(f"Cleared sensor dataframes: {_sensor_df_names}")

Cleared sensor dataframes: ['df_group', 'df_clean']


In [18]:
from typing import Dict
import numpy as np

fac_columns = [
    "Anger", "Contempt", "Disgust", "Fear", "Joy", "Sadness",
    "Surprise", "Engagement", "Sentimentality",
    "Confusion", "Neutral",
 ]
fac_adaptive_metrics = {
    "AdaptiveEngagement": "Adaptive Engagement",
    "PositiveAdaptiveValence": "Positive Adaptive Valence",
    "NegativeAdaptiveValence": "Negative Adaptive Valence",
    "NeutralAdaptiveValence": "Neutral Adaptive Valence",
}
eeg_columns = [
    "High Engagement",
    "Low Engagement",
    "Distraction",
    "Drowsy",
    "Workload Average",
    "Frontal Alpha Asymmetry",
 ]
eeg_alternate_columns = {
    "Frontal Alpha Asymmetry": ["Frontal Alpha Asymmetry", "Frontal Asymmetry Alpha"]
}
eeg_metric_alias = {
    "High Engagement": "HighEngagement",
    "Low Engagement": "LowEngagement",
    "Distraction": "Distraction",
    "Drowsy": "Drowsy",
    "Workload Average": "Workload",
    "Frontal Alpha Asymmetry": "FrontalAlphaAsymmetry",
}
sensor_required_columns = {
    "FAC": fac_columns + list(fac_adaptive_metrics.values()),
    "EEG": eeg_columns,
    "GSR": ["Peak Detected"],
    "ET": [
        "Blink Detected",
        "Fixation Dispersion",
        "Fixation Index",
        "Fixation Duration",
    ],
}

def _trapezoid_integral(values: np.ndarray, time_axis: np.ndarray) -> float:
    """Integrate using numpy.trapezoid when available, falling back to trapz."""
    integrate = getattr(np, "trapezoid", np.trapz)
    return float(integrate(values, x=time_axis) / 1000.0)

def prepare_stimulus_segment(df_sensor: pd.DataFrame, raw_name: str, form: str, title: str) -> pd.DataFrame:
    """Return the time-zeroed slice for the requested stimulus, clipping to key moments when needed."""
    if "SourceStimuliName" not in df_sensor.columns or "Timestamp" not in df_sensor.columns:
        return pd.DataFrame()
    subset = df_sensor.loc[df_sensor["SourceStimuliName"] == raw_name].copy()
    if subset.empty:
        return subset
    subset["Timestamp"] = pd.to_numeric(subset["Timestamp"], errors="coerce")
    subset = subset.dropna(subset=["Timestamp"])
    if subset.empty:
        return subset
    subset.sort_values("Timestamp", inplace=True)
    if "SlideEvent" in subset.columns:
        slide_events = subset["SlideEvent"].astype(str)
        start_candidates = subset.loc[slide_events == "StartMedia", "Timestamp"]
    else:
        start_candidates = pd.Series(dtype=float)
    if not start_candidates.empty:
        start_timestamp = start_candidates.iloc[0]
    else:
        start_timestamp = subset["Timestamp"].min()
    subset["time_from_start"] = subset["Timestamp"] - start_timestamp
    if form == "Long":
        lead_ms = key_moment_lookup.get(title)
        duration_ms = key_duration_lookup.get(title)
        if lead_ms is None or duration_ms is None:
            return pd.DataFrame()
        window_start = lead_ms
        window_end = lead_ms + duration_ms
        subset = subset.loc[(subset["time_from_start"] >= window_start) & (subset["time_from_start"] <= window_end)].copy()
        if subset.empty:
            return subset
        subset["time_from_start"] = subset["time_from_start"] - window_start
    return subset

def register_feature(container: Dict[str, float], form: str, title: str, sensor: str, metric: str, method: str, value: float) -> None:
    if value is None:
        return
    if isinstance(value, float) and np.isnan(value):
        return
    key = f"{form}_{title}_{sensor}_{metric}_{method}"
    container[key] = value

def compute_sensor_features(segment: pd.DataFrame, form: str, title: str) -> Dict[str, float]:
    """Compute FAC, EEG, GSR, and ET summary statistics for a stimulus segment."""
    features: Dict[str, float] = {}
    if segment.empty:
        return features
    if "time_from_start" not in segment.columns or segment["time_from_start"].empty:
        return features
    # Duration in seconds for this stimulus window
    duration_ms = float(segment["time_from_start"].max() - segment["time_from_start"].min())
    if duration_ms <= 0:
        return features
    duration_seconds = duration_ms / 1000.0
    features[f"{form}_{title}_duration"] = duration_seconds
    duration_minutes = duration_seconds / 60.0
    # Facial coding summaries
    for metric in fac_columns:
        if metric not in segment.columns:
            continue
        values = pd.to_numeric(segment[metric], errors="coerce").dropna()
        if values.empty:
            continue
        time_axis = segment.loc[values.index, "time_from_start"].values
        register_feature(features, form, title, "FAC", metric, "Mean", float(values.mean()))
        register_feature(features, form, title, "FAC", metric, "AUC", _trapezoid_integral(values.values, time_axis))
        register_feature(features, form, title, "FAC", metric, "Binary", int(values.max() >= 50))
    for metric, column_name in fac_adaptive_metrics.items():
        if column_name not in segment.columns:
            continue
        values = pd.to_numeric(segment[column_name], errors="coerce").dropna()
        if values.empty:
            continue
        time_axis = segment.loc[values.index, "time_from_start"].values
        register_feature(features, form, title, "FAC", metric, "Mean", float(values.mean()))
        register_feature(features, form, title, "FAC", metric, "AUC", _trapezoid_integral(values.values, time_axis))
    # EEG summaries
    for metric in eeg_columns:
        candidate_columns = [metric, *eeg_alternate_columns.get(metric, [])]
        actual_column = next((col for col in candidate_columns if col in segment.columns), None)
        if actual_column is None:
            continue
        values = pd.to_numeric(segment[actual_column], errors="coerce")
        valid = values.loc[values > -9000].dropna()
        valid = valid.loc[valid < 9000].dropna()
        if valid.empty:
            continue
        time_axis = segment.loc[valid.index, "time_from_start"].values
        label = eeg_metric_alias.get(metric, metric)
        register_feature(features, form, title, "EEG", label, "Mean", float(valid.mean()))
        register_feature(features, form, title, "EEG", label, "AUC", _trapezoid_integral(valid.values, time_axis))
    # GSR summaries
    if "Peak Detected" in segment.columns:
        peak_series = pd.to_numeric(segment["Peak Detected"], errors="coerce").fillna(0)
        peak_mask = peak_series >= 1
        register_feature(features, form, title, "GSR", "PeakDetected", "Binary", int(peak_mask.any()))
        if peak_mask.any():
            segments = (peak_mask != peak_mask.shift()).cumsum()
            peak_blocks = segments.loc[peak_mask]
            peak_count = int(peak_blocks.nunique())
            register_feature(features, form, title, "GSR", "Peaks", "Count", peak_count)
            if duration_minutes > 0:
                register_feature(features, form, title, "GSR", "Peaks", "PerMinute", float(peak_count / duration_minutes))
    # ET metrics
    if "Blink Detected" in segment.columns:
        blink_series = pd.to_numeric(segment["Blink Detected"], errors="coerce").fillna(0)
        blink_mask = blink_series >= 1
        if blink_mask.any():
            segments = (blink_mask != blink_mask.shift()).cumsum()
            blink_blocks = segments.loc[blink_mask]
            blink_count = int(blink_blocks.nunique())
            register_feature(features, form, title, "ET", "Blink", "Count", blink_count)
            if duration_minutes > 0:
                register_feature(features, form, title, "ET", "Blink", "Rate", float(blink_count / duration_minutes))
    if "Fixation Dispersion" in segment.columns:
        dispersion = pd.to_numeric(segment["Fixation Dispersion"], errors="coerce").dropna()
        if not dispersion.empty:
            register_feature(features, form, title, "ET", "FixationDispersion", "Mean", float(dispersion.mean()))
    if "Fixation Index" in segment.columns:
        fixation_index = pd.to_numeric(segment["Fixation Index"], errors="coerce").dropna()
        if not fixation_index.empty:
            fixation_count = int(fixation_index.nunique())
            register_feature(features, form, title, "ET", "Fixation", "Count", fixation_count)
            if duration_minutes > 0:
                register_feature(features, form, title, "ET", "Fixation", "PerMinute", float(fixation_count / duration_minutes))
    if "Fixation Duration" in segment.columns:
        fixation_duration = pd.to_numeric(segment["Fixation Duration"], errors="coerce").dropna()
        if not fixation_duration.empty:
            register_feature(features, form, title, "ET", "FixationDuration", "Mean", float(fixation_duration.mean()))
    return features

In [19]:
import gc

# Compute pilot sensor features for selected respondents
sensor_file_index = {
    path.name: path
    for path in (project_root / "data" / "Export").glob("Group */Analyses/*/Sensor Data/*.csv")
}

pilot_ids = ["2", "58", "116"]
pilot_subset = uv_stage1.loc[uv_stage1["respondent"].astype(str).isin(pilot_ids)].copy()
pilot_subset["respondent_numeric"] = pd.to_numeric(pilot_subset["respondent"], errors="coerce")
pilot_subset = pilot_subset.sort_values(["respondent_numeric", "respondent"])

pilot_feature_rows = []
pilot_issue_log = []

for _, row in pilot_subset.iterrows():
    respondent_id = str(row["respondent"]).strip()
    group_letter = str(row.get("group", "")).strip().upper() if pd.notna(row.get("group")) else None
    source_file = row.get("source_file")
    if not source_file or source_file not in sensor_file_index:
        pilot_issue_log.append({
            "respondent": respondent_id,
            "stimulus": None,
            "issue": "Sensor export not located.",
        })
        continue
    df_sensor, _ = read_imotions(sensor_file_index[source_file])
    feature_row: Dict[str, float] = {"respondent": respondent_id}
    try:
        if df_sensor.empty or "SourceStimuliName" not in df_sensor.columns:
            pilot_issue_log.append({
                "respondent": respondent_id,
                "stimulus": None,
                "issue": "Sensor export missing SourceStimuliName column.",
            })
            continue
        # Assess sensor coverage for this respondent
        for sensor_label, required_columns in sensor_required_columns.items():
            if sensor_label == "EEG":
                missing_metrics = []
                for metric in required_columns:
                    candidates = [metric, *eeg_alternate_columns.get(metric, [])]
                    if not any(column in df_sensor.columns for column in candidates):
                        missing_metrics.append(metric)
                feature_row[f"{sensor_label}_data_missing"] = int(bool(missing_metrics))
                if missing_metrics:
                    metrics_display = ", ".join(missing_metrics)
                    pilot_issue_log.append({
                        "respondent": respondent_id,
                        "stimulus": None,
                        "issue": f"Missing EEG columns: {metrics_display}.",
                    })
                continue
            missing_columns = [col for col in required_columns if col not in df_sensor.columns]
            feature_row[f"{sensor_label}_data_missing"] = int(bool(missing_columns))
            if missing_columns:
                pilot_issue_log.append({
                    "respondent": respondent_id,
                    "stimulus": None,
                    "issue": f"Missing {sensor_label} columns: {', '.join(missing_columns)}.",
                })
        unique_stimuli = sorted({str(s).strip() for s in df_sensor["SourceStimuliName"].dropna().unique()})
        for raw_stimulus in unique_stimuli:
            lookup_key = (group_letter, raw_stimulus)
            if lookup_key not in stimulus_map_lookup.index:
                pilot_issue_log.append({
                    "respondent": respondent_id,
                    "stimulus": raw_stimulus,
                    "issue": "Stimulus missing from rename map.",
                })
                continue
            map_row = stimulus_map_lookup.loc[lookup_key]
            if isinstance(map_row, pd.DataFrame):
                map_row = map_row.iloc[0]
            title = map_row["title"]
            form = map_row["form"]
            if form == "Long" and (key_moment_lookup.get(title) is None or key_duration_lookup.get(title) is None):
                pilot_issue_log.append({
                    "respondent": respondent_id,
                    "stimulus": raw_stimulus,
                    "issue": "Key moment timing not defined for long-form title.",
                })
                continue
            segment = prepare_stimulus_segment(df_sensor, raw_stimulus, form, title)
            if segment.empty:
                pilot_issue_log.append({
                    "respondent": respondent_id,
                    "stimulus": raw_stimulus,
                    "issue": "No data after windowing (check key moment timings).",
                })
                del segment
                continue
            try:
                features = compute_sensor_features(segment, form, title)
                if not features:
                    pilot_issue_log.append({
                        "respondent": respondent_id,
                        "stimulus": raw_stimulus,
                        "issue": "No features computed for segment.",
                    })
                    continue
                feature_row.update(features)
            finally:
                del segment
        pilot_feature_rows.append(feature_row)
    finally:
        del df_sensor
        gc.collect()

pilot_features = pd.DataFrame(pilot_feature_rows)
pilot_features

  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)


Unnamed: 0,respondent,FAC_data_missing,EEG_data_missing,GSR_data_missing,ET_data_missing,Short_The Big Bang Theory_duration,Short_The Big Bang Theory_FAC_Anger_Mean,Short_The Big Bang Theory_FAC_Anger_AUC,Short_The Big Bang Theory_FAC_Anger_Binary,Short_The Big Bang Theory_FAC_Contempt_Mean,...,Short_Titanic_EEG_FrontalAlphaAsymmetry_AUC,Short_Titanic_GSR_PeakDetected_Binary,Short_Titanic_GSR_Peaks_Count,Short_Titanic_GSR_Peaks_PerMinute,Short_Titanic_ET_Blink_Count,Short_Titanic_ET_Blink_Rate,Short_Titanic_ET_FixationDispersion_Mean,Short_Titanic_ET_Fixation_Count,Short_Titanic_ET_Fixation_PerMinute,Short_Titanic_ET_FixationDuration_Mean
0,2,0,1,0,0,186.82,0.0,0.0,0.0,0.001964,...,,,,,,,,,,
1,58,0,0,0,0,,,,,,...,,,,,,,,,,
2,116,0,0,0,0,,,,,,...,4.751817,1.0,590.0,591.261358,46.0,46.098343,0.261135,156.0,156.333511,459.838233


In [20]:
# Merge pilot features back into the UV and review any issues
uv_stage1["respondent"] = uv_stage1["respondent"].astype(str)
if not pilot_features.empty:
    pilot_features["respondent"] = pilot_features["respondent"].astype(str)
pilot_uv = (
    uv_stage1.loc[uv_stage1["respondent"].isin(pilot_ids)]
    .copy()
    .merge(pilot_features, on="respondent", how="left")
)

pilot_output_path = project_root / "results" / "uv_biometric_pilot_features.csv"
safe_write_csv(pilot_uv, pilot_output_path)
pilot_output_path

WindowsPath('c:/Users/ashra/Documents/NeuralSense/NeuralData/clients/544_WBD_CXCU/results/uv_biometric_pilot_features.csv')

In [21]:
issues_df = pd.DataFrame(pilot_issue_log)
issues_df.sort_values(["respondent", "stimulus"], na_position="last") if not issues_df.empty else "No issues logged."

Unnamed: 0,respondent,stimulus,issue
0,2,,"Missing EEG columns: High Engagement, Low Enga..."


In [22]:
# Validate long-form durations against key-moment specifications
tolerance_seconds = 3
duration_columns = [col for col in pilot_features.columns if col.endswith("_duration")]
validation_records = []
for _, feat_row in pilot_features.iterrows():
    respondent_id = feat_row.get("respondent")
    for col in duration_columns:
        value = feat_row.get(col)
        if pd.isna(value):
            continue
        form = col.split('_', 1)[0]
        if form != "Long":
            continue
        title = col[len(form) + 1: -len("_duration")]
        expected_ms = key_duration_lookup.get(title)
        if expected_ms is None:
            continue
        observed_ms = float(value) * 1000.0
        diff_seconds = abs(observed_ms - expected_ms) / 1000.0
        validation_records.append({
            "respondent": respondent_id,
            "title": title,
            "observed_seconds": round(observed_ms / 1000.0, 2),
            "expected_seconds": round(expected_ms / 1000.0, 2),
            "diff_seconds": round(diff_seconds, 2),
            "within_tolerance": diff_seconds <= tolerance_seconds,
        })
duration_validation = pd.DataFrame(validation_records)
duration_validation if not duration_validation.empty else "No long-form durations to validate."

Unnamed: 0,respondent,title,observed_seconds,expected_seconds,diff_seconds,within_tolerance
0,2,The Town,262.0,262.0,0.01,True
1,58,Mad Max,225.0,225.0,0.0,True
2,116,The Town,262.0,262.0,0.0,True


In [23]:
def run_sensor_feature_pipeline(stage1_df=None, respondent_ids=None, export_label="uv_stage2_full", save_outputs=True):
    """Compute sensor features for the specified respondents and optionally persist outputs."""
    sensor_file_index = {
        path.name: path
        for path in (project_root / "data" / "Export").glob("Group */Analyses/*/Sensor Data/*.csv")
    }
    if stage1_df is None:
        base_stage1 = uv_stage1.copy()
    else:
        base_stage1 = stage1_df.copy()
    base_stage1["respondent"] = base_stage1["respondent"].astype(str).str.strip()
    base_stage1 = base_stage1.loc[base_stage1["respondent"] != ""]
    base_stage1["group"] = base_stage1["group"].astype(str).str.strip()
    base_stage1["source_file"] = base_stage1["source_file"].apply(
        lambda value: str(value).strip() if pd.notna(value) and str(value).strip() else None
    )
    if respondent_ids is None:
        target_ids = sorted({str(r).strip() for r in base_stage1["respondent"]})
    else:
        cleaned_ids = [str(r).strip() for r in respondent_ids if pd.notna(r)]
        target_ids = sorted({identifier for identifier in cleaned_ids if identifier})
    if not target_ids:
        raise ValueError("No respondents provided for sensor feature processing.")
    subset = base_stage1.loc[base_stage1["respondent"].isin(target_ids)].copy()
    if subset.empty:
        raise ValueError("No matching respondents found in Stage 1 roster for the requested IDs.")
    subset["respondent_numeric"] = pd.to_numeric(subset["respondent"], errors="coerce")
    subset = subset.sort_values(["respondent_numeric", "respondent"])
    feature_rows = []
    issue_rows = []

    def log_issue(respondent_id, group, stimulus, message):
        issue_rows.append({
            "respondent": respondent_id,
            "group": group,
            "stimulus": stimulus,
            "issue": message,
        })

    for _, row in subset.iterrows():
        respondent_id = str(row["respondent"]).strip()
        group_letter = str(row.get("group", "")).strip().upper() if pd.notna(row.get("group")) else None
        source_file = row.get("source_file")
        if not source_file:
            log_issue(respondent_id, group_letter, None, "Sensor export not located in Stage 1 roster.")
            continue
        if source_file not in sensor_file_index:
            log_issue(respondent_id, group_letter,None, f"Sensor export {source_file} not found on disk.")
            continue
        df_sensor, _ = read_imotions(sensor_file_index[source_file])
        feature_row: Dict[str, float] = {"respondent": respondent_id}
        try:
            if df_sensor.empty or "SourceStimuliName" not in df_sensor.columns:
                log_issue(respondent_id,group_letter, None, "Sensor export missing SourceStimuliName column.")
                continue
            for sensor_label, required_columns in sensor_required_columns.items():
                if sensor_label == "EEG":
                    missing_metrics = []
                    for metric in required_columns:
                        candidates = [metric, *eeg_alternate_columns.get(metric, [])]
                        if not any(column in df_sensor.columns for column in candidates):
                            missing_metrics.append(metric)
                    feature_row[f"{sensor_label}_data_missing"] = int(bool(missing_metrics))
                    if missing_metrics:
                        log_issue(respondent_id,group_letter, None, f"Missing EEG columns: {', '.join(missing_metrics)}.")
                    continue
                missing_columns = [col for col in required_columns if col not in df_sensor.columns]
                feature_row[f"{sensor_label}_data_missing"] = int(bool(missing_columns))
                if missing_columns:
                    log_issue(respondent_id, group_letter,None, f"Missing {sensor_label} columns: {', '.join(missing_columns)}.")
            unique_stimuli = sorted({str(s).strip() for s in df_sensor["SourceStimuliName"].dropna().unique()})
            for raw_stimulus in unique_stimuli:
                lookup_key = (group_letter, raw_stimulus)
                if lookup_key not in stimulus_map_lookup.index:
                    log_issue(respondent_id,group_letter, raw_stimulus, "Stimulus missing from rename map.")
                    continue
                map_row = stimulus_map_lookup.loc[lookup_key]
                if isinstance(map_row, pd.DataFrame):
                    map_row = map_row.iloc[0]
                title = map_row["title"]
                form = map_row["form"]
                if form == "Long" and (key_moment_lookup.get(title) is None or key_duration_lookup.get(title) is None):
                    log_issue(respondent_id, group_letter,raw_stimulus, "Key moment timing not defined for long-form title.")
                    continue
                segment = prepare_stimulus_segment(df_sensor, raw_stimulus, form, title)
                if segment.empty:
                    log_issue(respondent_id,group_letter, raw_stimulus, "No data after windowing (check key moment timings).")
                    del segment
                    continue
                try:
                    features = compute_sensor_features(segment, form, title)
                    if not features:
                        log_issue(respondent_id,group_letter, raw_stimulus, "No features computed for segment.")
                        continue
                    feature_row.update(features)
                finally:
                    del segment
            feature_rows.append(feature_row)
        finally:
            del df_sensor
            gc.collect()

    features_df = pd.DataFrame(feature_rows)
    issues_df = pd.DataFrame(issue_rows)
    merged_uv = (
        base_stage1.loc[base_stage1["respondent"].isin(target_ids)]
        .copy()
    )
    merged_uv["respondent"] = merged_uv["respondent"].astype(str)
    if not features_df.empty:
        features_df["respondent"] = features_df["respondent"].astype(str)
        merged_uv = merged_uv.merge(features_df, on="respondent", how="left")

    if save_outputs:
        results_dir = project_root / "results"
        results_dir.mkdir(parents=True, exist_ok=True)
        features_path = results_dir / f"{export_label}_features.csv"
        uv_path = results_dir / f"{export_label}_uv.csv"
        issues_path = results_dir / f"{export_label}_issues.csv"
        safe_write_csv(features_df, features_path)
        safe_write_csv(merged_uv, uv_path)
        if not issues_df.empty:
            issues_sorted = issues_df.sort_values(["respondent", "stimulus"], na_position="last")
            safe_write_csv(issues_sorted, issues_path)
    gc.collect()
    return features_df, issues_df, merged_uv

In [24]:
# Run Stage 2 sensor feature extraction for the full respondent list
biometric_features, biometric_issues, biometric_uv = run_sensor_feature_pipeline(
    stage1_df=uv_stage1,
    export_label="uv_biometric_stage2",
    save_outputs=True
)
print(
    f"Computed features for {len(biometric_features)} respondents; "
    f"{len(biometric_issues)} issues logged."
)
biometric_features.shape

  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = pd.read_csv(path, header=header_rows, low_memory=True)
  df = p

Computed features for 83 respondents; 34 issues logged.


(83, 1076)

## UV Merge
Combine the biometric feature matrix with the self-report unified view to produce a single dataset that carries both survey and sensor-derived metrics. Outputs are written with biometric-specific filenames to avoid clashing with the self-report notebook.

In [25]:
uv_self_report_path = project_root / "results" / "uv_merged.csv"
if not uv_self_report_path.exists():
    raise FileNotFoundError(f"Self-report UV not found at {uv_self_report_path}")

uv_self_report = pd.read_csv(uv_self_report_path)
uv_self_report["respondent"] = uv_self_report["respondent"].astype(str).str.strip()

if biometric_features.empty:
    print("No biometric features computed; skipping biometric merge.")
    uv_biometric_full = uv_self_report.copy()
    added_columns = []
else:
    biometric_features_clean = biometric_features.copy()
    biometric_features_clean["respondent"] = biometric_features_clean["respondent"].astype(str).str.strip()
    overlap_columns = [
        col
        for col in biometric_features_clean.columns
        if col != "respondent" and col in uv_self_report.columns
    ]
    if overlap_columns:
        rename_map = {col: f"biometric_{col}" for col in overlap_columns}
        biometric_features_clean = biometric_features_clean.rename(columns=rename_map)
    added_columns = [col for col in biometric_features_clean.columns if col != "respondent"]
    uv_biometric_full = uv_self_report.merge(
        biometric_features_clean,
        on="respondent",
        how="left",
    )

uv_biometric_full_path = project_root / "results" / "uv_biometric_full.csv"
safe_write_csv(uv_biometric_full, uv_biometric_full_path)

merge_summary = {
    "self_report_rows": len(uv_self_report),
    "biometric_feature_rows": len(biometric_features),
    "merged_rows": len(uv_biometric_full),
    "biometric_columns_added": len(added_columns),
}
if added_columns:
    missing_all = int(uv_biometric_full[added_columns].isna().all(axis=1).sum())
    merge_summary["respondents_missing_all_biometric_features"] = missing_all
merge_summary

{'self_report_rows': 83,
 'biometric_feature_rows': 83,
 'merged_rows': 83,
 'biometric_columns_added': 1075,
 'respondents_missing_all_biometric_features': 0}