In [1]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import BertTokenizerFast, BertForTokenClassification, AdamW
from sklearn.metrics import classification_report
import numpy as np
from sklearn.metrics import classification_report, f1_score, accuracy_score
import torch.nn.functional as F  # Add this import

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
MAX_LEN = 174
BATCH_SIZE = 64
EPOCHS = 1
MODEL_NAME = 'bert-base-uncased'

In [3]:
def clean_tag(tag):
    # Ensure tags are in the correct format
    if tag.count('-') > 1:
        prefix, entity = tag.split('-', 1)
        tag = f"{prefix}-{entity.replace('-', '')}"
    return tag

In [4]:
def read_data(file_path):
    sentences, labels = [], []
    sentence, label = [], []
    with open(file_path, encoding="utf-8") as file:
        for line in file:
            if line.startswith("#"):
                continue
            elif line == "\n":
                if sentence:
                    sentences.append(sentence)
                    labels.append(label)
                    sentence, label = [], []
            else:
                parts = line.strip().split("\t")
                sentence.append(parts[1].lower())  # Convert the token to lowercase before appending
                label.append(clean_tag(parts[2]))
    if sentence:
        sentences.append(sentence)
        labels.append(label)
    return sentences, labels


In [5]:
def read_names(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        names = [name.strip().lower() for name in file.readlines()]
    return names

In [6]:
character_names = read_names('./scraping_res/character_names.txt')
location_names = read_names('./scraping_res/location_names.txt')
organization_names = read_names('./scraping_res/organization_names.txt')

In [7]:
all_names = character_names + location_names + organization_names

tokenizer = BertTokenizerFast.from_pretrained(MODEL_NAME)
num_added_toks = tokenizer.add_tokens(all_names)



In [8]:
train_tokens, train_tags = read_data("./tagged_sentences_train.iob2")
tag_values = list(set(tag for doc in train_tags for tag in doc))
tag_values.append("PAD")
tag2idx = {tag: idx for idx, tag in enumerate(tag_values)}
idx2tag = dict([(value, key) for key, value in tag2idx.items()])

In [9]:
print(len(tag2idx))

7


In [10]:
print(set(tag for doc in train_tags for tag in doc))

{'B-CHAR', 'B-LOC', 'B-ORG', 'I-CHAR', 'I-LOC', 'O'}

In [11]:
len(tag_values)

7

In [12]:
test_tokens, test_tags = read_data("./tagged_sentences_test.iob2")

# Combine train and test tags to create a comprehensive tag set
all_tags = set(tag for doc in train_tags for tag in doc).union(set(tag for doc in test_tags for tag in doc))
all_tags.add("PAD")  # Add the PAD tag

In [13]:
print(f"All tags: {all_tags}")

All tags: {'I-CHAR', 'B-CHAR', 'I-LOC', 'O', 'B-LOC', 'PAD', 'B-ORG'}


In [14]:
class NERDataset(Dataset):
    def __init__(self, sentences, tags, tokenizer, max_len, tag2idx):
        self.sentences = sentences
        self.tags = tags
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.tag2idx = tag2idx

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        sentence = self.sentences[idx]
        word_labels = self.tags[idx]
        encoding = self.tokenizer(sentence, is_split_into_words=True, return_offsets_mapping=True, padding='max_length', truncation=True, max_length=self.max_len, return_tensors='pt')
        labels = [self.tag2idx['O']] * self.max_len  # Initialize labels with "O"
        offsets = encoding['offset_mapping'].squeeze().tolist()  # Get the offsets
        encoding.pop('offset_mapping')  # Remove offsets, not needed for model input

        idx = 0
        for i, (start, end) in enumerate(offsets):
            if start == end:  # Special tokens
                labels[i] = self.tag2idx['O']
            elif start == 0:  # Start of a new word
                if idx < len(word_labels):
                    labels[i] = self.tag2idx[word_labels[idx]]
                else:
                    labels[i] = self.tag2idx['O']
                idx += 1
            else:  # Subtoken of a word
                labels[i] = -100  # PyTorch's convention to ignore these tokens in loss computation

        item = {key: val.squeeze() for key, val in encoding.items()}  # Remove batch dimension
        item['labels'] = torch.tensor(labels)
        return item

In [15]:
train_data = NERDataset(train_tokens, train_tags, tokenizer, MAX_LEN, tag2idx)
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE)

In [16]:
model = BertForTokenClassification.from_pretrained(MODEL_NAME, num_labels=len(tag_values))

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [17]:
if num_added_toks > 0:
    model.resize_token_embeddings(len(tokenizer))

In [18]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')
model.to(device)
optimizer = AdamW(model.parameters(), lr=3e-5)

Using device: cuda




In [19]:
model.train()
print(f'Using device: {device}')
for epoch in range(EPOCHS):
    print(f'Using device: {device}')
    for step, batch in enumerate(train_loader):
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs[0]
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if step % 10 == 0:
            print(f"Epoch {epoch + 1}, Step {step}, Loss: {loss.item()}")

Using device: cuda
Using device: cuda
Epoch 1, Step 0, Loss: 1.6785365343093872
Epoch 1, Step 10, Loss: 0.045099589973688126
Epoch 1, Step 20, Loss: 0.015861421823501587
Epoch 1, Step 30, Loss: 0.022854037582874298
Epoch 1, Step 40, Loss: 0.029340513050556183
Epoch 1, Step 50, Loss: 0.01307598315179348
Epoch 1, Step 60, Loss: 0.032763056457042694
Epoch 1, Step 70, Loss: 0.019419947639107704
Epoch 1, Step 80, Loss: 0.013492172583937645
Epoch 1, Step 90, Loss: 0.02313671074807644
Epoch 1, Step 100, Loss: 0.022113250568509102
Epoch 1, Step 110, Loss: 0.01595035195350647
Epoch 1, Step 120, Loss: 0.011320076882839203
Epoch 1, Step 130, Loss: 0.011424909345805645
Epoch 1, Step 140, Loss: 0.009597595781087875
Epoch 1, Step 150, Loss: 0.005002311430871487
Epoch 1, Step 160, Loss: 0.0041248612105846405
Epoch 1, Step 170, Loss: 0.009335093200206757
Epoch 1, Step 180, Loss: 0.006383749656379223
Epoch 1, Step 190, Loss: 0.007022277917712927
Epoch 1, Step 200, Loss: 0.004724388942122459
Epoch 1, St

In [20]:
# Step 1: Load the test data
test_tokens, test_tags = read_data("./tagged_sentences_test.iob2")

# Step 2: Create a DataLoader for the test data
test_data = NERDataset(test_tokens, test_tags, tokenizer, MAX_LEN, tag2idx)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE)

In [21]:
from sklearn.metrics import f1_score, accuracy_score, classification_report

# Define evaluation function
def evaluate(model, dataloader, device, tag2idx, idx2tag):
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            loss = outputs.loss
            total_loss += loss.item()

            logits = outputs.logits
            predictions = torch.argmax(logits, dim=2)

            # Collect the predictions and true labels for calculating F1 score and accuracy
            all_preds.extend(predictions.cpu().numpy().tolist())
            all_labels.extend(batch['labels'].cpu().numpy().tolist())

    avg_loss = total_loss / len(dataloader)

    # Flatten the lists to calculate metrics
    all_preds_flat = [p for preds in all_preds for p in preds]
    all_labels_flat = [l for labels in all_labels for l in labels]

    # Remove padding tokens, the label 0 (O), and -100 for accuracy and F1 calculation
    true_preds = [pred for pred, label in zip(all_preds_flat, all_labels_flat) if label != tag2idx['PAD'] and label != tag2idx['O'] and label != -100]
    true_labels = [label for label in all_labels_flat if label != tag2idx['PAD'] and label != tag2idx['O'] and label != -100]

    # Map indices back to tags
    true_preds_tags = [idx2tag[pred] for pred in true_preds]
    true_labels_tags = [idx2tag[label] for label in true_labels]

    # Get the list of unique tags in the dataset (excluding PAD and O)
    unique_tags = [tag for tag in tag2idx if tag != 'PAD' and tag != 'O']

    f1 = f1_score(true_labels_tags, true_preds_tags, average='weighted')
    accuracy = accuracy_score(true_labels_tags, true_preds_tags)

    print(f'Average Loss: {avg_loss}')
    print(f'F1 Score (excluding PAD and O): {f1}')
    print(f'Accuracy (excluding PAD and O): {accuracy}')
    print(classification_report(true_labels_tags, true_preds_tags, labels=unique_tags, target_names=unique_tags))

    return avg_loss, f1, accuracy, true_labels_tags, true_preds_tags

# Example usage
avg_loss, f1, accuracy, true_labels_tags, true_preds_tags = evaluate(model, test_loader, device, tag2idx, idx2tag)


Average Loss: 0.005300000141788688
F1 Score (excluding PAD and O): 0.5369410231345716
Accuracy (excluding PAD and O): 0.5582222222222222
              precision    recall  f1-score   support

      I-CHAR       0.00      0.00      0.00        85
      B-CHAR       0.71      0.77      0.74       820
       I-LOC       0.00      0.00      0.00         2
       B-LOC       0.00      0.00      0.00       216
       B-ORG       0.00      0.00      0.00         2

   micro avg       0.71      0.56      0.62      1125
   macro avg       0.14      0.15      0.15      1125
weighted avg       0.52      0.56      0.54      1125



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [22]:
# def evaluate(model, dataloader, device, tag2idx, idx2tag):
#     model.eval()
#     total_loss = 0
#     all_preds = []
#     all_labels = []

#     with torch.no_grad():
#         for batch in dataloader:
#             batch = {k: v.to(device) for k, v in batch.items()}
#             outputs = model(**batch)
#             loss = outputs.loss
#             total_loss += loss.item()

#             logits = outputs.logits
#             predictions = torch.argmax(logits, dim=2)

#             # Collect the predictions and true labels for calculating F1 score and accuracy
#             all_preds.extend(predictions.cpu().numpy().tolist())
#             all_labels.extend(batch['labels'].cpu().numpy().tolist())

#     avg_loss = total_loss / len(dataloader)

#     # Flatten the lists to calculate metrics
#     all_preds_flat = [p for preds in all_preds for p in preds]
#     all_labels_flat = [l for labels in all_labels for l in labels]

#     # Remove padding tokens, the label 0 (O), and -100 for accuracy and F1 calculation
#     true_preds = [pred for pred, label in zip(all_preds_flat, all_labels_flat) if label != tag2idx['PAD'] and label != tag2idx['O'] and label != -100]
#     true_labels = [label for label in all_labels_flat if label != tag2idx['PAD'] and label != tag2idx['O'] and label != -100]

#     # Map indices back to tags
#     true_preds_tags = [idx2tag[pred] for pred in true_preds]
#     true_labels_tags = [idx2tag[label] for label in true_labels]

#     # Get the list of unique tags in the dataset (excluding PAD and O)
#     unique_tags = [tag for tag in tag2idx if tag != 'PAD' and tag != 'O']

#     f1 = f1_score(true_labels_tags, true_preds_tags, average='weighted')
#     accuracy = accuracy_score(true_labels_tags, true_preds_tags)

#     return avg_loss, f1, accuracy, true_labels_tags, true_preds_tags


In [23]:
# def train_and_evaluate(dataset_class, tokenizer, tag2idx, idx2tag, train_tokens, train_tags, max_epochs=10, patience=2, batch_size=16, learning_rate=3e-5):    # Load and prepare data
#     tag_values = list(set(tag for doc in train_tags for tag in doc))
#     tag_values.append("PAD")
#     tag2idx = {tag: idx for idx, tag in enumerate(tag_values)}
#     idx2tag = {idx: tag for tag, idx in tag2idx.items()}
#     print('len(tag2idx): ', len(tag2idx))
#     print('tag_values: ', tag_values)
#     # Use the whole training dataset
#     train_data = dataset_class(train_tokens, train_tags, tokenizer, MAX_LEN, tag2idx)
#     train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

#     # Load the test data
#     test_tokens, test_tags = read_data("./tagged_sentences_test.iob2")
#     test_data = dataset_class(test_tokens, test_tags, tokenizer, MAX_LEN, tag2idx)
#     test_loader = DataLoader(test_data, batch_size=batch_size)

#     model = BertForTokenClassification.from_pretrained(MODEL_NAME, num_labels=len(tag2idx))
#     device = torch.device("cpu")    
#     model.to(device)
#     optimizer = AdamW(model.parameters(), lr=learning_rate)

#     best_f1 = 0
#     patience_counter = 0

#     for epoch in range(max_epochs):
#         model.train()
#         total_loss = 0
#         print(f"Using device: {device}, Epoch {epoch + 1}/{max_epochs}")

#         for step, batch in enumerate(train_loader):
#             batch = {k: v.to(device) for k, v in batch.items()}
#             outputs = model(**batch)
#             loss = outputs.loss
#             total_loss += loss.item()
#             loss.backward()
#             optimizer.step()
#             optimizer.zero_grad()

#             if step % 10 == 0:
#                 print(f"Epoch {epoch + 1}, Step {step}, Loss: {loss.item()}")

#         avg_train_loss = total_loss / len(train_loader)
#         print(f"Epoch {epoch + 1}, Average Training Loss: {avg_train_loss}")

#         avg_loss, f1, accuracy, _, _ = evaluate(model, test_loader, device, tag2idx, idx2tag)
#         print(f"Epoch {epoch + 1}, Validation F1: {f1}, Validation Accuracy: {accuracy}")

#         if f1 > best_f1:
#             best_f1 = f1
#             patience_counter = 0
#         else:
#             patience_counter += 1

#         if patience_counter >= patience:
#             print("Early stopping triggered")
#             break

#     avg_loss, f1, accuracy, true_labels_tags, true_preds_tags = evaluate(model, test_loader, device, tag2idx, idx2tag)
#     print(f'Final Average Loss: {avg_loss}')
#     print(f'Final F1 Score (excluding PAD and O): {f1}')
#     print(f'Final Accuracy (excluding PAD and O): {accuracy}')
#     print(classification_report(true_labels_tags, true_preds_tags, target_names=[tag for tag in tag2idx if tag != 'PAD' and tag != 'O']))

#     return avg_loss

In [24]:
# # Generate learning curve
# def generate_learning_curve(dataset_class, tokenizer, tag2idx, idx2tag):
#     train_tokens, train_tags = read_data("./tagged_sentences_train.iob2")
#     sizes = [0.1, 0.25, 0.5, 0.75, 1.0]
#     losses = []
#     for size in sizes:
#         print('size: ', size)
#         # Use only a portion of the data
        
#         subset_size = int(size * len(train_tokens))
#         loss = train_and_evaluate(dataset_class, tokenizer, tag2idx, idx2tag, train_tokens[:subset_size], train_tags[:subset_size])
#         losses.append(loss)
#     plt.figure(figsize=(10, 6))
#     plt.plot([size*100 for size in sizes], losses, marker='o')
#     plt.title('Learning Curve')
#     plt.xlabel('Training Data Size (%)')
#     plt.ylabel('Average Loss')
#     plt.grid(True)
#     plt.show()

In [25]:
# generate_learning_curve(NERDataset, tokenizer, tag2idx, idx2tag)

In [26]:
# class NERDataset(Dataset):
#     def __init__(self, sentences, tags, tokenizer, max_len, tag2idx):
#         self.sentences = sentences
#         self.tags = tags
#         self.tokenizer = tokenizer
#         self.max_len = max_len
#         self.tag2idx = tag2idx

#     def __len__(self):
#         return len(self.sentences)

#     def __getitem__(self, idx):
#         sentence = self.sentences[idx]
#         word_labels = self.tags[idx]

#         # Tokenize the sentence
#         encoding = self.tokenizer(
#             sentence,
#             is_split_into_words=True,
#             return_offsets_mapping=True,
#             padding='max_length',
#             truncation=True,
#             max_length=self.max_len,
#             return_tensors='pt'
#         )

#         # Get the offsets
#         offsets = encoding['offset_mapping'].squeeze().tolist()
#         encoding.pop('offset_mapping')  # Remove offsets, not needed for model input

#         # Initialize labels with "O"
#         labels = [self.tag2idx['O']] * self.max_len

#         idx = 0
#         for i, (start, end) in enumerate(offsets):
#             if start == end:
#                 # Special tokens (CLS, SEP, PAD)
#                 labels[i] = self.tag2idx['O']
#             elif start == 0:
#                 # Start of a new word
#                 if idx < len(word_labels):
#                     labels[i] = self.tag2idx[word_labels[idx]]
#                 idx += 1
#             else:
#                 # Subtoken of a word
#                 labels[i] = -100  # PyTorch's convention to ignore these tokens in loss computation

#         item = {key: val.squeeze() for key, val in encoding.items()}  # Remove batch dimension
#         item['labels'] = torch.tensor(labels)
#         return item
