# Annotation linguistique de films 

Ce notebook transforme un export TMDB en un dataset utilisable pour de l'annotation linguistique.

**Pipeline**
1. Nettoyage du CSV brut (valeurs manquantes, types numériques, filtre `popularity != 0`).
2. Filtre langue : on garde uniquement les films où la **langue originale** est bien dans `spoken_languages`.
3. (Optionnel) Filtre des langues **rares** pour réduire les cas trop niche.
4. Attribution de labels heuristiques : `linguistic_level`, `linguistic_register`, `linguistic_exposure`, ainsi que `linguistic_difficulty_score` et `linguistic_confidence`.

## Fichiers produits
- `films_TMDB_clean_filtered.csv`
- `films_language_match_only.csv`
- `films_language_match_only_filtered_rare_spoken50.csv` (optionnel)
- `films_language_match_only_with_linguistic_labels_v2.csv`


## 0) Import des bibliothèques

In [None]:
import ast
import re
from collections import Counter, defaultdict
import numpy as np
import pandas as pd
from pathlib import Path


In [None]:
# ----------------------------
# PARAMETERS (adjust as needed)
# ----------------------------
# Input file read by the cleaning step, followed by the files written by
# each later stage of the notebook.
RAW_CSV = Path('films_TMDB_all_reordered_clean_no_runtime_0.csv')
CLEAN_CSV = Path('films_TMDB_clean_filtered.csv')
MATCH_CSV = Path('films_language_match_only.csv')
FILTERED_CSV = Path('films_language_match_only_filtered_rare_spoken50.csv')
LABELED_CSV = Path('films_language_match_only_with_linguistic_labels_v2.csv')

# "Rare languages" filter: a language is rare when it appears in
# spoken_languages for fewer films than this threshold.
RARE_THRESHOLD = 50

# Settings for the sanity checks (last cell of the notebook):
# original languages to skip, minimum vote_count, and rows shown per listing.
EXCLUDE_ORIG = {'en'}
MIN_VOTE_COUNT_FOR_QA = 300
TOP_N_QA = 20


In [None]:
# ----------------------------
# Helpers parsing (robustes)
# ----------------------------

def parse_py_list(x):
    """Return a Python list from a NaN, a list, or a list-literal string.

    Args:
        x: A list (returned unchanged), a missing value, or a string such
            as ``"['a', 'b']"``.

    Returns:
        The list itself, the list parsed from the string, or ``[]`` when the
        value is missing, empty, unparsable, or does not evaluate to a list.
    """
    # Lists must be recognised before calling pd.isna(): on a list, pd.isna()
    # returns an element-wise array whose truth value is ambiguous and raises
    # ValueError as soon as the list holds more than one item.
    if isinstance(x, list):
        return x
    if pd.isna(x):
        return []

    s = str(x).strip()
    # Only a bracketed literal can evaluate to a list; anything else
    # (including '{...}' literals) yields [] as before.
    if not (s.startswith('[') and s.endswith(']')):
        return []

    try:
        v = ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # Malformed literal: treated as "no data" rather than an error.
        return []
    return v if isinstance(v, list) else []

# ISO code -> language name, used to compare original_language with the
# names parsed from spoken_languages. Callers look codes up with
# ISO_TO_LANG.get(code, code), so a code missing from this table is compared
# as-is.
# NOTE(review): the comparison is an exact, case-sensitive string match, so
# each name must be spelled exactly as it appears in spoken_languages —
# confirm against the export (e.g. whether 'zh' shows up as 'Chinese').
ISO_TO_LANG = {
    'en': 'English', 'fr': 'French', 'es': 'Spanish', 'ja': 'Japanese', 'de': 'German',
    'ru': 'Russian', 'pt': 'Portuguese', 'it': 'Italian', 'zh': 'Chinese', 'ko': 'Korean',
    'hi': 'Hindi', 'ar': 'Arabic', 'sv': 'Swedish', 'nl': 'Dutch', 'pl': 'Polish',
    'tr': 'Turkish', 'fa': 'Persian', 'el': 'Greek', 'fi': 'Finnish', 'da': 'Danish',
    'no': 'Norwegian', 'he': 'Hebrew', 'cs': 'Czech', 'hu': 'Hungarian', 'ro': 'Romanian',
    'th': 'Thai', 'vi': 'Vietnamese', 'id': 'Indonesian', 'ms': 'Malay', 'uk': 'Ukrainian',
    'bg': 'Bulgarian', 'sr': 'Serbian', 'hr': 'Croatian', 'sk': 'Slovak', 'lt': 'Lithuanian',
    'lv': 'Latvian', 'et': 'Estonian', 'sl': 'Slovenian', 'ca': 'Catalan', 'bn': 'Bengali',
    'ta': 'Tamil', 'te': 'Telugu', 'ml': 'Malayalam', 'kn': 'Kannada', 'mr': 'Marathi',
    'gu': 'Gujarati', 'pa': 'Punjabi', 'ur': 'Urdu', 'ne': 'Nepali', 'sw': 'Swahili',
    'zu': 'Zulu', 'xh': 'Xhosa', 'af': 'Afrikaans'
}

def _lang_name_from_item(item):
    # item: dict TMDB (english_name/name/iso_639_1) ou string
    if isinstance(item, dict):
        v = item.get('english_name') or item.get('name') or item.get('iso_639_1')
        return str(v).strip() if v else None
    if item is None:
        return None
    s = str(item).strip()
    return s if s else None

def parse_spoken_languages(x):
    # Sortie: liste de noms (ex: ['English','French'])
    if pd.isna(x):
        return []

    # 1) déjà liste
    if isinstance(x, list):
        out = []
        for it in x:
            name = _lang_name_from_item(it)
            if name:
                out.append(name)
        return out

    s = str(x).strip()
    if not s or s in ['[]', '{}']:
        return []

    # 2) tentative parse python-literal
    if (s.startswith('[') and s.endswith(']')) or (s.startswith('{') and s.endswith('}')):
        try:
            parsed = ast.literal_eval(s)
            if isinstance(parsed, list):
                out = []
                for it in parsed:
                    name = _lang_name_from_item(it)
                    if name:
                        out.append(name)
                return out
            if isinstance(parsed, dict):
                name = _lang_name_from_item(parsed)
                return [name] if name else []
        except Exception:
            pass

    # 3) fallback split texte ("English, French" etc.)
    parts = re.split(r"[;,/|]\s*|\s*,\s*", s)
    return [p.strip() for p in parts if p.strip()]


def to_numeric_safe(df, cols):
    """Coerce the listed columns of ``df`` to numeric values, in place.

    Names in ``cols`` that are not columns of ``df`` are skipped. Values that
    cannot be parsed become NaN (``errors='coerce'``). The same DataFrame
    object is returned for convenience.
    """
    present = [name for name in cols if name in df.columns]
    for name in present:
        df[name] = pd.to_numeric(df[name], errors='coerce')
    return df


## 1) Nettoyage du CSV brut

- Remplacement des "fausses valeurs manquantes" (`""`, `"None"`, `[]`, `{}` ...)
- Conversion de colonnes numériques
- Suppression des lignes avec NaN sur les colonnes **critiques**
- Suppression des films avec `popularity == 0`


In [None]:
if not RAW_CSV.exists():
    raise FileNotFoundError(f"Fichier introuvable: {RAW_CSV.resolve()}")

# Load the raw export
raw = pd.read_csv(RAW_CSV)
print('RAW shape:', raw.shape)

# Turn "fake" missing values (blank strings, none/null/nan text, empty
# list/dict literals) into real NaN so dropna() can see them.
fake_nan_patterns = {
    r'^\s*$': np.nan,
    r'(?i)^(none|null|nan)$': np.nan,
    r'^\[\s*\]$': np.nan,
    r'^\{\s*\}$': np.nan,
}
raw = raw.replace(fake_nan_patterns, regex=True)

# Numeric conversion (unparsable values become NaN)
numeric_cols = ['popularity','vote_average','vote_count','budget','revenue','runtime','year']
raw = to_numeric_safe(raw, numeric_cols)

# Columns that are allowed to be missing; every other column is critical.
allowed_nan = [
    'poster_path','backdrop_path','revenue','cast','directors','writers','keywords',
    'adult','video','production_companies','production_countries'
]
allowed_nan = [c for c in allowed_nan if c in raw.columns]
critical_cols = [c for c in raw.columns if c not in allowed_nan]

before = len(raw)
clean = raw.dropna(subset=critical_cols).copy()
# Measured here, before the popularity filter, so each report line below
# counts only the rows removed by its own step.
after_dropna = len(clean)

# Filter popularity != 0
if 'popularity' in clean.columns:
    clean = clean[clean['popularity'].notna() & (clean['popularity'] != 0)].copy()

print('Dropped (NaN on critical cols):', before - after_dropna)
print('Dropped (popularity == 0 or missing):', after_dropna - len(clean))
print('CLEAN shape:', clean.shape)

clean.to_csv(CLEAN_CSV, index=False)
print('Saved:', CLEAN_CSV)


## 2) Analyse rapide des langues

On vérifie la distribution des langues pour avoir une idée des données avec lesquelles on travaille.

In [None]:
df = pd.read_csv(CLEAN_CSV)

# Distribution of original languages
if 'original_language' in df.columns:
    orig_counts = df['original_language'].value_counts()
    print('=== original_language (top 20) ===')
    print(orig_counts.head(20).to_string())

# Distribution of spoken languages (every occurrence is counted)
if 'spoken_languages' in df.columns:
    spoken = []
    for cell in df['spoken_languages'].dropna():
        spoken.extend(parse_spoken_languages(cell))

    counts = Counter(spoken)
    print('\n=== spoken_languages (top 20) ===')
    for lang, c in counts.most_common(20):
        print(f'{lang}: {c}')


## 3) Filtre: garder uniquement les films où la langue originale est parlée

On garde un film si :
- `original_language` (code ISO) **ou** son nom (via `ISO_TO_LANG`) est présent dans `spoken_languages`.

Sortie : `films_language_match_only.csv`

In [None]:
df = pd.read_csv(CLEAN_CSV)

if 'original_language' not in df.columns or 'spoken_languages' not in df.columns:
    raise ValueError('Colonnes requises manquantes: original_language / spoken_languages')


def _original_is_spoken(orig_code, spoken_raw):
    """Return True when the original language appears in spoken_languages.

    The original language matches either through its ISO code or through the
    name ISO_TO_LANG maps that code to. A missing code never matches.
    """
    if pd.isna(orig_code):
        return False
    code = str(orig_code).strip()
    name = ISO_TO_LANG.get(code, code)
    spoken = set(parse_spoken_languages(spoken_raw))
    return name in spoken or code in spoken


keep_mask = pd.Series(
    [
        _original_is_spoken(code, spoken_raw)
        for code, spoken_raw in zip(df['original_language'], df['spoken_languages'])
    ],
    index=df.index,
    dtype=bool,
)

# Selecting with a boolean mask keeps the column names and dtypes even when
# no row matches; rebuilding a frame from an empty list of rows would write a
# header-less CSV that the next step cannot read back.
matched = df.loc[keep_mask].copy()
print('Before:', len(df), '| After match:', len(matched))
matched.to_csv(MATCH_CSV, index=False)
print('Saved:', MATCH_CSV)


## 4) (Optionnel) Filtre des langues rares

Règles (identiques à celles du notebook initial) :
- **A** : si `spoken_languages` contient une seule langue et qu'elle est rare (moins de `RARE_THRESHOLD` films) → supprimer
- **B** : sinon, si la langue originale est rare → supprimer

Sortie : `films_language_match_only_filtered_rare_spoken50.csv`

In [None]:
df = pd.read_csv(MATCH_CSV)

# One set of spoken languages per film, in row order.
spoken_sets = [set(parse_spoken_languages(v)) for v in df['spoken_languages']]

# Number of films each language is spoken in (counted once per film).
spoken_counter = Counter()
for langs in spoken_sets:
    spoken_counter.update(langs)

rare_langs = {lang for lang, c in spoken_counter.items() if c < RARE_THRESHOLD}
print(f'Rare langs (<{RARE_THRESHOLD}):', len(rare_langs))

if 'original_language' in df.columns:
    orig_values = df['original_language']
else:
    orig_values = [None] * len(df)

keep_flags = []
removed_by_lang = defaultdict(int)

# Rows and language sets are paired by position with zip(), so the loop does
# not depend on the DataFrame index being 0..n-1.
for orig_raw, langs in zip(orig_values, spoken_sets):
    removed_for = None

    if langs:  # films without any parsed language are always kept
        orig_code = str(orig_raw).strip()
        orig_name = ISO_TO_LANG.get(orig_code, orig_code)

        # Rule A: a single spoken language, and it is rare.
        if len(langs) == 1:
            only_lang = next(iter(langs))
            if only_lang in rare_langs:
                removed_for = only_lang

        # Rule B: the original language is rare. Both the mapped name and the
        # raw ISO code are checked, mirroring how the match filter compares
        # original_language with spoken_languages.
        if removed_for is None:
            removed_for = next(
                (key for key in (orig_name, orig_code) if key in rare_langs),
                None,
            )

    if removed_for is not None:
        removed_by_lang[removed_for] += 1
    keep_flags.append(removed_for is None)

# A boolean mask keeps column names and dtypes even if every row is removed.
filtered = df.loc[pd.Series(keep_flags, index=df.index, dtype=bool)].copy()
print('Before:', len(df), '| After rare-filter:', len(filtered), '| Removed:', len(df) - len(filtered))

if removed_by_lang:
    print('\nTop langues ayant entraîné des suppressions:')
    for lang, c in sorted(removed_by_lang.items(), key=lambda x: x[1], reverse=True)[:20]:
        print(f'{lang}: {c}')

filtered.to_csv(FILTERED_CSV, index=False)
print('Saved:', FILTERED_CSV)

## 5) Attribution de labels linguistiques (heuristiques)

On calcule un **score de difficulté** puis on en déduit :
- `linguistic_level` : Débutant / Intermédiaire / Avancé
- `linguistic_register` : Familier / Courant / Soutenu
- `linguistic_exposure` : Forte / Moyenne / Faible (respectivement 1, 2 ou 3+ langues parlées)
- `linguistic_confidence` : proxy basé sur `vote_count` (stabilité) et multilinguisme


In [None]:
# Input selection: the rare-filtered file when it exists, otherwise the
# match-only file.
if FILTERED_CSV.exists():
    input_for_labels = FILTERED_CSV
else:
    input_for_labels = MATCH_CSV
print('Using input:', input_for_labels)

df = pd.read_csv(input_for_labels)

# Make sure the columns used by the scoring helpers are numeric.
label_numeric_cols = (
    'vote_count', 'vote_average', 'popularity', 'runtime', 'year',
    'linguistic_difficulty_score',
)
for col in label_numeric_cols:
    if col not in df.columns:
        continue
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Genres: accepts a list, a list-literal string, or free text

def _genre_name(item):
    """Return the name of one genre entry, or '' when it has none.

    Dict entries contribute their ``name`` value, in the same spirit as the
    dict handling used for spoken languages; other values are stringified.
    """
    if isinstance(item, dict):
        item = item.get('name')
    if item is None:
        return ''
    return str(item).strip()


def parse_genres(x):
    """Return the list of genre names held in a ``genres`` cell.

    Args:
        x: A missing value, a list/tuple of entries, a list-literal string
            (``"['Drama', 'War']"``), or free text (``"Drama, War"``).

    Returns:
        A list of non-empty, stripped genre names in input order.
    """
    # Containers are handled before pd.isna(): on a list, pd.isna() returns
    # an element-wise array whose truth value is ambiguous and raises
    # ValueError for lists holding more than one item.
    if isinstance(x, (list, tuple)):
        return [name for name in map(_genre_name, x) if name]
    if pd.isna(x):
        return []

    s = str(x).strip()
    if not s:
        return []

    # List-literal format
    if s.startswith('[') and s.endswith(']'):
        try:
            parsed = ast.literal_eval(s)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            parsed = None  # not a valid literal: fall back to text splitting
        if isinstance(parsed, list):
            return [name for name in map(_genre_name, parsed) if name]

    # Free-text fallback ("A, B"); parts are stripped individually.
    return [p.strip() for p in re.split(r"[;,/]", s) if p.strip()]

# Spoken languages -> one set of names per film.
# The column check has to happen before the column is accessed; placed
# inside the lambda it could never prevent the KeyError.
if 'spoken_languages' in df.columns:
    spoken_sets = df['spoken_languages'].apply(lambda x: set(parse_spoken_languages(x)))
else:
    spoken_sets = pd.Series([set() for _ in range(len(df))], index=df.index, dtype=object)

# Global language counts: number of films each language is spoken in.
lang_counter = Counter()
for langs in spoken_sets:
    lang_counter.update(langs)

# Per-genre weights (tune as needed). difficulty_score() adds the weight of
# each of a film's genres to the score; genres missing from this table
# contribute 0.0. Negative values lower the score, positive values raise it.
genre_weight = {
    # rather accessible
    'Animation': -0.7,
    'Family': -0.7,
    'TV Movie': -0.6,
    'Adventure': -0.25,
    'Fantasy': -0.10,
    'Romance': -0.20,
    'Music': -0.15,
    'Comedy': -0.10,

    # neutral
    'Action': 0.00,
    'Science Fiction': 0.10,
    'Horror': 0.20,

    # denser
    'Drama': 0.45,
    'Thriller': 0.45,
    'Mystery': 0.55,
    'Crime': 0.55,

    # often demanding
    'Documentary': 0.85,
    'History': 0.95,
    'War': 0.95,
    'Western': 0.55,
}

# Helpers score

def safe_float(x, default=np.nan):
    """Convert ``x`` to float, returning ``default`` when no number is available.

    Args:
        x: Value to convert.
        default: Returned when ``x`` cannot be converted or is NaN.

    Returns:
        ``float(x)``, or ``default`` for unconvertible or NaN input.
    """
    try:
        v = float(x)
    except (TypeError, ValueError, OverflowError):
        return default
    # float() succeeds on NaN, so a missing numeric cell would otherwise skip
    # the default and propagate NaN into comparisons and np.exp(); treat it
    # as missing, as safe_int() does.
    return default if np.isnan(v) else v

def safe_int(x, default=None):
    """Convert ``x`` to int via float, or return ``default``.

    ``default`` is returned for missing values (``pd.isna``) and whenever the
    check or the conversion raises. Fractional values are truncated by
    ``int()``.
    """
    try:
        missing = pd.isna(x)
        result = default if missing else int(float(x))
    except Exception:
        result = default
    return result

def is_mainstream(popularity, vote_count):
    """Tell whether a film counts as mainstream.

    True when the film has at least 2500 votes, or when it combines a
    popularity of at least 25 with at least 500 votes. Both inputs go through
    safe_float() with a 0.0 default.
    """
    pop = safe_float(popularity, 0.0)
    votes = safe_float(vote_count, 0.0)
    popular_and_voted = pop >= 25 and votes >= 500
    if popular_and_voted:
        return True
    return votes >= 2500

def epic_bonus(genre_set, runtime):
    """Return the extra difficulty granted to long historical/dramatic films.

    Args:
        genre_set: Set of genre names for the film.
        runtime: Runtime as a number, or ``None`` when unknown.

    Returns:
        0.65 for a Drama that is also History/War and runs >= 160,
        0.55 for a History/War film running >= 150,
        0.35 for a Drama running >= 180, otherwise 0.0.
    """
    if runtime is None:
        return 0.0

    historical = 'History' in genre_set or 'War' in genre_set
    drama = 'Drama' in genre_set

    # The most specific rule is tested first: every film satisfying it also
    # satisfies the broader 150+ historical rule, which would otherwise
    # return first and make the 0.65 bonus unreachable.
    if runtime >= 160 and drama and historical:
        return 0.65
    if runtime >= 150 and historical:
        return 0.55
    if runtime >= 180 and drama:
        return 0.35
    return 0.0

def rarity_bonus(spoken_set):
    """Return a bonus driven by the rarest language in ``spoken_set``.

    Counts come from the module-level ``lang_counter`` (a language absent
    from it counts as 0). The bonus is 0.8 below 10 films, 0.5 below 50,
    0.2 below 200, and 0.0 otherwise or when the set is empty.
    """
    if not spoken_set:
        return 0.0

    # The tiers only get larger as the count shrinks, so the rarest language
    # alone decides the result.
    rarest = min(lang_counter.get(lang, 0) for lang in spoken_set)
    if rarest < 10:
        return 0.8
    if rarest < 50:
        return 0.5
    if rarest < 200:
        return 0.2
    return 0.0

def multilingual_penalty(spoken_set, mainstream_flag):
    """Return the difficulty added by a film being multilingual.

    Each language beyond the first adds 0.10, scaled by 0.25 for mainstream
    films. Non-mainstream films with five or more languages get a further
    0.35. The result is capped at 0.9; films with at most one language get
    0.0.
    """
    extra_langs = len(spoken_set) - 1
    if extra_langs < 1:
        return 0.0

    scale = 0.25 if mainstream_flag else 1.0
    penalty = 0.10 * extra_langs * scale
    if extra_langs >= 4 and not mainstream_flag:
        penalty += 0.35
    return min(penalty, 0.9)

def difficulty_score(row, spoken_set):
    """Compute the heuristic difficulty score of one film.

    Starts from a base of 2.0 and adds, in order: genre weights, runtime
    adjustments, a mainstream/obscurity adjustment, the multilingual penalty,
    the rarity bonus, the epic bonus and an age adjustment.

    Args:
        row: Mapping-like film record; the keys read via ``row.get`` are
            'genres', 'runtime', 'popularity', 'vote_count' and 'year'.
        spoken_set: Collection of the film's spoken language names.

    Returns:
        The accumulated score (not rounded, not clamped).
    """
    score = 2.0

    genres = parse_genres(row.get('genres'))
    gset = set(genres)

    # Summed over the list, not the set: a genre listed twice counts twice.
    # NOTE(review): confirm whether duplicates are meant to count twice.
    score += sum(genre_weight.get(g, 0.0) for g in genres)

    rt = safe_float(row.get('runtime'), default=np.nan)
    runtime = None if np.isnan(rt) else rt

    if runtime is not None:
        if runtime < 80:
            score -= 0.15
        # The two tests below are independent, so a runtime above 180
        # receives both increments.
        if runtime > 140:
            score += 0.20
        if runtime > 180:
            score += 0.35

    pop = safe_float(row.get('popularity'), 0.0)
    vc = safe_float(row.get('vote_count'), 0.0)
    mainstream_flag = is_mainstream(pop, vc)

    # Mainstream films get a reduction; otherwise films with fewer than 25
    # votes get an increase.
    if mainstream_flag:
        score -= 0.20
    elif vc < 25:
        score += 0.15

    score += multilingual_penalty(spoken_set, mainstream_flag)

    # The rarity bonus is scaled down to 60% for mainstream films.
    score += rarity_bonus(spoken_set) * (0.6 if mainstream_flag else 1.0)

    score += epic_bonus(gset, runtime)

    year = safe_int(row.get('year'), default=None)
    # Independent tests again: a year before 1960 receives both increments.
    if year is not None and year < 1980:
        score += 0.15
    if year is not None and year < 1960:
        score += 0.20

    return score

def level_from_score(s):
    """Map a difficulty score to a level label.

    Scores below 1.7 are 'Débutant', scores below 3.4 are 'Intermédiaire',
    and everything else is 'Avancé'.
    """
    upper_bounds = ((1.7, 'Débutant'), (3.4, 'Intermédiaire'))
    for bound, label in upper_bounds:
        if s < bound:
            return label
    return 'Avancé'

def register_label(row, spoken_set, score):
    """Return the register label: 'Soutenu', 'Familier' or 'Courant'.

    A formality value is accumulated from the film's genres, year and
    runtime plus the difficulty ``score``. It maps to 'Soutenu' at 0.65 or
    above, 'Familier' at -0.55 or below, and 'Courant' in between.
    ``spoken_set`` is accepted but not used.
    """
    genre_set = set(parse_genres(row.get('genres')))
    year = safe_int(row.get('year'), default=2000)
    raw_runtime = safe_float(row.get('runtime'), default=np.nan)
    runtime = None if np.isnan(raw_runtime) else raw_runtime

    has_year = year is not None
    is_long = runtime is not None and runtime >= 150

    # Increments are applied in a fixed order so the floating-point total is
    # the same for every combination of signals.
    formality = 0.0

    # Signals pushing towards a formal register
    if genre_set & {'Documentary', 'History', 'War'}:
        formality += 1.1
    if 'Drama' in genre_set:
        formality += 0.25
    if has_year and year < 1970:
        formality += 0.35
    if is_long and {'Fantasy', 'Adventure'} <= genre_set:
        formality += 0.55

    # Signals pushing towards a casual register
    if genre_set & {'Animation', 'Family', 'Comedy'}:
        formality -= 0.95
    if has_year and year >= 2000 and genre_set & {'Comedy', 'Romance'}:
        formality -= 0.15

    if score >= 4.2:
        formality += 0.15

    if formality >= 0.65:
        return 'Soutenu'
    if formality <= -0.55:
        return 'Familier'
    return 'Courant'

def exposure_label(spoken_set):
    """Return the exposure label for a film's spoken languages.

    'Forte' for zero or one language, 'Moyenne' for exactly two, and
    'Faible' for three or more.
    """
    labels_by_count = {0: 'Forte', 1: 'Forte', 2: 'Moyenne'}
    return labels_by_count.get(len(spoken_set), 'Faible')

def confidence(row, spoken_set):
    """Return a confidence value in [0, 1] for a film's labels.

    The base is ``1 - exp(-vote_count / 250)``. Each spoken language beyond
    the second subtracts 0.06, scaled by 0.35 for mainstream films. The
    result is clipped to [0, 1].
    """
    votes = safe_float(row.get('vote_count'), 0.0)
    popularity = safe_float(row.get('popularity'), 0.0)

    vote_stability = 1 - np.exp(-votes / 250)  # 0..1

    extra_langs = max(0, len(spoken_set) - 2)
    multilingual_cost = 0.06 * extra_langs
    if is_mainstream(popularity, votes):
        multilingual_cost *= 0.35

    return float(np.clip(vote_stability - multilingual_cost, 0.0, 1.0))

# Annotation
scores, levels, registers, exposures, confs = [], [], [], [], []

# Rows and language sets are paired by position with zip(): using the index
# label from iterrows() as an iloc position is only correct while the index
# happens to be 0..n-1.
for (_, row), sp in zip(df.iterrows(), spoken_sets):
    s = difficulty_score(row, sp)
    scores.append(s)
    levels.append(level_from_score(s))
    exposures.append(exposure_label(sp))
    registers.append(register_label(row, sp, s))
    confs.append(confidence(row, sp))

# Add the label columns
df['linguistic_difficulty_score'] = np.round(scores, 3)
df['linguistic_level'] = levels
df['linguistic_register'] = registers
df['linguistic_exposure'] = exposures
df['linguistic_confidence'] = np.round(confs, 3)

# Preview
cols_show = [
    c for c in [
        'title','year','original_language','genres','spoken_languages','runtime','vote_count',
        'linguistic_level','linguistic_register','linguistic_exposure','linguistic_difficulty_score','linguistic_confidence'
    ]
    if c in df.columns
]
print('\n=== Aperçu (Top 25 vote_count) ===')
# The displayed columns are optional, so the sort column must be optional
# too; without vote_count the preview keeps the file order.
if 'vote_count' in df.columns:
    preview = df.sort_values('vote_count', ascending=False)
else:
    preview = df
print(preview[cols_show].head(25).to_string(index=False))

# Save
df.to_csv(LABELED_CSV, index=False)
print('\nSaved:', LABELED_CSV)

## 6) Sanity checks par langue (aide à la validation manuelle)

- On exclut par défaut l'anglais (`EXCLUDE_ORIG`)
- On limite aux films avec un `vote_count` minimum
- On affiche : top connus, top difficile, top facile, répartitions, cas suspects

In [None]:
if not LABELED_CSV.exists():
    raise FileNotFoundError("Le CSV labelisé est introuvable. Lance la cellule précédente.")

df = pd.read_csv(LABELED_CSV)

# Numeric conversions
qa_numeric_cols = (
    'vote_count', 'vote_average', 'popularity', 'runtime',
    'linguistic_difficulty_score', 'year', 'linguistic_confidence',
)
for col in qa_numeric_cols:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

# Drop the excluded original languages
if 'original_language' in df.columns:
    excluded = df['original_language'].isin(EXCLUDE_ORIG)
    df = df[~excluded].copy()

# Keep only films with enough votes (missing vote_count counts as 0)
if 'vote_count' in df.columns:
    enough_votes = df['vote_count'].fillna(0) >= MIN_VOTE_COUNT_FOR_QA
    df = df[enough_votes].copy()

# Columns displayed in every listing, limited to those actually present
wanted_cols = [
    'title','original_title','year','original_language','spoken_languages','genres','runtime',
    'vote_average','vote_count','popularity','linguistic_level','linguistic_register','linguistic_exposure',
    'linguistic_difficulty_score','linguistic_confidence'
]
cols_show = [c for c in wanted_cols if c in df.columns]

if 'original_language' in df.columns:
    langs = sorted(df['original_language'].dropna().unique())
else:
    langs = []

def print_block(title, subdf, n=TOP_N_QA):
    """Print a titled listing of the first ``n`` rows of ``subdf``.

    Only the columns in the module-level ``cols_show`` are displayed; an
    empty frame prints '(vide)' instead.
    """
    print(f"\n=== {title} ===")
    if len(subdf) > 0:
        print(subdf.head(n)[cols_show].to_string(index=False))
    else:
        print('(vide)')


def count_langs(x):
    """Return the number of distinct spoken languages in a cell.

    Parsing is delegated entirely to parse_spoken_languages(), which returns
    an empty list for missing values and already evaluates list-literal
    strings. A separate literal_eval pass here was redundant and relied on a
    swallowed exception to fall back to the string path.
    """
    return len(set(parse_spoken_languages(x)))

for lang in langs:
    sub = df[df['original_language'] == lang].copy()
    if sub.empty:
        continue

    banner = "=" * 90
    print("\n" + banner)
    print(f"LANGUE ORIGINALE = {lang} | n={len(sub)} (vote_count >= {MIN_VOTE_COUNT_FOR_QA})")
    print(banner)

    # Best-known films
    if 'vote_count' in sub.columns:
        by_votes = sub.sort_values('vote_count', ascending=False)
        print_block(f"Top {TOP_N_QA} (vote_count) — films connus", by_votes)

    # Hardest, then easiest films
    if 'linguistic_difficulty_score' in sub.columns:
        for extreme, ascending in (("haute", False), ("basse", True)):
            ranked = sub.sort_values('linguistic_difficulty_score', ascending=ascending)
            print_block(f"Top {TOP_N_QA} (difficulté la + {extreme})", ranked)

    # Label distributions
    for label_col in ('linguistic_level', 'linguistic_register', 'linguistic_exposure'):
        if label_col in sub.columns:
            print(f"\n--- Répartition {label_col} ---")
            print(sub[label_col].value_counts(dropna=False).to_string())

    # Suspicious cases worth a manual look
    if 'spoken_languages' not in sub.columns:
        continue
    sub['n_spoken'] = sub['spoken_languages'].apply(count_langs)

    if not {'linguistic_confidence', 'linguistic_difficulty_score'}.issubset(sub.columns):
        continue

    low_confidence = sub['linguistic_confidence'].fillna(1) < 0.80
    many_languages = sub['n_spoken'] >= 4
    score_too_high = sub['linguistic_difficulty_score'] > 4.5
    score_too_low = sub['linguistic_difficulty_score'] < 0.8

    suspects = sub[low_confidence | many_languages | score_too_high | score_too_low].copy()
    suspects = suspects.sort_values(
        ['linguistic_confidence', 'n_spoken', 'linguistic_difficulty_score'],
        ascending=[True, False, False]
    )

    print_block('CAS SUSPECTS (à vérifier manuellement)', suspects, n=TOP_N_QA)
