In [1]:
from pathlib import Path

from spacy.cli import apply
from spacy.tokens import DocBin, Doc
import pandas as pd
import os
import geopandas as gpd
import spacy
from spacy import displacy
from spacy.language import Language
from spacy.pipeline.functions import merge_entities
from spacy.tokens import Span
from spacy.util import filter_spans
from spacy.matcher.phrasematcher import PhraseMatcher

from scripts.utils.config import Config
from scripts.geoms.operations import sides
from scripts.utils.spacy import load_spacy

In [2]:
config = Config()
train_path = config.get_data_path("entity_recognition.article_text_train")
dev_path = config.get_data_path("entity_recognition.article_text_dev")
explore_path = config.get_data_path("entity_recognition.explore")
sent_model = os.path.join(config.get_file_path("sent_relevance.trained_model"), 'model-best')
comm_area_path = config.get_data_path("geoms.comm_areas")
neighborhood_path = config.get_data_path("geoms.neighborhoods")
street_name_path = config.get_data_path("geoms.street_names")

In [3]:
nlp = load_spacy("en_core_web_sm")
nlp_base = load_spacy("en_core_web_sm")

# Reading this twice because both nlp's modifies the docs in place.
docs_base = list(DocBin().from_disk(train_path).get_docs(nlp.vocab))
docs_base = [d for d in docs_base if 'WHERE' in d.user_data and d.user_data['WHERE'] > .5]
docs = list(DocBin().from_disk(train_path).get_docs(nlp.vocab))
docs = [d for d in docs if 'WHERE' in d.user_data and d.user_data['WHERE'] > .5]


In [5]:
gpes = pd.concat([gpd.read_parquet(comm_area_path)['community_name'].rename('name'),
                        pd.read_csv(neighborhood_path)['name'],
                        pd.Series(sides)], ignore_index=True)
gpes = gpes.str.split(",", expand=False).explode()
gpes = gpes.str.title().drop_duplicates().sort_values()

street_names = pd.read_csv(street_name_path)
street_names = street_names.filter(like='combined').melt()['value']
street_names = street_names.str.title().drop_duplicates().sort_values()

In [6]:
matcher = PhraseMatcher(nlp.vocab)
patterns = list(nlp.tokenizer.pipe(gpes))
matcher.add("GPE", patterns)
patterns = list(nlp.tokenizer.pipe(street_names))
matcher.add("FAC", patterns)

@Language.component("loc_matcher")
def pattern_matcher(doc: Doc):
    matches = matcher(doc, as_spans=True)
    doc.ents = filter_spans(matches)
    return doc

nlp.add_pipe("loc_matcher", before="ner")

<function __main__.pattern_matcher(doc: spacy.tokens.doc.Doc)>

In [7]:
@Language.component("block_matcher")
def expand_street_blocks(doc: Doc):
    new_ents = []
    for idx, ent in enumerate(doc.ents):
        # Only check for title if it's a person and not the first token
        if ent.label_ == "FAC" and ent.start >= 3 and idx >= 1:
            prev_ent = list(doc.ents)[idx-1]
            prev_tokens = doc[ent.start - 3: ent.start]
            # Must match [CARDINAL] block of [FAC]
            if (prev_tokens[2].text == "of" and prev_tokens[1].text == "block"
                and prev_ent.label_ == "CARDINAL" and prev_tokens[0].text == prev_ent.text):
                new_ent = Span(doc, ent.start - 3, ent.end, label=ent.label)
                new_ents.append(new_ent)
    doc.ents = filter_spans(list(doc.ents) + new_ents)
    return doc

nlp.add_pipe("block_matcher", after="ner")

<function __main__.expand_street_blocks(doc: spacy.tokens.doc.Doc)>

In [8]:
@Language.component("intersection_matcher")
def expand_intersections(doc: Doc):
    new_ents = []
    for idx, ent in enumerate(doc.ents):
        # Only check for title if it's a person and not the first token
        if ent.label_ == "FAC" and ent.start >= 2 and idx >= 1:
            prev_ent = list(doc.ents)[idx-1]
            prev_tokens = doc[ent.start - 2: ent.start]
            # Must match [STREET] and [STREET]
            if ((prev_tokens[1].text == "and" or prev_tokens[1].text == "&")
                and prev_ent.label_ == "FAC" and prev_tokens[0].text == prev_ent.text):
                new_ent = Span(doc, ent.start - 2, ent.end, label=ent.label)
                new_ents.append(new_ent)
    doc.ents = filter_spans(list(doc.ents) + new_ents)
    return doc

nlp.add_pipe("intersection_matcher", before="block_matcher")

<function __main__.expand_intersections(doc: spacy.tokens.doc.Doc)>

In [9]:
from IPython.display import clear_output

def ents_eq(e1, e2):
    return e1.text == e2.text and e1.start == e2.start \
        and e1.end == e2.end and e1.label == e2.label \
        and e1.label_ == e2.label_

for doc_base, doc in zip(docs_base, docs):
    # pred_base = nlp_base(doc_base)
    pred = nlp(doc)
    # if not all([ents_eq(e1, e2) for e1, e2 in zip(pred_base.ents, pred.ents)]):
    clear_output(wait=True)
        # displacy.render(pred_base, style="ent")
    displacy.render(pred, style="ent")
    wait = input("Press any key to continue")
    if wait:
        continue

KeyboardInterrupt: Interrupted by user