# NER model evaluation notebook
Author name: Daniel J. S. Bright  
Author contact: 12004727@uhi.ac.uk  
Date last touched: 23 February 2023  
Description: Jupyter Notebook sheet to test the performance of NER systems. Presents speed of NER execution and calculations for Precision, Recall and F1-Score. Created for a dissertation for the MSc in Web Technologies at University of the Highlands & Islands.

In [None]:
# install the required Python libraries
!pipenv install spacy numpy pandas spacy_stanza spacy-transformers   # install for pipenv environment
#!pip install spacy numpy pandas spacy_stanza spacy-transformers   # install for Google Colab or other regular pip environment

In [None]:
# download the spaCy models
!python -m spacy download en_core_web_lg 
!python -m spacy download en_core_web_trf

In [None]:
# function to mount Google Drive storage
def mount_google_drive():
    """Mount Google Drive at ``/content/drive``.

    The import is local to the function, so ``google.colab`` is only
    required when this function is actually called.
    """
    from google.colab import drive as colab_drive

    mount_point = "/content/drive"
    colab_drive.mount(mount_point)


In [None]:
# import packages
import spacy, json, glob, os, stanza, spacy_stanza, spacy_transformers
import pandas as pd
from spacy import displacy

# function to get the URLs
def get_annotation_urls(url, ext, print_output=True):
    """Collect the annotation files found directly inside a directory.

    The sorted list of paths is stored in the global ``annotated_files``
    and is also returned.

    Args:
        url: Directory to search; a trailing slash is optional.
        ext: File extension to match, without the leading dot.
        print_output: When true, print how many files were found.

    Returns:
        The matching paths, sorted by file name.
    """
    global annotated_files
    # os.path.join copes with a missing trailing slash and keeps an empty
    # directory relative (plain concatenation turned "" into "/");
    # glob.escape stops characters such as "[" in the directory name from
    # being interpreted as wildcards
    pattern = os.path.join(glob.escape(url), f"*.{ext}")
    # sort based on filename
    annotated_files = sorted(glob.glob(pattern), key=os.path.basename)
    # print counted files to demonstrate success
    if print_output:
        print(f"Number of annotated files: {len(annotated_files)}")
    return annotated_files


# function to put JSON data into Python dictionaries
def json_to_doc(print_output=False):
    """Parse every file listed in ``annotated_files`` as JSON.

    The parsed objects are stored, in file order, in the global
    ``annotations``. With ``print_output`` set, the number of parsed files
    and the first parsed object are printed as a sanity check.
    """
    global annotations
    annotations = []
    for path in annotated_files:
        with open(path, "r", encoding="utf-8") as handle:
            annotations.append(json.load(handle))
    if not print_output:
        return
    # count of annotation dictionaries, then the first document
    print(f"Number of annotations in files: {len(annotations)}")
    print(f"Annotation sample: {annotations[:1]}")


# function to load the models
def load_models(print_output, model):
    """Load the requested NER pipeline into the global ``nlp``.

    Args:
        print_output: When true, print which model is being evaluated
            (nothing is printed for "stanza").
        model: One of
            * "stanza" - downloads the English Stanza model and wraps it
              with spacy_stanza;
            * "trf-model-best", "trf-model-best-tuned", "cnn-model-best" -
              custom pipelines loaded from ``trained_model_path``; their
              parser, tagger, attribute_ruler and lemmatizer are replaced
              by the ones from the matching stock pipeline;
            * "en_core_web_lg" / "en_core_web_trf" - stock spaCy packages;
            * any other name - loaded from ``trained_model_path``.
    """
    global nlp, nlp_trf_orig
    transformer_models = ("trf-model-best", "trf-model-best-tuned")
    cnn_models = ("cnn-model-best",)
    stock_models = ("en_core_web_lg", "en_core_web_trf")

    if model == "stanza":
        stanza.download("en")
        nlp = spacy_stanza.load_pipeline("en")
        return

    if model in transformer_models or model in cnn_models:
        is_transformer = model in transformer_models
        borrowed = ["parser", "tagger", "attribute_ruler", "lemmatizer"]
        # spaCy expects component names as separate items; a single
        # comma-joined string is not split into names
        nlp = spacy.load(trained_model_path + model, exclude=borrowed)
        # required as workaround to frozen components bug
        nlp_trf_orig = spacy.load(
            "en_core_web_trf" if is_transformer else "en_core_web_lg"
        )
        # chain the borrowed components directly behind the embedding layer,
        # each one after the previous
        previous = "transformer" if is_transformer else "tok2vec"
        for name in borrowed:
            nlp.add_pipe(name, source=nlp_trf_orig, after=previous)
            previous = name
        if print_output:
            print(f"Evaluating model: {model}")
        return

    if model not in stock_models:
        model = trained_model_path + model
    if print_output:
        print(f"Evaluating model: {model}")
    nlp = spacy.load(model)


# function to set up the paths, etc.
def setup(model, dataset, print_output=False, colab=0):
    """Configure paths and labels, then load the annotations and the model.

    Args:
        model: Model name; used for the result file names and passed to
            ``load_models``.
        dataset: Name of the sub-directory of ``data/`` holding the
            annotation files.
        print_output: Passed through to the loading helpers.
        colab: When truthy, Google Drive is mounted and all paths are
            rooted in ``/content/drive/MyDrive/`` instead of ``./``.
    """
    global trained_model_path, doc_results_path, doc_results_filename
    global corpus_results_path, corpus_results_filename
    global annotated_files, labels_of_interest

    # paths
    if colab:
        mount_google_drive()
    google_drive_path = "/content/drive/MyDrive/"
    root = google_drive_path if colab else "./"
    annotations_path = root + "data/" + f"{dataset}/"
    trained_model_path = root + "data/model/"
    # document and corpus results share one directory
    doc_results_path = root + "data/results/"
    corpus_results_path = root + "data/results/"
    doc_results_filename = f"doc_{model}.csv"
    corpus_results_filename = f"corpus_{model}.csv"
    annotations_data_filetype = "json"

    # entity labels that take part in the evaluation
    labels_of_interest = ["GPE", "LOC", "DATE", "TIME", "COLOR", "TYPE"]

    # run setup functions
    get_annotation_urls(annotations_path, annotations_data_filetype, print_output)
    json_to_doc(print_output)
    load_models(print_output, model)


In [None]:
# function to get all entities in a hand-annotated doc (all paras)
def get_annotated_entities(annotations):
    """Return the ``"entities"`` value of every paragraph, in order.

    Each paragraph is indexed as ``para[1]["entities"]``.
    """
    entities_per_para = []
    for para in annotations:
        para_meta = para[1]
        entities_per_para.append(para_meta["entities"])
    return entities_per_para


# function to get all raw text from the hand-annotated doc (all paras)
def get_text(annotations):
    """Return the first element (``para[0]``) of every paragraph, in order."""
    texts = []
    for para in annotations:
        texts.append(para[0])
    return texts


# function to get the annotations
def get_annotations(print_output=False):
    """Split the loaded annotations into entity lists and raw text.

    Reads the global ``annotations`` and fills two parallel globals, both
    shaped as a list of documents, each a list of paragraphs:

    * ``annotated_entities`` - the entities of each paragraph
    * ``annotated_text`` - the text of each paragraph

    Args:
        print_output: When true, print document/paragraph/entity totals and
            one sample paragraph.
    """
    global annotated_entities, annotated_text

    # all entities from all paragraphs in all hand-annotated docs
    annotated_entities = [
        get_annotated_entities(doc["annotations"]) for doc in annotations
    ]
    # all text from all paragraphs in all hand-annotated docs
    annotated_text = [get_text(doc["annotations"]) for doc in annotations]

    if not print_output:
        return

    # print total counts of annotated documents; paragraphs & entities
    print(f"Number of documents: {len(annotated_entities)}")
    print(f"Number of paras: {sum(len(doc) for doc in annotated_entities)}")
    entity_total = sum(len(para) for doc in annotated_entities for para in doc)
    print(f"Number of entities: {entity_total}\n")

    # The sample is the 4th document when there is one; on a smaller corpus
    # use the last document instead of raising IndexError, and print nothing
    # when there is no paragraph to show.
    sample_idx = min(3, len(annotated_entities) - 1)
    if sample_idx >= 0 and annotated_entities[sample_idx]:
        print(
            f"Annotated entities sample (doc {sample_idx + 1}, para 1): {annotated_entities[sample_idx][0]}\n"
        )
        print(
            f"Annotated text sample (doc {sample_idx + 1}, para 1): {annotated_text[sample_idx][0]}\n"
        )


In [None]:
# function to run the NER
def run_ner(model, print_output=False):
    """Run the loaded pipeline over every annotated paragraph.

    Fills the global ``extracted_entities`` with one list per document, each
    holding one list per paragraph of ``[start_char, end_char, label]``
    triples. The ``model`` argument is accepted but not used here; the
    pipeline is taken from the global ``nlp``.
    """
    global extracted_entities
    extracted_entities = []
    for doc_text in annotated_text:
        processed_paras = nlp.pipe(
            doc_text, disable=["tagger", "parser", "attribute_ruler", "lemmatizer"]
        )
        extracted_entities.append(
            [
                [[ent.start_char, ent.end_char, ent.label_] for ent in para.ents]
                for para in processed_paras
            ]
        )

    if not print_output:
        return

    # totals of processed documents, paragraphs and entities
    print(f"Number of documents: {len(extracted_entities)}")
    print(f"Number of paras: {sum([len(x) for x in extracted_entities])}")
    print(
        f"Number of entities: {sum([sum(len(y) for y in x ) for x in extracted_entities])}"
    )
    print("\n")
    print(f"Extract entities: {extracted_entities}")


In [None]:
# function to count annotated and extracted entities in doc
def count_entities(doc):
    """Return the total number of entities across all paragraphs of a doc.

    Args:
        doc: List of paragraphs, each a list of entities.

    Returns:
        The summed paragraph lengths; 0 for an empty document.
    """
    # sum() over nothing is already 0, so neither an any() guard nor a copy
    # of each paragraph is needed
    return sum(len(para) for para in doc)


# function to count number of sample paragraphs
def count_sample_paragraphs():
    """Return the total length of the ``"annotations"`` lists of all docs.

    Reads the global ``annotations``.
    """
    paragraph_total = 0
    for doc in annotations:
        paragraph_total += len(doc["annotations"])
    return paragraph_total


# function to count the entity class label types
def count_entity_types(dataset="annotations"):
    """Count entities per label of interest.

    Args:
        dataset: "annotations" counts the global ``annotated_entities``;
            any other value counts the global ``extracted_entities``.

    Returns:
        Dict mapping every label in ``labels_of_interest`` to its count.
        The same dict is stored in the global ``entity_count``.
    """
    global entity_count
    entity_count = {label: 0 for label in labels_of_interest}
    source = annotated_entities if dataset == "annotations" else extracted_entities
    for doc in source:
        for para in doc:
            for ent in para:
                # entities with other labels are skipped rather than raising
                # KeyError, so the count also works on unfiltered data
                if ent[2] in entity_count:
                    entity_count[ent[2]] += 1
    return entity_count


# function to run the counts
def run_counts(print_output=False):
    """Count entities per document and for the corpus, and print a summary.

    Stores the per-document and corpus totals of extracted and annotated
    entities, plus the total number of sample paragraphs, in globals. The
    summary is always printed; ``print_output`` additionally dumps the full
    entity lists.
    """
    global doc_extracted_entities_count, doc_annotated_entities_count
    global corpus_extracted_entities_count, corpus_annotated_entities_count
    global extracted_entities, annotated_entities, corpus_sample_paras_total

    # per-document totals
    doc_extracted_entities_count = list(map(count_entities, extracted_entities))
    doc_annotated_entities_count = list(map(count_entities, annotated_entities))
    # corpus totals
    corpus_sample_paras_total = count_sample_paragraphs()
    corpus_extracted_entities_count = sum(doc_extracted_entities_count)
    corpus_annotated_entities_count = sum(doc_annotated_entities_count)

    # summary (always shown)
    print(
        f"\nExtracted entities count for each doc: {doc_extracted_entities_count}\nAnnotated entities count for each doc: {doc_annotated_entities_count}"
    )
    print(
        f"\nTotal annotated sample paragraphs in the corpus: {corpus_sample_paras_total}\n"
    )
    print(f"Extracted entities count for corpus: {corpus_extracted_entities_count}")
    print(f"Annotated entities count for corpus: {corpus_annotated_entities_count}\n")
    print(f'Totals of annotated entity types: {count_entity_types("annotations")}')
    print(f'Totals of extracted entity types: {count_entity_types("ner")}')

    # full entity dump (optional)
    if not print_output:
        return
    print(f"\nExtracted entities for corpus: {extracted_entities}")
    print(f"Annotated entities for corpus: {annotated_entities}\n")


In [None]:
# function to remove any entities not of interest in doc
def remove_irrelevant_entities(doc):
    """Drop every entity whose label is not in ``labels_of_interest``.

    Each paragraph of ``doc`` is replaced in place by its filtered copy, and
    the same ``doc`` list is returned. This keeps labels outside the
    evaluation from influencing the performance figures.
    """

    def _is_relevant(ent):
        return ent[2] in labels_of_interest

    doc[:] = [list(filter(_is_relevant, para)) for para in doc]
    return doc


# function to perform preprocessing on the data
def preprocess_data(print_output=False):
    """Filter both global entity sets down to the labels of interest.

    ``extracted_entities`` is processed first, then ``annotated_entities``.
    ``print_output`` is accepted but not used.
    """
    global extracted_entities, annotated_entities, annotated_text, test_data
    extracted_entities = list(map(remove_irrelevant_entities, extracted_entities))
    annotated_entities = list(map(remove_irrelevant_entities, annotated_entities))


In [None]:
# function to run the analysis process
def analyse(print_output=False):
    """Score the extracted entities against the hand annotations.

    Filters both entity sets to the labels of interest, refreshes the entity
    counts (which always prints the count summary), then derives precision,
    recall and F1 for each document and for the whole corpus.

    An extracted and an annotated entity of the same paragraph match when
    their character spans overlap and their labels are equal:

    * true positive  - an extracted entity that matches an annotation
    * false positive - an extracted entity that matches nothing
    * false negative - an annotated entity that nothing matches

    False negatives are counted directly from the unmatched annotations.
    They used to be estimated as "annotated - extracted", which let every
    false positive cancel one missed annotation and so overstated recall.

    Results are published through the globals ``true_positives``,
    ``false_negatives``, ``corpus_true_positives_total``,
    ``corpus_false_negatives_total``, ``doc_results`` and ``corpus_results``.

    Args:
        print_output: When true, also print the per-document tallies and the
            result dictionaries.
    """
    global true_positives, false_negatives
    global corpus_true_positives_total, corpus_false_negatives_total
    global doc_results, corpus_results, corpus_false_positive_entities

    def find_matches(annotated, extracted):
        """Return (true positives, false negatives) for one document."""
        true_pos = 0
        false_neg = 0
        for para_idx, a_ents in enumerate(annotated):
            # a paragraph without NER output simply has no extracted entities
            e_ents = extracted[para_idx] if para_idx < len(extracted) else []
            matched_extracted = set()
            matched_annotated = set()
            for a_idx, ae in enumerate(a_ents):
                for e_idx, ee in enumerate(e_ents):
                    # overlap test treats both boundaries as inclusive
                    # NOTE(review): if end offsets are exclusive, entities
                    # that merely touch also count as overlapping - confirm
                    if ee[1] >= ae[0] and ee[0] <= ae[1] and ee[2] == ae[2]:
                        matched_extracted.add(e_idx)
                        matched_annotated.add(a_idx)
            # indexes are recorded in sets, so an entity that matches several
            # counterparts is still counted once
            true_pos += len(matched_extracted)
            false_neg += len(a_ents) - len(matched_annotated)
        return true_pos, false_neg

    def score(tp, fn, extracted_total):
        """Return rounded (precision, recall, f1) for one set of tallies."""
        if tp > 0:
            precision = tp / extracted_total
            recall = tp / (tp + fn)
        elif fn == 0 and extracted_total == 0:
            # nothing to find and nothing reported: a perfect result
            precision = recall = 1.0
        else:
            precision = recall = 0
        if precision > 0 and recall > 0:
            f1_score = 2 * precision * recall / (precision + recall)
        else:
            f1_score = 0
        return round(precision, 3), round(recall, 3), round(f1_score, 3)

    # declared global above and reset on every run; nothing here fills it
    corpus_false_positive_entities = []

    # filter the entities and refresh the counts used below
    preprocess_data(print_output=0)
    run_counts(0)

    # true positives and false negatives for each doc
    true_positives = []
    false_negatives = []
    for doc_idx, doc_annotated in enumerate(annotated_entities):
        true_pos, false_neg = find_matches(
            doc_annotated, extracted_entities[doc_idx]
        )
        true_positives.append(true_pos)
        false_negatives.append(false_neg)

    # corpus totals
    corpus_true_positives_total = sum(true_positives)
    corpus_false_negatives_total = sum(false_negatives)

    # precision, recall and f1-score for each doc
    doc_results = []
    for tp, fn, ee in zip(
        true_positives, false_negatives, doc_extracted_entities_count
    ):
        precision, recall, f1_score = score(tp, fn, ee)
        doc_results.append(
            {"precision": precision, "recall": recall, "f1-score": f1_score}
        )

    # precision, recall and f1-score for the corpus
    # (the key is spelled "f1_score" here but "f1-score" per document; both
    # spellings are kept because they become the result column names)
    precision, recall, f1_score = score(
        corpus_true_positives_total,
        corpus_false_negatives_total,
        corpus_extracted_entities_count,
    )
    corpus_results = {
        "precision": precision,
        "recall": recall,
        "f1_score": f1_score,
    }

    # print the output
    if print_output:
        print("\n")
        # print per document totals
        for doc_idx, doc in enumerate(extracted_entities):
            print(f"Document {doc_idx}")
            print(f"True positives: {true_positives[doc_idx]}")
            print(
                f"False positives: {doc_extracted_entities_count[doc_idx] - true_positives[doc_idx]}"
            )
            print(f"False negatives: {false_negatives[doc_idx]}\n")
        # print all totals
        print(f"\nDocument analysis results: {doc_results}")
        print(f"Corpus analysis results {corpus_results}\n")


In [None]:
# function to display examples using the displacy module
def display_examples(docs_of_interest):
    """Render annotated and extracted entities for the given documents.

    ``docs_of_interest`` holds indexes into ``annotated_entities``. For each
    paragraph of each document the annotated entities are rendered first,
    then the extracted ones. An ``IndexError`` raised while handling a
    document is reported with a message instead of being propagated.
    """

    def _render(text, ents, title):
        displacy.render(
            {
                'text': text,
                'ents': [{"start": e[0], "end": e[1], "label": e[2]} for e in ents],
                "title": title,
            },
            manual=True,
            style='ent',
            jupyter=True,
        )

    for doc in docs_of_interest:
        try:
            for para_idx, ae in enumerate(annotated_entities[doc]):
                text = annotated_text[doc][para_idx]
                print(f'\nAnnotated example for document {doc}')
                _render(text, ae, f'Document {doc}, para {para_idx}')
                print(f'Extracted example for document {doc}')
                ee = extracted_entities[doc][para_idx]
                _render(text, ee, f'Document {doc}, para {para_idx}')
        except IndexError:
            print(f'You appear to be trying to display results for document {doc}, which does not appear to exist!')

In [None]:
# function to display results using a pandas dataframe
def display_results(show_label_examples=None, show_scores=True):
    """Show example documents and/or the score tables.

    Args:
        show_label_examples: Document indexes passed to ``display_examples``;
            ``None`` or an empty list shows no examples. (The default is
            ``None`` rather than a shared mutable list.)
        show_scores: When true, build the document and corpus result tables
            from ``doc_results`` / ``corpus_results``, store them in the
            globals ``df_docs_results`` / ``df_corpus_results`` and display
            them.
    """
    global df_docs_results, df_corpus_results
    if show_label_examples:
        display_examples(show_label_examples)
    if not show_scores:
        return

    # one column per document, keyed by capitalised metric name
    per_doc = {
        idx: {key.capitalize(): value for key, value in doc.items()}
        for idx, doc in enumerate(doc_results)
    }
    print('\nDocument Analysis Results')
    # transpose so that each document becomes a row
    df_docs_results = pd.DataFrame.from_dict(per_doc).T
    df_docs_results = df_docs_results[['Precision', 'Recall', 'F1-score']]
    df_docs_results.index.name = 'Document'
    # display is not defined in this file - presumably the IPython/Jupyter
    # built-in
    display(df_docs_results)

    print('\nCorpus Analysis Results')
    corpus_row = {key.capitalize(): [value] for key, value in corpus_results.items()}
    df_corpus_results = pd.DataFrame.from_dict(corpus_row)
    df_corpus_results.index.name = 'Corpus'
    display(df_corpus_results)

In [None]:
# function to write results to a file
def write_results_to_file(write):
    """Save the document and corpus result tables as CSV files.

    Args:
        write: When false, nothing is written.

    The targets come from the globals ``doc_results_path`` /
    ``doc_results_filename`` and ``corpus_results_path`` /
    ``corpus_results_filename``. Missing result directories are created
    first so that a fresh checkout does not fail on the first run.
    """
    if not write:
        return
    targets = (
        (df_docs_results, doc_results_path, doc_results_filename),
        (df_corpus_results, corpus_results_path, corpus_results_filename),
    )
    for frame, directory, filename in targets:
        if directory:
            os.makedirs(directory, exist_ok=True)
        frame.to_csv(os.path.join(directory, filename))

In [None]:
"""
Configure the testing.

Arguments:

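model: The model to test. Options include 'en_core_web_lg', 'en_core_web_trf', 'stanza', 'trf-model-best', 'trf-model-best-tuned' and 'cnn-model-best'; any other name is loaded from the trained-model directory.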
dataset: The dataset to use, from: 'train' or 'test'.
print_output: Whether to print verbose output (boolean)
colab: Whether this is run on the Google Colaboratory platform (boolean)

"""
# name of the model under test; the same name is passed to run_ner() in the
# next cell (the ``global`` statement has no effect at notebook top level)
global model
model = "cnn-model-best"
# NOTE(review): colab=1 makes setup() mount Google Drive and read all data
# from /content/drive/MyDrive/; pass colab=0 to use ./data/ instead
setup(model=model, dataset="test", print_output=0, colab=1)
# import the annotations
get_annotations(print_output=0)


In [None]:
# run the NER process, using the configuration defined above
run_ner(model, print_output=0)

In [None]:
""" 
Run the analysis process, using the configuration defined above.

Arguments: 

analyse: Whether to print the output (boolean)
display_results: ([Indexes (counting from 0) of docs to display entities for (list)], show scores? (boolean))
write_results_to_file: Whether to write the results of the testing to an output file (boolean)

"""
# filter the entities and compute the scores (the entity count summary is
# printed by run_counts() regardless of print_output)
analyse(print_output=0)
# render the entities of the document at list index 4, then show the score tables
display_results([4], 1)
# save both result tables as CSV files
write_results_to_file(1)


In [None]:
# function to print entities extracted for semantic testing
def print_ents(doc):
    """Print the text and label of every entity in ``doc.ents``.

    Prints ``None`` when ``doc.ents`` is empty.
    """
    if not doc.ents:
        print("None")
        return
    for entity in doc.ents:
        print(f"Text: {entity.text}")
        print(f"Label: {entity.label_}")


# define the data for the semantic test: the same token on its own, in a
# sentence where it describes the sighted object, and in an unrelated sentence
token = "disk"
token_in_context = f"I saw an object in the sky, it looked like a {token}"
token_in_wrong_context = f"I played a {token} on my CD player!"

# run the semantic test: print a heading, then the entities found in the text
_semantic_cases = (
    ("Token absent context: ", token),
    ("\nToken in correct context: ", token_in_context),
    ("\nToken in incorrect context: ", token_in_wrong_context),
)
for _heading, _sample in _semantic_cases:
    print(_heading)
    print_ents(nlp(_sample))
