<a href="https://colab.research.google.com/github/Falk358/Bert_NER/blob/main/bert_ner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BERT Named Entity Recognition
this colab notebook was strongly inspired by this tutorial: https://towardsdatascience.com/named-entity-recognition-with-bert-in-pytorch-a454405e0b6a


In [1]:
!pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from pandas import DataFrame as df
from pandas import read_csv
from transformers import BertTokenizerFast
from transformers.tokenization_utils_base import BatchEncoding
import torch
import numpy as np
from tqdm import tqdm

First, lets start by loading the dataset uploaded to our github repo:

the dataset is for Named Entitity recognition and provides the following labels:

1. `geo` for geographical entity
2. `org` for organization entity
3. `per` for person entity
4. `gpe` for geopolitical entity
5. `tim` for time indicator entity
6. `art` for artifact entity
7. `eve` for event entity
8. `nat` for natural phenomenon entity
9. `0` if the word doesn't belong to any above label


In [3]:
dataset_url = "https://raw.githubusercontent.com/Falk358/Bert_NER/main/dataset/ner.csv"

dataset_df = read_csv(dataset_url)
dataset_df.head()


Unnamed: 0,text,labels
0,Thousands of demonstrators have marched throug...,O O O O O O B-geo O O O O O B-geo O O O O O B-...
1,Iranian officials say they expect to get acces...,B-gpe O O O O O O O O O O O O O O B-tim O O O ...
2,Helicopter gunships Saturday pounded militant ...,O O B-tim O O O O O B-geo O O O O O B-org O O ...
3,They left after a tense hour-long standoff wit...,O O O O O O O O O O O
4,U.N. relief coordinator Jan Egeland said Sunda...,B-geo O O B-per I-per O B-tim O B-geo O B-gpe ...


## Preprocessing and tokenization

We need to tokenize our dataset. However, tokenization splits some words into multiple parts, so we need to adjust our labels so that they still match correctly. Furthermore, we will define the `Dataset` class for `pytorch`

In [4]:
tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")

text_list = dataset_df["text"].values.tolist()

print(text_list[0])

text_tokenized_example = tokenizer(text_list[0], padding='max_length', max_length=512, truncation=True, return_tensors="pt") # use padding of 512 (needed for BERT) and pytorch tensor format (pt)
print(type(text_tokenized_example))

Thousands of demonstrators have marched through London to protest the war in Iraq and demand the withdrawal of British troops from that country .
<class 'transformers.tokenization_utils_base.BatchEncoding'>


In [5]:

def extract_unique_labels(dataset: df) -> set:
  
  # Split labels based on whitespace and turn them into a list
  labels = [i.split() for i in dataset['labels'].values.tolist()]

  # Check how many labels are there in the dataset
  unique_labels = set()

  for lb in labels:
    [unique_labels.add(i) for i in lb if i not in unique_labels]
  
  return unique_labels


def map_labels_to_ids(dataset: df) -> dict:
  
  unique_labels = extract_unique_labels(dataset)
 
  # Map each label into its id representation and vice versa
  labels_to_ids = {k: v for v, k in enumerate(sorted(unique_labels))}
  return labels_to_ids


def map_ids_to_labels(dataset: df) -> dict:
  
  unique_labels = extract_unique_labels(dataset)

  # Map each label into its id representation and vice versa
  ids_to_labels = {v: k for v, k in enumerate(sorted(unique_labels))}
  return ids_to_labels


def align_label_sentence(tokenized_sentence: BatchEncoding, labels: str, labels_to_ids: dict)-> list: # aligns labels for a single sentence (row in dataframe)
  word_ids = tokenized_sentence.word_ids()

  previous_word_idx = None
  label_ids = []
   
  for word_idx in word_ids: # word_ids is target length for labels vector

    if word_idx is None:
      label_ids.append(-100)
                
    elif word_idx != previous_word_idx: # 
        try:
          label_ids.append(labels_to_ids[labels[word_idx]])
        except:
          label_ids.append(-100)
        
    else:
      label_ids.append(-100) # only the first token of a word will be labelled, the rest get -100
      previous_word_idx = word_idx
      

  return label_ids

In [6]:
class DataSequence(torch.utils.data.Dataset): # defines pytorch dataset

  def __init__(self, datset_df: df, labels_to_ids: dict):

    lb = [i.split() for i in dataset_df['labels'].values.tolist()]
    txt = dataset_df['text'].values.tolist()
    self.texts = [tokenizer(str(i), padding='max_length', max_length = 512, truncation=True, return_tensors="pt") for i in txt]
    self.labels = [align_label_sentence(i,j, labels_to_ids) for i,j in zip(self.texts, lb)]

  def __len__(self):

    return len(self.labels)

  def get_batch_data(self, idx):

    return self.texts[idx]

  def get_batch_labels(self, idx):

    return torch.LongTensor(self.labels[idx])

  def __getitem__(self, idx):

    batch_data = self.get_batch_data(idx)
    batch_labels = self.get_batch_labels(idx)

    return batch_data, batch_labels

## BERT Model definition

Here, we define the class for the `BERT` model itself.

In [7]:
from transformers import BertForTokenClassification

class BertModel(torch.nn.Module):

  def __init__(self, unique_labels: set):

    super(BertModel, self).__init__()

    self.bert = BertForTokenClassification.from_pretrained('bert-base-cased', num_labels=len(unique_labels))

  def forward(self, input_id, mask, label):

    output = self.bert(input_ids=input_id, attention_mask=mask, labels=label, return_dict=False)

    return output

## Training


Here, we train the model by defining the training loop function:

In [None]:
from torch.utils.data import DataLoader
from torch.optim import SGD

def train_loop(model, df_train: df, df_val: df, labels_to_ids: dict, LEARNING_RATE: float, BATCH_SIZE: int, EPOCHS: int):

  train_dataset = DataSequence(df_train, labels_to_ids)
  val_dataset = DataSequence(df_val, labels_to_ids)

  
  train_dataloader = DataLoader(train_dataset, num_workers=4, batch_size=BATCH_SIZE, shuffle=True)
  val_dataloader = DataLoader(val_dataset, num_workers=4, batch_size=BATCH_SIZE)

  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")
  print(f"device used: {device}")
  optimizer = SGD(model.parameters(), lr=LEARNING_RATE) # use stochastic gradient descent

  if use_cuda:
      model = model.cuda()

  best_acc = 0
  best_loss = 1000

  for epoch_num in range(EPOCHS):

      total_acc_train = 0
      total_loss_train = 0

      model.train()

      for train_data, train_label in tqdm(train_dataloader):

          train_label = train_label.to(device)
          mask = train_data['attention_mask'].squeeze(1).to(device)
          input_id = train_data['input_ids'].squeeze(1).to(device)

          optimizer.zero_grad()
          loss, logits = model(input_id, mask, train_label)

          for i in range(logits.shape[0]):

            logits_clean = logits[i][train_label[i] != -100]
            label_clean = train_label[i][train_label[i] != -100]

            predictions = logits_clean.argmax(dim=1)
            acc = (predictions == label_clean).float().mean()
            total_acc_train += acc
            total_loss_train += loss.item()

          loss.backward()
          optimizer.step()

      model.eval()

      total_acc_val = 0
      total_loss_val = 0

      for val_data, val_label in val_dataloader:

          val_label = val_label.to(device)
          mask = val_data['attention_mask'].squeeze(1).to(device)
          input_id = val_data['input_ids'].squeeze(1).to(device)

          loss, logits = model(input_id, mask, val_label)

          for i in range(logits.shape[0]):

            logits_clean = logits[i][val_label[i] != -100]
            label_clean = val_label[i][val_label[i] != -100]

            predictions = logits_clean.argmax(dim=1)
            acc = (predictions == label_clean).float().mean()
            total_acc_val += acc
            total_loss_val += loss.item()

      val_accuracy = total_acc_val / len(df_val)
      val_loss = total_loss_val / len(df_val)

      print(f'Epochs: {epoch_num + 1} | Loss: {total_loss_train / len(df_train): .3f} | Accuracy: {total_acc_train / len(df_train): .3f} | Val_Loss: {total_loss_val / len(df_val): .3f} | Accuracy: {total_acc_val / len(df_val): .3f}')


# EVERYTHING RUNS HERE
LEARNING_RATE = 5e-3
EPOCHS = 5
BATCH_SIZE = 10

dataset_df = dataset_df[0:2000] # only use 1000 entries (long loading times for dataset)
df_train, df_val, df_test = np.split(dataset_df.sample(frac=1, random_state=42), [int(.8 * len(dataset_df)), int(.9 * len(dataset_df))])

unique_labels = extract_unique_labels(dataset_df)
labels_to_ids = map_labels_to_ids(dataset_df)
model = BertModel(unique_labels)
train_loop(model, df_train, df_val, labels_to_ids, LEARNING_RATE, BATCH_SIZE, EPOCHS)

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertForTokenClassification: ['cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['cl

device used: cuda


100%|██████████| 200/200 [02:45<00:00,  1.21it/s]


Epochs: 1 | Loss:  0.952 | Accuracy:  1.039 | Val_Loss:  6.120 | Accuracy:  8.413


  8%|▊         | 17/200 [00:14<02:31,  1.21it/s]

## Testing

In [None]:
def evaluate(model, df_test: df, labels_to_ids: dict):
  
  test_dataset = DataSequence(df_test, labels_to_ids)

  test_dataloader = DataLoader(test_dataset, num_workers=4, batch_size=1)

  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")

  if use_cuda:
      model = model.cuda()

  total_acc_test = 0.0

  for test_data, test_label in test_dataloader:

          test_label = test_label.to(device)
          mask = test_data['attention_mask'].squeeze(1).to(device)

          input_id = test_data['input_ids'].squeeze(1).to(device)

          loss, logits = model(input_id, mask, test_label)

          for i in range(logits.shape[0]):

            logits_clean = logits[i][test_label[i] != -100]
            label_clean = test_label[i][test_label[i] != -100]

            predictions = logits_clean.argmax(dim=1)
            acc = (predictions == label_clean).float().mean()
            total_acc_test += acc

  val_accuracy = total_acc_test / len(df_test)
  print(f'Test Accuracy: {total_acc_test / len(df_test): .3f}')


evaluate(model, df_test, labels_to_ids)

## Run inference on your own text

The code in this section accepts a string as input and uses the fine tuned bert model we just created to perform Named Entity Recognition

In [None]:
def tokenize_and_align_ids(text: str) -> list:
  tokenized_inputs = tokenizer(text, padding='max_length', max_length=512, truncation=True)

  word_ids = tokenized_inputs.word_ids()

  previous_word_idx = None
  label_ids = []

  for word_idx in word_ids:

      if word_idx is None:
        label_ids.append(-100)

      elif word_idx != previous_word_idx:
          try:
            label_ids.append(1)
          except:
            label_ids.append(-100)
      else:
          try:
            label_ids.append(-100)
          except:
            label_ids.append(-100)
      previous_word_idx = word_idx

  return label_ids

def evaluate_one_text(model, ids_to_labels: dict, sentence):


  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")

  if use_cuda:
    model = model.cuda()

  text = tokenizer(sentence, padding='max_length', max_length = 512, truncation=True, return_tensors="pt")

  mask = text['attention_mask'].to(device)
  input_id = text['input_ids'].to(device)
  label_ids = torch.Tensor(tokenize_and_align_ids(sentence)).unsqueeze(0).to(device)

  logits = model(input_id, mask, None)
  logits_clean = logits[0][label_ids != -100]

  predictions = logits_clean.argmax(dim=1).tolist()
  prediction_label = [ids_to_labels[i] for i in predictions]
  print(sentence)
  print(prediction_label)


ids_to_labels = map_ids_to_labels(dataset_df)
evaluate_one_text(model, ids_to_labels, 'Bill Gates is the founder of Microsoft')