In [1]:
# Mount Google Drive into the Colab filesystem at /content/drive so the notebook can read and write files there
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
cd drive/MyDrive/Colab Notebooks

/content/drive/MyDrive/Colab Notebooks


Import libraries

In [3]:
import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from transformers import DistilBertTokenizer, DistilBertModel
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from torch import nn, optim
from torch.cuda.amp import autocast, GradScaler
from torch.nn.functional import sigmoid

Train data

In [4]:
# Read the training pairs and replace missing values in both text columns with an empty string
train_data = pd.read_csv('./NLU/train.csv').assign(
    text_1=lambda frame: frame['text_1'].fillna(''),
    text_2=lambda frame: frame['text_2'].fillna(''),
)

In [5]:
# A custom dataset class for authorship verification tasks
class AuthorshipDataset(Dataset):
    """Labelled dataset of text pairs.

    Every item is a dictionary holding the tokenized form of both texts
    (input ids and attention mask for each) together with a float label.
    """

    def __init__(self, texts_a, texts_b, labels, tokenizer, max_len=256):
        """Keep references to the parallel sequences, the tokenizer and the target length."""
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.texts_a = texts_a
        self.texts_b = texts_b
        self.labels = labels

    def __len__(self):
        """Number of pairs, taken from the length of the label sequence."""
        return len(self.labels)

    def _encode(self, text):
        """Tokenize one text with truncation and padding to ``max_len``, returning PyTorch tensors."""
        return self.tokenizer(text, max_length=self.max_len, padding='max_length', truncation=True, return_tensors="pt")

    def __getitem__(self, idx):
        """Tokenize the pair stored at ``idx`` and bundle it with its label."""
        first = self._encode(self.texts_a[idx])
        second = self._encode(self.texts_b[idx])

        # The tokenizer output carries a leading dimension of size 1; drop it so each
        # entry is a per-example tensor
        item = {
            'input_ids_a': first['input_ids'].squeeze(0),
            'attention_mask_a': first['attention_mask'].squeeze(0),
            'input_ids_b': second['input_ids'].squeeze(0),
            'attention_mask_b': second['attention_mask'].squeeze(0),
        }
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
        return item

BERT model

In [6]:
# Fetch the pretrained uncased DistilBERT tokenizer
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

# Wrap the training text pairs and their labels in the paired-text dataset
train_dataset = AuthorshipDataset(
    texts_a=train_data['text_1'].tolist(),
    texts_b=train_data['text_2'].tolist(),
    labels=train_data['label'].tolist(),
    tokenizer=tokenizer,
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/483 [00:00<?, ?B/s]

In [7]:
# A Siamese network module that embeds both texts of a pair with one shared BERT encoder
class SiameseBERT(nn.Module):
    """Siamese classifier on top of a shared BERT-style encoder.

    Both inputs go through the same encoder, are mean-pooled with their
    attention masks and projected by a shared dense layer; the element-wise
    absolute difference of the two projections is mapped to the output logits.
    """

    def __init__(self, bert_model, hidden_size=768, output_size=1):
        """Keep the given encoder and create the dense projection and output layers."""
        super().__init__()
        self.bert = bert_model
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.output = nn.Linear(hidden_size, output_size)

    def mean_pooling(self, model_output, attention_mask):
        """Return the attention-mask-weighted mean of the encoder's last hidden states."""
        hidden_states = model_output.last_hidden_state

        # Broadcast the mask over the feature dimension so it can weight every embedding value
        mask = attention_mask.unsqueeze(-1).expand(hidden_states.size()).float()
        masked_total = (hidden_states * mask).sum(dim=1)

        # Clamp the mask total so an all-zero mask cannot cause a division by zero
        mask_total = mask.sum(dim=1).clamp(min=1e-9)
        return masked_total / mask_total

    def _project(self, encoder_output, attention_mask):
        """Mean-pool one encoder output and pass it through the shared dense layer with ReLU."""
        pooled = self.mean_pooling(encoder_output, attention_mask)
        return torch.relu(self.dense(pooled))

    def forward(self, input_ids_a, attention_mask_a, input_ids_b, attention_mask_b):
        """Compute the logits for a batch of text pairs."""
        # Encode both sequences with the same encoder
        encoded_a = self.bert(input_ids_a, attention_mask=attention_mask_a)
        encoded_b = self.bert(input_ids_b, attention_mask=attention_mask_b)

        projected_a = self._project(encoded_a, attention_mask_a)
        projected_b = self._project(encoded_b, attention_mask_b)

        # The element-wise absolute difference is the pair representation fed to the output layer
        return self.output((projected_a - projected_b).abs())

In [8]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Wrap the pretrained DistilBERT encoder in the Siamese model and move it to the chosen device
model = SiameseBERT(
    bert_model=DistilBertModel.from_pretrained('distilbert-base-uncased'),
)
model.to(device)

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

SiameseBERT(
  (bert): DistilBertModel(
    (embeddings): Embeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (transformer): Transformer(
      (layer): ModuleList(
        (0-5): 6 x TransformerBlock(
          (attention): MultiHeadSelfAttention(
            (dropout): Dropout(p=0.1, inplace=False)
            (q_lin): Linear(in_features=768, out_features=768, bias=True)
            (k_lin): Linear(in_features=768, out_features=768, bias=True)
            (v_lin): Linear(in_features=768, out_features=768, bias=True)
            (out_lin): Linear(in_features=768, out_features=768, bias=True)
          )
          (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
          (ffn): FFN(
            (dropout): Dropout(p=0.1, inplace=False)
            (lin1): Linear(in_f

In [9]:
# Serve the training dataset in shuffled batches of 32
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)

# Adam over all model parameters, plus a GradScaler for mixed-precision training
optimizer = optim.Adam(params=model.parameters(), lr=5e-5)
scaler = GradScaler()

In [10]:
def train_epoch(model, data_loader, optimizer, device, scaler):
    """Train ``model`` for one pass over ``data_loader`` using mixed precision.

    Args:
        model: Module called as ``model(input_ids_a, attention_mask_a,
            input_ids_b, attention_mask_b)`` that returns logits.
        data_loader: Iterable of batches; each batch is a dict with the keys
            'input_ids_a', 'attention_mask_a', 'input_ids_b',
            'attention_mask_b' and 'labels'.
        optimizer: Optimizer whose gradients are reset and stepped once per batch.
        device: Device (``torch.device`` or string) the batch tensors are moved to.
        scaler: ``GradScaler`` used to scale the loss and drive the optimizer step.

    Returns:
        Tuple ``(avg_loss, accuracy)``: the mean of the per-batch losses and the
        fraction of examples whose sigmoid output, thresholded at 0.5, equals
        the label. Returns ``(0.0, 0.0)`` when the loader yields no examples
        instead of raising ``ZeroDivisionError``.
    """
    model.train()

    # The loss module holds no per-batch state, so build it once rather than on every iteration
    criterion = nn.BCEWithLogitsLoss()

    # Only request CUDA autocast when the batches actually live on a CUDA device
    use_amp = torch.device(device).type == "cuda"

    total_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    num_batches = 0

    for batch in data_loader:
        input_ids_a = batch['input_ids_a'].to(device)
        attention_mask_a = batch['attention_mask_a'].to(device)
        input_ids_b = batch['input_ids_b'].to(device)
        attention_mask_b = batch['attention_mask_b'].to(device)
        # Add a trailing dimension so the labels line up with the (batch, 1) logits
        labels = batch['labels'].to(device).unsqueeze(1).float()

        optimizer.zero_grad()

        # Apply automatic mixed precision to the forward pass and the loss
        with autocast(enabled=use_amp):
            logits = model(input_ids_a, attention_mask_a, input_ids_b, attention_mask_b)
            loss = criterion(logits, labels)

        # Scale the loss before backward, then let the scaler drive the optimizer step
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()
        num_batches += 1

        # Metrics need no gradient tracking
        with torch.no_grad():
            predictions = torch.sigmoid(logits) > 0.5
            correct_predictions += (predictions == labels).sum().item()
        total_predictions += labels.size(0)

    # Nothing was processed: report zeros rather than dividing by zero
    if num_batches == 0 or total_predictions == 0:
        return 0.0, 0.0

    avg_loss = total_loss / num_batches
    accuracy = correct_predictions / total_predictions

    return avg_loss, accuracy

In [11]:
# Run the training loop
# Each epoch is one full pass over train_loader; the printed loss and accuracy are
# the values train_epoch measured on the training batches themselves
epochs = 5
for epoch in range(epochs):
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, device, scaler)
    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}")

Epoch 1/5, Train Loss: 0.6177, Train Accuracy: 0.6340
Epoch 2/5, Train Loss: 0.4748, Train Accuracy: 0.7688
Epoch 3/5, Train Loss: 0.2877, Train Accuracy: 0.8811
Epoch 4/5, Train Loss: 0.1195, Train Accuracy: 0.9582
Epoch 5/5, Train Loss: 0.0583, Train Accuracy: 0.9822


In [12]:
# Save model
# Only the state_dict (the parameter tensors) is written, not the module object itself
torch.save(model.state_dict(), './NLU/bert.pth')

Development data

In [13]:
# Read the development pairs and replace missing values in both text columns with an empty string
dev_data = pd.read_csv('./NLU/dev.csv').assign(
    text_1=lambda frame: frame['text_1'].fillna(''),
    text_2=lambda frame: frame['text_2'].fillna(''),
)

# Keep the development labels as a NumPy array
dev_labels = dev_data['label'].values

In [14]:
# A custom dataset class for development data
class DevDataset(Dataset):
    """Unlabelled dataset of text pairs for inference.

    Args:
        texts_a: Sequence holding the first text of each pair.
        texts_b: Sequence holding the second text of each pair, parallel to ``texts_a``.
        tokenizer: Callable tokenizer; it is called with the text plus
            ``add_special_tokens``, ``max_length``, ``padding``, ``truncation``,
            ``return_attention_mask`` and ``return_tensors`` keyword arguments and
            must return a mapping with 'input_ids' and 'attention_mask'.
        max_len: Length every sequence is padded or truncated to.
            NOTE(review): AuthorshipDataset defaults to 256 while this class
            defaults to 512 — confirm the longer evaluation length is intended.
    """

    # Initialize the dataset with text pairs and a tokenizer
    def __init__(self, texts_a, texts_b, tokenizer, max_len=512):
        self.texts_a = texts_a
        self.texts_b = texts_b
        self.tokenizer = tokenizer
        self.max_len = max_len

    # Return the total number of items in the dataset
    def __len__(self):
        return len(self.texts_a)

    # Tokenize a single text with the tokenizer given to this instance
    def _encode(self, text):
        # Use self.tokenizer (not a notebook-level global) so the dataset honours the
        # tokenizer it was constructed with; calling the tokenizer directly matches
        # how AuthorshipDataset encodes its texts
        return self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

    # Return the encoded pair of texts at the given index
    def __getitem__(self, idx):
        encoding_a = self._encode(self.texts_a[idx])
        encoding_b = self._encode(self.texts_b[idx])

        # Drop the leading dimension of size 1 so each entry is a per-example tensor
        return {
            'input_ids_a': encoding_a['input_ids'].squeeze(0),
            'attention_mask_a': encoding_a['attention_mask'].squeeze(0),
            'input_ids_b': encoding_b['input_ids'].squeeze(0),
            'attention_mask_b': encoding_b['attention_mask'].squeeze(0)
        }

In [15]:
# Build the development dataset with the existing tokenizer and batch it without shuffling
dev_dataset = DevDataset(
    texts_a=dev_data['text_1'].tolist(),
    texts_b=dev_data['text_2'].tolist(),
    tokenizer=tokenizer,
)
dev_loader = DataLoader(dataset=dev_dataset, batch_size=32, shuffle=False)

In [16]:
# Generate predictions for the given data loader
def generate_predictions(model, data_loader):
    """Run ``model`` over ``data_loader`` and return the sigmoid of its logits.

    Args:
        model: Module called as ``model(input_ids_a, attention_mask_a,
            input_ids_b, attention_mask_b)`` that returns logits.
        data_loader: Iterable of batches; each batch is a dict with the keys
            'input_ids_a', 'attention_mask_a', 'input_ids_b' and 'attention_mask_b'.

    Returns:
        A flat list with one probability per example, in loader order
        (an empty list when the loader yields nothing).
    """
    model.eval()

    # Run on the device that holds the model's parameters instead of relying on a
    # notebook-level `device` variable; a parameter-free model falls back to the CPU
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    predictions = []

    # Deactivate gradients for evaluation
    with torch.no_grad():
        for batch in data_loader:
            input_ids_a = batch['input_ids_a'].to(target_device)
            attention_mask_a = batch['attention_mask_a'].to(target_device)
            input_ids_b = batch['input_ids_b'].to(target_device)
            attention_mask_b = batch['attention_mask_b'].to(target_device)

            outputs = model(input_ids_a, attention_mask_a, input_ids_b, attention_mask_b)

            # Squeeze only the trailing dimension: a bare squeeze() would also drop the
            # batch dimension of a single-example batch, leaving a 0-d array that
            # list.extend cannot iterate over
            preds = torch.sigmoid(outputs).squeeze(-1).cpu().numpy()
            predictions.extend(preds)
    return predictions

Testing model on development data

In [17]:
# Score every development pair and collect the probabilities in a one-column frame
dev_predictions = generate_predictions(model, dev_loader)
result_df = pd.DataFrame(dev_predictions, columns=['prediction'])

# Binarize the probabilities: values strictly above the 0.5 threshold become 1, the rest 0
best_threshold = 0.5
dev_predicted_labels = result_df['prediction'].gt(best_threshold).astype(int)

In [18]:
# Write the binary development predictions (with their index) to CSV under a 'prediction' column
result_df = dev_predicted_labels.to_frame(name='prediction')
result_df.to_csv("./NLU/bert_dev_result.csv")

Evaluation

In [19]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [20]:
# Print each metric for the predicted labels against the development labels
for metric_label, metric_fn in (
    ("Accuracy:", accuracy_score),
    ("Precision:", precision_score),
    ("Recall:", recall_score),
    ("F1 Score:", f1_score),
):
    print(metric_label, metric_fn(dev_labels, dev_predicted_labels))

Accuracy: 0.7335
Precision: 0.7320841551610783
Recall: 0.7396213882431086
F1 Score: 0.7358334710061126
