- Tokenize input.
- Create a label with input or clone input tensor.
- Randomly masked some token in input.
- Initialize the model and calculate the loss.
- Finally update weight.

In [11]:
# Import all necessary libraries
import torch
import math
import re
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from datasets import load_dataset
from transformers import BertTokenizer
from torch.utils.data import Dataset, DataLoader
from torch.nn import TransformerEncoder, TransformerEncoderLayer

# See if there are any GPUs avaliable to train the model
device = "cuda" if torch.cuda.is_available() else "cpu"

# Retrieve the bert tokenizer from the transformers library
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased", do_basic_tokenization = True)

In [12]:
class DatasetClass(Dataset):
    def __init__(self, txt, tokenizer):
        # Retrieves the text 
        self.txt = txt
        # Retrieves the tokenizer
        self.tokenizer = tokenizer

    def __len__(self):
        # Returns the length of the text
        return len(self.txt)

    def __getitem__(self, idx):
        # Gets a specific line from the text
        txt = self.txt[idx]
        return txt

In [13]:
class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [14]:
class TransformerModel(nn.Module):

    def __init__(self, ntoken, num_input, heads, hidden, layers, dropout=0.5):
        # Initializes the parameters od the model
        super(TransformerModel, self).__init__()
        # Initializes model type
        self.model_type = 'Transformer'
        # Gets the positional encoder 
        self.pos_encoder = PositionalEncoding(num_input, dropout)
        # Sets the laters based on the hyperparameters
        encoder_layers = TransformerEncoderLayer(num_input, heads, hidden, dropout, batch_first=True)
        self.transformer_encoder = TransformerEncoder(encoder_layers, layers)
        self.encoder = nn.Embedding(ntoken, num_input)
        self.num_input = num_input
        self.decoder = nn.Linear(num_input, ntoken)

        self.init_weights()

    def generate_square_subsequent_mask(self, size):
        # Creates an attention mask with zeros across the diagonal
        mask = (torch.triu(torch.ones(size, size)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def init_weights(self):
        # Initializes the weights of the model between the modes before training the dataset
        # Sets the range in which the values of the weights can occur
        initrange = 0.1
        # Sets the weights of the encoder
        self.encoder.weight.data.uniform_(-initrange, initrange)
        # Zeros the weights of the decoder
        self.decoder.bias.data.zero_()
        # Sets the weights of the decoder
        self.decoder.weight.data.uniform_(-initrange, initrange)
        
    def forward(self, txt, txt_mask):
        txt = self.encoder(txt) * math.sqrt(self.num_input)
        txt = self.pos_encoder(txt)
        output = self.transformer_encoder(txt, txt_mask)
        # Decodes the processed output
        output = self.decoder(output)
        return output


In [15]:
def data_collate_fn(dataset_samples_list):
    # Makes the dataset a numpy array instead of a python list
    arr = np.array(dataset_samples_list)
    
    # Tokenizes the data using the BERT tokenizer
    inputs = tokenizer(text=arr.tolist(), padding='max_length', max_length=10000, return_tensors='pt')
    
    # Returns the inputs to the model
    return inputs

def predict(model, input):
    # Uses model.eval to get
    model.eval()
    # Generates a square subsequent mask for the example input
    txt_mask = model.generate_square_subsequent_mask(input.size(1))
    # Inputs the square subsequent mask of the input and the input into the model to predict all the masked tokens
    out = model(input.to(device), txt_mask.to(device))
    # Gets the word with the highest probability from the tokens
    out = out.topk(1).indices.view(-1)
    return out

def mask_random(model, batch, token_list, mask_token, mask_percent=0.15):
    # Gets the input ids from the batch of the dataset
    input = batch['input_ids'].clone()
    # Generates a square subsequent mask of the batch of the dataset
    txt_mask = model.generate_square_subsequent_mask(batch['input_ids'].size(1))
    # Creates an array the size of input id's with random values between 0 and 1
    rand_value = torch.rand(batch.input_ids.shape)
    # Creates a random masking matrix where only values with a vlaue lower then the set percentage are kept (15%)
    rand_mask = (rand_value < mask_percent)
    # Adds parameters to the random mask to make sure special tokens do not get masked
    for token in token_list:
        rand_mask = rand_mask * (input != token)
    # Gets the indexes of all of the tokens to be masked from the dataset
    mask_idx = (rand_mask.flatten() == True).nonzero().view(-1)
    # The inputs are flattened for multiplication
    input = input.flatten()
    # Sets which token is the [MASK] token (103)
    input[mask_idx] = mask_token
    input = input.view(batch['input_ids'].size())
    
    return input, txt_mask

def train(model, dataloader, epochs=500):
    model.train()
    total_loss = 0
    # Sets how loss will be calcuated using Cross Entropy
    criterion = nn.CrossEntropyLoss()
    # Sets the optimizer to be AdamW
    optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001)
    
    for epoch in range(epochs):
        for batch in dataloader:
            # Zeroes the gradient used for gradient descent
            optimizer.zero_grad()
            mask_token = 103
            token_list = [101, 102, 0]
            input, txt_mask = mask_random(model, batch, token_list, mask_token, 0.15)

            # Gets the output by entering the masked input and unmasked input into the model infrastructure
            out = model(input.to(device), txt_mask.to(device))
            # Calculated the loss of the model (-log p(word|sentence))
            loss = criterion(out.view(-1, vocab_size), batch['input_ids'].view(-1).to(device))
            # Adds the loss of each batch to the total loss
            total_loss += loss
            # Does backward propagation
            loss.backward()
            # Makes the optimizer take a step
            optimizer.step()
    
        # Prints the total loss and which step the model is on every 40 epochs
        print("Epoch: {} -> loss: {}".format(epoch+1, total_loss/(len(dataloader)*epoch+1)))

In [16]:
dataset = load_dataset("rotten_tomatoes")
train_text = dataset['train']['text']

for i, sentence in enumerate(train_text):
    new_sentence = re.sub(r"[^a-zA-Z0-9 ]", "", sentence)
    train_text[i] = new_sentence



  0%|          | 0/3 [00:00<?, ?it/s]

In [17]:
dataset = DatasetClass(train_text, tokenizer)
dataloader = DataLoader(dataset, batch_size=4, collate_fn=data_collate_fn)

In [18]:
vocab_size = tokenizer.vocab_size # the size of vocabulary
epochs = 400
embedding = 200 # embedding dimension
hidden = 200 # the dimension of the feedforward network model in nn.TransformerEncoder
layers = 2 # the number of nn.TransformerEncoderLayer in nn.TransformerEncoder
heads = 2 # the number of heads in the multiheadattention models
dropout = 0.2 # the dropout value
model = TransformerModel(vocab_size, embedding, heads, hidden, layers, dropout).to(device)

In [19]:
train(model, dataloader, epochs)

KeyboardInterrupt: 

In [103]:
#torch.save(model, 'models')

In [109]:
#model = torch.load('models')

print("Input: {}".format(text[0]))
pred_inp = tokenizer("Illiterate [MASK] get in the way of science.", return_tensors='pt')
out = predict(model, pred_inp['input_ids'])
print("Output: {}\n".format(tokenizer.decode(out)))

Input: Don't speak ill of others.
Output: [CLS] illiterate is get in the way of science. [SEP]

