In [None]:
from datasets.features.features import ClassLabel
import pandas as pd
import numpy as np
from datasets import Dataset
import datasets
import transformers

In [None]:
import itertools
import pandas as pd
import numpy as np
from datasets import Dataset
from datasets import load_metric
from transformers import AutoTokenizer
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers import DataCollatorForTokenClassification
import torch

In [None]:
# Tag vocabulary of the token classifier. A tag's position in this list is its
# class id (compute_metrics maps predicted ids back to names through it).
label_list = [
    'O',
    'B-PER',
    'B-LOC',
]

In [None]:
# Tag string -> integer class id, looked up when word-level tags are aligned
# to sub-word tokens.
label_encoding_dict = {
    'O': 0,
    'B-PER': 1,
    'B-LOC': 2,
}

In [None]:
import os
import random

def read_patent_folders(folder_name, test_set):
  """Read every file in ``folder_name`` and split the contents into train/test.

  Args:
    folder_name: Directory to read; entries are visited in sorted name order.
    test_set: A file name, or a collection of file names, to hold out as
      test data. Everything else goes to the training split.

  Returns:
    ``(train_patents, test_patents)``. Each is a list with one entry per
    file, and each entry is the list of lines returned by ``readlines()``.
  """
  # With a bare string, ``in`` is a substring test ('a.bio' in 'xa.bio' is
  # True). Wrap it so a string always means one exact file name.
  if isinstance(test_set, str):
    test_set = {test_set}

  train_patents = []
  test_patents = []

  for filename in sorted(os.listdir(folder_name)):
    path = os.path.join(folder_name, filename)
    # Skip sub-directories (e.g. Jupyter's .ipynb_checkpoints); open() would
    # raise on them.
    if not os.path.isfile(path):
      continue

    with open(path, 'r') as f:  # open in readonly mode
      lines = f.readlines()

    if filename in test_set:
      test_patents.append(lines)
    else:
      train_patents.append(lines)

  return train_patents, test_patents
  

def read_patents(folder_name, test_file_id):
  """Read every file in ``folder_name`` and hold out the test file(s).

  Args:
    folder_name: Directory to read; entries are visited in sorted name order.
    test_file_id: A file name, or a collection of file names, to hold out as
      test data. Everything else goes to the training split.

  Returns:
    ``(train_patents, test_patents)``. Each is a list with one entry per
    file, and each entry is the list of lines returned by ``readlines()``.
  """
  # With a bare string, ``in`` is a substring test ('a.bio' in 'xa.bio' is
  # True). Wrap it so a string always means one exact file name.
  if isinstance(test_file_id, str):
    test_file_id = {test_file_id}

  train_patents = []
  test_patents = []

  for filename in sorted(os.listdir(folder_name)):
    path = os.path.join(folder_name, filename)
    # Skip sub-directories (e.g. Jupyter's .ipynb_checkpoints); open() would
    # raise on them.
    if not os.path.isfile(path):
      continue

    with open(path, 'r') as f:  # open in readonly mode
      lines = f.readlines()

    if filename in test_file_id:
      test_patents.append(lines)
    else:
      train_patents.append(lines)

  return train_patents, test_patents

In [None]:

def make_dataset(corpus, tags):
  """Greedily pack words (and their tags) into chunks bounded by token count.

  Each word is tokenized on its own with the module-level ``tokenizer`` to
  count its sub-word tokens. A new chunk is started as soon as adding the
  next word would push the running count past ``MAX_SEQ_LENGTH - 2``
  (presumably leaving room for two special tokens added at encoding time —
  TODO confirm).

  Args:
    corpus: Sequence of words.
    tags: Sequence of tags, parallel to ``corpus``.

  Returns:
    ``(input_seqs, labels)``: parallel lists of word chunks and tag chunks.
  """
  input_seqs, labels = [], []
  chunk_words, chunk_tags = [], []
  tokens_in_chunk = 0

  for word, tag in zip(corpus, tags):
    encoded = tokenizer([word], is_split_into_words=True, add_special_tokens=False)
    n_tokens = len(encoded.input_ids)

    # Close the current chunk before it would overflow the budget.
    if tokens_in_chunk + n_tokens > MAX_SEQ_LENGTH - 2:
      input_seqs.append(chunk_words)
      labels.append(chunk_tags)
      chunk_words, chunk_tags = [], []
      tokens_in_chunk = 0

    chunk_words.append(word)
    chunk_tags.append(tag)
    tokens_in_chunk += n_tokens

  # Whatever is left over forms the last chunk.
  input_seqs.append(chunk_words)
  labels.append(chunk_tags)

  return input_seqs, labels


In [None]:
def label_to_ner(label):
  """Map a raw tag ('O', 'B' or 'I') onto the tag names used for training.

  Raises:
    KeyError: if ``label`` is not one of the three known tags.
  """
  bio_to_ner = dict(O='O', B='B-PER', I='B-LOC')
  return bio_to_ner[label]
  

def return_sequences(patents):
  """Turn raw patent files into length-bounded word/tag sequences.

  Args:
    patents: Iterable of patents, each an iterable of text lines. A line is
      tab-separated; field 0 is the word and field 2 is its raw tag
      (converted with ``label_to_ner``). Blank lines are skipped.

  Returns:
    ``(input_seqs, labels)``: parallel lists of word chunks and tag chunks,
    produced per patent by ``make_dataset_optimized`` and concatenated in
    patent order.
  """
  input_seqs = []
  labels = []

  for patent in patents:
    # Fresh buffers for every patent. Sharing them across iterations fed all
    # previously seen patents to the chunker again, which duplicated their
    # sequences in the output.
    corpus = []
    tags = []

    # split each patent to get the words and labels
    for line in patent:
      if not line.strip():
        continue  # blank line: no word/tag to extract
      fields = line.split('\t')
      corpus.append(fields[0])
      tags.append(label_to_ner(fields[2].rstrip()))

    if not corpus:
      continue  # nothing to chunk for an empty patent

    X, Y = make_dataset_optimized(corpus, tags)
    input_seqs += X
    labels += Y

  return input_seqs, labels

In [None]:
def make_dataset_optimized(corpus, tags):
  """Greedily pack words (and their tags) into chunks bounded by token count.

  The whole corpus is tokenized once with the module-level ``tokenizer``; the
  number of sub-word tokens of each word is then read off the run lengths in
  ``word_ids()``. A new chunk is started whenever adding the next word would
  push the running token count past ``MAX_SEQ_LENGTH - 2`` (presumably
  leaving room for two special tokens added at encoding time — TODO confirm).

  Args:
    corpus: List of words.
    tags: List of tags, parallel to ``corpus``.

  Returns:
    ``(input_seqs, labels)``: parallel lists of word chunks and tag chunks.
    Both are empty when ``corpus`` is empty.
  """
  input_seqs = []
  labels = []

  if not corpus:
    return input_seqs, labels

  tokenized_corpus = tokenizer(corpus, is_split_into_words=True, add_special_tokens=False)
  word_ids = tokenized_corpus.word_ids()

  T_prime = []
  L_prime = []
  token_length = 0   # tokens counted so far for the word at cur_index
  cur_index = 0      # index of the word currently being counted
  total_length = 0   # tokens already placed in the open chunk

  # The trailing None never equals a word index, so it pushes the final word
  # through the same overflow check as every other word.
  for elem in list(word_ids) + [None]:
    if elem == cur_index:
      token_length += 1
      continue

    # The word at cur_index is complete; close the open chunk first if the
    # word does not fit. An empty chunk is never emitted: a single over-long
    # word simply gets a chunk of its own.
    if T_prime and (total_length + token_length) > (MAX_SEQ_LENGTH - 2):
      input_seqs.append(T_prime)
      labels.append(L_prime)
      T_prime = []
      L_prime = []
      total_length = 0

    T_prime.append(corpus[cur_index])
    L_prime.append(tags[cur_index])
    total_length += token_length
    token_length = 1
    cur_index = elem

  input_seqs.append(T_prime)
  labels.append(L_prime)

  return input_seqs, labels

In [None]:
def down_sample(input_seqs, labels):
    """Keep only the sequences that contain at least one entity tag.

    A sequence is kept when its tag list contains 'B-PER' or 'B-LOC';
    sequences without either tag are discarded entirely (none of them is
    sampled back in).

    Args:
        input_seqs: List of word sequences.
        labels: List of tag sequences, parallel to ``input_seqs``.

    Returns:
        ``(pos_input_seqs, pos_labels)``: the kept sequences and their tags,
        in their original order.
    """
    pos_input_seqs = []
    pos_labels = []

    for idx, seq_labels in enumerate(labels):
        if 'B-PER' in seq_labels or 'B-LOC' in seq_labels:
            pos_input_seqs.append(input_seqs[idx])
            pos_labels.append(seq_labels)

    return pos_input_seqs, pos_labels


In [None]:
def tokenize_and_align_labels(examples):
    """Tokenize a batch of word sequences and align word tags to tokens.

    Args:
        examples: Batch with a "tokens" column (word sequences) and a
            f"{task}_tags" column (tag sequences, parallel to the words).

    Returns:
        The tokenizer output with an added "labels" entry holding one list of
        label ids per sequence. Tokens without a word (word id ``None``) get
        -100. The first token of a word gets the word's class id; its
        remaining tokens get that id only when ``label_all_tokens`` is set,
        otherwise -100.
    """
    # Pad/truncate to the same limit the chunking step budgets for, instead
    # of a separately hard-coded number.
    tokenized_inputs = tokenizer(list(examples["tokens"]), padding='max_length', truncation=True,
                                 max_length=MAX_SEQ_LENGTH, is_split_into_words=True)

    def align(word_tags, word_ids):
        """Build the per-token label ids for one sequence."""
        label_ids = []
        previous_word_idx = None
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            # NOTE(review): this compares against the digit '0', not the
            # letter 'O'. No key of label_encoding_dict is '0', so with those
            # tags this branch never fires. Confirm whether 'O' was intended.
            elif word_tags[word_idx] == '0':
                label_ids.append(0)
            elif word_idx != previous_word_idx or label_all_tokens:
                label_ids.append(label_encoding_dict[word_tags[word_idx]])
            else:
                # Continuation token of a word that was already labelled.
                label_ids.append(-100)
            previous_word_idx = word_idx
        return label_ids

    tokenized_inputs["labels"] = [
        align(word_tags, tokenized_inputs.word_ids(batch_index=i))
        for i, word_tags in enumerate(examples[f"{task}_tags"])
    ]

    return tokenized_inputs

In [None]:
def compute_metrics(p):
    """Compute overall and per-tag scores for one evaluation pass.

    Args:
        p: ``(predictions, labels)`` pair. ``predictions`` holds per-class
            scores along axis 2; ``labels`` holds label ids, with -100
            marking positions to ignore.

    Returns:
        Dict with overall precision/recall/f1/accuracy plus precision,
        recall and count for the "PER" type (``*_B`` keys) and the "LOC"
        type (``*_I`` keys). A type missing from the metric output is
        reported as 0.0 / 0.0 / 0.
    """
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    true_predictions = []
    true_labels = []
    for prediction, label in zip(predictions, labels):
        # Drop the positions marked -100 before mapping ids back to tag names.
        kept = [(pred_id, label_id) for pred_id, label_id in zip(prediction, label) if label_id != -100]
        true_predictions.append([label_list[pred_id] for pred_id, _ in kept])
        true_labels.append([label_list[label_id] for _, label_id in kept])

    results = metric.compute(predictions=true_predictions, references=true_labels)

    # A per-type entry may be absent (e.g. a held-out file with no tag of
    # that type); fall back to zeros instead of raising KeyError mid-run.
    absent = {"precision": 0.0, "recall": 0.0, "number": 0}
    per_scores = results.get("PER", absent)
    loc_scores = results.get("LOC", absent)

    return {"precision": results["overall_precision"], "recall": results["overall_recall"],
            "f1": results["overall_f1"], "accuracy": results["overall_accuracy"],
            "prec_B": per_scores["precision"], "recall_B": per_scores["recall"], "B_count": per_scores["number"],
            "prec_I": loc_scores["precision"], "recall_I": loc_scores["recall"], "I_count": loc_scores["number"]}
 

In [None]:
task = "ner" # Should be one of "ner", "pos" or "chunk"

# Checkpoint to fine-tune; the other candidates are left commented out.
# model_checkpoint = "./bert-base-cased"
model_checkpoint = "./bert-for-patents"
# model_checkpoint = "allenai/scibert_scivocab_cased"
# model_checkpoint = "emilyalsentzer/Bio_ClinicalBERT"
# model_checkpoint = "dmis-lab/biobert-base-cased-v1.1"
# model_checkpoint = "./PatentBERT"
batch_size = 16

# Run switches read by the functions defined in the cells above.
label_all_tokens=False   # label every sub-word token of a word, not just the first
DOWN_SAMPLE=True         # keep only sequences that contain an entity tag
LOOCV=True               # run the leave-one-out folds at the end of the notebook

data_path = 'patents22/'

# Single source of truth for the sequence length: the chunking budget and the
# collator padding below both follow it.
MAX_SEQ_LENGTH = 512
max_length = MAX_SEQ_LENGTH

tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

# word_ids() is called on the encodings elsewhere in this notebook, which
# requires a fast tokenizer.
assert isinstance(tokenizer, transformers.PreTrainedTokenizerFast)

args = TrainingArguments(
    f"test-{task}",
    evaluation_strategy = "epoch",
    learning_rate=1e-4,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=1,
    num_train_epochs=6,
    weight_decay=1e-5,
)

data_collator = DataCollatorForTokenClassification(tokenizer, padding='max_length', max_length=MAX_SEQ_LENGTH,
                                                    label_pad_token_id=-100)

metric = load_metric("seqeval")


In [None]:
def train_model_with_test_data(test_data):
    """Fine-tune on every patent except ``test_data`` and evaluate on it.

    Args:
        test_data: File name (or collection of file names) under
            ``data_path`` to hold out as the evaluation set.

    Returns:
        The metrics dict returned by ``trainer.evaluate()`` after training.

    Raises:
        ValueError: if no file under ``data_path`` matches ``test_data``.
    """
    train_patents, test_patents = read_patents(data_path, test_data)

    # Fail before the expensive training run rather than train on everything
    # and evaluate on an empty set because of a mistyped file name.
    if not test_patents:
        raise ValueError(f"No file in {data_path!r} matches test_data={test_data!r}")

    def build_dataset(patents):
        """Chunk, optionally down-sample, and tokenize one split."""
        input_seqs, seq_labels = return_sequences(patents)
        if DOWN_SAMPLE:
            input_seqs, seq_labels = down_sample(input_seqs, seq_labels)
        frame = pd.DataFrame({'tokens': input_seqs, 'ner_tags': seq_labels})
        return Dataset.from_pandas(frame).map(tokenize_and_align_labels, batched=True)

    tokenized_patent_train = build_dataset(train_patents)
    tokenized_patent_test = build_dataset(test_patents)

    # A fresh model per call so the folds do not share weights.
    model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_list))

    trainer = Trainer(
        model,
        args,
        train_dataset=tokenized_patent_train,
        eval_dataset=tokenized_patent_test,
        data_collator=data_collator,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics)

    trainer.train()

    return trainer.evaluate()

In [None]:
# Sanity check: 'KLJJJ' is presumably a placeholder that matches no file name,
# so every patent lands in the training split — TODO confirm.
train_patents, test_patents = read_patents(data_path, 'KLJJJ')
    
# Train dataset
# Chunk all loaded patents and print how many sequences result.
input_seqs_train, labels_train = return_sequences(train_patents)
print(len(input_seqs_train))
    

In [None]:
import warnings
# Silences every Python warning from here on, library deprecation notices
# included.
warnings.filterwarnings('ignore')

In [None]:
# Leave-one-out cross-validation: each listed file is held out once while the
# model is trained on all the others. The folds used to be one copy-pasted
# cell each, with 'US8168418B2.bio' run twice; every file now appears once.
LOOCV_TEST_FILES = [
    'US8168418B2.bio',
    'US8088361B2.bio',
    'US8058419B2.bio',
    'US7892537B1.bio',
    'US7943822B2.bio',
    'US8048987B2.bio',
    'US7972611B2.bio',
    'US8092995B2.bio',
    'US8106171B2.bio',
    'US8124829B2.bio',
    'US8133710B2.bio',
    'US8148089B2.bio',
    'US8158348B2.bio',
    'US8158424B2.bio',
    'US8227661B2.bio',
    'US8258289B2.bio',
    'US8273354B2.bio',
    'US8293506B2.bio',
    'US8299100B2.bio',
    'US8338131B2.bio',
    'US8409856B2.bio',
    'US8114637B2.bio',
]

if LOOCV:
    for test_file in LOOCV_TEST_FILES:
        # Mark the start of each fold so its training log can be told apart.
        print(f"LOOCV fold, held-out file: {test_file}")
        train_model_with_test_data(test_file)