# Visual exploration: heatmaps, scanpaths, distributions, clusters

This notebook visualizes fixation behavior by image and by category.

It will:
- Load precomputed heatmaps/scanpaths from `visualization/` if available.
- Load raw fixations from `fixations/` and compute:
  - Heatmaps per image and per category (density of fixations).
  - Scanpaths (ordered fixation sequences) for selected participants/images.
  - Violin plots (with inner box plots) of fixation duration and pupil size, grouped by binary label code.
  - Cluster maps (K-Means on fixation coordinates) per category to reveal spatial patterns.

Outputs will be saved under `visualization/visual_exploration/` with reasonable defaults.

In [2]:
# Setup and paths
import os, glob, re, math
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from sklearn.cluster import KMeans

# Global seaborn styling for every figure produced below:
# the "talk" plotting context and the "whitegrid" axes style.
sns.set_context("talk")
sns.set_style("whitegrid")

# Detect project root robustly by walking up until we find expected folders
def find_project_root(start: Path):
    """Locate the project root by walking from ``start`` up through its parents.

    A directory that contains a ``fixations`` or ``visualization`` folder is
    preferred over one that only contains a ``README.md``: a README can live in
    any nested sub-folder, and would otherwise stop the walk before the
    directory holding the data is reached.

    Parameters
    ----------
    start : Path
        Directory to start from (the notebook's working directory).

    Returns
    -------
    Path
        The nearest ancestor (or ``start`` itself) with a data folder; failing
        that, the nearest one with a ``README.md``; failing that, ``start``.
    """
    candidates = [start, *start.parents]
    # Pass 1: the data folders are the strongest signal for the project root.
    for cand in candidates:
        if (cand / "fixations").exists() or (cand / "visualization").exists():
            return cand
    # Pass 2: fall back to the nearest README.md, as before.
    for cand in candidates:
        if (cand / "README.md").exists():
            return cand
    return start

# Resolve all working paths relative to the detected project root.
# nb_dir is the kernel's current working directory at the time this cell runs.
nb_dir = Path.cwd()
project_root = find_project_root(nb_dir)

# Input folders (raw fixation CSVs, precomputed images) and the output folder of this notebook.
fixations_dir = project_root / "fixations"
viz_dir = project_root / "visualization"
out_dir = viz_dir / "visual_exploration"
# Side effect: creates the output folder (and `visualization/` if missing) under project_root.
out_dir.mkdir(parents=True, exist_ok=True)

# Echo the resolved paths so a wrong root detection is visible immediately.
print(f"Notebook CWD: {nb_dir}")
print(f"Project root: {project_root}")
print(f"Fixations dir: {fixations_dir}  (exists={fixations_dir.exists()})")
print(f"Visualization dir: {viz_dir}  (exists={viz_dir.exists()})")
print(f"Outputs: {out_dir}")

Notebook CWD: c:\Users\SWixforth\Uni\eye-tracking-ai\data_analysis\visual_exploration
Project root: c:\Users\SWixforth\Uni\eye-tracking-ai
Fixations dir: c:\Users\SWixforth\Uni\eye-tracking-ai\fixations  (exists=True)
Visualization dir: c:\Users\SWixforth\Uni\eye-tracking-ai\visualization  (exists=True)
Outputs: c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration


In [3]:
# Use precomputed visualization assets if available
# When True (and PNGs are found), the heatmap/scanpath cells show the existing
# images instead of re-rendering them from the raw fixations.
USE_PRECOMPUTED = True
pre_heatmaps_dir = viz_dir / "heatmaps"
pre_scanpaths_dir = viz_dir / "scanpaths"
# Only top-level *.png files are collected; a missing folder yields an empty list.
pre_heatmap_files = list(pre_heatmaps_dir.glob("*.png")) if pre_heatmaps_dir.exists() else []
pre_scanpath_files = list(pre_scanpaths_dir.glob("*.png")) if pre_scanpaths_dir.exists() else []
print(f"Precomputed heatmaps in: {pre_heatmaps_dir} -> exists={pre_heatmaps_dir.exists()} files={len(pre_heatmap_files)}")
print(f"Precomputed scanpaths in: {pre_scanpaths_dir} -> exists={pre_scanpaths_dir.exists()} files={len(pre_scanpath_files)}")
# Sanity warnings: these only print, they do not stop execution.
if not fixations_dir.exists():
    print("WARNING: fixations directory does not exist. Check project_root detection or paths.")
if pre_heatmaps_dir.exists() and len(pre_heatmap_files) == 0:
    print("WARNING: heatmaps dir exists but contains 0 PNG files.")
if pre_scanpaths_dir.exists() and len(pre_scanpath_files) == 0:
    print("WARNING: scanpaths dir exists but contains 0 PNG files.")

Precomputed heatmaps in: c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\heatmaps -> exists=True files=7977
Precomputed scanpaths in: c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\scanpaths -> exists=True files=7362


In [4]:
# Utilities to align precomputed images with fixation binary codes and labels
import json
from IPython.display import display, HTML

# Optional labels file (not authoritative for categories here)
# Builds `labels_map` (zero-padded image id -> labels_txt string) when
# `labels_per_id.csv` exists and has a recognizable id column; otherwise it stays None.
# NOTE(review): `labels_map` is not read by any cell visible in this notebook — confirm it is still needed.
labels_map = None
labels_path = project_root / "labels_per_id.csv"
if labels_path.exists():
    try:
        _lab = pd.read_csv(labels_path)
        # Pick the first id-like column that is present.
        key_col = None
        for c in ["image_id","id","image","img_id"]:
            if c in _lab.columns:
                key_col = c
                break
        if key_col is not None:
            lab_df = _lab.copy()
            # Keep the first run of digits of the id and left-pad it to 3 characters;
            # values without any digit become "" and are therefore padded to "000".
            lab_df["image_id"] = lab_df[key_col].astype(str).str.extract(r"(\d+)").fillna("").iloc[:,0].str.zfill(3)
            # If needed elsewhere, keep a dict; not used for determining categories here
            if "labels_txt" in lab_df.columns:
                labels_map = dict(zip(lab_df["image_id"], lab_df["labels_txt"].astype(str)))
    except Exception as e:
        # Best effort: a broken labels file only produces a note, never an error.
        print(f"Note: couldn't parse labels_per_id.csv: {e}")

# Precomputed PNG names: P<digits>_id<digits>_<anything>.png (case-insensitive).
pre_name_re = re.compile(r"^P(?P<participant>\d+)_id(?P<image>\d+)_(?P<rest>.+)\.png$", re.IGNORECASE)
# Fixation CSV names: P<digits>_id<digits>[_<letters>]_<five 0/1 digits>.csv (case-insensitive);
# the optional letters group is captured as `category`, the bit string as `code`.
fix_name_re = re.compile(r"^P(?P<participant>\d+)_id(?P<image>\d+)(?:_(?P<category>[A-Za-z]+))?_(?P<code>[01]{5})\.csv$", re.IGNORECASE)

# Meaning of the five code bits, in order (position i of the code <-> LABEL_ORDER[i]).
LABEL_ORDER = ["meme","person","politik","ort","text"]

def parse_precomputed_meta(p: Path):
    """Extract participant and image id from a precomputed PNG file name.

    Returns ``None`` when ``p.name`` does not match ``pre_name_re``; otherwise a
    dict with ``participant`` (digits as written) and ``image_id`` (digits
    left-padded with zeros to at least 3 characters).
    """
    match = pre_name_re.match(p.name)
    if match is None:
        return None
    # Anything after the image id in the PNG name (category hints etc.) is deliberately dropped.
    participant = match.group("participant")
    image_id = str(match.group("image")).zfill(3)
    return {"participant": participant, "image_id": image_id}

def parse_fixation_meta(name: str):
    """Split a fixation CSV file name into its metadata fields.

    Returns ``None`` when ``name`` does not match ``fix_name_re``; otherwise a
    dict with ``participant``, ``image_id`` (zero-padded to at least 3
    characters), ``category`` (lower-cased, or ``None`` when absent) and ``code``.
    """
    match = fix_name_re.match(name)
    if match is None:
        return None
    raw_category = match.group("category")
    category = raw_category.lower() if raw_category else None
    return {
        "participant": match.group("participant"),
        "image_id": str(match.group("image")).zfill(3),
        "category": category,
        "code": match.group("code"),
    }

def decode_code_to_labels(code: str):
    """Decode a 5-character binary code into label flags.

    The bits follow ``LABEL_ORDER`` [meme, person, politik, ort, text].

    Returns a tuple ``(true_labels, labels)``: the names whose bit is 1 and a
    dict mapping every label name to 0/1. For anything that is not a string of
    exactly five 0/1 characters (after stripping whitespace) it returns
    ``(None, {})``.
    """
    if not isinstance(code, str):
        return None, {}
    stripped = code.strip()
    if re.fullmatch(r"[01]{5}", stripped) is None:
        return None, {}
    labels = {name: int(bit) for name, bit in zip(LABEL_ORDER, stripped)}
    true_labels = [name for name, flag in labels.items() if flag == 1]
    return true_labels, labels

def find_fixation_code(participant: str, image_id: str):
    """Find the binary code of the fixation CSV for a participant/image pair.

    Matching is done on participant and image only (category tokens are
    ignored). Image ids are compared without leading zeros, so a zero-padded
    id such as ``"001"`` also finds a file named ``P3_id1_...csv``; embedding
    the padded id directly in the glob pattern would miss such files.

    Parameters
    ----------
    participant : str
        Participant digits exactly as they appear after ``P`` in the file name.
    image_id : str
        Image id digits, with or without zero padding.

    Returns
    -------
    tuple
        ``(code, path)`` of the first match in sorted file-name order, or
        ``(None, None)`` when no fixation file matches.
    """
    def _strip_padding(digits):
        # "001" -> "1", "000" -> "0"
        return str(digits).lstrip("0") or "0"

    wanted_image = _strip_padding(image_id)
    # Glob per participant only; the image id is compared after parsing.
    pattern = f"P{participant}_id*.csv"
    for c in sorted(fixations_dir.glob(pattern)):
        meta = parse_fixation_meta(c.name)
        if not meta or not meta.get("code"):
            continue
        if meta["participant"] == participant and _strip_padding(meta["image_id"]) == wanted_image:
            return meta["code"], c
    return None, None

def gallery_grid(items, title, ncols=4):
    """Display a captioned image grid as inline HTML.

    ``items`` is a list of dicts with keys ``path`` (Path or str) and, optionally,
    ``caption`` (str, defaults to the file name). Image sources are written
    relative to ``nb_dir`` when possible; captions get ``&``, ``<`` and ``>`` escaped.
    """
    def _card(entry):
        img_path = Path(entry["path"])
        text = entry.get("caption", img_path.name)
        try:
            src = Path(os.path.relpath(img_path, nb_dir)).as_posix()
        except Exception:
            # relpath can fail (e.g. different drives); fall back to the path as given
            src = img_path.as_posix()
        escaped = text.replace("&","&amp;").replace("<","&lt;").replace(">","&gt;")
        return f'<div style="padding:6px"><img src="{src}" style="max-width:100%"><div style="font-size:12px;word-wrap:anywhere">{escaped}</div></div>'

    cards_html = "".join([_card(entry) for entry in items])
    markup = f"<h3>{title}</h3><div style='display:grid;grid-template-columns:repeat({ncols},1fr);gap:8px'>{cards_html}</div>"
    display(HTML(markup))

def build_precomputed_mapping(files, kind: str):
    """Map precomputed PNGs to their fixation CSVs and binary label codes.

    For every file the participant and image id are parsed from the PNG name,
    the matching fixation CSV is looked up, and its 5-bit code is decoded.
    The resulting table is written to ``out_dir / f"precomputed_{kind}_mapping.csv"``
    (a failure to write only prints a warning).

    Parameters
    ----------
    files : iterable of Path or str
        Precomputed image files.
    kind : str
        Tag stored in the ``kind`` column and used in the CSV file name.

    Returns
    -------
    tuple
        ``(items, df)``: ``items`` is a list of ``{"path", "caption"}`` dicts
        for the gallery helpers, ``df`` the mapping table (empty DataFrame
        when ``files`` is empty).
    """
    rows = []
    items = []
    for p in sorted(files):
        p = Path(p)
        m = parse_precomputed_meta(p) or {}
        participant = m.get("participant")
        image_id = m.get("image_id")
        code, matched_csv = (None, None)
        if participant and image_id:
            code, matched_csv = find_fixation_code(participant, image_id)
        true_labels, labels_dict = decode_code_to_labels(code) if code else ([], {})
        # Caption based purely on binary code-derived labels.
        # Files whose name could not be parsed are captioned with their file name
        # instead of the meaningless "PNone | idNone".
        if participant and image_id:
            caption_bits = [f"P{participant}", f"id{image_id}"]
        else:
            caption_bits = [p.name]
        if code:
            caption_bits.append(f"code={code}")
        if true_labels:
            caption_bits.append("labels=" + ",".join(true_labels))
        caption = " | ".join([b for b in caption_bits if b])
        row = {
            "kind": kind,
            "path": str(p),
            "file": p.name,
            "participant": participant,
            "image_id": image_id,
            "code": code,
            "matched_fixation_csv": str(matched_csv) if matched_csv else None,
        }
        # Add one 0/1 column per label; unmatched files get all zeros.
        for k in LABEL_ORDER:
            row[f"code_{k}"] = int(labels_dict.get(k, 0))
        rows.append(row)
        items.append({"path": p, "caption": caption})
    if rows:
        df = pd.DataFrame(rows)
        out = out_dir / f"precomputed_{kind}_mapping.csv"
        try:
            df.to_csv(out, index=False)
            print(f"Saved mapping -> {out} ({len(df)} rows)")
        except Exception as e:
            print(f"Warning: couldn't save mapping CSV for {kind}: {e}")
        return items, df
    return items, pd.DataFrame([])

In [7]:
 # Load all fixation CSVs and parse meta
fname_re = re.compile(r"^P(?P<participant>\d+)_id(?P<image>\d+)(?:_(?P<category>[A-Za-z]+))?_(?P<code>[01]{5})\.csv$", re.IGNORECASE)

def load_fixations():
    """Read every fixation CSV in ``fixations_dir`` into one long DataFrame.

    Files whose name does not match ``fname_re`` are skipped silently; files
    that cannot be read are skipped with a printed note. ``x``, ``y`` and (when
    present) ``duration`` are coerced to numeric. The metadata parsed from the
    file name is attached to every row as ``participant``, ``category``,
    ``code`` and ``image_id``, plus ``code_group`` as an alias of ``code``.

    Returns an empty DataFrame with the expected columns when nothing matched.
    """
    csv_paths = sorted(glob.glob(str(fixations_dir / "*.csv")))
    print(f"Fixation files discovered: {len(csv_paths)} in {fixations_dir}")
    frames = []
    for csv_path in csv_paths:
        base = os.path.basename(csv_path)
        match = fname_re.match(base)
        if match is None:
            # Names that do not end with a 5-bit code are ignored without a message.
            continue
        parts = match.groupdict()
        category = parts["category"]
        # Key order fixes the order of the appended columns.
        meta = {
            "participant": parts["participant"] or None,
            "category": category.lower() if category else None,
            "code": parts["code"],
            "image_id": str(parts["image"]).zfill(3),
        }
        try:
            frame = pd.read_csv(csv_path)
        except Exception as e:
            print(f"Skip {base}: {e}")
            continue
        # numeric coercion of the coordinate columns
        for axis in ("x", "y"):
            frame[axis] = pd.to_numeric(frame.get(axis), errors="coerce")
        if "duration" in frame.columns:
            frame["duration"] = pd.to_numeric(frame.get("duration"), errors="coerce")
        frame = frame.assign(**meta)
        frame["code_group"] = frame["code"]  # alias for grouping
        frames.append(frame)
    if len(frames) == 0:
        print("WARNING: No fixation files matched the expected pattern with 5-bit code.")
        return pd.DataFrame(columns=["x","y","duration","participant","image_id","category","code","code_group"])
    return pd.concat(frames, ignore_index=True)

# Load everything once; `fix` is the long-format fixation table used by the cells below.
fix = load_fixations()
# One-line summary; the guards keep it working when the frame is empty.
print(f"Loaded fixations: {len(fix)} rows; images={fix['image_id'].nunique() if not fix.empty else 0} participants={fix['participant'].nunique() if not fix.empty else 0} code_groups={fix['code_group'].nunique() if 'code_group' in fix else 0}")
fix.head()

Fixation files discovered: 7362 in c:\Users\SWixforth\Uni\eye-tracking-ai\fixations
Loaded fixations: 214814 rows; images=152 participants=49 code_groups=12
Loaded fixations: 214814 rows; images=152 participants=49 code_groups=12


Unnamed: 0,start_time,end_time,duration,x,y,avg_pupil_size,pupil_size_norm,weight_meme,weight_ort,weight_person,weight_politik,weight_text,participant,category,code,image_id,code_group
0,0.0,199.664,199.664,418.61286,350.962164,3.17263,-1.733186,1.0,0.0,0.0,0.0,0.0,0,meme,10000,1,10000
1,232.829,399.328,166.499,291.180768,231.734577,3.259471,-1.472783,,,,,,0,meme,10000,1,10000
2,415.862,715.38,299.518,254.463803,256.831578,3.385282,-1.09552,,,,,,0,meme,10000,1,10000
3,765.301,1098.246,332.945,486.867317,548.620869,3.593167,-0.472149,,,,,,0,meme,10000,1,10000
4,1148.162,1564.037,415.875,569.54872,677.14781,3.588862,-0.485059,,,,,,0,meme,10000,1,10000


In [8]:
# Load all fixation CSVs and parse meta
# Case-insensitive, like the other copies of this file-name pattern in the notebook,
# so the set of accepted files does not depend on which cell ran last.
fname_re = re.compile(r"^P(?P<participant>\d+)_id(?P<image>\d+)(?:_(?P<category>[A-Za-z]+))?_(?P<code>[01]{5})\.csv$", re.IGNORECASE)

def load_fixations():
    """Read every fixation CSV in ``fixations_dir`` into one long DataFrame.

    Unlike the loader in the previous cell, this version reports a missing
    directory and prints the name of every file it skips because of a
    name-pattern mismatch. ``x``, ``y`` and (when present) ``duration`` are
    coerced to numeric, and the metadata parsed from the file name is attached
    to every row as ``participant``, ``category``, ``code`` and ``image_id``.

    Returns
    -------
    pandas.DataFrame
        All fixations concatenated; an empty frame with the basic columns when
        the directory is missing or no file could be loaded.
    """
    if not fixations_dir.exists():
        print(f"ERROR: fixations_dir not found: {fixations_dir}")
        return pd.DataFrame(columns=["x","y","duration","participant","image_id","category"])
    files = sorted(glob.glob(str(fixations_dir / "*.csv")))
    print(f"Scanning fixations CSVs: {len(files)} files in {fixations_dir}")
    rows = []
    for fp in files:
        name = os.path.basename(fp)
        m = fname_re.match(name)
        if not m:
            print(f"Skip (name pattern mismatch): {name}")
            continue
        meta = m.groupdict()
        # Replace the raw `image` group by a zero-padded `image_id`.
        meta["image_id"] = str(meta.pop("image")).zfill(3)
        meta["participant"] = meta["participant"] or None
        meta["category"] = (meta.get("category") or "").lower() or None
        meta["code"] = meta.get("code")
        try:
            df = pd.read_csv(fp)
        except Exception as e:
            print(f"Skip {name}: {e}")
            continue
        # Numeric coercion; a CSV without an x/y column is expected to end up
        # with an all-NaN column rather than raising.
        df["x"] = pd.to_numeric(df.get("x"), errors="coerce")
        df["y"] = pd.to_numeric(df.get("y"), errors="coerce")
        if "duration" in df.columns:
            df["duration"] = pd.to_numeric(df.get("duration"), errors="coerce")
        df = df.assign(**meta)
        rows.append(df)
    if not rows:
        print("WARNING: No fixation files loaded after filtering.")
        return pd.DataFrame(columns=["x","y","duration","participant","image_id","category","code"])
    return pd.concat(rows, ignore_index=True)

# Re-load the fixations with the loader defined in this cell; this overwrites the
# `fix` frame produced by the previous cell (the result has no `code_group` column).
fix = load_fixations()
print(f"Loaded fixations: {len(fix)} rows; images={fix['image_id'].nunique() if 'image_id' in fix else 0} participants={fix['participant'].nunique() if 'participant' in fix else 0}")
fix.head()

Scanning fixations CSVs: 7362 files in c:\Users\SWixforth\Uni\eye-tracking-ai\fixations
Loaded fixations: 214814 rows; images=152 participants=49
Loaded fixations: 214814 rows; images=152 participants=49


Unnamed: 0,start_time,end_time,duration,x,y,avg_pupil_size,pupil_size_norm,weight_meme,weight_ort,weight_person,weight_politik,weight_text,participant,category,code,image_id
0,0.0,199.664,199.664,418.61286,350.962164,3.17263,-1.733186,1.0,0.0,0.0,0.0,0.0,0,meme,10000,1
1,232.829,399.328,166.499,291.180768,231.734577,3.259471,-1.472783,,,,,,0,meme,10000,1
2,415.862,715.38,299.518,254.463803,256.831578,3.385282,-1.09552,,,,,,0,meme,10000,1
3,765.301,1098.246,332.945,486.867317,548.620869,3.593167,-0.472149,,,,,,0,meme,10000,1
4,1148.162,1564.037,415.875,569.54872,677.14781,3.588862,-0.485059,,,,,,0,meme,10000,1


In [9]:
# Heatmaps per image and per category
from IPython.display import display, HTML
def gallery_grid(img_paths, title, ncols=4):
    """Display images in an HTML grid, captioned with their file names.

    Accepts either a list of Path objects or a list of ``{"path", "caption"}``
    dicts; the latter is forwarded to ``gallery_grid_with_items``.
    Note: this redefines the ``gallery_grid`` declared in an earlier cell.
    """
    # Backward compatible wrapper: accept list[Path] or list[dict]
    if img_paths and isinstance(img_paths[0], dict):
        return gallery_grid_with_items(img_paths, title, ncols)

    def _src(path):
        # Prefer a source relative to the notebook directory; fall back to the path itself.
        try:
            return Path(os.path.relpath(path, nb_dir)).as_posix()
        except Exception:
            return path.as_posix()

    cards_html = "".join([
        f'<div style="padding:6px"><img src="{_src(path)}" style="max-width:100%"><div style="font-size:12px">{path.name}</div></div>'
        for path in img_paths
    ])
    markup = f"<h3>{title}</h3><div style='display:grid;grid-template-columns:repeat({ncols},1fr);gap:8px'>{cards_html}</div>"
    display(HTML(markup))

def gallery_grid_with_items(items, title, ncols=4):
    """Display a captioned image grid from ``{"path", "caption"}`` dicts.

    ``caption`` defaults to the file name (for Path values) or ``str(path)``.
    Captions get ``&``, ``<`` and ``>`` escaped; image sources are written
    relative to ``nb_dir`` when possible.
    """
    def _card(entry):
        raw = entry["path"]
        text = entry.get("caption", raw.name if isinstance(raw, Path) else str(raw))
        img_path = Path(raw)
        try:
            src = Path(os.path.relpath(img_path, nb_dir)).as_posix()
        except Exception:
            src = img_path.as_posix()
        escaped = text.replace("&","&amp;").replace("<","&lt;").replace(">","&gt;")
        return f'<div style="padding:6px"><img src="{src}" style="max-width:100%"><div style="font-size:12px;word-wrap:anywhere">{escaped}</div></div>'

    cards_html = "".join([_card(entry) for entry in items])
    markup = f"<h3>{title}</h3><div style='display:grid;grid-template-columns:repeat({ncols},1fr);gap:8px'>{cards_html}</div>"
    display(HTML(markup))

if USE_PRECOMPUTED and pre_heatmap_files:
    # Reuse the PNGs on disk: map each one to its fixation code/labels and show them captioned
    items, map_df = build_precomputed_mapping(pre_heatmap_files, kind="heatmaps")
    gallery_grid_with_items(items, "Precomputed heatmaps (mapped to fixation codes)")
else:
    def plot_heatmap(df, title, fname, bins=200):
        """Render a fixation-density heatmap of ``df`` to ``out_dir / fname``.

        Uses a filled KDE and falls back to a 2D histogram with ``bins`` bins
        if the KDE raises. Frames with fewer than 10 complete (x, y) rows are
        skipped with a printed note.
        """
        coords = df[["x","y"]].dropna()
        if len(coords) < 10:
            print(f"Skip heatmap {title}: too few points")
            return
        fig, ax = plt.subplots(figsize=(6,5))
        try:
            sns.kdeplot(data=coords, x="x", y="y", fill=True, cmap="magma", thresh=0.05, levels=50, bw_method="scott", ax=ax)
        except Exception:
            # Fallback to 2D histogram
            ax.hist2d(coords["x"], coords["y"], bins=bins, cmap="magma")
        ax.invert_yaxis()  # coordinates often origin at top-left
        ax.set_title(title)
        fig.tight_layout()
        target = out_dir / fname
        fig.savefig(target, dpi=150)
        plt.close(fig)
        print(f"Saved {target}")
    # One heatmap per image (all participants pooled)
    for img_id, g in fix.groupby("image_id"):
        plot_heatmap(g, f"Heatmap — Image {img_id}", f"heatmap_image_{img_id}.png")
    # One heatmap per category, only when at least one row carries a category
    if "category" in fix.columns and fix["category"].notna().any():
        for cat, g in fix.dropna(subset=["category"]).groupby("category"):
            plot_heatmap(g, f"Heatmap — Category {cat}", f"heatmap_category_{cat}.png")

Saved mapping -> c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\precomputed_heatmaps_mapping.csv (7977 rows)


In [10]:
# Scanpaths: ordered fixation sequences
from IPython.display import display, HTML
def gallery_grid(img_paths, title, ncols=4):
    """Display images in an HTML grid, captioned with their file names.

    Accepts either a list of Path objects or a list of ``{"path", "caption"}``
    dicts; the latter is forwarded to ``gallery_grid_with_items``.
    Note: the same function is also defined in the heatmap cell above.
    """
    if img_paths and isinstance(img_paths[0], dict):
        return gallery_grid_with_items(img_paths, title, ncols)

    def _src(path):
        # Prefer a source relative to the notebook directory; fall back to the path itself.
        try:
            return Path(os.path.relpath(path, nb_dir)).as_posix()
        except Exception:
            return path.as_posix()

    cards_html = "".join([
        f'<div style="padding:6px"><img src="{_src(path)}" style="max-width:100%"><div style="font-size:12px">{path.name}</div></div>'
        for path in img_paths
    ])
    markup = f"<h3>{title}</h3><div style='display:grid;grid-template-columns:repeat({ncols},1fr);gap:8px'>{cards_html}</div>"
    display(HTML(markup))

def gallery_grid_with_items(items, title, ncols=4):
    """Display a captioned image grid from ``{"path", "caption"}`` dicts.

    ``caption`` defaults to the file name (for Path values) or ``str(path)``.
    Captions get ``&``, ``<`` and ``>`` escaped; image sources are written
    relative to ``nb_dir`` when possible.
    Note: the same function is also defined in the heatmap cell above.
    """
    def _card(entry):
        raw = entry["path"]
        text = entry.get("caption", raw.name if isinstance(raw, Path) else str(raw))
        img_path = Path(raw)
        try:
            src = Path(os.path.relpath(img_path, nb_dir)).as_posix()
        except Exception:
            src = img_path.as_posix()
        escaped = text.replace("&","&amp;").replace("<","&lt;").replace(">","&gt;")
        return f'<div style="padding:6px"><img src="{src}" style="max-width:100%"><div style="font-size:12px;word-wrap:anywhere">{escaped}</div></div>'

    cards_html = "".join([_card(entry) for entry in items])
    markup = f"<h3>{title}</h3><div style='display:grid;grid-template-columns:repeat({ncols},1fr);gap:8px'>{cards_html}</div>"
    display(HTML(markup))

if USE_PRECOMPUTED and pre_scanpath_files:
    # Reuse the PNGs on disk: map each one to its fixation code/labels and show them captioned
    items, map_df = build_precomputed_mapping(pre_scanpath_files, kind="scanpaths")
    gallery_grid_with_items(items, "Precomputed scanpaths (mapped to fixation codes)")
else:
    def plot_scanpath(df, title, fname):
        """Draw the ordered fixation sequence of ``df`` and save it to ``out_dir / fname``.

        Rows lacking x, y, start_time or end_time are dropped; fixations are
        ordered by ``start_time`` and annotated with their 1-based rank.
        """
        d = df.dropna(subset=["x","y","start_time","end_time"]).copy()
        if d.empty:
            print(f"Skip scanpath {title}: no data")
            return
        # rank(method="first") gives a strict 1..n order even for tied start times
        d["order"] = d["start_time"].rank(method="first").astype(int)
        d = d.sort_values("order")
        plt.figure(figsize=(6,5))
        plt.plot(d["x"], d["y"], "-o", alpha=0.8)
        for _, r in d.iterrows():
            plt.text(r["x"], r["y"], str(r["order"]), fontsize=8)
        plt.gca().invert_yaxis()
        plt.title(title)
        plt.tight_layout()
        out = out_dir / fname
        plt.savefig(out, dpi=150)
        plt.close()
        print(f"Saved {out}")
    # Example: first 3 images with up to 3 participants each.
    # Both limits are enforced explicitly; collecting every participant of an image
    # before checking the total would plot far more than the intended 9 examples.
    max_images, max_participants = 3, 3
    examples = []
    for n_img, (img_id, g) in enumerate(fix.groupby("image_id")):
        if n_img >= max_images:
            break
        for n_part, (pid, gp) in enumerate(g.groupby("participant")):
            if n_part >= max_participants:
                break
            examples.append((img_id, pid, gp))
    for img_id, pid, gp in examples:
        plot_scanpath(gp, f"Scanpath — Image {img_id}, Participant {pid}", f"scanpath_image_{img_id}_P{pid}.png")

Saved mapping -> c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\precomputed_scanpaths_mapping.csv (7362 rows)


In [11]:
# Violin/Box plots by binary code group: fixation duration and pupil size
def pick_pupil_column(df):
    """Choose the column of ``df`` that holds pupil size.

    Preference order: a normalized column (``pupil_size_norm``, ``pupil_norm``,
    ``pupil_normalized``), then ``pupil_size`` / ``pupil``, then the row-wise
    mean of the first two per-eye columns (left/right, matched
    case-insensitively), then the first column whose lower-cased name starts
    with ``pupil``.

    Side effect: when the per-eye mean is used, a ``pupil_avg_lr`` column is
    added to ``df`` in place.

    Returns the column name, or ``None`` when nothing pupil-like exists.
    """
    available = df.columns
    exact_names = ("pupil_size_norm", "pupil_norm", "pupil_normalized", "pupil_size", "pupil")
    for name in exact_names:
        if name in available:
            return name
    eye_names = ("pupil_left", "pupil_right", "left_pupil", "right_pupil")
    per_eye = [name for name in available if name.lower() in eye_names]
    if len(per_eye) >= 2:
        df["pupil_avg_lr"] = df[per_eye[:2]].mean(axis=1)
        return "pupil_avg_lr"
    return next((name for name in available if name.lower().startswith("pupil")), None)

# Violin plots need the binary 'code' column that the loader derives from the file names.
if 'code' not in fix.columns:
    # No fallback parsing is attempted here: without 'code' the plots are simply skipped.
    print("WARNING: 'code' column missing; violin/box by code will be skipped.")
else:
    # NOTE: pick_pupil_column may add a 'pupil_avg_lr' column to `fix` in place.
    pupil_col = pick_pupil_column(fix)
    print("Using pupil column:", pupil_col)
    # Separate frames per metric so a missing duration does not drop pupil rows and vice versa.
    dur_df = fix.dropna(subset=["duration","code"]).copy()
    dur_df["code_group"] = dur_df["code"]
    if pupil_col:
        pup_df = fix.dropna(subset=[pupil_col,"code"]).copy()
        pup_df["code_group"] = pup_df["code"]
    else:
        # Empty placeholder so the `pup_df.empty` check below works.
        pup_df = pd.DataFrame(columns=["code_group","pupil"])
    # Plot 1: fixation duration per code group (violin with an inner box plot, not extended past the data).
    plt.figure(figsize=(9,5))
    sns.violinplot(data=dur_df, x="code_group", y="duration", inner="box", cut=0)
    plt.title("Fixation duration by binary code group")
    plt.ylabel("Duration (ms)")
    plt.xlabel("Binary code (meme,person,politik,ort,text)")
    plt.tight_layout()
    out = out_dir / "violin_fixation_duration_by_code.png"
    plt.savefig(out, dpi=150)
    plt.close()
    print(f"Saved {out}")
    # Plot 2: pupil measure per code group, only when a pupil column with data exists.
    if pupil_col and not pup_df.empty:
        plt.figure(figsize=(9,5))
        sns.violinplot(data=pup_df, x="code_group", y=pupil_col, inner="box", cut=0)
        plt.title(f"Pupil by binary code group ({pupil_col})")
        plt.ylabel(pupil_col)
        plt.xlabel("Binary code (meme,person,politik,ort,text)")
        plt.tight_layout()
        out = out_dir / f"violin_{pupil_col}_by_code.png"
        plt.savefig(out, dpi=150)
        plt.close()
        print(f"Saved {out}")
    else:
        print("Skip pupil violinplot: no pupil column detected or no data")

Using pupil column: pupil_size_norm
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\violin_fixation_duration_by_code.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\violin_fixation_duration_by_code.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\violin_pupil_size_norm_by_code.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\violin_pupil_size_norm_by_code.png


In [12]:
# KMeans clustering maps per category
from sklearn.cluster import KMeans

def plot_kmeans_clusters(df, title, fname, k=4):
    """Cluster the (x, y) fixation coordinates of ``df`` with K-Means and plot them.

    Points are colored by cluster, cluster centers are drawn as black crosses,
    and the figure is saved to ``out_dir / fname``. Frames with fewer than
    ``k`` complete (x, y) rows are skipped with a printed note.
    """
    points = df.dropna(subset=["x","y"]).copy()
    if len(points) < k:
        print(f"Skip KMeans {title}: not enough points")
        return
    coords = points[["x","y"]].to_numpy()
    # Fixed seed so repeated runs yield the same clustering
    model = KMeans(n_clusters=k, n_init=10, random_state=42)
    assignments = model.fit_predict(coords)
    centroids = model.cluster_centers_
    fig, ax = plt.subplots(figsize=(6,5))
    ax.scatter(points["x"], points["y"], c=assignments, s=12, cmap="tab10", alpha=0.7, edgecolors="none")
    ax.scatter(centroids[:,0], centroids[:,1], c="black", s=80, marker="x", label="centers")
    ax.invert_yaxis()
    ax.set_title(f"{title} (k={k})")
    ax.legend(loc="best")
    fig.tight_layout()
    target = out_dir / fname
    fig.savefig(target, dpi=150)
    plt.close(fig)
    print(f"Saved {target}")

# One cluster map per category; groupby leaves out rows whose category is missing.
for cat, g in fix.groupby("category"):
    plot_kmeans_clusters(g, f"KMeans clusters — Category {cat}", f"kmeans_category_{cat}.png", k=4)

Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_meme.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_ort.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_ort.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_person.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_person.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_politik.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_politik.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_text.png
Saved c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\kmeans_category_text.png


## Image-level norms (aggregated across all participants)
This section creates one aggregated norm per image by combining all participants’ fixations:
- A density heatmap per image (KDE or 2D histogram fallback).
- A normalized 2D histogram (PDF) per image saved to disk.
- A compact CSV with summary stats per image (counts, centroid, dispersion).
It avoids verbose per-image logging; you’ll see a final summary only.

In [None]:
# KMeans clustering maps per binary code group
from sklearn.cluster import KMeans

def plot_kmeans_clusters(df, title, fname, k=4):
    """Cluster the (x, y) fixation coordinates of ``df`` with K-Means and plot them.

    Points are colored by cluster, cluster centers are drawn as black crosses,
    and the figure is saved to ``out_dir / fname``. Frames with fewer than
    ``k`` complete (x, y) rows are skipped with a printed note.
    Note: the same function is also defined in the per-category KMeans cell.
    """
    points = df.dropna(subset=["x","y"]).copy()
    if len(points) < k:
        print(f"Skip KMeans {title}: not enough points")
        return
    coords = points[["x","y"]].to_numpy()
    # Fixed seed so repeated runs yield the same clustering
    model = KMeans(n_clusters=k, n_init=10, random_state=42)
    assignments = model.fit_predict(coords)
    centroids = model.cluster_centers_
    fig, ax = plt.subplots(figsize=(6,5))
    ax.scatter(points["x"], points["y"], c=assignments, s=12, cmap="tab10", alpha=0.7, edgecolors="none")
    ax.scatter(centroids[:,0], centroids[:,1], c="black", s=80, marker="x", label="centers")
    ax.invert_yaxis()
    ax.set_title(f"{title} (k={k})")
    ax.legend(loc="best")
    fig.tight_layout()
    target = out_dir / fname
    fig.savefig(target, dpi=150)
    plt.close(fig)
    print(f"Saved {target}")

# One cluster map per binary code group, provided at least one row carries a code.
if 'code' in fix.columns and fix['code'].notna().any():
    for code, g in fix.dropna(subset=['code']).groupby('code'):
        # The code is only stringified for the title/file name; no sanitizing happens here.
        safe = str(code)
        plot_kmeans_clusters(g, f"KMeans clusters — Code {safe}", f"kmeans_code_{safe}.png", k=4)
else:
    print("No binary code available for KMeans grouping.")

Image norms: 152 images processed; saved PNGs and PDFs to c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\image_norms; summary -> c:\Users\SWixforth\Uni\eye-tracking-ai\visualization\visual_exploration\image_norms_summary.csv


Unnamed: 0,image_id,n,centroid_x,centroid_y,std_x,std_y
0,1,1159,409.022967,486.924995,131.615693,169.587136
1,2,1284,374.224851,272.512324,102.74238,113.912481
2,3,1110,417.891315,365.722605,38.745365,183.835708
3,4,1354,390.879705,403.894883,91.170177,168.131953
4,5,1359,406.113051,351.832508,111.161684,146.041093


## Cluster normalized heatmaps (image_norms) with label lookup
This section clusters the per-image normalized heatmaps stored in `image_norms/` and joins labels from `labels_per_id.csv`. It:
- Loads each image’s normalized PDF from `norm_pdf_image_*.npz` (array `H`); if no such files exist, clustering is skipped (there is no PNG fallback).
- Resizes to a common grid, flattens, and L2-normalizes feature vectors.
- Runs K-Means to obtain clusters of image-level gaze norms.
- Joins labels by `image_id` from `labels_per_id.csv` and writes results to CSV.
- Saves a 2D PCA scatter plot colored by cluster and cluster-average prototype heatmaps.

In [None]:
# Cluster normalized heatmaps
import json
from sklearn.decomposition import PCA
from sklearn.preprocessing import normalize as sk_normalize
from skimage.transform import resize as sk_resize

def load_norm_vectors(norm_dir: Path, target_size=(64,64)):
    """Load per-image normalized heatmaps and turn them into feature vectors.

    Reads every ``norm_pdf_image_<digits>.npz`` in ``norm_dir``, takes its
    2D array ``H``, resizes it to ``target_size`` and L2-normalizes the
    flattened result. Files that cannot be read, lack ``H`` or hold a
    non-2D array are skipped with a printed note.

    Parameters
    ----------
    norm_dir : Path
        Folder containing the ``.npz`` files.
    target_size : tuple of int
        Common grid size every heatmap is resized to.

    Returns
    -------
    list of dict
        One entry per image with ``image_id`` (zero-padded to 3 characters so
        it matches the ids used for the labels join), ``vec`` (flattened,
        L2-normalized) and ``H`` (resized 2D array). Empty when no NPZ file
        is found.
    """
    entries = []
    npzs = sorted(norm_dir.glob("norm_pdf_image_*.npz"))
    if not npzs:
        print(f"No NPZ PDFs found in {norm_dir}; clustering will be skipped.")
        return entries
    for fp in npzs:
        m = re.search(r"norm_pdf_image_(\d+)\.npz$", fp.name)
        if not m:
            continue
        # Same zero-padding as the label ids, otherwise the later merge finds no match.
        image_id = m.group(1).zfill(3)
        try:
            # Context manager closes the underlying zip file; the array is copied out first.
            with np.load(fp) as data:
                H = np.asarray(data["H"])
        except Exception as e:
            print(f"Skip {fp.name}: {e}")
            continue
        if H.ndim != 2:
            print(f"Skip {fp.name}: H is not 2D")
            continue
        # Resize to target grid for consistent vector length
        H_resized = sk_resize(H, target_size, order=1, anti_aliasing=True, preserve_range=True).astype(np.float32)
        # L2 normalize flattened vector
        v = H_resized.flatten()
        v = sk_normalize(v.reshape(1, -1), norm="l2").ravel()
        entries.append({"image_id": image_id, "vec": v, "H": H_resized})
    return entries

def cluster_norms(norm_dir: Path, k=5, target_size=(64,64)):
    """Cluster the per-image gaze norms with K-Means and write the results.

    Side effects (all under ``out_dir``): ``image_norms_clusters.csv`` (cluster
    per image, joined with ``labels_per_id.csv`` when that file exists and has
    an id column), ``image_norms_pca_clusters.png`` (2D PCA scatter, needs at
    least 2 images) and one prototype heatmap per cluster in
    ``image_norms_cluster_prototypes/``.

    Parameters
    ----------
    norm_dir : Path
        Folder with the ``norm_pdf_image_*.npz`` files.
    k : int
        Requested number of clusters; capped at the number of loaded images,
        because K-Means cannot fit more clusters than samples.
    target_size : tuple of int
        Grid size passed on to ``load_norm_vectors``.

    Returns
    -------
    tuple
        ``(df, X, cluster_ids)``, or ``(None, None, None)`` when no image norm
        could be loaded.
    """
    entries = load_norm_vectors(norm_dir, target_size=target_size)
    if not entries:
        return None, None, None
    X = np.stack([e["vec"] for e in entries], axis=0)
    ids = [e["image_id"] for e in entries]
    # Cap k so a small image_norms folder does not make KMeans raise.
    n_clusters = min(k, len(entries))
    if n_clusters < k:
        print(f"Note: only {len(entries)} image norms available; using k={n_clusters} instead of k={k}.")
    km = KMeans(n_clusters=n_clusters, n_init=25, random_state=42)
    cluster_ids = km.fit_predict(X)
    # Build results DataFrame
    df = pd.DataFrame({"image_id": ids, "cluster": cluster_ids})
    # Join labels_per_id.csv
    labels_csv = project_root / "labels_per_id.csv"
    if labels_csv.exists():
        lab = pd.read_csv(labels_csv)
        # Use image_id if present, otherwise the first alternate id column.
        key_col = next((c for c in ["image_id","id","image","img_id"] if c in lab.columns), None)
        if key_col is None:
            # Without an id column a merge would raise; keep the unlabeled result instead.
            print(f"Note: no id column found in {labels_csv.name}; labels not joined.")
        else:
            # Ensure 3-digit string for image_id
            lab["image_id"] = lab[key_col].astype(str).str.extract(r"(\d+)").fillna("").iloc[:,0].str.zfill(3)
            df = df.merge(lab, on="image_id", how="left")
    # Save CSV
    out_csv = out_dir / "image_norms_clusters.csv"
    df.to_csv(out_csv, index=False)
    print(f"Saved {out_csv} (k={n_clusters}, n={len(df)})")
    # PCA scatter for visualization (a 2-component PCA needs at least 2 samples)
    if len(entries) >= 2:
        pca = PCA(n_components=2, random_state=42)
        Z = pca.fit_transform(X)
        plt.figure(figsize=(7,6))
        plt.scatter(Z[:,0], Z[:,1], c=cluster_ids, cmap="tab10", alpha=0.8)
        for i, img in enumerate(ids):
            plt.text(Z[i,0], Z[i,1], img, fontsize=8, alpha=0.7)
        plt.title("PCA of image norms (colored by cluster)")
        plt.tight_layout()
        out_png = out_dir / "image_norms_pca_clusters.png"
        plt.savefig(out_png, dpi=150)
        plt.close()
        print(f"Saved {out_png}")
    else:
        print("Skip PCA scatter: need at least 2 image norms.")
    # Cluster prototypes (average heatmap per cluster)
    proto_dir = out_dir / "image_norms_cluster_prototypes"
    proto_dir.mkdir(parents=True, exist_ok=True)
    for c_id in sorted(np.unique(cluster_ids)):
        idx = np.where(cluster_ids == c_id)[0]
        if len(idx) == 0:
            continue
        avgH = np.mean([entries[i]["H"] for i in idx], axis=0)
        plt.figure(figsize=(5,4))
        plt.imshow(avgH, cmap="viridis", origin="upper")
        plt.colorbar(fraction=0.046, pad=0.04)
        plt.title(f"Cluster {c_id} (n={len(idx)}) — prototype")
        plt.tight_layout()
        fp = proto_dir / f"cluster_{c_id}_prototype.png"
        plt.savefig(fp, dpi=150)
        plt.close()
    return df, X, cluster_ids

# Cluster the image norms stored under the notebook's output folder.
norms_dir = out_dir / 'image_norms'
clusters_df, X_norms, cluster_ids = cluster_norms(norms_dir, k=5, target_size=(64,64))
# cluster_norms returns None values when nothing was loaded; only preview a real frame.
clusters_df.head() if isinstance(clusters_df, pd.DataFrame) else None