In [None]:
# !pip install --user spacy
# !pip install --user tqdm

In [None]:
import spacy
import json
import os
from spacy.tokens import Doc
from spacy.tokens import DocBin
from spacy.util import filter_spans
from spacy.training.example import Example
from spacy.scorer import Scorer
from collections import defaultdict

# from sklearn.metrics import classification_report

path = ""

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# !python -m spacy download en_core_web_lg --user

In [None]:
# !pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.6.0/en_core_web_lg-3.6.0.tar.gz

In [None]:
# Multiple Text and Ann files of Train and Test into one single JSON file


def _is_fragment_joint(token):
    """Return True for an ``END;START`` token that joins two offset fragments."""
    left, separator, right = token.partition(";")
    return bool(separator) and left.isdigit() and right.isdigit()


def convert_ann_to_json(ann_file_path):
    """Parse the text-bound ("T") annotations of a BRAT-style ``.ann`` file.

    Every line starting with ``T`` is split on whitespace and read as
    ``ID TAG START END TEXT...``. A discontinuous annotation carries several
    fragments (``START END;START END;... TEXT``); for those the span is
    collapsed to the first fragment's start and the last fragment's end.
    Lines that do not start with ``T`` are ignored.

    Args:
        ann_file_path: Path of the ``.ann`` file to read.

    Returns:
        A list of dicts with the keys ``id``, ``tag``, ``start``, ``end`` and
        ``text``. All values are strings (offsets are not converted to int)
        and ``text`` has its whitespace runs collapsed to single spaces.

    Raises:
        ValueError: If a ``T`` line has no end offset.
    """
    annotations = []
    with open(ann_file_path, "r") as file:
        for line_number, line in enumerate(file, start=1):
            if not line.startswith("T"):
                continue

            tokens = line.split()

            # tokens[2] is the first start offset. It is followed by zero or
            # more "END;START" joints and then the final end offset. Walking
            # the offset tokens (instead of counting ";" in the whole line)
            # keeps a ";" inside the entity text from shifting the indices.
            end_index = 3
            while end_index < len(tokens) and _is_fragment_joint(tokens[end_index]):
                end_index += 1

            if end_index >= len(tokens):
                raise ValueError(
                    f"Malformed annotation at {ann_file_path}:{line_number}: "
                    f"{line.rstrip()!r}"
                )

            annotations.append(
                {
                    "id": tokens[0],
                    "tag": tokens[1],
                    "start": tokens[2],
                    "end": tokens[end_index],
                    "text": " ".join(tokens[end_index + 1 :]),
                }
            )

    return annotations


def convert_multiple_files_to_json(ann_folder_path, output_json_file):
    """Merge every ``.ann``/``.txt`` pair of a folder into one JSON file.

    For each ``.ann`` file the annotations are parsed with
    ``convert_ann_to_json`` and combined with the content of the ``.txt`` file
    that shares its base name. The result is a JSON list with one object per
    document (``file_name``, ``content``, ``annotations``, ``file_start``).

    Args:
        ann_folder_path: Folder holding the ``.ann`` and ``.txt`` files.
        output_json_file: Path of the JSON file to write.
    """
    all_data = []

    # os.listdir returns entries in arbitrary order. Sorting keeps the JSON
    # (and any positional split done on it later) identical between runs.
    for filename in sorted(os.listdir(ann_folder_path)):
        if not filename.endswith(".ann"):
            continue

        # splitext only strips the final extension, whereas
        # str.replace(".ann", ".txt") would also rewrite ".ann" in the middle
        # of a file name.
        filename_without_extension = os.path.splitext(filename)[0]

        ann_file_path = os.path.join(ann_folder_path, filename)
        annotations = convert_ann_to_json(ann_file_path)

        txt_file_path = os.path.join(
            ann_folder_path, filename_without_extension + ".txt"
        )
        if os.path.exists(txt_file_path):
            with open(txt_file_path, "r") as txt_file:
                text_content = txt_file.read()
        else:
            # Placeholder kept from the original behaviour; note that the
            # annotation offsets do not refer to this text.
            text_content = "No text content found for this file."

        all_data.append(
            {
                "file_name": filename_without_extension,
                "content": text_content,
                "annotations": annotations,
                "file_start": 0,
            }
        )

    with open(output_json_file, "w") as json_file:
        json.dump(all_data, json_file, indent=4)


# Input folders (paired .ann/.txt files) and the JSON file each one is merged into.
train_ann_folder_path, train_output_json_file = (
    f"{path}train",
    f"{path}train_txt_ann.json",
)
test_ann_folder_path, test_output_json_file = (
    f"{path}test",
    f"{path}test_txt_ann.json",
)

# Convert the train split first, then the test split.
for ann_folder, json_target in (
    (train_ann_folder_path, train_output_json_file),
    (test_ann_folder_path, test_output_json_file),
):
    convert_multiple_files_to_json(ann_folder, json_target)

In [None]:
# Build the train/dev split.
# The split is positional: the first 90% of the documents (in the order they
# are stored in train_txt_ann.json) become training data and the remaining
# 10% become dev data. Nothing is shuffled here.

with open(f"{path}train_txt_ann.json", "r") as f:
    full_data = json.load(f)

split_index = int(0.9 * len(full_data))
train_files_index, dev_files_index = (
    full_data[:split_index],
    full_data[split_index:],
)


def process_data(data):
    """Convert merged JSON documents into ``{"text", "entities"}`` records.

    Args:
        data: Iterable of dicts with a ``content`` string and an
            ``annotations`` list whose items have ``start``, ``end`` and
            ``tag`` entries.

    Returns:
        A list with one dict per document: ``text`` is the document content
        and ``entities`` is a list of ``(start, end, LABEL)`` tuples where the
        offsets are converted to ``int`` and the tag is upper-cased.
    """
    return [
        {
            "text": document["content"],
            "entities": [
                (
                    int(annotation["start"]),
                    int(annotation["end"]),
                    annotation["tag"].upper(),
                )
                for annotation in document["annotations"]
            ],
        }
        for document in data
    ]


# Turn both splits into spaCy-style records and report their sizes.
training_data, dev_data = (
    process_data(split) for split in (train_files_index, dev_files_index)
)

print(f"Lengths of Training Data = {len(training_data)} Dev data = {len(dev_data)}")

In [None]:
# Rebuild the training set as a stream of small DocBin files
# (<path>Train_docbin/train_bin_<n>.spacy, at most `bin_size_threshold` docs each).

# Use the same location for cleaning and for writing (the cleanup previously
# ignored `path`), and create the folder so the cell also runs on a fresh
# checkout where it does not exist yet.
folder_path = path + "Train_docbin"
os.makedirs(folder_path, exist_ok=True)

# Delete the previous train docbins so stale bins from an earlier, larger run
# cannot linger next to the new ones.
for filename in os.listdir(folder_path):
    file_path = os.path.join(folder_path, filename)
    if os.path.isfile(file_path):
        os.remove(file_path)


def _write_train_bin(current_bin, index):
    """Serialize one DocBin to ``train_bin_<index>.spacy`` inside ``folder_path``."""
    with open(os.path.join(folder_path, f"train_bin_{index}.spacy"), "wb") as f:
        f.write(current_bin.to_bytes())


# Build Train Data into a stream of files

nlp = spacy.blank("en")
doc_bin = DocBin()

bin_size_threshold = 25  # Adjust this as needed

skipped = 0
bin_counter = 0

for training_example in training_data:
    text = training_example["text"]
    labels = training_example["entities"]
    doc = nlp.make_doc(text)
    ents = []
    for start, end, label in labels:
        # char_span returns None when the character offsets cannot be mapped
        # onto tokens; those annotations are counted and dropped.
        span = doc.char_span(start, end, label=label, alignment_mode="contract")
        if span is None:
            skipped += 1
        else:
            ents.append(span)
    # doc.ents cannot hold overlapping spans, so overlaps are filtered first.
    doc.ents = filter_spans(ents)
    doc_bin.add(doc)

    # Flush the bin once it reaches the size threshold and start a new one.
    if len(doc_bin) >= bin_size_threshold:
        _write_train_bin(doc_bin, bin_counter)
        doc_bin = DocBin()
        bin_counter += 1

print(f"Skipped Train Entities: {skipped}")

# Serialize any remaining documents in the last bin
if len(doc_bin) > 0:
    _write_train_bin(doc_bin, bin_counter)

In [None]:
# Build Train Data: every training document goes into one DocBin (train.spacy).

nlp = spacy.blank("en")
doc_bin = DocBin()
skipped = 0

for training_example in training_data:
    doc = nlp.make_doc(training_example["text"])

    aligned_spans = []
    for start, end, label in training_example["entities"]:
        span = doc.char_span(start, end, label=label, alignment_mode="contract")
        if span is not None:
            aligned_spans.append(span)
            continue
        # Offsets that cannot be mapped onto tokens are only counted.
        skipped += 1

    # Overlapping spans are filtered out before they are assigned to doc.ents.
    doc.ents = filter_spans(aligned_spans)
    doc_bin.add(doc)

print(f"Skipped Entities: {skipped}")

doc_bin.to_disk(path + "train.spacy")

In [None]:
# Build Dev Data: every dev document goes into one DocBin (dev.spacy),
# tokenized with a blank English pipeline.

nlp = spacy.blank("en")
dev_doc_bin = DocBin()
skipped = 0

for dev_example in dev_data:
    doc = nlp.make_doc(dev_example["text"])

    aligned_spans = []
    for start, end, label in dev_example["entities"]:
        span = doc.char_span(start, end, label=label, alignment_mode="contract")
        if span is not None:
            aligned_spans.append(span)
            continue
        # Offsets that cannot be mapped onto tokens are only counted.
        skipped += 1

    # Overlapping spans are filtered out before they are assigned to doc.ents.
    doc.ents = filter_spans(aligned_spans)
    dev_doc_bin.add(doc)

print(f"Skipped Dev Entities: {skipped}")

dev_doc_bin.to_disk(path + "dev.spacy")

In [None]:
# Build config file: runs spaCy's `init fill-config` CLI with base_config.cfg
# as input and config.cfg as output (presumably base_config.cfg must already
# exist in the working directory — confirm).

%run -m spacy init fill-config base_config.cfg config.cfg
# Edit config.cfg as needed

In [None]:
%%capture captured_output
%%time
# Train. The cell's stdout is captured into `captured_output` instead of being
# displayed, so that it can be written to a log file afterwards.

# # Alternative kept for reference: train on train.spacy and ALSO evaluate on
# # train.spacy (i.e. without a held-out dev set)
# %run -m spacy train config.cfg --output ./ --paths.train ./train.spacy --paths.dev ./train.spacy

# Train on the single train.spacy DocBin and evaluate on dev.spacy.
# NOTE(review): this command does not read the Train_docbin/ stream of bins;
# presumably --paths.train would have to point at that folder to use it — confirm.
%run -m spacy train config.cfg --output ./ --paths.train ./train.spacy --paths.dev ./dev.spacy

In [None]:
# Persist the captured training output, followed by the number of documents
# in full_data, to Cmd_Text/CMD_Output_<n>f.txt.
# The folder is created first so the cell does not fail on a clean checkout.
os.makedirs("Cmd_Text", exist_ok=True)

with open(f"Cmd_Text/CMD_Output_{len(full_data)}f.txt", "w") as f:
    f.write(captured_output.stdout)
    f.write(f"{len(full_data)} Files")

In [None]:
# All trained models: model directories, given relative to `path`.
Model_303f = "models/model-best-303f"

In [None]:
# Load Best Model: the pipeline stored under `path + Model_303f`, used below
# to annotate a single test file.
nlp_ner = spacy.load(path + Model_303f)

Named Entity Recognition on Test file

In [None]:
# Run the trained pipeline on one test document and highlight its entities.
with open(path + "test/105446.txt", "r") as f:
    text = f.read()

doc = nlp_ner(text)

# One highlight colour per entity label.
colors = dict(
    DRUG="#1c7ea0",
    STRENGTH="#97cdca",
    REASON="#20d133",
    FORM="#0c8d8e",
    FREQUENCY="#b47359",
    ADE="#ef3921",
    DOSAGE="#8ec0e9",
    DURATION="#820bf0",
    ROUTE="#9a989f",
)
options = dict(colors=colors)

spacy.displacy.render(doc, style="ent", options=options, jupyter=True)

Scores of All the Test files

In [None]:
%%time

with open(path + "test_txt_ann.json", "r") as file:
    list_of_json_data = json.load(file)

Model = Model_303f

start_index = Model.find("best-") + len("best-")
no_of_files = Model[start_index:]

accuracy_results = []

colors = {
    "DRUG": "#1c7ea0",
    "STRENGTH": "#97cdca",
    "REASON": "#20d133",
    "FORM": "#0c8d8e",
    "FREQUENCY": "#b47359",
    "ADE": "#ef3921",
    "DOSAGE": "#8ec0e9",
    "DURATION": "#820bf0",
    "ROUTE": "#9a989f",
}
options = {"colors": colors}

with open(
    path + f"Scores/Other_Scores/Accuracy_details_{no_of_files}.txt", "w"
) as accuracy_file, open(
    path + f"Scores/Other_Scores/Entity_details_{no_of_files}.txt", "w"
) as entity_file, open(
    path + f"Scores/Other_Scores/Score_details_{no_of_files}.txt", "w"
) as score_file, open(
    path + f"Scores/All_scores/All_Scores_details_{no_of_files}.txt", "w"
) as all_scores_file:

    precision_scores = defaultdict(list)
    recall_scores = defaultdict(list)
    f1_scores = defaultdict(list)

    score_file.write(f"Model Name: {Model}\n\n")

    # [:5] for first 5 records
    for json_data in list_of_json_data:

        # Load Model
        nlp = spacy.load(path + Model)

        file_name = json_data["file_name"]

        test_text = json_data["content"]

        doc = nlp(test_text)

        spacy_entities = [(ent.text, ent.label_) for ent in doc.ents]

        annotated_entities = [
            (annotation["text"], annotation["tag"].upper())
            for annotation in json_data["annotations"]
        ]
        # spacy.displacy.render(doc, style="ent", options= options, jupyter=True)

        correct_count = 0
        for spacy_entity in spacy_entities:
            if spacy_entity in annotated_entities:
                correct_count += 1

        # Calculating Accuracy
        accuracy = (
            (correct_count / len(annotated_entities)) * 100 if spacy_entities else 0
        )
        accuracy_results.append({"file_name": file_name, "accuracy": accuracy})

        accuracy_file.write(f"Model Name: {Model}\n")
        accuracy_file.write(f"Entities for file: {file_name}\n")
        accuracy_file.write(f"Accuracy: {accuracy}\n\n")

        entity_file.write(f"Entities for file: {file_name}\n")
        entity_file.write(f"Accuracy: {accuracy}\n\n")
        entity_file.write(
            f"No of Entities discovered by SpaCy: {len(spacy_entities)}\n\n"
        )
        entity_file.write(f"No of Actual Entities: {len(annotated_entities)}\n\n")
        for entity, label in spacy_entities:
            entity_file.write(f"{entity} --- {label}\n")
        entity_file.write("\n\n")

        # Calculate F1 score
        true_positive = defaultdict(int)
        false_positive = defaultdict(int)
        false_negative = defaultdict(int)

        for annotated_entity in annotated_entities:
            if annotated_entity in spacy_entities:
                true_positive[annotated_entity[1]] += 1
            else:
                false_negative[annotated_entity[1]] += 1

        for spacy_entity in spacy_entities:
            if spacy_entity not in annotated_entities:
                false_positive[spacy_entity[1]] += 1

        tags = list(
            set(
                list(true_positive.keys())
                + list(false_positive.keys())
                + list(false_negative.keys())
            )
        )

        score_file.write(f"Score for file: {file_name}\n")

        for tag in tags:
            precision = (
                true_positive[tag] / (true_positive[tag] + false_positive[tag])
                if (true_positive[tag] + false_positive[tag]) > 0
                else 0
            )
            recall = (
                true_positive[tag] / (true_positive[tag] + false_negative[tag])
                if (true_positive[tag] + false_negative[tag]) > 0
                else 0
            )
            f1_score = (
                (2 * precision * recall) / (precision + recall)
                if (precision + recall) > 0
                else 0
            )

            precision_scores[tag].append(precision)
            recall_scores[tag].append(recall)
            f1_scores[tag].append(f1_score)

            score_file.write(f"Tag: {tag}\n")
            score_file.write(f"Precision: {precision}\n")
            score_file.write(f"Recall: {recall}\n")
            score_file.write(f"F1-Score: {f1_score}\n")
            score_file.write(f"\n")
        score_file.write(f"---------------\n")

    # Calculate mean of precision, recall and f1 score
    all_scores_file.write(f"Model Name: {Model}\n\n")
    all_scores_file.write(f"Average Scores:\n")
    all_scores_file.write(f"---------------------------\n")

    mean_precision_scores = {}
    mean_recall_scores = {}
    mean_f1_scores = {}

    # Mean of Precision Scores
    all_scores_file.write(f"Mean Precision Scores:\n")
    for tag, precision_score in precision_scores.items():
        mean_precision_scores[tag] = sum(precision_score) / len(precision_score)
    for tag, mean_precision in dict(sorted(mean_precision_scores.items())).items():
        all_scores_file.write(f"{tag} : {mean_precision}\n")
    all_scores_file.write("---------------------------\n")

    # Mean of Recall Scores
    all_scores_file.write(f"Mean Recall Scores:\n")
    for tag, recall_score in recall_scores.items():
        mean_recall_scores[tag] = sum(recall_score) / len(recall_score)
    for tag, mean_recall in dict(sorted(mean_recall_scores.items())).items():
        all_scores_file.write(f"{tag} : {mean_recall}\n")
    all_scores_file.write("---------------------------\n")

    # Mean of F1 Scores
    all_scores_file.write(f"Mean F1 Scores:\n")
    for tag, f1_score in f1_scores.items():
        mean_f1_scores[tag] = sum(f1_score) / len(f1_score)
    for tag, mean_f1 in dict(sorted(mean_f1_scores.items())).items():
        all_scores_file.write(f"{tag} : {mean_f1}\n")
    all_scores_file.write("---------------------------\n")

    mean_accuracy = sum([result["accuracy"] for result in accuracy_results]) / len(
        accuracy_results
    )
    all_scores_file.write(f"Mean Accuracy of test result: {mean_accuracy}\n")

print(f"Final Mean Accuracy: {mean_accuracy}")
print(Model)

Final Mean Accuracy: 79.25598098237857
models/model-best-273f-100e
CPU times: user 10min 4s, sys: 1min 37s, total: 11min 41s
Wall time: 12min 17s
