## BioVid HeatPain — Train/Val/Test Split

In [None]:
import pandas as pd
from pathlib import Path
import os
from IPython.display import display
from typing import Optional, Dict, Any


In [None]:
# Dataset root directory and the location of the master sample table inside it.
SOURCE_CSV_FILENAME = 'samples.csv'
BASE_DIR = Path('/content/drive/MyDrive/PainRecognitionProject/data/BioVid_HeatPain/')
SOURCE_CSV_PATH = BASE_DIR.joinpath(SOURCE_CSV_FILENAME)

# Echo the resolved location so a wrong path is noticed before any loading.
print(f"Source CSV: {SOURCE_CSV_PATH}")

### Base folder check - BioVid_HeatPain folder

In [None]:
def _samples_schema_known(columns) -> bool:
    """Return True when `columns` allow mapping CSV rows to per-subject file names."""
    names = set(columns)
    return 'video_path' in names or {'subject_name', 'sample_name'} <= names


def _read_samples_table(csv_path):
    """Read the samples table, retrying as tab-separated when the comma read is unusable."""
    table = pd.read_csv(csv_path)
    if _samples_schema_known(table.columns):
        return table
    # A tab-separated file read with the default separator collapses into a
    # single unrecognised column, which would silently disable the cross-check.
    try:
        tab_table = pd.read_csv(csv_path, sep='\t')
    except pd.errors.ParserError:
        return table
    return tab_table if _samples_schema_known(tab_table.columns) else table


def _expected_files_by_subject(samples_df) -> Dict[str, list]:
    """Group the file names listed in the samples table by subject folder name."""
    grouped: Dict[str, list] = {}
    if 'video_path' in samples_df.columns:
        # Rows are attributed to the folder named before the first '/'.
        for rel_path in samples_df['video_path'].dropna().astype(str):
            subject_name, slash, _ = rel_path.partition('/')
            if slash:
                grouped.setdefault(subject_name, []).append(os.path.basename(rel_path))
    elif {'subject_name', 'sample_name'}.issubset(samples_df.columns):
        pairs = samples_df[['subject_name', 'sample_name']].dropna()
        for subject_name, sample_name in zip(pairs['subject_name'], pairs['sample_name']):
            grouped.setdefault(subject_name, []).append(str(sample_name) + '.mp4')
    return grouped


def _print_examples(section: dict, title: str) -> None:
    """Print up to five entries of one report section under `title`."""
    if section:
        print(f"\n{title}:")
        for subj, vals in list(section.items())[:5]:
            print(f" - {subj}: {vals}")


def check_dataset_integrity(
    base_dir: str,
    expected_files_per_subject: int = 100,
    prefix_sep: str = '-',
    allowed_exts: tuple = ('.mp4',),
    sample_csv_path: Optional[str] = None,
    max_examples: int = 3,
    save_report_path: Optional[str] = None,
    verbose: bool = True
) -> Dict[str, Any]:
    """
    Scan the subject subfolders of `base_dir` and return an integrity report.

    Every direct subdirectory of `base_dir` is treated as one subject. For each
    subject the files whose lower-cased suffix is in `allowed_exts` are counted
    and checked.

    Args:
        base_dir: Directory containing one subfolder per subject.
        expected_files_per_subject: Number of matching files each subject
            folder should hold; deviating folders are reported.
        prefix_sep: Separator expected between the subject folder name and the
            rest of the file name (files must start with
            "<folder><prefix_sep>").
        allowed_exts: Lower-case suffixes counted as video files.
        sample_csv_path: Optional table of expected samples. It must contain
            either a `video_path` column ("<subject>/<file>") or
            `subject_name` + `sample_name` columns ('.mp4' is appended to
            `sample_name`). The file is read comma-separated first and
            re-read tab-separated if no known column layout is found.
        max_examples: Maximum number of example file names stored per subject
            for naming issues, unexpected files and missing samples.
        save_report_path: Optional directory; when given, a per-subject summary
            is written to `dataset_integrity_by_subject.csv` inside it.
        verbose: Print a human-readable summary when True.

    Returns:
        Dict with the keys `base_dir`, `n_subjects`, `total_mp4_files`,
        `folders_wrong_count`, `naming_issues`, `duplicates`,
        `unexpected_extensions` and `missing_samples`. All but the first three
        are dicts keyed by subject folder name.

    Raises:
        FileNotFoundError: If `base_dir` is not an existing directory.
    """
    base = Path(base_dir)
    if not base.is_dir():
        raise FileNotFoundError(f"Base directory does not exist: {base_dir}")

    subject_folders = sorted(p for p in base.iterdir() if p.is_dir())
    report = {
        'base_dir': str(base.resolve()),
        'n_subjects': len(subject_folders),
        'total_mp4_files': 0,
        'folders_wrong_count': {},
        'naming_issues': {},
        'duplicates': {},
        'unexpected_extensions': {},
        'missing_samples': {},
    }

    samples_df = None
    schema_ok = True
    expected_by_subject: Dict[str, list] = {}
    if sample_csv_path:
        samples_df = _read_samples_table(sample_csv_path)
        schema_ok = _samples_schema_known(samples_df.columns)
        # Group once up front instead of re-filtering the table per subject.
        expected_by_subject = _expected_files_by_subject(samples_df)

    # Counts are cached so the saved report matches what was actually checked.
    video_counts: Dict[str, int] = {}

    for subject in subject_folders:
        # Single directory pass; sorted so reported examples are deterministic.
        video_files = []
        other_files = []
        for entry in sorted(subject.iterdir()):
            if not entry.is_file():
                continue
            if entry.suffix.lower() in allowed_exts:
                video_files.append(entry.name)
            else:
                other_files.append(entry.name)

        video_counts[subject.name] = len(video_files)
        report['total_mp4_files'] += len(video_files)

        if len(video_files) != expected_files_per_subject:
            report['folders_wrong_count'][subject.name] = len(video_files)

        required_prefix = f"{subject.name}{prefix_sep}"
        bad_names = [fn for fn in video_files if not fn.startswith(required_prefix)]
        if bad_names:
            report['naming_issues'][subject.name] = bad_names[:max_examples]

        # NOTE(review): names come from one directory listing, so exact
        # duplicates are not expected here; the check is kept as a safeguard.
        seen = set()
        dups = []
        for fn in video_files:
            if fn in seen:
                dups.append(fn)
            else:
                seen.add(fn)
        if dups:
            report['duplicates'][subject.name] = dups

        if other_files:
            report['unexpected_extensions'][subject.name] = other_files[:max_examples]

        expected_files = expected_by_subject.get(subject.name, [])
        if expected_files:
            missing = [fn for fn in expected_files if fn not in seen]
            if missing:
                report['missing_samples'][subject.name] = missing[:max_examples]

    if verbose:
        print(f"Dataset check: {report['n_subjects']} subjects, {report['total_mp4_files']} mp4 files found")
        if report['folders_wrong_count']:
            print(f"Folders with wrong MP4 count: {len(report['folders_wrong_count'])}")
        else:
            print("All folders have the expected file count (or none deviated).")

        if report['naming_issues']:
            print(f"Naming issues in {len(report['naming_issues'])} folders (examples shown).")
        if report['duplicates']:
            print(f"Duplicates in {len(report['duplicates'])} folders.")
        if report['unexpected_extensions']:
            print(f"Unexpected file extensions in {len(report['unexpected_extensions'])} folders.")
        if samples_df is not None and report['missing_samples']:
            print(f"Missing samples listed in CSV for {len(report['missing_samples'])} folders.")
        if samples_df is not None and not schema_ok:
            # Make a skipped cross-check visible instead of reporting "nothing missing".
            print("Warning: sample CSV has neither 'video_path' nor "
                  "'subject_name'/'sample_name' columns; cross-check skipped.")

        _print_examples(report['folders_wrong_count'], "Folders with wrong file counts")
        _print_examples(report['naming_issues'], "Naming issues (examples)")
        _print_examples(report['duplicates'], "Duplicate filenames")
        _print_examples(report['unexpected_extensions'], "Unexpected file extensions")
        if samples_df is not None:
            _print_examples(report['missing_samples'], "Missing expected samples from CSV")

    if save_report_path:
        out_dir = Path(save_report_path)
        out_dir.mkdir(parents=True, exist_ok=True)
        rows = [
            {
                'subject': subject.name,
                'mp4_count': video_counts[subject.name],
                'naming_issues': ';'.join(report['naming_issues'].get(subject.name, [])),
                'duplicates': ';'.join(report['duplicates'].get(subject.name, [])),
                'unexpected_files': ';'.join(report['unexpected_extensions'].get(subject.name, [])),
                'missing_samples': ';'.join(report['missing_samples'].get(subject.name, [])),
            }
            for subject in subject_folders
        ]
        pd.DataFrame(rows).to_csv(out_dir / 'dataset_integrity_by_subject.csv', index=False)
        print(f"\nReport saved to: {out_dir}")

    return report

In [None]:
# Reuse BASE_DIR so the integrity check inspects the same folder that the
# rest of the notebook reads from and writes to.
report = check_dataset_integrity(str(BASE_DIR))

### Data division - Training / Validation / Test

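The validation/test subjects listed below follow the subject division proposed by the creators of the database; this notebook further divides them into a validation set and a test set: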
https://www.nit.ovgu.de/nit_media/Bilder/Dokumente/BIOVID_Dokumente/BioVid_HoldOutEval_Proposal.pdf

In [None]:
# Subject IDs held out for validation and test (26 entries). When this list is
# passed to perform_subject_split, every subject NOT listed here ends up in the
# training set.
# IDs are underscore-separated; perform_subject_split reads the second field
# ('m' / 'w') as the gender. The first row (five IDs) is the group that
# perform_subject_split labels as 'Low' expression.
VAL_TEST_SUBJECTS_IDS = [
    '100914_m_39', '101114_w_37', '082315_w_60', '083114_w_55', '083109_m_60',
    '072514_m_27', '080309_m_29', '112016_m_25', '112310_m_20', '092813_w_24',
    '112809_w_23', '112909_w_20', '071313_m_41', '101309_m_48', '101609_m_36',
    '091809_w_43', '102214_w_36', '102316_w_50', '112009_w_43', '101814_m_58',
    '101908_m_61', '102309_m_61', '112209_m_51', '112610_w_60', '112914_w_51',
    '120514_w_56'
]

In [None]:
# Subjects labelled as 'Low' expression; they are assigned to validation/test
# by position rather than by shuffling.
_LOW_EXPRESSION_SUBJECT_IDS = frozenset({
    '100914_m_39', '101114_w_37', '082315_w_60', '083114_w_55', '083109_m_60',
})


def perform_subject_split(df_all_clips, val_test_subjects_ids, n_val_normal=11, random_state=42):
    """
    Split clips into train/validation/test sets based on subject IDs.

    The subjects in `val_test_subjects_ids` are divided between validation and
    test; every other subject in `df_all_clips` goes to training. With the
    default arguments and the 26-subject hold-out list this yields 13
    validation and 13 test subjects.

    Args:
        df_all_clips: DataFrame with one row per clip and a `subject_name`
            column.
        val_test_subjects_ids: Subject IDs of the form "<x>_<gender>_<y>"; the
            second underscore-separated field ('m' or 'w') is used as gender.
        n_val_normal: Number of shuffled 'Normal'-expression subjects assigned
            to validation; the remainder goes to test.
        random_state: Seed for shuffling the 'Normal'-expression subjects, so
            the split is reproducible.

    Returns:
        Tuple `(df_train, df_val, df_test)` of independent copies of the
        matching rows of `df_all_clips`.
    """
    subject_rows = [
        {
            'subject_name': subject_id,
            'gender': subject_id.split('_')[1],
            'expression': 'Low' if subject_id in _LOW_EXPRESSION_SUBJECT_IDS else 'Normal',
        }
        for subject_id in val_test_subjects_ids
    ]
    df_26 = pd.DataFrame(subject_rows, columns=['subject_name', 'gender', 'expression'])

    # Low expression (5 subjects): Val gets 1 M + 1 W; Test gets 1 M + 2 W.
    # Selection is positional, following the order of `val_test_subjects_ids`.
    low_m = df_26[(df_26['expression'] == 'Low') & (df_26['gender'] == 'm')]
    low_w = df_26[(df_26['expression'] == 'Low') & (df_26['gender'] == 'w')]
    val_low_ids = pd.concat([low_m.iloc[0:1], low_w.iloc[0:1]])['subject_name'].tolist()
    test_low_ids = pd.concat([low_m.iloc[1:2], low_w.iloc[1:3]])['subject_name'].tolist()

    # Normal expression (21 subjects): shuffled with a fixed seed, then the
    # first `n_val_normal` go to Val and the rest to Test (11 / 10 by default).
    df_normal = (
        df_26[df_26['expression'] == 'Normal']
        .sample(frac=1, random_state=random_state)
        .reset_index(drop=True)
    )
    val_normal_ids = df_normal.iloc[:n_val_normal]['subject_name'].tolist()
    test_normal_ids = df_normal.iloc[n_val_normal:]['subject_name'].tolist()

    # Final subject ID lists.
    val_ids = val_low_ids + val_normal_ids
    test_ids = test_low_ids + test_normal_ids

    subject_col = df_all_clips['subject_name']
    df_val = df_all_clips[subject_col.isin(val_ids)].copy()
    df_test = df_all_clips[subject_col.isin(test_ids)].copy()
    df_train = df_all_clips[~subject_col.isin(val_ids + test_ids)].copy()

    # Sanity output: clip and subject counts per split.
    print("\n--- Weryfikacja Podziału Pacjentów ---")
    print(f"Trening (Klipów): {len(df_train)} | Pacjentów: {df_train['subject_name'].nunique()}")
    print(f"Walidacja (Klipów): {len(df_val)} | Pacjentów: {df_val['subject_name'].nunique()}")
    print(f"Test (Klipów): {len(df_test)} | Pacjentów: {df_test['subject_name'].nunique()}")
    print(f"Całkowita liczba pacjentów: {df_all_clips['subject_name'].nunique()}")

    return df_train, df_val, df_test

In [None]:
# Load the tab-separated master sample table; a missing file is re-raised with
# the resolved path in the message, chained to the original error.
try:
    df_master = pd.read_csv(SOURCE_CSV_PATH, sep='\t')
except FileNotFoundError as err:
    raise FileNotFoundError(f"Source CSV not found: {SOURCE_CSV_PATH}") from err

print(f"Loaded {len(df_master)} rows; unique subjects: {df_master['subject_name'].nunique()}")
display(df_master.head())

In [None]:
# Relative clip location inside the dataset root: "<subject_name>/<sample_name>.mp4".
df_master['video_path'] = (
    df_master['subject_name'] + '/' + df_master['sample_name'] + '.mp4'
)
# The training label is a straight copy of the class id column.
df_master['label'] = df_master['class_id']

# Preview the two columns that are exported later, then the class balance.
display(df_master.loc[:, ['video_path', 'label']].head())
print("Label counts:", df_master['label'].value_counts(), sep="\n")

In [None]:
# Run the subject-level split, then report size and subject count per split.
df_train, df_val, df_test = perform_subject_split(df_master, VAL_TEST_SUBJECTS_IDS)

print("Clips:", *map(len, (df_train, df_val, df_test)))
print(
    "Unique subjects:",
    *(part['subject_name'].nunique() for part in (df_train, df_val, df_test)),
)

# Show the first rows of each split in train / val / test order.
for part in (df_train, df_val, df_test):
    display(part.head())

In [None]:
# Write each split next to the source data, keeping only the exported columns.
BASE_DIR.mkdir(parents=True, exist_ok=True)
output_columns = ['video_path', 'label']

saved_paths = []
for file_name, split_df in (('train.csv', df_train), ('val.csv', df_val), ('test.csv', df_test)):
    target = BASE_DIR / file_name
    split_df[output_columns].to_csv(target, index=False)
    saved_paths.append(target)

# List the written files once all three exports have succeeded.
print("Saved files:", *saved_paths, sep="\n")