In [1]:
from pathlib import Path
import pandas as pd

# Location of the BBC news training CSV, relative to the notebook's working directory.
DATA_PATH = Path("bbc_news_train.csv")
# Fail early, reporting the resolved absolute path so a wrong working directory is easy to spot.
if not DATA_PATH.exists():
    raise FileNotFoundError(f"Impossible de trouver le fichier : {DATA_PATH.resolve()}")

# Load the whole CSV; the last expression previews the first rows as the cell output.
df = pd.read_csv(DATA_PATH)
df.head()

Unnamed: 0,ArticleId,Text,Category
0,1833,worldcom ex-boss launches defence lawyers defe...,business
1,154,german business confidence slides german busin...,business
2,1101,bbc poll indicates economic gloom citizens in ...,business
3,1976,lifestyle governs mobile choice faster bett...,tech
4,917,enron bosses in $168m payout eighteen former e...,business


In [2]:
# Keep only the rows whose Category is "tech", capped at the first 10 (as an independent copy).
# NOTE(review): an earlier variant used .head(100); the smaller cap is presumably to shorten runs — confirm.
df_tech = df[df["Category"] == "tech"].head(10).copy()
display(df_tech.head())
print(f"Nombre d'articles dans le dataset filtré : {len(df_tech)}")

Unnamed: 0,ArticleId,Text,Category
3,1976,lifestyle governs mobile choice faster bett...,tech
19,1552,moving mobile improves golf swing a mobile pho...,tech
24,405,bt boosts its broadband packages british telec...,tech
26,702,peer-to-peer nets here to stay peer-to-peer ...,tech
30,1951,pompeii gets digital make-over the old-fashion...,tech


Nombre d'articles dans le dataset filtré : 10


In [3]:
# Small, closed set of relations, to avoid an inconsistent graph.
# Each entry gives a predicate ("pred"), the allowed subject classes ("subj"),
# the allowed object classes or datatypes ("obj") and a free-text note ("note").

REL_SCHEMA = [
  {"pred":"schema:about",
   "subj":["schema:NewsArticle"], "obj":["schema:Thing"],
   "note":"Article -> entités mentionnées/centrales"},

  {"pred":"schema:author",
   "subj":["schema:NewsArticle"], "obj":["foaf:Person","schema:Organization"],
   "note":"Optionnel (souvent absent) : auteur ou média"},

  {"pred":"schema:worksFor",
   "subj":["foaf:Person"], "obj":["schema:Organization"],
   "note":"Emploi / affiliation forte"},

  {"pred":"schema:founder",
   "subj":["schema:Organization"], "obj":["foaf:Person"],
   "note":"Organisation -> fondateur(s)"},

  {"pred":"schema:acquiredBy",
   "subj":["schema:Organization"], "obj":["schema:Organization"],
   "note":"Convention: acquis -> acquéreur"},

  {"pred":"schema:produces",
   "subj":["schema:Organization"], "obj":["schema:Product"],
   "note":"Entreprise -> produit (hardware/software)"},

  {"pred":"schema:location",
   "subj":["schema:Organization","schema:Event"], "obj":["schema:Place"],
   "note":"Si une localisation est explicitement mentionnée"},

  {"pred":"onto:announced",
   "subj":["schema:Organization","foaf:Person"], "obj":["schema:Product","schema:Event"],
   "note":"Local (si tu veux expliciter les annonces/lancements)"},

  {"pred":"schema:datePublished",
   "subj":["schema:NewsArticle"], "obj":["xsd:date","xsd:dateTime"],
   "note":"Attribut date article (si dispo / inférée)"},

  {"pred":"schema:releaseDate",
   "subj":["schema:Product","schema:Event"], "obj":["xsd:date","xsd:dateTime"],
   "note":"Attribut date sortie/lancement (si mentionné)"}
]

In [4]:
# Type signature per predicate: predicate -> (allowed subject entity types, allowed object entity types).
# The labels are the extraction entity types (PERSON, ORG, GPE, PRODUCT, EVENT).
PRED_TYPES = {
  "schema:worksFor":   ({"PERSON"}, {"ORG"}),
  "schema:founder":    ({"ORG"}, {"PERSON"}),
  "schema:acquiredBy": ({"ORG"}, {"ORG"}),
  "schema:produces":   ({"ORG"}, {"PRODUCT"}),
  "schema:location":   ({"ORG","EVENT"}, {"GPE"}),
  "onto:announced":    ({"ORG","PERSON"}, {"PRODUCT","EVENT"}),
  # about/author/datePublished/releaseDate: deliberately left unconstrained for now
}

In [5]:
import os, json, re, time
from pathlib import Path
from typing import List, Literal, Optional, Dict, Any, Tuple

import requests
from tqdm import tqdm
from pydantic import BaseModel, Field, ValidationError

# Best-effort column detection: the first candidate name present in df_tech wins.
TEXT_COL = next((c for c in ["Text", "text", "News", "news", "Content", "content"] if c in df_tech.columns), None)
if TEXT_COL is None:
    raise KeyError(f"Colonne texte introuvable. Colonnes dispo: {list(df_tech.columns)}")

# Optional id column; stays None when no candidate name matches.
ID_COL = next((c for c in ["id", "Id", "ID", "article_id", "ArticleId"] if c in df_tech.columns), None)

# Predicate whitelist, taken from REL_SCHEMA in declaration order.
ALLOWED_PREDS = [x["pred"] for x in REL_SCHEMA]

# On-disk cache directory for extraction results (created if missing).
CACHE_DIR = Path("cache_extractions")
CACHE_DIR.mkdir(exist_ok=True)

In [6]:
# JSON Schema describing the expected model output: an object with two required
# arrays, "entities" and "relations", and no additional properties at any level.
# It is sent as the "format" field of the Ollama chat payload built in _ollama_chat.
EXTRACTION_JSON_SCHEMA = {
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "mention": {"type": "string", "minLength": 1},
          "type": {"type": "string", "enum": ["PERSON","ORG","GPE","PRODUCT","EVENT"]},
          "start": {"type": "integer"},
          "end": {"type": "integer"}
        },
        "required": ["mention","type","start","end"],
        "additionalProperties": False
      }
    },
    "relations": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "subj": {"type": "string", "minLength": 1},
          # Predicates are restricted to the whitelist derived from REL_SCHEMA.
          "pred": {"type": "string", "enum": ALLOWED_PREDS},
          "obj": {"type": "string", "minLength": 1},
          "evidence": {"type": "string"}
        },
        "required": ["subj","pred","obj","evidence"],
        "additionalProperties": False
      }
    }
  },
  "required": ["entities","relations"],
  "additionalProperties": False
}


In [7]:
# Closed set of entity labels accepted for Entity.type.
EntityType = Literal["PERSON", "ORG", "GPE", "PRODUCT", "EVENT"]

class Entity(BaseModel):
    """One entity mention with its type and character span."""
    mention: str = Field(min_length=1)  # non-empty surface form
    type: EntityType
    start: int  # span start; the post-processing in this notebook writes -1 when the mention is not found
    end: int  # span end (same -1 convention)

class Relation(BaseModel):
    """One (subject, predicate, object) triple with an optional evidence snippet."""
    subj: str = Field(min_length=1)  # non-empty subject string
    pred: str = Field(min_length=1)  # any non-empty string here; the whitelist is enforced later in post-processing
    obj: str = Field(min_length=1)  # non-empty object string
    evidence: str = ""  # defaults to empty when no supporting snippet is given

class Extraction(BaseModel):
    """Extraction result for one article: a list of entities and a list of relations."""
    entities: List[Entity] = []  # empty by default
    relations: List[Relation] = []  # empty by default

In [8]:
def _strip_code_fences(s: str) -> str:
    s = s.strip()
    s = re.sub(r"^```(?:json)?\s*", "", s, flags=re.IGNORECASE)
    s = re.sub(r"\s*```$", "", s)
    return s.strip()

def _extract_first_json_object(s: str) -> str:
    """Return the first JSON object found in a model reply, as a string.

    Code fences are stripped first. The text starting at the first "{" is then
    parsed with ``json.JSONDecoder.raw_decode``, which stops at the end of the
    first complete JSON value, so trailing chatter (even chatter containing
    braces) is ignored.

    If that parse fails (e.g. truncated output), the function falls back to the
    previous best-effort slice from the first "{" to the last "}", and returns
    the fence-stripped text unchanged when no such pair exists. The caller is
    expected to run ``json.loads`` on the result and handle the error.
    """
    s = _strip_code_fences(s)
    start = s.find("{")
    if start == -1:
        return s

    try:
        # raw_decode returns (value, index just past the value).
        _, stop = json.JSONDecoder().raw_decode(s, start)
        return s[start:stop]
    except json.JSONDecodeError:
        # Not a complete object at the first brace: keep the legacy slice so the
        # caller sees the same (probably invalid) payload and can retry.
        pass

    end = s.rfind("}")
    if end <= start:
        return s
    return s[start:end+1]

def _find_span(text: str, mention: str):
    if not mention:
        return -1, -1, mention
    # cherche case-insensitive mais renvoie les indices exacts
    m = re.search(re.escape(mention), text, flags=re.IGNORECASE)
    if not m:
        return -1, -1, mention
    real = text[m.start():m.end()]  # substring exact du texte (casse d'origine)
    return m.start(), m.end(), real

def _postprocess(extr: Extraction, text: str) -> Extraction:
    """Re-anchor entities on the source text and filter the relations.

    Entities: every mention is searched in `text` (case-insensitive); its
    offsets are recomputed and the mention is replaced by the exact substring
    of the text. Mentions that are not found keep their spelling and get
    start = end = -1.

    Relations are kept only if:
    - the predicate is in ALLOWED_PREDS;
    - the subject is a known entity mention;
    - the object is a known entity mention, except for the two date predicates
      where the object may be a literal (e.g. an ISO date).
    Evidence that is not a verbatim substring of `text` is blanked.

    Because entity mentions are rewritten to the text's casing, relation
    subjects and objects are matched case-insensitively and rewritten to the
    same spelling. Without this, a relation whose casing follows the model
    output rather than the text would be dropped.

    `extr` is updated in place and returned.
    """
    date_preds = ("schema:datePublished", "schema:releaseDate")

    fixed_entities = []
    # lower-cased mention -> spelling stored on the entity
    alias: Dict[str, str] = {}
    for e in extr.entities:
        s, t, real = _find_span(text, e.mention)
        fixed_entities.append(Entity(mention=real, type=e.type, start=s, end=t))
        alias.setdefault(e.mention.lower(), real)
        alias.setdefault(real.lower(), real)
    extr.entities = fixed_entities

    fixed_rel = []
    for r in extr.relations:
        if r.pred not in ALLOWED_PREDS:
            continue

        subj = alias.get(r.subj.lower())
        if subj is None:
            continue

        obj = alias.get(r.obj.lower())
        if obj is None:
            # For dates, the object may be a literal; otherwise it must be an entity.
            if r.pred not in date_preds:
                continue
            obj = r.obj

        evidence = r.evidence if (r.evidence and r.evidence in text) else ""
        fixed_rel.append(Relation(subj=subj, pred=r.pred, obj=obj, evidence=evidence))

    extr.relations = fixed_rel
    return extr

# ============================================================
# A) Corriger les mentions : récupérer la casse réelle + dédoublonner
# ============================================================

def find_span_case_insensitive(text: str, mention: str) -> Tuple[int, int, str]:
    """Find `mention` in `text` (case-insensitive) and return it with the text's own casing.

    Returns (start, end, substring) for the first match, or (-1, -1, mention)
    when `mention` is empty or does not occur in `text`.
    """
    not_found = (-1, -1, mention)
    if not mention:
        return not_found
    pattern = re.compile(re.escape(mention), re.IGNORECASE)
    for hit in pattern.finditer(text):
        # Only the first occurrence matters.
        return hit.start(), hit.end(), hit.group(0)
    return not_found

def dedup_entities(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Remove duplicate entities, keeping the first occurrence and the input order.

    Located entities (start != -1) are identified by (type, start, end);
    unlocated ones by (type, lower-cased mention).
    """
    unique: Dict[tuple, Dict[str, Any]] = {}
    for ent in entities:
        if ent["start"] == -1:
            ident = (ent["type"], ent["mention"].lower())
        else:
            ident = (ent["type"], ent["start"], ent["end"])
        # setdefault keeps the first entity seen for each identity.
        unique.setdefault(ident, ent)
    return list(unique.values())

def fix_entities_from_text(extr: Dict[str, Any], text: str) -> Dict[str, Any]:
    """Rewrite each entity with the text's own casing and offsets, then deduplicate.

    `extr["entities"]` is replaced in place; the same dict is returned.
    """
    def _realign(entity: Dict[str, Any]) -> Dict[str, Any]:
        begin, stop, surface = find_span_case_insensitive(text, entity["mention"])
        return {**entity, "mention": surface, "start": begin, "end": stop}

    realigned = [_realign(entity) for entity in extr.get("entities", [])]
    extr["entities"] = dedup_entities(realigned)
    return extr

# ============================================================
# B) Nettoyer / retourner les relations + vérifier l'evidence
# ============================================================

def clean_relations(extr: Dict[str, Any], text: str) -> Dict[str, Any]:
    """Filter and normalise the relations of one extraction result.

    A relation is kept only if all of the following hold:
    - its predicate is in ALLOWED_PREDS;
    - its subject is an entity mention of `extr`;
    - its object is an entity mention, except for the two date predicates
      (datePublished / releaseDate) whose object may be a literal;
    - for `schema:releaseDate`, the subject is a PRODUCT or an EVENT;
    - for predicates typed in PRED_TYPES: the evidence is non-empty, is a
      verbatim substring of `text`, contains both mentions
      (case-insensitive), and the subject/object types match the signature.

    Direction handling: a typed relation is swapped only when the stated
    direction does NOT match the signature but the reversed one does
    (e.g. worksFor given as Org -> Person). Relations whose stated direction is
    already valid are never swapped. This matters for symmetric signatures such
    as `schema:acquiredBy` (ORG -> ORG), where an unconditional swap would
    invert every correct relation.

    `extr["relations"]` is replaced in place; the same dict is returned.
    """
    types = {e["mention"]: e["type"] for e in extr.get("entities", [])}
    mentions = set(types.keys())
    date_preds = ("schema:datePublished", "schema:releaseDate")

    def has_both_in_evidence(subj: str, obj: str, ev: str) -> bool:
        if not ev:
            return False
        # case-insensitive containment
        return (re.search(re.escape(subj), ev, re.IGNORECASE) is not None) and \
               (re.search(re.escape(obj), ev, re.IGNORECASE) is not None)

    cleaned = []
    for r in extr.get("relations", []):
        pred = r.get("pred")
        subj = r.get("subj")
        obj  = r.get("obj")
        ev   = r.get("evidence", "") or ""

        # Typed predicates need evidence: no evidence => drop.
        if pred in PRED_TYPES and not ev:
            continue
        if pred not in ALLOWED_PREDS:
            continue
        if subj not in mentions:
            continue
        if pred not in date_preds and obj not in mentions:
            continue

        # Evidence must be a substring of the text, otherwise it is blanked
        # (which also drops typed relations further down).
        if ev and ev not in text:
            ev = ""

        if pred == "schema:releaseDate" and types.get(subj) not in {"PRODUCT","EVENT"}:
            continue

        if pred in PRED_TYPES and obj in mentions:
            subj_ok, obj_ok = PRED_TYPES[pred]
            st = types.get(subj)
            ot = types.get(obj)

            direction_ok = (st in subj_ok) and (ot in obj_ok)

            # Swap only when the stated direction is wrong and the reverse is right.
            if not direction_ok and (st in obj_ok) and (ot in subj_ok):
                subj, obj = obj, subj
                st, ot = ot, st
                direction_ok = True

            if not direction_ok:
                continue

            # Evidence must mention both ends of the relation.
            if not has_both_in_evidence(subj, obj, ev):
                continue

        cleaned.append({"subj": subj, "pred": pred, "obj": obj, "evidence": ev})

    extr["relations"] = cleaned
    return extr

In [9]:
# Chat endpoint that _ollama_chat posts to (localhost, port 11434).
OLLAMA_URL = "http://localhost:11434/api/chat"
# Model name sent in the "model" field of every request payload.
MODEL = "llama3.1:8b"

def _ollama_chat(messages: List[Dict[str, str]], temperature: float = 0.0, timeout: int = 120) -> str:
    """POST one non-streaming chat request to OLLAMA_URL and return the reply text.

    The payload carries MODEL, the given messages, EXTRACTION_JSON_SCHEMA as
    "format" and the sampling temperature. `raise_for_status()` is called on the
    response, so an HTTP error status raises instead of returning.
    """
    request_body = dict(
        model=MODEL,
        messages=messages,
        stream=False,
        format=EXTRACTION_JSON_SCHEMA,
        options={"temperature": temperature},
    )
    response = requests.post(OLLAMA_URL, json=request_body, timeout=timeout)
    response.raise_for_status()
    reply = response.json()
    return reply["message"]["content"]

def _build_prompt(article_text: str) -> List[Dict[str, str]]:
    """Build the [system, user] message list for one article.

    The system message states the expected JSON shape, the predicate
    whitelist (ALLOWED_PREDS) and the extraction rules; the user message
    carries the article text.
    """
    system_parts = [
        "Tu es un extracteur d'information. ",
        "Tu DOIS répondre en JSON strict, sans aucun texte autour, sans markdown. ",
        "Le JSON doit respecter EXACTEMENT ce schéma: ",
        "{\"entities\":[{\"mention\":str,\"type\":\"PERSON|ORG|GPE|PRODUCT|EVENT\",\"start\":int,\"end\":int}],",
        "\"relations\":[{\"subj\":str,\"pred\":str,\"obj\":str,\"evidence\":str}]}. ",
        f"pred doit être dans cette liste exacte: {ALLOWED_PREDS}. ",
        "Règles: ",
        "1) Les champs entities/relations existent toujours (liste vide ok). ",
        "2) subj et obj doivent correspondre à des 'mention' présentes dans entities (sauf pred datePublished/releaseDate où obj peut être une date ISO). ",
        "3) evidence doit être un extrait COPIÉ du texte source (30-200 chars si possible). ",
        "4) N'invente rien: si doute -> n'extrais pas.",
    ]
    user_parts = [
        "Texte:\n",
        f"{article_text}\n\n",
        "Retourne UNIQUEMENT le JSON.",
    ]
    return [
        {"role": "system", "content": "".join(system_parts)},
        {"role": "user", "content": "".join(user_parts)},
    ]

def validate_extraction_loose(parsed: dict) -> Extraction:
    """Validate entities and relations item by item, silently skipping invalid ones.

    This is deliberately best-effort: one malformed item must not discard the
    whole extraction, so items that fail validation are dropped.

    Relation items that are not dicts are skipped as well. Previously they
    raised AttributeError on `.get`, while non-dict entity items were already
    skipped through ValidationError.

    Missing or null "entities" / "relations" keys are treated as empty lists.
    """
    ents = []
    for e in parsed.get("entities", []) or []:
        try:
            ents.append(Entity.model_validate(e))
        except ValidationError:
            # Invalid entity: skip it, keep the rest.
            continue

    rels = []
    for r in parsed.get("relations", []) or []:
        if not isinstance(r, dict):
            continue
        # Skip when an essential field is missing or empty.
        if not r.get("subj") or not r.get("pred") or not r.get("obj"):
            continue
        try:
            rels.append(Relation.model_validate(r))
        except ValidationError:
            # Invalid relation: skip it, keep the rest.
            continue

    return Extraction(entities=ents, relations=rels)

def extract(article_text: str, article_id: str | int, max_retries: int = 3) -> Dict[str, Any]:
    """Extract entities and relations for one article, with an on-disk cache and retries.

    Flow:
    1. If CACHE_DIR/<article_id>.jsonl holds a readable JSON object, return it.
       An empty, truncated or otherwise unreadable cache file is treated as a
       cache miss instead of crashing.
    2. Otherwise query the model up to `max_retries` times. Each reply is
       parsed, loosely validated and post-processed; the first success is
       cached and returned as a dict with "entities" and "relations".
    3. After a parse or validation failure, the next attempt asks the model to
       repair its previous output. A top-level JSON value that is not an
       object counts as a failure and triggers the same retry.
    4. If every attempt fails, an empty result carrying "error" and "raw"
       (first 1000 chars) is cached and returned, so the article is not
       retried on every run.

    Errors raised by the HTTP call itself are not caught here.
    """
    cache_path = CACHE_DIR / f"{article_id}.jsonl"
    if cache_path.exists():
        try:
            with cache_path.open("r", encoding="utf-8") as f:
                cached = json.loads(f.readline())
            if isinstance(cached, dict):
                return cached
        except ValueError:
            # JSONDecodeError / UnicodeDecodeError: corrupt cache => recompute below.
            pass

    def _write_cache(payload: Dict[str, Any]) -> None:
        # jsonl: a single line holding the whole result
        with cache_path.open("w", encoding="utf-8") as f:
            f.write(json.dumps(payload, ensure_ascii=False) + "\n")

    messages = _build_prompt(article_text)

    last_err = None
    raw = ""
    for _ in range(max_retries):
        raw = _ollama_chat(messages, temperature=0.0)
        raw_json = _extract_first_json_object(raw)

        try:
            parsed = json.loads(raw_json)
            if not isinstance(parsed, dict):
                raise ValueError(
                    f"top-level JSON value must be an object, got {type(parsed).__name__}"
                )
            extr = validate_extraction_loose(parsed)
            extr = _postprocess(extr, article_text)

            out = extr.model_dump()
            _write_cache(out)
            return out

        except (ValueError, ValidationError) as e:
            # ValueError also covers json.JSONDecodeError (a subclass).
            last_err = str(e)
            # Retry: ask for a strict-JSON correction of the previous output.
            messages = [
                {"role": "system", "content": "Corrige la sortie pour qu'elle soit du JSON strict conforme au schéma. Réponds en JSON uniquement."},
                {"role": "user", "content": f"ERREUR:\n{last_err}\n\nSORTIE A CORRIGER:\n{raw}\n\nRappel: pred doit être dans {ALLOWED_PREDS}."}
            ]
            time.sleep(0.2)

    # Total failure: cache an empty result so the article is not retried forever.
    fallback = {"entities": [], "relations": [], "error": last_err, "raw": raw[:1000]}
    _write_cache(fallback)
    return fallback

In [10]:
# One result dict per article: {"article_id", "entities", "relations", ...}.
results = []

for i, row in tqdm(df_tech.iterrows(), total=len(df_tech)):
    # Fall back to the DataFrame index when no id column was detected.
    article_id = row[ID_COL] if ID_COL else i
    text = str(row[TEXT_COL])

    res = extract(text, article_id=article_id)

    # Post-processing: restore the text's casing, deduplicate entities, clean relations.
    res = fix_entities_from_text(res, text)
    res = clean_relations(res, text)

    results.append({"article_id": article_id, **res})

# Quick sanity check: keys of the first result and number of processed articles.
results[0].keys(), len(results)

100%|██████████| 10/10 [02:16<00:00, 13.60s/it]


(dict_keys(['article_id', 'entities', 'relations']), 10)

# 5) NED — Désambiguïsation intra-corpus

Normalisation des mentions et construction de noms canoniques pour regrouper les variantes (IBM vs I.B.M., etc.)

In [11]:
# 5.1. Normalisation + collecte des mentions
from collections import Counter, defaultdict
from rapidfuzz import fuzz

def norm_name(s: str) -> str:
    """Normalise a name for comparison: lower-case, punctuation/underscores removed, single spaces."""
    # Keep only runs of word characters (letters, digits) and join them with
    # one space; every other run — punctuation, underscores, whitespace —
    # acts as a separator.
    word_runs = re.findall(r"[^\W_]+", s.lower())
    return " ".join(word_runs)

# Start from `results` (one item per article, with "article_id", "entities", "relations").
# mention_freq counts occurrences of each (type, mention) pair over all articles;
# mentions_by_type groups the distinct mentions under their entity type.
mention_freq = Counter()
mentions_by_type = defaultdict(set)

for r in results:
    for e in r.get("entities", []):
        m = e["mention"]
        t = e["type"]
        # Ignore empty and single-character mentions.
        if not m or len(m) < 2:
            continue
        mention_freq[(t, m)] += 1
        mentions_by_type[t].add(m)

# Number of distinct (type, mention) pairs and the entity types seen.
len(mention_freq), list(mentions_by_type.keys())

(73, ['ORG', 'PERSON', 'PRODUCT', 'GPE', 'EVENT'])

In [12]:
# 5.2. Construire des "canonical names" par type (IBM vs I.B.M.)

def build_canonical_map(threshold: int = 92):
    """Group spelling variants of a mention under one canonical mention, per entity type.

    Returns a dict (type, mention) -> canonical_mention.
    threshold: RapidFuzz `fuzz.ratio` score (on normalised names) at or above
    which a mention is merged into an existing representative.

    Mentions are processed most frequent first, so the most frequent spelling
    usually becomes the canonical one. Frequency ties are broken
    alphabetically; `mentions_by_type` holds sets, whose iteration order is not
    stable across runs, so without the tie-break the map would not be
    reproducible.
    """
    canon_map = {}
    for t, mentions in mentions_by_type.items():
        ranked = sorted(mentions, key=lambda m: (-mention_freq[(t, m)], m))
        # Canonical representatives so far, each with its normalised form
        # (computed once rather than at every comparison).
        reps = []

        for m in ranked:
            nm = norm_name(m)
            best_rep = None
            best_score = -1

            for rep, rep_norm in reps:
                score = fuzz.ratio(nm, rep_norm)
                if score > best_score:
                    best_score = score
                    best_rep = rep

            if best_score >= threshold:
                # Close enough to an existing representative.
                canon_map[(t, m)] = best_rep
            else:
                # New canonical representative.
                reps.append((m, nm))
                canon_map[(t, m)] = m
    return canon_map

# Build the (type, mention) -> canonical mention map with a merge threshold of 92,
# then display its size.
CANON_MAP = build_canonical_map(threshold=92)
len(CANON_MAP)

73

In [13]:
import re

def acronym(s: str) -> str:
    """Return the upper-case initials of the alphabetic words in `s`, e.g. "British Telecom" -> "BT".

    Returns "" when `s` has fewer than two alphabetic words.
    """
    initials = [word[0].upper() for word in re.findall(r"[A-Za-z]+", s)]
    return "".join(initials) if len(initials) >= 2 else ""

# Merge acronym -> long form (ORG only).
# Example: "BT" takes the canonical form of "British Telecom" when that long form is in the corpus.

# Candidate long forms (8+ characters), most frequent first and then alphabetical.
# The inner loop stops at the first match; iterating the raw set would make the
# chosen long form depend on set ordering, which is not stable across runs.
org_long_forms = sorted(
    (lm for lm in mentions_by_type.get("ORG", ()) if len(lm) >= 8),
    key=lambda lm: (-mention_freq[("ORG", lm)], lm),
)

for (t, m), canon in list(CANON_MAP.items()):
    if t != "ORG":
        continue

    short = m.strip()
    if len(short) > 5:
        continue  # only very short mentions are treated as acronyms (BT, IBM, ...)

    for long_m in org_long_forms:
        if acronym(long_m) == short.upper():
            # Point the acronym at the canonical form of its long form.
            CANON_MAP[(t, m)] = CANON_MAP.get((t, long_m), long_m)
            break

In [14]:
# 5.3. Apply the canonicalisation to results

# Add a "canonical" field to every entity; mentions missing from CANON_MAP map to themselves.
for r in results:
    ents = []
    for e in r.get("entities", []):
        key = (e["type"], e["mention"])
        canonical = CANON_MAP.get(key, e["mention"])
        ents.append({**e, "canonical": canonical})
    r["entities"] = ents

# Quick check on the first five entities of the first article.
for e in results[0]["entities"][:5]:
    print(e["mention"], " -> ", e["canonical"], "(", e["type"], ")")

ericsson  ->  ericsson ( ORG )
dr michael bjorn  ->  dr michael bjorn ( PERSON )
bbc news website  ->  bbc news website ( PRODUCT )
japan  ->  japan ( GPE )
uk  ->  uk ( GPE )


# 6) NEL — Lier les canoniques à Wikidata

On va :
1. Créer un cache NEL pour ne pas spammer Wikidata
2. Interroger l'API de recherche Wikidata (`wbsearchentities`) par label ; le type sert à la clé de cache et au choix du candidat
3. Choisir un candidat (heuristique simple, top1)

In [15]:
# 6.1. Prepare the cache + mapping from entity types to Wikidata

# On-disk cache directory for Wikidata candidate lookups (created if missing).
NEL_CACHE_DIR = Path("cache_nel")
NEL_CACHE_DIR.mkdir(exist_ok=True)

# Mapping entity type -> generic Wikidata QID.
# NOTE(review): the QID labels in the trailing comments are the author's; verify them against Wikidata.
# NOTE(review): this mapping is not referenced by any code visible in this part of the notebook.
WIKIDATA_TYPE_QID = {
    "PERSON": "Q5",          # human
    "ORG": "Q43229",         # organization
    "GPE": "Q17334923",      # geographic location
    "PRODUCT": "Q2424752",   # product
    "EVENT": "Q1656682",     # event
}

In [16]:
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Wikidata API endpoint queried by wikidata_candidates.
WIKIDATA_API_URL = "https://www.wikidata.org/w/api.php"

# Session with retries + backoff (useful on 429 / 503). Only GET requests are
# retried, and only for the listed status codes.
session = requests.Session()
retries = Retry(
    total=6,
    backoff_factor=1.0,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["GET"],
    respect_retry_after_header=True,
)
# The retry policy is mounted for https:// URLs only.
session.mount("https://", HTTPAdapter(max_retries=retries))

# Headers sent with every Wikidata request.
# NOTE(review): the User-Agent contact is still a placeholder — fill it in before heavier use.
HEADERS = {
    "User-Agent": "td3-kg/1.0 (student project; contact: your_email_or_name)",
    "Accept": "application/json",
}

def wikidata_candidates(label: str, ent_type: str, limit: int = 5):
    """Return Wikidata candidates for `label` as a list of {"uri", "id", "label", "desc"} dicts.

    Candidates come from the `wbsearchentities` API action (English search).
    Results are cached on disk under NEL_CACHE_DIR, keyed by entity type and
    normalised label; `ent_type` is used only for the cache key here (type
    filtering may be added later).

    An empty, blank or None label returns [] immediately. The check runs before
    the cache key is built so that a None label cannot crash `norm_name`, and a
    blank label cannot pick up a cache file written for another label that
    normalises to the empty string.

    HTTP errors left after the session's retries propagate through
    `raise_for_status()`.
    """
    if not label or not label.strip():
        return []

    key = f"{ent_type}__{norm_name(label)}.json"
    cache_path = NEL_CACHE_DIR / key
    if cache_path.exists():
        return json.loads(cache_path.read_text(encoding="utf-8"))

    params = {
        "action": "wbsearchentities",
        "search": label.strip(),
        "language": "en",
        "limit": limit,
        "format": "json",
    }

    resp = session.get(WIKIDATA_API_URL, params=params, headers=HEADERS, timeout=20)
    resp.raise_for_status()
    data = resp.json()

    out = []
    for s in data.get("search", []):
        out.append({
            "uri": s.get("concepturi"),       # Wikidata item URL (URI)
            "id": s.get("id"),                # Qxxxx
            "label": s.get("label", ""),
            "desc": s.get("description", ""),
        })

    cache_path.write_text(json.dumps(out, ensure_ascii=False), encoding="utf-8")
    time.sleep(0.2)  # politeness pause after each real API call (cache hits return earlier)
    return out

In [17]:
def acronym_label(s: str) -> str:
    """Upper-case initials of the alphabetic words of `s`; "" when there are fewer than two words."""
    letter_runs = re.findall(r"[A-Za-z]+", s)
    if len(letter_runs) >= 2:
        return "".join(run[:1].upper() for run in letter_runs)
    return ""

def token_set(s: str) -> set:
    """Return the set of whitespace-separated tokens of the normalised form of `s`."""
    normalised = norm_name(s)
    return {token for token in normalised.split()}

def pick_best_candidate(label: str, ent_type: str, candidates: list):
    """Choose one Wikidata candidate for `label`, or None.

    Short labels (normalised length <= 3, e.g. "uk", "bt", "ibm"):
      1. the first candidate whose label's acronym equals the query;
      2. for GPE, the first candidate whose description contains "country";
      3. otherwise the candidate with the best fuzzy label match (no threshold).

    Longer labels: candidates are ranked by
    0.7 * fuzzy ratio + 0.3 * token overlap (%) + type bonus (+10 for a GPE
    described as a country, +10 for an ORG described as a company / telecom /
    provider). The top candidate is returned only if its score reaches 75
    (PERSON) or 55 (other types).

    Returns None when `candidates` is empty or the threshold is not met.
    """
    if not candidates:
        return None

    query_norm = norm_name(label)

    if len(query_norm) <= 3:
        # 1) Abbreviation: a candidate whose acronym matches (UK -> United Kingdom).
        for cand in candidates:
            initials = acronym_label(cand.get("label", ""))
            if initials and norm_name(initials) == query_norm:
                return cand

        # 1bis) Abbreviated GPE: prefer a "country" when one is present.
        if ent_type == "GPE":
            for cand in candidates:
                if "country" in (cand.get("desc") or "").lower():
                    return cand

        # Do NOT default to candidates[0] (often wrong); use the best fuzzy label match.
        def label_similarity(cand):
            return fuzz.ratio(query_norm, norm_name(cand.get("label","")))

        return max(candidates, key=label_similarity)

    # 2) Combined score: fuzzy + token overlap + type bonus.
    query_tokens = token_set(label)

    def combined_score(cand):
        cand_label = cand.get("label", "")
        description = (cand.get("desc") or "").lower()
        fuzzy = fuzz.ratio(query_norm, norm_name(cand_label))
        overlap = int(100 * (len(query_tokens & token_set(cand_label)) / max(1, len(query_tokens))))
        type_bonus = 0
        if ent_type == "GPE" and "country" in description:
            type_bonus += 10
        if ent_type == "ORG" and any(word in description for word in ("company", "telecom", "provider")):
            type_bonus += 10
        return 0.7 * fuzzy + 0.3 * overlap + type_bonus

    # Score every candidate once; ties go to the earliest candidate.
    scores = [combined_score(cand) for cand in candidates]
    top = max(range(len(candidates)), key=scores.__getitem__)

    # Light threshold, stricter for persons.
    minimum = 75 if ent_type == "PERSON" else 55
    if scores[top] < minimum:
        return None
    return candidates[top]


In [18]:
# 6.4. Build a canonical -> uri table (NEL)

# Only entities meeting all of these are linked:
# - start != -1 (the mention was located in the text)
# - the canonical form occurs at least twice in the corpus (less noise)
# - type PERSON / ORG / GPE (the "easiest" ones)

# 1) Frequency of each (type, canonical) pair, counting located mentions only.
canon_freq = Counter()
for r in results:
    for e in r.get("entities", []):
        if e["start"] == -1:
            continue
        canon = e.get("canonical") or e["mention"]
        canon_freq[(e["type"], canon)] += 1

# 2) NEL
entity_links = {}  # (type, canonical) -> uri

for (t, canon), f in tqdm(canon_freq.items(), desc="NEL Wikidata"):
    if t not in {"PERSON", "ORG", "GPE"}:
        continue
    if f < 2:
        continue  # too rare, skip

    cands = wikidata_candidates(canon, t, limit=5)
    chosen = pick_best_candidate(canon, t, cands)
    if chosen:
        entity_links[(t, canon)] = chosen["uri"]

# Number of (type, canonical) pairs that received a URI.
len(entity_links)

NEL Wikidata: 100%|██████████| 71/71 [00:03<00:00, 19.87it/s]


8

In [19]:
# 6.5. Add the URI to the entities

# Every entity gets a "uri" field; it is None when its (type, canonical) pair was not linked.
for r in results:
    ents = []
    for e in r.get("entities", []):
        canon = e.get("canonical") or e["mention"]
        uri = entity_links.get((e["type"], canon))
        ents.append({**e, "uri": uri})
    r["entities"] = ents

# Quick check on the first article.
for e in results[0]["entities"]:
    print(e["mention"], " | canonical:", e["canonical"], " | uri:", e.get("uri"))

ericsson  | canonical: ericsson  | uri: None
dr michael bjorn  | canonical: dr michael bjorn  | uri: None
bbc news website  | canonical: bbc news website  | uri: None
japan  | canonical: japan  | uri: http://www.wikidata.org/entity/Q17
uk  | canonical: uk  | uri: http://www.wikidata.org/entity/Q145
