<a href="https://colab.research.google.com/github/fubotz/ICL_2024W/blob/main/FinalProject_Fabian_SCHAMBECK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ICL Final Project: Finetuning a Pretrained Multilingual Model for Cognate Detection

Model: xlm-roberta-base

Dataset: custom dataset containing en-fr cognates (Frossard et al.)

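Method: `<mask>` approach — in a fixed bilingual prompt the French cognate is replaced by the tokenizer's `<mask>` token, and the model is trained to predict it.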

In [None]:
!pip install -U datasets
!pip install -U evaluate
!pip install -U transformers
!pip install -U torch

Collecting transformers
  Downloading transformers-4.48.2-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
Downloading transformers-4.48.2-py3-none-any.whl (9.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.7/9.7 MB[0m [31m119.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: transformers
  Attempting uninstall: transformers
    Found existing installation: transformers 4.47.1
    Uninstalling transformers-4.47.1:
      Successfully uninstalled transformers-4.47.1
Successfully installed transformers-4.48.2


Collecting torch
  Downloading torch-2.6.0-cp311-cp311-manylinux1_x86_64.whl.metadata (28 kB)
Collecting nvidia-cusparselt-cu12==0.6.2 (from torch)
  Downloading nvidia_cusparselt_cu12-0.6.2-py3-none-manylinux2014_x86_64.whl.metadata (6.8 kB)
Collecting triton==3.2.0 (from torch)
  Downloading triton-3.2.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.4 kB)
Downloading torch-2.6.0-cp311-cp311-manylinux1_x86_64.whl (766.7 MB)
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m766.7/766.7 MB[0m [31m135.0 MB/s[0m eta [36m0:00:01[0m

In [None]:
# Mount Google Drive at /content/drive; the training output and saved-model
# directories used later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

## Load Dataset ##

In [None]:
!wget https://raw.githubusercontent.com/fubotz/ICL_2024W/refs/heads/main/word_pairs.json        # dataset taken from Frossard et al.

In [None]:
import json

# The French side contains accented characters, so decode as UTF-8 explicitly
# instead of relying on the platform's default text encoding.
with open("word_pairs.json", "r", encoding="utf-8") as f:
    dataset = json.load(f)

# Peek at the first records to confirm the structure
print(dataset[:10])

In [None]:
from datasets import Dataset

# Take the first (english, french) entry of every record
# (presumably each record is a single-entry {english: french} mapping — verify against the JSON)
first_entries = [list(pair.items())[0] for pair in dataset]

# Column-oriented layout expected by `Dataset.from_dict`
dataset_dict = {
    "word_en": [english for english, _ in first_entries],
    "word_fr": [french for _, french in first_entries],
}

# Wrap the columns in a Hugging Face dataset
dataset = Dataset.from_dict(dataset_dict)

# Show the schema followed by the first ten rows
print(dataset, "\n")
print(dataset[:10])

## Load Model ##

In [69]:
from transformers import AutoTokenizer, AutoModelForMaskedLM

# Checkpoint shared by the tokenizer and the masked-LM model
model_name = "xlm-roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Masked-language-model head on top of the encoder (used for the <mask> approach)
pretrained_model = AutoModelForMaskedLM.from_pretrained(model_name)

# Only the parameters of the last 5 encoder layers stay trainable;
# every other parameter of the base model is frozen.
encoder = pretrained_model.base_model
trainable_ids = {id(p) for p in encoder.encoder.layer[-5:].parameters()}
for p in encoder.parameters():
    p.requires_grad = id(p) in trainable_ids

print(tokenizer)

Some weights of the model checkpoint at xlm-roberta-base were not used when initializing XLMRobertaForMaskedLM: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing XLMRobertaForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing XLMRobertaForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


XLMRobertaTokenizerFast(name_or_path='xlm-roberta-base', vocab_size=250002, model_max_length=512, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'bos_token': '<s>', 'eos_token': '</s>', 'unk_token': '<unk>', 'sep_token': '</s>', 'pad_token': '<pad>', 'cls_token': '<s>', 'mask_token': '<mask>'}, clean_up_tokenization_spaces=False, added_tokens_decoder={
	0: AddedToken("<s>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	1: AddedToken("<pad>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	2: AddedToken("</s>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	3: AddedToken("<unk>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	250001: AddedToken("<mask>", rstrip=False, lstrip=True, single_word=False, normalized=False, special=True),
}
)


## Preprocess Dataset ##

In [70]:
def preprocess_function(examples):
    """
    Builds masked prompts for a batch of cognate pairs and the matching MLM labels.

    Each English word is inserted into a fixed bilingual prompt whose French slot is the
    tokenizer's <mask> token. The subword token IDs of the French word are written into
    the label sequence starting at the first <mask> position; every other position is -100.

    Args:
        examples (dict): A batch of English-French cognate pairs in dictionary format:
                         {"word_en": [...], "word_fr": [...]}

    Returns:
        dict: The tokenizer output extended with a "labels" entry:
            - input_ids: Tokenized prompts containing <mask> (padded/truncated to 20 tokens)
            - attention_mask: Mask indicating valid tokens
            - labels: -100 everywhere except at the positions starting at <mask>,
                      which hold the token IDs of the French word
    """
    mask_token_id = tokenizer.mask_token_id

    # Construct masked input sentences
    masked_sentences = [
        f"In English, the word is {word_en}. En Français, le mot est {tokenizer.mask_token}."
        for word_en in examples["word_en"]
    ]

    # Tokenize input sentences
    model_inputs = tokenizer(masked_sentences, max_length=20, truncation=True, padding="max_length")

    # Find <mask> token indices with a plain Python scan. This keeps the function free of
    # any dependency on `torch`, which this notebook only imports in a later cell.
    mask_indices = [
        [pos for pos, token_id in enumerate(input_ids) if token_id == mask_token_id]
        for input_ids in model_inputs["input_ids"]
    ]

    # Tokenize target words (French cognates) WITHOUT special tokens
    target_tokens = tokenizer(examples["word_fr"], add_special_tokens=False)["input_ids"]

    # Start from "ignore everywhere" (-100) ...
    labels = [[-100] * len(input_ids) for input_ids in model_inputs["input_ids"]]

    # ... and write the target subword IDs starting at the first <mask> position.
    # NOTE(review): the prompt contains a single <mask>, so for targets with more than one
    # subword the 2nd, 3rd, ... IDs land on the positions after the mask (sentence
    # punctuation / special tokens). Confirm this is the intended training signal.
    for row, mask_pos, target_ids in zip(labels, mask_indices, target_tokens):
        if not mask_pos or not target_ids:
            # <mask> was truncated away or the target word produced no tokens: nothing to label
            continue
        start = mask_pos[0]
        # Slice so that labels never run past the end of the sequence
        for offset, token_id in enumerate(target_ids[: len(row) - start]):
            row[start + offset] = token_id

    model_inputs["labels"] = labels
    return model_inputs

In [71]:
# Apply preprocessing to the dataset.
# batched=True hands `preprocess_function` a dict of column lists per batch, which is
# the {"word_en": [...], "word_fr": [...]} format it expects.
tokenized_dataset = dataset.map(preprocess_function, batched=True)

# Verify structure (original columns plus input_ids / attention_mask / labels)
print(tokenized_dataset)

Map:   0%|          | 0/492 [00:00<?, ? examples/s]

Dataset({
    features: ['word_en', 'word_fr', 'input_ids', 'attention_mask', 'labels'],
    num_rows: 492
})


In [72]:
# Sanity-check the first processed row
example = tokenized_dataset[0]

# Round-trip the input IDs to text and show the raw label row next to it
print("Tokenized Input:", tokenizer.decode(example["input_ids"]))
print("Labels (Token IDs):", example["labels"])

# Keep only the labelled positions (anything other than -100) and show them as tokens
label_ids = list(filter(lambda token_id: token_id != -100, example["labels"]))
print("Decoded Label Tokens:", tokenizer.convert_ids_to_tokens(label_ids))

Tokenized Input: <s> In English, the word is abandon. En Français, le mot est<mask> .</s><pad>
Labels (Token IDs): [-100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, 32997, -100, -100, -100, -100]
Decoded Label Tokens: ['▁abandon']


In [74]:
import random

# Shuffle into a NEW variable: rebinding `tokenized_dataset` to its shuffled self would make
# this cell non-idempotent (each re-run would shuffle the previous shuffle and change the split).
shuffled_dataset = tokenized_dataset.shuffle(seed=42)

# Compute split sizes
total_size = len(shuffled_dataset)
train_size = int(0.7 * total_size)      # 70% training
val_size = int(0.2 * total_size)        # 20% validation
test_size = total_size - (train_size + val_size)        # remainder (~10%) test

# Split the dataset into three consecutive, non-overlapping index ranges
train_dataset = shuffled_dataset.select(range(train_size))
val_dataset = shuffled_dataset.select(range(train_size, train_size + val_size))
test_dataset = shuffled_dataset.select(range(train_size + val_size, total_size))

# The three splits must cover every example exactly once
assert len(train_dataset) + len(val_dataset) + len(test_dataset) == total_size

# Verify splits
print(f"Total samples: {total_size}")
print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")

Total samples: 492
Training samples: 344
Validation samples: 98
Test samples: 50


## Evaluate Pretrained Model ##

In [65]:
import torch

# Define the evaluation function
def evaluate_mask_accuracy(model, test_dataset, tokenizer, top_k=5, verbose=True):
    """
    Evaluates the top-k accuracy of a masked language model on a cognate dataset.

    An instance counts as correct when every expected (labelled) token ID appears among
    the top-k predicted token IDs of at least one <mask> position. The comparison is done
    on token IDs: comparing raw sentencepiece tokens (e.g. '▁rythme') with decoded,
    stripped predictions (e.g. 'rythme') can never match for word-initial pieces.

    Args:
        model: The pretrained or fine-tuned masked language model.
        test_dataset: Indexable collection of examples with "input_ids" and "labels"
                      (and optionally "attention_mask"), e.g. a tokenized Hugging Face dataset.
        tokenizer: The tokenizer corresponding to the model.
        top_k (int): Number of top predictions to consider for accuracy.
        verbose (bool): If True (default), print input, expected tokens and predictions
                        for every instance.

    Returns:
        float: Fraction of instances counted as correct (0.0 for an empty dataset).
               Instances without a <mask> token are reported and counted as incorrect.
    """
    total_samples = len(test_dataset)
    correct_predictions = 0

    # Run on whatever device the model currently lives on
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Evaluate deterministically (no dropout) and restore the previous mode afterwards
    was_training = model.training
    model.eval()
    try:
        for i in range(total_samples):
            example = test_dataset[i]
            input_ids = torch.as_tensor(example["input_ids"]).unsqueeze(0).to(device)  # add batch dimension
            expected_ids = [int(token_id) for token_id in example["labels"] if int(token_id) != -100]

            # Positions of the <mask> token(s) in the sequence
            mask_positions = (input_ids[0] == tokenizer.mask_token_id).nonzero(as_tuple=True)[0]
            if mask_positions.numel() == 0:
                print(f"Error: No {tokenizer.mask_token} token found in instance {i+1}")
                continue

            # Pass the attention mask when available so padding positions are not attended to
            model_kwargs = {}
            if "attention_mask" in example:
                model_kwargs["attention_mask"] = (
                    torch.as_tensor(example["attention_mask"]).unsqueeze(0).to(device)
                )

            with torch.no_grad():
                outputs = model(input_ids, **model_kwargs)

            # Top-k token IDs for each masked position
            mask_token_logits = outputs.logits[0, mask_positions, :]
            top_k_ids = torch.topk(mask_token_logits, k=top_k, dim=-1).indices.tolist()

            if verbose:
                # Human-readable forms are used for logging only, never for scoring
                predicted_words = [
                    [tokenizer.decode([token_id]).strip() for token_id in candidate_ids]
                    for candidate_ids in top_k_ids
                ]
                expected_words = tokenizer.convert_ids_to_tokens(expected_ids)
                print(f"Instance {i+1}:")
                print(f"    Tokenized Input: {tokenizer.decode(example['input_ids'])}")
                print(f"    Expected French word(s): {expected_words}")
                print(f"    Predicted MASK words (Top-{top_k}): {predicted_words}\n")

            # Correct only if there is something to predict and all of it was found
            if expected_ids and all(
                any(token_id in candidate_ids for candidate_ids in top_k_ids)
                for token_id in expected_ids
            ):
                correct_predictions += 1
    finally:
        model.train(was_training)

    # Compute accuracy
    return correct_predictions / total_samples if total_samples > 0 else 0.0


# Evaluate the accuracy of pretrained model:
# top-5 score on the held-out test split, computed before the fine-tuning cells further down
accuracy = evaluate_mask_accuracy(pretrained_model, test_dataset, tokenizer, top_k=5)
print(f"Accuracy of the model: {accuracy:.2%}")

Instance 1:
    Tokenized Input: <s> The English word is mystery. In French, it is<mask> .</s><pad><pad><pad><pad>
    Expected French word(s): ['▁my', 'st', 'ére']
    Predicted MASK words (Top-5): [['mystery', 'mysterie', 'misterio', 'misteri', 'science']]

Instance 2:
    Tokenized Input: <s> The English word is rhythm. In French, it is<mask> .</s><pad><pad><pad>
    Expected French word(s): ['▁rythme']
    Predicted MASK words (Top-5): [['time', 'rytm', 'motion', 'movement', 'ритм']]

Instance 3:
    Tokenized Input: <s> The English word is baron. In French, it is<mask> .</s><pad><pad><pad>
    Expected French word(s): ['▁bar', 'on']
    Predicted MASK words (Top-5): [['bar', 'river', 'man', 'king', 'garden']]

Instance 4:
    Tokenized Input: <s> The English word is difference. In French, it is<mask> .</s><pad><pad><pad><pad>
    Expected French word(s): ['▁différence']
    Predicted MASK words (Top-5): [['difference', 'different', 'similar', 'change', 'opposite']]

Instance 5:
  

## Finetune Model ##

In [None]:
from transformers import TrainingArguments
import evaluate
import numpy as np

# Define metric computation function.
# The accuracy is computed directly with NumPy so the function does not depend on a
# module-level `accuracy` object: this notebook also assigns plain floats to that name,
# which would break any evaluation run afterwards.
def compute_metrics(eval_pred):
    """
    Computes token-level accuracy over the labelled positions during evaluation.

    Args:
        eval_pred: Pair (logits, labels). `logits` has the vocabulary on its last axis
                   (or is a tuple whose first element does, e.g. when the model also
                   returns hidden states); `labels` uses -100 for positions to ignore.

    Returns:
        dict: {"accuracy": float}, the fraction of labelled positions whose arg-max
              prediction equals the label (0.0 if no position is labelled).
    """
    logits, labels = eval_pred
    if isinstance(logits, tuple):
        # Extra model outputs follow the logits; only the logits are scored
        logits = logits[0]

    predictions = np.argmax(logits, axis=-1)
    labels = np.asarray(labels)

    # -100 marks every position without a target (not only padding)
    scored = labels != -100
    num_scored = int(scored.sum())
    if num_scored == 0:
        return {"accuracy": 0.0}

    num_correct = int((predictions[scored] == labels[scored]).sum())
    return {"accuracy": num_correct / num_scored}

In [None]:
from transformers import DataCollatorForLanguageModeling, default_data_collator

# The dataset already carries fixed-length `input_ids`, `attention_mask` and task-specific
# `labels` (French token IDs at the <mask> position). DataCollatorForLanguageModeling with
# mlm=True would re-mask `input_ids` at random and regenerate `labels` from them, throwing
# the cognate labels away, so training and validation would measure generic MLM instead of
# cognate prediction. The default collator simply stacks the prepared columns into tensors.
data_collator = default_data_collator

In [None]:
from transformers import Trainer

import copy

# Define training arguments
arguments = TrainingArguments(
    output_dir="/content/drive/MyDrive/Colab Notebooks/cognate_trainer",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    logging_steps=8,
    num_train_epochs=6,
    eval_strategy="epoch",
    save_strategy="epoch",          # must match eval_strategy for load_best_model_at_end
    learning_rate=2e-5,
    gradient_accumulation_steps=2,  # effective batch size: 16 * 2 = 32 per device
    weight_decay=0.03,
    load_best_model_at_end=True,
    report_to='none',
    seed=224
)

# Initialize the Trainer.
# The Trainer updates the model object it is given in place. Fine-tuning a deep copy keeps
# `pretrained_model` at its original weights, so the later "pretrained vs. fine-tuned"
# comparisons really compare two different models. The copy keeps the frozen/unfrozen
# `requires_grad` flags set when the model was loaded.
trainer = Trainer(
    model=copy.deepcopy(pretrained_model),
    args=arguments,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,       # NB: change for test
    processing_class=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

In [None]:
# Verify dataset format
from torch.utils.data import DataLoader

# Create a DataLoader for debugging.
# Rows of the dataset are plain Python lists; PyTorch's default collation would turn each
# list column into a list of per-position tensors, on which `.shape` does not exist.
# `with_format` returns a torch-formatted view (without modifying `train_dataset`) that is
# restricted to the tensor columns, so every batch entry is a (batch_size, seq_len) tensor.
tensor_columns = ["input_ids", "attention_mask", "labels"]
debug_loader = DataLoader(train_dataset.with_format("torch", columns=tensor_columns), batch_size=8)

# Get a batch
batch = next(iter(debug_loader))
print(batch)

# Print shapes
print(f"Input IDs shape: {batch['input_ids'].shape}")
print(f"Attention Mask shape: {batch['attention_mask'].shape}")

In [None]:
# Run fine-tuning with the arguments, datasets and collator given to `trainer`
trainer.train()

In [None]:
# Save the trained model (the model currently held by the trainer) to Google Drive
output_dir = "/content/drive/MyDrive/Colab Notebooks/cognate_trainer_best_model"
trainer.save_model(output_dir)

# Evaluate the model on the test dataset; passing `test_dataset` overrides the
# validation split that the trainer was constructed with
test_results = trainer.evaluate(test_dataset)

print("\nTest Results:")
print(test_results)

In [None]:
# Load finetuned model from the directory written by `trainer.save_model(output_dir)`
finetuned_model = AutoModelForMaskedLM.from_pretrained(output_dir)

## Evaluate Finetuned Model ##

In [None]:
# Evaluate the accuracy of finetuned model on the same tokenized test split as the baseline.
# (`test_dataset` is the split created earlier in this notebook; `test_data` is never defined.)
# A dedicated result name keeps `accuracy` from being overwritten.
finetuned_accuracy = evaluate_mask_accuracy(finetuned_model, test_dataset, tokenizer, top_k=5)
print(f"Accuracy of the model: {finetuned_accuracy:.2%}")

## Visualization ##

In [None]:
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# Make both models return their per-layer hidden states on every forward pass;
# the embedding code in this notebook reads `outputs.hidden_states[-1]`.
pretrained_model.config.output_hidden_states = True
finetuned_model.config.output_hidden_states = True

def visualize_embeddings_2D(model, tokenizer, test_data, method="pca"):
    """
    Visualizes word embeddings from the model in 2D space using PCA or t-SNE.

    Every word is embedded as the mean of the model's last hidden layer over its real
    (non-padding) tokens; English and French words alternate in the plot colouring.

    Args:
        model: The masked language model (pretrained or fine-tuned).
        tokenizer: Tokenizer corresponding to the model.
        test_data: Iterable of cognate pairs. Each item is either a single-entry
                   {english_word: french_word} dict or a row with "word_en" and
                   "word_fr" keys.
        method (str): Dimensionality reduction method ("pca" or "tsne").

    Raises:
        ValueError: If `method` is unknown or `test_data` contains no pairs.
    """
    # Validate the method first so a typo fails before any model forward pass
    if method == "pca":
        reducer = PCA(n_components=2)
    elif method == "tsne":
        from sklearn.manifold import TSNE
        reducer = TSNE(n_components=2, perplexity=10, random_state=42)
    else:
        raise ValueError("Method must be 'pca' or 'tsne'.")

    words = []
    embeddings = []

    # Run on whatever device the model currently lives on
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Embed deterministically (no dropout) and restore the previous mode afterwards
    was_training = model.training
    model.eval()
    try:
        for pair in test_data:
            if "word_en" in pair and "word_fr" in pair:
                english_word, french_word = pair["word_en"], pair["word_fr"]
            else:
                english_word, french_word = list(pair.items())[0]

            for word in [english_word, french_word]:
                tokens = tokenizer(
                    word, return_tensors="pt", truncation=True, padding="max_length", max_length=8
                ).to(device)
                with torch.no_grad():
                    # Request hidden states explicitly instead of relying on a config flag
                    outputs = model(**tokens, output_hidden_states=True)
                hidden_states = outputs.hidden_states[-1]       # last hidden layer

                # Mean pooling over real tokens only: padding positions are zeroed out and
                # excluded from the divisor so they cannot dilute the word embedding
                token_mask = tokens["attention_mask"].unsqueeze(-1).to(hidden_states.dtype)
                pooled = (hidden_states * token_mask).sum(dim=1) / token_mask.sum(dim=1).clamp(min=1)

                embeddings.append(pooled.squeeze(0).cpu().numpy())
                words.append(word)
    finally:
        model.train(was_training)

    if not words:
        raise ValueError("test_data contains no word pairs.")

    # Convert to NumPy array and reduce dimensions
    embeddings = np.array(embeddings)
    reduced_embeddings = reducer.fit_transform(embeddings)

    # Plot embeddings (colour alternates: even index = English, odd index = French)
    plt.figure(figsize=(10, 6))
    plt.scatter(reduced_embeddings[:, 0], reduced_embeddings[:, 1], alpha=0.7, c=np.arange(len(words)) % 2, cmap="coolwarm")

    # Annotate points with words
    for i, word in enumerate(words):
        plt.text(reduced_embeddings[i, 0], reduced_embeddings[i, 1], word, fontsize=9, ha='right', va='bottom')

    plt.title(f"2D Projection of Word Embeddings for ({method.upper()})")
    plt.xlabel("Component 1")
    plt.ylabel("Component 2")
    plt.show()

# `visualize_embeddings_2D` iterates over {english_word: french_word} pairs, but no
# `test_data` variable exists in this notebook; rebuild the pairs from the test split.
test_data = [{row["word_en"]: row["word_fr"]} for row in test_dataset]

# Call function for both models using PCA
visualize_embeddings_2D(pretrained_model, tokenizer, test_data, method="pca")
visualize_embeddings_2D(finetuned_model, tokenizer, test_data, method="pca")

## Test Section ##

In [None]:
from scipy.spatial.distance import cosine

word = "variante"

tokens = tokenizer(word, return_tensors="pt")


def mean_last_hidden_state(model, encoded):
    """Returns the mean of the model's last hidden layer over all tokens as a 1-D NumPy array."""
    # Send the inputs to the model's device and ask for hidden states explicitly, so this
    # cell works regardless of config flags set elsewhere or of where the model lives
    device = next(model.parameters()).device
    with torch.no_grad():
        outputs = model(**encoded.to(device), output_hidden_states=True)
    return outputs.hidden_states[-1].mean(dim=1).squeeze().cpu().numpy()


# NOTE(review): `pretrained_model` is the object handed to `Trainer` earlier; unless the
# trainer worked on a copy, both models hold fine-tuned weights and the similarity is ~1.
pre_emb = mean_last_hidden_state(pretrained_model, tokens)
fine_emb = mean_last_hidden_state(finetuned_model, tokens)

similarity = 1 - cosine(pre_emb, fine_emb)
print(f"Cosine similarity between pretrained and fine-tuned embeddings for '{word}': {similarity:.4f}")

In [None]:
# One fixed probe sentence, fed to both models
probe = tokenizer("The English word is risk. In French, it is <mask>.", return_tensors="pt")

with torch.no_grad():
    logits_before = pretrained_model(**probe).logits
    logits_after = finetuned_model(**probe).logits

# Sum of absolute logit differences over all positions and vocabulary entries
total_abs_diff = torch.sum(torch.abs(logits_before - logits_after)).item()
print("Logit difference before & after fine-tuning:", total_abs_diff)

In [None]:
# `arguments.max_steps` is only the configured cap and keeps its default of -1 when training
# length is given in epochs; the number of optimizer steps actually scheduled is recorded on
# the trainer state by `trainer.train()`.
print(f"Total training steps: {trainer.state.max_steps}")