<center><h2>ALTeGraD 2023<br>Lab Session 3: Transfer learning for NLP</h2> 24 / 10 / 2023<br> Dr. G. Shang, H. Abdine<br><br>


<b>Student name:</b> Balthazar Neveu

</center>

<br><br>
In this lab we will:
* Implement and pretrain a language model with transformer architecture.
* Use the pretrained model (transfer learning) to perform a sentiment analysis task, which consists of classifying book reviews as positive or negative.
* Compare the performance of the pretrained model to a model trained from scratch.
 <br>

<b>The deadline for this lab is October 31, 2023 11:59 PM.</b> More details about the submission and the architecture for this lab can be found in the handout PDF.

In [None]:
import math

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from pathlib import Path

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

In [None]:
# Download the lab assets into the current working directory.
# Language-model pretraining corpus, vocabulary and pretrained 4-layer checkpoint:
!wget https://raw.githubusercontent.com/moussaKam/transfer_learning_transformers/main/pretraining_subset.txt
!wget https://raw.githubusercontent.com/moussaKam/transfer_learning_transformers/main/dict.txt
!wget https://raw.githubusercontent.com/moussaKam/transfer_learning_transformers/main/pretrained_model_4layers.pt
# Book reviews (`cls-books`): tokenized reviews and their labels, train and test splits:
!wget https://raw.githubusercontent.com/moussaKam/transfer_learning_transformers/main/cls-books/train.review.spm
!wget https://raw.githubusercontent.com/moussaKam/transfer_learning_transformers/main/cls-books/train.label
!wget https://raw.githubusercontent.com/moussaKam/transfer_learning_transformers/main/cls-books/test.review.spm
!wget https://raw.githubusercontent.com/moussaKam/transfer_learning_transformers/main/cls-books/test.label
# Show the first five lines of the vocabulary file.
!head -5 dict.txt

In [None]:
# Assets used for language-model pretraining and text generation.
path_data_train = Path("pretraining_subset.txt")
path_vocab = Path("dict.txt")
pretrained_model = Path("pretrained_model_4layers.pt")
# Fail early, naming the missing file, if one of the downloads did not succeed.
for required_path in (path_data_train, path_vocab, pretrained_model):
    assert required_path.exists(), f"missing file: {required_path}"

# `path_data_train` is rebound to the review data just below; keep a dedicated
# handle on the pretraining corpus so it stays reachable afterwards.
path_data_pretrain = path_data_train

# Assets used for the supervised sentiment classification task.
path_data_train = Path("train.review.spm")
path_labels_train = Path("train.label")
path_data_valid = Path("test.review.spm")
path_labels_valid = Path("test.label")
for required_path in (path_data_train, path_labels_train, path_data_valid, path_labels_valid):
    assert required_path.exists(), f"missing file: {required_path}"

# Tensor convention for NLP
`[L, N, D]`
- L sequence length
- N batch size
- V vocabulary dimension `ntokens`
- E embeddings dimension `embedding_dim`
- D hidden dimension

### Simplification:
- `E=D`: the hidden dimension is set equal to the embedding dimension for simplicity in the following code (`nhid = embedding_dim`).



### The Model

In [None]:
class PositionalEncoding(nn.Module):
    """Add a fixed, precomputed sinusoidal positional encoding to the embeddings.

    "Add" is literal: the encoding is summed element-wise with the embeddings,
    then dropout is applied.
    """
    def __init__(self, embdeddings_dim: int , dropout: float =0.1, max_len: int =5000):
        """Precompute the positional encoding for positions `0 .. max_len - 1`.

        Args:
            embdeddings_dim (int): dimension of the word embeddings (odd values are supported).
            dropout (float, optional): dropout ratio. Defaults to 0.1.
            max_len (int, optional): maximum sequence length. Defaults to 5000.
        """
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, embdeddings_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, embdeddings_dim, 2).float() * (-math.log(10000.0) / embdeddings_dim)
        )
        angles = position * div_term  # [max_len, ceil(embdeddings_dim / 2)]
        pe[:, 0::2] = torch.sin(angles)
        # With an odd dimension there is one cosine column fewer than sine columns,
        # so the trailing frequency is dropped for the cosine half.
        pe[:, 1::2] = torch.cos(angles[:, : embdeddings_dim // 2])
        # Stored as [max_len, 1, embdeddings_dim] so that it broadcasts over the batch axis.
        pe = pe.unsqueeze(1)
        self.register_buffer("pe", pe)

    def forward(self, x: torch.FloatTensor) -> torch.FloatTensor:
        """Add the precomputed positional encoding to the word embeddings.

        Args:
            x (torch.FloatTensor): embeddings tensor [L, N, D]

        Returns:
            torch.FloatTensor: tensor with the same shape as `x`: embeddings plus
            positional encoding, after dropout.

        Raises:
            ValueError: if the sequence length L exceeds the precomputed `max_len`.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds the maximum supported length {self.pe.size(0)}"
            )
        x = x + self.pe[:seq_len, :]
        return self.dropout(x)

In [None]:
# Hyper-parameters of `TransformerModel`:
# ntokens: size of the vocabulary
# nhid: hidden dimension of the model (we assume embedding_dim = nhid)
# nlayers: number of stacked nn.TransformerEncoderLayer
# nhead: number of heads in each multi-head attention
# dropout: dropout ratio
class TransformerModel(nn.Module):
    """Transformer base model
    ========================
    - token indices -> embedding vectors
    - positional encoding added to the embeddings
    - stack of `nlayers` transformer encoder layers
    """

    def __init__(self, ntokens:int, nhead:int, nhid:int, nlayers:int, dropout=0.5):
        """Build the embedding, the positional encoder and the encoder stack.

        Args:
            ntokens (int): size of the vocabulary
            nhead (int): number of attention heads in each encoder layer
            nhid (int): hidden dimension; also used as embedding dimension and
                as width of the feed-forward sub-layer
            nlayers (int): number of nn.TransformerEncoderLayer in the stack
            dropout (float, optional): dropout ratio. Defaults to 0.5.
        """
        super().__init__()
        self.model_type = "Transformer"
        self.nhid = nhid
        # Embedding and hidden dimensions are deliberately identical (E = D = nhid).
        self.encoder = nn.Embedding(ntokens, nhid)
        self.pos_encoder = PositionalEncoding(nhid, dropout=dropout)
        single_layer = nn.TransformerEncoderLayer(
            d_model=nhid,          # size of the vectors flowing through the layer
            nhead=nhead,           # heads of the multi-head attention
            dim_feedforward=nhid,  # width of the feed-forward network inside the layer
            dropout=dropout,
        )
        self.transformer_encoder = nn.TransformerEncoder(single_layer, num_layers=nlayers)
        self.init_weights()

    @staticmethod
    def generate_square_subsequent_mask(sz: int) -> torch.FloatTensor:
        """Build the additive causal mask hiding future positions.

        Args:
            sz (int): mask size M

        Returns:
            torch.FloatTensor: [M, M] matrix holding 0.0 on and below the diagonal
            and -inf strictly above it.
        """
        blocked = torch.full((sz, sz), float("-inf"))
        return torch.triu(blocked, diagonal=1)

    def init_weights(self):
        """Re-draw the embedding weights uniformly in [-0.1, 0.1]."""
        initrange = 0.1
        nn.init.uniform_(self.encoder.weight, -initrange, initrange)

    def forward(
            self, src: torch.LongTensor,
            src_mask: torch.FloatTensor
        ) -> torch.FloatTensor:
        """Embed the tokens, add positions, then run the encoder stack.

        Args:
            src (torch.LongTensor): [L, N] token indices
            src_mask (torch.FloatTensor): [L, L] additive attention mask

        Returns:
            torch.FloatTensor: encoded sequence [L, N, D]
        """
        embedded = self.encoder(src) * math.sqrt(self.nhid)  # [L, N] -> [L, N, E]
        positioned = self.pos_encoder(embedded)              # [L, N, E]
        return self.transformer_encoder(positioned, mask=src_mask)

In [None]:
class ClassificationHead(nn.Module):
    """Single linear layer turning encoded features into class logits."""

    def __init__(self, nhid: int, nclasses: int):
        """Create the linear layer and apply the custom initialization.

        Args:
            nhid (int): hidden dimension (size of the incoming feature vectors)
            nclasses (int): number of classes (size of the logits vector)
        """
        super().__init__()
        self.decoder = nn.Linear(nhid, nclasses)
        self.init_weights()

    def init_weights(self):
        """Zero the bias and re-draw the weights uniformly in [-0.1, 0.1]."""
        initrange = 0.1
        nn.init.zeros_(self.decoder.bias)
        nn.init.uniform_(self.decoder.weight, -initrange, initrange)

    def forward(self, src: torch.FloatTensor) -> torch.FloatTensor:
        """Project encoded feature vectors onto the classes.

        Args:
            src (torch.FloatTensor): encoded feature vectors [L, N, D]

        Returns:
            torch.FloatTensor: logits with the last axis replaced by the number
            of classes (no softmax applied)
        """
        return self.decoder(src)

class Model(nn.Module):
    """`TransformerModel` encoder followed by a `ClassificationHead`."""

    def __init__(self, ntoken: int, nhead: int, nhid: int, nlayers: int, nclasses: int, dropout: float=0.5):
        """TransformerModel + ClassificationHead

        The same class serves next-word prediction (classification over `ntoken`
        classes) and downstream tasks built on a pretrained `TransformerModel`.

        Args:
            ntoken (int): size of the vocabulary (`TransformerModel`)
            nhead (int): number of attention heads per encoder layer (`TransformerModel`)
            nhid (int): hidden dimension of the model. assume `embedding_dim` = `nhid`
            nlayers (int): number of nn.TransformerEncoderLayer in the stack (`TransformerModel`)
            nclasses (int): number of output classes of the `ClassificationHead`
                - = size of the vocabulary for next-word prediction
                - task dependent otherwise (e.g. sentiment analysis)
            dropout (float, optional): dropout ratio. Defaults to 0.5. (`TransformerModel`)
        """
        super().__init__()
        self.base = TransformerModel(ntoken, nhead, nhid, nlayers, dropout=dropout)
        self.classifier = ClassificationHead(nhid, nclasses)

    def forward(self, src:torch.LongTensor, src_mask: torch.FloatTensor) -> torch.FloatTensor:
        """Encode the sequence, then classify every position.

        Args:
            src (torch.LongTensor): token indices [L, N]
            src_mask (torch.FloatTensor): [L, L] additive attention mask.

        Returns:
            torch.FloatTensor: logits [L, N, C], one vector of C class scores per position
        """
        return self.classifier(self.base(src, src_mask))

### Causal attention mask & useless computations (question 1)

In [None]:
sentence_length_test = 5
src_mask = TransformerModel.generate_square_subsequent_mask(sentence_length_test)
# Every (query, key) pair whose key lies in the future is masked with -inf:
# that is L * (L - 1) / 2 attention scores computed for nothing.
useless_computations = (sentence_length_test - 1) * sentence_length_test // 2
assert int(torch.isinf(src_mask).sum()) == useless_computations
print(f"{useless_computations} useless computations for a sequence of {sentence_length_test} tokens")
src_mask

# Unit test

In [None]:
def test_transformer_based_classifier():
    """Smoke test: a forward pass of `Model` returns logits shaped [L, N, nclasses]."""
    ntokens = 100  # V, the size of vocabulary
    nhid = 200  # hidden dimension
    nlayers = 4  # the number of nn.TransformerEncoderLayer in nn.TransformerEncoder
    nhead = 2  # the number of heads in the multiheadattention models
    dropout = 0.0  # the dropout value
    nclasses = ntokens  # next-word prediction: one class per vocabulary entry
    model = Model(ntokens, nhead, nhid, nlayers, nclasses, dropout).to(device)

    # Two sentences of six tokens each, written one sentence per row for readability...
    sentences = torch.tensor([[2, 6, 2, 5, 43, 21], [8, 5, 3, 42, 43, 21]])
    # ...then transposed to the sequence-first [L, N] layout the model is fed with.
    dummy_input = sentences.t().contiguous().to(device)

    sequence_length, batch_size = dummy_input.shape  # L, N
    assert (sequence_length, batch_size) == (6, 2)

    src_mask = TransformerModel.generate_square_subsequent_mask(sequence_length).to(device)
    assert list(src_mask.shape) == [sequence_length, sequence_length]
    # The batch dimension N plays no role in the mask: all sequences of a batch share the same length L.
    with torch.no_grad():  # shape check only, no gradients needed
        out = model(dummy_input, src_mask)
    expected_size = [sequence_length, batch_size, nclasses]
    assert list(out.shape) == expected_size, f"{out.shape}, {expected_size}"
    print(out.shape)
test_transformer_based_classifier()

## Vocabulary and Tokenization

In [None]:
# Keys of the sample dictionaries produced by the dataset.
SRC = "source_sequence"
TGT = "target"
# Special tokens.
SOS = "<sos>"
PAD = "<pad>"
EOS = "<eos>"
OOV = "<oov>"
token2ind = {SOS: 0, PAD : 1, EOS: 2, OOV: 3} # the 4 first indices are reserved to special tokens
n_special_tokens = len(token2ind)
with open(path_vocab, "r", encoding="utf-8") as f:
    # Vocabulary words are numbered after the reserved indices; starting at 0
    # would make the first words collide with the special tokens.
    for idx, line in enumerate(f, start=n_special_tokens):
        fields = line.split()
        if not fields:
            continue  # tolerate blank lines instead of raising IndexError
        token2ind[fields[0]] = idx
ind2token = {index: token for token, index in token2ind.items()}
assert len(ind2token) == len(token2ind), "every token must own a distinct index"
print(ind2token[1111])

### Data Loader


In [None]:
import numpy as np
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from pathlib import Path
from typing import Dict, List, Tuple

class Dataset(torch.utils.data.Dataset):
    """Line-based text dataset for language modeling or classification.

    Each line of `path_documents` is one document made of whitespace-separated
    tokens. For classification, line i of `path_labels` holds the integer label
    of document i.
    """

    def __init__(
        self,
        path_documents: Path,
        path_labels: Path = None,
        token2ind: Dict[str, int] = None,
        max_len: int=512,
        task: str="language_modeling",
    ):
        """Load the documents (and the labels for the classification task).

        Args:
            path_documents (Path): text file, one document per line.
            path_labels (Path, optional): text file, one integer label per line.
                Required when `task` is "classification".
            token2ind (Dict[str, int], optional): token -> index mapping; must contain
                the special tokens "<sos>", "<eos>" and "<oov>". Defaults to an empty mapping.
            max_len (int, optional): maximum length of a source sequence, start token
                included. Defaults to 512.
            task (str, optional): "language_modeling" or "classification".

        Raises:
            ValueError: on an unknown task, or when labels are needed but not given.
        """
        if task not in ("language_modeling", "classification"):
            raise ValueError(f"unknown task: {task!r}")
        if task == "classification" and path_labels is None:
            raise ValueError("path_labels is required for the classification task")
        self.task = task
        self.max_len = max_len
        # A fresh dict per instance: a mutable default argument would be shared.
        self.token2ind = {} if token2ind is None else token2ind
        self.labels = []
        with open(path_documents, "r", encoding="utf-8") as f1:
            self.documents = [line.strip() for line in f1]
        if task == "classification":
            with open(path_labels, "r", encoding="utf-8") as f1:
                self.labels = [int(line.strip()) for line in f1 if line.strip()]
            assert len(self.labels) == len(self.documents)

    def __len__(self):
        """Number of documents."""
        return len(self.documents)

    def __getitem__(self, index: int):
        """Return the sample at `index` as a dict of LongTensors.

        - SRC: `<sos>` followed by the token indices, truncated to `max_len` entries;
          tokens missing from the vocabulary map to `<oov>`.
        - TGT: for language modeling, the source shifted left by one position and
          closed by `<eos>`; for classification, a single-element tensor with the label.
        """
        # Keep room for the start token within max_len.
        tokens = self.documents[index].split()[: self.max_len - 1]
        oov_index = self.token2ind["<oov>"]
        source_sequence = [self.token2ind["<sos>"]]
        source_sequence.extend(self.token2ind.get(token, oov_index) for token in tokens)
        if self.task == "language_modeling":
            # source: <sos> A B C   ->   target: A B C <eos>
            target = source_sequence[1:] + [self.token2ind["<eos>"]]
        else:  # classification (the task name is validated in __init__)
            target = [self.labels[index]]
        sample = {
            SRC: torch.tensor(source_sequence),
            TGT: torch.tensor(target),
        }
        return sample


def collate_sentences(batch: List[Dict[str, torch.LongTensor]]) -> Tuple[torch.LongTensor, torch.LongTensor]:
    """Pad the samples of a batch to a common length and stack them.

    Args:
        batch (List[Dict[str, torch.LongTensor]]): samples, each holding a source
            sequence under `SRC` and a target under `TGT`.

    Returns:
        Tuple[torch.LongTensor, torch.LongTensor]:
            - source [L, N], L being the longest source sequence of the batch;
              shorter sequences are completed with the `<pad>` index
            - target, padded the same way and then flattened to one dimension
              (L * N entries for language modeling, N entries when every sample
              carries a single label)
    """
    pad_index = token2ind["<pad>"]
    sources = []
    targets = []
    for sample in batch:
        sources.append(sample[SRC])
        targets.append(sample[TGT])
    # pad_sequence stacks sequence-first: the result is [L, N]
    padded_sources = pad_sequence(sources, padding_value=pad_index)
    padded_targets = pad_sequence(targets, padding_value=pad_index)
    return padded_sources, padded_targets.reshape(-1)


def get_loader(
    path_documents :Path,
    path_labels: Path =None,
    token2ind : Dict[str, int]={},
    max_len: int =512,
    batch_size: int =32,
    task: str="language_modeling",
):
    """Build a shuffled DataLoader over a `Dataset` of documents.

    Args:
        path_documents (Path): documents file, forwarded to `Dataset`.
        path_labels (Path, optional): labels file, forwarded to `Dataset`.
        token2ind (Dict[str, int], optional): vocabulary, forwarded to `Dataset`.
        max_len (int, optional): maximum sequence length, forwarded to `Dataset`.
        batch_size (int, optional): number of samples per batch. Defaults to 32.
        task (str, optional): task name, forwarded to `Dataset`.

    Returns:
        DataLoader: shuffles the samples, collates them with `collate_sentences`,
        pins memory and drops the last incomplete batch.
    """
    documents = Dataset(
        path_documents,
        path_labels=path_labels,
        token2ind=token2ind,
        max_len=max_len,
        task=task,
    )
    return DataLoader(
        dataset=documents,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_sentences,
        pin_memory=True,
        drop_last=True,
    )

In [None]:
# Sanity check of the data pipeline on the labelled review data.
# The task and the batch size are spelled out here: at this point of the notebook
# no `task` / `batch_size` variable exists yet on a fresh kernel.
data_loader = get_loader(
    path_data_train,
    path_labels_train,
    token2ind,
    task="classification",
    batch_size=8,
)


# Model definition

In [None]:
# Architecture hyper-parameters
ntokens = len(ind2token) # the size of vocabulary
nhead = 2  # the number of heads in the multiheadattention models
nhid = 200  # the dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 4  # the number of nn.TransformerEncoderLayer in nn.TransformerEncoder
dropout = 0  # the dropout value

nclasses = 2 # for classification task only

# Language model: the head predicts one class per vocabulary entry.
model = Model(
    ntoken=ntokens,
    nhead=nhead,
    nhid=nhid,
    nlayers=nlayers,
    nclasses=ntokens,
    dropout=dropout,
).to(device)

In [None]:
# optimization parameters
lr = 0.0003  # learning rate
# Targets equal to the padding index do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=token2ind['<pad>'])
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)

## Training loop

In [None]:
def train(
    path_data_train: Path,
    path_labels_train: Path =None,
    path_data_valid: Path =None,
    save_interval: int =-1,
    log_interval: int=5,
    task: str="language_modeling",
    batch_size: int =32,
):
    """Train the module-level `model` for one pass over the training data.

    Relies on the module-level `model`, `optimizer`, `criterion`, `device` and
    `token2ind`. The epoch number shown in the logs is read from the module-level
    `epoch` variable when it exists (0 otherwise).

    Args:
        path_data_train (Path): training documents, one per line.
        path_labels_train (Path, optional): training labels (classification only).
        path_data_valid (Path, optional): currently unused.
        save_interval (int, optional): currently unused.
        log_interval (int, optional): print the running loss every `log_interval` steps.
        task (str, optional): "language_modeling" or "classification".
        batch_size (int, optional): number of samples per batch.

    Returns:
        list: the average losses printed during the pass, in order.
    """
    model.train()
    data_loader = get_loader(
        path_data_train,
        path_labels_train,
        token2ind,
        task=task,
        batch_size=batch_size,
    )
    current_epoch = globals().get("epoch", 0)

    losses = []
    running_loss = 0.0
    steps_since_log = 0
    for idx, (source, target) in enumerate(data_loader): #step 1: iterate over the batches
        optimizer.zero_grad()
        source = source.to(device)  # [L, N]
        target = target.to(device)  # flattened by the collate function
        src_mask = model.base.generate_square_subsequent_mask(source.size(0)).to(
            device
        )
        output = model(source, src_mask) #step 2: forward pass, [L, N, C]
        if task == 'classification':
            # last vector only: with the causal mask, only the last position
            # has attended to the whole sequence
            output = output[-1]  # [N, C]
        output = output.reshape(-1, output.shape[-1])  # one row of logits per target entry
        loss = criterion(output, target)
        loss.backward() #step 3: back-propagation

        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5) # prevent exploding gradient
        optimizer.step() #step 4: parameters update

        running_loss += loss.item()
        steps_since_log += 1
        if idx % log_interval == 0 and idx > 0:
            # Average over the steps actually accumulated (the first window
            # holds log_interval + 1 of them).
            cur_loss = running_loss / steps_since_log
            print(
                "| epoch {:3d} | {:5d}/{:5d} steps | "
                "loss {:5.5f} | ppl {:8.3f}".format(
                    current_epoch, idx, len(data_loader), cur_loss, math.exp(cur_loss),
                )
            )
            losses.append(cur_loss)
            running_loss = 0.0
            steps_since_log = 0
    return losses

In [None]:
#pretraining on a tiny subset
log_interval = 500
epochs = 2
# `path_data_train` points at the review data at this stage of the notebook,
# so the pretraining corpus is referenced explicitly.
pretraining_corpus = Path("pretraining_subset.txt")
pretraining_losses = []
for epoch in range(1, epochs + 1): #5
    pretraining_losses += train(
        pretraining_corpus,
        save_interval=-1,
        task="language_modeling",  # next-word prediction
        batch_size=16,
        log_interval=log_interval,
    )

## Text Generation


In [None]:
model = Model(ntokens, nhead, nhid, nlayers, ntokens).to(device)

#load the checkpoint
# map_location remaps the stored tensors onto the current device, so that a
# checkpoint saved from a GPU also loads on a CPU-only runtime.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
checkpoint = torch.load('pretrained_model_4layers.pt', map_location=device)
#load state dict
model.load_state_dict(checkpoint['model_state_dict'])

In [None]:
# !pip install sentencepiece   # uncomment this if you are using google colab
# Download the SentencePiece model file loaded just below.
!wget https://raw.githubusercontent.com/moussaKam/transfer_learning_transformers/main/sentencepiece.french.model

import sentencepiece as spm

# `s` is the tokenizer handle used by the inference helpers defined further down.
s = spm.SentencePieceProcessor(model_file='sentencepiece.french.model') #load sentencepiece model

#examples
# Round trip: split a sentence into pieces, then join the pieces back into text.
encoded = s.encode_as_pieces("Bonjour les amis!")
decoded = s.decode_pieces(encoded)
print(encoded)
print(decoded)

In [None]:
def _sentence_to_indices(sent):
    """Tokenize `sent` into pieces and map them to indices, `<sos>` first."""
    oov_index = token2ind['<oov>']
    pieces = s.encode_as_pieces(sent)
    # pieces missing from the vocabulary fall back to <oov> instead of raising KeyError
    return [token2ind['<sos>']] + [token2ind.get(piece, oov_index) for piece in pieces]


def _next_token_from_indices(indices):
    """Run the model on one sequence of token indices and pick the most likely next token."""
    model.eval()
    source = torch.tensor(indices).to(device)
    source = source.reshape(-1, 1)  # [L] -> [L, 1]: a batch holding a single sequence
    src_mask = model.base.generate_square_subsequent_mask(source.size(0)).to(device)
    with torch.no_grad():
        out = model(source, src_mask)  # [L, 1, V]
    # The logits at the last position score the token following the sequence.
    next_token_ind = int(out[-1, 0].argmax())
    return next_token_ind, out


def infer_next_token(sent):
    """Greedy prediction of the token following `sent`.

    Args:
        sent (str): input text.

    Returns:
        tuple: (index of the most likely next token, model output for the whole sequence)
    """
    return _next_token_from_indices(_sentence_to_indices(sent))


def infer_next_tokens(sent, max_len=50):
    """Greedily extend `sent` token by token.

    Generation stops when `<eos>` is predicted or after `max_len` generated tokens.

    Args:
        sent (str): beginning of the text.
        max_len (int, optional): maximum number of generated tokens. Defaults to 50.

    Returns:
        str: `sent` followed by the generated continuation, decoded back to text.
    """
    indices = _sentence_to_indices(sent)
    generated_pieces = []
    for _step in range(max_len):
        next_token_ind, _out = _next_token_from_indices(indices)
        if next_token_ind == token2ind['<eos>']:
            break
        indices.append(next_token_ind)
        generated_pieces.append(ind2token[next_token_ind])
    return s.decode_pieces(s.encode_as_pieces(sent) + generated_pieces)

In [None]:
# Let the pretrained language model continue a short prompt.
sent = "Bonjour les"
infer_next_tokens(sent=sent)

### Supervised task

In [None]:
# a function to evaluate the validation accuracy of the model.
def evaluate_accuracy(data_loader):
    """Accuracy of the module-level `model` on a classification data loader.

    The prediction of a sequence is read at its last position, as done for the
    classification task during training. The model is switched to evaluation mode.

    Args:
        data_loader: iterable of (source [L, N], target [N]) batches.

    Returns:
        float: fraction of correctly classified samples (NaN if the loader yields no sample).
    """
    model.eval()
    n_correct = 0
    n_samples = 0
    with torch.no_grad():
        for source, target in data_loader:
            source = source.to(device)
            target = target.to(device)
            src_mask = model.base.generate_square_subsequent_mask(source.size(0)).to(device)
            logits = model(source, src_mask)[-1]  # last vector only: [N, C]
            predictions = logits.argmax(dim=-1)
            n_correct += int((predictions == target).sum())
            n_samples += target.numel()
    if n_samples == 0:
        return float("nan")
    return n_correct / n_samples

In [None]:
# Save the encoder weights only (no classification head); they are loaded again
# in the fine-tuning phase.
torch.save(
    obj=dict(model_state_dict=model.base.state_dict()),
    f="pretrained_model_4layers_no_class_head.pt",
)

In [None]:
from_scratch_settings = [True, False]

from_scratch_valid_acc = []
pretrained_valid_acc = []
lr = 0.0001
epochs = 15

# The validation loader does not depend on the setting or on the epoch:
# build it once instead of re-reading the files at every epoch.
valid_loader = get_loader(
    path_data_valid,
    path_labels_valid,
    token2ind=token2ind,
    batch_size=20,
    task='classification',
)

for from_scratch in from_scratch_settings:
    # `train` and `evaluate_accuracy` read these module-level objects,
    # so they are rebuilt for each setting.
    model = Model(ntokens, nhead, nhid, nlayers, 2, dropout).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    if not from_scratch:
        print("=====PRETRAINED MODEL======")
        #load checkpoint
        # map_location keeps the load working when the current device differs
        # from the one the checkpoint was saved from.
        checkpoint = torch.load("pretrained_model_4layers_no_class_head.pt", map_location=device)
        #load state dict (encoder only, the classification head stays freshly initialized)
        model.base.load_state_dict(checkpoint['model_state_dict'])
    else:
        print("=====Trainig FROM SCRATCH======")
    for epoch in range(1, epochs + 1):
        train(
            path_data_train,
            path_labels_train,
            save_interval=-1,
            task='classification',
            batch_size=8,
            log_interval=50,
        )
        acc = evaluate_accuracy(valid_loader)
        if from_scratch:
            from_scratch_valid_acc.append(acc)
        else:
            pretrained_valid_acc.append(acc)
    print()

In [None]:
#Visualize the accuracy