In [1]:
"""
step4_validate_candidates.py — Étape 4 de la pipeline
=======================================================
Validation sémantique des posts candidats par double passe :

  PASSE 1 — Embedding cosinus (sentence-transformers, batch)
            Modèle : paraphrase-multilingual-MiniLM-L12-v2
            Objectif : pré-filtrer à bas coût, éliminer les faux candidats évidents.
            Seuil   : EMBED_PREFILTER_THRESHOLD (défaut 0.38)

  PASSE 2 — LLM (OpenAI gpt-4o-mini, JSON structuré)
            Objectif : évaluer la similarité de CLAIM, pas seulement de surface.
            Chaque appel est indépendant → parallélisable via ThreadPoolExecutor.

  SCORE FINAL = EMBED_WEIGHT * embed_score + LLM_WEIGHT * llm_score
  Retour      : [(BlueskyPost, final_score)] filtrés à ≥ FINAL_THRESHOLD,
                triés par score décroissant.

Dépendances :
    pip install sentence-transformers openai

Variables d'environnement :
    OPENAI_API_KEY   — clé OpenAI (obligatoire pour la passe LLM)
    OPENAI_MODEL     — modèle OpenAI (défaut: gpt-4o-mini)
    VALIDATE_WORKERS — nombre de threads parallèles pour les appels LLM (défaut: 4)
"""

from __future__ import annotations

import json
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Optional

import numpy as np
from sentence_transformers import SentenceTransformer
from openai import OpenAI

# ─────────────────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────────────────

# Modèle d'embedding : multilingue, gère le français, ~117 MB
EMBEDDING_MODEL_NAME = "paraphrase-multilingual-MiniLM-L12-v2"

# Seuil de pré-filtrage par embedding. En dessous → pas d'appel LLM.
# 0.38 est volontairement bas pour ne pas rater de vrais positifs
# (le LLM tranchera ensuite).
EMBED_PREFILTER_THRESHOLD = 0.38

# Poids du score final. LLM > embedding car il comprend l'intention sémantique.
EMBED_WEIGHT = 0.35
LLM_WEIGHT   = 0.65

# Seuil de validation finale. Conforme à MIN_SIMILARITY de provenance_graph.py.
FINAL_THRESHOLD = 0.65

# Parallélisme des appels LLM (ThreadPoolExecutor)
DEFAULT_WORKERS = int(os.getenv("VALIDATE_WORKERS", "4"))

# Modèle OpenAI
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")

# ─────────────────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────────────────

logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# Résultat intermédiaire de validation
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class _ValidationResult:
    """Résultat intermédiaire — usage interne uniquement."""
    post: object                     # BlueskyPost
    embed_score: float               # cosinus brut
    llm_score: Optional[float]       # score LLM (None si non appelé ou erreur)
    llm_reasoning: str = ""          # justification textuelle du LLM
    final_score: float = 0.0         # score combiné calculé après les deux passes

    def compute_final(self) -> None:
        """
        Calcule le score final.
        Si le LLM n'a pas répondu (None), on utilise uniquement l'embedding
        mais on plafonne à FINAL_THRESHOLD - ε pour ne jamais valider
        sur embedding seul (comportement conservateur).
        """
        if self.llm_score is None:
            # Passe LLM échouée → on utilise l'embedding avec un facteur pénalisant
            self.final_score = self.embed_score * 0.80
        else:
            self.final_score = (
                EMBED_WEIGHT * self.embed_score
                + LLM_WEIGHT * self.llm_score
            )


# ─────────────────────────────────────────────────────────────────────────────
# Singleton du modèle d'embedding (chargé une seule fois par processus)
# ─────────────────────────────────────────────────────────────────────────────

_embedding_model: Optional[SentenceTransformer] = None

def _get_embedding_model() -> SentenceTransformer:
    """Charge le modèle d'embedding de façon paresseuse (lazy singleton)."""
    global _embedding_model
    if _embedding_model is None:
        logger.info("Chargement du modèle d'embedding '%s'…", EMBEDDING_MODEL_NAME)
        _embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)
        logger.info("Modèle d'embedding chargé.")
    return _embedding_model


# ─────────────────────────────────────────────────────────────────────────────
# Passe 1 — Embeddings
# ─────────────────────────────────────────────────────────────────────────────

def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Similarité cosinus entre deux vecteurs numpy 1D."""
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))


def _embed_batch(texts: list[str]) -> np.ndarray:
    """
    Encode une liste de textes en une seule inférence batch.
    Retourne un tableau de shape (N, embedding_dim).
    """
    model = _get_embedding_model()
    # normalize_embeddings=True → les vecteurs sont L2-normalisés,
    # le produit scalaire devient directement la similarité cosinus.
    return model.encode(texts, normalize_embeddings=True, show_progress_bar=False)


def _pass1_embedding(
    ref_text: str,
    candidate_texts: list[str],
) -> list[float]:
    """
    Calcule les similarités cosinus entre ref_text et chaque candidat.
    Retourne une liste de scores dans [0, 1] (même ordre que candidate_texts).
    """
    all_texts = [ref_text] + candidate_texts
    embeddings = _embed_batch(all_texts)

    ref_emb   = embeddings[0]          # shape (dim,)
    cand_embs = embeddings[1:]         # shape (N, dim)

    # Produit scalaire vectorisé (tous les candidats d'un coup)
    similarities = (cand_embs @ ref_emb).tolist()  # liste de N floats
    return similarities


# ─────────────────────────────────────────────────────────────────────────────
# Passe 2 — LLM
# ─────────────────────────────────────────────────────────────────────────────

_LLM_SYSTEM_PROMPT = """Tu es un expert en analyse de désinformation et de propagation de l'information sur les réseaux sociaux.

Ta tâche est de comparer deux posts Bluesky et de déterminer s'ils parlent du MÊME FAIT ou de la MÊME AFFIRMATION principale.

Règles d'évaluation :
- Concentre-toi sur l'AFFIRMATION FACTUELLE centrale, pas sur la forme, le style ou le contexte éditorial.
- Deux posts peuvent utiliser des mots très différents mais rapporter le même fait → score élevé.
- Une reformulation partielle ou biaisée du même fait doit recevoir un score élevé (c'est précisément ce qu'on cherche à détecter).
- Des posts sur des sujets superficiellement similaires mais avec des faits différents → score bas.

Tu dois répondre UNIQUEMENT en JSON valide, sans aucun texte autour, avec exactement ces clés :
{
  "same_claim": <true|false>,
  "similarity_score": <float entre 0.0 et 1.0>,
  "reasoning": "<explication concise en français, max 80 mots>"
}"""

_LLM_USER_TEMPLATE = """POST DE RÉFÉRENCE (plus récent) :
\"\"\"
{ref_text}
\"\"\"

POST CANDIDAT (antérieur) :
\"\"\"
{candidate_text}
\"\"\"

Ces deux posts rapportent-ils le même fait ou la même affirmation principale ?"""


def _parse_llm_response(raw: str) -> tuple[float, str]:
    """
    Parse la réponse JSON du LLM.
    Retourne (similarity_score, reasoning).
    En cas d'erreur de parsing, retourne (-1.0, "parse_error").
    """
    # Extraction robuste : on cherche le premier bloc JSON même si le LLM
    # a ajouté du texte malgré les instructions.
    json_match = re.search(r'\{.*\}', raw, re.DOTALL)
    if not json_match:
        logger.warning("LLM: aucun JSON trouvé dans la réponse : %s", raw[:200])
        return -1.0, "parse_error"

    try:
        data = json.loads(json_match.group())
    except json.JSONDecodeError as exc:
        logger.warning("LLM: JSON invalide (%s) : %s", exc, raw[:200])
        return -1.0, "json_decode_error"

    # Validation et coercition du score
    score = data.get("similarity_score", -1.0)
    try:
        score = float(score)
        score = max(0.0, min(1.0, score))   # clamp [0, 1]
    except (TypeError, ValueError):
        logger.warning("LLM: similarity_score invalide : %s", score)
        return -1.0, "invalid_score"

    reasoning = str(data.get("reasoning", ""))
    return score, reasoning


def _call_llm_single(
    client: OpenAI,
    ref_text: str,
    candidate_text: str,
    candidate_idx: int,
) -> tuple[int, float, str]:
    """
    Appel LLM pour une seule paire (ref, candidat).
    Retourne (candidate_idx, llm_score, reasoning).
    llm_score == -1.0 en cas d'échec.
    """
    prompt = _LLM_USER_TEMPLATE.format(
        ref_text=ref_text.strip(),
        candidate_text=candidate_text.strip(),
    )

    try:
        response = client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[
                {"role": "system", "content": _LLM_SYSTEM_PROMPT},
                {"role": "user",   "content": prompt},
            ],
            response_format={"type": "json_object"},
            temperature=0.0,    # déterministe
            max_tokens=256,
            timeout=20.0,
        )
        raw = response.choices[0].message.content or ""
        score, reasoning = _parse_llm_response(raw)
        logger.debug("LLM[%d]: score=%.2f  reasoning=%s", candidate_idx, score, reasoning[:60])
        return candidate_idx, score, reasoning

    except Exception as exc:
        logger.warning("LLM[%d]: appel échoué (%s)", candidate_idx, exc)
        return candidate_idx, -1.0, f"api_error: {exc}"


def _pass2_llm_parallel(
    ref_text: str,
    candidates: list[tuple[int, str]],   # [(original_idx, text), ...]
    workers: int = DEFAULT_WORKERS,
) -> dict[int, tuple[float, str]]:
    """
    Lance les appels LLM en parallèle pour tous les candidats pré-filtrés.
    Retourne un dict {original_idx: (llm_score, reasoning)}.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY non définie. "
            "La passe LLM (étape 4) nécessite une clé OpenAI valide."
        )

    client = OpenAI(api_key=api_key)
    results: dict[int, tuple[float, str]] = {}

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(
                _call_llm_single, client, ref_text, text, orig_idx
            ): orig_idx
            for orig_idx, text in candidates
        }

        for future in as_completed(futures):
            orig_idx = futures[future]
            try:
                _, llm_score, reasoning = future.result()
            except Exception as exc:
                logger.error("Future[%d] inattendue : %s", orig_idx, exc)
                llm_score, reasoning = -1.0, "unexpected_error"

            results[orig_idx] = (llm_score, reasoning)
            logger.info(
                "  Candidat[%d] → LLM score=%.2f  (%s)",
                orig_idx, llm_score, reasoning[:50]
            )

    return results


# ─────────────────────────────────────────────────────────────────────────────
# Fonction principale — API publique
# ─────────────────────────────────────────────────────────────────────────────

def validate_candidates(
    ref_post,
    candidates: list,
    embed_prefilter_threshold: float = EMBED_PREFILTER_THRESHOLD,
    final_threshold: float = FINAL_THRESHOLD,
    workers: int = DEFAULT_WORKERS,
) -> list[tuple[object, float]]:
    """
    Étape 4 — Validation sémantique des posts candidats.

    Args:
        ref_post  : BlueskyPost de référence (le post dont on cherche les antécédents).
                    Doit exposer un attribut `.text` (str).
        candidates: Liste de BlueskyPost candidats (issus de l'étape 3).
                    Chaque objet doit exposer `.text`.
        embed_prefilter_threshold:
                    Seuil de pré-filtrage par similarité cosinus.
                    Les candidats en dessous ne sont pas soumis au LLM.
                    Défaut : 0.38 (conservateur).
        final_threshold:
                    Seuil de validation finale (score combiné).
                    Défaut : 0.65, conforme à MIN_SIMILARITY du graphe.
        workers   : Nombre de threads parallèles pour les appels LLM.

    Returns:
        Liste de (BlueskyPost, final_score) pour les candidats validés,
        triée par final_score décroissant.
        Seuls les candidats avec final_score >= final_threshold sont inclus.

    Raises:
        EnvironmentError : si OPENAI_API_KEY n'est pas définie et que
                           des candidats ont survécu au pré-filtrage.
    """
    if not candidates:
        logger.info("validate_candidates: aucun candidat → retour vide immédiat.")
        return []

    ref_text = ref_post.text
    candidate_texts = [c.text for c in candidates]

    logger.info(
        "validate_candidates: %d candidats, ref='%s…'",
        len(candidates), ref_text[:60]
    )

    # ── Passe 1 : Embedding ────────────────────────────────────────────────
    logger.info("Passe 1 — calcul des embeddings (batch de %d textes)…", len(candidates) + 1)
    embed_scores = _pass1_embedding(ref_text, candidate_texts)

    # Pré-filtrage
    prefiltered: list[tuple[int, str]] = []   # (original_index, text)
    validation_results: list[_ValidationResult] = []

    for idx, (post, embed_score) in enumerate(zip(candidates, embed_scores)):
        result = _ValidationResult(post=post, embed_score=embed_score, llm_score=None)
        validation_results.append(result)

        if embed_score >= embed_prefilter_threshold:
            prefiltered.append((idx, candidate_texts[idx]))
            logger.info(
                "  Candidat[%d] passe le pré-filtrage  embed=%.3f  '%s…'",
                idx, embed_score, candidate_texts[idx][:50]
            )
        else:
            logger.info(
                "  Candidat[%d] éliminé par embedding  embed=%.3f  '%s…'",
                idx, embed_score, candidate_texts[idx][:50]
            )

    logger.info(
        "Passe 1 terminée : %d/%d candidats survivants (seuil=%.2f).",
        len(prefiltered), len(candidates), embed_prefilter_threshold
    )

    # ── Passe 2 : LLM (uniquement sur les survivants) ─────────────────────
    if prefiltered:
        logger.info(
            "Passe 2 — validation LLM sur %d candidats (workers=%d)…",
            len(prefiltered), workers
        )
        llm_results = _pass2_llm_parallel(ref_text, prefiltered, workers=workers)

        for orig_idx, (llm_score, reasoning) in llm_results.items():
            r = validation_results[orig_idx]
            # llm_score == -1.0 signifie un échec d'appel → on laisse None
            r.llm_score   = llm_score if llm_score >= 0.0 else None
            r.llm_reasoning = reasoning
    else:
        logger.info("Passe 2 — aucun candidat à soumettre au LLM.")

    # ── Score final + filtrage ─────────────────────────────────────────────
    validated: list[tuple[object, float]] = []

    for idx, result in enumerate(validation_results):
        result.compute_final()
        logger.info(
            "  Candidat[%d] : embed=%.3f  llm=%s  final=%.3f  → %s",
            idx,
            result.embed_score,
            f"{result.llm_score:.3f}" if result.llm_score is not None else "N/A",
            result.final_score,
            "✓ VALIDÉ" if result.final_score >= final_threshold else "✗ rejeté",
        )

        if result.final_score >= final_threshold:
            validated.append((result.post, round(result.final_score, 4)))

    # Tri par score décroissant
    validated.sort(key=lambda x: x[1], reverse=True)

    logger.info(
        "validate_candidates : %d/%d candidats validés (seuil final=%.2f).",
        len(validated), len(candidates), final_threshold
    )
    return validated


# ─────────────────────────────────────────────────────────────────────────────
# Tests unitaires légers (sans dépendance externe)
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    """
    Smoke test local — valide la passe embedding sans appel LLM.
    Lance : python step4_validate_candidates.py
    """
    import logging
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

    # ── Stub minimal de BlueskyPost ────────────────────────────────────────
    class _FakePost:
        def __init__(self, text: str, handle: str = "user"):
            self.text          = text
            self.author_handle = handle
            self.uri           = f"at://fake/{handle}"
            self.date          = "2025-01-01T00:00:00Z"
            self.likes         = 0
            self.reposts       = 0
            self.replies       = 0

    ref = _FakePost(
        "Des manifestants arborant des croix gammées ont été photographiés lors "
        "du rassemblement de Lyon selon le journaliste de BFM présent sur place.",
        handle="ref_post"
    )

    candidates = [
        _FakePost(
            # Très similaire — doit passer le pré-filtrage embedding
            "Le correspondant de BFM TV à Lyon confirme avoir vu des symboles nazis "
            "parmi les participants au rassemblement.",
            handle="similar_post"
        ),
        _FakePost(
            # Sujet différent — doit être éliminé par embedding
            "Recette de quiche lorraine maison : 200g de lardons, 3 œufs, 20cl de crème.",
            handle="unrelated_post"
        ),
        _FakePost(
            # Modérément similaire — résultat dépend du LLM
            "Les organisateurs du rassemblement lyonnais démentent toute présence "
            "d'éléments d'extrême droite lors de l'événement.",
            handle="moderate_post"
        ),
    ]

    print("\n" + "="*60)
    print("SMOKE TEST — Passe 1 (embedding) uniquement")
    print("(La passe LLM sera ignorée si OPENAI_API_KEY est absente)")
    print("="*60 + "\n")
    try:
        results = validate_candidates(ref, candidates)
        print(f"\n→ {len(results)} candidat(s) validé(s) :\n")
        for post, score in results:
            print(f"  [{score:.4f}]  @{post.author_handle} : {post.text[:80]}…")
    except EnvironmentError as e:
        # OPENAI_API_KEY manquante — on teste uniquement l'embedding
        print(f"Note : {e}")
        print("Test de la passe embedding seule…\n")
        scores = _pass1_embedding(ref.text, [c.text for c in candidates])
        for c, s in zip(candidates, scores):
            status = "✓ survit" if s >= EMBED_PREFILTER_THRESHOLD else "✗ éliminé"
            print(f"  [{s:.4f}] {status}  @{c.author_handle}")

  from .autonotebook import tqdm as notebook_tqdm
INFO validate_candidates: 3 candidats, ref='Des manifestants arborant des croix gammées ont été photogra…'
INFO Passe 1 — calcul des embeddings (batch de 4 textes)…
INFO Chargement du modèle d'embedding 'paraphrase-multilingual-MiniLM-L12-v2'…
INFO Use pytorch device_name: cpu
INFO Load pretrained SentenceTransformer: paraphrase-multilingual-MiniLM-L12-v2



SMOKE TEST — Passe 1 (embedding) uniquement
(La passe LLM sera ignorée si OPENAI_API_KEY est absente)



INFO HTTP Request: HEAD https://huggingface.co/sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2/resolve/main/modules.json "HTTP/1.1 307 Temporary Redirect"
INFO HTTP Request: HEAD https://huggingface.co/api/resolve-cache/models/sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2/e8f8c211226b894fcb81acc59f3b34ba3efd5f42/modules.json "HTTP/1.1 200 OK"
INFO HTTP Request: HEAD https://huggingface.co/sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2/resolve/main/config_sentence_transformers.json "HTTP/1.1 307 Temporary Redirect"
INFO HTTP Request: HEAD https://huggingface.co/api/resolve-cache/models/sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2/e8f8c211226b894fcb81acc59f3b34ba3efd5f42/config_sentence_transformers.json "HTTP/1.1 200 OK"
INFO HTTP Request: HEAD https://huggingface.co/sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2/resolve/main/config_sentence_transformers.json "HTTP/1.1 307 Temporary Redirect"
INFO HTTP Request: HEAD http

Note : OPENAI_API_KEY non définie. La passe LLM (étape 4) nécessite une clé OpenAI valide.
Test de la passe embedding seule…

  [0.5331] ✓ survit  @similar_post
  [-0.0736] ✗ éliminé  @unrelated_post
  [0.5086] ✓ survit  @moderate_post
