In [1]:
import re
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoModel, AutoConfig
from torch.utils.data import DataLoader, TensorDataset, Subset, random_split
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import StratifiedKFold
from transformers import BertForSequenceClassification
from torch.optim import AdamW
import matplotlib.pyplot as plt
import numpy as np

np.random.seed(42)

## **1. Setting Up the Tokenizer**
Before processing the text data, we modify the DeBERTa tokenizer to recognize entity markers.

## **2. Parsing the Data**
The parse_data function reads a structured text file where:
Since each example consists of 4 lines, we process 4 lines at a time.
- We preprocess the sentences, record the entity tag positions, and remove the tags.
- Then we extract the relationship information and add the data to the list.

For example, the train file.txt looks like below

_10	"The solute was placed inside a beaker and 5 mL of the <e1>solvent</e1> was pipetted into a 25 mL glass <e2>flask</e2> for each trial."_

_Entity-Destination(e1,e2)_

_Comment:_

_(empty)_

## **3. Preprocessing Data for Model Input**

The preprocess_data function:

- Inserts the special entity markers (@E1@ ... @/E1@, @E2@ ... @/E2@) into the sentence.
- Tokenizes the modified sentence using DeBERTa's tokenizer.
- Identifies entity positions in the tokenized sequence.
- Converts relation labels into numerical IDs

## **4. Preparing the Dataset**

- Load the training and validation data from TXT files.
- Extract unique relation labels and create a label map.
- Preprocess both training and validation data to obtain tokenized inputs.

In [4]:
tokenizer = AutoTokenizer.from_pretrained('microsoft/deberta-base') # Load the DeBERTa tokenizer from huggingface
special_tokens = ["@E1@", "@/E1@", "@E2@", "@/E2@"] # The tokens which will be used as entity identifiers
tokenizer.add_tokens(special_tokens) # Add these tokens to the tokenizer
def parse_data(file_path):
    '''
    Take the TXT file as input and extract the sentences, relations and entities
    and return it in form of a dictionary
    '''
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    for i in range(0, len(lines), 4):  # Each example spans 4 lines
        # Extract sentence
        sentence = lines[i].split('\t')[1].strip()
        sentence = sentence.replace('< e1 >', '<e1>')
        sentence = sentence.replace('< e2 >', '<e2>')
        sentence = sentence.replace('< /e1 >', '</e1>')
        sentence = sentence.replace('< /e2 >', '</e2>')

        sentence = re.sub(' {2,}', ' ', sentence)
        # Extract entity markers
        e1_start = sentence.find('<e1>')
        e1_end = sentence.find('</e1>')
        e2_start = sentence.find('<e2>')
        e2_end = sentence.find('</e2>')

        # Remove entity markers from the sentence
        sentence = sentence.replace('<e1>', '').replace('</e1>', '').replace('<e2>', '').replace('</e2>', '')

        # Extract relation label
        relation = lines[i + 1].strip()

        # Append to data
        data.append({
            'sentence': sentence,
            'e1_start': e1_start,
            'e1_end': e1_end - 4,  # Adjust for removed markers
            'e2_start': e2_start - 4 if e2_start > e1_end else e2_start - 8,
            'e2_end': e2_end - 4 if e2_end > e1_end else e2_end - 8,
            'relation': relation
        })

    return data
def preprocess_data(data, tokenizer, label_map):
    '''
    Takes the data dictionary, tokenizer and label_map and returns the tokenized inputs, attention_mask, position of entity 1,
    position of entiy 2 and the labels
    '''
    processed_data = []

    for item in data:
        sentence = item['sentence']
        e1_start, e1_end = item['e1_start'], item['e1_end']
        e2_start, e2_end = item['e2_start'], item['e2_end']

        # Insert entity markers (using unique symbols for better token recognition)
        sentence = (
            sentence[:e1_start] + ' @E1@ ' + sentence[e1_start:e1_end] + ' @/E1@ ' +
            sentence[e1_end:e2_start] + ' @E2@ ' + sentence[e2_start:e2_end] + ' @/E2@ ' +
            sentence[e2_end:]
        )

        # Tokenize the sentence
        inputs = tokenizer(sentence, padding='max_length', truncation=True, max_length=128, return_tensors='pt')

        # Find entity positions in the tokenized sequence
        input_ids = inputs['input_ids'].squeeze(0).tolist()
        entity_pos_1 = next((i for i, id in enumerate(input_ids) if id == tokenizer.convert_tokens_to_ids("@E1@")), -1)
        entity_pos_2 = next((i for i, id in enumerate(input_ids) if id == tokenizer.convert_tokens_to_ids("@E2@")), -1)

        if entity_pos_1 == -1 or entity_pos_2 == -1:
            print(f"Warning: Entity positions not found for sentence: {sentence}")

        # Convert label to ID
        label = label_map[item['relation']]

        processed_data.append({
            'input_ids': torch.tensor(input_ids),
            'attention_mask': inputs['attention_mask'].squeeze(0),
            'entity_pos_1': torch.tensor(entity_pos_1),
            'entity_pos_2': torch.tensor(entity_pos_2),
            'label': label
        })

    return processed_data
train_data = parse_data('TRAIN_FILE.TXT') # Get the sentences and relations from TRAIN_FILE.TXT
valid_data = parse_data('TEST_FILE_FULL.TXT') # Get the sentences and relations from TEST_FILE_FULL.TXT
all_labels = set([item['relation'] for item in train_data] + [item['relation'] for item in valid_data]) # Get the labels
label_map = {label: idx for idx, label in enumerate(sorted(all_labels))} # Create the label map
train_processed = preprocess_data(train_data, tokenizer, label_map)
valid_processed = preprocess_data(valid_data, tokenizer, label_map)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/474 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

In [5]:
def extract_entity_representations(sequence_output, entity_positions, max_len):
    """
    Extracts entity representations by averaging over all tokens in the entity span.

    Args:
        sequence_output: Tensor of shape (batch_size, seq_length, hidden_size)
        entity_positions: Tensor of shape (batch_size, max_entity_len)
        max_len: Maximum number of tokens to consider per entity

    Returns:
        Tensor of shape (batch_size, hidden_size)
    """
    batch_size, _, hidden_size = sequence_output.shape
    entity_embs = torch.zeros(batch_size, hidden_size, device=sequence_output.device)

    for i in range(batch_size):
        positions = entity_positions[i]  # Extract tensor of indices
        if positions.numel() > 0:  # Check if it's non-empty
            positions = positions.tolist()  # Convert tensor to list
            entity_embs[i] = sequence_output[i, positions, :].mean(dim=0)  # Compute mean

    return entity_embs


In [10]:
class FCLayer(nn.Module):
  '''
    Fully Connected Layer with optional dropout, layer normalization, and activation.

    This class implements a standard feedforward neural network layer with:
    - Dropout for regularization.
    - Layer normalization for stable training.
    - An optional activation function (default: Tanh).

    Args:
        input_dim (int): Size of the input feature vector.
        output_dim (int): Size of the output feature vector.
        dropout_rate (float, optional): Dropout probability (default: 0.1).
        use_activation (bool, optional): Whether to apply the activation function (default: True).
        use_norm (bool, optional): Whether to apply layer normalization (default: True).

    Forward Pass:
        x (Tensor): Input tensor of shape `(batch_size, input_dim)`.

    Returns:
        Tensor: Transformed output of shape `(batch_size, output_dim)`.
  '''
  def __init__(self, input_dim, output_dim, dropout_rate=0.1, use_activation=True, use_norm=True):
    super(FCLayer, self).__init__()
    self.use_activation = use_activation
    self.Dropout = nn.Dropout(dropout_rate)
    self.linear = nn.Linear(input_dim, output_dim)
    self.layer_norm = nn.LayerNorm(output_dim)
    self.activation = nn.Tanh()
    self.use_norm = use_norm
  def forward(self, x):
    x = self.Dropout(x)
    if self.use_norm:
      x = self.layer_norm(x)
    if self.use_activation:
      x = self.activation(x)
    return self.linear(x)

class DeBertaEntityBasedClassifier(nn.Module):
  '''
    DeBERTa-based classifier for relation extraction using entity-aware representations.

    This model fine-tunes DeBERTa to classify relationships between two entities in a sentence.
    It uses:
    - A DeBERTa backbone to extract contextualized token embeddings.
    - A fully connected (FC) layer for CLS token processing.
    - FC layers for entity-specific feature extraction.
    - A final FC layer for classification.

    Args:
        model_name (str): Name of the pre-trained DeBERTa model from Hugging Face.
        num_labels (int): Number of relation classes.
        dropout_rate (float, optional): Dropout probability for regularization (default: 0.1).

    Forward Pass:
        input_ids (Tensor): Tokenized input of shape `(batch_size, sequence_length)`.
        attention_mask (Tensor): Attention mask of shape `(batch_size, sequence_length)`.
        entity_pos_1 (Tensor): Positions of the first entity in each sentence `(batch_size,)`.
        entity_pos_2 (Tensor): Positions of the second entity in each sentence `(batch_size,)`.
        labels (Tensor, optional): True labels for computing loss `(batch_size,)`.

    Returns:
        dict:
            - 'logits' (Tensor): Classification logits `(batch_size, num_labels)`.
            - 'loss' (Tensor, optional): Cross-entropy loss if labels are provided.
  '''
  def __init__(self, model_name, num_labels, dropout_rate=0.1):
    super(DeBertaEntityBasedClassifier, self).__init__()
    config = AutoConfig.from_pretrained(model_name)
    self.backbone = AutoModel.from_pretrained(model_name, config=config)

    hidden_size = config.hidden_size
    self.num_labels = num_labels

    self.cls_fc = FCLayer(hidden_size, hidden_size, dropout_rate)
    self.entity_fc = FCLayer(hidden_size, hidden_size, dropout_rate)
    self.final_fc = FCLayer(hidden_size * 3, num_labels, dropout_rate, use_activation=False, use_norm=False)

  def forward(self, input_ids, attention_mask, entity_pos_1, entity_pos_2, labels=None):
    outputs = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
    sequence_output = outputs.last_hidden_state

    cls_output = sequence_output[:, 0, :]
    cls_fc = self.cls_fc(cls_output)

    # entity_1_emb = sequence_output[torch.arange(sequence_output.shape[0]), entity_pos_1, :]
    # entity_2_emb = sequence_output[torch.arange(sequence_output.shape[0]), entity_pos_2, :]  # (batch_size, hidden_size)
    entity_1_emb = extract_entity_representations(sequence_output, entity_pos_1, max_len=input_ids.shape[1])
    entity_2_emb = extract_entity_representations(sequence_output, entity_pos_2, max_len=input_ids.shape[1])

    entity_1_emb = self.entity_fc(entity_1_emb)
    entity_2_emb = self.entity_fc(entity_2_emb)

    combined = torch.cat([cls_output, entity_1_emb, entity_2_emb], dim=-1)

    logits = self.final_fc(combined)
    loss = None
    if labels != None:
      loss_fn = nn.CrossEntropyLoss()
      loss = loss_fn(logits, labels)
    return {'logits': logits, 'loss': loss}

## Model Training
- Set up the device variable to cuda if GPU is available, else leave it to cpu
- Initialise the model and move it to the device
- Make 80-20 split of the training data for training and testing
- Save the model parameters with the max test accuracy

In [11]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Specify the device
model_name = 'microsoft/deberta-base' # Specify the pretrained model name
model = DeBertaEntityBasedClassifier(model_name=model_name, num_labels=len(label_map)).to(device) # Initialise the model and move it to the device
model.backbone.resize_token_embeddings(len(tokenizer)) # Resize the token embedding size to match the tokenizer size

input_ids = torch.stack([item['input_ids'] for item in train_processed]) # Extract the input_ids from train_processed
attention_masks = torch.stack([item['attention_mask'] for item in train_processed]) # Extract the attention_mask from train_processed
labels_train = torch.tensor([item['label'] for item in train_processed]) # Extract the label from train_processed
entity_1_positions = torch.stack([item['entity_pos_1'] for item in train_processed]) # Extract the entity_pos_1 from train_processed
entity_2_positions = torch.stack([item['entity_pos_2'] for item in train_processed]) # Extract the entity_pos_2 from train_processed

dataset = TensorDataset(input_ids, attention_masks, labels_train, entity_1_positions, entity_2_positions) # Make the Tensor Dataset

# Process Test Data

train_size = int(0.8 * len(dataset))  # 80% for training
valid_size = len(dataset) - train_size  # Remaining 20% for validation
train_data, valid_data = random_split(dataset, [train_size, valid_size]) # Make a random split of 80-20

train_loader = DataLoader(train_data, batch_size=32, shuffle=True) # Initialise the trainloader
test_loader = DataLoader(valid_data, batch_size=32, shuffle=False) # Initialise the testloader


max_acc = float('-inf')
# Optimizer
optimizer = AdamW(model.parameters(), lr=2e-5)
accuracy_scores_rdeberta = []
f1_scores_rdeberta = []
training_losses_rdeberta = []

# Training Loop
for epoch in range(7):  # Number of epochs
    epoch_loss = 0
    model.train() # Set the model to training mode

    for batch in train_loader:
        optimizer.zero_grad()
        input_ids, attention_mask, labels, entity_1_positions, entity_2_positions = batch
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)
        entity_1_positions = entity_1_positions.to(device)
        entity_2_positions = entity_2_positions.to(device)

        # Forward pass
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels, entity_pos_1=entity_1_positions, entity_pos_2=entity_2_positions)
        loss = outputs['loss']  # Corrected from outputs['loss']

        # Backward pass
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    # Print final epoch loss
    print(f"Final Loss for epoch {epoch + 1}: {epoch_loss / len(train_loader)}")

    # Evaluation
    model.eval()
    predictions, true_labels = [], []
    training_losses_rdeberta.append(epoch_loss / len(train_loader))
    with torch.no_grad():  # Disable gradient computation
        for batch in test_loader:
            input_ids, attention_mask, labels, entity_1_positions, entity_2_positions = batch
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)
            entity_1_positions = entity_1_positions.to(device)
            entity_2_positions = entity_2_positions.to(device)

            # Forward pass
            outputs = model(input_ids, attention_mask=attention_mask, entity_pos_1=entity_1_positions, entity_pos_2=entity_2_positions)
            logits = outputs['logits']

            # Get predicted labels
            preds = torch.argmax(logits, dim=1)

            # Store predictions and true labels
            predictions.extend(preds.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)
    accuracy_scores_rdeberta.append(accuracy)
    print(f'Test Accuracy for epoch {epoch + 1}: {accuracy}')
    predictions = np.array(predictions)
    true_labels = np.array(true_labels)
    mask = true_labels != label_map['Other']
    filtered_predictions = predictions[mask]
    filtered_true_labels = true_labels[mask]
    if accuracy > max_acc:
        max_acc = accuracy
        torch.save(model.state_dict(), 'best_model_rdeberta.pth')
    # Calculate accuracy (optional)
    accuracy = accuracy_score(filtered_true_labels, filtered_predictions)

    # Calculate F1 score for the remaining classes
    f1 = f1_score(filtered_true_labels, filtered_predictions, average='macro')  # or 'micro' or 'weighted'
    print(f'Test F1 Score for epoch {epoch + 1}: {f1}')
    f1_scores_rdeberta.append(f1)

pytorch_model.bin:   0%|          | 0.00/559M [00:00<?, ?B/s]

The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


model.safetensors:   0%|          | 0.00/559M [00:00<?, ?B/s]

Final Loss for epoch 1: 1.814133634865284
Test Accuracy for epoch 1: 0.741875
Test F1 Score for epoch 1: 0.697568086198355
Final Loss for epoch 2: 0.7075657595694065
Test Accuracy for epoch 2: 0.8075
Test F1 Score for epoch 2: 0.8166761699039841
Final Loss for epoch 3: 0.4670141227543354
Test Accuracy for epoch 3: 0.81
Test F1 Score for epoch 3: 0.8280728648336839
Final Loss for epoch 4: 0.3063644069246948
Test Accuracy for epoch 4: 0.825625
Test F1 Score for epoch 4: 0.8344590377449146
Final Loss for epoch 5: 0.20264908503741025
Test Accuracy for epoch 5: 0.83375
Test F1 Score for epoch 5: 0.8381038453210123
Final Loss for epoch 6: 0.14012637767009437
Test Accuracy for epoch 6: 0.83875
Test F1 Score for epoch 6: 0.8469009631751248
Final Loss for epoch 7: 0.09052504009567201
Test Accuracy for epoch 7: 0.825625
Test F1 Score for epoch 7: 0.8348796370930383
