<a href="https://colab.research.google.com/github/OdysseusPolymetis/journees_cluster5b_7/blob/main/4_ner_lat_gk.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Reconnaissance d'entités nommées en latin et en grec**
---

Une entité nommée est une expression linguistique référentielle : pour simplifier, il s'agit d'entités culturellement reconnaissables et référençables. Exemple : Rome est une `LOC`, César est un `PER`, etc.

Il existe de très nombreux modèles pour les langues modernes, c'est beaucoup moins fréquent pour les langues anciennes. Les premiers modèles NER sur transformers ont été ciblés par langue (exemple, le modèle d'Ugarit), mais de plus en plus on s'oriente vers des modèles massifs multilingues, qui comprennent aussi le grec ancien (Roberta large comprend maintenant le grec ancien, plutôt classique).

In [None]:
!pip install flair

In [49]:
import stanza
import numpy as np
from tqdm import tqdm
from flair.data import Sentence
from flair.models import SequenceTagger
from transformers import pipeline
from collections import defaultdict
import numpy as np
import re
import unicodedata as ud
from collections import Counter

## **1. Tests sur phrases courtes**

Nous allons d'abord tester sur deux phrases, mais vous pouvez faire le test sur d'autres choses. Nous le ferons sur des textes entiers plus tard.

In [14]:
sentence_gk = 'ταῦτα εἴπας ὁ Ἀλέξανδρος παρίζει Πέρσῃ ἀνδρὶ ἄνδρα Μακεδόνα ὡς γυναῖκα τῷ λόγῳ · οἳ δέ , ἐπείτε σφέων οἱ Πέρσαι ψαύειν ἐπειρῶντο , διεργάζοντο αὐτούς .'
sentence_lat = 'Quo usque tandem abutere, Catilina, patientia nostra ?'

Voici un premier modèle, assez ancien (2022), de l'équipe UGARIT. Petite explication des étiquettes :
*   **O** : Outside, pas d'entité reconnaissable
*   **S-PER** : Personnage/Personne, Single, l'entité est faite d'un seul mot
*   **B-PER** : Personnage/Personne, Begin, le mot est le début d'une entité
*   **E-PER** : Personnage/Personne, End, dernier mot de l'entité
*   **I-PER** : Personnage/Personne, Inside, mot au milieu de l'entité
*   **S-MISC** : Miscalleneous (ni `PER` ni `LOC`), Single, mot seul
*   **B-MISC** : Miscalleneous (ni `PER` ni `LOC`), Begin, début de l'entité
*   **E-MISC** : Miscalleneous (ni `PER` ni `LOC`), End, dernier mot de l'entité
*   **I-MISC** : Miscalleneous (ni `PER` ni `LOC`), Inside, mot au milieu de l'entité
*   **S-LOC** : Location, Single, l'entité est faite d'un seul mot
*   **B-LOC** : Location, Begin, début de l'entité
*   **E-LOC** : Location, End, fin de l'entité
*   **I-LOC** : Location, Inside, mot au milieu de l'entité
*   **\<START>** : marqueur crf pour le modèle (début)
*   **\<STOP>** : marqueur crf pour le modèle (fin)








In [None]:
tagger = SequenceTagger.load("UGARIT/flair_grc_bert_ner")

In [None]:
sent = Sentence('ταῦτα εἴπας ὁ Ἀλέξανδρος παρίζει Πέρσῃ ἀνδρὶ ἄνδρα Μακεδόνα ὡς γυναῖκα τῷ λόγῳ · οἳ δέ , ἐπείτε σφέων οἱ Πέρσαι ψαύειν ἐπειρῶντο , διεργάζοντο αὐτούς .')
tagger.predict(sent)
for entity in sent.get_spans('ner'):
    print(entity)

Essayons maintenant avec un très gros modèle multilingue (plus de 100 langues). Il s'agit d'un modèle plus récent (2023, je n'ai pas trouvé de modèle de NER multilingue (non finetuné sur une langue en particulier) plus récent).

In [7]:
ner = pipeline(
    "token-classification",
    model="51la5/roberta-large-NER",
    aggregation_strategy="simple"
)

In [22]:
tok = ner.tokenizer

In [131]:
print(ner(sentence_gk))

[{'entity_group': 'PER', 'score': np.float32(0.97942376), 'word': 'Ἀλέξανδρος', 'start': 14, 'end': 24}, {'entity_group': 'MISC', 'score': np.float32(0.8012814), 'word': 'Πέρσ', 'start': 33, 'end': 37}, {'entity_group': 'MISC', 'score': np.float32(0.9995103), 'word': 'Μακεδόνα', 'start': 51, 'end': 59}, {'entity_group': 'MISC', 'score': np.float32(0.99963826), 'word': 'Πέρσαι', 'start': 105, 'end': 111}]


In [None]:
print(ner(sentence_lat))

Et faisons un test absurde, mélangeons deux langues en une phrase !

In [130]:
mix = 'Quo usque tandem abutere, ὦ Ἀλέξανδρε, patientia nostra ?'
print(ner(mix))

[{'entity_group': 'PER', 'score': np.float32(0.89059514), 'word': 'Ἀλέξανδρε', 'start': 28, 'end': 37}]


### _Pourquoi ça marche ?_
En fait il faut partir de l'idée que ça fonctionnerait sur n'importe quelle langue sur laquelle il a été entraîné : toutes les langues sont mappées sur un seul espace sémantique, pour lui il n'existe pas vraiment des langues, mais une langue, une représentation sémantique globale.

## **2. Long texte et représentation graphique**

**⚠** Je le précise tout de suite, le résultat ne peut pas être parfait. Il y aura forcément des erreurs, d'autant que les données annotées sont limitées.

Nous allons reprendre l'_Odyssée_, en la téléchargeant dans la cellule suivante. Mais vous pouvez mettre votre propre texte (latin ou grec, nous allons utiliser le gros modèle, et ça prendra du temps).

In [None]:
!wget https://raw.githubusercontent.com/OdysseusPolymetis/digital_classics_course/refs/heads/main/odyssee_integrale.txt

In [19]:
filepath_of_text = "/content/odyssee_integrale.txt"

In [20]:
full_text = open(filepath_of_text, encoding="utf-8").read()

Ici on va télécharger les deux modèles, pour la lemmatisation, en latin et en grec.

In [55]:
stanza.download('la', verbose=False)
stanza.download('grc', verbose=False)
lemma_la  = stanza.Pipeline('la',  processors='tokenize,lemma', use_gpu=True)
lemma_grc = stanza.Pipeline('grc', processors='tokenize,lemma', use_gpu=True)

Ici on va donner les grandes variables dont on aura besoin ensuite (c'est plus clair de les mettre dès le début)

In [121]:
MODEL_NAME       = "51la5/roberta-large-NER"
WORDS_PER_CHUNK  = 100
STRIDE_WORDS     = 50
WINDOW_SIZE      = 20
MIN_FREQ         = 5
EDGE_MIN         = 1
USE_GPU_STANZA   = True
MIN_WORD_LEN     = 6

In [71]:
WORD_RE        = re.compile(r"[^\W\d_]+", re.UNICODE)
GREEK_RE       = re.compile(r'[\u0370-\u03FF\u1F00-\u1FFF]')
ZERO_WIDTH_RE  = re.compile(r"[\u200B-\u200D\uFEFF]")
WS_MULTI_RE    = re.compile(r"\s+")

Fonction qui permet la normalisation unicode (pour le grec)

In [72]:
def normalize_unicode(s: str) -> str:
    return ud.normalize("NFC", s)

Tokénisation toute bête.

In [73]:
def tokenize_words(text: str):
    return [(m.group(0), m.start(), m.end()) for m in WORD_RE.finditer(text)]

Détection de la langue en fonction de la regex de grec.

In [74]:
def detect_lang_by_script(s: str) -> str:
    return "grc" if GREEK_RE.search(normalize_unicode(s)) else "la"

Fonction de lemmatisation de petites portions de texte après traitement par le NER.

In [75]:
def lemmatize_span(text: str) -> str:
    text = normalize_unicode(text.strip())
    nlp  = lemma_grc if detect_lang_by_script(text) == "grc" else lemma_la
    doc  = nlp(text)
    lemmas = []
    for sent in doc.sentences:
        for w in sent.words:
            if re.fullmatch(r"\W+", w.text or ""):
                continue
            lemmas.append(w.lemma or w.text)
    return " ".join(lemmas) if lemmas else text

Petite fonction qui permet juste de lancer la lemmatisation des labels

In [76]:
def canonicalize_entity(surface: str) -> str:
    return lemmatize_span(surface)

Tokénisation et fenêtres glissantes.

In [77]:
def iter_word_chunks(text: str, words_per_chunk=WORDS_PER_CHUNK, stride_words=STRIDE_WORDS):
    toks = tokenize_words(text)
    if not toks: return
    n, i = len(toks), 0
    while i < n:
        j = min(n, i + words_per_chunk)
        yield toks[i][1], text[toks[i][1]:toks[j-1][2]]
        if j == n: break
        i += stride_words

In [78]:
def _yield_token_safe_slices(sub: str, base_char: int, tokenizer, max_tokens: int, overlap_words: int = 10):
    words = tokenize_words(sub)
    n, i = len(words), 0
    while i < n:
        j, last_ok = i+1, None
        while j <= n:
            piece = sub[words[i][1]:words[j-1][2]]
            if len(tokenizer(piece)["input_ids"]) <= max_tokens:
                last_ok = j; j += 1
            else:
                break
        if last_ok is None:
            start_c = words[i][1]; end_c = min(len(sub), start_c + 2000)
            yield base_char + start_c, base_char + end_c, sub[start_c:end_c]
            i += 1
        else:
            start_c = words[i][1]; end_c = words[last_ok-1][2]
            yield base_char + start_c, base_char + end_c, sub[start_c:end_c]
            i = max(last_ok - overlap_words, last_ok)

Stockage des entités nommées qui nous intéressent (ici `PER`, mais ça peut être autre chose).

In [79]:
def collect_entities(text: str, keep_labels={"PER"}, words_per_chunk=WORDS_PER_CHUNK, stride_words=STRIDE_WORDS, tokenizer=None, ner=None):
    assert tokenizer is not None and ner is not None
    text = normalize_unicode(text)
    seen, out = set(), []
    model_max = getattr(tokenizer, "model_max_length", 512)
    safe_max  = min(510, model_max - 2)

    for base, sub in iter_word_chunks(text, words_per_chunk, stride_words):
        enc_len = len(tokenizer(sub)["input_ids"])
        pieces  = _yield_token_safe_slices(sub, base, tokenizer, safe_max) if enc_len > safe_max else [(base, base+len(sub), sub)]
        for abs_start, _, piece in pieces:
            for e in ner(piece):
                label = e.get("entity_group") or e.get("entity")
                if keep_labels and label not in keep_labels:
                    continue
                start = abs_start + int(e["start"])
                end   = abs_start + int(e["end"])
                key = (start, end, label)
                if key in seen:
                    continue
                seen.add(key)
                out.append({"text": e["word"], "label": label, "start": start, "end": end})
    return out

Rassemblement des entités

In [80]:
def attach_token_index_to_entities(text: str, entities):
    words = tokenize_words(text)
    if not words: return entities, words
    for ent in entities:
        s, i = ent["start"], 0
        while i < len(words) and words[i][2] <= s: i += 1
        ent["tok_idx"] = i if (i < len(words) and words[i][1] <= s < words[i][2]) else min(max(i,0), len(words)-1)
    return entities, words

Calcul des cooccurences

In [81]:
def compute_cooccurrence(entities, window_size_words=WINDOW_SIZE, key_fn=lambda e: e["text"]):
    co = defaultdict(lambda: defaultdict(int))
    ents = sorted(entities, key=lambda e: e["tok_idx"])
    for i, ei in enumerate(ents):
        a, ai = key_fn(ei), ei["tok_idx"]
        j = i + 1
        while j < len(ents) and ents[j]["tok_idx"] - ai <= window_size_words:
            b = key_fn(ents[j])
            if a != b:
                co[a][b] += 1
                co[b][a] += 1
            j += 1
    return co

Création de la matrice de cooccurrence

In [82]:
def cooccurrence_to_matrix(co):
    nodes = set(co.keys())
    for a in co: nodes.update(co[a].keys())
    nodes = sorted(nodes)
    idx = {n:i for i,n in enumerate(nodes)}
    M = np.zeros((len(nodes), len(nodes)), dtype=int)
    for a, nbrs in co.items():
        i = idx[a]
        for b, w in nbrs.items():
            M[i, idx[b]] = int(w)
    return nodes, M

Effacement des diacritiques (qui ne sont pas bons pour rassembler les labels)

In [83]:
def strip_diacritics(s: str) -> str:
    s = ud.normalize("NFD", s)
    s = "".join(ch for ch in s if ud.category(ch) != "Mn")
    return ud.normalize("NFC", s)

In [84]:
def normalize_for_match(s: str) -> str:
    s = strip_diacritics(s)
    s = s.lower()
    s = re.sub(r"\s+", " ", s).strip()
    return s

Quand des labels ont trois ou moins caractères qui se suivent qui diffèrent (pour les mots de plus de huit caractères), on les rassemble.

In [85]:
def contiguous_diff_leq(s1: str, s2: str, max_run: int = 3) -> bool:
    if s1 == s2:
        return True
    p = 0
    L1, L2 = len(s1), len(s2)
    while p < L1 and p < L2 and s1[p] == s2[p]:
        p += 1
    q = 0
    while q < (L1 - p) and q < (L2 - p) and s1[L1-1-q] == s2[L2-1-q]:
        q += 1
    m1 = s1[p:L1-q] if p + q <= L1 else ""
    m2 = s2[p:L2-q] if p + q <= L2 else ""
    return max(len(m1), len(m2)) <= max_run

Rassemblement des labels après tri par distance de caractères

In [116]:
def build_merge_map(keys, freq: Counter, min_len: int = 6, max_run: int = 3):
    order = sorted(keys, key=lambda k: (-freq[k], k))
    rep_map, reps = {}, []
    norm = {k: normalize_for_match(k) for k in order}
    for k in order:
        if k in rep_map:
            continue
        rep = k
        reps.append(rep)
        rep_map[rep] = rep
        rep_norm = norm[rep]
        for t in order:
            if t in rep_map:
                continue
            t_norm = norm[t]
            same_norm = (rep_norm == t_norm)
            long_and_close = (len(rep_norm) >= min_len and len(t_norm) >= min_len and
                              contiguous_diff_leq(rep_norm, t_norm, max_run=max_run))
            if same_norm or long_and_close:
                rep_map[t] = rep
    return rep_map

Nettoyage des labels

In [87]:
def clean_label(s: str) -> str:
    s = normalize_unicode(s)
    s = ZERO_WIDTH_RE.sub("", s)
    s = s.replace("\r"," ").replace("\n"," ").replace("\t"," ")
    return WS_MULTI_RE.sub(" ", s).strip()

In [88]:
def make_id(label: str) -> str:
    base = clean_label(label)
    if not base: return ""
    return "n_" + hashlib.sha1(base.encode("utf-8")).hexdigest()[:12]

Création du graphe pour Gephi

In [89]:
def build_graph_from_matrix_safe(entities, M, edge_min=EDGE_MIN):
    G = nx.Graph()
    id_of = {}
    for lab in entities:
        nid = make_id(lab)
        if not nid: continue
        id_of[lab] = nid
        G.add_node(nid, label=clean_label(lab))
    n = len(entities)
    for i in range(n):
        for j in range(i+1, n):
            w = int(M[i, j])
            if w >= edge_min:
                a, b = id_of.get(entities[i], ""), id_of.get(entities[j], "")
                if a and b and a != b:
                    G.add_edge(a, b, weight=w, label=str(w))
    return G

Attributs pour Gephi

In [91]:
def enrich_and_write_gexf(G, path="network.gexf"):
    strength = dict(G.degree(weight="weight"))
    btw = nx.betweenness_centrality(G, weight="weight", normalized=True) if G.number_of_edges() else {n:0.0 for n in G}
    try:
        eig = nx.eigenvector_centrality_numpy(G, weight="weight") if G.number_of_edges() else {n:0.0 for n in G}
    except Exception:
        eig = {n:0.0 for n in G}

    for n_ in G.nodes():
        G.nodes[n_]["strength"]    = float(strength.get(n_, 0))
        G.nodes[n_]["betweenness"] = float(btw.get(n_, 0.0))
        G.nodes[n_]["eigenvector"] = float(eig.get(n_, 0.0))

    from networkx.algorithms.community import louvain_communities, greedy_modularity_communities
    try:
        parts = louvain_communities(G, weight="weight", seed=42)
    except Exception:
        parts = list(greedy_modularity_communities(G, weight="weight"))
    palette = [(244,67,54),(33,150,243),(76,175,80),(255,193,7),(156,39,176),(255,87,34),(0,188,212),(121,85,72),(63,81,181)]
    comm_of = {n_:i for i,comm in enumerate(parts) for n_ in comm}
    for n_ in G.nodes():
        i = comm_of.get(n_, 0)
        r,g,b = palette[i % len(palette)]
        size = 10 + 4*math.sqrt(max(0.0, strength.get(n_,0)))
        G.nodes[n_]["community"] = int(i)
        G.nodes[n_]["viz"] = {"color":{"r":r,"g":g,"b":b}, "size":float(size)}

    pos = nx.spring_layout(G, weight="weight", seed=42)
    for n_, (x,y) in pos.items():
        G.nodes[n_].setdefault("viz", {})
        G.nodes[n_]["viz"]["position"] = {"x": float(x*1000), "y": float(y*1000), "z": 0.0}

    for u,v,d in G.edges(data=True):
        d.setdefault("viz", {})
        d["viz"]["thickness"] = max(1.0, float(d.get("weight",1)))

    nx.write_gexf(G, path, encoding="utf-8")
    print(f"GEXF écrit: {path} — {G.number_of_nodes()} nœuds / {G.number_of_edges()} arêtes")

Les cellules qui suivent sont simplement les appels des fonctions créées plus haut.

In [117]:
ents = collect_entities(full_text, keep_labels={"PER"}, tokenizer=tok, ner=ner)

In [119]:
for e in ents:
    e["key"] = canonicalize_entity(e["text"])

In [122]:
freq_all   = Counter(e["key"] for e in ents)
merge_map  = build_merge_map(set(e["key"] for e in ents), freq_all, min_len=MIN_WORD_LEN, max_run=3)
for e in ents:
    e["key2"] = merge_map.get(e["key"], e["key"])

In [123]:
print("Avant fusion:", len(set(e["key"] for e in ents)),
      "Après fusion:", len(set(e["key2"] for e in ents)))

Avant fusion: 837 Après fusion: 569


In [124]:
freq_merged = Counter(e["key2"] for e in ents)
ents_filt   = [e for e in ents if freq_merged[e["key2"]] >= MIN_FREQ]
print(f"Labels gardés (≥{MIN_FREQ}) :", sum(v>=MIN_FREQ for v in freq_merged.values()))

Labels gardés (≥5) : 96


In [125]:
ents_filt, _words = attach_token_index_to_entities(normalize_unicode(full_text), ents_filt)

In [126]:
co = compute_cooccurrence(ents_filt, window_size_words=WINDOW_SIZE, key_fn=lambda e: e["key2"])

In [127]:
entities, cooccurrence_matrix = cooccurrence_to_matrix(co)

In [128]:
G = build_graph_from_matrix_safe(entities, cooccurrence_matrix, edge_min=EDGE_MIN)
enrich_and_write_gexf(G, path="network.gexf")

GEXF écrit: network.gexf — 95 nœuds / 579 arêtes


In [129]:
merged_pairs = [(k, v) for k, v in merge_map.items() if k != v]
print("Exemples de fusions :", merged_pairs[:20])

Exemples de fusions : [('Ὀδυσσεύ', 'Ὀδυσσεύς'), ('Ὀδυσσε', 'Ὀδυσσεύς'), ('ΟΔΥΣΣΕΙΑΣ', 'Ὀδυσσεύς'), ('ἐ Ὀδυσσεύς', 'Ὀδυσσεύς'), ('Ὀδυσσήιος', 'Ὀδυσσεύς'), ('Τηλέμαχ', 'Τηλέμαχος'), ('Τηλεμάχ', 'Τηλέμαχος'), ('Τηλέμα', 'Τηλέμαχος'), ('Τηλεμός', 'Τηλέμαχος'), ('Τηλέμαξ', 'Τηλέμαχος'), ('ἴς Τηλέμαχος', 'Τηλέμαχος'), ("Τηλέμαχ'", 'Τηλέμαχος'), ('ἐά Τηλέμαχος', 'Τηλέμαχος'), ('Πηνελόπις', 'Πηνελόπεια'), ('Πηνελοπεύς', 'Πηνελόπεια'), ('Ἀντίνοος', 'Ἀλκίνοος'), ('Ἀλκίνος', 'Ἀλκίνοος'), ('Ἀλκίνων', 'Ἀλκίνοος'), ('̓ἀλκίνος', 'Ἀλκίνοος'), ('Ἀλκινός', 'Ἀλκίνοος')]


Juste quelques infos sur le graphe que vous allez obtenir.
<br>**Couleur** du nœud : on applique Louvain (détection de communautés sur le graphe pondéré) puis on assigne une couleur par communauté avec une palette fixe dans le code.

<br>L'index de communauté est enregistré dans l'attribut community du nœud.

<br>La couleur affichée correspond à cet index (avec un seed=42 pour que ça reste stable d'un run à l'autre, si le graphe ne change pas).

<br>S'il y a plus de 9 communautés, les couleurs recyclent (deux communautés peuvent partager la même teinte).

<br>**Taille** du nœud : proportionnelle à la force (strength = degré pondéré), via size = 10 + 4*sqrt(strength) (dans viz.size).

<br>**Épaisseur** des arêtes : proportionnelle au poids (cooccurrences), via viz.thickness.