In [1]:
import torch
from transformers import BertTokenizerFast, BertForTokenClassification
from transformers import logging

import os
import json

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# select a device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(device)

cuda


In [3]:
logging.set_verbosity_error()  # Show error messages only

In [4]:
MODEL_CKPT = "DeepPavlov/rubert-base-cased"

DATA_PATH = "data"
NERS_PATH = os.path.join(DATA_PATH, "ners.txt")

TRAIN_PATH = os.path.join(DATA_PATH, "public_data", "train.jsonl")
TEST_PATH = os.path.join(DATA_PATH, "public_data", "test.jsonl")

In [5]:
MAX_LENGTH = 128

LR = 1e-3
BATCH_SIZE = 32
DR = 0.1
EPOCHS = 10

In [6]:
NULL_ENTITY = "O"  # Define the label for a non-entity token
B_PREFIX = "B-"  # Define the prefix for beginning of an entity
I_PREFIX = "I-"  # Define the prefix for inside an entity

In [7]:
model = BertForTokenClassification.from_pretrained(
    MODEL_CKPT
)  # Load the pre-trained BERT model for token classification
tokenizer = BertTokenizerFast.from_pretrained(
    MODEL_CKPT
)  # Load the tokenizer for the pre-trained BERT model

In [8]:
with open(NERS_PATH, "r") as f:
    ner_types = [
        line.strip() for line in f.readlines()
    ]  # Read the NER types from the specified file

ner_labels = (
    [NULL_ENTITY]
    + [f"{B_PREFIX}{ner}" for ner in ner_types]
    + [f"{I_PREFIX}{ner}" for ner in ner_types]
)  # Create a list of NER labels, including the null entity and the beginning and inside entity labels
label_map = {
    label: i for i, label in enumerate(ner_labels)
}  # Create a mapping from NER labels to numerical indices

In [9]:
train_data = []  # Initialize an empty list to store the training data
with open(TRAIN_PATH, "r") as f:
    for line in f.readlines():
        data = json.loads(line)  # Load the JSON data from the training file
        text = data["sentences"]  # Extract the text from the JSON data
        ners = data["ners"]  # Extract the NER annotations from the JSON data
        ners = sorted(
            ners, key=lambda x: x[0]
        )  # Sort the NER annotations based on the start position
        train_data.append(
            {"text": text, "ners": ners}
        )  # Add the text and NER annotations to the train_data list

In [10]:
test_data = []  # Initialize an empty list to store the test data
with open(TEST_PATH, "r") as f:
    for line in f.readlines():
        data = json.loads(line)  # Load the JSON data from the test file
        text = data["senences"]  # Extract the text from the JSON data
        id = data["id"]  # Extract the ID from the JSON data
        test_data.append(
            {"id": id, "text": text}
        )  # Add the ID and text to the test_data list

In [11]:
def get_encoding(text):
    """
    Encodes the given text using the pre-trained BERT tokenizer.

    Args:
        text (str): The input text to be encoded.

    Returns:
        dict: A dictionary containing the encoded text, attention masks, and other relevant information.
    """
    encoding = tokenizer.encode_plus(
        text,
        return_tensors="pt",
        padding="max_length",
        max_length=MAX_LENGTH,
        truncation=True,
        return_offsets_mapping=True,
    )
    return encoding

In [12]:
def get_encoding_and_labels(text, ners):
    """
    Encodes the given text and generates corresponding NER labels based on the provided NER annotations.

    Args:
        text (str): The input text.
        ners (list): A list of tuples, where each tuple contains (start_index, end_index, entity_type) for an NER annotation.

    Returns:
        tuple: A tuple containing the encoded text and a list of corresponding NER labels.
    """
    encoding = get_encoding(text)  # Get the encoding for the text
    token_ids = encoding["input_ids"][0]  # Extract the token IDs from the encoding
    offset_mapping = encoding["offset_mapping"][
        0
    ]  # Extract the offset mapping from the encoding
    ner_i = 0  # Initialize a counter for iterating over the NER annotations
    labels = []  # Initialize an empty list to store the labels

    for offset, token_id in zip(offset_mapping, token_ids):
        start, end = offset  # Unpack the start and end offsets for the current token
        if start == end:
            labels.append(
                label_map[NULL_ENTITY]
            )  # If the token is a special token, assign the null entity label
            continue

        try:
            ner_start, ner_end, ner_type = ners[
                ner_i
            ]  # Get the start, end, and type of the current NER annotation
        except:
            break  # If there are no more NER annotations, break out of the loop

        while ner_end < start:
            ner_i += 1  # Move to the next NER annotation if the current one ends before the current token
            try:
                ner_start, ner_end, ner_type = ners[ner_i]
            except:
                break

        if ner_start > end:
            labels.append(
                label_map[NULL_ENTITY]
            )  # If the current token is outside of any NER annotation, assign the null entity label
        elif ner_start == start:
            labels.append(
                label_map[B_PREFIX + ner_type]
            )  # If the current token is the start of an NER annotation, assign the beginning entity label
        else:
            labels.append(
                label_map[I_PREFIX + ner_type]
            )  # If the current token is inside an NER annotation, assign the inside entity label

    return encoding, labels

In [13]:
# Initialize empty lists to store tokenized data for training and testing
train_tokenized_data = []
test_tokenized_data = []


# Tokenize and encode the training data
for sample in train_data:
    text = sample["text"]
    ners = sample["ners"]
    train_tokenized_data.append(get_encoding_and_labels(text, ners))


# Tokenize and encode the test data
for sample in test_data:
    text = sample["text"]
    id = sample["id"]
    test_tokenized_data.append((get_encoding(text), id))

In [14]:
class NERDataset(torch.utils.data.Dataset):
    """
    A PyTorch Dataset class for NER data.

    Args:
        tokenized_dataset (list): A list of tuples containing the tokenized input and corresponding labels.
    """

    def __init__(self, tokenized_dataset):
        self.tokenized_dataset = tokenized_dataset

    def __getitem__(self, idx):
        """
        Get the tokenized input and labels for a given index.

        Args:
            idx (int): The index of the sample to retrieve.

        Returns:
            dict: A dictionary containing the input IDs, attention mask, and labels as tensors.
        """
        inputs, label = self.tokenized_dataset[idx]
        return {
            "input_ids": inputs["input_ids"].flatten(),
            "attention_mask": inputs["attention_mask"].flatten(),
            "labels": torch.tensor(label),
        }

    def __len__(self):
        """
        Get the number of samples in the dataset.

        Returns:
            int: The length of the dataset.
        """
        return len(self.tokenized_dataset)

In [15]:
class NERTestDataset(torch.utils.data.Dataset):
    """
    A PyTorch Dataset class for NER test data.

    Args:
        tokenized_dataset (list): A list of tuples containing the tokenized input and corresponding IDs.
    """

    def __init__(self, tokenized_dataset):
        self.tokenized_dataset = tokenized_dataset

    def __getitem__(self, idx):
        """
        Get the tokenized input, offset mapping, and ID for a given index.

        Args:
            idx (int): The index of the sample to retrieve.

        Returns:
            dict: A dictionary containing the input IDs, attention mask, offset mapping, and ID.
        """
        inputs, id = self.tokenized_dataset[idx]
        return {
            "input_ids": inputs["input_ids"].flatten(),
            "attention_mask": inputs["attention_mask"].flatten(),
            "offset_mapping": inputs["offset_mapping"][0],
            "id": id,
        }

    def __len__(self):
        """
        Get the number of samples in the dataset.

        Returns:
            int: The length of the dataset.
        """
        return len(self.tokenized_dataset)

In [29]:
def collate_fn(batch):
    """
    A collate function for NER data, used to pad and batch the samples.

    Args:
        batch (list): A list of dictionaries containing the input IDs, attention masks, and labels.

    Returns:
        dict: A dictionary containing the padded input IDs, attention masks, and labels as tensors.
    """
    input_ids = [item["input_ids"] for item in batch]
    attention_mask = [item["attention_mask"] for item in batch]
    labels = [item["labels"] for item in batch]

    # Find the maximum length in the batch for input_ids
    max_len = max(len(x) for x in input_ids)

    # Create padded tensors for input_ids and attention_mask
    padded_input_ids = torch.zeros(len(input_ids), max_len, dtype=torch.long)
    padded_attention_mask = torch.zeros(len(attention_mask), max_len, dtype=torch.long)

    # Fill the padded tensors with actual data
    for i, (input_id, mask) in enumerate(zip(input_ids, attention_mask)):
        padded_input_ids[i, :len(input_id)] = input_id.clone().detach()
        padded_attention_mask[i, :len(mask)] = mask.clone().detach()

    # Find the maximum label length in the batch
    max_label_len = max(len(label) for label in labels)

    # Create a padded tensor for labels
    padded_labels = torch.zeros(
        len(labels), max_label_len, len(ner_labels), dtype=torch.float32
    )

    # Fill the padded labels tensor with actual data
    for i, label in enumerate(labels):
        for j, label_id in enumerate(label):
            padded_labels[i, j, label_id] = 1.0

    return {
        "input_ids": padded_input_ids,
        "attention_mask": padded_attention_mask,
        "labels": padded_labels,
    }

In [30]:
def collate_fn_test(batch):
    """
    A collate function for NER test data, used to pad and batch the samples.

    Args:
        batch (list): A list of dictionaries containing the input IDs, attention masks, offset mappings, and IDs.

    Returns:
        dict: A dictionary containing the padded input IDs, attention masks, offset mappings, and IDs as tensors.
    """
    input_ids = [item["input_ids"] for item in batch]
    attention_mask = [item["attention_mask"] for item in batch]
    offset_mapping = [item["offset_mapping"] for item in batch]
    ids = torch.tensor([item['id'] for item in batch])

    # Find the maximum length in the batch for input_ids
    max_len = max(len(x) for x in input_ids)

    # Create padded tensors for input_ids and attention_mask
    padded_input_ids = torch.zeros(len(input_ids), max_len, dtype=torch.long)
    padded_attention_mask = torch.zeros(len(attention_mask), max_len, dtype=torch.long)
    padded_offset_mapping = torch.zeros(len(offset_mapping), max_len, 2, dtype=torch.long)

    # Fill the padded tensors with actual data
    for i, (input_id, mask, mapping) in enumerate(zip(input_ids, attention_mask, offset_mapping)):
        padded_input_ids[i, :len(input_id)] = input_id.clone().detach()
        padded_attention_mask[i, :len(mask)] = mask.clone().detach()
        padded_offset_mapping[i, :len(mapping)] = mapping.clone().detach()

    return {
        "input_ids": padded_input_ids,
        "attention_mask": padded_attention_mask,
        "offset_mapping": padded_offset_mapping,
        "ids": ids
    }

In [31]:
# Create a PyTorch dataset for the tokenized training data
train_dataset = NERDataset(train_tokenized_data)
# Create a PyTorch data loader for the training dataset
train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn
)

# Create a PyTorch dataset for the tokenized test data
test_dataset = NERTestDataset(test_tokenized_data)
# Create a PyTorch data loader for the test dataset
test_dataloader = torch.utils.data.DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn_test
)

In [46]:
class NERModel(torch.nn.Module):
    """
    A PyTorch module for the NER model.

    Args:
        model (BertForTokenClassification): The pre-trained BERT model for token classification.
    """

    def __init__(self, model):
        super(NERModel, self).__init__()
        self.model = model
        self.model.dropout = torch.nn.Dropout(DR)  # Set the dropout rate
        self.model.classifier = torch.nn.Linear(
            model.config.hidden_size, len(ner_labels)
        )  # Initialize the classifier layer

        # Freeze the pretrained model weights
        for param in self.model.parameters():
            param.requires_grad = False

        # Unfreeze the weights of the last transformer layer
        for para in self.model.bert.encoder.layer[11].parameters():
            param.requires_grad = True

        # Set the classifier layer weights to be trainable
        for param in self.model.classifier.parameters():
            param.requires_grad = True

    def forward(self, input_ids, attention_mask):
        """
        Forward pass of the NER model.

        Args:
            input_ids (torch.Tensor): The input IDs for the BERT model.
            attention_mask (torch.Tensor): The attention mask for the BERT model.

        Returns:
            torch.Tensor: The logits from the NER model.
        """
        outputs = self.model(input_ids, attention_mask=attention_mask)
        return outputs.logits

In [47]:
# Create an instance of the NER model and move it to the GPU device
ner_model = NERModel(model).to(device)

# Define the loss function
criterion = torch.nn.CrossEntropyLoss()

# Define the optimizer
optimizer = torch.optim.Adam(ner_model.parameters(), lr=LR)

In [48]:
# Training loop
for epoch in range(EPOCHS):
    ner_model.train()  # Set the model to training mode
    total_loss = 0

    for batch in train_dataloader:
        input_ids = batch["input_ids"].to(device)  # Move input_ids to GPU device
        attention_mask = batch["attention_mask"].to(
            device
        )  # Move attention_mask to GPU device
        labels = batch["labels"].to(device)  # Move labels to GPU device

        optimizer.zero_grad()  # Reset gradients

        outputs = ner_model(input_ids, attention_mask)  # Forward pass
        loss = criterion(outputs, labels)  # Calculate loss

        loss.backward()  # Backpropagation
        optimizer.step()  # Update weights

        total_loss += loss.item()  # Accumulate the loss

    print(
        f"Epoch {epoch+1}, Loss: {total_loss / len(train_dataloader)}"
    )  # Print the average loss for the epoch

Epoch 1, Loss: 10.096461408278522
Epoch 2, Loss: 9.244666323942297
Epoch 3, Loss: 8.76241050047033
Epoch 4, Loss: 8.480520024019128
Epoch 5, Loss: 8.336179649128633
Epoch 6, Loss: 8.249768032747156
Epoch 7, Loss: 8.179402716019574
Epoch 8, Loss: 8.138147943160114
Epoch 9, Loss: 8.097515723284554
Epoch 10, Loss: 8.060415520387537


In [49]:
# Evaluation loop
ner_model.eval()  # Set the model to evaluation mode
results = []

for batch in test_dataloader:
    input_ids = batch["input_ids"].to(device)  # Move input_ids to GPU device
    attention_mask = batch["attention_mask"].to(device)  # Move attention_mask to GPU device
    offset_mapping = batch["offset_mapping"].to(device)  # Move offset_mapping to GPU device
    ids = batch["ids"].to(device)  # Move ids to GPU device

    outputs = ner_model(input_ids, attention_mask)  # Forward pass
    predictions = torch.argmax(outputs, -1)  # Get the predicted labels

    # Store the results as tuples of (id, prediction, offset_mapping)
    results += list(zip(ids, predictions, offset_mapping))

In [50]:
labeled_result = []  # Initialize an empty list to store the labeled results

for id, predictions, offset_mapping in results:  # Iterate over the results
    ners = []  # Initialize an empty list to store the NER spans
    prev_label = NULL_ENTITY  # Initialize the previous label as the null entity
    start_index = 0  # Initialize the start index of the current NER span
    end_index = 0  # Initialize the end index of the current NER span

    for p, m in zip(
        predictions, offset_mapping
    ):  # Iterate over predictions and offset mappings
        label = ner_labels[p]  # Get the label for the current prediction

        if label == NULL_ENTITY:  # If the current label is the null entity
            if prev_label != NULL_ENTITY and start_index != end_index:
                # If a non-null entity was previously encountered, append it to the ners list
                ners.append([start_index, end_index, prev_label[2:]])
            prev_label = label  # Update the previous label

        elif (
            label[:2] == B_PREFIX
        ):  # If the current label is the beginning of an entity
            if prev_label != NULL_ENTITY and start_index != end_index:
                # If a non-null entity was previously encountered, append it to the ners list
                ners.append([start_index, end_index, prev_label[2:]])
            prev_label = label  # Update the previous label
            start_index = m[0].item()  # Update the start index of the current NER span
            end_index = m[1].item()  # Update the end index of the current NER span

        elif label[:2] == I_PREFIX:  # If the current label is inside an entity
            if prev_label[2:] != label[2:]:
                # If the current entity type is different from the previous entity type
                if prev_label != NULL_ENTITY and start_index != end_index:
                    # If a non-null entity was previously encountered, append it to the ners list
                    ners.append([start_index, end_index, prev_label[2:]])
                start_index = m[
                    0
                ].item()  # Update the start index of the current NER span
            prev_label = label  # Update the previous label
            end_index = m[1].item()  # Update the end index of the current NER span

    # If there is a remaining non-null entity, append it to the ners list
    if prev_label != NULL_ENTITY and start_index != end_index:
        ners.append([start_index, end_index, prev_label[2:]])

    res = {
        "id": id.item(),
        "ners": ners,
    }  # Create a dictionary with the sample ID and NER spans
    labeled_result.append(res)  # Append the result to the labeled_result list

In [51]:
with open('test.jsonl', 'w') as f:
    for item in labeled_result:
        f.write(json.dumps(item) + '\n')  # Write each item in the labeled_result list to the test.jsonl file

In [52]:
import zipfile

with zipfile.ZipFile("test.zip", "w") as zip_file:
    zip_file.write("test.jsonl")  # Add the test.jsonl file to the ZIP file