In [None]:
# Importing necessary packages for performing NER on scientific text related to NLP

import torch
import transformers
from transformers import AutoTokenizer
from transformers import DataCollatorForTokenClassification
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer
import os
import re
from sklearn.model_selection import train_test_split
from datasets import load_metric
import numpy as np
import gc

In [None]:
def read_train_set(train_set_file_path, encoding="utf-8"):
  """
  Read a CoNLL-style file into parallel lists of tokens and tags.

  The file is parsed as follows:

  1. Documents (paragraphs) are separated by one or more blank lines; a
     separator line may also contain only spaces/tabs.
  2. Inside a document every non-blank line must be "token<TAB>tag".

  Args:
      train_set_file_path (str): path to the CoNLL file
      encoding (str): text encoding used to decode the file (default "utf-8")

  Returns:
      tuple(List[List[str]], List[List[str]]): the first element holds the
      tokens of every document, the second the tags, aligned one-to-one.
      An empty file yields ([], []).

  Raises:
      ValueError: if a non-blank line does not consist of exactly two
      tab-separated fields
  """
  with open(train_set_file_path, "r", encoding=encoding) as fd:
    raw_text = fd.read().strip()

  token_docs = []
  tag_docs = []
  # Greedy separator: any run of blank / whitespace-only lines ends a document,
  # so repeated empty lines never produce an empty line inside a document.
  for doc_index, doc in enumerate(re.split(r"\n(?:[ \t]*\n)+", raw_text)):
    tokens = []
    tags = []
    for line in doc.split("\n"):
      if not line.strip():
        continue
      fields = line.split("\t")
      if len(fields) != 2:
        raise ValueError(
          f"Expected 'token<TAB>tag' but got {line!r} in document {doc_index} of {train_set_file_path}"
        )
      token, tag = fields
      tokens.append(token)
      tags.append(tag)
    # Only an empty input file can produce a document without tokens; skip it
    if tokens:
      token_docs.append(tokens)
      tag_docs.append(tags)
  return token_docs, tag_docs

In [None]:
# Both CoNLL training files live in <cwd>/data/conll
conll_dir = os.path.join(os.getcwd(), "data", "conll")

# Complete NER dataset
full_texts, full_tags = read_train_set(
    train_set_file_path=os.path.join(conll_dir, "full_ner_dataset.conll")
)

# Same dataset with one file (XLNet.conll) left out; that file is used as our own test set
partial_texts, partial_tags = read_train_set(
    train_set_file_path=os.path.join(conll_dir, "ner_dataset_one_left_out.conll")
)

In [None]:
# Split the partial dataset into train and validation parts (80% / 20%).
# A fixed random_state keeps the split identical across kernel restarts so that
# validation scores of different runs/models are comparable.
partial_train_texts, partial_val_texts, partial_train_tags, partial_val_tags = train_test_split(
    partial_texts,
    partial_tags,
    test_size=0.2,
    random_state=42,
)

# Paths of our own test set (XLNet) and of the actual test set (sciner)
own_test_txt_path = os.path.join(os.getcwd(), "data", "own_test_set", "XLNet.txt")
actual_test_txt_path = os.path.join(os.getcwd(), "data", "final_test_set", "anlp-sciner-test.txt")

# Each test file holds one paragraph per line, so splitting on newline yields
# the paragraphs. The encoding is given explicitly so decoding does not depend
# on the platform default. A trailing newline in the file produces a final
# empty paragraph.
with open(own_test_txt_path, "r", encoding="utf-8") as fd:
  content = fd.read()
own_test_paragraphs = content.split("\n")

with open(actual_test_txt_path, "r", encoding="utf-8") as fd:
  content = fd.read()
actual_test_paragraphs = content.split("\n")

In [None]:
# Unique tags that are present in our full train dataset
unique_tags = set(tag for doc in full_tags for tag in doc)

# tag -> id mapping used to build the training labels.
# Ids are assigned over the *sorted* tags: iteration order of a set of strings
# can differ between Python processes, which would silently change the mapping
# between kernel sessions and make a saved model impossible to decode.
tag2id = {tag: idx for idx, tag in enumerate(sorted(unique_tags))}

# id -> tag mapping used to decode predicted named entities
id2tag = {idx: tag for tag, idx in tag2id.items()}

In [None]:
# Tokenizer of the model under test. Three checkpoints were tried; exactly one
# of the lines below is active at a time:
#   1. BERT base (cased)
#   2. SciBERT (cased)
#   3. SciBERT (uncased)

tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
# tokenizer = AutoTokenizer.from_pretrained("allenai/scibert_scivocab_cased", do_lower_case=False)
# tokenizer = AutoTokenizer.from_pretrained("allenai/scibert_scivocab_uncased")

# Every split is encoded with identical settings: the inputs are already split
# into words, sequences are truncated to 512 tokens and padded.
encode_kwargs = dict(is_split_into_words=True, truncation=True, padding=True, max_length=512)

# Encodings of the partial train / validation splits
partial_train_encodings = tokenizer(partial_train_texts, **encode_kwargs)
partial_val_encodings = tokenizer(partial_val_texts, **encode_kwargs)

# Encodings of the full training set
full_train_encodings = tokenizer(full_texts, **encode_kwargs)

In [None]:
def align_labels(tags, encodings):
  """
  Build per-token label ids that line up with the tokenizer output.

  The tokenizer adds special tokens and may split one word into several
  sub-tokens, so the word-level tags no longer match the token sequence
  one-to-one. For every document the labels are produced like this:

  * positions whose word id is None get -100
  * the first sub-token of each word gets the id (from `tag2id`) of that
    word's tag
  * every further sub-token of the same word gets -100

  Args:
      tags (List[List[str]]): word-level tags, one list per document
      encodings: tokenizer output providing `word_ids(batch_index=...)`

  Returns:
      List[List[int]]: one list of label ids per document
  """
  aligned = []
  for doc_index, doc_tags in enumerate(tags):
    doc_label_ids = []
    last_word = None
    for word in encodings.word_ids(batch_index=doc_index):
      # Only a token that opens a new word carries a real label
      opens_new_word = word is not None and word != last_word
      doc_label_ids.append(tag2id[doc_tags[word]] if opens_new_word else -100)
      last_word = word
    aligned.append(doc_label_ids)
  return aligned

In [None]:
# Token-level labels for the partial train and validation splits
partial_train_labels = align_labels(tags=partial_train_tags, encodings=partial_train_encodings)
partial_val_labels = align_labels(tags=partial_val_tags, encodings=partial_val_encodings)

# Token-level labels for the full training set
full_train_labels = align_labels(tags=full_tags, encodings=full_train_encodings)

In [None]:
# seqeval metric object; compute_metrics below calls its compute() method.
# NOTE(review): load_metric is reportedly deprecated in newer `datasets` releases -
# confirm the installed version still provides it.
metric = load_metric("seqeval")

def compute_metrics(eval_preds):
    """
    Score evaluation predictions with the seqeval metric.

    Positions whose label is -100 are dropped from both the references and the
    predictions; the remaining ids are mapped to tag strings through `id2tag`
    before being handed to `metric.compute`.

    Ref: https://vkhangpham.medium.com/build-a-custom-ner-pipeline-with-hugging-face-a84d09e03d88

    Args:
        eval_preds: pair (logits, labels) produced on the evaluation set

    Returns:
        dict: overall precision, recall, f1 and accuracy
    """
    logits, labels = eval_preds
    predicted_ids = np.argmax(logits, axis=-1)

    # Reference tags: every labelled (non -100) position
    reference_tags = []
    for label_row in labels:
        reference_tags.append([id2tag[label_id] for label_id in label_row if label_id != -100])

    # Predicted tags at exactly those labelled positions
    predicted_tags = []
    for prediction_row, label_row in zip(predicted_ids, labels):
        kept = []
        for predicted_id, label_id in zip(prediction_row, label_row):
            if label_id != -100:
                kept.append(id2tag[predicted_id])
        predicted_tags.append(kept)

    scores = metric.compute(predictions=predicted_tags, references=reference_tags)

    summary = {}
    summary["precision"] = scores["overall_precision"]
    summary["recall"] = scores["overall_recall"]
    summary["f1"] = scores["overall_f1"]
    summary["accuracy"] = scores["overall_accuracy"]
    return summary

In [None]:
# Data collator built from the tokenizer; it is passed to the Trainer instances
# below through their `data_collator=` argument

data_collator = DataCollatorForTokenClassification(tokenizer)

In [None]:
# Model under test. Three checkpoints were experimented with; exactly one of
# the variants below is active at a time:
#   1. BERT base (cased)
#   2. SciBERT (cased)
#   3. SciBERT (uncased)
#
# id2label / label2id store the tag mapping in the model config, so a saved
# checkpoint carries the real tag names instead of generic LABEL_<i> names and
# can be decoded outside this kernel session.

model = AutoModelForTokenClassification.from_pretrained(
    "bert-base-cased",
    num_labels=len(unique_tags),
    id2label=id2tag,
    label2id=tag2id,
    ignore_mismatched_sizes=True,
)
# model = AutoModelForTokenClassification.from_pretrained("allenai/scibert_scivocab_cased", num_labels=len(unique_tags), id2label=id2tag, label2id=tag2id, ignore_mismatched_sizes=True)
# model = AutoModelForTokenClassification.from_pretrained("allenai/scibert_scivocab_uncased", num_labels=len(unique_tags), id2label=id2tag, label2id=tag2id, ignore_mismatched_sizes=True)

In [None]:
class NERDataset(torch.utils.data.Dataset):
    """
    Torch dataset pairing tokenizer encodings with aligned label ids for NER training.

    `encodings` maps feature names to per-example sequences and `labels` holds
    one list of label ids per example.
    """
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # One example per label sequence
        return len(self.labels)

    def __getitem__(self, idx):
        # Turn every encoded feature of example `idx` into a tensor, then attach the labels
        sample = {}
        for feature_name, feature_values in self.encodings.items():
            sample[feature_name] = torch.tensor(feature_values[idx])
        sample["labels"] = torch.tensor(self.labels[idx])
        return sample

# Dataset objects for the partial train / validation splits ...
partial_train_dataset = NERDataset(encodings=partial_train_encodings, labels=partial_train_labels)
partial_val_dataset = NERDataset(encodings=partial_val_encodings, labels=partial_val_labels)

# ... and for the full training set used to train the final model
full_train_dataset = NERDataset(encodings=full_train_encodings, labels=full_train_labels)

In [None]:
# Free memory before training in case we face CUDA out-of-memory issues:
# run Python garbage collection, then ask PyTorch to empty its CUDA cache

gc.collect()
torch.cuda.empty_cache()

In [None]:
# Training arguments for training on the partial dataset with evaluation on the
# validation split after every epoch

training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=25,
    weight_decay=0.01,
    # The string "none" disables all logging integrations. Passing None does
    # not: it falls back to the library default.
    report_to="none",
)

# Trainer for the partial run, built from the arguments defined above

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=partial_train_dataset,
    eval_dataset=partial_val_dataset,
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

In [None]:
# Train the model on the partial training split, then evaluate it on the
# validation split. evaluate() is the last expression of the cell, so its
# result is what the notebook displays.

trainer.train()
trainer.evaluate()

In [None]:
# Save the model trained on the partial dataset to ./partial_saved_model

trainer.save_model('./partial_saved_model')

In [None]:
# Record the path of the saved partial model.
# NOTE(review): nothing is loaded from this path in the visible cells -
# process_test_set runs the in-memory `model` - confirm whether a reload was intended.

model_checkpoint = "./partial_saved_model"

In [None]:
def process_test_set(paragraphs):
  """
  Tag every word of the given test paragraphs and return the CoNLL lines.

  Each paragraph is split on single spaces and tokenized once as pre-split
  words, so the tokens fed to the model and the word ids used for decoding
  always come from the same encoding. A word receives the tag predicted for
  its first sub-token. Words that produce no token (e.g. empty strings caused
  by consecutive spaces) or that fall beyond the 512-token truncation limit
  are tagged "O". Paragraphs that produce no word tokens at all (e.g. the
  empty string left by a trailing newline) are skipped entirely.

  Uses the notebook-level `tokenizer`, `model` and `id2tag`.

  Args:
      paragraphs (List[str]): list of paragraphs

  Returns:
      List[str]: CoNLL lines ("word<TAB>tag" followed by a newline), with a
      blank line ("\n") after every processed paragraph; join them with ""
      to obtain the file content
  """
  # result list to store the conll content
  test_set_result = []

  # Training can leave the model in training mode, where dropout is still
  # active even under torch.no_grad(). Switch to eval mode for inference and
  # restore the previous mode afterwards.
  was_training = model.training
  model.eval()
  try:
    for paragraph in paragraphs:
      paragraph_words = paragraph.split(" ")

      # single tokenization pass: yields both the model inputs and the word ids
      encoding = tokenizer(
        paragraph_words,
        is_split_into_words=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
      )
      word_ids = encoding.word_ids()

      # only special tokens present: happens e.g. for the extra empty
      # paragraph caused by a trailing newline
      if all(word_id is None for word_id in word_ids):
        continue

      # run the model on whichever device it currently lives on
      with torch.no_grad():
        outputs = model(**encoding.to(model.device))
      predictions = outputs.logits.argmax(dim=-1)[0].tolist()

      # position of the first sub-token of every word (built once per paragraph)
      first_subword_index = {}
      for token_index, word_id in enumerate(word_ids):
        if word_id is not None and word_id not in first_subword_index:
          first_subword_index[word_id] = token_index

      # map every original word to the tag of its first sub-token
      for j, word in enumerate(paragraph_words):
        token_index = first_subword_index.get(j)
        if token_index is None:
          tag = "O"
        else:
          tag = id2tag[predictions[token_index]]
        test_set_result.append(f"{word}\t{tag}\n")

      test_set_result.append("\n")
  finally:
    model.train(was_training)

  return test_set_result

In [None]:
# Predict tags for our own held-out test set (XLNet)

own_test_set_result = process_test_set(paragraphs=own_test_paragraphs)

In [None]:
# Write the predictions for our own test set to disk as a conll file.
# The file name is a plain string (no placeholders to format) and the encoding
# is given explicitly so the output does not depend on the platform default.

content = "".join(own_test_set_result)
output_s_conll_file_path = os.path.join(os.getcwd(), "XLNet-result.conll")
with open(output_s_conll_file_path, 'w', encoding="utf-8") as fd:
  fd.write(content)

In [None]:
# Free memory before the full-dataset training to prevent CUDA out-of-memory
# issues: run Python garbage collection, then ask PyTorch to empty its CUDA cache

gc.collect()
torch.cuda.empty_cache()

In [None]:
# Training arguments for training on the full dataset (no evaluation split)

training_args = TrainingArguments(
    output_dir="./results",
    learning_rate=2e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=25,
    weight_decay=0.01,
    # The string "none" disables all logging integrations. Passing None does
    # not: it falls back to the library default.
    report_to="none",
)

# Trainer for the full run, built from the arguments defined above.
# NOTE(review): `model` is the same object that was already fine-tuned on the
# partial dataset, so this run continues from those weights rather than from
# the pretrained checkpoint - confirm this is intended.

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=full_train_dataset,
    data_collator=data_collator,
    tokenizer=tokenizer,
)

In [None]:
# Train the model on the full dataset (this Trainer was created without an
# eval_dataset, so only train() is called here)

trainer.train()

In [None]:
# Save the model trained on the full dataset to ./full_saved_model
# All three trained models are available and can be shared on request
# NOTE(review): model_checkpoint is only assigned; no visible cell reads it

trainer.save_model('./full_saved_model')
model_checkpoint = "./full_saved_model"

In [None]:
# Predict tags for the actual test set (sciner)

actual_test_set_result = process_test_set(paragraphs=actual_test_paragraphs)

In [None]:
# Write the predictions for the actual test set to disk as a conll file.
# The file name is a plain string (no placeholders to format) and the encoding
# is given explicitly so the output does not depend on the platform default.

content = "".join(actual_test_set_result)
output_s_conll_file_path = os.path.join(os.getcwd(), "sciner-mysys.conll")
with open(output_s_conll_file_path, 'w', encoding="utf-8") as fd:
  fd.write(content)