In [None]:
# Short two-sentence text, handy for quick smoke tests of the pipelines below.
easy_sample = "Alice dropped her phone. She picked it up and smiled. "

# Longer plot summary mixing character names, actor names in parentheses and
# pronouns; this is the text most cells below run on.
sample = "Dr. Glenn Tyler (Elvis Presley), a childish 25-year old, gets into a fight with and badly injures his drunken brother. A court releases him on probation into the care of his uncle in a small town, appointing Irene Sperry (Hope Lange) to give him psychological counselling. Marked as a trouble-maker, he is falsely suspected of various misdemeanors including an affair with Irene. Eventually shown to be innocent, he leaves to go to college and become a writer."

### GLiNER2 spaCy Component

In [None]:
import json
from preprocess import create_gliner_component
import spacy

# Load the small English spaCy pipeline and append the custom "gliner-ner"
# component as its last step.
# NOTE(review): the "gliner-ner" factory is presumably registered as a side
# effect of importing `create_gliner_component` from `preprocess` — confirm.
nlp = spacy.load("en_core_web_sm")
nlp.add_pipe(
    "gliner-ner",
    name="gliner-ner",
    last=True,
    config={
        "gliner_model": "fastino/gliner2-base-v1",
        # The same label strings are repeated verbatim in later cells, so any
        # spelling change has to be applied everywhere at once.
        "entities": ["location", "fictionnal character", "actors"],
        "threshold": 0.5,
        # NOTE(review): GPU use is hard-coded here — confirm how the component
        # behaves on a CPU-only machine.
        "gpu": True,
    },
)

In [None]:
# Run the pipeline on the long sample and display the custom `ents` extension.
# NOTE(review): `doc._.ents` is presumably populated by the "gliner-ner"
# component — confirm.
doc = nlp(sample)
doc._.ents

In [None]:
import torch
from gliner2 import GLiNER2

# Labels requested from GLiNER2; later cells use the same strings as keys of
# `out["entities"]`.
entities_to_extract = ["location", "fictionnal character", "actors"]

# Load GLiNER2 directly (without spaCy) and move it to the GPU when CUDA is
# available, otherwise to the CPU.
exctractor = GLiNER2.from_pretrained("fastino/gliner2-base-v1").to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
# NOTE(review): with `include_confidence=True` the per-label entries may be
# dicts (text + confidence) rather than bare strings — confirm against the
# installed gliner2 version, since the next cell passes each entry to `str.find`.
out = exctractor.extract_entities(
    text=doc.text,
    entity_types=entities_to_extract,
    threshold=0.5,
    include_confidence=True,
) # -> Dict[str, Any]
out

In [None]:
def find_all_occurrences(text: str, pattern: str) -> list[tuple[int, int]]:
    """Locate every occurrence of `pattern` inside `text`.

    Args:
        text: The string to search in.
        pattern: The exact (case-sensitive) substring to look for.

    Returns:
        A list of ``(start, end)`` character spans, in order of appearance,
        where ``end`` is exclusive (``text[start:end] == pattern``).
        Overlapping matches are all reported. An empty `pattern` yields an
        empty list instead of one zero-width span per character position.
    """
    if not pattern:
        # `str.find("")` matches at every index, which is never a useful span.
        return []

    spans = []
    idx = text.find(pattern)
    while idx != -1:
        spans.append((idx, idx + len(pattern)))
        # Advance by a single character so overlapping matches are kept.
        idx = text.find(pattern, idx + 1)
    return spans


# Collect, for every extracted entity, the character spans of all its
# occurrences in the text. Labels are visited in a fixed order so that
# `all_occurences` is laid out as: actors, then characters, then locations.
all_occurences = []
for label in ("actors", "fictionnal character", "location"):
    for ent in out["entities"][label]:
        # The extraction above asks for confidences, in which case an entry may
        # be a dict carrying the surface form under "text" instead of a bare
        # string; accept both shapes.
        ent_text = ent["text"] if isinstance(ent, dict) else ent
        occurences = find_all_occurrences(doc.text, ent_text)
        if not occurences:
            # Nothing to index into; skip rather than fail on `occurences[0]`
            # or hand an empty cluster to the coreference model later on.
            print(f"[], {ent_text} -> <not found in text>")
            continue
        first_start, first_end = occurences[0]
        print(f"{occurences}, {ent_text} -> {doc.text[first_start:first_end]}")
        all_occurences.append(occurences)



In [None]:
from maverick import Maverick

# model = Maverick(
# #   hf_name_or_path = "sapienzanlp/maverick-mes-preco",
#   hf_name_or_path = "sapienzanlp/maverick-mes-ontonotes",
# #   device = "cuda"
# )
# Instantiate Maverick from the "sapienzanlp/maverick-mes-preco" checkpoint;
# no device argument is passed here.
model = Maverick(hf_name_or_path="sapienzanlp/maverick-mes-preco")

In [None]:
# Tokenise the text with Maverick. `tokens` and `char_offsets` are used further
# down to map entity character spans onto word indices.
tokens, eos_indices, speakers, char_offsets = model.preprocess(doc.text)

In [None]:
# Inspect the tokenisation: per-word character offsets, then the words themselves.
print(char_offsets)
print(tokens)

In [None]:
# Character spans collected for each extracted entity (one inner list per entity).
print(all_occurences)

In [None]:
def char_span_to_word_span(char_span: list[tuple], char_offsets: list[tuple]):
    """Convert character spans into inclusive word-index spans.

    Args:
        char_span: List of ``(start_char, end_char)`` spans with an exclusive
            ``end_char`` (the convention of Python slicing).
        char_offsets: One ``(word_start_char, word_last_char)`` pair per word,
            where the second value is treated as inclusive, i.e. the index of
            the word's last character.

    Returns:
        A list with exactly one ``(first_word_idx, last_word_idx)`` tuple per
        input span, both indices inclusive. A span whose boundaries fall inside
        a word is widened to the words it overlaps. A span that overlaps no word
        at all (or is empty) maps to ``(-1, len(char_offsets))``, the same
        "not found" sentinel the exact-match implementation produced.
    """
    not_found = (-1, len(char_offsets))
    res = []
    for entity_start_char_idx, entity_end_char_idx in char_span:
        # Switch to an inclusive last-character index to match `char_offsets`.
        entity_last_char_idx = entity_end_char_idx - 1
        if entity_last_char_idx < entity_start_char_idx:
            # Zero-width (or inverted) span: nothing to align.
            res.append(not_found)
            continue

        # Indices of every word sharing at least one character with the span.
        overlapping = [
            idx
            for idx, (word_start_char_idx, word_last_char_idx) in enumerate(char_offsets)
            if word_start_char_idx <= entity_last_char_idx
            and word_last_char_idx >= entity_start_char_idx
        ]
        res.append((overlapping[0], overlapping[-1]) if overlapping else not_found)
    return res

            
# Translate each entity's character spans into word-index spans.
entities_word_format = [
    char_span_to_word_span(entity_char_spans, char_offsets)
    for entity_char_spans in all_occurences
]

# Side-by-side check: the text slice selected by the character span next to the
# tokens selected by the (inclusive) word span.
for entity_char_spans, entity_word_spans in zip(all_occurences, entities_word_format):
    for char_pos, word_pos in zip(entity_char_spans, entity_word_spans):
        text_slice = doc.text[char_pos[0]: char_pos[1]]
        token_slice = ' '.join(tokens[word_pos[0]: word_pos[1] + 1])
        print(f"{char_pos} -> {text_slice} || {word_pos} -> {token_slice}")

In [None]:
# Run Maverick on the raw text, handing it the word-level entity spans as gold
# clusters (one inner list of spans per extracted entity).
model.predict(doc.text, add_gold_clusters=entities_word_format)

### GLiNER

In [None]:
from gliner import GLiNER

# Initialize GLiNER from the "urchade/gliner_medium-v2.1" checkpoint.
# NOTE: this rebinds `model`, which held the Maverick model in the cells above;
# re-create the Maverick model before calling its `predict` again.
model = GLiNER.from_pretrained("urchade/gliner_medium-v2.1")

In [None]:
from textwrap import wrap, fill

# Perform entity prediction with the labels in `entities_to_extract`, which is
# defined in the GLiNER2 cell earlier in the notebook.
entities = model.predict_entities(sample, entities_to_extract, threshold=0.5)

# Print the input wrapped by `textwrap.fill` for readability.
print(fill(sample), "\n")
# Display predicted entities and their labels
for entity in entities:
    print(entity["text"], "=>", entity["label"])

### FastCoref

In [None]:
from fastcoref import FCoref, spacy_component
import json
from preprocess import create_gliner_component
import spacy

# Rebuild the spaCy pipeline from scratch (this rebinds `nlp`): coreference
# resolution first, then the custom "gliner-ner" component as the last step.
# NOTE(review): the "fastcoref" and "gliner-ner" factories are presumably
# registered by the `spacy_component` / `create_gliner_component` imports — confirm.
nlp = spacy.load("en_core_web_sm")
nlp.add_pipe(
    "fastcoref",
    config={
        "model_architecture": "FCoref",
        "model_path": "biu-nlp/f-coref",
        # NOTE(review): hard-coded device 0 — presumably the first GPU; confirm
        # the accepted values and the behaviour on a CPU-only machine.
        "device": 0,
    },
)
nlp.add_pipe(
    "gliner-ner",
    name="gliner-ner",
    last=True,
    config={
        "gliner_model": "fastino/gliner2-base-v1",
        "entities": ["location", "fictionnal character", "actors"],
        "threshold": 0.5,
        # NOTE(review): GPU use is hard-coded here as well.
        "gpu": True,
    },
)

In [None]:
# Run the combined pipeline; `resolve_text` is switched on for the "fastcoref"
# component for this call only.
doc = nlp(sample,
             component_cfg={"fastcoref": {'resolve_text': True}})

# Original text followed by the coreference-resolved text, both wrapped with
# `fill` (imported from `textwrap` in an earlier cell).
print("\n", fill(doc.text), "\n\n", fill(doc._.resolved_text))
doc._.ents

### Stanza

In [None]:
import stanza

# Alternative configurations, kept for reference:
# pipe = stanza.Pipeline("en", processors="tokenize, coref", package={"ner": ["CoNLL03", "ontonotes"]})
# pipe = stanza.Pipeline("en", processors="tokenize, coref", package={"ner": ["ontonotes"]})
# Active configuration: English pipeline requesting the tokenize and coref processors.
pipe = stanza.Pipeline("en", processors="tokenize, coref")

In [None]:
# Run Stanza on the long sample.
# NOTE: this rebinds `out`, which held the GLiNER2 extraction result earlier in
# the notebook.
out = pipe(sample)
out

In [None]:
# Dump every word of the Stanza document to inspect the attached annotations.
for w in out.iter_words():
    print(w)

In [None]:
# def coref_mentions(doc):
    
#     corefs = dict()
    
#     # Iter Words
#     for word in doc.iter_words():
#         word_dict = word.to_dict()
        
#         # Iter Corefs
#         for chain in word_dict.get("coref_chains", []):
#             chain = chain.to_json()
#             entity_ndx = chain.get("index")
#             # First time to see this entity
#             if not corefs.get(entity_ndx):
#                 corefs[entity_ndx] = []

#             corefs[entity_ndx].append(word_dict.get("text"))

    
#     return corefs

def coref_mentions(doc):
    """Group the coreference mentions of a document by chain index.

    The document is walked word by word via ``doc.iter_words()``. Each word is
    converted with ``to_dict()`` and its ``"coref_chains"`` entries (dicts, or
    objects exposing ``to_json()``) are read for the keys ``"index"``,
    ``"is_start"`` and ``"is_end"``. Words between a start and an end marker of
    the same chain index are buffered and emitted as one mention.

    Args:
        doc: Object exposing ``iter_words()`` as described above; the notebook
            passes the output of a Stanza pipeline.

    Returns:
        ``{chain_index: [(mention_text, start_char, end_char), ...]}`` where
        ``mention_text`` is the mention's words joined by single spaces and the
        offsets are the smallest / largest character offsets seen among those
        words. Offsets are ``None`` when none of the mention's words carried
        ``start_char`` / ``end_char``.

    Notes:
        * A new start marker for a chain that already has an open mention
          discards the words buffered so far for that chain.
        * Mentions that never reach an end marker are not reported.
    """
    # Final result: {chain index: [(text, start_char, end_char), ...]}
    corefs = dict()

    # Buffers for the mention currently being read, keyed by chain index.
    active_spans = dict()
    active_positions = dict()

    for word in doc.iter_words():
        word_dict = word.to_dict()
        word_text = word_dict.get("text")
        # Character offsets may be absent from the word dict; keep only the known
        # ones so the min()/max() below never compares None with an int.
        word_offsets = [
            pos
            for pos in (word_dict.get("start_char"), word_dict.get("end_char"))
            if pos is not None
        ]

        for chain in word_dict.get("coref_chains", []):
            # Normalise attachment objects to plain dicts.
            if not isinstance(chain, dict):
                chain = chain.to_json()

            entity_ndx = chain.get("index")

            # 1. A start marker opens (or re-opens) the buffer for this chain.
            if chain.get("is_start", False):
                active_spans[entity_ndx] = []
                active_positions[entity_ndx] = []

            # Words of a chain with no open mention are ignored.
            if entity_ndx not in active_spans:
                continue

            # 2. Accumulate the current word into the open mention.
            active_spans[entity_ndx].append(word_text)
            active_positions[entity_ndx].extend(word_offsets)

            # 3. An end marker closes the mention and publishes it.
            if chain.get("is_end", False):
                mention_words = active_spans.pop(entity_ndx)
                mention_offsets = active_positions.pop(entity_ndx)
                corefs.setdefault(entity_ndx, []).append(
                    (
                        " ".join(mention_words),
                        min(mention_offsets, default=None),
                        max(mention_offsets, default=None),
                    )
                )

    return corefs

In [None]:
# Show the mentions grouped per coreference chain for the Stanza document in `out`.
coref_mentions(out)

In [None]:
from preprocess import preprocess
import json

# Run the project's own preprocessing on the long sample.
# NOTE: this rebinds both `out` (previously the Stanza document) and `doc`.
out = preprocess(sample)
# Pretty-print the "sentences" entry; non-ASCII characters are kept as-is.
print(json.dumps(out["sentences"], indent=2, ensure_ascii=False))
doc = out["doc"]

In [None]:
import spacy
from extraction import (
    detect_entities,
    detect_events,
    create_narrative_segment,
    merge_narrative_segments,
)


# Build the narrative graph incrementally, one preprocessed sentence at a time.
# `nlp` is rebound here to a plain "en_core_web_sm" pipeline without extra components.
nlp = spacy.load("en_core_web_sm")
graph = None
segs = []
for sentence in out["sentences"]:
    print(f"- {sentence}")
    doc = nlp(sentence)
    ents = detect_entities(doc)
    evs = detect_events(doc)
    # The graph returned for one sentence is passed back in for the next one.
    curr_seg_nodes, graph = create_narrative_segment(ents, evs, graph=graph)
    # NOTE(review): if `create_narrative_segment` mutates `graph` in place, every
    # entry of `segs` ends up referencing the same graph object — confirm.
    segs.append({"nodes": curr_seg_nodes, "graph": graph})
    print(f"  Entities: {len(ents)}, Events: {len(evs)}")
    print(f"  Entities: {ents}")
    print(f"  Events: {evs}")
    print("--------")
# After the loop, `doc`, `ents` and `evs` hold the values of the last sentence only.

In [None]:
# Show text and label of each entity in `ents`, i.e. the entities of the
# sentence processed last by the extraction loop.
for entity in ents:
    print("Entity Text:", entity["text"])
    print(entity["label"])
    print("---")

In [None]:
# Show each event in `evs` (the events of the sentence processed last by the
# extraction loop) together with its subject and object texts.
for event in evs:
    print("Event Text:", event["text"])
    print("Subject Texts:", event["subject_texts"])
    print("Object Texts:", event["object_texts"])
    print("---")

In [None]:
from utils import visualize_graph

# Render the graph stored for the first sentence, passing that segment's nodes along.
first_segment = segs[0]
curr_nodes = first_segment["nodes"]
curr_graph = first_segment["graph"]
visualize_graph(curr_graph, nodes=curr_nodes)