# Screenshot Categorization

## Vocabulary

**Label**: manually assigned categories from the `www/` labeling tool. Semantic and contextual, not perceptual. These are the training targets for all classifiers.

**Feature**: a signal dimension used as input to a model. TF-IDF weights are features. CLIP embeddings are features. Raw pixels are not.

**Embedding**: the 768-dim vector CLIP produces per image. Encodes visual structure as learned from web-scale image-text pairs. No label semantics attached.

**Prediction**: the output of a classifier against a defined label set.

**Probability / score**: the confidence value attached to a prediction.

## Models

Two independent signals are extracted per screenshot and used to train separate classifiers against the same manual labels:

**Text**: OCR transcript vectorized as TF-IDF char n-grams. Captures lexical density, repeated UI tokens, numeric patterns, and domain-specific vocabulary.

**Image**: 768-dim embedding from `openai/clip-vit-base-patch32`. CLIP was pretrained contrastively on web-scraped image-text pairs, making it substantially more appropriate for screenshots than ImageNet-pretrained models. Captures visual geometry and layout.

The two signals cover the label set differently: some labels are separable by text, some by image, some by both. The eventual fusion model combines the surviving signals per label based on measured separability.

In [None]:
import time


def stopwatch(flag):
    """Start or read a notebook-wide timer.

    A truthy *flag* stores the current ``time.perf_counter()`` reading on the
    function object (``stopwatch.s``). A falsy *flag* prints the seconds
    elapsed since that stored reading.
    """
    now = time.perf_counter()
    if not flag:
        print(now - stopwatch.s)
        return
    stopwatch.s = now


# Start timing the whole notebook run.
stopwatch(1)

### Imports

In [None]:
import json
from collections import Counter
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from PIL import Image
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, f1_score
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.svm import LinearSVC
from tqdm import tqdm
from transformers import CLIPModel, CLIPProcessor

### Set pathing and shared data

In [None]:
def find_root(marker="data/labels.jsonl"):
    """Return the nearest directory, starting at the CWD, that contains *marker*.

    The current working directory is checked first, then each of its parents
    in order. Raises ``FileNotFoundError(marker)`` when none of them has it.
    """
    here = Path.cwd()
    candidates = [here, *here.parents]
    found = next((d for d in candidates if (d / marker).exists()), None)
    if found is None:
        raise FileNotFoundError(marker)
    return found


# Repository root; every data path below is built from it.
ROOT = find_root()
LABELS_PATH = ROOT / "data/labels.jsonl"

# One JSON object per non-blank line. Decode explicitly as UTF-8 so the result
# does not depend on the platform's default text encoding.
rows = [
    json.loads(line)
    for line in LABELS_PATH.read_text(encoding="utf-8").splitlines()
    if line.strip()
]
# Image path of every labeled row, in the same order as `rows`.
paths = [ROOT / r["input_path"] for r in rows]

### Helpers
- `make_df` wraps classifier output into a labeled DataFrame indexed by filename.
- `top_k` slices that DataFrame by label column and returns ranked hits as a list of `{path, score}` dicts — the currency everything else operates on.
- `top_labels` picks the *n* highest-mass columns by summing probability across all images. This is useful for a first-pass survey of what the model is confident about.
- `print_hits` and `plot_hits` render a list of hits as a text table and as an image gallery, each entry shown with its score.

In [None]:
# Seed passed to np.random.RandomState(...) and to UMAP's random_state.
SEED = 42
# Fraction of samples that split_indices holds out as the test split.
TEST_SIZE = 0.2
# A label counts as predicted when its predict_proba score is >= this value.
THRESHOLD = 0.2


def make_df(probs, binarizer):
    """Wrap a score matrix in a DataFrame with one row per labeled image.

    Rows are indexed by the file name of the matching entry in the
    module-level ``paths`` list; columns are ``binarizer.classes_``.
    """
    filenames = [image_path.name for image_path in paths]
    return pd.DataFrame(probs, index=filenames, columns=binarizer.classes_)


def top_k(df, label, k=12):
    """Return the *k* highest-scoring images for *label*, best first.

    Each hit is a ``{"path": ..., "score": ...}`` dict. The path is taken from
    the module-level ``paths`` list at the row's position, so *df* must have
    its rows in the same order as ``paths``.
    """
    column = df[label].to_numpy()
    ranked = np.argsort(column)[::-1][:k]
    hits = []
    for position in ranked:
        hits.append({"path": paths[position], "score": column[position]})
    return hits


def top_labels(df, n=5):
    """Return the names of the *n* columns with the largest column sums, largest first."""
    column_mass = df.sum()
    heaviest = column_mass.nlargest(n)
    return heaviest.index.tolist()


def print_hits(hits):
    """Print one line per hit: the score to 4 decimals, then ``<parent dir>/<file name>``."""
    for hit in hits:
        image = hit["path"]
        location = f"{image.parent.name}/{image.name}"
        print(f"{hit['score']:.4f}  {location}")


def plot_hits(hits, columns=4, label=""):
    """Show *hits* as an image gallery, one panel per hit.

    Args:
        hits: sequence of ``{"path", "score"}`` dicts; each panel is titled
            with the score (3 decimals) and the file name.
        columns: number of panels per row.
        label: optional figure title; omitted when empty.

    Returns:
        None. An empty *hits* draws nothing.
    """
    if not hits:
        # plt.subplots rejects a zero-row grid, so there is nothing to draw.
        return
    n_rows = (len(hits) + columns - 1) // columns  # ceiling division
    # squeeze=False always yields a 2-D axes array, whatever the grid shape.
    fig, axes = plt.subplots(
        n_rows, columns, figsize=(3 * columns, 3 * n_rows), squeeze=False
    )
    axes = axes.ravel()
    for ax, hit in zip(axes, hits):
        # imshow converts the PIL image to an array while the file is open;
        # the context manager then releases the file handle.
        with Image.open(hit["path"]) as img:
            ax.imshow(img)
        ax.set_title(f"{hit['score']:.3f}\n{hit['path'].name}")
        ax.axis("off")
    # Hide the unused panels in the last row.
    for ax in axes[len(hits) :]:
        ax.axis("off")
    if label:
        fig.suptitle(label)
    plt.tight_layout()
    plt.show()


def split_indices(n):
    """Shuffle ``range(n)`` with a ``SEED``-seeded RNG and cut it into (train, test).

    The first ``int(n * (1 - TEST_SIZE))`` shuffled indices form the training
    split; the remainder form the test split.
    """
    rng = np.random.RandomState(SEED)
    shuffled = rng.permutation(n)
    n_train = int(n * (1 - TEST_SIZE))
    train_part = shuffled[:n_train]
    test_part = shuffled[n_train:]
    return train_part, test_part


def ensure_label_coverage(targets, train_idx, test_idx):
    """Give every label at least one training example where the data allows it.

    For each label column with no positive sample in the training split, one
    test sample that is positive for that label (if any) is moved from the
    test split to the training split.

    Args:
        targets: 2-D indicator array indexed as ``targets[sample, label]``.
        train_idx: array of sample indices in the training split.
        test_idx: array of sample indices in the test split.

    Returns:
        ``(train_idx, test_idx)`` as sorted integer arrays.
    """
    train_set, test_set = set(train_idx.tolist()), set(test_idx.tolist())
    for col in range(targets.shape[1]):
        if targets[list(train_set), col].sum() > 0:
            continue
        # First positive test sample for this label, or None when there is none.
        # Compare against None: index 0 is a valid donor.
        donor = next((i for i in test_set if targets[i, col] == 1), None)
        if donor is not None:
            test_set.remove(donor)
            train_set.add(donor)
    # Force an integer dtype: np.array([]) defaults to float64, so a split that
    # ended up empty could not be used to index `targets`.
    return (
        np.array(sorted(train_set), dtype=int),
        np.array(sorted(test_set), dtype=int),
    )


def numeric_density(text):
    """Return the share of characters in *text* that are digits (0.0 for empty text)."""
    digit_count = sum(1 for ch in text if ch.isdigit())
    total = max(len(text), 1)  # avoids dividing by zero on empty text
    return digit_count / total

### Label Distribution

In [None]:
# Number of labeled rows carrying each category (a row may carry several).
label_counts = Counter(label for r in rows for label in r["categories"])
s = pd.Series(label_counts).sort_values()  # ascending by example count
print(f"total labels: {len(s)}")
print(f"labels with <5  examples: {(s < 5).sum()}")
print(f"labels with <10 examples: {(s < 10).sum()}")
print(f"labels with >=5 examples: {(s >= 5).sum()}")
print(f"labels with >=10 examples: {(s >= 10).sum()}")

# Labels with at least 5 examples, in ascending-count order. The vectorizer
# sweep and the synthesis section restrict training to this subset.
viable = s[s >= 5].index.tolist()
print(f"\nviable labels: {viable}")

## Text model

OCR transcripts are cheap to extract and carry three distinct signal types depending on the label: 

1. lexical content (specific words and phrases)
2. numeric density (scores, metrics, timestamps), and
3. repeated UI tokens (OCR artifacts from icons, buttons, status indicators that are consistent within an app).

The corpus is first profiled per label to establish whether any of these signal types are present before choosing a vectorizer. A sweep over vectorizer configurations and classifiers on the viable label subset determines the training configuration empirically.

`char(2,4)` n-grams with logistic regression is the sweep winner on macro F1 across the expanded viable set (with >=5 examples). Macro is the target metric because it weights all labels equally regardless of frequency.

### Run OCR

This step is slow on the first run only: each transcript is cached under `data/ocr/`, and images that already have a cached transcript are skipped.

In [None]:
import sys

# Put the project's `ml/` directory on the import path so `handler` resolves.
sys.path.insert(0, str(ROOT / "ml"))
import pytesseract
from handler import load_config, resolve_image_paths

config = load_config(ROOT / "config.yaml")
# NOTE(review): presumably restricts resolution to the "screenshot" image
# series defined in config.yaml — confirm against handler.resolve_image_paths.
resolved = resolve_image_paths(config, series_prefix="screenshot")
# One filesystem path per resolved entry.
all_image_paths = [r.path for r in resolved]

print(f"resolved {len(all_image_paths)} images")


def ocr_image(path: Path, psm: int = 6, oem: int = 3) -> str:
    """Run Tesseract (English) on the image at *path* and return the transcript.

    Images narrower or shorter than 10 pixels are not sent to Tesseract and
    yield an empty string. *psm* and *oem* are forwarded to Tesseract as the
    ``--psm`` and ``--oem`` options.
    """
    tesseract_flags = f"--psm {psm} --oem {oem}"
    with Image.open(path) as img:
        width, height = img.size
        if min(width, height) < 10:
            return ""
        rgb = img.convert("RGB")
        return pytesseract.image_to_string(rgb, lang="eng", config=tesseract_flags)


def ocr_output_path(image_path: Path) -> Path:
    """Return the transcript path for *image_path*, creating its directory.

    The transcript lives at ``ROOT/data/ocr/<image's parent dir name>/<stem>.txt``.
    """
    transcript_dir = ROOT / "data/ocr" / image_path.parent.name
    transcript_dir.mkdir(parents=True, exist_ok=True)
    return transcript_dir / (image_path.stem + ".txt")


def run_ocr_incremental(image_paths: list[Path]) -> dict[Path, Path]:
    """OCR every image that has no cached transcript yet.

    Args:
        image_paths: images to process.

    Returns:
        Mapping from each image path to its transcript file. Every input path
        is present, whether its transcript was just written or already cached.

    An image whose OCR raises is reported and cached as an empty transcript.
    This is deliberate best-effort behaviour: one unreadable file does not
    abort the batch. The failed image is not retried on later runs unless its
    ``.txt`` file is deleted.
    """
    mapping = {}
    for path in tqdm(image_paths, desc="ocr"):
        out = ocr_output_path(path)
        if not out.exists():
            try:
                text = ocr_image(path)
            except Exception as e:  # OSError is already a subclass of Exception
                print(f"\nskipped {path.name}: {e}")
                text = ""
            out.write_text(text, encoding="utf-8")
        mapping[path] = out
    return mapping


# Transcribe every resolved image; images with a cached transcript are skipped.
ocr_map = run_ocr_incremental(all_image_paths)
print(f"ocr complete: {len(ocr_map)} files")

### Load OCR

In [None]:
def ocr_path(r):
    """Return the transcript path for label row *r*, derived from ``r["input_path"]``.

    Builds ``ROOT/data/ocr/<parent dir name>/<stem>.txt`` without touching the
    filesystem.
    """
    source = Path(r["input_path"])
    transcript_name = source.stem + ".txt"
    return ROOT / "data/ocr" / source.parent.name / transcript_name


# Lower-cased transcript of every labeled row, in the same order as `rows`.
# The OCR step writes transcripts as UTF-8, so read them back as UTF-8 rather
# than with the platform default encoding.
text_ocr = [ocr_path(r).read_text(encoding="utf-8").lower() for r in rows]

### Corpus: per-label OCR character profile

In [None]:
# Per viable label: number of examples, mean whitespace-separated word count,
# mean digit share, and how many transcripts are shorter than 20 characters
# once surrounding whitespace is stripped.
for label in viable:
    indices = [i for i, r in enumerate(rows) if label in r["categories"]]
    subset = [text_ocr[i] for i in indices]
    lengths = [len(t.split()) for t in subset]
    densities = [numeric_density(t) for t in subset]
    empty = sum(1 for t in subset if len(t.strip()) < 20)
    print(
        f"{label:20} n={len(indices):3}  mean_words={np.mean(lengths):6.0f}  "
        f"mean_density={np.mean(densities):.3f}  empty={empty}"
    )

### Top word tokens per viable label

In [None]:
# Raw word counts over every labeled transcript.
cv = CountVectorizer(analyzer="word", min_df=1)
X_cv = cv.fit_transform(text_ocr)
vocab = np.array(cv.get_feature_names_out())

# For each viable label, sum the counts over that label's rows and print the
# 15 most frequent tokens with their totals.
for label in viable:
    indices = [i for i, r in enumerate(rows) if label in r["categories"]]
    sums = np.asarray(X_cv[indices].sum(axis=0)).squeeze()
    top = np.argsort(sums)[::-1][:15]
    print(f"\n--- {label} ---")
    print(", ".join(f"{vocab[i]}({sums[i]:.0f})" for i in top))

### Label Separability

In [None]:
# Two TF-IDF views of the same transcripts: character 3-5-grams and words.
char_vec = TfidfVectorizer(analyzer="char", ngram_range=(3, 5), max_df=0.95)
word_vec = TfidfVectorizer(analyzer="word", min_df=1, max_df=0.95)
char_feat = char_vec.fit_transform(text_ocr)
word_feat = word_vec.fit_transform(text_ocr)

# Rows labeled "statistics" versus rows labeled "monitorat" but not "statistics".
stats_idx = [i for i, r in enumerate(rows) if "statistics" in r["categories"]]
monitor_idx = [
    i
    for i, r in enumerate(rows)
    if "monitorat" in r["categories"] and "statistics" not in r["categories"]
]

# Mean and max pairwise cosine similarity between the two groups, per view.
for name, feat in [("char", char_feat), ("word", word_feat)]:
    sim = cosine_similarity(feat[stats_idx], feat[monitor_idx])
    print(f"{name}  statistics vs monitorat: mean={sim.mean():.3f} max={sim.max():.3f}")

#### vectorizer + classifier sweep

the purpose here is to determine the best model for this dataset

In [None]:
# Keep only rows that carry at least one viable label.
viable_set = set(viable)
viable_mask = np.array([any(label in viable_set for label in r["categories"]) for r in rows])
viable_idx = np.where(viable_mask)[0]

sub_rows = [rows[i] for i in viable_idx]
sub_ocr = [text_ocr[i] for i in viable_idx]
# classes=viable fixes the column order of the indicator matrix to `viable`.
sub_binarizer = MultiLabelBinarizer(classes=viable)
sub_targets = sub_binarizer.fit_transform([r["categories"] for r in sub_rows])
# Digit share of each transcript, as a single extra feature column.
sub_density = np.array([numeric_density(t) for t in sub_ocr]).reshape(-1, 1)

tr, te = ensure_label_coverage(sub_targets, *split_indices(len(sub_rows)))

vectorizers = {
    "char(3,5)": TfidfVectorizer(analyzer="char", ngram_range=(3, 5), max_df=0.95),
    "char(2,4)": TfidfVectorizer(analyzer="char", ngram_range=(2, 4), max_df=0.95),
    "word": TfidfVectorizer(analyzer="word", min_df=1, max_df=0.95),
    "word+bi": TfidfVectorizer(analyzer="word", ngram_range=(1, 2), min_df=1, max_df=0.95),
}

classifiers = {
    "logreg": OneVsRestClassifier(LogisticRegression(max_iter=1000, solver="liblinear")),
    "svc": OneVsRestClassifier(LinearSVC(max_iter=2000)),
}

# Every vectorizer x classifier pair is scored twice: on the TF-IDF features
# alone and on TF-IDF plus the numeric-density column.
for vec_name, vec in vectorizers.items():
    # NOTE(review): the vectorizer is fitted on train and test rows together,
    # so the IDF weights have seen the test transcripts.
    feat = vec.fit_transform(sub_ocr)
    # The sparse TF-IDF matrix is densified here to append the density column.
    feat_den = np.hstack([feat.toarray(), sub_density])
    for clf_name, clf in classifiers.items():
        for feat_label, f in [(vec_name, feat), (f"{vec_name}+density", feat_den)]:
            clf.fit(f[tr], sub_targets[tr])
            # With predict_proba available, `raw` holds probabilities that are
            # thresholded below; otherwise it already holds 0/1 predictions
            # taken from the sign of decision_function.
            raw = (
                clf.predict_proba(f[te])
                if hasattr(clf, "predict_proba")
                else (clf.decision_function(f[te]) >= 0).astype(int)
            )
            preds = (raw >= THRESHOLD).astype(int) if hasattr(clf, "predict_proba") else raw
            micro = f1_score(sub_targets[te], preds, average="micro", zero_division=0)
            macro = f1_score(sub_targets[te], preds, average="macro", zero_division=0)
            print(f"{clf_name:8} {feat_label:30} micro={micro:.3f} macro={macro:.3f}")

#### Score 

Scores and saves the winning vec/clf model after the sweep winner is chosen

In [None]:
# Targets cover every label found in `rows`, not only the viable subset.
text_binarizer = MultiLabelBinarizer()
text_targets = text_binarizer.fit_transform([r["categories"] for r in rows])

# Sweep winner: character 2-4-gram TF-IDF with one-vs-rest logistic regression.
text_vec = TfidfVectorizer(analyzer="char", ngram_range=(2, 4), max_df=0.95)
text_features = text_vec.fit_transform(text_ocr)

tr, te = ensure_label_coverage(text_targets, *split_indices(len(rows)))
text_clf = OneVsRestClassifier(LogisticRegression(max_iter=1000, solver="liblinear"))
text_clf.fit(text_features[tr], text_targets[tr])

# Score every labeled image (train and test rows alike) for the survey below.
text_probs = text_clf.predict_proba(text_features)
text_df = make_df(text_probs, text_binarizer)

# joblib.dump does not create missing directories, so make sure data/models/
# exists before writing.
text_model_path = ROOT / "data/models/text.joblib"
text_model_path.parent.mkdir(parents=True, exist_ok=True)
joblib.dump(
    {"vectorizer": text_vec, "classifier": text_clf, "binarizer": text_binarizer},
    text_model_path,
)

### Survey

In [None]:
# Alternative view, highest-mass labels only: text_df[top_labels(text_df)]
# Displayed: the score columns of the viable labels.
text_df[viable]

In [None]:
# One gallery per high-mass label: its 8 top-scoring images. Iterating the
# label list directly matches the image-model gallery cell and avoids building
# an intermediate DataFrame just to read its column names.
for label in top_labels(text_df):
    plot_hits(top_k(text_df, label, k=8), label=label)

## Image model

### ResNet18 as a visual feature extractor

ResNet was pretrained on natural photographs (ImageNet) and produces embeddings that conflate all screenshots as a generic class--cosine similarity between visually distinct labels was 0.56–0.70, indicating poor separation.

Therefore, this model has been removed from the notebook.

### CLIP as the image processor

CLIP (`openai/clip-vit-base-patch32`) was pretrained contrastively on web-scraped image-text pairs, which include UI, screenshots, and documents alongside natural images. The label pairs that scored 0.56–0.70 cosine similarity under ResNet score 0.24–0.45 under CLIP--a meaningful improvement in discriminability across the board.

`pooler_output` from the vision encoder is used as the 768-dim embedding.

- fixed resolution
- no variable patching
- CPU-friendly at this corpus size (I only have Intel silicon)

In [None]:
# Variant without local_files_only (NOTE(review): presumably fetches the
# checkpoint from the hub when it is not cached locally — confirm):
# clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
# clip_model     = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").vision_model

# Load processor and model from files already on disk.
clip_processor = CLIPProcessor.from_pretrained(
    "openai/clip-vit-base-patch32",
    use_fast=False,
    local_files_only=True,
)
# Only the vision tower is kept; the text encoder is not used in this notebook.
clip_model = CLIPModel.from_pretrained(
    "openai/clip-vit-base-patch32",
    local_files_only=True,
).vision_model
clip_model.eval()


def embed_images_clip(image_paths, min_size=10, dim=768):
    """Embed each image with the CLIP vision encoder's ``pooler_output``.

    Args:
        image_paths: sequence of image file paths.
        min_size: images narrower or shorter than this many pixels are not
            sent through the model. They get an all-zero row instead, so the
            output stays aligned row-for-row with *image_paths*.
        dim: width of the placeholder rows. It must equal the encoder's
            ``pooler_output`` size, or stacking fails.

    Returns:
        2-D array with one row per input path, in input order. An empty input
        yields an array of shape ``(0, dim)``.
    """
    if len(image_paths) == 0:
        # np.vstack raises on an empty list.
        return np.empty((0, dim), dtype=np.float32)
    vectors = []
    skipped = []
    for path in tqdm(image_paths, desc="embedding"):
        with Image.open(path) as img:
            if img.size[0] < min_size or img.size[1] < min_size:
                skipped.append(path)
                # np.zeros defaults to float64, which would upcast the whole
                # stacked result whenever one image is skipped.
                # NOTE(review): float32 assumes the model runs in its default
                # precision — confirm if the checkpoint is loaded differently.
                vectors.append(np.zeros(dim, dtype=np.float32))
                continue
            inputs = clip_processor(images=img.convert("RGB"), return_tensors="pt")
        with torch.no_grad():
            outputs = clip_model(**inputs)
            vec = outputs.pooler_output.squeeze(0).cpu().numpy()
        vectors.append(vec)
    if skipped:
        print(f"skipped {len(skipped)} degenerate images:")
        for p in skipped:
            print(f"  {p.parent.name}/{p.name}")
    return np.vstack(vectors)

### Features

In [None]:
# Indicator matrix over every label found in `rows`.
image_binarizer = MultiLabelBinarizer()
image_targets = image_binarizer.fit_transform([r["categories"] for r in rows])
# One embedding row per labeled image, in the same order as `rows` / `paths`.
clip_embeddings = embed_images_clip(paths)

print(
    f"samples: {len(rows)}  labels: {len(image_binarizer.classes_)}  "
    f"dims: {clip_embeddings.shape[1]}"
)

### Training

In [None]:
# Seeded split, adjusted so each label has a training example where possible.
tr, te = ensure_label_coverage(image_targets, *split_indices(len(rows)))

# One binary logistic regression per label on the CLIP embeddings.
image_clf = OneVsRestClassifier(LogisticRegression(max_iter=1000, solver="liblinear"))
image_clf.fit(clip_embeddings[tr], image_targets[tr])

# Held-out evaluation: a label is predicted when its probability >= THRESHOLD.
image_probs_test = image_clf.predict_proba(clip_embeddings[te])
image_preds_test = (image_probs_test >= THRESHOLD).astype(int)

print("micro:", f1_score(image_targets[te], image_preds_test, average="micro", zero_division=0))
print("macro:", f1_score(image_targets[te], image_preds_test, average="macro", zero_division=0))
print(
    classification_report(
        image_targets[te], image_preds_test, target_names=image_binarizer.classes_, zero_division=0
    )
)

### Score and save

In [None]:
# Score every labeled image (train and test rows alike) for the survey below.
image_probs = image_clf.predict_proba(clip_embeddings)
image_df = make_df(image_probs, image_binarizer)

# joblib.dump does not create missing directories, so make sure data/models/
# exists before writing.
image_model_path = ROOT / "data/models/image.joblib"
image_model_path.parent.mkdir(parents=True, exist_ok=True)
joblib.dump({"classifier": image_clf, "binarizer": image_binarizer}, image_model_path)

In [None]:
# First 5 images, restricted to the highest-mass label columns.
image_df[top_labels(image_df)].head(5)

In [None]:
# To inspect hand-picked labels instead, swap in the commented-out loop header:
# for label in ["hockey", "browser", "food"]:
# One gallery per high-mass label: its 8 top-scoring images.
for label in top_labels(image_df):
    plot_hits(top_k(image_df, label, k=8), label=label)

## Synthesis

### side-by-side per-label F1 on viable set
re-scope both models to the same viable split for a fair comparison

In [None]:
# Targets restricted to the viable labels, built from the rows kept by the
# vectorizer sweep (`sub_rows`); column order follows `viable`.
syn_binarizer = MultiLabelBinarizer(classes=viable)
syn_targets = syn_binarizer.fit_transform([r["categories"] for r in sub_rows])
# Both models below are trained and evaluated on this single split.
tr, te = ensure_label_coverage(syn_targets, *split_indices(len(sub_rows)))

### text winner config

In [None]:
# Text model in the sweep-winning configuration: char 2-4-gram TF-IDF with
# one-vs-rest logistic regression.
syn_text_vec = TfidfVectorizer(analyzer="char", ngram_range=(2, 4), max_df=0.95)
syn_text_feat = syn_text_vec.fit_transform(sub_ocr)
syn_text_clf = OneVsRestClassifier(LogisticRegression(max_iter=1000, solver="liblinear"))
syn_text_clf.fit(syn_text_feat[tr], syn_targets[tr])
# 0/1 predictions on the held-out rows.
syn_text_preds = (syn_text_clf.predict_proba(syn_text_feat[te]) >= THRESHOLD).astype(int)

### image clip embeddings scoped to viable subset

In [None]:
# Embeddings of the rows kept by the sweep; row order matches sub_rows / sub_ocr.
sub_clip = clip_embeddings[viable_idx]
syn_image_clf = OneVsRestClassifier(LogisticRegression(max_iter=1000, solver="liblinear"))
syn_image_clf.fit(sub_clip[tr], syn_targets[tr])
# 0/1 predictions on the same held-out rows as the text model.
syn_image_preds = (syn_image_clf.predict_proba(sub_clip[te]) >= THRESHOLD).astype(int)

### per-label F1 for each

In [None]:
# f1_score is already imported in the notebook's import cell; this repeats it.
from sklearn.metrics import f1_score

# average=None returns one F1 value per label column.
text_f1s = f1_score(syn_targets[te], syn_text_preds, average=None, zero_division=0)
image_f1s = f1_score(syn_targets[te], syn_image_preds, average=None, zero_division=0)
# Number of positive test examples per label.
support = syn_targets[te].sum(axis=0)

# One row per viable label; a positive delta means the image model scored
# higher than the text model. Sorted with the largest delta first.
cmp = pd.DataFrame(
    {
        "text_f1": text_f1s,
        "image_f1": image_f1s,
        "delta": image_f1s - text_f1s,
        "support": support.astype(int),
    },
    index=viable,
).sort_values("delta", ascending=False)

text_micro = f1_score(syn_targets[te], syn_text_preds, average="micro", zero_division=0)
text_macro = f1_score(syn_targets[te], syn_text_preds, average="macro", zero_division=0)
image_micro = f1_score(syn_targets[te], syn_image_preds, average="micro", zero_division=0)
image_macro = f1_score(syn_targets[te], syn_image_preds, average="macro", zero_division=0)

print(cmp.to_string(float_format="{:.3f}".format))
print(f"\ntext  micro={text_micro:.3f}  macro={text_macro:.3f}")
print(f"image micro={image_micro:.3f}  macro={image_macro:.3f}")

## Clusters via Unsupervised CLIP Clustering 

### Performance

this will estimate how long it will take to process all images with CLIP

In [None]:
def probe_embed_rate(all_image_paths, embed_fn, n=50):
    """Time *embed_fn* on the first *n* paths and extrapolate to the whole list.

    Args:
        all_image_paths: full sequence of image paths.
        embed_fn: callable taking a sequence of paths; its result is discarded.
        n: maximum number of paths to time.

    Prints the measured throughput and the estimated duration of a full run.
    Returns None.
    """
    probe = all_image_paths[:n]
    count = len(probe)
    if count == 0:
        print("no images to probe")
        return
    t0 = time.perf_counter()
    _ = embed_fn(probe)
    elapsed = time.perf_counter() - t0
    # Base the rate on the images actually embedded: the list may hold fewer
    # than n. Guard against a zero reading from a coarse clock.
    rate = count / elapsed if elapsed > 0 else float("inf")
    eta = len(all_image_paths) / rate
    print(f"{count} images in {elapsed:.1f}s - {rate:.1f} it/s")
    print(f"estimated full run: {eta / 60:.1f} min ({len(all_image_paths)} images)")


# probe_embed_rate(all_image_paths, embed_images_clip)

### Compute and Save all embeddings

In [None]:
EMBEDDINGS_PATH = ROOT / "data/embeddings/clip_full.pkl"
EMBEDDINGS_PATH.parent.mkdir(parents=True, exist_ok=True)

# Later cells index `all_image_paths` and the embeddings by the same position,
# so a cache is reused only when it has exactly one row per resolved image.
all_clip_embeddings = None
if EMBEDDINGS_PATH.exists():
    # joblib.load unpickles the file; only load caches this notebook wrote.
    cached = joblib.load(EMBEDDINGS_PATH)
    if len(cached) == len(all_image_paths):
        all_clip_embeddings = cached
        print(f"loaded from cache: {all_clip_embeddings.shape}")
    else:
        print(
            f"cache is stale ({len(cached)} rows for {len(all_image_paths)} images); "
            "recomputing"
        )

if all_clip_embeddings is None:
    all_clip_embeddings = embed_images_clip(all_image_paths)
    joblib.dump(all_clip_embeddings, EMBEDDINGS_PATH)
    print(f"computed and saved: {all_clip_embeddings.shape}")

#### UMAP Dimensionality Reduction

Reduce high-dimensional CLIP embeddings into a 2D representation using UMAP for visualization. The algorithm preserves local neighborhood structure based on cosine similarity, with `n_neighbors=15` controlling locality and `min_dist=0.1` allowing relatively compact clusters. The resulting embedding (`embedding2d`) contains one 2D point per image.

In [None]:
import umap

# 2-D projection of the embeddings. Neighbourhoods are measured with cosine
# distance, and random_state=SEED fixes the random state.
reducer = umap.UMAP(
    n_components=2,
    n_neighbors=15,
    min_dist=0.1,
    metric="cosine",
    random_state=SEED,
)
# One 2-D point per row of all_clip_embeddings.
embedding2d = reducer.fit_transform(all_clip_embeddings)
print(f"reduced: {embedding2d.shape}")

#### Density-Based Clustering with HDBSCAN

Apply HDBSCAN to the 2D UMAP embedding to identify clusters based on point density. The algorithm groups regions of sufficient density while labeling sparse regions as noise (`-1`).

`min_cluster_size=10` defines the smallest allowable cluster, and `min_samples=5` controls how conservatively dense regions are defined. The output `cluster_labels` assigns each point either a cluster ID or noise. The summary reports the number of detected clusters, the number of noise points, and the total samples.

In [None]:
import hdbscan

# Cluster the 2-D UMAP points (not the raw embeddings) by density.
clusterer = hdbscan.HDBSCAN(min_cluster_size=10, min_samples=5, metric="euclidean")
cluster_labels = clusterer.fit_predict(embedding2d)

# The label -1 marks noise and is not counted as a cluster.
n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)
n_noise = (cluster_labels == -1).sum()
print(f"clusters: {n_clusters}  noise points: {n_noise}  total: {len(cluster_labels)}")

### Cluster Visualization in UMAP Space

Visualize the 2D UMAP embedding with points colored according to HDBSCAN cluster assignments. Each color corresponds to a density-based cluster detected in the embedding space, while noise points (`-1`) are shown in light grey.

The legend reports cluster IDs and their sample counts. The title reflects the total number of images and the number of clusters identified by HDBSCAN (excluding noise).

In [None]:
fig, ax = plt.subplots(figsize=(14, 12))

# Includes -1 (noise) when present.
unique_clusters = sorted(set(cluster_labels))
# plt.get_cmap replaces matplotlib.cm.get_cmap, which was deprecated in
# Matplotlib 3.7 and removed in 3.9. The second argument resamples the
# colormap to one entry per cluster id.
cmap = plt.get_cmap("tab20", len(unique_clusters))

# One scatter call per cluster so each gets its own legend entry with its size.
for i, cid in enumerate(unique_clusters):
    mask = cluster_labels == cid
    label = f"noise ({mask.sum()})" if cid == -1 else f"cluster {cid} ({mask.sum()})"
    color = "lightgrey" if cid == -1 else cmap(i)
    ax.scatter(embedding2d[mask, 0], embedding2d[mask, 1], s=6, color=color, label=label, alpha=0.6)

ax.legend(bbox_to_anchor=(1.01, 1), loc="upper left", fontsize=7, markerscale=2)
ax.set_title(f"UMAP - {len(all_image_paths)} images, {n_clusters} clusters")
plt.tight_layout()
plt.show()

####  inspect a cluster

In [None]:
def inspect_cluster(cid, k=12):
    """Plot up to *k* randomly sampled images from cluster *cid*.

    The sample is drawn without replacement from a ``SEED``-seeded RNG. Each
    panel's score is the image's HDBSCAN membership probability.
    """
    members = np.flatnonzero(cluster_labels == cid)
    picker = np.random.RandomState(SEED)
    chosen = picker.choice(members, size=min(k, len(members)), replace=False)
    hits = []
    for i in chosen:
        hits.append({"path": all_image_paths[i], "score": clusterer.probabilities_[i]})
    plot_hits(hits, columns=4, label=f"cluster {cid} - n={len(members)}")


# inspect_cluster(6)

#### gallery sweep, sample from every cluster

In [None]:
def cluster_summary(k=8, columns=4, skip_noise=True):
    """Plot one gallery per cluster, each with up to *k* sampled member images.

    Clusters are visited in ascending id order. With *skip_noise* set, the
    noise label ``-1`` is left out. Each cluster's sample is drawn without
    replacement from a fresh ``SEED``-seeded RNG, and panel scores are HDBSCAN
    membership probabilities.
    """
    for cid in sorted(set(cluster_labels)):
        if skip_noise and cid == -1:
            continue
        members = np.flatnonzero(cluster_labels == cid)
        picker = np.random.RandomState(SEED)
        chosen = picker.choice(members, size=min(k, len(members)), replace=False)
        hits = []
        for i in chosen:
            hits.append({"path": all_image_paths[i], "score": clusterer.probabilities_[i]})
        plot_hits(hits, columns=columns, label=f"cluster {cid} — n={len(members)}")


# Render the gallery sweep for every non-noise cluster.
cluster_summary()


cluster_summary()

## Cluster OCR vocabulary vs label OCR vocabulary via Jaccard Similarity

### map each image path to its cluster id

In [None]:
# Cluster id of every image path as a plain int (-1 is HDBSCAN's noise label).
# Paths and labels are paired positionally.
path_to_cluster = {path: int(cid) for path, cid in zip(all_image_paths, cluster_labels)}

### token sets per cluster: union of all OCR words across member images

In [None]:
def tokenize(text):
    """Return the set of distinct lower-cased, whitespace-separated tokens in *text*."""
    lowered = text.lower()
    return {token for token in lowered.split()}


# Cluster id -> union of OCR tokens over the cluster's member images. Images
# without a transcript file are skipped. The noise label (-1) gets an entry too.
cluster_vocab = {}
for i, path in enumerate(all_image_paths):
    cid = cluster_labels[i]
    txt_path = ROOT / "data/ocr" / path.parent.name / (path.stem + ".txt")
    if not txt_path.exists():
        continue
    # The OCR step writes transcripts as UTF-8, so read them back as UTF-8
    # rather than with the platform default encoding.
    tokens = tokenize(txt_path.read_text(encoding="utf-8"))
    cluster_vocab.setdefault(cid, set()).update(tokens)

### token sets per label: union of OCR words across labeled images

In [None]:
# Label -> union of OCR tokens over every labeled image carrying that label.
# An image with several labels contributes its tokens to each of them.
label_vocab = {}
for r, ocr in zip(rows, text_ocr):
    tokens = tokenize(ocr)
    for label in r["categories"]:
        label_vocab.setdefault(label, set()).update(tokens)

### Jaccard similarity: cluster vs label

In [None]:
def jaccard(a, b):
    """Return the Jaccard similarity ``|a ∩ b| / |a ∪ b|`` of two sets.

    Returns 0.0 when either set is empty.
    """
    if not (a and b):
        return 0.0
    shared = a & b
    combined = a | b
    return len(shared) / len(combined)


# Non-noise cluster ids, ascending. This reassigns `unique_clusters`, which the
# scatter-plot cell built with the noise label included.
unique_clusters = sorted(c for c in set(cluster_labels) if c != -1)
labels_sorted = sorted(label_vocab)

# Rows: clusters (index "c<id>"); columns: labels; values: Jaccard similarity
# between the cluster's token set and the label's token set. A cluster with no
# recorded vocabulary scores 0.0 against every label.
jac = pd.DataFrame(
    [
        [
            jaccard(cluster_vocab.get(cid, set()), label_vocab.get(label, set()))
            for label in labels_sorted
        ]
        for cid in unique_clusters
    ],
    index=[f"c{cid}" for cid in unique_clusters],
    columns=labels_sorted,
)

### top 5 label matches per cluster

In [None]:
def print_cluster_matches(jac, cluster_labels, unique_clusters, top_n=5):
    """Print, for each cluster, its size and its *top_n* best-matching labels.

    *jac* is a clusters-by-labels score table whose index entries are
    ``"c<id>"``. *cluster_labels* is the per-image cluster assignment array
    used to count members, and *unique_clusters* lists the ids to report.
    """
    header = f"{'cluster':>8}  {'n':>5}  top label matches"
    print(header)
    print("-" * 80)
    for cid in unique_clusters:
        size = (cluster_labels == cid).sum()
        best = jac.loc[f"c{cid}"].nlargest(top_n)
        parts = [f"{name} ({value:.3f})" for name, value in best.items()]
        desc = "  |  ".join(parts)
        print(f"c{cid:>6}  {size:>5}  {desc}")


# print_cluster_matches(jac, cluster_labels, unique_clusters)

### column filter: drop labels no cluster scores highly

In [None]:
# Minimum best-case Jaccard score a label needs to stay in the heatmap. This
# has its own name so it does not overwrite THRESHOLD, the classifiers'
# probability cut-off defined with the helpers: with the old shared name,
# re-running any earlier cell after this one silently changed its predictions.
JACCARD_THRESHOLD = 0.10
VMAX_PCTILE = 99  # prevents outlier bleaching

# Keep only labels that at least one cluster matches at or above the threshold.
active_cols = jac.columns[jac.max(axis=0) >= JACCARD_THRESHOLD]
J = jac[active_cols].copy()

cluster_sizes = {cid: int((cluster_labels == cid).sum()) for cid in unique_clusters}
# Best-matching label per cluster, looked up by the original "c<id>" index
# before the index is replaced with the descriptive row labels below.
dominant = J.idxmax(axis=1)
J.index = [
    f"c{i.lstrip('c'):>2} n={cluster_sizes[int(i.lstrip('c'))]:>3} [{dominant[i]}]" for i in J.index
]

# Cap the colour scale at the chosen percentile of all scores.
vmax = np.percentile(J.values, VMAX_PCTILE)

cg = sns.clustermap(
    J,
    method="ward",
    metric="euclidean",
    cmap="YlOrRd",
    vmin=0,
    vmax=vmax,
    figsize=(max(14, len(active_cols) * 0.45), max(12, len(J) * 0.22)),
    linewidths=0.2,
    linecolor="#e0e0e0",
    xticklabels=True,
    yticklabels=True,
    dendrogram_ratio=(0.10, 0.06),
    cbar_pos=(0.01, 0.84, 0.02, 0.12),
    cbar_kws={"label": f"Jaccard (capped p{VMAX_PCTILE})"},
)
plt.setp(cg.ax_heatmap.get_xticklabels(), rotation=45, ha="right", fontsize=8)
plt.setp(cg.ax_heatmap.get_yticklabels(), fontsize=7.5)
cg.ax_heatmap.set_xlabel("Label", fontsize=10)
cg.ax_heatmap.set_ylabel("")
cg.ax_col_dendrogram.set_title(
    f"Cluster × Label — Ward/Euclidean clustermap  (threshold={JACCARD_THRESHOLD})", fontsize=11
)
plt.show()

In [None]:
# Print the seconds elapsed since the timer was started at the top of the notebook.
stopwatch(0)