In [22]:
import random
import os
import json
import re
from typing import Tuple, List
import pandas as pd
import matplotlib.pyplot as plt
import spacy
from spacy.util import minibatch, compounding
from operator import itemgetter

In [23]:
class Filesystem:
    """Reads the annotated JSON datasets and lists saved model directories."""

    def is_file_valid_json(self, path: str) -> bool:
        """Tell whether ``path`` is an existing regular file holding parseable JSON.

        Returns False when the path is not a file or when reading/parsing raises
        ``ValueError``; any other error raised by ``open`` propagates.
        """
        if not os.path.isfile(path):
            return False

        try:
            # The context manager closes the handle even when parsing fails.
            with open(path, encoding="utf8") as f:
                json.load(f)
            return True
        except ValueError:  # includes JSONDecodeError
            return False

    def _load_document(self, path: str, with_text: bool) -> list:
        """Read one annotation file into ``[source, {"entities": [...]}]``.

        Each entity is ``(start, end, type)``; when ``with_text`` is true the
        item's ``"selected-text"`` value is appended as a fourth element.
        """
        with open(path, encoding="utf8") as f:
            data = json.load(f)

        if with_text:
            entities = [
                (item["start"], item["end"], item["type"], item["selected-text"])
                for item in data["items"]
            ]
        else:
            entities = [
                (item["start"], item["end"], item["type"]) for item in data["items"]
            ]

        return [data["source"], {"entities": entities}]

    def get_dataset(self, dataset_size: int) -> Tuple[List[list], List[list]]:
        """Load a shuffled training sample and the complete test set.

        Args:
            dataset_size: Number of shuffled entries of ``../dataset`` that are
                considered for training. Entries that are not valid JSON files
                still use up a slot, so fewer documents may be returned.

        Returns:
            ``(train_dataset, test_dataset)``. Both are lists of
            ``[source, {"entities": [...]}]``; training entities are
            ``(start, end, type)`` and test entities additionally carry the
            selected text as a fourth element.
        """
        train_dir = "../dataset"
        test_dir = "../dataset-test/"

        # Clean macOS specific files
        workdir = os.listdir(train_dir)
        test_workdir = os.listdir(test_dir)
        if ".DS_Store" in workdir:
            workdir.remove(".DS_Store")

        if ".DS_Store" in test_workdir:
            # Remove from the list that was actually checked; removing from
            # `workdir` here raised ValueError when the entry was not in it.
            test_workdir.remove(".DS_Store")

        # Always randomize the training dataset
        random.shuffle(workdir)

        train_dataset = []
        for i, filename in enumerate(workdir):
            if i >= dataset_size:
                # Later entries can never be selected, so skip reading them.
                break
            path = os.path.join(train_dir, filename)
            if self.is_file_valid_json(path):
                train_dataset.append(self._load_document(path, with_text=False))

        test_dataset = []
        for filename in test_workdir:
            path = os.path.join(test_dir, filename)
            if self.is_file_valid_json(path):
                test_dataset.append(self._load_document(path, with_text=True))

        return (train_dataset, test_dataset)

    def get_models(self):
        """Return ``"../models/<name>"`` for every sub-directory of ``../models``."""
        return [
            "../models/" + d
            for d in os.listdir("../models")
            if os.path.isdir(os.path.join("../models", d))
        ]

In [24]:
class Training:
    """Trains and evaluates one NER model per dataset size entered by the user."""

    def run(self):
        """Prompt for dataset sizes, then train, save and score a model for each.

        The prompt accepts whitespace-separated integers; duplicates are dropped.
        For every size a blank "pt" pipeline with an "ner" component is trained
        for 10 iterations, written to ``../models/juridic-<size>`` and evaluated
        through the module-level ``mt`` and ``mtt`` objects. Data comes from the
        module-level ``fs`` object.
        """
        # Get a randomized TRAIN_DATA with specified size
        raw_sizes = input("Insert the dataset size for training: ").split()
        unique_sizes = list({int(token) for token in raw_sizes})

        for dataset_size in unique_sizes:
            TRAIN_DATA, TEST_DATA = fs.get_dataset(dataset_size)

            n_iter = 10
            random.seed(0)

            # Create blank model
            nlp = spacy.blank("pt")

            # Get ner pipeline component (create if necessary)
            if "ner" in nlp.pipe_names:
                ner = nlp.get_pipe("ner")
            else:
                ner = nlp.create_pipe("ner")
                nlp.add_pipe(ner)

            # Add new entity labels to entity recognizer
            labels = {
                entity[2]
                for _, annotation in TRAIN_DATA
                for entity in annotation["entities"]
            }
            for label in labels:
                ner.add_label(label)

            # Set optimizer
            optimizer = nlp.begin_training()

            # Get names of other pipes to disable them during training
            other_pipes = [name for name in nlp.pipe_names if name != "ner"]

            # Only train NER pipe
            with nlp.disable_pipes(*other_pipes):
                # Batch sizes come from one generator shared by all iterations
                sizes = compounding(1, 16, 1.001)
                for _ in range(n_iter):
                    random.shuffle(TRAIN_DATA)

                    losses = {}
                    # Per-iteration counters, kept for ad-hoc debugging output
                    misses = 0
                    total_items = 0
                    for batch in minibatch(TRAIN_DATA, size=sizes):
                        texts, annotations = zip(*batch)

                        # A batch that fails to update is counted and skipped
                        # so the remaining batches still train.
                        try:
                            nlp.update(
                                texts,
                                annotations,
                                sgd=optimizer,
                                drop=0.2,
                                losses=losses,
                            )
                        except Exception:
                            misses += 1
                        else:
                            total_items += 1

            # Save model to output directory
            model_dir = f"../models/juridic-{dataset_size}"
            nlp.meta["name"] = f"juridic-{dataset_size}"
            nlp.to_disk(model_dir)

            # Overall metrics first, then the per-tag breakdown
            mt.run(model_dir, TEST_DATA)
            mtt.run(model_dir, TEST_DATA)

In [25]:
class Metrics:
    """Scores a saved NER model against annotated test documents, all tags pooled.

    A document is ``[source_text, {"entities": [...]}]``. Expected and predicted
    entities are ``(start, end, type, text)`` tuples. The tuples produced by
    ``create_json_all_items`` use a different layout: ``(text, start, end, type)``.
    """

    def find_matching_item(self, givenItem, givenJson):
        """Return the first entity of ``givenJson`` whose text equals ``givenItem``'s, else None."""
        # alias
        selected_text = 3

        for jsonItem in givenJson["entities"]:
            if givenItem[selected_text] == jsonItem[selected_text]:
                return jsonItem
        return None

    def find_not_matching_items(
        self, expectedItems, fullItems, outputItems
    ):  # expected fullJson output
        """Count ``fullItems`` entries whose text is neither expected nor predicted.

        Args:
            expectedItems: ``{"entities": [...]}`` with ``(start, end, type, text)`` tuples.
            fullItems: ``{"entities": [...]}`` as built by ``create_json_all_items``,
                i.e. ``(text, start, end, type)`` tuples.
            outputItems: ``{"entities": [...]}`` with ``(start, end, type, text)`` tuples.

        Returns:
            Number of ``fullItems`` entries whose text matches no expected entity
            and no output entity.
        """
        # Aliases
        selected_text = 3  # text position in expected/output tuples
        full_text = 0  # text position in fullItems tuples (different layout)

        expected_texts = {item[selected_text] for item in expectedItems["entities"]}
        output_texts = {item[selected_text] for item in outputItems["entities"]}

        # The previous version read index 3 of the full-layout tuple (its type,
        # not its text) and counted an item as soon as ANY output text differed,
        # so predicted-but-unexpected entities were also counted here.
        return sum(
            1
            for fullItem in fullItems["entities"]
            if fullItem[full_text] not in expected_texts
            and fullItem[full_text] not in output_texts
        )

    def create_json_all_items(self, expectedJson):
        """Return the document with its entities plus "NOT_USED" items for the gaps.

        The result is ``[source, {"entities": [...]}]`` where every tuple is
        ``(text, start, end, type)``. Gaps are the stretches of source text
        before, between and after the entities, taken in list order; a gap is
        kept only if its stripped text is non-empty and not a lone ``"."``.
        """
        # aliases
        source = 0
        items = 1

        text = expectedJson[source]
        entities = expectedJson[items]["entities"]

        # Re-order each entity from (start, end, type, text) to (text, start, end, type)
        all_items = [(item[3], item[0], item[1], item[2]) for item in entities]

        # NOTE(review): walking the gaps like this presumably relies on the
        # entities being ordered by start offset - confirm for the callers' data.
        current_start = 0
        for item in entities:
            current_end = item[0]  # Start
            gap_text = text[current_start:current_end].strip()

            if gap_text and gap_text != ".":
                all_items.append((gap_text, current_start, current_end, "NOT_USED"))

            current_start = item[1]  # End

        if current_start < len(text):
            final_text = text[current_start:].strip()

            if final_text and final_text != ".":
                all_items.append((final_text, current_start, len(text), "NOT_USED"))

        return [text, {"entities": all_items}]

    def calculate_accuracy(self, expectedJson, outputJson):
        """Compute the counts for one document.

        Every expected entity is looked up in the output by text. A text match
        adds one correct match; an equal type adds a second one, a different
        type adds an incorrect match. Unmatched items count double.

        Returns:
            ``(truePositives, falsePositives, trueNegatives, falseNegatives)``.
        """
        # aliases
        items = 1
        entity_type = 2

        fullJson = self.create_json_all_items(outputJson)

        expectedTotalItems = len(expectedJson[items]["entities"])
        outputTotalItems = len(outputJson[items]["entities"])

        incorrectMatches = 0
        correctMatches = 0
        itemsFound = 0

        for item in expectedJson[items]["entities"]:
            matchingItem = self.find_matching_item(item, outputJson[items])
            if not matchingItem:
                continue

            # A returned item always has the same text as `item`.
            correctMatches += 1
            itemsFound += 1
            if item[entity_type] == matchingItem[entity_type]:
                correctMatches += 1
            else:
                incorrectMatches += 1

        trueNegatives = (
            self.find_not_matching_items(
                expectedJson[items], fullJson[items], outputJson[items]
            )
            * 2
        )
        truePositives = correctMatches
        falsePositives = ((outputTotalItems - itemsFound) * 2) + incorrectMatches
        falseNegatives = (expectedTotalItems - itemsFound) * 2

        return truePositives, falsePositives, trueNegatives, falseNegatives

    def _total_counts(self, expected_list, given_list):
        """Sum the per-document counts of ``calculate_accuracy`` over paired documents."""
        totalTruePositives = 0
        totalFalsePositives = 0
        totalTrueNegatives = 0
        totalFalseNegatives = 0

        for expected, given in zip(expected_list, given_list):
            (
                truePositives,
                falsePositives,
                trueNegatives,
                falseNegatives,
            ) = self.calculate_accuracy(expected, given)

            totalTruePositives += truePositives
            totalFalsePositives += falsePositives
            totalTrueNegatives += trueNegatives
            totalFalseNegatives += falseNegatives

        return (
            totalTruePositives,
            totalFalsePositives,
            totalTrueNegatives,
            totalFalseNegatives,
        )

    def _scores(self, truePositives, falsePositives, trueNegatives, falseNegatives):
        """Return ``(accuracy, precision, recall, f1score)``; a zero denominator yields 0."""

        def ratio(numerator, denominator):
            try:
                return numerator / denominator
            except ZeroDivisionError:
                return 0

        accuracy = ratio(
            truePositives + trueNegatives,
            truePositives + falsePositives + trueNegatives + falseNegatives,
        )
        precision = ratio(truePositives, truePositives + falsePositives)
        recall = ratio(truePositives, truePositives + falseNegatives)
        f1score = ratio(2 * (precision * recall), precision + recall)

        return accuracy, precision, recall, f1score

    def run(self, model, test_data):
        """Evaluate the model stored at ``model`` and record its scores.

        Args:
            model: Path handed to ``spacy.load``.
            test_data: List of ``[source_text, {"entities": [...]}]`` documents
                whose entities are ``(start, end, type, text)`` tuples.

        Writes one row into the module-level ``model_metrics`` frame, indexed by
        the digits found in the model's ``meta["name"]``.
        """
        nlp = spacy.load(model)

        expected_list = test_data

        # Run the model on every source text and put its entities in the same
        # (start, end, type, text) layout as the expected ones.
        given_list = [
            [
                doc.text,
                {
                    "entities": [
                        (ent.start_char, ent.end_char, ent.label_, ent.text)
                        for ent in doc.ents
                    ]
                },
            ]
            for doc in (nlp(item[0]) for item in test_data)
        ]

        # Score the totals over ALL documents. The previous version accumulated
        # totals but then used only the last document's counts (and failed with
        # NameError when there were no test documents).
        accuracy, precision, recall, f1score = self._scores(
            *self._total_counts(expected_list, given_list)
        )

        model_metrics.loc[int(re.sub(r"[^\d]", "", nlp.meta["name"]))] = [
            nlp.meta["name"],  # Model name
            accuracy,
            precision,
            recall,
            f1score,
        ]

In [26]:
class TagMetric:
    """Counters kept for a single entity tag; every counter starts at zero."""

    def __init__(self, tag: str) -> None:
        self.tag = tag

        # Match bookkeeping
        self.foundItems = self.correctMatches = self.incorrectMatches = 0

        # Outcome counts derived from the match bookkeeping by the caller
        self.truePositives = self.falsePositives = 0
        self.trueNegatives = self.falseNegatives = 0

In [27]:
class MetricsByTag:
    """Scores a saved NER model against annotated test documents, one row per tag.

    A document is ``[source_text, {"entities": [...]}]``. Expected and predicted
    entities are ``(start, end, type, text)`` tuples. The tuples produced by
    ``create_json_all_items`` use a different layout: ``(text, start, end, type)``.
    """

    def find_matching_item(self, givenItem, givenJson):
        """Return the first entity of ``givenJson`` whose text equals ``givenItem``'s, else None."""
        # alias
        selected_text = 3

        for jsonItem in givenJson["entities"]:
            if givenItem[selected_text] == jsonItem[selected_text]:
                return jsonItem
        return None

    def find_not_matching_items(
        self, expectedItems, fullItems, outputItems, tag
    ):  # expected fullJson output
        """Count, for ``tag``, the ``fullItems`` entries treated as not predicted.

        An entry of ``fullItems`` is "missing" when no expected entity carrying
        ``tag`` has the same text. A missing entry is counted when some output
        entity's text differs from the entry's index-3 field and the entry's
        index-2 field equals ``tag``.

        NOTE(review): entries built by ``create_json_all_items`` are laid out
        ``(text, start, end, type)``, so index 3 is their type and index 2 their
        end offset. With string tags the ``== tag`` test can then never hold and
        this method returns 0. The intended per-tag definition is not evident
        from this file, so the logic is deliberately left unchanged - confirm
        the definition and fix.
        """
        # Aliases
        selected_text = 3
        type = 2

        missing_items = []

        for fullItem in fullItems["entities"]:
            found = False
            for expectedItem in expectedItems["entities"]:
                if (
                    expectedItem[selected_text] == fullItem[0]
                    and expectedItem[type] == tag
                ):  # fullItem selected_text is misaligned
                    found = True
                    break
            if not found:
                missing_items.append(fullItem)

        count_not_found_in_output = 0
        for missingItem in missing_items:
            for outputItem in outputItems["entities"]:
                if (
                    missingItem[selected_text] != outputItem[selected_text]
                    and missingItem[type] == tag
                ):
                    count_not_found_in_output += 1
                    break

        return count_not_found_in_output

    def create_json_all_items(self, expectedJson):
        """Return the document with its entities plus "NOT_USED" items for the gaps.

        The result is ``[source, {"entities": [...]}]`` where every tuple is
        ``(text, start, end, type)``. Gaps are the stretches of source text
        before, between and after the entities, taken in list order; a gap is
        kept only if its stripped text is non-empty and not a lone ``"."``.
        """
        # aliases
        source = 0
        items = 1

        text = expectedJson[source]
        entities = expectedJson[items]["entities"]

        # Re-order each entity from (start, end, type, text) to (text, start, end, type)
        all_items = [(item[3], item[0], item[1], item[2]) for item in entities]

        # NOTE(review): walking the gaps like this presumably relies on the
        # entities being ordered by start offset - confirm for the callers' data.
        current_start = 0
        for item in entities:
            current_end = item[0]  # Start
            gap_text = text[current_start:current_end].strip()

            if gap_text and gap_text != ".":
                all_items.append((gap_text, current_start, current_end, "NOT_USED"))

            current_start = item[1]  # End

        if current_start < len(text):
            final_text = text[current_start:].strip()

            if final_text and final_text != ".":
                all_items.append((final_text, current_start, len(text), "NOT_USED"))

        return [text, {"entities": all_items}]

    def _metric_for(self, tagMetrics, tag):
        """Return the entry of ``tagMetrics`` for ``tag``, appending a new one if absent."""
        tagMetric = next((x for x in tagMetrics if x.tag == tag), None)
        if tagMetric is None:
            tagMetric = TagMetric(tag)
            tagMetrics.append(tagMetric)
        return tagMetric

    def calculate_accuracy(self, expectedJson, outputJson):
        """Compute per-tag counts for one document.

        Every expected entity is looked up in the output by text. A text match
        adds one correct match and one found item to the expected entity's tag;
        an equal type adds a second correct match, a different type adds an
        incorrect match. Only tags with at least one text match get an entry.

        Returns:
            List of ``TagMetric`` objects with their outcome counts filled in.
        """
        # aliases
        items = 1
        entity_type = 2

        fullJson = self.create_json_all_items(outputJson)

        tagMetrics = []

        for item in expectedJson[items]["entities"]:
            matchingItem = self.find_matching_item(item, outputJson[items])
            if not matchingItem:
                continue

            # A returned item always has the same text as `item`.
            tagMetric = self._metric_for(tagMetrics, item[entity_type])
            tagMetric.correctMatches += 1
            tagMetric.foundItems += 1

            if item[entity_type] == matchingItem[entity_type]:
                tagMetric.correctMatches += 1
            else:
                tagMetric.incorrectMatches += 1

        for tagMetric in tagMetrics:
            expectedTotalItems = sum(
                i[entity_type] == tagMetric.tag for i in expectedJson[items]["entities"]
            )
            outputTotalItems = sum(
                i[entity_type] == tagMetric.tag for i in outputJson[items]["entities"]
            )

            tagMetric.trueNegatives = (
                self.find_not_matching_items(
                    expectedJson[items],
                    fullJson[items],
                    outputJson[items],
                    tagMetric.tag,
                )
                * 2
            )

            tagMetric.truePositives = tagMetric.correctMatches
            # abs() keeps the counts non-negative when more items were found
            # for the tag than the output/expected lists contain with that tag.
            tagMetric.falsePositives = abs(
                ((outputTotalItems - tagMetric.foundItems) * 2)
                + tagMetric.incorrectMatches
            )
            tagMetric.falseNegatives = abs(
                (expectedTotalItems - tagMetric.foundItems) * 2
            )

        return tagMetrics

    def _accumulate(self, totals, newItems):
        """Add one document's per-tag counts in ``newItems`` onto ``totals`` (in place).

        A tag seen for the first time contributes its object to ``totals``;
        later documents add their counts onto that object.
        """
        for item in newItems:
            tagMetric = next((x for x in totals if x.tag == item.tag), None)
            if tagMetric is None:
                totals.append(item)
            else:
                # Sum across documents. Plain assignment here used to discard
                # every document but the last one containing the tag.
                tagMetric.truePositives += item.truePositives
                tagMetric.falsePositives += item.falsePositives
                tagMetric.trueNegatives += item.trueNegatives
                tagMetric.falseNegatives += item.falseNegatives

    def _scores(self, truePositives, falsePositives, trueNegatives, falseNegatives):
        """Return ``(accuracy, precision, recall, f1score)``; a zero denominator yields 0."""

        def ratio(numerator, denominator):
            try:
                return numerator / denominator
            except ZeroDivisionError:
                return 0

        accuracy = ratio(
            truePositives + trueNegatives,
            truePositives + falsePositives + trueNegatives + falseNegatives,
        )
        precision = ratio(truePositives, truePositives + falsePositives)
        recall = ratio(truePositives, truePositives + falseNegatives)
        f1score = ratio(2 * (precision * recall), precision + recall)

        return accuracy, precision, recall, f1score

    def run(self, model, test_data):
        """Evaluate the model stored at ``model`` and record its scores per tag.

        Args:
            model: Path handed to ``spacy.load``.
            test_data: List of ``[source_text, {"entities": [...]}]`` documents
                whose entities are ``(start, end, type, text)`` tuples.

        Writes one row per tag into the module-level ``tag_metrics`` frame,
        indexed by ``(digits found in the model's meta["name"], tag)``.
        """
        nlp = spacy.load(model)

        expected_list = test_data

        # Run the model on every source text and put its entities in the same
        # (start, end, type, text) layout as the expected ones.
        given_list = [
            [
                doc.text,
                {
                    "entities": [
                        (ent.start_char, ent.end_char, ent.label_, ent.text)
                        for ent in doc.ents
                    ]
                },
            ]
            for doc in (nlp(item[0]) for item in test_data)
        ]

        tagMetrics = []
        for expected, given in zip(expected_list, given_list):
            self._accumulate(tagMetrics, self.calculate_accuracy(expected, given))

        for tag in tagMetrics:
            accuracy, precision, recall, f1score = self._scores(
                tag.truePositives,
                tag.falsePositives,
                tag.trueNegatives,
                tag.falseNegatives,
            )

            tag_metrics.loc[(int(re.sub(r"[^\d]", "", nlp.meta["name"])), tag.tag), :] = [
                nlp.meta["name"],  # Model name
                accuracy,
                precision,
                recall,
                f1score,
            ]

In [None]:
# Shared helper objects; Training.run and the metric classes look these names
# up at module level.
fs = Filesystem()
tr = Training()
mt = Metrics()
mtt = MetricsByTag()

plt.style.use('fivethirtyeight')


# Result tables filled in by Metrics.run / MetricsByTag.run during training.
model_metrics = pd.DataFrame(
    columns=["model_name", "accuracy", "precision", "recall", "f1_score"]
)

tag_metrics = pd.DataFrame(
    columns = ["model_name", "accuracy", "precision", "recall", "f1_score"],
    index = pd.MultiIndex(levels=[[], []], codes=[[], []], names=["dataset_size", "tag"])
)

# Prompts for the dataset sizes, then trains and evaluates one model per size.
tr.run()

model_metrics.sort_index(inplace=True)
tag_metrics.sort_index(inplace=True)

print("Performance evolution across different dataset sizes")
display(model_metrics)

# The size is passed to .plot() so it applies to this chart. Calling
# plt.figure(figsize=...) after plotting opened a second, empty figure and
# left the chart at its default size.
model_metrics[["accuracy", "precision", "recall", "f1_score"]].plot(figsize=(10, 6))
plt.title("Metrics for Each Model")
plt.xlabel("Dataset Size")
plt.xticks(model_metrics.index)
plt.show()

print(model_metrics.to_latex())

print("Performance for each tag with the largest dataset model")
# Select the rows of the largest dataset size (first index level).
best_model_tag_metrics = tag_metrics[["accuracy", "precision", "recall", "f1_score"]].loc[tag_metrics.index.get_level_values(0).max()]

display(best_model_tag_metrics)

best_model_tag_metrics[["accuracy", "precision", "recall", "f1_score"]].plot.bar()
plt.title("Metrics for Each Tag")
plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
plt.show()

print(best_model_tag_metrics.to_latex())