# C3 — Mapping Manga Sanctuary (MS) ↔ Kitsu + Enrichissement série (Postgres-ready, JSONB) — v2 (anti-collisions)

## Objectifs
- Générer `ms_title_norm` + `ms_titles_exploded` (titre principal + autres titres)
- Générer les **2 pivots Kitsu** :
  - `kitsu_series_core` (1 ligne / kitsu_id) = référentiel d’enrichissement
  - `kitsu_titles_exploded` (multi-lignes / kitsu_id) = index de matching
- Matching en 2 passes :
  1) **Exact match** sur `title_norm` (avec **gestion des collisions** via score qualité Kitsu)
  2) **Fuzzy match** (RapidFuzz) sur les non matchés (seuil conservateur)
- Produire :
  - `ms_kitsu_map.(csv|parquet)` : table de correspondance (audit : méthode, score, titre matché)
  - `ms_series_enriched_plus_kitsu.(csv|parquet)` : MS enrichi par Kitsu (sans écrasement destructif)
  - exports de pivots (audit) : `ms_titles_exploded.csv`, `kitsu_series_core.csv`, `kitsu_titles_exploded.csv`
  - optionnel : `ms_kitsu_ambiguous.csv` (cas ambigus à vérifier)

## Notes
- Pas de RAG ici (pas de chunking, pas d’embeddings).
- Les colonnes JSON (tags/genres/catégories, autres titres) sont conservées pour stockage **JSONB** dans PostgreSQL.

In [None]:
from pathlib import Path
import os
import json
import re
import unicodedata

import pandas as pd
import numpy as np

try:
    from dotenv import load_dotenv
except Exception:
    load_dotenv = None

pd.set_option("display.max_colwidth", 160)

## 0) Paramètres (chemins)

In [None]:
# --- PROJECT ROOT (robuste même si le kernel démarre dans `notebooks/`) ---
def find_repo_root(start: Path) -> Path:
    """Locate the project root by walking up from *start*.

    The root is the first directory (``start`` itself, then each parent,
    nearest first) that contains a ``pyproject.toml``. When no such
    directory exists, the resolved ``start`` is returned unchanged.
    """
    origin = start.resolve()
    lineage = (origin, *origin.parents)
    return next(
        (folder for folder in lineage if (folder / "pyproject.toml").exists()),
        origin,
    )

# Every relative path below is anchored on the root detected from the
# kernel's current working directory.
PROJECT_ROOT = find_repo_root(Path.cwd())

# python-dotenv is optional: `.env` is loaded only when the import succeeded,
# and variables already present in the environment win (override=False).
if globals().get("load_dotenv") is not None:
    load_dotenv(PROJECT_ROOT / ".env", override=False)

def resolve_from_root(p: str) -> Path:
    """Turn a path string into a concrete ``Path``.

    ``~`` is expanded first. An absolute path is returned as-is (not
    resolved); a relative one is joined to ``PROJECT_ROOT`` and resolved.
    """
    expanded = Path(p).expanduser()
    return expanded if expanded.is_absolute() else (PROJECT_ROOT / expanded).resolve()

# --- INPUTS ---
# Both input paths can be overridden through the environment (or `.env`):
# - MS_SERIES_CSV=...
# - KITSU_CLEAN_CSV=...
# Relative values are resolved against PROJECT_ROOT.
MS_SERIES_CSV = resolve_from_root(os.getenv("MS_SERIES_CSV", "out_ms_final/ms_series_enriched.csv"))
KITSU_CLEAN_CSV = resolve_from_root(os.getenv("KITSU_CLEAN_CSV", "exports/kitsu/kitsu_top_rated_clean.csv"))

# Colab compatibility: when the configured file is missing but a copy exists
# under /mnt/data, use that copy instead.
if not MS_SERIES_CSV.exists() and Path("/mnt/data/ms_series_enriched.csv").exists():
    MS_SERIES_CSV = Path("/mnt/data/ms_series_enriched.csv")
if not KITSU_CLEAN_CSV.exists() and Path("/mnt/data/kitsu_top_rated_clean.csv").exists():
    KITSU_CLEAN_CSV = Path("/mnt/data/kitsu_top_rated_clean.csv")

# Fail early, naming the environment variable that can fix the path.
if not MS_SERIES_CSV.exists():
    raise FileNotFoundError(f"MS_SERIES_CSV introuvable: {MS_SERIES_CSV} (override via MS_SERIES_CSV)")
if not KITSU_CLEAN_CSV.exists():
    raise FileNotFoundError(f"KITSU_CLEAN_CSV introuvable: {KITSU_CLEAN_CSV} (override via KITSU_CLEAN_CSV)")

# --- OUTPUTS ---
# The output directory can be overridden through the environment (or `.env`):
# - C3_OUT_DIR=...
# It is created here (parents included) if it does not exist yet.
OUT_DIR = resolve_from_root(os.getenv("C3_OUT_DIR", "out_ms_final/c3_ms_kitsu_v2"))
OUT_DIR.mkdir(parents=True, exist_ok=True)

# MS <-> Kitsu mapping table (CSV + Parquet).
MAP_CSV  = OUT_DIR / "ms_kitsu_map.csv"
MAP_PARQ = OUT_DIR / "ms_kitsu_map.parquet"

# MS series enriched with Kitsu columns (CSV + Parquet).
MS_PLUS_CSV  = OUT_DIR / "ms_series_enriched_plus_kitsu.csv"
MS_PLUS_PARQ = OUT_DIR / "ms_series_enriched_plus_kitsu.parquet"

# Pivot tables exported for audit, plus the optional list of ambiguous matches.
MS_TITLES_EX_CSV     = OUT_DIR / "ms_titles_exploded.csv"
KITSU_CORE_CSV       = OUT_DIR / "kitsu_series_core.csv"
KITSU_TITLES_EX_CSV  = OUT_DIR / "kitsu_titles_exploded.csv"
AMBIG_CSV            = OUT_DIR / "ms_kitsu_ambiguous.csv"  # optional

# Echo the effective configuration so a run can be traced from the cell output.
print("PROJECT_ROOT:", PROJECT_ROOT)
print("MS_SERIES_CSV:", MS_SERIES_CSV)
print("KITSU_CLEAN_CSV:", KITSU_CLEAN_CSV)
print("OUT_DIR:", OUT_DIR.resolve())

## 1) Dépendances (pyarrow pour Parquet, rapidfuzz pour fuzzy)

In [None]:
# Fail fast, with an actionable message, when a dependency needed by later
# cells cannot be imported. `from e` records the import failure as the explicit
# cause so the original traceback stays attached to the RuntimeError.
try:
    import pyarrow  # noqa: F401
except Exception as e:
    raise RuntimeError(
        "pyarrow requis pour l'export Parquet. Installe-le (ex: pip install -r requirements-dev.txt). "
        f"Détail: {repr(e)}"
    ) from e

try:
    from rapidfuzz import process, fuzz
except Exception as e:
    raise RuntimeError(
        "rapidfuzz requis pour le fuzzy matching. Installe-le (ex: pip install -r requirements-dev.txt). "
        f"Détail: {repr(e)}"
    ) from e

print("OK: pyarrow + rapidfuzz")

## 2) Lecture + contrôles C3

In [None]:
# Load both sources with pandas' default CSV parsing (MS first, then Kitsu).
ms, kitsu = pd.read_csv(MS_SERIES_CSV), pd.read_csv(KITSU_CLEAN_CSV)

print("MS series:", ms.shape, " | cols:", len(ms.columns))
print("Kitsu:", kitsu.shape, " | cols:", len(kitsu.columns))

# Required columns must exist before any key check is attempted.
for _frame, _column, _message in (
    (ms, "series_id", "MS: colonne series_id manquante"),
    (ms, "series_title", "MS: colonne series_title manquante"),
    (kitsu, "kitsu_id", "Kitsu: colonne kitsu_id manquante"),
):
    assert _column in _frame.columns, _message

# Each identifier column must be a usable key: no missing value, no duplicate.
for _frame, _key, _na_message, _dup_message in (
    (ms, "series_id", "MS: series_id contient des NA", "MS: series_id doit être unique"),
    (kitsu, "kitsu_id", "Kitsu: kitsu_id contient des NA", "Kitsu: kitsu_id doit être unique"),
):
    assert _frame[_key].notna().all(), _na_message
    assert _frame[_key].is_unique, _dup_message

ms.head(2)

## 3) Helpers (normalisation titres + parsing JSON)

In [None]:
def norm_title(s: str) -> str:
    """Normalize a title into a lowercase ASCII matching key.

    The value is stringified, trimmed and lowercased; accents are removed
    (NFKD decomposition, combining marks dropped); ``&`` becomes ``and``;
    every run of characters outside ``[a-z0-9]`` collapses to one space.

    Returns ``None`` when the input is ``None`` or a float NaN, when it is
    blank, or when nothing is left after normalization (for instance a title
    containing no Latin letter or digit).
    """
    if s is None:
        return None
    if isinstance(s, float) and pd.isna(s):
        return None

    lowered = str(s).strip().lower()
    if lowered == "":
        return None

    # Split accented characters into base + combining mark, keep the base only.
    decomposed = unicodedata.normalize("NFKD", lowered)
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    ascii_like = "".join(base_chars).replace("&", "and")

    spaced = re.sub(r"[^a-z0-9]+", " ", ascii_like)
    collapsed = re.sub(r"\s+", " ", spaced).strip()
    return collapsed if collapsed else None

def try_parse_json_list(x):
    """Parse a cell that should hold a list of values and return a Python list.

    Accepted inputs, tried in this order:

    1. ``None`` / float NaN / ``""`` / ``"[]"`` / ``"nan"`` / ``"None"`` -> ``[]``
    2. an actual ``list`` -> returned unchanged
    3. strict JSON text
    4. a Python literal (``repr`` of a list or tuple, or a quoted string)
    5. the legacy "swap single quotes for double quotes" JSON attempt
    6. anything else -> ``[raw_text]``

    A non-list parse result is wrapped in a one-element list.
    """
    import ast  # local import: stdlib only, keeps this helper self-contained

    if x is None or (isinstance(x, float) and pd.isna(x)):
        return []
    if isinstance(x, list):
        return x
    s = str(x).strip()
    if s in ("", "[]", "nan", "None"):
        return []

    # Strict JSON first.
    try:
        v = json.loads(s)
    except (ValueError, RecursionError):
        pass
    else:
        return v if isinstance(v, list) else [v]

    # Python repr next. Unlike a blind quote swap, literal_eval copes with
    # apostrophes inside values, e.g. ["L'Attaque des Titans", 'Naruto'].
    # Only sequence/string results are accepted, so inputs the quote swap
    # already handled (dicts, bare words such as True) keep their old result.
    try:
        v = ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        v = None
    if isinstance(v, (list, tuple)):
        return list(v)
    if isinstance(v, str):
        return [v]

    # Legacy fallback: naive quote swap, then JSON.
    try:
        v = json.loads(s.replace("'", '"'))
    except (ValueError, RecursionError):
        return [s]
    return v if isinstance(v, list) else [v]

def is_empty_list_str(x) -> bool:
    """Tell whether a cell holds "no list".

    True for ``None``, float NaN, and any value whose trimmed string form is
    ``""``, ``"[]"``, ``"nan"`` or ``"None"``; False otherwise.
    """
    if x is not None and not (isinstance(x, float) and pd.isna(x)):
        return str(x).strip() in ("", "[]", "nan", "None")
    return True

## 4) MS — ms_title_norm + ms_titles_exploded

In [None]:
# Main title. Missing titles are kept missing: a plain `astype(str)` would turn
# NaN into the literal "nan", which then normalizes to a bogus title key.
ms["ms_title_main"] = ms["series_title"].map(lambda t: None if pd.isna(t) else str(t))
ms["ms_title_norm"] = ms["ms_title_main"].map(norm_title)

# Alternative titles: use the first of these columns that exists.
other_col = None
for cand in ["series_other_titles_json", "series_other_titles"]:
    if cand in ms.columns:
        other_col = cand
        break

# NOTE(review): `_other_titles_list` is stored on `ms`, so it is carried into
# anything later derived from `ms` — confirm that is intended.
if other_col:
    ms["_other_titles_list"] = ms[other_col].map(try_parse_json_list)
else:
    ms["_other_titles_list"] = [[] for _ in range(len(ms))]

# Explode into one row per (series, title): the main title first, then the others.
rows = []
for sid, main_title, main_norm, other_titles in zip(
    ms["series_id"], ms["ms_title_main"], ms["ms_title_norm"], ms["_other_titles_list"]
):
    if isinstance(main_norm, str) and main_norm:
        rows.append({"series_id": sid, "title_source": "main", "title": main_title, "title_norm": main_norm})
    for t in other_titles or []:
        tn = norm_title(t)
        if tn:
            rows.append({"series_id": sid, "title_source": "other", "title": str(t), "title_norm": tn})

# Explicit columns keep the frame usable (and `drop_duplicates` valid) even
# when no title survived normalization. Duplicates keep the first occurrence,
# so a main title wins over an identical alternative title.
ms_titles_ex = pd.DataFrame(
    rows, columns=["series_id", "title_source", "title", "title_norm"]
).drop_duplicates(subset=["series_id", "title_norm"])

print("ms_titles_exploded:", ms_titles_ex.shape)
ms_titles_ex.head(10)

## 5) Kitsu — pivots : core + titles_exploded

In [None]:
# Primary normalized title: when the export has no `title_norm_primary`, fall
# back to the canonical normalized title, then to the English one for rows
# that are still missing.
if "title_norm_primary" not in kitsu.columns:
    kitsu["title_norm_primary"] = kitsu.get("title_norm_canonical")
    kitsu.loc[kitsu["title_norm_primary"].isna(), "title_norm_primary"] = kitsu.get("title_norm_en")

# Candidate titles: the JSON list when provided, otherwise the non-blank
# canonical / English / Japanese titles.
if "title_candidates_json" in kitsu.columns:
    kitsu["_cands"] = kitsu["title_candidates_json"].map(try_parse_json_list)
else:
    kitsu["_cands"] = kitsu.apply(
        lambda r: [t for t in [r.get("title_canonical"), r.get("title_en"), r.get("title_ja")] if isinstance(t, str) and t.strip()],
        axis=1
    )


def _kitsu_title_pairs(cands):
    """Return (raw_title, normalized_title) pairs, skipping titles that normalize to nothing."""
    pairs = []
    for raw in cands or []:
        normed = norm_title(raw)
        if normed:
            pairs.append((raw, normed))
    return pairs


# Raw and normalized titles are kept together as pairs. Zipping the raw list
# against a separately *filtered* normalized list shifts every title that
# follows a dropped candidate, attaching the wrong raw title to a key.
_cand_pairs = kitsu["_cands"].map(_kitsu_title_pairs)
kitsu["_cands_norm"] = _cand_pairs.map(lambda pairs: [tn for _, tn in pairs])

# Pivot 1: core (one row per kitsu_id) — enrichment reference table.
core_cols = [
    "kitsu_id","slug","status",
    "title_canonical","title_en","title_ja",
    "title_norm_primary","title_norm_canonical","title_norm_en","title_norm_ja",
    "synopsis_clean","rating_average_10","rating_rank","popularity_rank",
    "categories_json","genres_json","tags_all_json",
]
core_cols = [c for c in core_cols if c in kitsu.columns]  # keep only the columns this export has
kitsu_core = kitsu[core_cols].copy()

# Pivot 2: titles_exploded (several rows per kitsu_id) — matching index.
rows = []
for kid, pairs in zip(kitsu["kitsu_id"], _cand_pairs):
    for t, tn in pairs:
        rows.append({"kitsu_id": kid, "title": str(t), "title_norm": tn})

# Explicit columns keep `drop_duplicates` valid when no candidate survived.
kitsu_titles_ex = pd.DataFrame(
    rows, columns=["kitsu_id", "title", "title_norm"]
).drop_duplicates(subset=["kitsu_id","title_norm"])

print("kitsu_core:", kitsu_core.shape)
print("kitsu_titles_exploded:", kitsu_titles_ex.shape)
kitsu_titles_ex.head(10)

## 6) Amélioration v2 — score qualité Kitsu (pour départager les collisions exactes)

In [None]:
def non_empty_jsonish(x) -> bool:
    """Tell whether a JSON-like cell carries at least one value.

    A ``list`` counts when it has elements. ``None`` and float NaN never
    count. Any other value counts unless its trimmed string form is ``""``,
    ``"[]"``, ``"nan"`` or ``"None"``.
    """
    if isinstance(x, list):
        return bool(x)
    if x is None or (isinstance(x, float) and pd.isna(x)):
        return False
    text = str(x).strip()
    return text not in ("", "[]", "nan", "None")

# One row per kitsu_id, holding the signals used to rank competing exact matches.
kitsu_quality = kitsu_core[["kitsu_id"]].copy()

# `kitsu_core` only contains the columns present in the Kitsu export, so every
# signal is guarded: a missing column yields False / NaN instead of an error.
if "synopsis_clean" in kitsu_core.columns:
    kitsu_quality["has_synopsis"] = kitsu_core["synopsis_clean"].fillna("").astype(str).str.strip().ne("")
else:
    kitsu_quality["has_synopsis"] = False

for _flag, _source in (
    ("has_tags", "tags_all_json"),
    ("has_categories", "categories_json"),
    ("has_genres", "genres_json"),
):
    kitsu_quality[_flag] = kitsu_core[_source].map(non_empty_jsonish) if _source in kitsu_core.columns else False

for _metric in ("popularity_rank", "rating_rank", "rating_average_10"):
    kitsu_quality[_metric] = (
        pd.to_numeric(kitsu_core[_metric], errors="coerce") if _metric in kitsu_core.columns else np.nan
    )

# Stable, simple score: completeness dominates (synopsis, then tags, then
# categories, then genres); the rating only adds a small bonus.
kitsu_quality["quality_score"] = (
    kitsu_quality["has_synopsis"].astype(int) * 1000
    + kitsu_quality["has_tags"].astype(int) * 100
    + kitsu_quality["has_categories"].astype(int) * 30
    + kitsu_quality["has_genres"].astype(int) * 20
    + kitsu_quality["rating_average_10"].fillna(0) * 2
)

kitsu_quality.head(5)

## 7) Exact match (title_norm) — v2 anti-collisions (sélection par score)

In [None]:
# Anti-collision option: ignore normalized titles that are too short (often ambiguous).
MIN_NORM_LEN = 4
ms_titles_ex_f = ms_titles_ex[ms_titles_ex["title_norm"].str.len() >= MIN_NORM_LEN].copy()
kitsu_titles_ex_f = kitsu_titles_ex[kitsu_titles_ex["title_norm"].str.len() >= MIN_NORM_LEN].copy()

# Every (MS title, Kitsu title) pair sharing the same normalized key, with the
# Kitsu quality signals attached for ranking.
exact = (
    ms_titles_ex_f
    .merge(kitsu_titles_ex_f, on="title_norm", how="inner", suffixes=("_ms","_kitsu"))
    .merge(kitsu_quality, on="kitsu_id", how="left")
)

# A match through the MS main title beats a match through an alternative title.
exact["priority"] = np.where(exact["title_source"] == "main", 0, 1)

# Order: main title first, then higher quality, then better (lower) ranks.
exact = exact.sort_values(
    ["series_id", "priority", "quality_score", "popularity_rank", "rating_rank"],
    ascending=[True, True, False, True, True],
)

# Keep a single exact match per series: the top row after sorting.
exact_best = exact.drop_duplicates(subset=["series_id"], keep="first").copy()
exact_best["match_method"] = "exact_title_norm_scored"
exact_best["match_score"] = 100

# Collision audit: number of DISTINCT Kitsu entries competing for each series.
# Counting rows would flag a series matched to one Kitsu entry through two of
# its titles as a collision, although nothing is ambiguous there.
collision_stats = (
    exact.groupby("series_id")["kitsu_id"].nunique().reset_index(name="n_exact_candidates")
)
n_collisions = int((collision_stats["n_exact_candidates"] >= 2).sum())

print("exact matches:", len(exact_best), "/", ms["series_id"].nunique())
print("series with collisions (>=2 exact candidates):", n_collisions)

exact_best.head(10)

## 8) Optionnel — Export des cas ambigus à vérifier (preuve C3)

In [None]:
# A series is flagged "ambiguous" from this many exact candidates (adjustable).
AMBIG_THRESHOLD = 3
_over_threshold = collision_stats["n_exact_candidates"] >= AMBIG_THRESHOLD
ambig_ids = set(collision_stats.loc[_over_threshold, "series_id"])

# Lookup table: series_id -> number of exact candidates.
_candidates_by_series = dict(
    zip(collision_stats["series_id"], collision_stats["n_exact_candidates"])
)

_review_columns = ["series_id","ms_title_main","ms_title_norm"]
ms_ambiguous = ms.loc[ms["series_id"].isin(ambig_ids), _review_columns].copy()
ms_ambiguous["n_exact_candidates"] = ms_ambiguous["series_id"].map(_candidates_by_series)

print("ambiguous series to review:", len(ms_ambiguous))
ms_ambiguous.head(20)

## 9) Fuzzy match (non-matchés) — rapidfuzz (seuil conservateur)

In [None]:
# Only the series left without an exact match go through the fuzzy pass.
matched_series_ids = set(exact_best["series_id"].tolist())
ms_left = ms[~ms["series_id"].isin(matched_series_ids)].copy()

# Fuzzy index: one key per kitsu_id, compared against title_norm_primary.
kitsu_index = kitsu_core[["kitsu_id","title_norm_primary"]].dropna().copy()
kitsu_choices = dict(zip(kitsu_index["kitsu_id"].tolist(), kitsu_index["title_norm_primary"].tolist()))

FUZZY_MIN_SCORE = 92  # conservative (lower it if too few matches; raise it on false positives)

# NOTE(review): this pass does not apply the MIN_NORM_LEN filter used by the
# exact pass — confirm very short titles are meant to be matched here.
fuzzy_rows = []
_queries = ms_left[["series_id","ms_title_norm","ms_title_main"]].itertuples(index=False, name=None)
for sid, q, main_title in _queries:
    if not (isinstance(q, str) and q):
        continue
    hit = process.extractOne(q, kitsu_choices, scorer=fuzz.token_sort_ratio, score_cutoff=FUZZY_MIN_SCORE)
    if not hit:
        continue
    # With a mapping of choices, the hit is (matched value, score, key).
    kitsu_norm, score, kitsu_id = hit
    fuzzy_rows.append({
        "series_id": sid,
        "kitsu_id": int(kitsu_id),
        "match_method": "fuzzy_title_norm_primary",
        "match_score": float(score),
        "matched_title_norm": kitsu_norm,
        "ms_title_norm": q,
        "ms_title": main_title,
    })

fuzzy = pd.DataFrame(fuzzy_rows)
print("fuzzy matches:", len(fuzzy), "/", len(ms_left))
fuzzy.head(10)

## 10) Construire ms_kitsu_map (audit exact + fuzzy)

In [None]:
# Audit view of the exact matches; `title_norm` is the key that matched.
_exact_columns = ["series_id","kitsu_id","match_method","match_score","title_norm"]
map_exact = exact_best[_exact_columns].rename(columns={"title_norm":"matched_title_norm"}).copy()

# Bring the MS titles back through a single series_id-indexed lookup.
_ms_by_id = ms.set_index("series_id")
_matched_ids = map_exact["series_id"]
map_exact["ms_title"] = _ms_by_id.loc[_matched_ids, "ms_title_main"].values
map_exact["ms_title_norm"] = _ms_by_id.loc[_matched_ids, "ms_title_norm"].values

# Union of exact and fuzzy matches.
ms_kitsu_map = pd.concat([map_exact, fuzzy], ignore_index=True)

# If a series was matched twice: exact beats fuzzy, then the highest score wins.
_is_exact = ms_kitsu_map["match_method"].str.startswith("exact")
ms_kitsu_map["method_prio"] = np.where(_is_exact, 0, 1)
ms_kitsu_map = (
    ms_kitsu_map
    .sort_values(["series_id","method_prio","match_score"], ascending=[True, True, False])
    .drop_duplicates(subset=["series_id"], keep="first")
    .drop(columns=["method_prio"])
)

print("total matches:", len(ms_kitsu_map), "/", ms["series_id"].nunique())
ms_kitsu_map["match_method"].value_counts()

## 11) Enrichissement MS (sans écrasement destructif)

In [None]:
# Attach the mapping to every MS series (left join: unmatched series are kept),
# then the Kitsu core columns, each prefixed with `kitsu_`.
ms_plus = (ms
           .merge(ms_kitsu_map[["series_id","kitsu_id","match_method","match_score"]], on="series_id", how="left")
           .merge(kitsu_core.add_prefix("kitsu_"), left_on="kitsu_id", right_on="kitsu_kitsu_id", how="left"))

# Synopsis: start from the MS value (all-missing when MS has no synopsis
# column), then fill from Kitsu only where the MS value is missing or blank.
# Original MS columns are never overwritten; results go to `*_enriched`.
if "series_synopsis" in ms_plus.columns:
    ms_plus["series_synopsis_enriched"] = ms_plus["series_synopsis"]
else:
    ms_plus["series_synopsis_enriched"] = None

if "kitsu_synopsis_clean" in ms_plus.columns:
    _blank_synopsis = (
        ms_plus["series_synopsis_enriched"].isna()
        | (ms_plus["series_synopsis_enriched"].astype(str).str.strip() == "")
    )
    ms_plus.loc[_blank_synopsis, "series_synopsis_enriched"] = ms_plus.loc[_blank_synopsis, "kitsu_synopsis_clean"]

# Tags / genres: where the MS list is empty ("[]", blank, missing), take the Kitsu list.
for _ms_col, _kitsu_col, _out_col in (
    ("series_tags", "kitsu_tags_all_json", "series_tags_enriched"),
    ("series_genres", "kitsu_genres_json", "series_genres_enriched"),
):
    if _ms_col in ms_plus.columns and _kitsu_col in ms_plus.columns:
        ms_plus[_out_col] = ms_plus[_ms_col]
        empty_mask = ms_plus[_out_col].map(is_empty_list_str)
        ms_plus.loc[empty_mask, _out_col] = ms_plus.loc[empty_mask, _kitsu_col]

# Quick KPIs: row count, share of matched series, share of non-empty synopses after enrichment.
kpi_enrich = {
    "ms_rows": int(len(ms_plus)),
    "matched_%": float(ms_plus["kitsu_id"].notna().mean() * 100),
    "synopsis_after_non_empty_%": float((ms_plus["series_synopsis_enriched"].notna() & (ms_plus["series_synopsis_enriched"].astype(str).str.strip() != "")).mean() * 100),
}
kpi_enrich

## 12) Exports CSV + Parquet (JSONB-ready)

In [None]:
import json as _json

def _jsonb_text(v):
    """Render one cell as JSON text that a PostgreSQL JSONB column can load.

    - ``list`` / ``dict`` -> serialized with ``json.dumps``
    - ``None`` / float NaN / blank text -> ``"[]"``
    - text that is already valid JSON -> returned unchanged
    - text holding a Python literal (e.g. ``"['a', 'b']"``) -> re-serialized as JSON
    - any other text -> wrapped as a one-element JSON list
    """
    import ast
    import json

    def _reject_constant(name):
        # json.loads accepts NaN / Infinity by default; they are not valid JSON.
        raise ValueError(name)

    if isinstance(v, (list, dict)):
        return json.dumps(v, ensure_ascii=False)
    if v is None or (isinstance(v, float) and pd.isna(v)):
        return "[]"
    s = str(v).strip()
    if s == "":
        return "[]"

    try:
        json.loads(s, parse_constant=_reject_constant)
    except (ValueError, RecursionError):
        pass
    else:
        return s  # already valid JSON: keep the original text untouched

    try:
        return json.dumps(ast.literal_eval(s), ensure_ascii=False, allow_nan=False)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return json.dumps([s], ensure_ascii=False)


def export_csv_parquet_jsonb(df: pd.DataFrame, csv_path: Path, parq_path: Path, json_cols: list[str]):
    """Write *df* to Parquet and to CSV, with JSON columns valid in the CSV.

    Args:
        df: frame to export; it is not modified.
        csv_path: destination of the CSV file.
        parq_path: destination of the Parquet file.
        json_cols: names of the columns holding JSON data; names missing from
            *df* are ignored.

    The Parquet file receives *df* as-is. In the CSV copy, every cell of a
    JSON column is converted to valid JSON text (see ``_jsonb_text``).
    """
    # Parquet first, from the untouched frame.
    df.to_parquet(parq_path, index=False)

    # CSV from a copy, so the caller's frame keeps its original values.
    df_csv = df.copy()
    for c in json_cols:
        if c in df_csv.columns:
            df_csv[c] = df_csv[c].map(_jsonb_text)

    df_csv.to_csv(csv_path, index=False)

# Pivot tables, written as-is for audit purposes.
for _frame, _target in (
    (ms_titles_ex, MS_TITLES_EX_CSV),
    (kitsu_core, KITSU_CORE_CSV),
    (kitsu_titles_ex, KITSU_TITLES_EX_CSV),
):
    _frame.to_csv(_target, index=False)

# Ambiguous cases (optional): written only when there is something to review.
if len(ms_ambiguous):
    ms_ambiguous.to_csv(AMBIG_CSV, index=False)
    print("Wrote:", AMBIG_CSV)

# Mapping table: no JSON column to normalize.
export_csv_parquet_jsonb(ms_kitsu_map, MAP_CSV, MAP_PARQ, json_cols=[])

# Enriched MS table: list the JSON columns actually present so they stay JSONB-ready.
_json_candidates = [
    "series_other_titles","series_other_titles_json","series_statuses","series_related_works","series_tags","series_genres",
    "series_tags_enriched","series_genres_enriched",
    "kitsu_categories_json","kitsu_genres_json","kitsu_tags_all_json"
]
json_cols_plus = [name for name in _json_candidates if name in ms_plus.columns]

export_csv_parquet_jsonb(ms_plus, MS_PLUS_CSV, MS_PLUS_PARQ, json_cols=json_cols_plus)

print("Exported:")
print(" -", MAP_CSV)
print(" -", MS_PLUS_CSV)
print(" - pivots:", MS_TITLES_EX_CSV, KITSU_CORE_CSV, KITSU_TITLES_EX_CSV)

## 13) Quick checks (preuve C3)

In [None]:
# How many series were matched by each method.
print("Mapping methods:")
print(ms_kitsu_map["match_method"].value_counts())

# The weakest fuzzy matches are the first ones worth a manual look.
print("\nTop 10 lowest fuzzy scores (if any):")
_is_fuzzy = ms_kitsu_map["match_method"].str.startswith("fuzzy")
low = ms_kitsu_map[_is_fuzzy].sort_values("match_score").head(10)
low[["series_id","kitsu_id","match_score","ms_title","matched_title_norm"]]