# 04 - Simulation Production

Notebook de simulation du flow de production Chiron.

## Flow complet
```
PDF original
    │
    ▼
1. Extraction nom (pdfplumber + regex)      [local, rapide]
    │
    ▼
2. NER pour variantes (CamemBERT)           [local, ~0.4s]
    │
    ▼
3. Anonymisation PDF (PyMuPDF redaction)    [local, rapide]
    │
    ▼
4. OCR (Mistral OCR)                        [cloud, ~2s]
    │
    ▼
5. Données structurées (EleveExtraction)
```

In [None]:
# ruff: noqa: E402
import sys
from pathlib import Path

# Auto-détecter project_root
current = Path.cwd()
while current != current.parent:
    if (current / "pyproject.toml").exists():
        project_root = current
        break
    current = current.parent

sys.path.insert(0, str(project_root))

from dotenv import load_dotenv

load_dotenv(project_root / ".env")

# Paths
DATA_DIR = project_root / "data"
RAW_DIR = DATA_DIR / "raw"

print(f"Project root: {project_root}")
print(f"PDFs disponibles: {[p.name for p in RAW_DIR.glob('*.pdf')]}")

## 1. Configuration et estimation des coûts

In [None]:
from src.document import estimate_mistral_cost
from src.llm.config import settings

# Afficher la config actuelle
print(f"Modèle Mistral OCR : {settings.mistral_ocr_model}")
print(f"Coût Mistral OCR : {settings.mistral_ocr_cost_per_1000_pages}$/1000 pages")

# Estimation du coût
pdfs = [p for p in RAW_DIR.glob("*.pdf") if not p.name.startswith("ELEVE_TEST")]
estimate = estimate_mistral_cost(pdfs)
print(f"\nPDFs à traiter : {len(pdfs)}")
print(f"Pages totales : {estimate['pages']}")
print(f"Coût estimé Mistral OCR : ${estimate['cost_usd']}")

## 2. Chargement des modèles NER

CamemBERT v1 (`Jean-Baptiste/camembert-ner`) est utilisé pour détecter les variantes du nom de l'élève.

In [None]:
from transformers import pipeline

# Charger CamemBERT NER (meilleur compromis vitesse/précision)
print("Chargement de CamemBERT NER...")
nlp_ner = pipeline(
    "ner",
    model="Jean-Baptiste/camembert-ner",
    aggregation_strategy="simple"
)
print("✅ CamemBERT NER chargé")

## 3. Fonctions d'anonymisation

Ces fonctions permettent :
1. D'extraire le nom de l'élève depuis l'en-tête du PDF
2. De détecter toutes les variantes avec NER
3. D'anonymiser le PDF avant envoi cloud

In [None]:
import re

import fitz  # PyMuPDF
import pdfplumber


def extract_eleve_name(pdf_path: Path) -> dict | None:
    """Extrait le nom de l'élève depuis le PDF.

    Recherche le pattern "Élève : <nom>" et capture le texte jusqu'à la fin de la ligne.

    Args:
        pdf_path: Chemin vers le fichier PDF.

    Returns:
        Dict avec 'nom_complet' et 'texte_complet', ou None si non trouvé.
    """
    with pdfplumber.open(pdf_path) as pdf:
        text_complet = ""
        for page in pdf.pages:
            text_complet += (page.extract_text() or "") + "\n"

        match = re.search(r"[ÉE]l[èe]ve\s*:\s*([^\n]+)", text_complet, re.IGNORECASE)
        if match:
            return {
                "nom_complet": match.group(1).strip(),
                "texte_complet": text_complet,
            }
    return None


def split_into_chunks(text: str, max_chars: int = 2000) -> list[str]:
    """Découpe le texte en chunks en respectant les fins de phrases.

    Args:
        text: Texte à découper
        max_chars: Taille max par chunk (~2000 chars ≈ 400 tokens pour CamemBERT)

    Returns:
        Liste de chunks
    """
    sentences = re.split(r"(?<=[.!?])\s+", text)

    chunks = []
    current_chunk = ""

    for sentence in sentences:
        if len(current_chunk) + len(sentence) < max_chars:
            current_chunk += sentence + " "
        else:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = sentence + " "

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks if chunks else [text]


def detect_name_variants(text: str, nom_entete: str, nlp_pipeline) -> list[str]:
    """Détecte toutes les variantes du nom de l'élève avec NER.

    Args:
        text: Texte complet du PDF
        nom_entete: Nom extrait de l'en-tête ("Élève : XXX")
        nlp_pipeline: Pipeline NER (CamemBERT)

    Returns:
        Liste des variantes uniques, triées par longueur décroissante
    """
    # Labels pour les personnes
    person_labels = {"PER", "PERSON", "I-PER", "B-PER"}

    # Extraire les personnes avec NER
    chunks = split_into_chunks(text, max_chars=2000)
    personnes = []
    for chunk in chunks:
        entities = nlp_pipeline(chunk)
        for e in entities:
            if e["entity_group"].upper() in person_labels or "PER" in e["entity_group"].upper():
                personnes.append(e["word"])

    # Filtrer les variantes qui correspondent au nom de l'en-tête
    nom_parts = [p.lower() for p in nom_entete.strip().split()]
    variants = set()

    for personne in set(personnes):
        personne_clean = personne.strip()
        if not personne_clean:
            continue
        personne_parts = personne_clean.lower().split()
        # Match si au moins une partie correspond
        if any(part in nom_parts for part in personne_parts):
            variants.add(personne_clean)

    # Toujours inclure le nom complet
    variants.add(nom_entete.strip())

    # Trier par longueur décroissante (remplacer les plus longs d'abord)
    return sorted(variants, key=len, reverse=True)


def anonymize_pdf(pdf_path: Path, noms_a_remplacer: list[str], nom_anonyme: str) -> bytes:
    """Remplace les noms dans le PDF et retourne les bytes du PDF anonymisé.

    Args:
        pdf_path: Chemin vers le PDF original
        noms_a_remplacer: Liste des variantes du nom à remplacer
        nom_anonyme: Nom de remplacement (ex: "ELEVE_001")

    Returns:
        Bytes du PDF anonymisé
    """
    fitz.TOOLS.set_small_glyph_heights(True)

    doc = fitz.open(pdf_path)
    total_replacements = 0

    for page in doc:
        for nom in noms_a_remplacer:
            instances = page.search_for(nom)
            for rect in instances:
                page.add_redact_annot(
                    rect,
                    text=nom_anonyme,
                    fill=(1, 1, 1),
                    fontsize=10,
                )
                total_replacements += 1
        page.apply_redactions()

    return doc.tobytes(), total_replacements


print("✅ Fonctions d'anonymisation définies")

## 4. Fonction d'extraction Mistral OCR

In [None]:
import base64
import json
import os

from mistralai import Mistral

# Init client Mistral
mistral_api_key = os.getenv("MISTRAL_API_KEY") or os.getenv("MISTRAL_OCR_API_KEY")
if not mistral_api_key:
    raise ValueError("MISTRAL_API_KEY ou MISTRAL_OCR_API_KEY non configurée dans .env")

mistral_client = Mistral(api_key=mistral_api_key)
mistral_model = settings.mistral_ocr_model


def extract_with_mistral_ocr(pdf_bytes: bytes) -> dict:
    """Extrait le contenu d'un PDF avec Mistral OCR.

    Args:
        pdf_bytes: Bytes du PDF (peut être anonymisé)

    Returns:
        Dict avec le résultat OCR (pages, markdown, etc.)
    """
    base64_pdf = base64.b64encode(pdf_bytes).decode("utf-8")

    response = mistral_client.ocr.process(
        model=mistral_model,
        document={
            "type": "document_url",
            "document_url": f"data:application/pdf;base64,{base64_pdf}"
        },
    )

    return json.loads(response.model_dump_json())


print(f"✅ Client Mistral initialisé (modèle: {mistral_model})")

## 5. Pipeline complet : Anonymisation + OCR

Traitement de tous les PDFs avec le flow complet.

In [None]:
import time

# Résultats
results = {}
timings = {}

# Filtrer les PDFs de test
pdfs_to_process = [p for p in sorted(RAW_DIR.glob("*.pdf")) if not p.name.startswith("ELEVE_TEST")]

print(f"Traitement de {len(pdfs_to_process)} PDFs...\n")
print("=" * 70)

for pdf_path in pdfs_to_process:
    eleve_id = pdf_path.stem
    print(f"\n[{eleve_id}]")

    timing = {}
    start_total = time.perf_counter()

    try:
        # 1. Extraire le nom depuis l'en-tête
        start = time.perf_counter()
        eleve_info = extract_eleve_name(pdf_path)
        timing["extract_name"] = time.perf_counter() - start

        if not eleve_info:
            print("  ⚠️ Nom non trouvé dans l'en-tête")
            results[eleve_id] = {"error": "Nom non trouvé"}
            continue

        nom_original = eleve_info["nom_complet"]
        print(f"  1. Nom extrait: '{nom_original}' ({timing['extract_name']:.3f}s)")

        # 2. Détecter les variantes avec NER
        start = time.perf_counter()
        variants = detect_name_variants(eleve_info["texte_complet"], nom_original, nlp_ner)
        timing["ner"] = time.perf_counter() - start
        print(f"  2. Variantes NER: {variants} ({timing['ner']:.3f}s)")

        # 3. Anonymiser le PDF
        start = time.perf_counter()
        nom_anonyme = eleve_id  # Utiliser l'ID du fichier comme pseudonyme
        pdf_anonymise, nb_replacements = anonymize_pdf(pdf_path, variants, nom_anonyme)
        timing["anonymize"] = time.perf_counter() - start
        print(f"  3. Anonymisation: {nb_replacements} remplacements ({timing['anonymize']:.3f}s)")

        # 4. Envoyer à Mistral OCR
        start = time.perf_counter()
        ocr_result = extract_with_mistral_ocr(pdf_anonymise)
        timing["ocr"] = time.perf_counter() - start
        markdown = ocr_result["pages"][0]["markdown"]
        print(f"  4. Mistral OCR: {len(markdown)} chars ({timing['ocr']:.3f}s)")

        # 5. Vérifier l'anonymisation
        anonymisation_ok = True
        for variant in variants:
            if variant.lower() in markdown.lower():
                print(f"  ⚠️ '{variant}' encore présent dans le résultat OCR!")
                anonymisation_ok = False

        if anonymisation_ok:
            print("  ✅ Anonymisation vérifiée")

        # Stocker le résultat
        timing["total"] = time.perf_counter() - start_total
        results[eleve_id] = {
            "nom_original": nom_original,
            "nom_anonyme": nom_anonyme,
            "variants": variants,
            "nb_replacements": nb_replacements,
            "markdown": markdown,
            "anonymisation_ok": anonymisation_ok,
        }
        timings[eleve_id] = timing

        print(f"  → Total: {timing['total']:.2f}s")

    except Exception as e:
        print(f"  ❌ Erreur: {e}")
        results[eleve_id] = {"error": str(e)}

print("\n" + "=" * 70)
print(f"Traitement terminé: {len(results)} PDFs")

## 6. Résumé des résultats

In [None]:
import pandas as pd

# Tableau récapitulatif
summary = []
for eleve_id, result in results.items():
    if "error" in result:
        summary.append({
            "eleve_id": eleve_id,
            "status": "❌ ERREUR",
            "nom_original": "-",
            "nb_variants": 0,
            "nb_replacements": 0,
            "anonymisation_ok": "-",
            "temps_total": "-",
        })
    else:
        summary.append({
            "eleve_id": eleve_id,
            "status": "✅ OK",
            "nom_original": result["nom_original"],
            "nb_variants": len(result["variants"]),
            "nb_replacements": result["nb_replacements"],
            "anonymisation_ok": "✅" if result["anonymisation_ok"] else "❌",
            "temps_total": f"{timings[eleve_id]['total']:.2f}s",
        })

df_summary = pd.DataFrame(summary)
print("RÉSUMÉ DU TRAITEMENT")
print("=" * 80)
print(df_summary.to_string(index=False))

In [None]:
# Détail des temps
if timings:
    df_times = pd.DataFrame([
        {
            "eleve_id": eleve_id,
            "extract_name": f"{t['extract_name']:.3f}s",
            "ner": f"{t['ner']:.3f}s",
            "anonymize": f"{t['anonymize']:.3f}s",
            "ocr": f"{t['ocr']:.3f}s",
            "total": f"{t['total']:.2f}s",
        }
        for eleve_id, t in timings.items()
    ])

    print("\nDÉTAIL DES TEMPS")
    print("=" * 80)
    print(df_times.to_string(index=False))

    # Moyennes
    avg_total = sum(t["total"] for t in timings.values()) / len(timings)
    avg_ocr = sum(t["ocr"] for t in timings.values()) / len(timings)
    avg_ner = sum(t["ner"] for t in timings.values()) / len(timings)
    print(f"\nMoyennes: NER={avg_ner:.2f}s, OCR={avg_ocr:.2f}s, Total={avg_total:.2f}s")

## 7. Afficher un exemple de résultat OCR

In [None]:
# Afficher le markdown d'un exemple
exemple_id = list(results.keys())[0]
exemple = results[exemple_id]

if "markdown" in exemple:
    print(f"=== {exemple_id} (anonymisé en '{exemple['nom_anonyme']}') ===")
    print(f"Nom original: {exemple['nom_original']}")
    print(f"Variantes détectées: {exemple['variants']}")
    print("\n--- Markdown OCR (premiers 1500 chars) ---\n")
    print(exemple["markdown"][:1500])
else:
    print(f"Erreur: {exemple}")

## 8. Prochaines étapes

- [ ] Parser le markdown en `EleveExtraction` structuré
- [ ] Stockage DuckDB (avec pseudonymisation)
- [ ] Génération LLM des synthèses
- [ ] Dépseudonymisation pour export