In [None]:
import torch
%pip install transformers
%pip install accelerate -U
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer, pipeline
import importlib
from importlib import reload
import numpy as np

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive so the project folder is reachable from this runtime.
# force_remount=True is passed so that re-running this cell mounts again instead of reusing an existing mount.
drive.mount('/content/drive', force_remount=True)

In [None]:
# If you are using Colab
# Project folder inside the mounted Drive. It must end with "/" because file and
# module names are appended to it by plain string concatenation.
dir_path = "/content/drive/Othercomputers/my_computer/dl-nlp_project_named-entity-recognition/"

# Build the dotted module prefix from the path *relative to* the Colab root, e.g.
# "/content/drive/x/" -> "drive.x.". The prefix is named instead of sliced with a
# magic offset so that an unexpected dir_path fails loudly here rather than
# producing a garbled module name.
# NOTE(review): this presumably relies on /content being importable (working
# directory / sys.path) in the Colab runtime — confirm.
colab_root = "/content/"
if not dir_path.startswith(colab_root):
    raise ValueError(
        f"dir_path must start with {colab_root!r} to derive the module path, got {dir_path!r}"
    )
module_path = dir_path[len(colab_root):].replace("/", ".")

# imports
# Import the project's data.py dynamically and bind the helpers used below as
# notebook-level names.
data_module = importlib.import_module(module_path + "data")
load_data = data_module.load_data
extract_sentences_and_labels = data_module.extract_sentences_and_labels
generate_label_vocab = data_module.generate_label_vocab
encode_labels = data_module.encode_labels
build_label_to_idx = data_module.build_label_to_idx
build_idx_to_label = data_module.build_idx_to_label
build_word_to_idx = data_module.build_word_to_idx
build_idx_to_word = data_module.build_idx_to_word
split_data = data_module.split_data
create_data_loaders = data_module.create_data_loaders

TRAIN_DATA_PATH = data_module.TRAIN_DATA_PATH
TEST_DATA_PATH = data_module.TEST_DATA_PATH

In [None]:
# Re-execute data.py after editing it on disk, without restarting the kernel.
# NOTE(review): reload() updates the module object in place, but the names bound
# from it in the previous cell (load_data, split_data, TRAIN_DATA_PATH, ...) still
# point at the objects from the earlier import; re-run that cell after reloading.
reload(data_module)

In [None]:
# If you are NOT using Colab
# dir_path = ""
# from data_new import (
#     prepare_data_pipeline,
#     TRAIN_DATA_PATH,
#     TEST_DATA_PATH,
#     PAD,
#     tensor_to_sentences,
#     tensor_to_labels,
# )

In [None]:
# Locations of the train/test JSON files, relative to the project directory
# (dir_path is expected to already end with a path separator).
train_file_path = f"{dir_path}data/train.json"
test_file_path = f"{dir_path}data/test.json"

In [None]:
# Load both splits and separate each into sentences and their raw labels.
train_data, test_data = load_data(train_file_path, test_file_path)
train_sentences, train_raw_labels = extract_sentences_and_labels(train_data)
test_sentences, test_raw_labels = extract_sentences_and_labels(test_data)

# Generate label vocabulary
# (built from the train and test labels together, so both splits share one label set)
label_vocab = generate_label_vocab(train_raw_labels + test_raw_labels)

# Encode labels pre-transformer
# NOTE(review): the tokenization cell reads len(labels[0][0]), so each word's
# encoded label is presumably a fixed-length vector (multi-hot?) — confirm in data.py.
train_encoded_labels = encode_labels(train_raw_labels, label_vocab, train_sentences)
test_encoded_labels = encode_labels(test_raw_labels, label_vocab, test_sentences)

# Word/label lookup tables. None of these four is referenced by the later cells
# visible in this notebook.
word_to_idx = build_word_to_idx(train_sentences + test_sentences)
idx_to_word = build_idx_to_word(word_to_idx)
label_to_idx = build_label_to_idx(label_vocab)
idx_to_label = build_idx_to_label(label_to_idx)

# Carve a validation set out of the training data. This rebinds train_sentences
# to the post-split training portion; the full list is no longer available
# under that name afterwards.
train_sentences, train_labels, val_sentences, val_labels = split_data(
    train_sentences, train_encoded_labels
)
# The data loaders are not referenced by the later cells visible here
# (training below goes through NERDataset + Trainer instead).
train_data_loader, val_data_loader, test_data_loader = create_data_loaders(
    train_sentences,
    train_labels,
    val_sentences,
    val_labels,
    test_sentences,
    test_encoded_labels,
    batch_size=32,
)
# Alias so the test split follows the same *_sentences / *_labels naming as train and val.
test_labels = test_encoded_labels

In [None]:
# Experiment settings.
# NOTE(review): none of these three names is referenced by the later cells visible
# in this notebook — the tokenizer/model are loaded from the literal
# "bert-base-uncased" and TrainingArguments uses a literal batch size of 8.
# Confirm whether they should be wired in or removed.
task = "ner"
model_checkpoint = "distilbert-base-uncased"
batch_size = 16

In [None]:
# DatasetDict({
#     train: Dataset({
#         features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
#         num_rows: 14041
#     })
#     validation: Dataset({
#         features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
#         num_rows: 3250
#     })
#     test: Dataset({
#         features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
#         num_rows: 3453
#     })
# })
# https://colab.research.google.com/github/huggingface/notebooks/blob/main/examples/token_classification.ipynb#scrollTo=GWiVUF0jIrIv

In [None]:
def tokenize_and_align_labels(sentences, labels, tokenizer_fn=None):
    """Tokenize pre-split sentences and expand word-level labels to token level.

    Each word's label vector is attached to the *first* sub-token of that word.
    Every other position (special tokens, padding, and continuation sub-tokens)
    receives an all-zero vector of the same width. Note that this is a zero
    vector, not the -100 "ignore" index used for single-label classification.

    Args:
        sentences: Batch of sentences, each already split into words.
        labels: For every sentence, one label vector per word. All label
            vectors are expected to share the same length.
        tokenizer_fn: Optional tokenizer callable. It is invoked as
            ``tokenizer_fn(sentences, is_split_into_words=True, padding=True)``
            and its result must provide ``word_ids(batch_index=...)`` and item
            assignment. When omitted, the notebook-level ``tokenizer`` is used,
            so existing two-argument calls behave as before.

    Returns:
        The tokenizer output with an extra ``"labels"`` entry holding, per
        sentence, one label vector per token.
    """
    # Resolve the tokenizer lazily: the global is only looked up when no
    # explicit tokenizer is supplied.
    encode = tokenizer if tokenizer_fn is None else tokenizer_fn
    tokenized_inputs = encode(sentences, is_split_into_words=True, padding=True)

    # Width of a label vector, read once from the first sentence that actually
    # has a word label (an empty leading sentence must not break the batch).
    num_labels = next((len(seq[0]) for seq in labels if len(seq) > 0), 0)

    label_list = []
    for batch_index, sentence_labels in enumerate(labels):
        # Maps every token position to the index of the word it came from
        # (None for special and padding tokens).
        word_ids = tokenized_inputs.word_ids(batch_index=batch_index)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None or word_idx == previous_word_idx:
                # Special/padding token or a continuation sub-token: no label.
                # A fresh list is created each time so rows are never aliased.
                label_ids.append([0] * num_labels)
            else:
                # First sub-token of a new word carries the word's label.
                label_ids.append(sentence_labels[word_idx])
            previous_word_idx = word_idx
        label_list.append(label_ids)

    tokenized_inputs["labels"] = label_list
    return tokenized_inputs

In [None]:
# Load the pretrained BERT tokenizer and the custom token-classification model.
# NOTE(review): CustomBertForTokenClassification is defined in a later cell of this
# notebook, so on a fresh kernel this cell raises NameError unless that cell has
# been run first.
# NOTE(review): num_labels=36 is hard-coded; presumably it should equal the size of
# label_vocab — confirm.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = CustomBertForTokenClassification.from_pretrained("bert-base-uncased", num_labels=36)

In [None]:
# Tokenize sentences and align labels
tokenized_train_data = tokenize_and_align_labels(train_sentences, train_labels)
tokenized_val_data = tokenize_and_align_labels(val_sentences, val_labels)
tokenized_test_data = tokenize_and_align_labels(test_sentences, test_labels)
tokens = tokenizer.convert_ids_to_tokens(tokenized_train_data["input_ids"][0])
labels = tokenized_train_data["labels"][0]
print(tokenized_train_data["input_ids"][0])
for token, label in zip(tokens[:10], labels[:10]):
    print(f"{label}: {token}")


In [None]:
# Show which fields the tokenized batch contains (tokenizer outputs plus the
# "labels" entry added by tokenize_and_align_labels).
tokenized_train_data.keys()

In [None]:
from torch.utils.data import Dataset

class NERDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with per-token labels.

    Args:
        encodings: Mapping that provides ``'input_ids'`` and ``'attention_mask'``,
            each indexable by example position.
        labels: Per-example labels, indexable by example position. They are
            returned as stored (no tensor conversion).
    """

    def __init__(self, encodings, labels):
        self.encodings, self.labels = encodings, labels

    def __len__(self):
        # The number of examples is defined by the labels container.
        return len(self.labels)

    def __getitem__(self, idx):
        def as_long_tensor(field):
            # Encoding fields are converted to int64 tensors on access.
            return torch.tensor(self.encodings[field][idx], dtype=torch.long)

        return {
            'input_ids': as_long_tensor('input_ids'),
            'attention_mask': as_long_tensor('attention_mask'),
            'labels': self.labels[idx],
        }


In [None]:
# Wrap each tokenized split in a NERDataset. The labels are passed separately
# even though they also live inside the encodings under the 'labels' key.
train_dataset = NERDataset(tokenized_train_data, tokenized_train_data['labels'])
val_dataset = NERDataset(tokenized_val_data, tokenized_val_data['labels'])
test_dataset = NERDataset(tokenized_test_data, tokenized_test_data['labels'])

In [None]:
# Install and load the seqeval metric through the `evaluate` library.
# NOTE(review): `seqeval` is not referenced by the later cells visible in this
# notebook — compute_metrics below uses sklearn's f1_score instead. Confirm
# whether this cell is still needed.
%pip install evaluate
%pip install seqeval
import evaluate
seqeval = evaluate.load("seqeval")

In [None]:
from sklearn.metrics import f1_score

def compute_metrics(p, threshold=0.5):
    """Compute the micro-averaged F1 score for multi-label token predictions.

    Args:
        p: Pair ``(logits, labels)``. The logits are raw scores, not
            probabilities; both members are converted with ``torch.tensor``.
        threshold: Probability cut-off. A label counts as predicted when the
            sigmoid of its logit is strictly greater than this value.

    Returns:
        dict with a single key ``"f1"``.

    Positions whose gold label vector sums to zero are excluded from the score.
    In this notebook those are the all-zero vectors that
    ``tokenize_and_align_labels`` assigns to special, padding and continuation
    tokens. Any word whose own label vector is all zeros is excluded as well.
    """
    raw_scores, gold = p

    # Trainer hands over numpy arrays; move to torch for sigmoid and masking.
    raw_scores = torch.tensor(raw_scores)
    gold = torch.tensor(gold)

    # Independent per-label decision: sigmoid probability vs. threshold.
    predicted = (raw_scores.sigmoid() > threshold).int()

    # Keep only positions that carry at least one gold label.
    keep = gold.sum(dim=-1) != 0

    micro_f1 = f1_score(
        gold[keep].cpu().numpy(),
        predicted[keep].cpu().numpy(),
        average='micro',
    )
    return {"f1": micro_f1}

In [None]:
from transformers import BertForTokenClassification
import torch.nn as nn

class CustomBertForTokenClassification(BertForTokenClassification):
    """BERT token classifier trained as a multi-label tagger.

    The stock head is kept, but the loss is ``BCEWithLogitsLoss`` so that every
    token can carry several labels at once. Labels are expected to have the
    same shape as the logits, i.e. one 0/1 vector per token.
    """

    def __init__(self, config):
        super().__init__(config)
        self.loss_fct = nn.BCEWithLogitsLoss()

    def forward(self, input_ids=None, attention_mask=None, labels=None, **kwargs):
        """Run the parent model and compute the multi-label loss.

        Args:
            input_ids: Token ids, forwarded to the parent model.
            attention_mask: Optional mask; positions equal to 1 take part in
                the loss. When omitted, every position is used.
            labels: Optional target vectors with the same shape as the logits.
            **kwargs: Forwarded unchanged to the parent ``forward``.

        Returns:
            Tuple ``(loss, logits, *extra)``. ``loss`` is ``None`` when no
            labels are given. ``extra`` holds whatever the parent returned
            after the logits (e.g. hidden states or attentions, when requested).
        """
        # The parent is deliberately called without labels so it does not
        # compute its own (single-label cross-entropy) loss.
        outputs = super().forward(input_ids=input_ids, attention_mask=attention_mask, **kwargs)
        logits = outputs[0]

        loss = None
        if labels is not None:
            # Take the label width from the logits instead of hard-coding it,
            # so the class works for any num_labels.
            num_labels = logits.size(-1)
            flat_logits = logits.reshape(-1, num_labels)
            flat_labels = labels.reshape(-1, num_labels).type_as(flat_logits)
            if attention_mask is not None:
                # Padding positions must not contribute to the loss.
                active = attention_mask.reshape(-1) == 1
                flat_logits = flat_logits[active]
                flat_labels = flat_labels[active]
            loss = self.loss_fct(flat_logits, flat_labels)

        # Because the parent returned no loss, index 0 is the logits and the
        # optional extras start at index 1 (slicing from 2 would drop the first).
        return (loss, logits) + tuple(outputs[1:])


In [None]:

# Training configuration: evaluate and save once per epoch so that
# load_best_model_at_end can restore the best checkpoint when training finishes.
# NOTE(review): no metric_for_best_model is set, so "best" presumably falls back to
# the library default rather than the F1 from compute_metrics — confirm.
# NOTE(review): newer transformers releases rename evaluation_strategy to
# eval_strategy; the install at the top of the notebook is unpinned, so confirm
# against the installed version.
# NOTE(review): warmup_steps=500 may cover most or all of a 2-epoch run (the
# inference cell loads checkpoint-326) — confirm this is intended.
training_args = TrainingArguments(
    output_dir="my_model",
    learning_rate=5e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=2,
    load_best_model_at_end=True,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    warmup_steps=500,
)

# Fine-tune on the training split, scoring the validation split with compute_metrics.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

trainer.train()

In [None]:
from transformers import pipeline

# Load a saved checkpoint into a token-classification pipeline and try it on one
# training sentence.
# NOTE(review): the checkpoint step (326) is hard-coded and must match a directory
# actually written under output_dir by the training run above.
# NOTE(review): the pipeline presumably loads a stock token-classification head, not
# CustomBertForTokenClassification, and its post-processing may assume one label
# per token rather than the multi-label setup trained here — confirm.
# NOTE(review): train_sentences[0] was tokenized earlier with
# is_split_into_words=True, so it is presumably a list of words; the pipeline may
# then treat each word as a separate input — confirm.
classifier = pipeline("ner", model="my_model/checkpoint-326")
classifier(train_sentences[0])