Install dependencies and download spaCy's [`en_core_web_sm` model](https://spacy.io/models/en#en_core_web_sm), which is trained on a small corpus of general English text on the web. Import libraries we'll use.

In [2]:
!pip install spacy unidecode
!python -m spacy download en_core_web_sm

import csv        # loading/saving data
import spacy      # nlp library
import difflib    # comparing lists of terms
import unidecode  # normalizing terms for comparison

from collections import Counter, defaultdict
from itertools import islice

[38;5;2m✔ Download and installation successful[0m
You can now load the model via spacy.load('en_core_web_sm')


Load the spaCy model and define the types of entities we are interested in recognizing. Create `Counter` objects to track the number of times we see entities.

In [29]:
nlp = spacy.load("en_core_web_sm")
ent_types = ["GPE", "PERSON"]       # "GPE" = geopolitical entity (place)
ent_counts = defaultdict(Counter)   # stats container
ents = defaultdict(lambda: defaultdict(list))            # lists of identified entities

Load all items from the PGP Metadata spreadsheet, storing their descriptions and PGPIDs.

In [63]:
items = []
with open("pgp_items.csv") as file:
  reader = csv.DictReader(file)
  for row in reader:
    items.append((
        row["Description"],
        { "PGPID": row["\ufeffPGPID"], }
    ))
print(f"loaded {len(items)} items")

loaded 29242 items


Create a normalization function for comparing entities that strips out diacritics, lowercases, and removes whitespace. Also add a custom property to identified entities to store their normalized forms and PGPID where they occur.

In [52]:
def norm_text(text):
    return unidecode.unidecode(text.lower().strip())
spacy.tokens.Span.set_extension("pgpid", default="")
spacy.tokens.Span.set_extension("norm_text", default="")

ValueError: [E090] Extension 'norm_text' already exists on Span. To overwrite the existing extension, set `force=True` on `Span.set_extension`.

Process all item descriptions using our spaCy pipeline, counting and storing occurrences of named entities as we encounter them.

In [64]:
for doc, context in nlp.pipe(items, as_tuples=True, disable=["tagger", "parser"]):  # only run NER part of pipe
  for ent in doc.ents:
    ent._.norm_text = norm_text(ent.text)          # normalize entity text
    ent._.pgpid = context["PGPID"]                 # store pgpid on entity
    ents[ent.label_][ent._.norm_text].append(ent)  # list of occurrences in docs for each named entity, sorted by type 
    ent_counts[ent.label_][ent._.norm_text] += 1   # count of occurrences of each named entity, sorted by type
print("processing completed")

processing completed


Load the list of known places.

In [31]:
known_places = []
with open("pgp_places.csv") as file:
    reader = csv.reader(file)
    for row in islice(reader, 1, None):   # skip header row
        for cell in row:
            if cell:                      # skip blank cells
                known_places.append(norm_text(cell))    # normalize
known_places = list(set(known_places))    # dedup
print(f"loaded {len(known_places)} known places")

loaded 224 known places


Further process the list of places to prevent false positives.

In [58]:
# GPE = geopolitical entity (place)
places = ents["GPE"]

# some markers clearly indicate a person has been misidentified as a place
person_markers = ["b. ", "bt. ", "ibn ", "bint ", "ben ", "bat ", "abū ", "abu ", "abu-", "bu ", "umm ", "sitt "]

output = defaultdict(list)
for name, occurrences in places.items():
    # check if any clear person markers or numbers are present;
    # if there are numbers present it's likely a date, not a place
    if not any([m in name for m in person_markers]) and \
       not any([c.isdigit() for c in name]):    
       output[name] = occurrences
places = dict(output)
place_counts = dict(ent_counts["GPE"])

print(f"identified {len(places)} places after normalization")

identified 1166 places after normalization


Compare the two lists, checking how many times places were automatically identified that weren't in the known list of places.

In [74]:
# compare the two lists and keep those we didn't find in the known places list
missing = []
for line in difflib.ndiff(sorted(places), sorted(known_places)):
    if line.startswith("-"):
        missing.append(line[2:])
print(f"identified {len(missing)} potential places not in list")

# get the most frequently occurring missing places
missing_counts = { place: count for (place, count) in ent_counts["GPE"].items() if place in missing }

identified 1034 potential places not in list


Write results to a file.

In [81]:
# write the results to a file
with open("missing_places.csv", mode="w") as file:
    fieldnames = ["place", "count", "pgpid"]
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()
    for place, count in reversed(sorted(missing_counts.items(), key=lambda e: e[1])):       # order by count
        pgpids = list(set([o._.pgpid for o in ents["GPE"][place] if o._.pgpid.strip() != ""]))
        writer.writerow({ "place": place, "count": count, "pgpid": ", ".join(pgpids) })
print("wrote missing_places.csv")

wrote missing_places.csv


Load the lists of known authors/editors into a single list of known people.

In [37]:
known_people = []
for sheet in ["pgp_authors.csv", "pgp_editors.csv"]:
    with open(sheet) as file:
        reader = csv.DictReader(file)
        for row in reader:
            known_people.append(norm_text(row["Name PGP"])) # normalize
known_people = list(set(known_people)) # dedup
print(f"loaded {len(known_people)} known people")

loaded 8893 known people


Further process the list of identified people to prevent false positives.

In [48]:
people = ents["PERSON"]

# add additional context to people's names since the recognizer doesn't do a great job with arabic names
output = defaultdict(list)
for name, occurrences in people.items():
    # if there are numbers present it's likely a date, not a person
    if not any([c.isdigit() for c in name]):
        # for names starting with common markers, add the two tokens to the left as context
        # note that this will often make it not match the known names, but there's little overlap anyway
        if any([name.startswith(m) for m in person_markers]):
            for o in occurrences:
                name_with_context = str(o.doc[max(o.start - 2, 0):o.end])
                output[name_with_context].append(o)                 # track occurrences for contextualized versions
                ent_counts["PERSON"][name_with_context] += 1        # track counts for contextualized versions
        # for other names just keep the saved occurrences
        else:
            output[name] = occurrences
people = dict(output)

print(f"identified {len(people)} people after normalization")

identified 11535 people after normalization


Compare the two lists, saving the identified people who weren't a known author or editor. Check how many times the missing people were identified in descriptions.

In [83]:
# compare the two lists and keep those we didn't find in the known people list
missing = []
for line in difflib.ndiff(sorted(people), sorted(known_people)):
    if line.startswith("-"):
        missing.append(line[2:])
print(f"identified {len(missing)} potential people not in list")

# get the most frequently occurring missing people
missing_counts = Counter({ person: count for (person, count) in ent_counts["PERSON"].items() if person in missing })

identified 10435 potential people not in list


Write the results to a file.

In [87]:
# write the results to a file
with open("missing_people.csv", mode="w") as file:
    fieldnames = ["person", "count", "pgpid"]
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()
    for person, count in missing_counts.most_common():       # order by count
        pgpids = list(set([o._.pgpid for o in ents["PERSON"][person] if o._.pgpid.strip() != ""]))
        writer.writerow({ "person": person, "count": count, "pgpid": ", ".join(pgpids) })
print("wrote missing_people.csv")

wrote missing_people.csv
