In [None]:
%%capture
# Install necessary libraries
!pip install transformers torch scikit-learn datasets matplotlib

# Unzip data
!unzip data.zip

In [None]:
# Import required libraries
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
from datasets import load_dataset
from torch.utils.data import DataLoader
import torch.nn.functional as F

In [None]:
from transformers import AutoTokenizer, AutoModel

In [None]:
# Load the tokenizer belonging to the 'bert-base-uncased' checkpoint and show
# which concrete tokenizer class AutoTokenizer resolved to.
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
type(tokenizer)

In [None]:
# Two example sentences that form a small batch.
sentences = [
    "The quick brown fox jumps over the lazy dog.",
    "A stitch in time saves nine.",
]

# Tokenize the sentences. Meaning of the keyword arguments:
#
#   return_tensors -> format of the returned arrays:
#       'pt' -> pytorch tensors (used here)
#       'np' -> numpy
#       'tf' -> tensorflow
#
#   padding -> how the sentences are padded:
#       'max_length'         -> pad to the context length of the model (used here)
#       'longest' / True     -> pad to the longest length in the batch
#       'do_not_pad' / False -> no padding
#
#   truncation -> how overlong sentences are handled:
#       'longest_first' / True    -> truncate to the length given by the argument
#                                    max_length, or to the maximum acceptable input
#                                    length for the model if max_length is not provided
#       'do_not_truncate' / False -> no truncation (i.e., can output a batch with
#                                    sequence lengths greater than the model maximum
#                                    admissible input size)
inputs = tokenizer(
    sentences,
    return_tensors='pt',
    padding='max_length',
    truncation=True,
)

inputs

In [None]:
# The sentences in encoded form (token ids):
inputs['input_ids']

In [None]:
# the shape is number of input texts x sequence length:
inputs['input_ids'].shape

In [None]:
# the ids can be converted back to text (one decoded string per input row):
[tokenizer.decode(ids) for ids in inputs['input_ids']]

In [None]:
# the special tokens defined by this tokenizer:
tokenizer.special_tokens_map

In [None]:
# Unknown token -> Encodes tokens that have not occurred in the training data
tokenizer.unk_token, tokenizer.unk_token_id

In [None]:
# Classification token -> Starts a sequence. Because of this, other transformers
# usually use a "beginning of sequence" ([bos]) token instead. For BERT-like models
# it corresponds to the position of the classification output.
tokenizer.cls_token, tokenizer.cls_token_id

In [None]:
# Separator token -> Separates two sentences for the next sentence prediction task.
# After pretraining it is usually used to end the input sequence. Because of this,
# other transformers usually use an "end of sequence" ([eos]) token instead.
tokenizer.sep_token, tokenizer.sep_token_id

In [None]:
# Padding token -> Fills sequences up to the full input length of the transformer.
tokenizer.pad_token, tokenizer.pad_token_id

In [None]:
# Mask token -> For the masked language modelling pretraining task. Rarely used
# after pretraining.
tokenizer.mask_token, tokenizer.mask_token_id

In [None]:
# the attention mask the tokenizer returned together with the input ids:
inputs['attention_mask']

In [None]:
# shape of the attention mask:
inputs['attention_mask'].shape

In [None]:
# decode only the positions whose attention mask is 1:
[tokenizer.decode(ids[mask == 1]) for ids, mask in zip(inputs['input_ids'], inputs['attention_mask'])]

In [None]:
# decode only the positions whose attention mask is 0:
[tokenizer.decode(ids[mask == 0]) for ids, mask in zip(inputs['input_ids'], inputs['attention_mask'])]

In [None]:
# the token type ids returned by the tokenizer (they are not passed to the model
# in this notebook's forward pass):
inputs['token_type_ids']

In [None]:
# shape of the token type ids:
inputs['token_type_ids'].shape

In [None]:
# use the GPU when CUDA is available, otherwise fall back to the CPU:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# load the model, move it to the selected device and switch it to evaluation mode:
model = AutoModel.from_pretrained('bert-base-uncased')
model.to(device)
model.eval()

# display the module structure:
model

In [None]:
# configuration of the loaded model:
model.config

In [None]:
# Forward pass without gradient tracking. Only input_ids and attention_mask are
# passed to the model; both are moved to `device` first.
with torch.no_grad():
  outputs = model(
      input_ids=inputs['input_ids'].to(device),
      attention_mask=inputs['attention_mask'].to(device),
      output_hidden_states=True,                # return the hidden states after each transformer layer (default: False)
      output_attentions=True                    # return the self-attention weights (default: False)
  )
# names of the fields the model returned:
outputs.keys()

In [None]:
# the 'last_hidden_state' entry of the model output:
outputs['last_hidden_state']

In [None]:
# shape of the last hidden state:
outputs['last_hidden_state'].shape

In [None]:
# the 'pooler_output' entry of the model output:
outputs['pooler_output']

In [None]:
# shape of the pooler output:
outputs['pooler_output'].shape

In [None]:
# container type and number of entries of the returned hidden states
# (requested above with output_hidden_states=True):
type(outputs['hidden_states']), len(outputs['hidden_states'])

In [None]:
# shape of every hidden-state tensor:
[t.shape for t in outputs['hidden_states']]

In [None]:
# container type and number of entries of the returned self-attention weights
# (requested above with output_attentions=True):
type(outputs['attentions']), len(outputs['attentions'])

In [None]:
# shape of every attention tensor:
[t.shape for t in outputs['attentions']]

The attention weights of every layer, averaged over the attention heads (one figure per sentence, one panel per layer):

In [None]:
import matplotlib.pyplot as plt

# One figure per input sentence, one panel per transformer layer. Both counts are
# read from the data instead of being hard-coded.
num_sentences = inputs['input_ids'].shape[0]
num_layers = len(outputs['attentions'])

for i in range(num_sentences):
  # The non-padding positions and their token labels are identical for every
  # layer, so they are computed once per sentence.
  keep = inputs['attention_mask'][i] == 1
  labels = tokenizer.convert_ids_to_tokens(inputs['input_ids'][i][keep])
  x = np.arange(len(labels))

  # squeeze=False keeps `axs` two-dimensional even when there is a single layer.
  fig, axs = plt.subplots(ncols=num_layers, figsize=(20, 5), squeeze=False)
  axs = axs[0]

  for layer, ax in enumerate(axs):
    # get attention weights of this transformer layer for sentence i:
    aw = outputs['attentions'][layer][i].cpu()

    # average over heads:
    aw = aw.mean(dim=0)

    # remove padding tokens (rows and columns):
    aw = aw[keep, :][:, keep]

    ax.imshow(aw.detach().numpy())
    ax.set_xticks(ticks=x, labels=labels, rotation=90)
    ax.set_yticks(ticks=x, labels=['']*len(x))
    ax.set_title(f'Layer {layer+1}')

  # only the left-most panel gets y tick labels:
  axs[0].set_yticks(ticks=x, labels=labels)
  plt.tight_layout()
  plt.show()

In [None]:
import matplotlib.pyplot as plt

In [None]:
from typing import Iterable
from transformers import BertModel, BertTokenizer
from numpy.typing import NDArray
from sklearn.metrics.pairwise import cosine_similarity

Next, compute text similarity as the cosine similarity between the BERT embeddings of different texts.

In [None]:
from typing import Iterable
from numpy.typing import NDArray
from transformers import BertTokenizer, BertModel
from sklearn.metrics.pairwise import cosine_similarity

def text_similarity(
    texts: Iterable[str],
    model: BertModel,
    tokenizer: BertTokenizer,
    device: torch.device,
    batch_size: int = 32,
) -> NDArray[np.float32]:
    """Compute the pairwise cosine similarity between embeddings of ``texts``.

    Each text is tokenized (truncated to at most 128 tokens) and passed through
    ``model``. Its embedding is the model's ``pooler_output`` when the model
    provides one, otherwise the last hidden state at sequence position 0.

    Args:
        texts: The texts to compare. The iterable is consumed once.
        model: Encoder used to embed the texts. It is moved to ``device`` and
            switched to evaluation mode as a side effect.
        tokenizer: Tokenizer matching ``model``.
        device: Device on which the forward passes run.
        batch_size: Number of texts encoded per forward pass.

    Returns:
        A square matrix with one row and one column per text, in input order.
        For empty input an array of shape ``(0, 0)`` is returned.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    # Materialize once so the input can be sliced into batches.
    texts = list(texts)
    if not texts:
        # cosine_similarity rejects empty input; return an empty matrix instead.
        return np.zeros((0, 0), dtype=np.float32)

    model = model.to(device)
    model.eval()

    embedding_chunks = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]

            # padding=True pads every text to the longest one in this batch.
            encoded = tokenizer(batch,
                                padding=True,
                                truncation=True,
                                max_length=128,
                                return_tensors='pt')
            encoded = {k: v.to(device) for k, v in encoded.items()}

            outputs = model(**encoded)

            # The field can exist but be None, so hasattr() is not enough.
            pooled = getattr(outputs, 'pooler_output', None)
            if pooled is None:
                pooled = outputs.last_hidden_state[:, 0, :]

            embedding_chunks.append(pooled.cpu().numpy())

    embeddings_array = np.concatenate(embedding_chunks, axis=0)
    similarity = cosine_similarity(embeddings_array)

    return similarity

# Set up device, tokenizer and model for the similarity task (bert-base-uncased).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_name = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)


# Read the task 1 sentences (first CSV column is the index), compute their
# pairwise similarity matrix and write it to 'similarity.csv'.
sentences  = pd.read_csv('data/task1/sentences.csv', index_col=0)
similarity = text_similarity(sentences['sentences'].values.tolist(), model, tokenizer, device)
pd.DataFrame(similarity).to_csv('similarity.csv')

In [None]:
# Show the similarity matrix of three example sentences as an image:
plt.imshow(text_similarity(['Paul is cooking dinner for his friend.', 'Maria is cooking dinner for her friend.', 'Stockholm is a beautiful city!'], model, tokenizer, device))

The goal is to fine-tune (update) the pretrained weights rather than train the model from scratch.

Fine-Tuning RoBERTa:



In [None]:
# Load labeled list of training files:
train_files = pd.read_csv('data/task2/train/labels.csv', index_col=0)
# Prefix the file names with the training directory so they can be opened directly:
train_files['file'] = ['data/task2/train/' + s for s in train_files['file']]
# Class balance: label 1 is reported as positive, label 0 as negative.
print(f'# of positive samples: {(train_files.label == 1).sum():d}')
print(f'# of negative samples: {(train_files.label == 0).sum():d}')
train_files.head()

In [None]:
# Print the content of one randomly sampled training file. The file is read as
# UTF-8, matching how TextDataset reads the same files, instead of relying on the
# platform's default encoding.
with open(train_files.file.sample(1).iloc[0], 'r', encoding='utf-8') as file:
    print(file.read())

In [None]:
# Load list of test files:
import os
# Build a sorted, one-column DataFrame with the paths of all entries in the test
# directory.
# NOTE(review): unlike the prediction cell further down, this listing is not
# filtered to '.txt' files — confirm the directory contains nothing else.
test_files = ['data/task2/test/' + s for s in os.listdir('data/task2/test/')]
test_files.sort()
test_files = pd.DataFrame({'file': test_files})
test_files.head()

In [None]:
# Report the size of the labeled training set and how many samples each label has:
print(f"Number of samples in train_files: {len(train_files)}")
print("Class distribution in train_files:")
print(train_files['label'].value_counts())

In [None]:
from transformers import RobertaTokenizer, RobertaForSequenceClassification, get_linear_schedule_with_warmup
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from sklearn.model_selection import train_test_split
class TextDataset(Dataset):
    """Dataset that reads one text file per sample and tokenizes it on access.

    Args:
        file_paths: Indexable collection of paths to UTF-8 encoded text files.
        labels: Optional indexable collection of integer labels aligned with
            ``file_paths``. When ``None``, items carry no ``'labels'`` entry.
        tokenizer: Callable that encodes a text. It is called with
            ``max_length``, ``padding='max_length'``, ``truncation=True`` and
            ``return_tensors='pt'`` and must return a mapping containing
            ``'input_ids'`` and ``'attention_mask'`` tensors.
        max_length: Length every sample is padded / truncated to.
    """

    def __init__(self, file_paths, labels=None, tokenizer=None, max_length=256):
        self.file_paths = file_paths
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of files, i.e. the number of samples."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Read, tokenize and return sample ``idx`` as a dict of tensors.

        The dict holds ``'input_ids'`` and ``'attention_mask'`` and, when labels
        were given, ``'labels'`` as a ``torch.long`` scalar tensor.
        """
        # Read text from file (on demand)
        with open(self.file_paths[idx], 'r', encoding='utf-8') as f:
            text = f.read().strip()

        # Tokenize
        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # The encoding of a single text is expected to carry a leading batch
        # dimension of size 1. Remove exactly that dimension: a bare squeeze()
        # would also remove the sequence dimension when it has length 1.
        item = {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0)
        }

        # Add label if available
        if self.labels is not None:
            item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)

        return item

# Load the RoBERTa tokenizer and a sequence-classification model with two output
# labels. Note that this rebinds the notebook-level names `tokenizer` and `model`.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=2)

# Select the device (GPU when available) and move the model onto it
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print(f"Using device: {device}")

# Hold out 10% of the labeled files for validation. The split is stratified by
# label and uses a fixed random_state so it is the same on every run.
train_paths, val_paths, train_labels, val_labels = train_test_split(
    train_files['file'].values,
    train_files['label'].values,
    test_size=0.1,
    random_state=42,
    stratify=train_files['label'].values
)

# Create datasets (max_length is left at the TextDataset default)
train_dataset = TextDataset(train_paths, train_labels, tokenizer)
val_dataset = TextDataset(val_paths, val_labels, tokenizer)

# Create dataloaders; only the training data is shuffled
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8)
print(f"Train data size: {len(train_dataset)}, Validation data size: {len(val_dataset)}")

# Training length: the scheduler is stepped once per batch, so the total number
# of steps is (batches per epoch) x (epochs).
num_epochs = 2
total_steps = num_epochs * len(train_dataloader)

# AdamW over all model parameters, with a learning-rate schedule that warms up
# during the first 10% of the steps.
optimizer = AdamW(model.parameters(), lr=1e-5)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_training_steps=total_steps,
    num_warmup_steps=int(0.1 * total_steps),
)

# Training loop: train for `num_epochs` epochs, evaluate on the validation set
# after every epoch and remember the weights with the lowest validation loss.
best_val_loss = float('inf')
best_model = None

for epoch in range(num_epochs):
    # Training
    model.train()
    total_train_loss = 0

    for batch in train_dataloader:
        # Move batch to device
        batch = {k: v.to(device) for k, v in batch.items()}

        # Forward pass (the batch contains 'labels', so the model returns a loss)
        outputs = model(**batch)
        loss = outputs.loss
        total_train_loss += loss.item()

        # Backward pass with gradient-norm clipping, then one optimizer step and
        # one scheduler step per batch
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

    avg_train_loss = total_train_loss / len(train_dataloader)

    # Validation
    model.eval()
    val_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in val_dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            val_loss += outputs.loss.item()

            # Calculate accuracy
            preds = torch.argmax(outputs.logits, dim=1)
            correct += (preds == batch['labels']).sum().item()
            total += len(batch['labels'])

    val_loss /= len(val_dataloader)
    val_accuracy = correct / total
    print(f"Epoch {epoch+1}/{num_epochs} - Train loss: {avg_train_loss:.4f}, Val loss: {val_loss:.4f}, Val accuracy: {val_accuracy:.4f}")

    # Save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # state_dict() hands out references to the live parameter tensors and
        # dict.copy() is shallow, so a plain copy would keep changing as training
        # continues. Clone every tensor (onto the CPU, to avoid holding a second
        # copy of the model on the GPU) to get a real snapshot.
        best_model = {
            name: tensor.detach().cpu().clone()
            for name, tensor in model.state_dict().items()
        }
        print(f"  New best model saved! Val loss: {val_loss:.4f}")

# Load best model (load_state_dict copies the values into the model's parameters)
if best_model is not None:
    model.load_state_dict(best_model)
    print("Best model loaded for prediction.")

# Load test files
test_dir = 'data/task2/test'
# Requires 'data/task2/test' to exist. Only '.txt' files are used, in sorted
# order of their file names. This rebinds `test_files` to a plain list of paths.
test_files = [os.path.join(test_dir, f) for f in sorted(os.listdir(test_dir)) if f.endswith('.txt')]
test_dataset = TextDataset(test_files, labels=None, tokenizer=tokenizer)
test_dataloader = DataLoader(test_dataset, batch_size=8) # not shuffled, so predictions follow the order of test_files
print(f"Test data size: {len(test_dataset)}")

# Generate predictions: the predicted class is the index of the largest logit
model.eval()
predictions = []

with torch.no_grad():
    for batch in test_dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        preds = torch.argmax(outputs.logits, dim=1).cpu().numpy()
        predictions.extend(preds)
# Write one row per test file; the CSV index is the row number.
pd.DataFrame(predictions, columns=['predictions']).to_csv('submission.csv')

Simple Autoregressive Extension of BERT



In [None]:
from transformers import AutoModelForMaskedLM
# Load BERT with its masked-language-modelling head, move it to the device and
# switch it to evaluation mode.
model = AutoModelForMaskedLM.from_pretrained('bert-base-uncased')
model = model.to(device)
model.eval()

# `tokenizer` was rebound to the RoBERTa tokenizer in the fine-tuning section.
# Reload the BERT tokenizer here so that the cells below (which use `tokenizer`
# together with this BERT model) work with a matching vocabulary and '[MASK]' token.
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

model

In [None]:
from typing import Optional
def complete_text(prompt:str, max_tokens:Optional[int]=None, model=model, tokenizer=tokenizer, device=device):
  """Greedily extend ``prompt`` one token at a time with a masked language model.

  In every step the current text is followed by one mask token per token that
  may still be generated, the model is run once, and the most likely token for
  the first masked position is appended. The extended text is then decoded and
  used as the input of the next step.

  Args:
    prompt: Text to continue.
    max_tokens: Maximum number of tokens to generate. ``None`` uses all the room
      left in the tokenizer's ``model_max_length``. Larger values are clamped to
      that room.
    model: Masked language model whose output exposes ``.logits``.
    tokenizer: Tokenizer matching ``model``.
    device: Device the tokenized inputs are moved to.

  Returns:
    The decoded text after generation stopped. Generation stops when the token
    budget is used up or when a '.' or separator token was predicted (that token
    is included). If there is no room for any token, ``prompt`` is returned
    unchanged.
  """
  # Room left in the context window. The tokenized prompt already contains the
  # tokens the tokenizer adds around the text.
  available = tokenizer.model_max_length - len(tokenizer(prompt).input_ids)
  max_tokens = available if max_tokens is None else min(max_tokens, available)

  # tokens that end the generation:
  stop_token_ids = {tokenizer.convert_tokens_to_ids('.'), tokenizer.sep_token_id}

  text = prompt
  # `remaining` counts the masked positions of the current step. The loop does
  # not run at all when max_tokens < 1.
  for remaining in range(max_tokens, 0, -1):
    # pad the text with mask tokens to tell the model the number of tokens:
    masked_text = text + ' ' + ' '.join([tokenizer.mask_token] * remaining)

    # tokenize:
    inputs = tokenizer(masked_text, return_tensors="pt").to(device)

    # generate token scores:
    with torch.no_grad():
      logits = model(**inputs).logits

    # The masks are the last `remaining` positions before the final token, so the
    # first masked position counted from the end is -remaining-1.
    first_mask = -remaining - 1

    # get top prediction for the first masked token:
    predicted_token_id = logits[0, first_mask].argmax(dim=-1).item()

    # keep the text tokens (without the first token and without the masks) and
    # append the prediction:
    kept_ids = inputs.input_ids[0, 1:first_mask].cpu().tolist()
    text = tokenizer.decode(kept_ids + [predicted_token_id])

    # end generation on '.' or separator token:
    if predicted_token_id in stop_token_ids:
      break

  return text