In [1]:
import re
from random import randrange, shuffle
import torch
from torch import nn

# Exercise 8: Encoder Model

### Toy example
This code is based on code developed by Dong-Hyun Lee.


Please read the code carefully, since it will help you in the subsequent programming tasks. While you read it, try to answer each of the questions posed in the comments.

In [2]:
# Example of a single review: a modified version of the first positive review of the dataset used in this exercise.
# The adjacent string literals below are joined by Python into ONE string with no separator
# inserted between them, so `review` is a single str (one review), not a tuple of sentences.
review = ("Bromwell High is a cartoon comedy. It ran at the same time as some other programs about school life, such as \"Teachers\"."
        "My 35 years in the teaching profession lead me to believe that Bromwell High's satire is much closer to reality than is \"Teachers\"."
         "The scramble to survive financially, the insightful students who can see right through their pathetic teachers' pomp, the pettiness of the whole situation, all remind me of the schools I knew and their students."
         "When I saw the episode in which a student repeatedly tried to burn down the school, I immediately recalled  at  High."
         "A classic line: INSPECTOR: I'm here to sack one of your teachers, STUDENT: Welcome to Bromwell High."
         "I expect that many adults of my age think that Bromwell High is far fetched."
         "What a pity that it isn't!")


In [3]:
# Basic tokenizer - you should use your tokenizer from previous exercises or an improved version of the following code, but you can use the following code as a starting point
# How are sentences told apart? The review is lower-cased, the characters ',', '!', '?' and '-'
# are removed, and the text is split on '.'. Fragments that contain no token at all
# (e.g. the empty string left after a trailing '.') are dropped so that no zero-length
# sentence is produced.
sentences = [
    fragment
    for fragment in re.sub("[,!?\\-]", '', review.lower()).split('.')
    if fragment.split()
]

# sorted() keeps the token ids reproducible: the iteration order of a set of strings can
# change between interpreter runs because str hashing is randomized.
vocab = sorted(set(" ".join(sentences).split()))

# Importantly, we are adding some special tokens to the vocabulary.
# From now, start thinking what is tok1, tok2, tok3, and tok4 (based on their usage in create_B):
# there, TOK_1 is used for padding, TOK_2 opens every sequence, TOK_3 closes each of the two
# sentences, and TOK_4 replaces the tokens that are masked.
TOK_1 = '[tok1]'
TOK_2 = '[tok2]'
TOK_3 = '[tok3]'
TOK_4 = '[tok4]'
tokens_2_index_dict = {TOK_1: 0, TOK_2: 1, TOK_3: 2, TOK_4: 3}

# Regular words receive the ids that follow the special tokens.
init_index = len(tokens_2_index_dict)
for i, token in enumerate(vocab):
    tokens_2_index_dict[token] = i + init_index

# Inverse mapping (id -> token), derived from the stored ids rather than from the
# dictionary's iteration order.
index_2_token = {index: token for token, index in tokens_2_index_dict.items()}

vocab_size = len(tokens_2_index_dict)

# Every sentence as a list of token ids, in the same order as `sentences`.
sentences_2_tokens_lst = [
    [tokens_2_index_dict[word] for word in sentence.split()]
    for sentence in sentences
]

In [4]:
# Display the sentence strings produced by the tokenizer cell.
sentences

['bromwell high is a cartoon comedy',
 ' it ran at the same time as some other programs about school life such as "teachers"',
 'my 35 years in the teaching profession lead me to believe that bromwell high\'s satire is much closer to reality than is "teachers"',
 "the scramble to survive financially the insightful students who can see right through their pathetic teachers' pomp the pettiness of the whole situation all remind me of the schools i knew and their students",
 'when i saw the episode in which a student repeatedly tried to burn down the school i immediately recalled  at  high',
 "a classic line: inspector: i'm here to sack one of your teachers student: welcome to bromwell high",
 'i expect that many adults of my age think that bromwell high is far fetched',
 "what a pity that it isn't"]

## Theoretical part 1 - code related

In [None]:
maxlen_X = 100 # X = one input sequence (a sentence pair): create_B pads every sequence to this length
size_B = 6 # B = batch: number of samples create_B generates (it asserts that this is even)
max_pred_M = 10  # M = masked tokens: upper bound on the number of tokens masked per sequence

In [6]:
def create_B(maxlen_X=maxlen_X, size_B=size_B, max_pred_M=max_pred_M):
    """Build one batch of masked sentence pairs.

    Reads the module-level ``sentences``, ``sentences_2_tokens_lst`` and
    ``tokens_2_index_dict``. Every sample is the sequence
    ``TOK_2 + sentence_1 + TOK_3 + sentence_2 + TOK_3`` in which roughly 15% of the
    sentence tokens are replaced by TOK_4, padded with TOK_1 up to ``maxlen_X``.
    Half of the samples pair a sentence with the one that directly follows it
    (label True); the other half are any other pair (label False).

    Args:
        maxlen_X: Length every token sequence is padded to.
        size_B: Number of samples to generate; must be even.
        max_pred_M: Maximum number of masked tokens per sample. It is also the
            fixed length of the returned ``M_tokens`` and ``M_pos`` lists.

    Returns:
        A list of ``size_B`` samples ``[input_ids_X, M_tokens, M_pos, is_next]``:
        the padded token ids, the original ids of the masked tokens (padded with
        TOK_1), their positions (padded with -1) and the boolean pair label.

    Raises:
        AssertionError: If ``size_B`` is odd.
        ValueError: If fewer than two sentences are available (no consecutive
            pair can be formed), or if a sentence pair is longer than ``maxlen_X``.
    """
    assert size_B % 2 == 0, "size_B should be even"

    # Integer quota per label: 50% consecutive pairs, 50% non-consecutive pairs.
    half_B = size_B // 2
    number_sentences = len(sentences)

    # Without two sentences a consecutive pair never occurs and the loop below would not end.
    if half_B > 0 and number_sentences < 2:
        raise ValueError("create_B needs at least two sentences to build a consecutive pair")

    pad_id = tokens_2_index_dict[TOK_1]
    start_id = tokens_2_index_dict[TOK_2]
    sep_id = tokens_2_index_dict[TOK_3]
    mask_id = tokens_2_index_dict[TOK_4]

    lst_B = []
    positive_pair_X = 0
    negative_pair_X = 0

    while positive_pair_X < half_B or negative_pair_X < half_B:

        # Start by taking two random sentences from the list of sentences index
        tokens_X1_index = randrange(number_sentences)
        tokens_X2_index = randrange(number_sentences)

        # What are we verifying? Whether sentence 2 directly follows sentence 1 in the
        # review. The check is done first so that a pair whose quota is already full
        # is discarded before any masking work is spent on it.
        is_next = tokens_X1_index + 1 == tokens_X2_index
        if is_next and positive_pair_X >= half_B:
            continue
        if not is_next and negative_pair_X >= half_B:
            continue

        # Get the relevant sentences (as token list)
        tokens_X1 = sentences_2_tokens_lst[tokens_X1_index]
        tokens_X2 = sentences_2_tokens_lst[tokens_X2_index]

        # Build the sequence TOK_2 + [sentence1] + TOK_3 + [sentence2] + TOK_3.
        # The concatenation creates a new list, so the masking below does not
        # modify the shared sentence token lists.
        input_ids_X = [start_id] + tokens_X1 + [sep_id] + tokens_X2 + [sep_id]

        if len(input_ids_X) > maxlen_X:
            raise ValueError(
                "sentence pair has %d tokens, which exceeds maxlen_X=%d"
                % (len(input_ids_X), maxlen_X)
            )

        # Number of tokens to mask: 15% of the sequence, at least 1 ...
        int_max_number_pred = max(1, int(round(len(input_ids_X) * 0.15)))
        # ... capped at max_pred_M in case that number is too large.
        n_pred_M = min(max_pred_M, int_max_number_pred)

        # Can every token be masked? No: the TOK_2 / TOK_3 positions are excluded.
        cand_M_pos = [
            i for i, token in enumerate(input_ids_X)
            if token != start_id and token != sep_id
        ]

        # Shuffling makes the first n_pred_M candidates a random selection.
        shuffle(cand_M_pos)

        # Remember the positions and original ids of the selected tokens, then
        # overwrite them with TOK_4 in the input sequence.
        M_pos = cand_M_pos[:n_pred_M]
        M_tokens = [input_ids_X[pos] for pos in M_pos]
        for pos in M_pos:
            input_ids_X[pos] = mask_id

        # Pad the sequence so that all samples have the same length maxlen_X.
        input_ids_X.extend([pad_id] * (maxlen_X - len(input_ids_X)))

        # Pad the mask lists to the fixed length max_pred_M. The amount is taken from
        # the number of tokens actually masked, which can be smaller than n_pred_M
        # when there are fewer candidates than requested.
        n_pad = max_pred_M - len(M_pos)
        M_tokens.extend([pad_id] * n_pad)
        # Positions are padded with -1, not with a token id.
        M_pos.extend([-1] * n_pad)

        lst_B.append([input_ids_X, M_tokens, M_pos, is_next])
        if is_next:
            positive_pair_X += 1
        else:
            negative_pair_X += 1

    return lst_B

In [7]:
# Generate one batch with the default settings (maxlen_X, size_B and max_pred_M defined above).
B = create_B()

In [8]:
# Transpose the batch (a list of [input_ids, M_tokens, M_pos, label] samples) into one
# group per field and turn each group into a LongTensor.
input_ids_Xs, M_tokens, M_pos, isNext = (torch.LongTensor(field) for field in zip(*B))

## Programming Tasks: Implementing an Encoder Model

In [9]:
class EncoderModel(nn.Module):
    """Transformer encoder with a next-sentence head and a masked-token head.

    Token and learned positional embeddings are summed, passed through a stack of
    ``nn.TransformerEncoderLayer`` modules, and read out by two linear heads.

    Args:
        vocab_size: Number of rows of the token embedding and output size of the
            masked-token head.
        embed_dim: Embedding / hidden size.
        nhead: Number of attention heads per layer.
        num_layers: Number of encoder layers.
        dim_feedforward: Hidden size of the feed-forward sub-layer.
        max_len: Number of learned positions; input sequences must not be longer.
        pad_token_id: Token id treated as padding and excluded from attention keys.
            Pass ``None` to disable the padding mask.
    """

    def __init__(self, vocab_size, embed_dim=64, nhead=4, num_layers=8, dim_feedforward=128, max_len=100, pad_token_id=0):
        super().__init__()
        self.pad_token_id = pad_token_id
        self.word_embedding = nn.Embedding(vocab_size, embed_dim)
        self.positional_embedding = nn.Embedding(max_len, embed_dim)
        # batch_first=True is required: forward() feeds (batch, seq, dim) tensors, while
        # the layer's default layout is (seq, batch, dim). With the default, attention
        # would run across the samples of the batch instead of across each sequence.
        self.encoder_layers = nn.ModuleList(
            [
                nn.TransformerEncoderLayer(
                    d_model=embed_dim,
                    nhead=nhead,
                    dim_feedforward=dim_feedforward,
                    batch_first=True,
                )
                for _ in range(num_layers)
            ]
        )

        self.nsp_head = nn.Linear(embed_dim, 2)  # For next sentence prediction
        self.mlm_head = nn.Linear(embed_dim, vocab_size)  # For masked language modeling

    def forward(self, input_ids, M_pos):
        """Run the encoder and both heads.

        Args:
            input_ids: Token ids of shape (batch_size, seq_len).
            M_pos: Per-sample positions whose hidden states are fed to the
                masked-token head; ``M_pos[i]`` indexes sequence ``i``. Negative
                positions index from the end of the sequence.

        Returns:
            ``(nsp_output, mlm_output)``: ``nsp_output`` has shape (batch_size, 2)
            and is computed from the hidden state at position 0; ``mlm_output``
            has one row of ``vocab_size`` logits per selected position, with the
            samples concatenated along dim 0.
        """
        # input_ids: shape (batch_size, seq_len)
        batch_size = input_ids.size(0)
        positions = torch.arange(0, input_ids.size(1), device=input_ids.device).unsqueeze(0)
        x = self.word_embedding(input_ids) + self.positional_embedding(positions)

        # True marks padding keys that attention must ignore.
        padding_mask = None
        if self.pad_token_id is not None:
            padding_mask = input_ids.eq(self.pad_token_id)

        for encoder in self.encoder_layers:
            x = encoder(x, src_key_padding_mask=padding_mask)

        cls_token = x[:, 0, :]  # Assuming the first token is the CLS token
        nsp_output = self.nsp_head(cls_token)

        # Gather the hidden states at the requested positions, sample by sample.
        mlm_output_tensor = torch.cat(
            [x[i][M_pos[i]] for i in range(batch_size)], dim=0
        )

        mlm_output = self.mlm_head(mlm_output_tensor)
        return nsp_output, mlm_output

In [None]:
# Training