# Process OASIS records
Vocabulary-based Named Entity Recognition (NER) and vocabulary alignment, applied to a set of XML OASIS abstracts obtained from ADS, to detect temporal phrases and object/monument types.

In [1]:
%%capture
# install required dependencies
%pip install --upgrade pip
%pip install spacy
%pip install ipywidgets
%sx python -m spacy download en_core_web_sm

In [2]:
# reference required modules
#from IPython.display import display, HTML
from slugify import slugify # for creating valid filenames from identifiers
import spacy # for NER processing
from spacy.tokens import Doc # for NER results
from lxml import etree as ET # for parsing input records from XML file
from datetime import datetime as DT # for timestamps
from weasyprint import HTML
from html import escape # for writing escaped text within HTML 
import pandas as pd  # for DataFrame
import json, os
from rematch2 import \
    PeriodoRuler, \
    VocabularyRuler, \
    NegationRuler, \
    DocSummary, \
    TextNormalizer, \
    SpanRemover
from rematch2.spacypatterns import patterns_en_ATTRIBUTE_RULES # rules to override POS tags in some cases
import warnings
from pprint import pprint
#import pdfkit # to produce PDF output from html output
# suppress user warnings during execution
warnings.filterwarnings(action='ignore', category=UserWarning)


In [3]:
def get_records_from_xml_file(file_path: str="") -> list:
    """Parse a source XML file and extract a list of OASIS abstract records.

    Rows are located at ``/table/rows/row``. Within each row the identifier is
    taken from ``value[@columnNumber='0']`` and the abstract from
    ``value[@columnNumber='1']``; if a row holds several values for a column
    only the first is used, and a missing value becomes a blank string.

    Args:
        file_path: path of the XML file to read.

    Returns:
        A list of ``{"id": ..., "text": ...}`` dicts (whitespace-stripped)
        for subsequent processing, or an empty list if the file could not be
        read or parsed.
    """
    # identifiers may be expressed as full ADS record URLs; only the trailing
    # titleId value is kept
    url_prefix = "https://archaeologydataservice.ac.uk/archsearch/record?titleId="

    try:
        tree = ET.parse(file_path)
    except (OSError, ET.LxmlError) as err:
        # unreadable file or malformed XML; report and carry on best-effort
        # (anything else, e.g. KeyboardInterrupt, is allowed to propagate)
        print(f"Could not read from {file_path}: {err}")
        return []

    def first_text(row, column_number: int) -> str:
        # text of the first <value> with this columnNumber, or "" if none
        values = row.xpath(f"value[@columnNumber='{column_number}']/text()")
        return str(values[0]) if values else ""

    records = []
    for row in tree.xpath("/table/rows/row"):
        identifier = first_text(row, 0).replace(url_prefix, "")
        abstract = first_text(row, 1)
        records.append({
            "id": identifier.strip(),
            "text": abstract.strip()
        })

    return records


def get_records_from_csv_file(file_path: str="") -> list:
    """Parse a source CSV file and extract a list of OASIS abstract records.

    The CSV columns read are "file" (used as the record id), "title" and
    "abstract" (used as the record text). Empty cells and missing columns
    both yield blank strings.

    Args:
        file_path: path of the CSV file to read.

    Returns:
        A list of ``{"id": ..., "title": ..., "text": ...}`` dicts with
        whitespace-stripped string values, for subsequent processing.
    """
    # read the CSV file to a DataFrame
    df = pd.read_csv(file_path, skip_blank_lines=True)
    # set any NaN values (empty cells) to blank string so they do not end up
    # as the text "nan" once converted with str()
    df = df.fillna("")

    # every lookup has a "" default: without it a CSV lacking the column
    # would produce the literal text "None" for every record
    return [
        {
            "id": str(item.get("file", "")).strip(),
            "title": str(item.get("title", "")).strip(),
            "text": str(item.get("abstract", "")).strip(),
        }
        for item in df.to_dict(orient="records")
    ]
     

In [None]:
# using predefined spaCy pipeline (English); the pipeline's built-in 'ner'
# component is disabled when loading
nlp = spacy.load("en_core_web_sm", disable = ['ner'])

# adding custom rules to override default POS tagging for specific cases
# NOTE: adding rules to the existing attribute_ruler component didn't seem to
# work, so a second attribute ruler (named "custom_attribute_ruler") is
# inserted directly after it and the rules are added to that one instead
#nlp.get_pipe("attribute_ruler").add_patterns(patterns_en_ATTRIBUTE_RULES)
ar = nlp.add_pipe("attribute_ruler", name="custom_attribute_ruler", after="attribute_ruler")
ar.add_patterns(patterns_en_ATTRIBUTE_RULES)
# (debug) show the last few patterns registered on the custom ruler
#pprint(nlp.get_pipe("custom_attribute_ruler").patterns[-5:])

# using HE Cultural Periods authority (identifier passed to the periodo_ruler
# component configuration)
periodo_authority_id = "p0kh9ds" 

# The chosen controlled vocabularies do not always contain terms expressed in
# the same way as natural language, and sometimes contain ambiguous terms.
# To mitigate these issues the vocabulary-based components accept lists of
# supplementary terms and stop lists passed in via their configuration, to
# fine-tune the matching where problematic matches or non-matches are
# encountered
def read_json(file_name):
    """Read a supplementary list from a JSON file (best-effort).

    Args:
        file_name: path of the JSON file to read.

    Returns:
        The decoded JSON content, or an empty list if the file could not be
        opened, decoded or parsed (the problem is printed, not raised).
    """
    try:
        # JSON is UTF-8 by specification; don't rely on the locale default
        with open(file_name, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file
        # ValueError: invalid JSON (JSONDecodeError) or bad byte sequence
        # (UnicodeDecodeError)
        print(f"Problem reading \"{file_name}\": {e}")
        return []

# supplementary term lists (objects, monuments, periods), read from JSON
# files in the current directory and passed to the matching ruler components
# below via their "supp_list" configuration
supp_list_obj = read_json("./supp_list_en_FISH_ARCHOBJECTS.json")
supp_list_mon = read_json("./supp_list_en_FISH_MONUMENTS.json")
supp_list_per = read_json("./supp_list_en_FISH_PERIODS.json")

# lists of vocabulary concepts (identified by URI) that we don't want to
# appear in the results even if they are legitimately matched.
# Hardcoded directly here, but could also be read in from JSON files as above

# monument type concepts to exclude
stop_list_mon = [
    "http://purl.org/heritagedata/schemes/eh_tmt2/concepts/71054",  # Roundhouse (Lock Up)
    "http://purl.org/heritagedata/schemes/eh_tmt2/concepts/93179",  # Roundhouse (Railway)
    "http://purl.org/heritagedata/schemes/eh_tmt2/concepts/69434",  # works
    "http://purl.org/heritagedata/schemes/eh_tmt2/concepts/92230",  # term
    "http://purl.org/heritagedata/schemes/eh_tmt2/concepts/93931",  # model
    "http://purl.org/heritagedata/schemes/eh_tmt2/concepts/88215",  # half moon
    "http://purl.org/heritagedata/schemes/eh_tmt2/concepts/68762",  # crest
]

# archaeological object concepts to exclude
stop_list_obj = [
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/139085", # Coin (Contemporary Imitation)
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/139087", # Coin (Modern Forgery)
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/139086", # Coin (Modern Imitation)
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/95353",  # level
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/95306",  # scale
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/96615",  # shift
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/96379",  # point
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/143243", # setting
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/96735",  # staff
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/100151", # paper
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/96473",  # model
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/100107", # desk
    "http://purl.org/heritagedata/schemes/mda_obj/concepts/95183",  # crest
]

# add rematch2 NER pipeline components (usually to the end of the pipeline, but may be inserted anywhere)
# text normalization is inserted before the tagger rather than appended
nlp.add_pipe("normalize_text", before = "tagger")

# the remaining components are appended in this order (last=True on each)
nlp.add_pipe("yearspan_ruler", last=True)    
nlp.add_pipe("periodo_ruler", last=True, config={"periodo_authority_id": periodo_authority_id, "supp_list": supp_list_per}) 
nlp.add_pipe("fish_archobjects_ruler", last=True, config={"supp_list": supp_list_obj, "stop_list": stop_list_obj}) 
nlp.add_pipe("fish_monument_types_ruler", last=True, config={"supp_list": supp_list_mon, "stop_list": stop_list_mon})   
nlp.add_pipe("negation_ruler", last=True) 
nlp.add_pipe("child_span_remover", last=True) 
# (disabled) alternative: remove stop-listed spans with a separate component
#nlp.add_pipe("stop_list_span_remover", last=True, config={"stop_list": stop_list}) 

# needed for a genuinely UTC "created" timestamp below
from datetime import timezone

# Select ONE input data set by leaving exactly one group below uncommented.

# process ADS CSV report examples
input_directory = "./data/ner-input/oasis-report-metadata"
input_file_name = "report_metadata.csv"
output_directory = "./data/ner-output/ner-output-oasis-report-metadata"

# process ADS CSV journal examples
#input_directory = "./data/ner-input/ads-journal-metadata"
#input_file_name = "journal_metadata.csv"
#output_directory = "./data/ner-output/ner-output-ads-journal-metadata"

# process ADS XML metadata examples
#input_directory = "./data/ner-input/oasis-descr-examples"
#input_file_name = "oasis_descr_examples.xml"
#output_directory = f"./data/ner-output/ner-output-oasis-descr-examples"

# date stamp (local date, YYYYMMDD) for use in directory names
timestamp = DT.now().strftime('%Y%m%d')

# create the dated output directory if it does not already exist
# (exist_ok avoids the check-then-create race of os.path.exists + makedirs)
output_directory = f"{output_directory}-{timestamp}"
os.makedirs(output_directory, exist_ok=True)

# load the input records using the reader matching the file extension
input_records = []
input_file = os.path.join(input_directory, input_file_name)
if input_file.lower().endswith(".xml"):
    input_records = get_records_from_xml_file(input_file)
elif input_file.lower().endswith(".csv"):
    input_records = get_records_from_csv_file(input_file)
else:
    # previously this case silently resulted in zero records being processed
    print(f"Unsupported input file type (expected .xml or .csv): {input_file}")

record_count = len(input_records)

# metadata attached to every result summary; "identifier" is filled in per
# record when the records are processed
metadata = {
    "identifier": "",
    "title": "vocabulary-based NER results",
    "description": "vocabulary-based NER annotation on text abstracts",
    "creator": "T4-1-2-NER-OASIS-metadata-records.ipynb",
    # the trailing 'Z' denotes UTC, so the time must be taken in UTC
    # (DT.now() without a timezone returns local time)
    "created": DT.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
    "periodo_authority_id": periodo_authority_id,
    "ner_pipeline": nlp.pipe_names,
    "input_file_name": input_file_name,
    "input_record_count": record_count
}

# Run the NER pipeline over each input record and write one PDF report and one
# JSON result file per record into output_directory. Records with no text are
# reported in the progress output but produce no files.
current_record = 0  # stays 0 if there are no records to process
for current_record, record in enumerate(input_records or [], start=1):
    # get ID from the record; the shared metadata dict is updated in place so
    # each summary carries the identifier of its own record
    identifier = record.get("id", "").strip()
    metadata["identifier"] = identifier

    # print progress indicator
    print(f"processing record {current_record} of {record_count} [ID: {identifier}]")

    # Combine title and main text from the record
    # input_text = "\n".join([record.get("title", "") + ".", record.get("text", "")])
    # 04/10/24 just process main text don't include title
    input_text = record.get("text", "")

    # nothing to annotate for records without text
    if not input_text:
        continue

    # perform annotation on the text (text normalization is done by the
    # pipeline's own normalize_text component)
    doc = nlp(input_text)

    # (optionally) add any identified place entities to the custom spans
    #for ent in filter(lambda e: e.label_ == "GPE", doc.ents):
        #doc.spans["rematch"].append(ent)

    summary = DocSummary(doc, metadata=metadata)

    # build output file names incorporating record identifiers
    # slugify identifiers in case of bad characters for file names
    file_stem = os.path.join(output_directory, f"ner-output-{slugify(identifier)}")
    #html_file_name = f"{file_stem}.html"
    #text_file_name = f"{file_stem}.txt"
    json_file_name = f"{file_stem}.json"
    pdf_file_name = f"{file_stem}.pdf"

    # write results report to PDF (rendered from the HTML report)
    report = summary.report(format="html")
    HTML(None, string=report, encoding="utf-8").write_pdf(target=pdf_file_name)

    #report = summary.report(format="text")
    #with open(text_file_name, "w", encoding="utf-8") as file:
        #file.write(report)

    # write results to JSON; explicit UTF-8 so non-ASCII characters in the
    # abstracts do not depend on (or fail under) the platform default encoding
    report = summary.report(format="json")
    with open(json_file_name, "w", encoding="utf-8") as file:
        file.write(report)

    # temp interrupt after a few records (while testing)
    #if current_record == 2:
        #break

ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
monthname_ruler
seasonname_ruler
yearspan_ruler
ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
monthname_ruler
seasonname_ruler
yearspan_ruler
ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
monthname_ruler
seasonname_ruler
yearspan_ruler
ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
monthname_ruler
seasonname_ruler
yearspan_ruler
ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
monthname_ruler
seasonname_ruler
yearspan_ruler
ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
monthname_ruler
seasonname_ruler
yearspan_ruler
ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
monthname_ruler
seasonname_ruler
yearspan_ruler
ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
monthname_ruler
seasonname_ruler
yearspan_ruler
ordinal_ruler
dateprefix_ruler
datesuffix_ruler
dateseparator_ruler
mont

In [5]:
"""
from IPython.display import display, HTML

# build list of results
def result_link(record):
    identifier = record["id"] 
    file_path=f"https://html-preview.github.io/?url=https://github.com/cbinding/rematch2/blob/main/data/output/{slugify(identifier)}.html"
    return f"<li><a href='{file_path}'>{identifier}</a></li>" 
results = list(map(result_link, input_records or []))
results.sort()
#display(HTML("<ul>" + "".join(results) + "</ul>"))
with open("./data/output/results.md", "w") as file:
    file.write("<ul>" + "".join(results) + "</ul>")
"""

'\nfrom IPython.display import display, HTML\n\n# build list of results\ndef result_link(record):\n    identifier = record["id"] \n    file_path=f"https://html-preview.github.io/?url=https://github.com/cbinding/rematch2/blob/main/data/output/{slugify(identifier)}.html"\n    return f"<li><a href=\'{file_path}\'>{identifier}</a></li>" \nresults = list(map(result_link, input_records or []))\nresults.sort()\n#display(HTML("<ul>" + "".join(results) + "</ul>"))\nwith open("./data/output/results.md", "w") as file:\n    file.write("<ul>" + "".join(results) + "</ul>")\n'