In [None]:
import transformers

In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import pipeline
# tokenizer = AutoTokenizer.from_pretrained(MODEL)
# model = AutoModelForTokenClassification.from_pretrained(MODEL)
# nlp = pipeline("ner", model=model, tokenizer=tokenizer)

In [None]:
# Checkpoint location handed to `load_model`; it is a relative path, so it is
# resolved against the notebook's working directory.
MODEL = "../artifacts/distilbert-base-multilingual-cased-finetuned-ner/"


def load_model(model):
    """Build a HuggingFace ``"ner"`` pipeline from a pretrained checkpoint.

    Args:
        model: Identifier or path accepted by ``from_pretrained``. The same
            value is used to load both the tokenizer and the
            token-classification model.

    Returns:
        The object returned by ``pipeline("ner", ...)``.
    """
    checkpoint = model
    tok = AutoTokenizer.from_pretrained(checkpoint)
    classifier = AutoModelForTokenClassification.from_pretrained(checkpoint)
    return pipeline("ner", model=classifier, tokenizer=tok)

In [None]:
# Build the pipeline once; later cells (including `infer`) use this
# module-level `nlp` object.
nlp = load_model(MODEL)

In [None]:
# Sample input shared by the demo cells below.
# NOTE(review): looks like OCR'd invoice text containing a German address —
# confirm provenance.
text = '16: Freiraurn, 547,68- 547,68 \n2 \n1 St \nÜbertrag: 988,00 \nwindmühlenweg 8 02625 bautzen sachs Firmensitz : Telefon : 03 40 / 5 40 09 - 0'
text

In [None]:
# Repeat the sample 11 times (presumably to try a longer input) and let the
# pipeline group tokens itself via its "first" aggregation strategy.
nlp(text * 11, aggregation_strategy="first")

In [None]:
# Spot check on a sentence containing a Spanish street address.
nlp("lucas lives at calle Augusto Figueroa 14 28004 Madrid", aggregation_strategy="first")

In [None]:
# Spot check on a sentence containing a German-style street address.
nlp("Welcome to kreutzigerstrasse 4 28004 Frankfurt am main", aggregation_strategy="first")

# FULL STOP

This is good enough!

In [None]:
from copy import deepcopy

# Transition tables used when merging token-level tags into entities.
# A key is the action (first character of a tag) carried by the entity's most
# recent token; its value lists the actions the next token may carry and still
# extend that entity. The "" key lists the actions that may start a new entity.
IOB_TRANSITIONS = {
    "B": ["B", "I"],
    "I": ["I"],
    "": ["B"] # HuggingFace represents "O" as empty string
}

# Same layout for the BILUO scheme. "L" and "U" map to empty lists, so nothing
# can extend an entity after them. The "" entry also accepts "I" and "L".
# NOTE(review): presumably to tolerate sequences that lost their "B" — confirm.
BILUO_TRANSITIONS = {
    "B": ["B", "I", "L"],
    "I": ["I", "L"],
    "L": [],
    "U": [],
    "": ["B", "U", "I", "L"] # HuggingFace represents "O" as empty string
}

def _get_action(tag):
    return tag[:1]

def _get_class(tag):
    return tag.split("-")[-1]

def _agg_first(entity, agg=[]):
    for key in agg:
        entity[key] = [entity[key]]
    return entity

def _begin_new_ent(entity, label="label", agg=[], transitions=IOB_TRANSITIONS):
    """Start a new entity from ``entity`` if its tag is allowed to open one.

    Args:
        entity: Token-level prediction dict.
        label: Key under which the tag is stored.
        agg: Keys whose values are wrapped in lists via ``_agg_first``.
        transitions: Transition table; its ``""`` entry lists the actions
            that may begin an entity.

    Returns:
        ``entity`` (with its ``agg`` fields wrapped in lists) when the tag's
        action may start an entity, otherwise an empty dict.
    """
    action = _get_action(entity[label])
    may_start = action in transitions[""]
    return _agg_first(entity, agg=agg) if may_start else {}

def _extend_ent(current_entity, new_entity, label="label", end="end", agg=[], transitions=IOB_TRANSITIONS):
    """Grow ``current_entity`` so that it also covers ``new_entity``.

    When ``current_entity`` is empty there is nothing to extend, so the call
    is delegated to ``_begin_new_ent``. Otherwise ``current_entity`` is
    updated in place: its ``end`` and ``label`` are overwritten with those of
    ``new_entity`` and every key listed in ``agg`` collects the new value.

    Args:
        current_entity: Entity being built, or ``{}`` if none is open.
        new_entity: Token-level prediction to merge in.
        label: Key under which the tag is stored.
        end: Key under which the end offset is stored.
        agg: Keys whose values are accumulated into lists.
        transitions: Transition table, used only when a new entity is begun.

    Returns:
        The updated ``current_entity``, or the result of ``_begin_new_ent``
        when ``current_entity`` was empty.
    """
    if current_entity == {}:
        return _begin_new_ent(new_entity, label=label, agg=agg, transitions=transitions)

    current_entity[end] = new_entity[end]
    # Keep the most recent tag on the entity.
    current_entity[label] = new_entity[label]

    # aggregate
    for key in agg:
        if isinstance(current_entity[key], list):
            current_entity[key].append(new_entity[key])
        else:
            # The stored value is still a scalar: promote it to a two-item
            # list holding the old and the new value.
            current_entity[key] = [current_entity[key], new_entity[key]]

    return current_entity

def _can_extend(current_entity, new_entity, label="label", begin="begin", end="end", transitions=IOB_TRANSITIONS):
    """Tell whether ``new_entity`` may be merged into ``current_entity``.

    An empty ``current_entity`` can always be "extended" (the caller then
    starts a new entity). Otherwise all of the following must hold:

    * both tags name the same class,
    * the new tag's action is listed in ``transitions`` under the current
      tag's action,
    * the new token begins exactly where the current entity ends.

    Args:
        current_entity: Entity being built, or ``{}`` if none is open.
        new_entity: Token-level prediction that might continue it.
        label: Key under which the tag is stored.
        begin: Key under which the start offset is stored.
        end: Key under which the end offset is stored.
        transitions: Transition table to consult. An action with no entry in
            the table is treated as allowing no continuation.

    Returns:
        ``True`` if the merge is allowed, ``False`` otherwise.
    """
    if current_entity == {}:
        return True

    if _get_class(current_entity[label]) != _get_class(new_entity[label]):
        return False

    # Use the caller-supplied table rather than a fixed scheme, so BILUO
    # actions such as "L" and "U" are looked up in the right place.
    allowed_next = transitions.get(_get_action(current_entity[label]), ())
    if _get_action(new_entity[label]) not in allowed_next:
        return False

    return current_entity[end] == new_entity[begin]

def _dump_ent(entity, label="label"):
    """Finalise an entity by replacing its tag with the bare class name.

    The entity is modified in place and also returned.
    """
    cls_name = _get_class(entity[label])
    entity[label] = cls_name
    return entity

In [None]:
def iob_to_ents(entities, begin="begin", end="end", label="label", agg=["score"], transitions=IOB_TRANSITIONS):
    """Merge token-level tagged predictions into entity spans.

    The input is deep-copied and sorted by ``begin``, and tokens whose action
    is ``"O"`` are dropped. Consecutive tokens are then merged for as long as
    ``_can_extend`` allows. When it does not, the open entity is emitted and
    the token either starts a new entity or is discarded if its action may
    not start one.

    Args:
        entities: Iterable of dicts, each holding the ``begin``, ``end`` and
            ``label`` keys plus every key named in ``agg``.
        begin: Key under which the start offset is stored.
        end: Key under which the end offset is stored.
        label: Key under which the tag (e.g. ``"B-LOC"``) is stored.
        agg: Keys whose per-token values are collected into lists.
        transitions: Transition table describing the tagging scheme; it is
            used both for starting and for extending entities.

    Returns:
        List of merged entity dicts. ``label`` holds the class name only,
        ``end`` is that of the last merged token, and ``agg`` keys hold lists.
    """
    # Work on a sorted deep copy: the helpers below mutate the dicts they merge.
    tokens = sorted(deepcopy(entities), key=lambda ent: ent[begin])
    tokens = [ent for ent in tokens if _get_action(ent[label]) != "O"]

    out = []
    current_ent = {}

    for ent in tokens:
        if _can_extend(current_ent, ent, label=label, begin=begin, end=end, transitions=transitions):
            # Forward `transitions`: with no open entity this call starts a
            # new one and must apply the same scheme as the rest of the loop.
            current_ent = _extend_ent(current_ent, ent, end=end, label=label, agg=agg, transitions=transitions)
        else:
            if current_ent:
                out.append(_dump_ent(current_ent, label=label))

            current_ent = _begin_new_ent(ent, label=label, agg=agg, transitions=transitions)

    # dump buffer
    if current_ent:
        out.append(_dump_ent(current_ent, label=label))

    return out

In [None]:
%%time
ents = iob_to_ents(ner_results, label="entity", begin="start", agg=["score", "word"], transitions=BILUO_TRANSITIONS)
ents

In [None]:
def _new_clique(entity, label="label", begin="begin", end="end"):
    clique = {}
    clique["entities"] = [entity]
    clique["begin"] = entity[begin]
    clique["end"] = entity[end]
    clique["label"] = entity[label]
    return clique

def _combine_cliques(cliques, begin="begin", agg=[]):
    combined = []
    for clique in cliques:
        entity = {
            "begin": clique["begin"],
            "end": clique["end"],
            "label": clique["label"]
        }
        entities = sorted(clique["entities"], key=lambda ent: ent[begin])
        
        # aggregate sub-values
        for key in agg:
            aggd = []
            for ent in entities:
                aggd += ent[key]
            entity[key] = aggd
            
        combined.append(entity)
    return combined
    
def single_linkage(entities, begin="begin", end="end", label="label", max_distance=5, max_length=30, agg=[]):
    """Chain nearby same-label entities into larger entities.

    Entities are visited in ``begin`` order. An entity joins the open clique
    when it has the same label and starts no more than ``max_distance`` past
    the clique's current end; otherwise the clique is closed and a new one is
    started. Cliques are then flattened with ``_combine_cliques`` and those
    longer than ``max_length`` are removed by ``_clean_ents``.

    Args:
        entities: Iterable of entity dicts.
        begin: Key under which entities store their start offset.
        end: Key under which entities store their end offset.
        label: Key under which entities store their label.
        max_distance: Largest allowed gap between a clique's end and the
            next entity's start.
        max_length: Largest allowed ``end - begin`` of a resulting entity.
        agg: Keys whose values are concatenated across clique members.

    Returns:
        List of dicts with the keys ``"begin"``, ``"end"``, ``"label"`` and
        one list per ``agg`` key.
    """
    cliques = []
    clique = {}

    for entity in sorted(entities, key=lambda ent: ent[begin]):
        if not clique:
            clique = _new_clique(entity, begin=begin, end=end, label=label)
            continue

        same_label = entity[label] == clique["label"]
        # Same-label entities within `max_distance` of each other form a clique.
        if same_label and (entity[begin] - clique["end"]) <= max_distance:
            clique["entities"].append(entity)
            # A member that ends before the current end (overlapping input)
            # must not pull the clique's span back.
            clique["end"] = max(clique["end"], entity[end])
            continue

        cliques.append(clique)
        clique = _new_clique(entity, begin=begin, end=end, label=label)

    # Flush the clique that is still open after the last entity.
    if clique:
        cliques.append(clique)

    cliques = _combine_cliques(cliques, begin=begin, agg=agg)
    return _clean_ents(cliques, max_length=max_length)
    

def _clean_ents(entities, max_length=30):
    # entities should be less than max_length chars
    return [ent for ent in entities if (ent["end"] - ent["begin"]) <= max_length]

In [None]:
# Link the merged entities in `ents`, joining same-label neighbours whose gap
# is at most 20.
single_linkage(ents, begin="start", label="entity", max_distance=20, agg=["score"])

In [None]:
def _collect_text(ents, text, begin="begin", end="end"):
    for ent in ents:
        ent["text"] = text[ent[begin]:ent[end]]
    return ents

In [None]:
def infer(text, join_entity_distance=20, max_entity_length=50):
    """Run the full extraction pipeline on ``text``.

    Steps: run the module-level ``nlp`` pipeline, merge token predictions
    with ``iob_to_ents`` (BILUO transitions), link nearby same-label entities
    with ``single_linkage``, then attach the covered text to each entity.

    Args:
        text: Input string.
        join_entity_distance: Passed to ``single_linkage`` as ``max_distance``.
        max_entity_length: Passed to ``single_linkage`` as ``max_length``.

    Returns:
        List of entity dicts with ``"begin"``, ``"end"``, ``"label"``,
        ``"score"`` (a list) and ``"text"``.
    """
    preds = nlp(text)
    ents = iob_to_ents(preds, label="entity", begin="start", agg=["score"], transitions=BILUO_TRANSITIONS)
    # `single_linkage` is the linking step defined in this notebook and takes
    # exactly these keyword arguments.
    ents = single_linkage(ents, begin="start", label="entity", max_distance=join_entity_distance, max_length=max_entity_length, agg=["score"])
    # The linked entities use the fixed "begin"/"end" keys, which match the
    # defaults of `_collect_text`.
    ents = _collect_text(ents, text)

    return ents

In [None]:
%%time 
# Time one end-to-end `infer` run on the sample text.
infer(text)