In [16]:
from pathlib import Path
import json
import pandas as pd
from sklearn.model_selection import train_test_split, KFold, cross_val_score
import re
import numpy as np
from collections import Counter

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MultiLabelBinarizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import f1_score, make_scorer

import joblib
from datetime import datetime

In [2]:
REPO_URL = "https://github.com/eugeniavd/magic_tagger.git"  # <-- EDIT if needed
!git clone {REPO_URL}


fatal: destination path 'magic_tagger' already exists and is not an empty directory.


In [3]:
PROJECT_ROOT = Path("/content/magic_tagger")

csv_path = PROJECT_ROOT / "data" / "processed" / "classify_data_normalized.csv"

# --- load ---
df = pd.read_csv(csv_path, encoding="utf-8")
print("Loaded:", csv_path)
print("Shape:", df.shape)
display(df.head(5))

Loaded: /content/magic_tagger/data/processed/classify_data_normalized.csv
Shape: (50, 14)


Unnamed: 0,tale_id,rights_status,content_description,set,sampling_version,type_count,collection,volume_no,source_ref,atu_labels_json,txt_path,text_raw,summary_norm,text_norm
0,era_vene_1_503_1,open,[Царевна-лягушка].,core,v1_20251230,3,"ERA, Vene",1,"ERA, Vene 1, 503/4 (1)","[""402""]",/Users/eugenia/Desktop/thesis/magic_tagger/dat...,Тили были царь с царицей у не\nбыло три сына. ...,царевна-лягушка.,тили были царь с царицей у не было три сына. ц...
1,era_vene_1_515_1,open,"[По пьяни мужик спорит, что сможет принести но...",coverage,v1_20251230,1,"ERA, Vene",1,"ERA, Vene 1, 515/6 (1)","[""410""]",/Users/eugenia/Desktop/thesis/magic_tagger/dat...,"Раз пяное, ребятище» подился.\nчто можит в 12 ...","по пьяни мужик спорит, что сможет принести ноч...","раз пяное, ребятище» подился. что можит в 12 ч..."
2,era_vene_12_105_22,open,Снегурочка.,core,v1_20251230,3,"ERA, Vene",12,"ERA, Vene 12, 105 (22)","[""703*""]",/Users/eugenia/Desktop/thesis/magic_tagger/dat...,Сделали дети со снегу куклу.\nВ одного старина...,снегурочка.,сделали дети со снегу куклу. в одного старина ...
3,era_vene_12_137_98,open,Иван-дурак.,core,v1_20251230,4,"ERA, Vene",12,"ERA, Vene 12, 137/41 (98)","[""530""]",/Users/eugenia/Desktop/thesis/magic_tagger/dat...,"Кил-был стажк. В яво бло\nтра сегна. Миша, Гри...",иван-дурак.,"кил-был стажк. в яво бло тра сегна. миша, гриш..."
4,era_vene_12_189_1,open,Два брата.,core,v1_20251230,2,"ERA, Vene",12,"ERA, Vene 12, 189/94 (1)","[""735A""]",/Users/eugenia/Desktop/thesis/magic_tagger/dat...,Жили – брели два брата.\nи посла смерти отца о...,два брата.,жили — брели два брата. и посла смерти отца об...


In [4]:
col = "atu_labels_json"

def parse_labels(x):
    if pd.isna(x):
        return []
    s = str(x).strip()
    if not s:
        return []
    try:
        v = json.loads(s)
        if isinstance(v, list):
            return [str(t).strip() for t in v if str(t).strip()]

        return [str(v).strip()]
    except Exception:

        return [t.strip() for t in s.split(",") if t.strip()]

df["labels"] = df[col].apply(parse_labels)

unique_labels = sorted({lab for labs in df["labels"] for lab in labs})
print("Unique labels:", len(unique_labels))
print("Example:", unique_labels[:20])


Unique labels: 37
Example: ['1000', '1060', '1168', '1174', '300', '300A', '301', '302C*', '302С*', '307', '313', '325', '327A', '331', '402', '410', '425C', '470', '480A', '480D*']


In [5]:
label_counts = pd.Series([lab for labs in df["labels"] for lab in labs]).value_counts()
display(label_counts)

Unnamed: 0,count
707,6
480D*,5
530,3
703*,3
402,3
552,3
480A,3
650A,3
307,3
301,2


In [6]:
DROP_COLS = [
    "rights_status",
    "content_description",
    "sampling_version",
    "type_count",
    "collection",
    "volume_no",
    "source_ref",
    "atu_labels_json",
    "txt_path",
    "text_raw",
    "set"
]

df = df.drop(columns=[c for c in DROP_COLS if c in df.columns]).copy()
df.head(5)


Unnamed: 0,tale_id,summary_norm,text_norm,labels
0,era_vene_1_503_1,царевна-лягушка.,тили были царь с царицей у не было три сына. ц...,[402]
1,era_vene_1_515_1,"по пьяни мужик спорит, что сможет принести ноч...","раз пяное, ребятище» подился. что можит в 12 ч...",[410]
2,era_vene_12_105_22,снегурочка.,сделали дети со снегу куклу. в одного старина ...,[703*]
3,era_vene_12_137_98,иван-дурак.,"кил-был стажк. в яво бло тра сегна. миша, гриш...",[530]
4,era_vene_12_189_1,два брата.,жили — брели два брата. и посла смерти отца об...,[735A]


### Multi-label encoding and parent-level labels for evaluation

Because each tale in our corpus can be assigned to **one or more ATU types**, we treat ATU prediction as a **multi-label classification** task. We first convert the human-assigned label sets into a machine-learning friendly representation using `MultiLabelBinarizer`. This step builds a fixed label vocabulary from the training data and transforms each tale’s label list into a **multi-hot binary vector**. This representation is required by standard multi-label classifiers (e.g., One-vs-Rest logistic regression) and ensures a reproducible mapping between labels and output dimensions.

In addition to the original ATU labels, we derive a **parent-level label set** for evaluation. ATU types frequently include suffixes or modifiers (e.g., `327A`, `480D*`), while the **leading numeric component** (e.g., `327`, `480`) captures a higher-level category that is often more stable under noisy HTR conditions and small-data regimes. We therefore extract the first 1–4 digits from each ATU label via a simple regular expression and assign the resulting parent codes as `labels_parent`. This enables evaluation at a coarser granularity (e.g., Parent-Hit@3), which better reflects the intended use of the system as a **decision-support tool**: even when the model fails to predict the exact subtype, correctly retrieving the parent class can still provide a meaningful shortlist for expert review.


In [7]:

# -------------------------
# 1) ATU parent extraction
# -------------------------
RE_ATU_PARENT = re.compile(r"(\d{1,4})")

def atu_parent(label: str) -> str:
    """
    Extract the leading numeric ATU parent code.
    Examples:
      "530" -> "530"
      "327A" -> "327"
      "ATU_480D*" -> "480"
      "ATU 301" -> "301"
    """
    if label is None:
        return ""
    s = str(label).strip()
    if not s:
        return ""
    m = RE_ATU_PARENT.search(s)
    return m.group(1) if m else ""


# -------------------------
# 2) Robust label utilities
# -------------------------
def _is_nan(x) -> bool:
    return isinstance(x, float) and pd.isna(x)

def to_parent_set(labels) -> list[str]:
    """
    Convert a list of ATU labels to a sorted list of unique parent codes.
    Robust to:
      - labels=None / labels=NaN
      - NaN elements inside the list
      - empty/whitespace strings
    """
    if labels is None or _is_nan(labels):
        return []

    out = set()
    for x in labels:
        if x is None or _is_nan(x):
            continue
        s = str(x).strip()
        if not s:
            continue
        p = atu_parent(s)
        if p:
            out.add(p)

    return sorted(out)

def ensure_labels_parent(
    df: pd.DataFrame,
    labels_col: str = "labels",
    out_col: str = "labels_parent"
) -> pd.DataFrame:
    """
    Add df[out_col] as parent codes derived from df[labels_col] (list[str]).
    """
    df = df.copy()
    df[out_col] = df[labels_col].apply(to_parent_set)
    return df

def clean_label_list(labels) -> list[str]:
    if labels is None or _is_nan(labels):
        return []
    if isinstance(labels, str):
        s = labels.strip()
        return [s] if s else []
    out = []
    for x in labels:
        if x is None or _is_nan(x):
            continue
        s = str(x).strip()
        if s:
            out.append(s)
    seen = set()
    dedup = []
    for s in out:
        if s not in seen:
            seen.add(s)
            dedup.append(s)
    return dedup



# ---------------------------------------
# 3) Parent-Hit@k from proba (any match)
# ---------------------------------------
def parent_hit_at_k_from_proba(
    y_true_parent_lists: list[list[str]],
    proba: np.ndarray,
    classes: np.ndarray,
    k: int = 3
) -> float:
    """
    Parent-Hit@k (any match):
    success if at least one gold parent code appears among parent codes
    of the model's top-k predicted labels.
    """
    if k <= 0:
        raise ValueError("k must be >= 1")
    if proba.shape[0] != len(y_true_parent_lists):
        raise ValueError("n_samples mismatch between y_true_parent_lists and proba")
    if proba.shape[1] != len(classes):
        raise ValueError("proba columns != classes length (alignment issue)")

    classes_parent = np.array([atu_parent(c) for c in classes])
    topk_idx = np.argsort(-proba, axis=1)[:, :k]

    hits = []
    for i, gold_parents in enumerate(y_true_parent_lists):
        gold_set = set(gold_parents or [])
        pred_parent_set = set(classes_parent[topk_idx[i]])
        pred_parent_set.discard("")  # defensive
        hits.append(1 if (gold_set & pred_parent_set) else 0)

    return float(np.mean(hits))


# -----------------------------------
# 4) Exact-Hit@k from proba (any match)
# -----------------------------------
def exact_hit_at_k_from_proba(
    y_true_labels_lists: list[list[str]],
    proba: np.ndarray,
    classes: np.ndarray,
    k: int = 3
) -> float:
    """
    Exact-Hit@k (any match):
    success if at least one exact gold label appears among the model's top-k labels.
    """
    if k <= 0:
        raise ValueError("k must be >= 1")
    if proba.shape[0] != len(y_true_labels_lists):
        raise ValueError("n_samples mismatch between y_true_labels_lists and proba")
    if proba.shape[1] != len(classes):
        raise ValueError("proba columns != classes length (alignment issue)")

    topk_idx = np.argsort(-proba, axis=1)[:, :k]

    hits = []
    for i, gold_labels in enumerate(y_true_labels_lists):
        gold_set = set(clean_label_list(gold_labels))
        pred_set = set(classes[topk_idx[i]])
        hits.append(1 if (gold_set & pred_set) else 0)

    return float(np.mean(hits))


# -------------------------------------------------------
# 5) Weighted@k: exact=1.0, parent-only=parent_weight else 0
# -------------------------------------------------------
def weighted_hit_at_k_from_proba(
    y_true_labels_lists: list[list[str]],
    y_true_parent_lists: list[list[str]],
    proba: np.ndarray,
    classes: np.ndarray,
    k: int = 3,
    parent_weight: float = 0.5
) -> float:
    """
    Weighted@k:
      - 1.0 if exact-hit@k
      - parent_weight if exact miss but parent-hit@k
      - 0.0 otherwise
    """
    if k <= 0:
        raise ValueError("k must be >= 1")
    if not (0.0 <= parent_weight <= 1.0):
        raise ValueError("parent_weight must be in [0, 1]")
    if proba.shape[0] != len(y_true_labels_lists) or proba.shape[0] != len(y_true_parent_lists):
        raise ValueError("n_samples mismatch between y_true lists and proba")
    if proba.shape[1] != len(classes):
        raise ValueError("proba columns != classes length (alignment issue)")

    classes_parent = np.array([atu_parent(c) for c in classes])
    topk_idx = np.argsort(-proba, axis=1)[:, :k]

    scores = []
    for i in range(proba.shape[0]):
        gold_labels = set(clean_label_list(y_true_labels_lists[i]))
        gold_parents = set(y_true_parent_lists[i] or [])

        pred_labels = set(classes[topk_idx[i]])
        pred_parents = set(classes_parent[topk_idx[i]])
        pred_parents.discard("")

        if gold_labels & pred_labels:
            scores.append(1.0)
        elif gold_parents & pred_parents:
            scores.append(float(parent_weight))
        else:
            scores.append(0.0)

    return float(np.mean(scores))


# -----------------------------------------
# 6) Model wrappers (assumes predict_proba)
# -----------------------------------------
def _get_model_scores(model, X_df: pd.DataFrame) -> np.ndarray:
    if hasattr(model, "predict_proba"):
        return model.predict_proba(X_df)
    if hasattr(model, "decision_function"):
        scores = model.decision_function(X_df)
        if scores.ndim == 1:
            scores = scores.reshape(-1, 1)
        return scores
    raise AttributeError("Model must implement predict_proba or decision_function")


def parent_hit_at_k_model(
    model,
    X_df: pd.DataFrame,
    y_true_parent_lists: list[list[str]],
    mlb,
    k: int = 3
) -> float:
    scores = _get_model_scores(model, X_df)
    classes = np.asarray(mlb.classes_)
    return parent_hit_at_k_from_proba(y_true_parent_lists, scores, classes, k=k)

def exact_hit_at_k_model(
    model,
    X_df: pd.DataFrame,
    y_true_labels_lists: list[list[str]],
    mlb,
    k: int = 3
) -> float:
    scores = _get_model_scores(model, X_df)
    classes = np.asarray(mlb.classes_)
    return exact_hit_at_k_from_proba(y_true_labels_lists, scores, classes, k=k)

def weighted_hit_at_k_model(
    model,
    X_df: pd.DataFrame,
    y_true_labels_lists: list[list[str]],
    y_true_parent_lists: list[list[str]],
    mlb,
    k: int = 3,
    parent_weight: float = 0.5
) -> float:
    scores = _get_model_scores(model, X_df)
    classes = np.asarray(mlb.classes_)
    return weighted_hit_at_k_from_proba(
        y_true_labels_lists,
        y_true_parent_lists,
        scores,
        classes,
        k=k,
        parent_weight=parent_weight
    )


# -----------------------------------------
# 7) Recommended safe X construction example
# -----------------------------------------
def build_X_text_only(df: pd.DataFrame, text_cols=("summary_norm", "text_norm")) -> pd.DataFrame:
    """
    SAFE helper: return X_df with ONLY the columns intended for vectorization.
    This prevents accidental leakage via tale_id or metadata columns.
    """
    cols = [c for c in text_cols if c in df.columns]
    if not cols:
        raise ValueError(f"None of text_cols {text_cols} found in df columns")
    return df[cols].copy()

### Stratified train/test split for multi-label data using parent ATU codes

To evaluate the classifier on a held-out set while preserving label coverage, we implement a **custom stratified split** tailored to **multi-label** data. Standard stratification methods assume a single label per instance and do not directly support the setting where each tale can have **multiple ATU types**. This is especially problematic in our small corpus, where many labels are rare: a naive random split can easily place the only example of a label in the test set, leaving the model with **zero training examples** for that label.

Our function `stratified_multilabel_split_by_parent()` performs an approximate stratification over `labels_parent` (coarser ATU parent codes), with the goal of constructing a test subset of size `test_size` while ensuring that the training set retains minimal coverage for the labels present.

The procedure is as follows:

1. **Target test size.** We compute the desired number of test documents (`n_test = round(n * test_size)`).

2. **Label availability tracking.** We count how many documents contain each parent label (`all_counts`) and keep a mutable counter `remaining` to track how many examples of each label would remain in the training pool if we move documents into the test set.

3. **Safety constraint.** A document is considered *safe* to move into the test set if doing so does not exhaust any of its labels in the remaining pool:
   - `is_safe(i)` returns `True` only if for every label in document `i`, `remaining[label] ≥ 2`.
   This conservative rule ensures that after selecting the test items, each label included in a moved document still has at least one example left for training (and avoids dropping a label entirely from the training set).

4. **Coverage-oriented greedy selection.** We iteratively build the test set using a greedy criterion:
   - `gain(i)` measures how many **new** parent labels a candidate document would contribute to the current test set (labels not yet covered in `covered_test`).
   At each step we choose, among safe candidates, a document with maximal gain (ties are broken randomly). This increases the diversity of labels represented in the test split without violating the safety constraint.

5. **Fallback filling.** If we cannot reach the desired test size using the greedy coverage criterion (because the safety constraint becomes too restrictive), we fill the remaining slots by randomly selecting from the remaining safe documents.

Finally, we return two dataframes: `train_df` and `test_df`. This split is designed to be **more stable and fair** than a naive random split for small multi-label datasets, because it reduces the risk of creating “unseen-in-training” labels and improves the interpretability of downstream evaluation (e.g., Parent-Hit@3).


In [8]:
# --- 1) Fit MLB safely (labels must be list[str] per row) ---
# If you already have ensure_list() from earlier, use it here.
# Otherwise this minimal guard will prevent common issues with NaN / empty / stringified lists.
def _is_nan(x) -> bool:
    return isinstance(x, float) and pd.isna(x)

def clean_label_list(labels) -> list[str]:
    if labels is None or _is_nan(labels):
        return []
    # if labels accidentally stored as a single string, wrap it
    if isinstance(labels, str):
        s = labels.strip()
        return [s] if s else []
    out = []
    for x in labels:
        if x is None or _is_nan(x):
            continue
        s = str(x).strip()
        if s:
            out.append(s)
    # unique, stable order
    seen = set()
    dedup = []
    for s in out:
        if s not in seen:
            seen.add(s)
            dedup.append(s)
    return dedup

df = df.copy()
df["labels"] = df["labels"].apply(clean_label_list)

mlb = MultiLabelBinarizer()
y = mlb.fit_transform(df["labels"])

print("Docs:", len(df))
print("Unique labels:", len(mlb.classes_))


# --- 2) Parent labels for evaluation (robust, filters NaN inside lists) ---
RE_ATU_PARENT = re.compile(r"(\d{1,4})")

def atu_parent(label: str) -> str:
    if label is None:
        return ""
    s = str(label).strip()
    if not s:
        return ""
    m = RE_ATU_PARENT.search(s)
    return m.group(1) if m else ""

def to_parent_set(labels) -> list[str]:
    if labels is None or _is_nan(labels):
        return []
    out = set()
    for x in labels:
        if x is None or _is_nan(x):
            continue
        s = str(x).strip()
        if not s:
            continue
        p = atu_parent(s)
        if p:
            out.add(p)
    return sorted(out)

df["labels_parent"] = df["labels"].apply(to_parent_set)

Docs: 50
Unique labels: 37


In [9]:
from collections import Counter
import numpy as np
import pandas as pd

def stratified_multilabel_split_by_parent(
    df: pd.DataFrame,
    label_col: str = "labels_parent",
    test_size: float = 0.2,
    random_state: int = 42,
    min_train_count_per_label: int = 1,
    require_nonempty_labels: bool = False,
):
    """
    Greedy multi-label "stratified-ish" split by parent labels.
    Goals:
      - put ~test_size of rows into test
      - maximize label coverage in test
      - avoid removing too many instances of any label from train

    Safety:
      - filters NaN/None/empty labels
      - guarantees: for any label assigned to a test doc, train keeps at least
        min_train_count_per_label instances of that label (when possible).

    Notes:
      - This is heuristic (not exact stratification).
      - If some labels are extremely rare, constraints may prevent reaching
        the exact n_test; we backfill with best-effort safe picks.
    """
    rng = np.random.RandomState(random_state)
    df = df.reset_index(drop=True).copy()

    if not (0.0 < test_size < 1.0):
        raise ValueError("test_size must be in (0, 1)")
    n = len(df)
    if n < 2:
        raise ValueError("Need at least 2 rows to split")

    # ---- normalize label lists: list[str], filter empties/NaNs ----
    def _clean_labels(x):
        if x is None or (isinstance(x, float) and pd.isna(x)):
            return []
        out = []
        for lab in x:
            if lab is None or (isinstance(lab, float) and pd.isna(lab)):
                continue
            s = str(lab).strip()
            if s:
                out.append(s)
        # unique, stable order
        seen = set()
        dedup = []
        for s in out:
            if s not in seen:
                seen.add(s)
                dedup.append(s)
        return dedup

    df[label_col] = df[label_col].apply(_clean_labels)

    if require_nonempty_labels:
        # keep only rows with at least one label (optional strict mode)
        df = df[df[label_col].map(len) > 0].reset_index(drop=True)
        n = len(df)
        if n < 2:
            raise ValueError("After filtering empty labels, not enough rows to split")

    n_test = int(round(n * test_size))
    n_test = max(1, min(n - 1, n_test))  # must leave at least 1 train row

    # label frequency in full data
    all_counts = Counter(lab for labs in df[label_col] for lab in labs)
    remaining = Counter(all_counts)  # how many of each label still available for test assignment

    test_idx = []
    covered_test = set()

    candidates = list(range(n))
    rng.shuffle(candidates)

    def is_safe(i: int) -> bool:
        labs = df.at[i, label_col]
        # If no labels, it's always safe (doesn't affect remaining counts)
        if not labs:
            return True
        # Ensure leaving at least min_train_count_per_label in train after moving this row to test.
        # That is: remaining[lab] - 1 >= min_train_count_per_label
        return all((remaining[lab] - 1) >= min_train_count_per_label for lab in labs)

    def gain(i: int) -> int:
        labs = set(df.at[i, label_col])
        return len(labs - covered_test)

    # ---- greedy pick: maximize new label coverage under safety constraint ----
    while len(test_idx) < n_test:
        safe = [i for i in candidates if (i not in test_idx and is_safe(i))]
        if not safe:
            break

        gains = np.array([gain(i) for i in safe], dtype=int)
        best_gain = gains.max()
        best = [safe[j] for j in np.where(gains == best_gain)[0]]
        chosen = int(rng.choice(best))

        test_idx.append(chosen)
        labs_chosen = df.at[chosen, label_col]
        for lab in labs_chosen:
            remaining[lab] -= 1
            covered_test.add(lab)

    # ---- backfill if needed: pick any remaining safe rows (random order) ----
    if len(test_idx) < n_test:
        safe_rest = [i for i in range(n) if (i not in test_idx and is_safe(i))]
        rng.shuffle(safe_rest)
        need = n_test - len(test_idx)
        test_idx.extend(safe_rest[:need])

    # if still short, last resort: fill randomly (may violate constraints for ultra-rare labels)
    if len(test_idx) < n_test:
        rest = [i for i in range(n) if i not in test_idx]
        rng.shuffle(rest)
        need = n_test - len(test_idx)
        test_idx.extend(rest[:need])

    test_idx = sorted(set(test_idx))
    # ensure not all docs went to test due to dedup
    if len(test_idx) >= n:
        test_idx = test_idx[: n - 1]

    train_idx = [i for i in range(n) if i not in test_idx]

    train_df = df.iloc[train_idx].reset_index(drop=True)
    test_df = df.iloc[test_idx].reset_index(drop=True)

    return train_df, test_df


# usage
train_df, test_df = stratified_multilabel_split_by_parent(
    df,
    label_col="labels_parent",
    test_size=0.2,
    random_state=42,
    min_train_count_per_label=1,   # keep at least 1 occurrence of each label in train when possible
    require_nonempty_labels=False  # set True if you want to exclude empty-labeled rows entirely
)

print("Train:", train_df.shape, "| Test:", test_df.shape)

print("Unique parents train:", len(set(l for labs in train_df["labels_parent"] for l in labs)))
print("Unique parents test:", len(set(l for labs in test_df["labels_parent"] for l in labs)))


Train: (40, 5) | Test: (10, 5)
Unique parents train: 32
Unique parents test: 11


In [10]:
train_df.head(5)

Unnamed: 0,tale_id,summary_norm,text_norm,labels,labels_parent
0,era_vene_1_503_1,царевна-лягушка.,тили были царь с царицей у не было три сына. ц...,[402],[402]
1,era_vene_1_515_1,"по пьяни мужик спорит, что сможет принести ноч...","раз пяное, ребятище» подился. что можит в 12 ч...",[410],[410]
2,era_vene_12_105_22,снегурочка.,сделали дети со снегу куклу. в одного старина ...,[703*],[703]
3,era_vene_12_137_98,иван-дурак.,"кил-был стажк. в яво бло тра сегна. миша, гриш...",[530],[530]
4,era_vene_12_189_1,два брата.,жили — брели два брата. и посла смерти отца об...,[735A],[735]


In [11]:

train_df = train_df.copy()
test_df = test_df.copy()

train_df["labels"] = train_df["labels"].apply(clean_label_list)
test_df["labels"]  = test_df["labels"].apply(clean_label_list)

TEXT_COLS = [c for c in ["summary_norm", "text_norm"] if c in train_df.columns]

X_train = train_df[TEXT_COLS].copy()
X_test  = test_df[TEXT_COLS].copy()

y_train = mlb.transform(train_df["labels"])
y_test  = mlb.transform(test_df["labels"])

print("X_train:", X_train.shape, "y_train:", y_train.shape)
print("X_test :", X_test.shape,  "y_test :", y_test.shape)


X_train: (40, 2) y_train: (40, 37)
X_test : (10, 2) y_test : (10, 37)


In [12]:
train_labels = set(l for labs in train_df["labels"] for l in labs)
test_labels  = set(l for labs in test_df["labels"] for l in labs)
unknown_in_test = sorted(test_labels - train_labels)

print("Labels only in test (will be ignored by mlb):", len(unknown_in_test))


Labels only in test (will be ignored by mlb): 0


### Feature extraction and multi-label classifier (TF-IDF + One-vs-Rest Logistic Regression)

This block defines the **final text-based baseline model** as a single scikit-learn `Pipeline` that (i) converts textual inputs into numerical features and (ii) trains a **multi-label** classifier over ATU types.

**1) Character-level TF-IDF on OCR/HTR text (`text_norm`).**  
We build a TF-IDF representation using **character n-grams (3–5)**. Character n-grams are a common and effective choice for noisy OCR/HTR corpora because they remain informative even when word boundaries or spellings are corrupted. We enable `sublinear_tf=True` (log-scaled term frequency) and cap the vocabulary with `max_features=50,000` to control dimensionality on a small dataset.

**2) Word-level TF-IDF on summaries (`summary_norm`).**  
In parallel, we build a TF-IDF representation using **word n-grams (1–2)** from the tale summary. Summaries typically contain less OCR noise and capture higher-level semantics, which complements the robustness of character n-grams. We similarly apply log-scaled TF and cap the vocabulary at `max_features=20,000`.

**3) Feature concatenation via `ColumnTransformer`.**  
The `ColumnTransformer` applies each vectorizer to its corresponding dataframe column and **concatenates** the resulting sparse vectors into a single feature space. All other dataframe columns are dropped (`remainder="drop"`), ensuring that only text-derived signals enter the model.

**4) Multi-label classification with One-vs-Rest Logistic Regression.**  
Because a tale can legitimately have **multiple ATU assignments**, we use a `OneVsRestClassifier(LogisticRegression)` scheme: a separate binary logistic regression is trained for each ATU label, producing a score per label. Logistic regression is fast, stable on sparse TF-IDF features, and provides well-behaved ranking scores for Top-k recommendation.

**5) End-to-end pipeline.**  
Finally, we wrap preprocessing and classification into a single `Pipeline` so that the same transformations are consistently applied at training and inference time. This also simplifies serialization and deployment (e.g., saving the pipeline as a single artifact for the Streamlit application).


In [13]:
# Pipeline: (ColumnTransformer: char TF-IDF on text_norm + optional word TF-IDF on summary_norm/text_norm)
#        -> OneVsRest(LogisticRegression)
#
# Notes:
# - ID columns are safe as long as you pass ONLY ["text_norm","summary_norm"] to fit/predict (see build_X_text_only below).
# - Uses sparse output from ColumnTransformer (TF-IDF) into OvR logistic regression.

from __future__ import annotations

from typing import Tuple, Optional

import numpy as np
import pandas as pd

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression


def build_model(
    use_word_summary: bool = True,
    use_word_text: bool = False,
    char_analyzer: str = "char",          # "char" or "char_wb"
    char_ngram: Tuple[int, int] = (3, 5),
    word_ngram: Tuple[int, int] = (1, 2),
    char_max_features: int = 50000,
    word_max_features: int = 20000,
    C: float = 2.0,
    class_weight: Optional[str] = None,   # e.g. "balanced" if you want, usually None for multi-label
    random_state: int = 42,
) -> Pipeline:
    transformers = []

    # --- char TF-IDF on text_norm (robust to OCR noise) ---
    transformers.append((
        "char_tfidf",
        TfidfVectorizer(
            analyzer=char_analyzer,
            ngram_range=char_ngram,
            min_df=1,
            max_features=char_max_features,
            sublinear_tf=True,
            lowercase=False,   # you already normalized; keep as-is
        ),
        "text_norm"
    ))

    # --- word TF-IDF on summary_norm (if available) ---
    if use_word_summary:
        transformers.append((
            "sum_word",
            TfidfVectorizer(
                analyzer="word",
                ngram_range=word_ngram,
                min_df=1,
                max_features=word_max_features,
                sublinear_tf=True,
                lowercase=False,
            ),
            "summary_norm"
        ))

    # --- word TF-IDF on text_norm (optional; can help if summary is short/empty) ---
    if use_word_text:
        transformers.append((
            "text_word",
            TfidfVectorizer(
                analyzer="word",
                ngram_range=word_ngram,
                min_df=1,
                max_features=word_max_features,
                sublinear_tf=True,
                lowercase=False,
            ),
            "text_norm"
        ))

    preprocess = ColumnTransformer(transformers=transformers, remainder="drop", sparse_threshold=0.3)

    # OvR Logistic Regression (stable for small data)
    base_lr = LogisticRegression(
        max_iter=2000,
        solver="liblinear",
        C=C,
        class_weight=class_weight,
        random_state=random_state
    )

    clf = OneVsRestClassifier(base_lr, n_jobs=-1)

    return Pipeline([
        ("features", preprocess),
        ("clf", clf),
    ])


# SAFE helper: pass ONLY these columns to the model (prevents ID leakage)
def build_X_text_only(df: pd.DataFrame, text_cols=("summary_norm", "text_norm")) -> pd.DataFrame:
    cols = [c for c in text_cols if c in df.columns]
    if not cols:
        raise ValueError(f"None of text_cols {text_cols} found in df")
    return df[cols].copy()


# Example usage:
# model = build_model(use_word_summary=True, use_word_text=False, char_analyzer="char", char_ngram=(3,5))
# X_train = build_X_text_only(train_df, text_cols=("summary_norm","text_norm"))
# model.fit(X_train, y_train)
# proba = model.predict_proba(build_X_text_only(test_df))


In [14]:
# ============================================================
# Repeated hold-out eval for YOUR model + YOUR metric:
# Metric = Parent-Hit@k computed exactly as in your model:
#   top-k FINE labels (by model scores) -> map to PARENTS -> any-match with gold parents
#
# Uses:
# - text_norm (always) for char TF-IDF
# - summary_fallback for word TF-IDF (summary_norm if present, else fallback to text_norm[:N])
# - your parent_hit_at_k_from_proba(...) + atu_parent(...) (must be defined already)
#
# IMPORTANT:
# - X passed to model = only columns needed by ColumnTransformer (no IDs, no metadata)
# ============================================================

import numpy as np
import pandas as pd
from collections import Counter
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression


# -------------------------
# 0) Robust label cleaning
# -------------------------
def _is_nan(x) -> bool:
    return isinstance(x, float) and pd.isna(x)

def clean_label_list(labels) -> list[str]:
    """Ensure labels is list[str] (no NaN/None/empty)."""
    if labels is None or _is_nan(labels):
        return []
    if isinstance(labels, str):
        s = labels.strip()
        return [s] if s else []
    out = []
    for x in labels:
        if x is None or _is_nan(x):
            continue
        s = str(x).strip()
        if s:
            out.append(s)
    # unique, stable order
    seen = set()
    dedup = []
    for s in out:
        if s not in seen:
            seen.add(s)
            dedup.append(s)
    return dedup


# -----------------------------------------
# 1) Ensure summary_fallback for word TF-IDF
# -----------------------------------------
def ensure_summary_fallback(
    df_in: pd.DataFrame,
    summary_col: str = "summary_norm",
    text_col: str = "text_norm",
    out_col: str = "summary_fallback",
    n_fallback_chars: int = 800
) -> pd.DataFrame:
    """
    Creates/cleans df[out_col]:
      - if summary_col exists and is non-empty => use it
      - else fallback to text_norm[:n_fallback_chars]
    """
    df = df_in.copy()

    # Ensure text column exists as string
    if text_col not in df.columns:
        raise ValueError(f"Missing required text column: {text_col}")
    df[text_col] = df[text_col].fillna("").astype(str)

    # If summary exists, use it, else fallback from text
    if summary_col in df.columns:
        df[summary_col] = df[summary_col].fillna("").astype(str)
        df[out_col] = df[summary_col].where(
            df[summary_col].str.strip().ne(""),
            df[text_col].str.slice(0, n_fallback_chars)
        )
    else:
        df[out_col] = df[text_col].str.slice(0, n_fallback_chars)

    return df


# --------------------------------------------
# 2) Parent labels (gold) must be list[str]
# --------------------------------------------
def _clean_parent_list(x) -> list[str]:
    if x is None or _is_nan(x):
        return []
    if isinstance(x, str):
        s = x.strip()
        return [s] if s else []
    out = []
    for lab in x:
        if lab is None or _is_nan(lab):
            continue
        s = str(lab).strip()
        if s:
            out.append(s)
    # unique
    seen = set()
    dedup = []
    for s in out:
        if s not in seen:
            seen.add(s)
            dedup.append(s)
    return dedup


# ---------------------------------------------------------
# 3) Stratified-ish split by parent (safe, best-effort)
# ---------------------------------------------------------
def stratified_multilabel_split_by_parent(
    df: pd.DataFrame,
    label_col: str = "labels_parent",
    test_size: float = 0.2,
    random_state: int = 42,
    min_train_count_per_label: int = 1,
):
    rng = np.random.RandomState(random_state)
    df = df.reset_index(drop=True).copy()

    if not (0.0 < test_size < 1.0):
        raise ValueError("test_size must be in (0, 1)")

    # clean gold parent lists
    df[label_col] = df[label_col].apply(_clean_parent_list)

    n = len(df)
    n_test = int(round(n * test_size))
    n_test = max(1, min(n - 1, n_test))

    all_counts = Counter(lab for labs in df[label_col] for lab in labs)
    remaining = Counter(all_counts)

    test_idx = []
    covered_test = set()

    candidates = list(range(n))
    rng.shuffle(candidates)

    def is_safe(i: int) -> bool:
        labs = df.at[i, label_col]
        if not labs:
            return True
        return all((remaining[lab] - 1) >= min_train_count_per_label for lab in labs)

    def gain(i: int) -> int:
        labs = set(df.at[i, label_col])
        return len(labs - covered_test)

    while len(test_idx) < n_test:
        safe = [i for i in candidates if (i not in test_idx and is_safe(i))]
        if not safe:
            break

        gains = np.array([gain(i) for i in safe], dtype=int)
        best_gain = gains.max()
        best = [safe[j] for j in np.where(gains == best_gain)[0]]
        chosen = int(rng.choice(best))

        test_idx.append(chosen)
        for lab in df.at[chosen, label_col]:
            remaining[lab] -= 1
            covered_test.add(lab)

    # backfill safely, then last resort random
    if len(test_idx) < n_test:
        safe_rest = [i for i in range(n) if (i not in test_idx and is_safe(i))]
        rng.shuffle(safe_rest)
        need = n_test - len(test_idx)
        test_idx.extend(safe_rest[:need])

    if len(test_idx) < n_test:
        rest = [i for i in range(n) if i not in test_idx]
        rng.shuffle(rest)
        need = n_test - len(test_idx)
        test_idx.extend(rest[:need])

    test_idx = sorted(set(test_idx))
    if len(test_idx) >= n:
        test_idx = test_idx[: n - 1]

    train_idx = [i for i in range(n) if i not in test_idx]
    return df.iloc[train_idx].reset_index(drop=True), df.iloc[test_idx].reset_index(drop=True)


# ---------------------------------------------------------
# 4) YOUR model builder (char + optional word channels)
# ---------------------------------------------------------
def build_model(
    use_word_summary: bool = True,
    use_word_text: bool = False,
    char_analyzer: str = "char",         # "char" or "char_wb"
    char_ngram=(3, 5),
    word_ngram=(1, 2),
    char_max_features: int = 50000,
    word_max_features: int = 20000,
    summary_col: str = "summary_fallback",
    C: float = 2.0,
    random_state: int = 42,
):
    transformers = []

    transformers.append((
        "char_tfidf",
        TfidfVectorizer(
            analyzer=char_analyzer,
            ngram_range=char_ngram,
            min_df=1,
            max_features=char_max_features,
            sublinear_tf=True,
            lowercase=False,
        ),
        "text_norm"
    ))

    if use_word_summary:
        transformers.append((
            "sum_word",
            TfidfVectorizer(
                analyzer="word",
                ngram_range=word_ngram,
                min_df=1,
                max_features=word_max_features,
                sublinear_tf=True,
                lowercase=False,
            ),
            summary_col
        ))

    if use_word_text:
        transformers.append((
            "text_word",
            TfidfVectorizer(
                analyzer="word",
                ngram_range=word_ngram,
                min_df=1,
                max_features=word_max_features,
                sublinear_tf=True,
                lowercase=False,
            ),
            "text_norm"
        ))

    preprocess = ColumnTransformer(transformers=transformers, remainder="drop")

    clf = OneVsRestClassifier(
        LogisticRegression(max_iter=2000, solver="liblinear", C=C, random_state=random_state),
        n_jobs=-1
    )

    return Pipeline([("features", preprocess), ("clf", clf)])


def build_X_for_model(df: pd.DataFrame, summary_col: str = "summary_fallback") -> pd.DataFrame:
    """Pass ONLY the columns the ColumnTransformer expects."""
    cols = ["text_norm"]
    if summary_col in df.columns:
        cols.append(summary_col)
    return df[cols].copy()


# ---------------------------------------------------------
# 5) Repeated hold-out evaluation with YOUR metric
#    (parent_hit_at_k_from_proba must be defined already)
# ---------------------------------------------------------
def repeated_holdout_parent_hit_k(
    df_in: pd.DataFrame,
    seeds,
    model_builder,
    test_size: float = 0.2,
    k: int = 3,
    summary_col: str = "summary_fallback",
    min_train_count_per_label: int = 1,
):
    # prepare text + labels
    df = ensure_summary_fallback(df_in, summary_col="summary_norm", out_col=summary_col)
    df = df.copy()
    df["labels"] = df["labels"].apply(clean_label_list)
    df["labels_parent"] = df["labels_parent"].apply(_clean_parent_list)

    # IMPORTANT: fix label universe ONCE (consistent scoring across seeds)
    mlb = MultiLabelBinarizer()
    mlb.fit(df["labels"])

    scores = []
    for rs in seeds:
        tr_df, te_df = stratified_multilabel_split_by_parent(
            df, label_col="labels_parent", test_size=test_size, random_state=rs,
            min_train_count_per_label=min_train_count_per_label
        )

        X_train = build_X_for_model(tr_df, summary_col=summary_col)
        X_test  = build_X_for_model(te_df, summary_col=summary_col)

        y_train = mlb.transform(tr_df["labels"])

        model = model_builder()
        model.fit(X_train, y_train)

        proba = model.predict_proba(X_test)

        score = parent_hit_at_k_from_proba(
            y_true_parent_lists=te_df["labels_parent"].tolist(),
            proba=proba,
            classes=mlb.classes_,
            k=k
        )
        scores.append(score)

    return np.array(scores, dtype=float)


# =========================
# RUN EXPERIMENTS A/B/C/D
# =========================

seeds = [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]

experiments = {
    "A_char + word_summary_fallback": lambda: build_model(use_word_summary=True,  use_word_text=False, char_analyzer="char"),
    "B_char + word_summary_fallback + word_text": lambda: build_model(use_word_summary=True,  use_word_text=True,  char_analyzer="char"),
    "C_charWB + word_summary_fallback": lambda: build_model(use_word_summary=True,  use_word_text=False, char_analyzer="char_wb"),
    "D_charWB + word_summary_fallback + word_text": lambda: build_model(use_word_summary=True,  use_word_text=True,  char_analyzer="char_wb"),
}

results = {}
for name, builder in experiments.items():
    scores = repeated_holdout_parent_hit_k(
        df, seeds=seeds, model_builder=builder,
        test_size=0.2, k=3, summary_col="summary_fallback",
        min_train_count_per_label=1
    )
    results[name] = scores
    print(f"{name}: mean={scores.mean():.4f} std={scores.std(ddof=0):.4f} "
          f"min={scores.min():.3f} max={scores.max():.3f} scores={scores}")

summary = pd.DataFrame({
    "model": list(results.keys()),
    "mean": [results[m].mean() for m in results],
    "std":  [results[m].std(ddof=0) for m in results],
    "min":  [results[m].min() for m in results],
    "max":  [results[m].max() for m in results],
}).sort_values("mean", ascending=False)

display(summary)


A_char + word_summary_fallback: mean=0.4300 std=0.1187 min=0.300 max=0.700 scores=[0.4 0.4 0.5 0.3 0.7 0.4 0.5 0.3 0.3 0.5]
B_char + word_summary_fallback + word_text: mean=0.4600 std=0.0917 min=0.400 max=0.700 scores=[0.4 0.4 0.5 0.4 0.7 0.4 0.5 0.4 0.4 0.5]
C_charWB + word_summary_fallback: mean=0.4500 std=0.1025 min=0.300 max=0.700 scores=[0.4 0.4 0.5 0.3 0.7 0.4 0.5 0.4 0.4 0.5]
D_charWB + word_summary_fallback + word_text: mean=0.4600 std=0.0917 min=0.400 max=0.700 scores=[0.4 0.4 0.5 0.4 0.7 0.4 0.5 0.4 0.4 0.5]


Unnamed: 0,model,mean,std,min,max
1,B_char + word_summary_fallback + word_text,0.46,0.091652,0.4,0.7
3,D_charWB + word_summary_fallback + word_text,0.46,0.091652,0.4,0.7
2,C_charWB + word_summary_fallback,0.45,0.10247,0.3,0.7
0,A_char + word_summary_fallback,0.43,0.118743,0.3,0.7


In a repeated hold-out evaluation (10 random seeds; 80/20 split using the parent-aware multi-label splitter), the TF-IDF + OvR Logistic Regression classifier substantially outperformed the naïve frequency baseline under the project’s primary metric, Parent-Hit@3 (success if at least one gold ATU parent code appears among the parents of the model’s Top-3 predicted fine-grained types). The frequency baseline reaches only ~0.15 Parent-Hit@3 (≈1–2 hits per 10 tales), whereas the proposed text-based models achieve 0.43–0.46 on average (≈4–5 hits per 10 tales), indicating that textual features provide strong predictive signal beyond label priors. Across ablations, combining character TF-IDF on text_norm with word TF-IDF on the summary fallback is already effective (mean 0.43–0.45), while adding word TF-IDF on text_norm yields a small but consistent improvement and better stability (mean 0.46, minimum 0.40 across seeds). Using char_wb instead of char did not materially change performance in this setting. Based on these results, the mixed vector representation (char + word summary + optional word text) is retained as the default baseline model for subsequent experiments and UI integration.

## Baseline Model

In [15]:
# ============================================================
# Baseline (frequency) evaluated with THE SAME repeated hold-out
# protocol as your model (same seeds, same splitter, same metric).
#
# Baseline logic is aligned with your model-eval logic:
# - Baseline predicts Top-k FINE labels (constant) from TRAIN fold frequencies
# - Evaluation uses your Parent-Hit@k:
#     top-k fine -> map to parents -> any-match with gold parents
#
# Prereqs:
# - df has: labels (list[str] or convertible), labels_parent (list[str]),
#          text_norm (str), summary_norm (optional)
# - you already defined: atu_parent(), parent_hit_at_k_from_proba()
# - you have (or paste) the same splitter: stratified_multilabel_split_by_parent()
# ============================================================

import numpy as np
import pandas as pd
from collections import Counter
from sklearn.preprocessing import MultiLabelBinarizer


# -------------------------
# Robust cleaning helpers
# -------------------------
def _is_nan(x) -> bool:
    return isinstance(x, float) and pd.isna(x)

def clean_label_list(labels) -> list[str]:
    """Ensure labels is list[str] (no NaN/None/empty)."""
    if labels is None or _is_nan(labels):
        return []
    if isinstance(labels, str):
        s = labels.strip()
        return [s] if s else []
    out = []
    for x in labels:
        if x is None or _is_nan(x):
            continue
        s = str(x).strip()
        if s:
            out.append(s)
    # unique, stable order
    seen = set()
    dedup = []
    for s in out:
        if s not in seen:
            seen.add(s)
            dedup.append(s)
    return dedup

def _clean_parent_list(x) -> list[str]:
    if x is None or _is_nan(x):
        return []
    if isinstance(x, str):
        s = x.strip()
        return [s] if s else []
    out = []
    for lab in x:
        if lab is None or _is_nan(lab):
            continue
        s = str(lab).strip()
        if s:
            out.append(s)
    # unique, stable order
    seen = set()
    dedup = []
    for s in out:
        if s not in seen:
            seen.add(s)
            dedup.append(s)
    return dedup


# -------------------------
# Baseline: Top-k FINE by doc frequency (TRAIN only)
# -------------------------
def topk_fine_labels_by_freq(labels_lists, k=3) -> list[str]:
    cnt = Counter()
    for labs in labels_lists:
        labs = clean_label_list(labs)
        for lab in set(labs):   # doc-level counting
            cnt[lab] += 1
    return [lab for lab, _ in cnt.most_common(k)]


def constant_score_matrix_from_topk_fine(
    topk_fine: list[str],
    classes: np.ndarray,
    n_rows: int
) -> np.ndarray:
    """
    Build an (n_rows, n_classes) score matrix that yields exactly `topk_fine`
    as the top-k predicted labels (ranking only; not calibrated probabilities).
    """
    classes = np.asarray(classes)
    row = np.zeros((1, len(classes)), dtype=float)

    score = float(len(topk_fine))
    for lab in topk_fine:
        idx = np.where(classes == lab)[0]
        if len(idx) == 0:
            continue
        row[0, idx[0]] = score
        score -= 1.0

    return np.repeat(row, repeats=n_rows, axis=0)


# -------------------------
# Repeated hold-out baseline with same protocol
# -------------------------
def repeated_holdout_freq_baseline_parent_hit_k(
    df_in: pd.DataFrame,
    seeds,
    test_size: float = 0.2,
    k: int = 3,
    min_train_count_per_label: int = 1,
    verbose: bool = True
) -> np.ndarray:
    """
    Uses the SAME repeated hold-out protocol as your model:
      - split with stratified_multilabel_split_by_parent(..., random_state=seed)
      - baseline top-k computed from TRAIN fold only
      - evaluate with parent_hit_at_k_from_proba (same as model)
    """
    df = df_in.copy()
    df["labels"] = df["labels"].apply(clean_label_list)
    df["labels_parent"] = df["labels_parent"].apply(_clean_parent_list)

    # Fix label universe ONCE (consistent across seeds)
    mlb = MultiLabelBinarizer()
    mlb.fit(df["labels"])
    classes = np.asarray(mlb.classes_)

    scores = []
    for rs in seeds:
        tr_df, te_df = stratified_multilabel_split_by_parent(
            df,
            label_col="labels_parent",
            test_size=test_size,
            random_state=rs,
            min_train_count_per_label=min_train_count_per_label
        )

        # TRAIN-only: choose constant top-k fine labels by frequency
        topk_fine = topk_fine_labels_by_freq(tr_df["labels"].tolist(), k=k)

        # Build constant score matrix for TEST fold (size = len(te_df))
        proba_te = constant_score_matrix_from_topk_fine(
            topk_fine=topk_fine,
            classes=classes,
            n_rows=len(te_df)
        )

        # SAME metric as model eval (fine top-k -> parent any-match)
        score = parent_hit_at_k_from_proba(
            y_true_parent_lists=te_df["labels_parent"].tolist(),
            proba=proba_te,
            classes=classes,
            k=k
        )
        scores.append(score)

        if verbose:
            pred_parents = sorted({atu_parent(x) for x in topk_fine if str(x).strip()})
            print(f"seed={rs}: Top{k} fine={topk_fine} | parents={pred_parents} | Parent-Hit@{k}={score:.3f}")

    scores = np.array(scores, dtype=float)
    if verbose:
        print(f"Repeated hold-out freq-baseline Parent-Hit@{k}: mean={scores.mean():.4f} std={scores.std(ddof=0):.4f} "
              f"min={scores.min():.3f} max={scores.max():.3f} scores={scores}")
    return scores


# -------------------------
# Example run (same seeds as your A/B/C/D)
# -------------------------
seeds = [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]

baseline_scores = repeated_holdout_freq_baseline_parent_hit_k(
    df,
    seeds=seeds,
    test_size=0.2,
    k=3,
    min_train_count_per_label=1,
    verbose=True
)


seed=41: Top3 fine=['480D*', '707', '552'] | parents=['480', '552', '707'] | Parent-Hit@3=0.200
seed=42: Top3 fine=['707', '480D*', '402'] | parents=['402', '480', '707'] | Parent-Hit@3=0.200
seed=43: Top3 fine=['707', '480D*', '552'] | parents=['480', '552', '707'] | Parent-Hit@3=0.200
seed=44: Top3 fine=['707', '480D*', '530'] | parents=['480', '530', '707'] | Parent-Hit@3=0.200
seed=45: Top3 fine=['480D*', '707', '402'] | parents=['402', '480', '707'] | Parent-Hit@3=0.200
seed=46: Top3 fine=['480D*', '707', '650A'] | parents=['480', '650', '707'] | Parent-Hit@3=0.200
seed=47: Top3 fine=['480D*', '707', '650A'] | parents=['480', '650', '707'] | Parent-Hit@3=0.200
seed=48: Top3 fine=['480D*', '707', '402'] | parents=['402', '480', '707'] | Parent-Hit@3=0.200
seed=49: Top3 fine=['707', '480D*', '650A'] | parents=['480', '650', '707'] | Parent-Hit@3=0.200
seed=50: Top3 fine=['707', '480D*', '552'] | parents=['480', '552', '707'] | Parent-Hit@3=0.200
Repeated hold-out freq-baseline Paren

Under the same repeated hold-out protocol (10 random seeds; 80/20 split with the parent-aware multi-label splitter) and the same evaluation rule (Parent-Hit@3, computed by taking the model’s Top-3 fine-grained ATU predictions, mapping them to parent codes, and counting a hit if any gold parent is present), the TF-IDF + OvR Logistic Regression models consistently outperform a naïve frequency baseline. The frequency baseline—implemented as a constant predictor that always outputs the Top-3 most frequent fine labels from the training split—achieves Parent-Hit@3 = 0.20 (std = 0.00), i.e., about 2 hits per 10 tales. In contrast, the text-based classifiers reach 0.43–0.46 on average (A: 0.43, B: 0.46, C: 0.45, D: 0.46), corresponding to roughly 4–5 hits per 10 tales, i.e., an absolute improvement of +0.23 to +0.26 over the baseline. Adding a word-level TF-IDF channel on text_norm (B/D) provides a small but consistent gain and improves stability (minimum 0.40 across seeds), while switching from char to char_wb does not materially change performance in this setting. Based on this comparison, the mixed TF-IDF representation (char TF-IDF on text_norm + word TF-IDF on the summary fallback, optionally augmented with word TF-IDF on text_norm) is retained as the default baseline model for subsequent experiments and UI integration.

In [17]:
# ----------------------------
# 0) Prepare df (text + labels)
# ----------------------------
df_all = ensure_summary_fallback(df, summary_col="summary_norm", out_col="summary_fallback").copy()
df_all["labels"] = df_all["labels"].apply(clean_label_list)
df_all["labels_parent"] = df_all["labels_parent"].apply(_clean_parent_list)

# ----------------------------
# 1) Freeze ONE test split
# ----------------------------
SPLIT_SEED = 42
train_df, test_df = stratified_multilabel_split_by_parent(
    df_all,
    label_col="labels_parent",
    test_size=0.2,
    random_state=SPLIT_SEED,
    min_train_count_per_label=1
)
print("Train:", train_df.shape, "| Test:", test_df.shape)

Train: (40, 6) | Test: (10, 6)


In [18]:
# ----------------------------
# 3) CV evaluation on TRAIN ONLY
#    metric = Parent-Hit@3 exactly like your model
# ----------------------------
def cv_parent_hit_at_k(
    train_df: pd.DataFrame,
    mlb: MultiLabelBinarizer,
    model_builder,
    k: int = 3,
    n_splits: int = 5,
    random_state: int = 42,
    summary_col: str = "summary_fallback"
):
    X = build_X_for_model(train_df, summary_col=summary_col)
    y = mlb.transform(train_df["labels"])  # uses fixed label space from TRAIN
    y_parent = train_df["labels_parent"].tolist()

    cv = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    scores = []

    for fold, (tr, va) in enumerate(cv.split(train_df), start=1):
        model = model_builder()
        model.fit(X.iloc[tr], y[tr])

        proba = model.predict_proba(X.iloc[va])
        score = parent_hit_at_k_from_proba(
            y_true_parent_lists=[y_parent[i] for i in va],
            proba=proba,
            classes=mlb.classes_,
            k=k
        )
        scores.append(score)
        print(f"Fold {fold}: Parent-Hit@{k}={score:.3f}")

    scores = np.array(scores, dtype=float)
    print(f"CV mean={scores.mean():.4f} std={scores.std(ddof=0):.4f} scores={scores}")
    return scores

# ----------------------------
# 4) Define A/B/C/D builders
# ----------------------------
experiments = {
    "A_char + word_summary_fallback": lambda: build_model(use_word_summary=True,  use_word_text=False, char_analyzer="char"),
    "B_char + word_summary_fallback + word_text": lambda: build_model(use_word_summary=True,  use_word_text=True,  char_analyzer="char"),
    "C_charWB + word_summary_fallback": lambda: build_model(use_word_summary=True,  use_word_text=False, char_analyzer="char_wb"),
    "D_charWB + word_summary_fallback + word_text": lambda: build_model(use_word_summary=True,  use_word_text=True,  char_analyzer="char_wb"),
}

# ----------------------------
# 5) Run CV on train, pick best
# ----------------------------
cv_results = {}
for name, builder in experiments.items():
    print("\n" + "="*70)
    print(name)
    scores = cv_parent_hit_at_k(
        train_df=train_df,
        mlb=mlb,
        model_builder=builder,
        k=3,
        n_splits=5,
        random_state=123,   # seed for CV shuffling
        summary_col="summary_fallback"
    )
    cv_results[name] = scores

cv_summary = pd.DataFrame({
    "model": list(cv_results.keys()),
    "mean": [cv_results[m].mean() for m in cv_results],
    "std":  [cv_results[m].std(ddof=0) for m in cv_results],
}).sort_values("mean", ascending=False)

display(cv_summary)

best_name = cv_summary.iloc[0]["model"]
best_builder = experiments[best_name]
print("\nBEST by CV:", best_name)


A_char + word_summary_fallback
Fold 1: Parent-Hit@3=0.375
Fold 2: Parent-Hit@3=0.500
Fold 3: Parent-Hit@3=0.125
Fold 4: Parent-Hit@3=0.625
Fold 5: Parent-Hit@3=0.625
CV mean=0.4500 std=0.1871 scores=[0.375 0.5   0.125 0.625 0.625]

B_char + word_summary_fallback + word_text
Fold 1: Parent-Hit@3=0.375
Fold 2: Parent-Hit@3=0.250
Fold 3: Parent-Hit@3=0.125
Fold 4: Parent-Hit@3=0.625
Fold 5: Parent-Hit@3=0.625
CV mean=0.4000 std=0.2000 scores=[0.375 0.25  0.125 0.625 0.625]

C_charWB + word_summary_fallback
Fold 1: Parent-Hit@3=0.375
Fold 2: Parent-Hit@3=0.500
Fold 3: Parent-Hit@3=0.125
Fold 4: Parent-Hit@3=0.625
Fold 5: Parent-Hit@3=0.625
CV mean=0.4500 std=0.1871 scores=[0.375 0.5   0.125 0.625 0.625]

D_charWB + word_summary_fallback + word_text
Fold 1: Parent-Hit@3=0.375
Fold 2: Parent-Hit@3=0.250
Fold 3: Parent-Hit@3=0.125
Fold 4: Parent-Hit@3=0.625
Fold 5: Parent-Hit@3=0.625
CV mean=0.4000 std=0.2000 scores=[0.375 0.25  0.125 0.625 0.625]


Unnamed: 0,model,mean,std
0,A_char + word_summary_fallback,0.45,0.187083
2,C_charWB + word_summary_fallback,0.45,0.187083
1,B_char + word_summary_fallback + word_text,0.4,0.2
3,D_charWB + word_summary_fallback + word_text,0.4,0.2



BEST by CV: A_char + word_summary_fallback


In [20]:
# ----------------------------
# 6) Train best model on FULL TRAIN and evaluate on TEST
# ----------------------------
X_train = build_X_for_model(train_df, summary_col="summary_fallback")
X_test  = build_X_for_model(test_df,  summary_col="summary_fallback")

best_model = best_builder()
best_model.fit(X_train, y_train)

proba_test = best_model.predict_proba(X_test)
test_score = parent_hit_at_k_from_proba(
    y_true_parent_lists=test_df["labels_parent"].tolist(),
    proba=proba_test,
    classes=mlb.classes_,
    k=3
)
print(f"\nTEST Parent-Hit@3 for BEST ({best_name}): {test_score:.3f}")


TEST Parent-Hit@3 for BEST (A_char + word_summary_fallback): 0.400


In [21]:
def topk_fine_labels_by_freq(labels_lists, k=3) -> list[str]:
    cnt = Counter()
    for labs in labels_lists:
        labs = clean_label_list(labs)
        for lab in set(labs):
            cnt[lab] += 1
    return [lab for lab, _ in cnt.most_common(k)]

def constant_score_matrix_from_topk_fine(topk_fine, classes, n_rows):
    classes = np.asarray(classes)
    row = np.zeros((1, len(classes)), dtype=float)
    score = float(len(topk_fine))
    for lab in topk_fine:
        idx = np.where(classes == lab)[0]
        if len(idx) == 0:
            continue
        row[0, idx[0]] = score
        score -= 1.0
    return np.repeat(row, repeats=n_rows, axis=0)

topk_fine = topk_fine_labels_by_freq(train_df["labels"].tolist(), k=3)
proba_base = constant_score_matrix_from_topk_fine(topk_fine, mlb.classes_, n_rows=len(test_df))

baseline_test = parent_hit_at_k_from_proba(
    y_true_parent_lists=test_df["labels_parent"].tolist(),
    proba=proba_base,
    classes=mlb.classes_,
    k=3
)
print(f"TEST Parent-Hit@3 frequency baseline (Top3={topk_fine}): {baseline_test:.3f}")
print(f"Absolute gain vs baseline: {test_score - baseline_test:+.3f}")

TEST Parent-Hit@3 frequency baseline (Top3=['707', '480D*', '402']): 0.200
Absolute gain vs baseline: +0.200


In [None]:
MODEL_PATH = "models/atu_ovr_tfidf.joblib"
LABELS_PATH = "models/labels.json"
META_PATH = "models/meta.json"

# model: ваш sklearn Pipeline
# mlb: MultiLabelBinarizer (fitted)

joblib.dump(model, MODEL_PATH)

with open(LABELS_PATH, "w", encoding="utf-8") as f:
    json.dump(list(mlb.classes_), f, ensure_ascii=False, indent=2)

meta = {
    "created_at": datetime.utcnow().isoformat() + "Z",
    "model_type": "TFIDF(char+word) + OvR LogisticRegression",
    "top_k": 3,
    "input_columns": ["text_norm", "summary_fallback"],
    "notes": "Config B/D; trained on ERA magic tales; Parent-Hit@3 target metric."
}
with open(META_PATH, "w", encoding="utf-8") as f:
    json.dump(meta, f, ensure_ascii=False, indent=2)

print("Saved:", MODEL_PATH, LABELS_PATH, META_PATH)
