# Dependencies


In [None]:
!pip install typing-extensions
!pip install transformers
!pip install huggingface_hub
!pip install seqeval
!pip install transformers huggingface_hub
!pip install sklearn_crfsuite
!pip install transformers datasets evaluate seqeval

# Imports

In [None]:
# Standard library
import ast
import datetime
import random
import re
import warnings
from string import punctuation

warnings.filterwarnings('ignore')

# Third-party
import evaluate
import nltk
import numpy as np
import pandas as pd
import torch
from datasets import load_dataset
from nltk.tag import pos_tag
from sklearn.metrics import classification_report
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn_crfsuite import CRF, metrics
from tqdm import tqdm
from transformers import BertModel, BertConfig, BertTokenizer
from transformers import AutoTokenizer
from transformers import DataCollatorForTokenClassification
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer

# Keep this AFTER the sklearn.metrics import above: the sequence-level
# `classification_report` from seqeval must be the one bound in the namespace,
# because the evaluation cells pass lists of tag sequences to it.
from seqeval.metrics import classification_report, f1_score, precision_score, recall_score

nltk.download('averaged_perceptron_tagger')

# Dataset

## download dataset from kaggle

In [None]:
# Download the NER corpus archive with the Kaggle CLI.
# NOTE(review): presumably requires Kaggle API credentials to be configured — confirm.
!kaggle datasets download -d naseralqaydeh/named-entity-recognition-ner-corpus

## unzip the folder

In [None]:
# Extract the downloaded archive (the absolute path assumes a /content working directory).
!unzip /content/named-entity-recognition-ner-corpus.zip

## read the dataset

In [None]:
# Load the extracted CSV; later cells read its "Sentence", "POS" and "Tag" columns.
data = pd.read_csv("/content/ner.csv")

In [None]:
# Preview the first rows of the corpus.
data.head()

In [None]:
# Column dtypes and non-null counts.
data.info()

# Embedding Model

## Load BERT tokenizer and model

In [None]:
# Pre-trained BERT encoder; `get_embeddings` reads this module-level `bert` object.
bert = BertModel.from_pretrained('bert-base-uncased')

## Function to get the contextual token embeddings of a sentence

In [None]:
def get_embeddings(input_ids, attention_mask):
  """Run the module-level `bert` model without gradient tracking.

  Args:
    input_ids: token-id tensor passed positionally to `bert`.
    attention_mask: mask tensor forwarded as `attention_mask`.

  Returns:
    The `last_hidden_state` attribute of the model output.
  """
  with torch.no_grad():
    model_output = bert(input_ids, attention_mask=attention_mask)
  return model_output.last_hidden_state

# Preprocessing & Feature extraction

## Check for nulls

In [None]:
# Count missing values per column.
data.isna().sum()

In [None]:
# Same per-column missing-value count via `isnull` (pandas documents it as an alias of `isna`).
data.isnull().sum()

## Get unique POS and NE tags and mapping dict

In [None]:
# Collect the distinct POS tags in order of first appearance and index them from 0.
#
# Each "POS" cell is assumed to hold the string repr of a Python list
# (e.g. "['NNS', ',', 'IN']"). Parsing it with ast.literal_eval keeps every tag:
# the previous quote/bracket stripping + split(',')[:-1] dropped the last tag of
# each sentence and turned a literal ',' tag into empty strings.
pos_tags = list(dict.fromkeys(
    pos
    for row in data["POS"]
    for pos in ast.literal_eval(row)
))
pos_tags_to_index = {pos: idx for idx, pos in enumerate(pos_tags)}
pos_tags_to_index

In [None]:
# Collect the distinct named-entity tags in order of first appearance.
#
# Each "Tag" cell is assumed to hold the string repr of a Python list; parsing it
# with ast.literal_eval keeps every element (the previous split(',')[:-1] silently
# discarded the last tag of each sentence).
tags = list(dict.fromkeys(
    ne_tag
    for row in data["Tag"]
    for ne_tag in ast.literal_eval(row)
))
# Indices start at 1 (the one-hot cell later subtracts 1 again).
# The name `taps_to_index` is kept as-is because other cells reference it.
taps_to_index = {ne_tag: idx for idx, ne_tag in enumerate(tags, start=1)}
taps_to_index

# Build feature Dict

## Features extraction for CRF

In [None]:
def sentence2feat_CRF(sentence, pos):
  """Build one CRF feature dict per whitespace-separated token of `sentence`.

  Args:
    sentence: raw sentence string; it is split on whitespace.
    pos: sequence of POS tags indexed in parallel with the tokens.

  Returns:
    A list with one dict per token containing the token itself, its POS tag,
    casing/shape indicators, and the neighbouring words and POS tags. Positions
    before the first token are marked '<S>' and positions after the last token
    are marked '<E>' (for every sentence length, not only single-token ones).

  Raises:
    IndexError: if `pos` has fewer entries than the sentence has tokens.
  """
  words = sentence.split()
  last = len(words) - 1
  feat = []
  for i, word in enumerate(words):
    has_prev = i > 0
    has_next = i < last
    feat.append({
      'word': word,
      'pos': pos[i],
      'is_capitalizes': 1 if word[0].isupper() else 0,
      'is_all_caps': 1 if word.isupper() else 0,
      'lower_case': word.lower(),
      # 'X' for upper-case characters, 'x' for everything else.
      'shape': ''.join('X' if ch.isupper() else 'x' for ch in word),
      'prev_word': words[i - 1] if has_prev else '<S>',
      'prev_pos': pos[i - 1] if has_prev else '<S>',
      'next_word': words[i + 1] if has_next else '<E>',
      'next_pos': pos[i + 1] if has_next else '<E>',
    })
  return feat


# CRF Algorithm

## Create dataset

In [None]:
# Build the CRF training data: one feature sequence and one label sequence per sentence.
#
# The "POS" and "Tag" cells are assumed to hold the string repr of a Python list.
# ast.literal_eval yields clean tag strings; the previous quote/bracket stripping +
# split(',') left a leading space on every tag after the first (' O', ' B-geo', ...),
# which breaks B-/I- prefix parsing in the sequence metrics, and mis-split ',' POS tags.
dataset = []
labels = []
for sentence, pos, label in zip(data["Sentence"], data["POS"], data["Tag"]):
  pos = ast.literal_eval(pos)
  label = ast.literal_eval(label)
  n_tokens = len(sentence.split())
  # Skip rows whose token, POS and label counts disagree; the CRF needs aligned sequences.
  if n_tokens != len(pos) or n_tokens != len(label):
    continue
  dataset.append(sentence2feat_CRF(sentence, pos))
  labels.append(label)

## Split Dataset

In [None]:
# Shuffle and hold out 20% of the sentences for evaluation; the seed is fixed.
X_train, X_test, y_train, y_test = train_test_split(
    dataset,
    labels,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

## Train CRF and make prediction

In [None]:
# Fit the CRF on the training sequences, then tag the held-out sentences.
crf = CRF(
    algorithm='lbfgs',
    c1=0.1,
    c2=10,
    max_iterations=50,
)
crf.fit(X_train, y_train)
y_pred = crf.predict(X_test)

## Evaluate Predictions

In [None]:
# Every predicted sequence must be as long as its reference before scoring.
assert all(
    len(true_seq) == len(pred_seq) for true_seq, pred_seq in zip(y_test, y_pred)
), "Mismatch in sequence lengths"

# Print the three summary scores in the same order and format, then the full report.
for metric_name, metric_fn in (
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1-Score", f1_score),
):
    print(f"{metric_name}: {metric_fn(y_test, y_pred)}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Create Dataset of Feature map and label

The feature map is the concatenation of the sentence's BERT embeddings with the one-hot encoded POS tags.

the dataset contains some instances with missing POS tags

Solution: use a pre-trained POS tagger.

## Get unique tags and construct mapping dict

In [None]:
# Tag every sentence with NLTK's `pos_tag` and keep the distinct tags in order of
# first appearance (dict keys preserve insertion order), then index them from 0.
pos_tags = list(dict.fromkeys(
    tagged_pos
    for sentence_text in data["Sentence"]
    for _, tagged_pos in pos_tag(sentence_text.split())
))
pos_tags_to_index = {tagged_pos: idx for idx, tagged_pos in enumerate(pos_tags)}
pos_tags_to_index

## function to map the original dataset with the tokenizer output

In [None]:
# Build per-sentence feature tensors (embedding rows concatenated with one-hot POS
# vectors) and per-token label-id lists, re-aligning the word-level tags in
# data["Tag"] to the token sequence returned alongside the embedding.
#
# NOTE(review): `get_embeddings` as defined earlier in this notebook takes
# (input_ids, attention_mask) and returns a single tensor, but the call below passes
# one list of strings and unpacks (embedding, tokens). As written this raises a
# TypeError, which the `except IndexError` below does not catch. Presumably an older
# definition that tokenized internally was used — confirm and restore it.
dataset = []
labels = []

with tqdm(total=len(data["Sentence"])) as pbar:
  for sentence, tag in zip(data["Sentence"],  data["Tag"]):
    pbar.set_description("Constructing Dataset")
    try:
      one_hot = []
      label = []
      # Turn the stringified tag list into a list of tag strings (spaces removed).
      tag = tag.replace("'","").replace('[','').replace(']','').replace(" ","").split(',')
      # j indexes the current word-level tag; i (below) indexes the tokens.
      j=0
      # '-' and '~' are stripped from the text that is embedded; the regexes below
      # still inspect the unmodified (lower-cased) sentence.
      embedding, tokens = get_embeddings([sentence.replace("-", "").replace("~", "")])

      for i, token in enumerate(tokens):

        # A ',' between two digit tokens is labelled 'O' without advancing j.
        # (For i == 0, tokens[i - 1] wraps around to the last token.)
        if (token ==',' ) and tokens[i - 1].isdigit() and tokens[i + 1].isdigit():
          label.append('O')

        # A '.' directly after a digit token is labelled 'O' without advancing j.
        elif (token =='.' ) and tokens[i - 1].isdigit():
          label.append('O')
        # Current word is a hyphenated lower-case compound: every token gets the
        # word's tag; j advances once the token equals the second hyphen part.
        elif  bool(re.search("^[a-z]+(-[a-z]+)+$", sentence.lower().split()[j])):
          label.append(tag[j])
          if token == sentence.lower().split()[j].split("-")[1]:
            j+=1
        # Short (1-3 letter) token following a token that contains an apostrophe.
        elif  "'" in tokens[i-1] and bool(re.search("^[a-z]{1,3}$" , token)):
          label.append('O')
        # Token containing an apostrophe followed by a short (1-3 letter) token.
        elif  "'" in token and bool(re.search("^[a-z]{1,3}$" , tokens[i+1])):
          label.append(tag[j])

        # Current word contains a "letter." pattern and the token is a substring of it.
        elif  bool(re.search('([a-z]\.)+', sentence.lower().split()[j]))and token in sentence.lower().split()[j] :
          # The token equal to the word's first character keeps the word's tag;
          # other tokens get 'O' or the 'I-' variant of the word's entity type.
          if tokens[i] == sentence.lower().split()[j][0]:
            label.append(tag[j])
          else:
            if tag[j] == "O":
              label.append(tag[j])
            else:
              label.append('I-'+tag[j].split('-')[1])

          # Advance j when the previous token equals the word's second-to-last character.
          if tokens[i-1] == sentence.lower().split()[j][-2]:
            j+=1
        # Token matches a whole lower-cased word of the sentence: one tag per token.
        elif token in sentence.lower().split():
          label.append(tag[j])
          j+=1

        else:
          # Token without '##' that is not all digits: consume the next tag.
          if "##" not in token and not token.isdigit():
            label.append(tag[j])
            j+=1
          # Token containing '##' (presumably a WordPiece continuation — confirm):
          # inherit the previous word's tag, converted to its 'I-' form.
          elif "##" in token:
            if tag[j-1] == "O":
              label.append(tag[j-1])
            else:
              label.append('I-'+tag[j-1].split('-')[1])
          # Remaining case: an all-digit token without '##'.
          else:
            label.append('O')
      # One-hot encode the NLTK POS tag of every token.
      # NOTE(review): a tag missing from pos_tags_to_index raises KeyError, which is
      # not caught by the handler below — confirm this cannot occur.
      sen_tags = pos_tag(tokens)
      for _, pos_tag_ in sen_tags:
        t = torch.zeros(len(pos_tags))
        t[pos_tags_to_index[pos_tag_]] = 1
        one_hot.append(t)
      one_hot = torch.stack(one_hot)
      # Skip the sentence when embedding rows and POS vectors disagree in count.
      # (Skipped sentences do not advance the progress bar.)
      if (embedding.squeeze(0).shape[0])!= len(one_hot):
        continue
      data_point = torch.cat((embedding.squeeze(0), one_hot), dim=1)
      # Map tag strings to their integer ids.
      label = [taps_to_index[i] for i in label]
      # Skip the sentence when the label count does not match the feature rows.
      if len(label) != data_point.shape[0]:
        continue
      dataset.append(data_point)
      labels.append(label)
      pbar.update(1)
    # Out-of-range token/tag lookups in the heuristics above discard the sentence.
    except IndexError:
      continue

In [None]:
# Number of feature tensors and label sequences kept by the construction cell above.
len(dataset), len(labels)

## Labels to one-hot encoded form

In [None]:
# Convert each sequence of integer tag ids into a (sequence_length, len(tags))
# one-hot matrix. Tag ids are shifted down by one to get the column index.
identity = torch.eye(len(tags))
one_hot_labels = [
    torch.stack([identity[tag_id - 1] for tag_id in tag_ids])
    for tag_ids in labels
]

In [None]:
# Longest sequence (number of rows) among the feature tensors; 0 when there are none.
# Stored under its own name so the builtin `max` is not shadowed for later cells.
max_seq_len = max((features.shape[0] for features in dataset), default=0)

After the labels are constructed, the dataset needs a large amount of memory. The cells above nevertheless demonstrate the concept of re-aligning the dataset labels to fit the BERT tokenizer output.

Because of limited memory resources, I will use another dataset that is already tokenized and labeled.

# WNUT 17: Emerging and Rare entity recognition

## Load Dataset

In [None]:
# Load the WNUT 17 dataset; its "train", "validation" and "test" splits are used below.
wnut = load_dataset("wnut_17")

## List of labels

In [None]:
# Tag names of the "ner_tags" feature; position in the list is the integer label id.
ner_feature = wnut["train"].features["ner_tags"]
label_list = ner_feature.feature.names
label_list

## Load tokenizer

In [None]:
# Tokenizer of the DistilBERT checkpoint that is fine-tuned later in this notebook.
tokenizer = AutoTokenizer.from_pretrained("distilbert/distilbert-base-uncased")

## Example of tokenizer usage

In [None]:
# Tokenize the first training example (already split into words) and show the
# resulting token strings.
example = wnut["train"][0]
tokenized_input = tokenizer(example["tokens"], is_split_into_words=True)
tokens = tokenizer.convert_ids_to_tokens(tokenized_input["input_ids"])
tokens

## Align the labels with the tokenized text

In [None]:
def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and align word labels to tokens.

    Args:
        examples: batch with a "tokens" entry (word lists) and a parallel
            "ner_tags" entry (one label id per word).

    Returns:
        The tokenizer output with an added "labels" entry. Per sentence, the first
        token of each word carries the word's label; tokens with no word id and
        the remaining tokens of a word carry -100.
    """
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    aligned_labels = []
    for batch_idx, word_labels in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=batch_idx)
        # Pair every word id with the word id of the token before it.
        preceding_ids = [None] + word_ids[:-1]
        aligned_labels.append([
            word_labels[current] if current is not None and current != before else -100
            for current, before in zip(word_ids, preceding_ids)
        ])

    tokenized_inputs["labels"] = aligned_labels
    return tokenized_inputs

## Map Dataset to the new labels

In [None]:
# Apply the alignment function in batched mode, which adds the "labels" column.
tokenized_wnut = wnut.map(tokenize_and_align_labels, batched=True)

In [None]:
# Tag names of the example sentence, looked up from its integer label ids.
labels = [label_list[tag_id] for tag_id in example["ner_tags"]]
labels

## Dataset into batches

In [None]:
def dataset_to_batches(data_, batch_size, num_classes=14):
  """Embed every example and group the results into fixed-size batches.

  Args:
    data_: iterable of examples, each providing 'input_ids', 'attention_mask'
      and 'labels' (one integer label per token, -100 for ignored tokens).
    batch_size: number of examples per batch; must be at least 1.
    num_classes: width of the one-hot label vectors. The last class
      (index num_classes - 1) is used for the -100 label. Defaults to 14.

  Returns:
    A list of (batch, batch_labels) tuples. `batch` is a list of embedding
    tensors returned by `get_embeddings` (batch dimension squeezed) and
    `batch_labels` the matching list of one-hot label tensors. The final batch
    may hold fewer than `batch_size` examples; it is no longer dropped.

  Raises:
    ValueError: if `batch_size` is smaller than 1.
  """
  if batch_size < 1:
    raise ValueError("batch_size must be at least 1")

  dataset = []
  batch, batch_labels = [], []
  for data_point in data_:
    embeddings = get_embeddings(
        torch.tensor(data_point['input_ids']).unsqueeze(0),
        torch.tensor(data_point['attention_mask']).unsqueeze(0),
    )
    token_labels = data_point['labels']
    one_hot_labels = torch.zeros(len(token_labels), num_classes)
    for position, label in enumerate(token_labels):
      # -100 (ignored token) is mapped onto the dedicated last class.
      one_hot_labels[position, num_classes - 1 if label == -100 else label] = 1

    batch.append(embeddings.squeeze(0))
    batch_labels.append(one_hot_labels)
    if len(batch) == batch_size:
      dataset.append((batch, batch_labels))
      batch, batch_labels = [], []

  # Flush the trailing partial batch so no example is silently discarded.
  if batch:
    dataset.append((batch, batch_labels))
  return dataset

In [None]:
# Precompute embeddings and one-hot labels for the train and validation splits,
# grouped into batches of 16 examples.
wnut_train = dataset_to_batches(tokenized_wnut['train'], 16)
wnut_val = dataset_to_batches(tokenized_wnut['validation'], 16)

In [None]:
# Same preprocessing for the test split.
wnut_test = dataset_to_batches(tokenized_wnut['test'], 16)

In [None]:
# Class index -> tag name. Index 13 ("sptag") is the extra class that
# `dataset_to_batches` assigns to tokens labelled -100.
id2label = dict(enumerate([
    "O",
    "B-corporation",
    "I-corporation",
    "B-creative-work",
    "I-creative-work",
    "B-group",
    "I-group",
    "B-location",
    "I-location",
    "B-person",
    "I-person",
    "B-product",
    "I-product",
    "sptag",
]))

In [None]:
def convert_labels_to_strings(labels, label_map=id2label):
    """Translate one sequence of label ids into tag names.

    Args:
        labels: iterable of label ids for a single sequence.
        label_map: mapping from label id to tag name. The default is the
            `id2label` object that exists when this function is defined.

    Returns:
        A list containing exactly one list of tag names (a batch of one sequence).
    """
    tag_names = []
    for label_id in labels:
        tag_names.append(label_map[label_id])
    return [tag_names]

# LSTM

## LSTM Model Class

In [None]:
class LSTMModel(torch.nn.Module):
    """Bidirectional LSTM followed by a three-layer per-time-step classifier.

    Args:
        input_size: feature size of every input time step.
        hidden_size: hidden size of each LSTM direction.
        num_layers: number of stacked LSTM layers.
        output_size: number of output scores per time step.
        dropout_p: dropout probability applied after the first two linear layers
            (defaults to 0.5, the value that used to be hard-coded).
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout_p=0.5):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout_p = dropout_p
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, bidirectional=True, batch_first=True)
        # The LSTM output concatenates both directions, hence hidden_size * 2.
        self.fc1 = torch.nn.Linear(hidden_size * 2, 1024)
        self.fc2 = torch.nn.Linear(1024, 512)
        self.fc3 = torch.nn.Linear(512, output_size)

    def forward(self, x):
        """Return per-time-step scores for `x` of shape (batch, sequence, input_size)."""
        # Zero initial hidden/cell states: (layers * directions, batch, hidden).
        state_shape = (self.num_layers * 2, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        c0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        out, _ = self.lstm(x, (h0, c0))
        out = torch.relu(self.fc1(out))
        # Dropout follows the module's train/eval mode. It used to be forced on
        # (train=True), which made evaluation outputs random.
        out = torch.nn.functional.dropout(out, p=self.dropout_p, training=self.training)
        out = torch.relu(self.fc2(out))
        out = torch.nn.functional.dropout(out, p=self.dropout_p, training=self.training)
        out = self.fc3(out)
        return out

## Model hyperparameters

In [None]:
# Use the GPU when one is available.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# 768 input features per token and 14 output classes (13 tags + the extra "sptag" class).
model = LSTMModel(input_size=768, hidden_size=1024, num_layers=5, output_size=14)
model.to(device)

loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)
# The learning rate is multiplied by 0.95 every 10 scheduler steps.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.95)
epochs = 10

## Training loop

In [None]:
# Train for `epochs` epochs; after every epoch, evaluate on the validation batches.
for epoch in range(epochs):
  print(f'{datetime.datetime.now()} Epoch [{epoch + 1}/{epochs}]', end=" ")
  epoch_loss = 0
  count = 0
  # Put the model in training mode (it is switched to eval mode for validation below).
  model.train()
  with tqdm(total=len(wnut_train)) as pbar:
    for inputs, labels in wnut_train:
      optimizer.zero_grad()
      # Sequences have different lengths, so each one is forwarded on its own;
      # gradients accumulate over the batch before a single optimizer step.
      for sample, label in zip(inputs, labels):
        output = model(sample.unsqueeze(0).to(device))
        loss_value = loss(output.squeeze(0), label.to(device))
        epoch_loss += loss_value.item()
        count += 1
        loss_value.backward()
      optimizer.step()
      pbar.update(1)
      pbar.set_description(f'Training Progress | loss: {(epoch_loss / count):.4f}')
  scheduler.step()

  # ---- validation ----
  test_loss = 0
  test_count = 0
  precision = 0
  recall = 0
  # Eval mode so that mode-dependent layers behave deterministically while scoring.
  model.eval()
  with torch.no_grad():
    with tqdm(total=len(wnut_val)) as pbar2:
      for inputs, labels in wnut_val:
        for sample, label in zip(inputs, labels):
          output = model(sample.unsqueeze(0).to(device))
          loss_value = loss(output.squeeze(0), label.to(device))
          test_loss += loss_value.item()
          test_count += 1
          # One-hot targets / class scores -> class ids -> tag strings.
          y_true = torch.argmax(label, dim=1).cpu().numpy().tolist()
          y_pred = torch.argmax(output, dim=2).cpu().numpy().tolist()
          y_true = convert_labels_to_strings(y_true)
          y_pred = convert_labels_to_strings(y_pred[0])
          if len(y_true[0]) != len(y_pred[0]):
            continue
          precision += precision_score(y_true, y_pred, average='macro', zero_division=0)
          recall += recall_score(y_true, y_pred, average='macro', zero_division=0)
        pbar2.set_description(f'Testing Progress | loss: {(test_loss / test_count):.4f} Precision: {(precision/test_count):.2f} Recall: {(recall/test_count):.2f}')
        pbar2.update(1)

## Test model

In [None]:
# Evaluate the trained model on the test batches.
# The running totals start from zero here; previously they continued from the last
# validation pass, so validation results leaked into the reported test metrics.
test_loss = 0
test_count = 0
precision = 0
recall = 0
# Eval mode so that mode-dependent layers behave deterministically while scoring.
model.eval()
with torch.no_grad():
  with tqdm(total=len(wnut_test)) as pbar2:
    for inputs, labels in wnut_test:
      for sample, label in zip(inputs, labels):
        output = model(sample.unsqueeze(0).to(device))
        loss_value = loss(output.squeeze(0), label.to(device))
        test_loss += loss_value.item()
        test_count += 1
        # One-hot targets / class scores -> class ids -> tag strings.
        y_true = torch.argmax(label, dim=1).cpu().numpy().tolist()
        y_pred = torch.argmax(output, dim=2).cpu().numpy().tolist()
        y_true = convert_labels_to_strings(y_true)
        y_pred = convert_labels_to_strings(y_pred[0])
        if len(y_true[0]) != len(y_pred[0]):
          continue
        precision += precision_score(y_true, y_pred, average='macro', zero_division=0)
        recall += recall_score(y_true, y_pred, average='macro', zero_division=0)
      pbar2.set_description(f'Testing Progress | loss: {(test_loss / test_count):.4f} Precision: {(precision/test_count):.2f} Recall: {(recall/test_count):.2f}')
      pbar2.update(1)

# Fine-tune pre-trained BERT

In [None]:
# Load the "seqeval" metric through `evaluate`; `compute_metrics` calls its `compute` method.
seqeval = evaluate.load("seqeval")

In [None]:
def compute_metrics(p):
    """Score predictions with seqeval, ignoring positions labelled -100.

    Args:
        p: pair (predictions, labels); predictions hold per-class scores whose
            argmax over axis 2 is the predicted label id.

    Returns:
        Dict with the overall "precision", "recall", "f1" and "accuracy".
    """
    scores, label_ids = p
    predicted_ids = np.argmax(scores, axis=2)

    true_predictions = []
    true_labels = []
    for predicted_row, label_row in zip(predicted_ids, label_ids):
        # Keep only the positions that carry a real label.
        kept = [(pred, gold) for pred, gold in zip(predicted_row, label_row) if gold != -100]
        true_predictions.append([label_list[pred] for pred, _ in kept])
        true_labels.append([label_list[gold] for _, gold in kept])

    results = seqeval.compute(predictions=true_predictions, references=true_labels)
    return {
        metric: results[f"overall_{metric}"]
        for metric in ("precision", "recall", "f1", "accuracy")
    }

In [None]:
# Label mappings for the fine-tuned token classifier (13 classes, no "sptag" entry).
# `label2id` is the exact inverse of `id2label`.
id2label = dict(enumerate([
    "O",
    "B-corporation",
    "I-corporation",
    "B-creative-work",
    "I-creative-work",
    "B-group",
    "I-group",
    "B-location",
    "I-location",
    "B-person",
    "I-person",
    "B-product",
    "I-product",
]))
label2id = {tag_name: tag_id for tag_id, tag_name in id2label.items()}

In [None]:
# Collator built from the tokenizer; it is handed to the Trainer as `data_collator`.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

In [None]:
# Token-classification model on top of the pre-trained DistilBERT checkpoint,
# configured with the 13-class label mappings. This rebinds the name `model`.
model = AutoModelForTokenClassification.from_pretrained(
    "distilbert/distilbert-base-uncased",
    num_labels=13,
    id2label=id2label,
    label2id=label2id,
)

In [None]:
# Fine-tuning configuration: evaluate and save once per epoch, and reload the best
# checkpoint when training finishes.
training_args = TrainingArguments(
    output_dir="/content/Untitled_Folder",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=2,
    weight_decay=0.01,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_wnut["train"],
    # Evaluate on the validation split. With load_best_model_at_end=True the
    # evaluation results pick the checkpoint, so using the test split here would
    # select the model on test data.
    eval_dataset=tokenized_wnut["validation"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

trainer.train()

In [None]:
# Sample sentence for the inference demo in the next cell.
text = "The Golden State Warriors are an American professional basketball team based in San Francisco."

In [None]:
from transformers import pipeline

# Load the checkpoint the trainer recorded as best instead of relying only on a
# hard-coded step number, which changes with dataset size, batch size and epochs.
# The previous fixed path is kept as a fallback when no best checkpoint was recorded.
best_checkpoint = trainer.state.best_model_checkpoint or "/content/Untitled_Folder/checkpoint-426"

classifier = pipeline("ner", model=best_checkpoint)
classifier(text)