
# Thesis Chapters 4–5 — Data Processing Notebook
**Datasets**: MultiHateClip, HateClipSeg  
**Environment**: Kaggle

This notebook standardizes the three datasets into CSV manifests that point to the **actual video paths** on Kaggle.

**Input root (read-only):** `/kaggle/input/dataset-sample`  
**Output root (writable):** `/kaggle/working/processed`



In [2]:
from pathlib import Path
import pandas as pd
import json, ast

DATASET_ROOT = Path("/kaggle/input/dataset-sample/preprocess_lab/data/raw")
OUT_DIR = Path("/kaggle/working/processed")
OUT_DIR.mkdir(parents=True, exist_ok=True)

CANON_LABELS = ["normal","offensive","hateful","insulting","sexual","violence","self-harm"]

In [8]:
# === Helpers: fuzzy column match, safe parsing, label normalization, utilities ===

def _norm_key(s: str) -> str:
    """Normalize string key: case/space/_/- insensitive."""
    if not isinstance(s, str):
        s = str(s)
    return s.strip().lower().replace(" ", "").replace("_","").replace("-","")

def find_col(df: pd.DataFrame, *candidates):
    """Return the first matching column from candidates (fuzzy)."""
    norm_map = {_norm_key(c): c for c in df.columns}
    for cand in candidates:
        key = _norm_key(cand)
        if key in norm_map:
            return norm_map[key]
    return None

def safe_parse_list(x):
    """Parse list-like strings: try JSON then ast.literal_eval; return [] on failure."""
    if isinstance(x, (list, tuple)):
        return list(x)
    if not isinstance(x, str):
        return []
    s = x.strip()
    try:
        return json.loads(s)
    except Exception:
        try:
            return ast.literal_eval(s)
        except Exception:
            return []

def normalize_label_string(x: str) -> str:
    """Map raw labels to canonical labels."""
    if x is None:
        return "normal"
    k = str(x).strip().lower()
    m = {
        "nonhate": "normal", "non hate": "normal", "non_hate": "normal", "non-hate": "normal",
        "none": "normal", "neutral": "normal", "benign": "normal", "normal": "normal",
        "offensive": "offensive",
        "hate": "hateful", "hateful": "hateful",
        "insult": "insulting", "insulting": "insulting",
        "sexual": "sexual",
        "violence": "violence", "violent": "violence",
        "harm": "self-harm", "self-harm": "self-harm", "self_harm": "self-harm"
    }
    return m.get(k, k)

def onehot_to_labels(vec):
    """
    Convert a multi-hot vector (list of 0/1) using HateClipSeg index mapping to canonical labels.
    Mapping: 0=normal, 1=hateful, 2=insulting, 3=sexual, 4=violence, 5=self-harm
    """
    idx2lab = ["normal", "hateful", "insulting", "sexual", "violence", "self-harm"]
    out = []
    for i, v in enumerate(vec):
        try:
            active = int(v) == 1
        except Exception:
            active = v == 1
        if active and i < len(idx2lab):
            out.append(idx2lab[i])
    # Normalize + dedupe
    seen, uniq = set(), []
    for l in out:
        l2 = normalize_label_string(l)
        if l2 not in seen:
            seen.add(l2)
            uniq.append(l2)
    return uniq

def any_offensive(labels_list):
    """Return 1 if any non-'normal' label appears; else 0."""
    return int(any(normalize_label_string(l) != "normal" for l in labels_list))

def explode_single_label_rows(df, labels_col="labels", keep_cols=None):
    """
    Given a dataframe where `labels` is a list, explode into single-label rows.
    keep_cols: columns to keep as-is (copied to each exploded row).
    """
    if keep_cols is None:
        keep_cols = [c for c in df.columns if c != labels_col]
    rows = []
    for _, r in df.iterrows():
        labs = r[labels_col] if isinstance(r[labels_col], list) else safe_parse_list(r[labels_col])
        if not labs:
            labs = ["normal"]
        for lb in labs:
            newr = {c: r[c] for c in keep_cols}
            newr["label"] = normalize_label_string(lb)  # single canonical label
            rows.append(newr)
    return pd.DataFrame(rows)

def build_sample_id(dataset, video_id, start, end, label):
    """Build a deterministic sample_id for downstream training/caching."""
    s = "0" if (start is None or pd.isna(start)) else f"{float(start):.3f}"
    e = "end" if (end is None or pd.isna(end)) else f"{float(end):.3f}"
    return f"{dataset}__{video_id}__{s}_{e}__{label}"


In [11]:
# --- MultiHateClip ---
mh_root = DATASET_ROOT / "MultiHateClip"
mh_train = pd.read_csv(mh_root / "train.tsv", sep="\t")
mh_valid = pd.read_csv(mh_root / "valid.tsv", sep="\t")
mh_test  = pd.read_csv(mh_root / "test.tsv",  sep="\t")
df_mh = pd.concat([mh_train,mh_valid,mh_test],keys=["train","valid","test"],names=["split"]).reset_index(level=0)
vid_col = "Video_ID"
lbl_col = "Label"

records = []
for _,r in df_mh.iterrows():
    vid = str(r[vid_col]).strip()
    lab = normalize_label_string(r[lbl_col])
    records.append({
        "dataset":"MultiHateClip","Video_ID":vid,
        "video_path": str(mh_root/"data"/r["split"]/f"{vid}.mp4"),
        "labels":[lab],"binary_offensive":int(lab!="normal"),
        "start_sec":None,"end_sec":None
    })
mh_df = pd.DataFrame(records)
mh_single = explode_single_label_rows(mh_df,"labels",
    ["dataset","Video_ID","video_path","binary_offensive","start_sec","end_sec"])
mh_single["sample_id"] = mh_single.apply(lambda r: build_sample_id(r.dataset,r.Video_ID,r.start_sec,r.end_sec,r.label),axis=1)
mh_single.to_csv(OUT_DIR/"multihateclip_samples.csv",index=False)

# --- HateClipSeg ---
hcs_root = DATASET_ROOT / "HateClipSeg"
df_seg = pd.read_csv(hcs_root/"segment_level_annotation.csv")
col_vid, col_lbl, col_ts = "Video Id","Segment-Level Label","Segment Timestamp"

records=[]
for _,r in df_seg.iterrows():
    vid=str(r[col_vid]).strip()
    labels= safe_parse_list(r[col_lbl])
    times = safe_parse_list(r[col_ts])
    for lb,ts in zip(labels,times):
        labs = onehot_to_labels(lb)
        t0,t1 = float(ts[0]), float(ts[1])
        vpath = hcs_root/"data"/"segment_level"/f"{vid}.mp4"
        records.append({
            "dataset":"HateClipSeg","Video_ID":vid,"video_path":str(vpath),
            "labels":labs,"binary_offensive":any(l!="normal" for l in labs),
            "start_sec":t0,"end_sec":t1
        })
hcs_df = pd.DataFrame(records)
hcs_single = explode_single_label_rows(hcs_df,"labels",
    ["dataset","Video_ID","video_path","binary_offensive","start_sec","end_sec"])
hcs_single["sample_id"] = hcs_single.apply(lambda r: build_sample_id(r.dataset,r.Video_ID,r.start_sec,r.end_sec,r.label),axis=1)
hcs_single.to_csv(OUT_DIR/"hateclipseg_samples.csv",index=False)


In [12]:
parts=[pd.read_csv(OUT_DIR/"multihateclip_samples.csv"),
       pd.read_csv(OUT_DIR/"hateclipseg_samples.csv")]
unified=pd.concat(parts,ignore_index=True)
unified.to_csv(OUT_DIR/"unified_samples.csv",index=False)
print("Unified saved:",len(unified))
print(unified.label.value_counts())

Unified saved: 14467
label
normal                                             6491
insulting                                          2920
hateful                                            2363
violence                                           1281
['normal', 'normal']                                585
sexual                                              372
['offensive', 'offensive']                          174
self-harm                                            39
['counter narrative', 'normal']                      30
['hateful', 'hateful']                               29
['normal', 'offensive', 'offensive']                 27
['offensive', 'normal', 'normal']                    22
['hateful', 'offensive', 'hateful']                  18
['offensive', 'normal', 'offensive']                 15
['offensive', 'hateful', 'hateful']                  15
['hateful', 'offensive', 'offensive']                12
['counter narrative', 'offensive', 'offensive']      11
['normal', 'offensive

In [30]:
from pathlib import Path
import pandas as pd
import numpy as np
import subprocess, shlex, math

# Input manifests (generated earlier)
OUT_DIR = Path("/kaggle/working/processed")
MHC_IN  = OUT_DIR / "multihateclip_samples.csv"
HCS_IN  = OUT_DIR / "hateclipseg_samples.csv"

assert MHC_IN.exists(), f"Missing {MHC_IN}"
assert HCS_IN.exists(), f"Missing {HCS_IN}"

# Outputs
MHC_CLEAN = OUT_DIR / "multihateclip_samples.cleaned.csv"
HCS_CLEAN = OUT_DIR / "hateclipseg_samples.cleaned.csv"
UNIFIED_CLEAN = OUT_DIR / "unified_samples.cleaned.csv"

# Optional: repair broken MP4 into /kaggle/working/repair
REPAIR_BROKEN_MP4 = False
REPAIR_DIR = Path("/kaggle/working/repair")
if REPAIR_BROKEN_MP4:
    REPAIR_DIR.mkdir(parents=True, exist_ok=True)

print("Repair broken mp4?", REPAIR_BROKEN_MP4)


Repair broken mp4? False


In [31]:
# Robust duration probing (ffprobe first, OpenCV fallback) + optional repair via remux
try:
    import cv2
except Exception:
    cv2 = None

def _exists_and_nonempty(p: str | Path) -> bool:
    try:
        p = Path(p)
        return p.exists() and p.is_file() and p.stat().st_size > 0
    except Exception:
        return False

def _ffprobe_duration(p: str) -> float | None:
    cmd = (
        "ffprobe -v error -analyzeduration 100M -probesize 100M "
        "-select_streams v:0 -show_entries format=duration "
        "-of default=nw=1:nk=1 "
        f"{shlex.quote(str(p))}"
    )
    try:
        out = subprocess.check_output(shlex.split(cmd), stderr=subprocess.DEVNULL, text=True).strip()
        dur = float(out)
        if math.isfinite(dur) and dur > 0:
            return dur
    except Exception:
        return None
    return None

def _opencv_duration(p: str) -> float | None:
    if cv2 is None:
        return None
    try:
        cap = cv2.VideoCapture(str(p))
        if not cap.isOpened():
            return None
        fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
        frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0.0
        cap.release()
        if fps > 0 and frames > 0:
            return float(frames / fps)
    except Exception:
        return None
    return None

def _try_repair_mp4(src: str, repair_dir: Path) -> str | None:
    """
    Remux MP4 to generate missing moov atom; fallback to re-encode if remux fails.
    Returns repaired path or None.
    """
    try:
        srcp = Path(src)
        dst = repair_dir / f"{srcp.stem}_repaired.mp4"
        # Fast remux
        cmd = (
            f'ffmpeg -hide_banner -loglevel error -y '
            f'-err_detect ignore_err -fflags +genpts -i {shlex.quote(src)} '
            f'-c copy -movflags +faststart {shlex.quote(str(dst))}'
        )
        ok = subprocess.run(shlex.split(cmd)).returncode == 0
        if not ok:
            # Re-encode fallback
            cmd = (
                f'ffmpeg -hide_banner -loglevel error -y '
                f'-err_detect ignore_err -fflags +genpts -i {shlex.quote(src)} '
                f'-c:v libx264 -c:a aac -movflags +faststart {shlex.quote(str(dst))}'
            )
            ok = subprocess.run(shlex.split(cmd)).returncode == 0
        if ok:
            dur = _ffprobe_duration(str(dst)) or _opencv_duration(str(dst))
            if dur and dur > 0:
                return str(dst)
    except Exception:
        return None
    return None

def probe_or_repair(path: str, allow_repair: bool = False) -> tuple[float|None, str|None, bool]:
    """
    Returns: (duration_seconds, playable_path, repaired_flag)
    - playable_path is original or repaired; None if unusable.
    - repaired_flag True if we generated and used a repaired file.
    """
    if not _exists_and_nonempty(path):
        return None, None, False
    dur = _ffprobe_duration(path)
    if dur and dur > 0:
        return dur, path, False
    # Try OpenCV fallback
    dur_cv = _opencv_duration(path)
    if dur_cv and dur_cv > 0:
        return dur_cv, path, False
    # Optional repair
    if allow_repair:
        repaired = _try_repair_mp4(path, REPAIR_DIR)
        if repaired:
            dur2 = _ffprobe_duration(repaired) or _opencv_duration(repaired)
            if dur2 and dur2 > 0:
                return dur2, repaired, True
    # Unusable
    return None, None, False


In [35]:
import pandas as pd
import numpy as np
import subprocess, shlex, math
from pathlib import Path

# ---------- Video probing helpers ----------
def _ffprobe_duration(path: str) -> float | None:
    """Return video duration in seconds using ffprobe."""
    cmd = f"ffprobe -v error -show_entries format=duration -of default=nw=1:nk=1 {shlex.quote(str(path))}"
    try:
        out = subprocess.check_output(shlex.split(cmd), stderr=subprocess.DEVNULL, text=True).strip()
        dur = float(out)
        if math.isfinite(dur) and dur > 0:
            return dur
    except Exception:
        return None
    return None

def probe_duration(path: str) -> float | None:
    """Wrapper to probe duration; returns None if file is broken/unreadable."""
    try:
        return _ffprobe_duration(path)
    except Exception:
        return None

# ---------- Manifest cleaning ----------
def clean_and_stats(manifest_csv: str, dataset_name: str):
    """
    - Load manifest (columns: sample_id, video_path, label, start_sec, end_sec, ...)
    - Probe each unique video_path
    - Drop broken/unreadable files
    - Return cleaned df and stats summary
    """
    df = pd.read_csv(manifest_csv)
    if "video_path" not in df.columns:
        raise ValueError(f"{manifest_csv} missing 'video_path'")

    # Probe unique videos
    uniq = df["video_path"].dropna().unique().tolist()
    results = {}
    for p in uniq:
        dur = probe_duration(p)
        results[p] = dur

    df["video_duration"] = df["video_path"].map(results)

    # segment length if start/end available
    def seg_len(row):
        try:
            if pd.notna(row.get("start_sec")) and pd.notna(row.get("end_sec")):
                return max(0.0, float(row["end_sec"]) - float(row["start_sec"]))
        except Exception:
            pass
        return np.nan
    df["segment_seconds"] = df.apply(seg_len, axis=1)

    # Mark broken
    df["is_broken"] = df["video_duration"].isna()

    total = len(df)
    broken = df["is_broken"].sum()
    kept = total - broken

    # Stats per label
    stats = df[~df["is_broken"]].groupby("label").agg(
        samples=("label", "count"),
        avg_video_len=("video_duration", "mean"),
        avg_seg_len=("segment_seconds", "mean")
    ).reset_index()

    print(f"\n=== {dataset_name} ===")
    print(f"Total samples: {total}")
    print(f"Broken/unusable: {broken} ({broken/total:.2%})")
    print(f"Usable: {kept}")
    print("\nPer-class stats:")
    display(stats)

    return df[~df["is_broken"]], stats

# ---------- Run on your manifests ----------
mhc_clean, mhc_stats = clean_and_stats("/kaggle/working/processed/multihateclip_samples.csv", "MultiHateClip")
hcs_clean, hcs_stats = clean_and_stats("/kaggle/working/processed/hateclipseg_samples.csv", "HateClipSeg")

# Merge unified dataset
unified = pd.concat([mhc_clean, hcs_clean], ignore_index=True)
print("\n=== Unified Dataset ===")
print("Total usable samples:", len(unified))
print("Label distribution:\n", unified["label"].value_counts())



=== MultiHateClip ===
Total samples: 1001
Broken/unusable: 998 (99.70%)
Usable: 3

Per-class stats:


  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0,label,samples,avg_video_len,avg_seg_len
0,"['counter narrative', 'normal']",1,57.144271,
1,"['normal', 'normal']",2,31.857774,



=== HateClipSeg ===
Total samples: 13466
Broken/unusable: 13466 (100.00%)
Usable: 0

Per-class stats:


Unnamed: 0,label,samples,avg_video_len,avg_seg_len



=== Unified Dataset ===
Total usable samples: 3
Label distribution:
 label
['normal', 'normal']               2
['counter narrative', 'normal']    1
Name: count, dtype: int64


  unified = pd.concat([mhc_clean, hcs_clean], ignore_index=True)
