In [11]:
import torch
from torch import nn
import numpy as np
import pandas as pd

import random
import math
from typing import Tuple

from torch.utils.data import Dataset, DataLoader
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

In [2]:
df = pd.read_csv("../data/IMDB.csv")
df

Unnamed: 0,review,sentiment
0,One of the other reviewers has mentioned that ...,positive
1,A wonderful little production. <br /><br />The...,positive
2,I thought this was a wonderful way to spend ti...,positive
3,Basically there's a family where a little boy ...,negative
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive
...,...,...
49995,I thought this movie did a down right good job...,positive
49996,"Bad plot, bad dialogue, bad acting, idiotic di...",negative
49997,I am a Catholic taught in parochial elementary...,negative
49998,I'm going to have to disagree with the previou...,negative


## Build the MaskedLM Dataset

In [3]:
class IMDBMaskedBertDataset(Dataset):

    PAD_TOKEN = 0
    CLS_TOKEN = 1
    SEP_TOKEN = 2
    MASK_TOKEN = 3
    UNK_TOKEN = 4

    SPECIALS = ["[PAD]", "[CLS]", "[SEP]", "[MASK]", "[UNK]"]

    def __init__(self,
                 path: str,
                 max_len: int=10) -> None:
        super().__init__()
        self.max_len = max_len
        self.df = pd.read_csv(path)

        self.tokenizer = get_tokenizer(tokenizer="spacy",
                                       language="en_core_web_sm")

        self.masked_tokens = []
        self.masked_token_idxs = []
        self.sentences = []

        self._prepare_data()
        self.sentences = torch.tensor(self.sentences)
        self.masked_token_idxs = torch.tensor(self.masked_token_idxs)
        self.masked_tokens = torch.tensor(self.masked_tokens).squeeze()


    def __len__(self) -> int:
        return len(self.sentences)
    
    def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        return self.sentences[index], self.masked_tokens[index], self.masked_token_idxs[index]
    
    def _build_vocab(self, data_iter):
        for sent in data_iter:
            yield self.tokenizer(sent)


    def _prepare_data(self):

        for i in range(self.df.shape[0]):
            for sent in self.df.iloc[i, 0].split('. '):
                if 1 < len(self.tokenizer(sent)) <= self.max_len:
                    self.sentences.append(sent)
        
        self.vocab = build_vocab_from_iterator(self._build_vocab(self.sentences),
                                               min_freq=2,
                                               special_first=True,
                                               specials=self.SPECIALS)
        self.vocab.set_default_index(self.UNK_TOKEN)
        
        self._mask_data()

        for i in range(len(self.sentences)):
            self.sentences[i] = self.vocab(self.sentences[i])
            self.masked_tokens[i] = self.vocab(self.masked_tokens[i])

    
    def _mask_data(self):

        for i in range(len(self.sentences)):
            sentence = self.tokenizer(self.sentences[i])
            mask_idx = random.randint(0, len(sentence)-1)
            self.masked_token_idxs.append(mask_idx+1)
            self.masked_tokens.append([sentence[mask_idx]])
            sentence[mask_idx] = "[MASK]"
            sentence = ["[CLS]"] + sentence + ["[SEP]"]
            while len(sentence)<self.max_len+2:
                sentence.append("[PAD]")
            self.sentences[i] = sentence

In [4]:
masked_ds = IMDBMaskedBertDataset(path="../data/IMDB.csv")

In [5]:
masked_ds[random.randint(0, len(masked_ds)-1)]

(tensor([   1,    3, 1151,  273,    7,  220,   10, 2730,    2,    0,    0,    0]),
 tensor(20),
 tensor(1))

In [6]:
masked_ds.sentences.shape, masked_ds.masked_tokens.shape, masked_ds.masked_token_idxs.shape

(torch.Size([89172, 12]), torch.Size([89172]), torch.Size([89172]))

## DataLoader

In [15]:
BATCH_SIZE = 32
VOCAB_SIZE = len(masked_ds.vocab)
D_MODEL = 768

In [8]:
data_loader = DataLoader(dataset=masked_ds,
                         batch_size=BATCH_SIZE,
                         shuffle=True)

## BERT Embedding

Since the `Next Sentence Prediction` is not carried out in this notebook, we need not implement `Segment Embedding` for Masked LM

In [12]:
class Embedding(nn.Module):

    def __init__(self,
                 max_len: int=10,
                 vocab_size: int=1000,
                 d_model: int=768,
                 dropout_prob: float=0.1)-> None:
        super().__init__()
        
        self.max_len = max_len
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.dropout_prob = dropout_prob

        self.embedding = nn.Embedding(num_embeddings=vocab_size,
                                      embedding_dim=d_model)
        

        pe = torch.zeros(size=(max_len+2, d_model))
        for pos in range(self.max_len):
            for i in range(self.d_model):
                if i%2==0:
                    pe[pos, i] = math.sin(pos/10000**(2*i/d_model))
                else:
                    pe[pos, i] = math.cos(pos/10000**(2*i/d_model))
        
        self.register_buffer("pe", pe)
        self.pe = self.pe.unsqueeze(0)

        self.dropout = nn.Dropout(p=dropout_prob)

    def __str__(self) -> str:
        return f"Embedding(max_len={self.max_len}, vocab_size={self.vocab_size}, d_model={self.d_model}, dropout_prob={self.dropout_prob})"
    
    def __repr__(self) -> str:
        return f"Embedding(max_len={self.max_len}, vocab_size={self.vocab_size}, d_model={self.d_model}, dropout_prob={self.dropout_prob})"
    
    def forward(self, 
                x: torch.Tensor) -> torch.Tensor:
        
        embedded = self.pe + self.embedding(x)
        return self.dropout(embedded)

In [16]:
embedding = Embedding(vocab_size=VOCAB_SIZE,
                      d_model=D_MODEL)
with torch.inference_mode():
    print(embedding(masked_ds[0][0].unsqueeze(0)).shape)

torch.Size([1, 12, 768])


## BERT model for MaskedLM

In [22]:
import sys
sys.path.insert(0, "..")
from scripts.scripts import TransformerEncoder, create_padding_mask

In [23]:
class BERTMaskedLM(nn.Module):

    def __init__(self,
                 num_encoders: int=12,
                 d_model: int=768,
                 num_heads: int=12,
                 d_ff: int=3072,
                 attn_dropout_prob: float=0.1,
                 ff_dropout_prob: float=0.1,
                 output_size: int=1000) -> None:
        
        super().__init__()
        self.encoder_stack = TransformerEncoder(num_encoders=num_encoders,
                                                d_model=d_model,
                                                num_heads=num_heads,
                                                d_ff=d_ff,
                                                attn_dropout=attn_dropout_prob,
                                                ff_dropout=ff_dropout_prob)
        self.classification_head = nn.Linear(in_features=d_model,
                                             out_features=output_size)
    
    def forward(self,
                x: torch.Tensor,
                mask: torch.Tensor,
                masked_token_idxs: torch.Tensor) -> torch.Tensor:
        
        encoder_output = self.encoder_stack(x, mask)
        masked_tokens_context = encoder_output[range(len(masked_token_idxs)), masked_token_idxs]
        return self.classification_head(masked_tokens_context)