<a href="https://colab.research.google.com/github/aimanyounises1/NLP_WEB/blob/master/NER_model_BIO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers


# New Section

In [None]:
!pip install cuda


In [None]:
!pip install seqeval
!nvidia-smi


In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm, trange
from collections import Counter
import matplotlib.pyplot as plt
def read_AnEM(path):
    with open(path,'r',encoding='utf-8') as f:
      data = []
      sentence = []
      label = []
      for line in f:
        if line=='\n':
          if len(sentence) > 0:
            data.append((sentence,label))
            sentence = []
            label = []
          continue
        splits = line.split()
        sentence.append(splits[0])
        label.append(splits[3])
      if len(sentence) > 0:
        data.append((sentence,label))
    return data


In [None]:
path = "/content/"
data_train = read_AnEM(path + "AnEM.train.txt")
data_test = read_AnEM(path + "AnEM.test.txt")
print(Counter([ label for sentence in data_test for label in sentence[1]]))
print(Counter([ label for sentence in data_train for label in sentence[1]]))

label_list = list(Counter([ label for sentence in data_train for label in sentence[1]]).keys())
print(label_list)
#I-Developing_anatomical_structure

In [None]:
class Dataset:
    def __init__(
        self,
        name,
        train,
        test,
        label_list,
    ):
        self.name = name
        self.train = train
        self.test = test
        self.label_list = label_list

all_datasets = []

In [None]:
data_AJGT = Dataset("AnER", data_train, data_test, label_list)
all_datasets.append(data_AJGT)

In [None]:
import numpy as np
from seqeval.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report

from transformers import AutoConfig, AutoModelForTokenClassification, AutoTokenizer
from transformers import Trainer , TrainingArguments
from transformers.trainer_utils import EvaluationStrategy
from transformers.data.processors.utils import InputFeatures
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from sklearn.utils import resample
import logging
import torch


In [None]:
dataset_name ="AnER"
model_name = 'bert-base-cased'
task_name = 'tokenclassification'

In [None]:
for d in all_datasets:
  if d.name==dataset_name:
    selected_dataset = d
    print('Dataset found')
    break

In [None]:
class NERDataset:
  def __init__(self, texts, tags, label_list, model_name, max_length):
    self.texts = texts
    self.tags = tags
    self.label_map = {label: i for i, label in enumerate(label_list)}
    self.pad_token_label_id = torch.nn.CrossEntropyLoss().ignore_index
    # Use cross entropy ignore_index as padding label id so that only
    # real label ids contribute to the loss later.
    self.tokenizer = AutoTokenizer.from_pretrained(model_name)
    self.max_length = max_length

     
  def __len__(self):
    return len(self.texts)
  
  def __getitem__(self, item):
    textlist = self.texts[item]
    tags = self.tags[item]

    tokens = []
    label_ids = []
    for word, label in zip(textlist, tags):      
      word_tokens = self.tokenizer.tokenize(word)

      if len(word_tokens) > 0:
        tokens.extend(word_tokens)    
        # Use the real label id for the first token of the word, and padding ids for the remaining tokens
        label_ids.extend([self.label_map[label]] + [self.pad_token_label_id] * (len(word_tokens) - 1))
 
    # Account for [CLS] and [SEP] with "- 2" and with "- 3" for RoBERTa.
    special_tokens_count = self.tokenizer.num_special_tokens_to_add()
    if len(tokens) > self.max_length - special_tokens_count:
      tokens = tokens[: (self.max_length - special_tokens_count)]
      label_ids = label_ids[: (self.max_length - special_tokens_count)]
  
    #Add the [SEP] token
    tokens += [self.tokenizer.sep_token]
    label_ids += [self.pad_token_label_id]
    token_type_ids = [0] * len(tokens)

    #Add the [CLS] TOKEN
    tokens = [self.tokenizer.cls_token] + tokens
    label_ids = [self.pad_token_label_id] + label_ids
    token_type_ids = [0] + token_type_ids

    input_ids = self.tokenizer.convert_tokens_to_ids(tokens)

    # The mask has 1 for real tokens and 0 for padding tokens. Only real
    # tokens are attended to.
    attention_mask = [1] * len(input_ids)

    # Zero-pad up to the sequence length.
    padding_length = self.max_length - len(input_ids)

    input_ids += [self.tokenizer.pad_token_id] * padding_length
    attention_mask += [0] * padding_length
    token_type_ids += [0] * padding_length
    label_ids += [self.pad_token_label_id] * padding_length

    assert len(input_ids) == self.max_length
    assert len(attention_mask) == self.max_length
    assert len(token_type_ids) == self.max_length
    assert len(label_ids) == self.max_length

    # if item < 5:
    #   print("*** Example ***")
    #   print("tokens:", " ".join([str(x) for x in tokens]))
    #   print("input_ids:", " ".join([str(x) for x in input_ids]))
    #   print("attention_mask:", " ".join([str(x) for x in attention_mask]))
    #   print("token_type_ids:", " ".join([str(x) for x in token_type_ids]))
    #   print("label_ids:", " ".join([str(x) for x in label_ids]))
    
    return {
        'input_ids' : torch.tensor(input_ids, dtype=torch.long),
        'attention_mask' : torch.tensor(attention_mask, dtype=torch.long),
        'token_type_ids' : torch.tensor(token_type_ids, dtype=torch.long),
        'labels' : torch.tensor(label_ids, dtype=torch.long)       
    }

In [None]:
label_map = { v:index for index, v in enumerate(selected_dataset.label_list) }
print(label_map)

for x in selected_dataset.train:
  print(x[0])
  break

train_dataset = NERDataset(
    texts=[x[0] for x in selected_dataset.train],
    tags=[x[1] for x in selected_dataset.train],
    label_list=selected_dataset.label_list,
    model_name=model_name,
    max_length=256
    )

test_dataset = NERDataset(
    texts=[x[0] for x in selected_dataset.test],
    tags=[x[1] for x in selected_dataset.test],
    label_list=selected_dataset.label_list,
    model_name=model_name,
    max_length=256
    )

In [None]:
from transformers import BertForSequenceClassification
def model_init():
    return AutoModelForTokenClassification.from_pretrained(model_name, return_dict=True, num_labels=len(label_map))

In [None]:
inv_label_map = {i: label for i, label in enumerate(label_list)}

def align_predictions(predictions, label_ids):
    preds = np.argmax(predictions, axis=2)
    print(preds.shape)
    batch_size, seq_len = preds.shape

    out_label_list = [[] for _ in range(batch_size)]
    preds_list = [[] for _ in range(batch_size)]

    for i in range(batch_size):
        for j in range(seq_len):
            if label_ids[i, j] != torch.nn.CrossEntropyLoss().ignore_index:
                out_label_list[i].append(inv_label_map[label_ids[i][j]])
                preds_list[i].append(inv_label_map[preds[i][j]])

    return preds_list, out_label_list

def compute_metrics(p):
    preds_list, out_label_list = align_predictions(p.predictions,p.label_ids)
    #print(classification_report(out_label_list, preds_list,digits=4))
    return {
        "accuracy_score": accuracy_score(out_label_list, preds_list),
        "precision": precision_score(out_label_list, preds_list),
        "recall": recall_score(out_label_list, preds_list),
        "f1": f1_score(out_label_list, preds_list),
    }

In [None]:
training_args = TrainingArguments("./train")
training_args.evaluate_during_training = True
training_args.adam_epsilon = 1e-8
training_args.learning_rate = 5e-5
training_args.fp16 = True
training_args.per_device_train_batch_size = 16
training_args.per_device_eval_batch_size = 16
training_args.gradient_accumulation_steps = 2
training_args.num_train_epochs= 8


steps_per_epoch = (len(selected_dataset.train)// (training_args.per_device_train_batch_size * training_args.gradient_accumulation_steps))
total_steps = steps_per_epoch * training_args.num_train_epochs
print(steps_per_epoch)
print(total_steps)
#Warmup_ratio
warmup_ratio = 0.1
training_args.warmup_steps = total_steps*warmup_ratio

training_args.evaluation_strategy = EvaluationStrategy.EPOCH
# training_args.logging_steps = 200
training_args.save_steps = 100000 #don't want to save any model
training_args.seed = 42
training_args.disable_tqdm = False
training_args.lr_scheduler_type = 'cosine'

In [None]:
steps_per_epoch = (len(selected_dataset.train)// (training_args.per_device_train_batch_size * training_args.gradient_accumulation_steps))
total_steps = steps_per_epoch * training_args.num_train_epochs
print(steps_per_epoch)
print(total_steps)

In [None]:
import torch
torch.cuda.empty_cache()



In [None]:
trainer = Trainer(
    args=training_args,
    train_dataset=train_dataset, 
    eval_dataset=test_dataset, 
    model_init=model_init,
    compute_metrics=compute_metrics,

)

In [None]:
def my_hp_space(trial):
    return {
        "learning_rate": trial.suggest_float("learning_rate", 2e-5, 7e-5, step=1e-5),
        "seed": trial.suggest_categorical("seed", [0, 1, 42, 666, 123, 12345]),
        "warmup_steps": trial.suggest_int("warmup_steps",0,total_steps*0.1,step=total_steps*0.1*0.5)
    }

search_space = {
    "learning_rate":  list(np.arange(2e-5, 7e-5, 1e-5)),
    "seed":  [0, 1, 42, 666, 123, 12345],
    "warmup_steps": list(range(0, int((total_steps)*0.1)+1, int(total_steps*0.1*0.5)))
}
search_space

In [None]:
def my_objective(metrics):
    return metrics['eval_f1']

In [None]:
trainer.train()

In [None]:
import torch
torch.save(trainer.model , "bio_model.pth")

In [None]:
model = torch.load("/content/drive/MyDrive/Models/bio_model.pth" , map_location= torch.device('cpu'))

In [None]:
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
from transformers import pipeline
print(label_map)
key_list = list(label_map.keys())
val_list = list(label_map.values())

nlp = pipeline('ner' , model = model , tokenizer = tokenizer, aggregation_strategy="max")
text = "Creatine is very important amino acid for bones and brain and hippocampous"
with torch.no_grad():
  output = nlp(text)

for entity in output:
  labels = entity['entity_group']
  pos = labels.split("_")
  position = val_list.index(int(pos[-1]))
  val = key_list[position]
  print(val)
  entity["entity_group"] = val
  if val != "O":
      val = val.replace('_' , ' ')
      entity["entity_group"] = val[2:]

  print(entity["entity_group"])


for entity in output:
  print(entity)