In [None]:
# Install as needed
#!pip install sentencepiece torch pandas numpy

In [None]:
import random
from typing import List

import torch
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import sentencepiece as spm

In [27]:
def get_device():
    """Return the torch device to run on: CUDA when it is available, otherwise the CPU."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")

# Dataset

In [None]:
# Maximum number of corpus lines to load
import_size = 500000

# Stream the corpus line by line and stop after `import_size` lines, so the whole
# file is never held in memory just to be truncated afterwards.
# zip() stops as soon as range() is exhausted, without reading a further line.
with open('raw_corpus.txt', 'r', encoding="utf-8-sig") as f:
    data = [line.rstrip() for _, line in zip(range(import_size), f)]

dataset = pd.DataFrame(data, columns=['text'])
# Drop lines that are empty or contain only whitespace
dataset = dataset[dataset["text"].str.strip().astype(bool)]
dataset.head()

In [None]:
# Number of non-empty lines kept in the dataset
len(dataset)

In [None]:
# Concatenate every kept line into a single space-separated string
data = ' '.join(dataset["text"])

# Component definitions

In [None]:
class Embedding(torch.nn.Module):
    """Token embedding layer.

    Replaces every token index of the input with its learned vector.

    Args:
        embedding_dimension: size of each embedding vector.
        vocab_size: number of distinct token indices.
    """

    def __init__(self, embedding_dimension, vocab_size):
        super().__init__()
        self.embedding_layer = torch.nn.Embedding(vocab_size, embedding_dimension)

    def forward(self, x):
        """Return the embedding vectors for the token indices in `x`."""
        token_vectors = self.embedding_layer(x)
        return token_vectors

In [None]:
class PositionalEncoding(torch.nn.Module):
    """Adds a fixed sinusoidal positional encoding to token embeddings.

    For position `pos` and even channel index `i` the table holds
        table[pos, i]     = sin(pos / 10000 ** (i / embedding_dimension))
        table[pos, i + 1] = cos(pos / 10000 ** (i / embedding_dimension))
    so each sine/cosine pair shares one frequency.

    The table is registered as a non-persistent buffer: it follows the module
    through `.to(device)` but is not written to the state_dict, so the
    checkpoint layout is unchanged.

    Args:
        embedding_dimension: number of channels of the embeddings (odd values are supported).
        max_sequence_length: number of positions to precompute.
    """

    def __init__(self, embedding_dimension, max_sequence_length):
        super().__init__()
        self.embedding_dimension = embedding_dimension
        self.max_sequence_length = max_sequence_length
        self.register_buffer(
            "positional_encoding",
            self.create_positional_encoding(),
            persistent=False
        )

    def create_positional_encoding(self):
        """Build the (max_sequence_length, embedding_dimension) float32 encoding table."""
        positions = np.arange(self.max_sequence_length, dtype=np.float64)[:, None]
        even_channels = np.arange(0, self.embedding_dimension, 2, dtype=np.float64)
        # One angle per (position, sine/cosine pair)
        angles = positions / np.power(10000.0, even_channels / self.embedding_dimension)

        positional_encoding = np.zeros((self.max_sequence_length, self.embedding_dimension))
        positional_encoding[:, 0::2] = np.sin(angles)
        # With an odd embedding_dimension there is one cosine channel fewer than sine channels
        positional_encoding[:, 1::2] = np.cos(angles[:, : self.embedding_dimension // 2])
        return torch.from_numpy(positional_encoding).float()

    def forward(self, x):
        """Add the encoding of the first `x.size(1)` positions to `x`.

        The slice is taken along dimension 1 of `x`, and broadcasting applies it
        to every entry of dimension 0.
        """
        return x + self.positional_encoding[:x.size(1), :]

In [None]:
class MaskedSelfAttention(torch.nn.Module):
    """Single-head causal self-attention with padding masking.

    A query position may attend to a key position only when
      * the key is not masked out by `mask` (padding), and
      * the key is not later in the sequence than the query (causal mask).
    The causal mask is what keeps next-token training honest: without it every
    position could look at the token it is asked to predict.

    Args:
        embedding_dimension: size of the input vectors.
        head_dimension: size of the query/key/value projections.
    """

    def __init__(self, embedding_dimension, head_dimension):
        super().__init__()
        self.embedding_dimension = embedding_dimension
        self.head_dimension = head_dimension
        self.query_layer = torch.nn.Linear(embedding_dimension, self.head_dimension)
        self.key_layer = torch.nn.Linear(embedding_dimension, self.head_dimension)
        self.value_layer = torch.nn.Linear(embedding_dimension, self.head_dimension)
        self.softmax = torch.nn.Softmax(dim=-1)

    def forward(self, x, mask):
        """Apply masked self-attention.

        Args:
            x: input of shape (batch, sequence, embedding_dimension).
            mask: (batch, sequence) tensor; 0 marks a masked token, anything else a visible one.

        Returns:
            Tensor of shape (batch, sequence, head_dimension).
        """
        # Query, key and value are linear projections of the input
        query = self.query_layer(x)
        key = self.key_layer(x)
        value = self.value_layer(x)

        # Scaled dot-product scores, shape (batch, query position, key position)
        attention_weights = torch.matmul(query, key.transpose(-2, -1))
        attention_weights = attention_weights / (self.head_dimension ** 0.5)

        batch_size, sequence_length = attention_weights.shape[0], attention_weights.shape[2]

        # Padding mask: one flag per key, broadcast over all query positions
        key_is_visible = mask.reshape(batch_size, 1, sequence_length) != 0

        # Causal mask: query i may only see keys 0..i
        causal = torch.tril(
            torch.ones(sequence_length, sequence_length, dtype=torch.bool, device=attention_weights.device)
        )

        # -1e9 instead of -inf: a row whose keys are all hidden then gives a uniform
        # softmax rather than NaN
        attention_weights = attention_weights.masked_fill(~(key_is_visible & causal), -1e9)

        # Softmax turns the scores into a probability distribution over the keys
        attention_scores = self.softmax(attention_weights)
        return torch.bmm(attention_scores, value)

In [None]:
class MaskedMultiHeadedSelfAttention(torch.nn.Module):
    """Runs several self-attention heads side by side and mixes their outputs.

    Each head projects to `embedding_dimension // number_of_heads` channels. The
    head outputs are concatenated and passed through a linear layer that maps
    them back to `embedding_dimension`.
    """

    def __init__(self, embedding_dimension, number_of_heads):
        super().__init__()

        self.embedding_dimension = embedding_dimension
        self.head_dimension = embedding_dimension // number_of_heads
        self.number_of_heads = number_of_heads

        # One independent attention module per head
        heads = []
        for _ in range(number_of_heads):
            heads.append(MaskedSelfAttention(embedding_dimension, self.head_dimension))
        self.self_attentions = torch.nn.ModuleList(heads)

        # Linear layer that combines the concatenated head outputs
        concatenated_width = number_of_heads * self.head_dimension
        self.output_layer = torch.nn.Linear(concatenated_width, embedding_dimension)

    def forward(self, x, mask):
        """Apply every head to `x` and combine the results.

        `mask` is handed unchanged to each head; 0 means the token is masked,
        1 means it is not.
        """
        per_head = []
        for head in self.self_attentions:
            per_head.append(head(x, mask))

        # Concatenate the head outputs along dimension 2
        stacked = torch.cat(per_head, dim=2)
        return self.output_layer(stacked)

In [None]:
class FeedForward(torch.nn.Module):
    """Two-layer feed-forward block: linear -> ReLU -> linear.

    Args:
        embedding_dimension: size of the input and of the output.
        feed_forward_dimension: size of the hidden layer.
    """

    def __init__(self, embedding_dimension, feed_forward_dimension):
        super().__init__()
        self.embedding_dimension = embedding_dimension
        self.feed_forward_dimension = feed_forward_dimension

        # Expansion: embedding_dimension -> feed_forward_dimension
        self.linear_1 = torch.nn.Linear(embedding_dimension, feed_forward_dimension)
        # Projection back: feed_forward_dimension -> embedding_dimension
        self.linear_2 = torch.nn.Linear(feed_forward_dimension, embedding_dimension)

    def forward(self, x):
        """Return linear_2(relu(linear_1(x)))."""
        hidden = self.linear_1(x)
        activated = torch.relu(hidden)
        return self.linear_2(activated)

# Decoder Layer

In [None]:
class Decoder(torch.nn.Module):
    """A single decoder layer.

    The layer normalizes its input, applies multi-headed self-attention and adds
    the result to the input (first residual). That sum is normalized again, fed
    through the feed-forward block (with dropout while training) and added back
    (second residual).
    """

    def __init__(
            self,
            embedding_dimension,
            number_of_heads,
            feed_forward_dimension,
            dropout_rate
    ):
        super().__init__()
        self.embedding_dimension = embedding_dimension
        self.number_of_heads = number_of_heads
        self.feed_forward_dimension = feed_forward_dimension
        self.dropout_rate = dropout_rate

        # Sub-layers
        self.multi_headed_self_attention = MaskedMultiHeadedSelfAttention(embedding_dimension, number_of_heads)
        self.feed_forward = FeedForward(embedding_dimension, feed_forward_dimension)
        self.dropout = torch.nn.Dropout(dropout_rate)
        self.layer_normalization_1 = torch.nn.LayerNorm(embedding_dimension)
        self.layer_normalization_2 = torch.nn.LayerNorm(embedding_dimension)

    def forward(self, x, mask):
        """Run the layer on `x`; `mask` is forwarded to the attention sub-layer."""
        # Attention on the normalized input, then the first residual connection
        attended = self.multi_headed_self_attention(self.layer_normalization_1(x), mask)
        after_attention = x + attended

        # Feed-forward on the normalized residual
        transformed = self.feed_forward(self.layer_normalization_2(after_attention))

        # Dropout is only applied while this layer is in training mode
        if self.training:
            transformed = self.dropout(transformed)

        # Second residual connection
        return after_attention + transformed

# Decoder Stack

In [None]:
class DecoderStack(torch.nn.Module):
    """A stack of `number_of_layers` decoder layers applied one after another."""

    def __init__(
            self,
            embedding_dimension,
            number_of_layers,
            number_of_heads,
            feed_forward_dimension,
            dropout_rate,
            max_sequence_length
    ):
        super().__init__()
        self.embedding_dimension = embedding_dimension
        self.number_of_layers = number_of_layers
        self.number_of_heads = number_of_heads
        self.feed_forward_dimension = feed_forward_dimension
        self.dropout_rate = dropout_rate
        self.max_sequence_length = max_sequence_length

        # The attribute is called `encoder_layers` although it holds Decoder modules;
        # the name is part of the state_dict keys, so it is kept as is.
        layers = []
        for _ in range(number_of_layers):
            layers.append(Decoder(embedding_dimension, number_of_heads, feed_forward_dimension, dropout_rate))
        self.encoder_layers = torch.nn.ModuleList(layers)

    def forward(self, x, mask):
        """Pass `x` through every layer in order, giving each layer the same `mask`."""
        hidden = x
        for layer in self.encoder_layers:
            hidden = layer(hidden, mask)
        return hidden


# The "LMHead" Class

In [None]:
class LMHead(torch.nn.Module):
    """Language model head: a linear layer from the embedding dimension to the vocabulary size."""

    def __init__(self, embedding_dimension, vocab_size):
        super().__init__()
        self.embedding_dimension = embedding_dimension
        self.vocab_size = vocab_size

        # Projection onto the vocabulary
        self.linear = torch.nn.Linear(embedding_dimension, vocab_size)

    def forward(self, x):
        """Return the logits, whose last dimension has size `vocab_size`."""
        return self.linear(x)

# Language Model

In [None]:
class LanguageModel(torch.nn.Module):
    """Decoder-only language model.

    Pipeline: token embedding -> positional encoding -> layer normalization ->
    decoder stack -> language model head.

    Args:
        vocab_size: number of tokens in the vocabulary.
        max_sequence_length: maximum sequence length to use for attention.
        embedding_dimension: dimension of the token embeddings.
        number_of_layers: number of decoder layers.
        number_of_heads: number of attention heads per layer.
        feed_forward_dimension: hidden size of the feed-forward blocks;
            defaults to 4 * embedding_dimension when None.
        dropout_rate: dropout rate used inside the decoder layers.
    """

    def __init__(
            self,
            vocab_size,
            max_sequence_length=128,
            embedding_dimension=256,
            number_of_layers=3,
            number_of_heads=2,
            feed_forward_dimension=None,
            dropout_rate=0.1
    ):
        super().__init__()
        self.vocab_size = vocab_size
        self.max_sequence_length = max_sequence_length
        self.embedding_dimension = embedding_dimension
        self.number_of_layers = number_of_layers
        self.number_of_heads = number_of_heads

        # If feed_forward_dimension is not set, use 4 times the embedding_dimension as in GPT-2
        if feed_forward_dimension is None:
            self.feed_forward_dimension = embedding_dimension * 4
        else:
            self.feed_forward_dimension = feed_forward_dimension

        self.dropout_rate = dropout_rate

        # Token embedding layer
        self.token_embedding = Embedding(embedding_dimension, vocab_size)

        # Positional encoding layer
        self.positional_encoding = PositionalEncoding(embedding_dimension, max_sequence_length)

        # Normalization applied right after the embeddings
        self.layer_normalization = torch.nn.LayerNorm(embedding_dimension)

        # Decoder stack
        self.decoder = DecoderStack(
            embedding_dimension=embedding_dimension,
            number_of_layers=number_of_layers,
            number_of_heads=number_of_heads,
            feed_forward_dimension=self.feed_forward_dimension,
            dropout_rate=dropout_rate,
            max_sequence_length=max_sequence_length
        )

        # Language model head
        self.lm_head = LMHead(embedding_dimension, vocab_size)

    def forward(self, x, mask):
        """Return the language model head output for token indices `x`.

        `mask` is forwarded to the decoder stack (0 = masked token, 1 = visible token).
        """
        # Token embeddings plus positional information
        token_embeddings = self.token_embedding(x)
        positional_encoding = self.positional_encoding(token_embeddings)

        # Post-embedding layer normalization
        positional_encoding_normalized = self.layer_normalization(positional_encoding)

        # Decoder stack followed by the language model head
        decoder_outputs = self.decoder(positional_encoding_normalized, mask)
        return self.lm_head(decoder_outputs)

    def save_checkpoint(self, path):
        """Save the constructor hyperparameters and the state_dict to `path`."""
        print(f'Saving checkpoint {path}')
        torch.save({
            'vocab_size': self.vocab_size,
            'max_sequence_length': self.max_sequence_length,
            'embedding_dimension': self.embedding_dimension,
            'number_of_layers': self.number_of_layers,
            'number_of_heads': self.number_of_heads,
            'feed_forward_dimension': self.feed_forward_dimension,
            'dropout_rate': self.dropout_rate,
            'model_state_dict': self.state_dict()
        }, path)

    @staticmethod
    def load_checkpoint(path) -> 'LanguageModel':
        """Rebuild a model from a checkpoint written by `save_checkpoint`.

        Returns a NEW model placed on `get_device()`; it does not modify any
        existing instance.
        """
        # map_location lets a checkpoint saved on a GPU be restored on a CPU-only machine
        checkpoint = torch.load(path, map_location=get_device())
        model = LanguageModel(
            vocab_size=checkpoint['vocab_size'],
            max_sequence_length=checkpoint['max_sequence_length'],
            embedding_dimension=checkpoint['embedding_dimension'],
            number_of_layers=checkpoint['number_of_layers'],
            number_of_heads=checkpoint['number_of_heads'],
            feed_forward_dimension=checkpoint['feed_forward_dimension'],
            dropout_rate=checkpoint['dropout_rate']
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        return model.to(get_device())

In [None]:
class AutoregressiveWrapper(torch.nn.Module):
    """Wraps a language model so that it can be trained and sampled autoregressively."""

    def __init__(self, s_model):
        super().__init__()
        self.model = s_model
        self.max_sequence_length = s_model.max_sequence_length

    def forward(self, x, mask):
        """Autoregressive forward pass.

        The model receives every token but the last one; the target is the same
        sequence shifted left by one token.

        Returns:
            A tuple (model output, target).
        """
        inputs = x[:, :-1]
        targets = x[:, 1:]
        return self.model(inputs, mask[:, :-1]), targets

    def next_token_probabilities(self, x, mask, temperature=1.0):
        """Return the probability distribution over the token that follows `x`.

        The logits of the last position are divided by `temperature` (unless it
        is exactly 1.0) before the softmax.
        """
        last_logits = self.model(x, mask)[:, -1]
        scaled_logits = last_logits if temperature == 1.0 else last_logits / temperature
        return torch.softmax(scaled_logits, dim=-1)

    def save_checkpoint(self, path):
        """Save the wrapped model to `path`."""
        self.model.save_checkpoint(path)

    @staticmethod
    def load_checkpoint(path) -> 'AutoregressiveWrapper':
        """Load a language model from `path` and return it inside a new wrapper on `get_device()`."""
        restored = LanguageModel.load_checkpoint(path)
        return AutoregressiveWrapper(restored).to(get_device())

In [None]:
class Tokenizer:
    """Thin wrapper around a SentencePiece model (trained in the tokenizer notebook).

    Args:
        model_path: path of the SentencePiece model file to load.
            Defaults to "spanish_lm.model".
    """

    def __init__(self, model_path="spanish_lm.model"):
        self.sp = spm.SentencePieceProcessor()
        self.sp.load(model_path)

    def tokenize(self, text):
        """Return the SentencePiece encoding of `text`."""
        return self.sp.Encode(text)

    def character_to_token(self, character):
        """Return the SentencePiece encoding of `character` (same call as `tokenize`)."""
        return self.sp.Encode(character)

    def token_to_character(self, token):
        """Return the SentencePiece decoding of `token`."""
        return self.sp.Decode(token)

    def size(self):
        """Return the number of pieces in the vocabulary."""
        return self.sp.GetPieceSize()

# Trainer

In [None]:
class Trainer:
    """Trains an autoregressive language model with cross-entropy loss.

    Args:
        model: module that is called as `model(x=..., mask=...)` and returns a tuple
            (output, target); `output.transpose(1, 2)` and `target` are passed to the loss.
        tokenizer: used to look up the id of the '<pad>' token.
        optimizer: optimizer to use; Adam with lr=0.0001 when None.
    """

    def __init__(self, model, tokenizer: 'Tokenizer', optimizer=None):
        self.model = model
        self.tokenizer = tokenizer
        self.loss_function = torch.nn.CrossEntropyLoss()

        if optimizer is None:
            self.optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
        else:
            self.optimizer = optimizer

    def _model_device(self):
        """Return the device of the model's first parameter (CPU if it has none)."""
        first_parameter = next(iter(self.model.parameters()), None)
        if first_parameter is None:
            return torch.device("cpu")
        return first_parameter.device

    def train(self, data: List[List[int]], epochs, batch_size):
        """Train the model on `data` for `epochs` epochs.

        Args:
            data: token-id sequences, all of the same length. The list is read
                in a freshly shuffled order every epoch but is not modified.
            epochs: number of passes over `data`.
            batch_size: maximum number of sequences per batch. A final, smaller
                batch is used as is; it is not filled up with artificial rows.

        Returns:
            List with the average batch loss of every epoch.
        """
        # '<pad>' is encoded once; element [1] of the encoding is used as the pad id
        pad_token = self.tokenizer.character_to_token('<pad>')[1]
        device = self._model_device()

        loss_per_epoch = []
        for epoch in range(epochs):
            losses = []

            # Shuffle the visiting order instead of the caller's list
            order = list(range(len(data)))
            random.shuffle(order)

            self.model.train()

            # Batches are built one at a time so only one batch tensor is alive at once
            for start in range(0, len(order), batch_size):
                batch_indices = order[start: start + batch_size]
                input_tensor = torch.tensor([data[index] for index in batch_indices], dtype=torch.long)

                # 1 where the token is not padding, 0 where it is
                mask_tensor = (input_tensor != pad_token).long()

                # Start every step from clean gradients
                self.optimizer.zero_grad()

                # Compute the model output
                model_output, target = self.model(
                    x=input_tensor.to(device),
                    mask=mask_tensor.to(device)
                )

                # CrossEntropyLoss expects the class dimension in position 1
                loss = self.loss_function(model_output.transpose(1, 2), target)

                # Backpropagate the loss.
                loss.backward()

                # Clip the gradients. This is used to prevent exploding gradients.
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)

                # Update the model parameters.
                self.optimizer.step()

                losses.append(loss.item())

            # Average loss of the epoch (NaN when there was no data at all)
            epoch_loss = np.average(losses) if losses else float('nan')
            loss_per_epoch.append(epoch_loss)
            print('Epoch:', epoch, 'Loss:', epoch_loss)

        return loss_per_epoch

In [None]:
def pad_left(sequence, final_length, padding_token):
    """Return `sequence` with `padding_token` prepended until it has `final_length` items.

    A sequence that is already at least `final_length` long gets no padding.
    """
    missing = final_length - len(sequence)
    padding = [padding_token for _ in range(missing)]
    return padding + sequence

# Language Generator

In [None]:
class Generator:
    """Generates text by repeatedly sampling the next token from a language model.

    Args:
        model: object exposing `eval()`, `max_sequence_length` and
            `next_token_probabilities(x=..., temperature=..., mask=...)`.
        tokenizer: object exposing `tokenize(text)` and `token_to_character(token)`.
    """

    def __init__(
            self,
            model,
            tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def generate(
            self,
            generated_sentence_length: int = 20,
            prompt: str = None,
            temperature: float = 1.0,
            last_token_position: int = None,
            padding_token: int = 0):
        """Sample up to `generated_sentence_length` tokens after `prompt`.

        Args:
            generated_sentence_length: maximum number of tokens to sample.
            prompt: text to continue; when None, generation starts from padding only.
            temperature: forwarded to `next_token_probabilities`.
            last_token_position: token id that ends generation as soon as it is sampled
                (despite its name it is compared with the sampled token id, not with a position).
            padding_token: id used to left-pad the prompt and to build the mask.

        Returns:
            The decoded tokens joined into one string, including the left padding
            and the prompt.
        """
        self.model.eval()

        if prompt is None:
            # `padding_token` already is a token id, so it is used directly
            start_tokens = [padding_token]
        else:
            start_tokens = self.tokenizer.tokenize(prompt)

        input_tensor = torch.tensor(
            pad_left(
                sequence=start_tokens,
                final_length=self.model.max_sequence_length + 1,
                padding_token=padding_token
            ),
            dtype=torch.long
        ).to(get_device())

        # Add the batch dimension
        out = input_tensor.unsqueeze(0)

        # Sampling needs no gradients; no_grad avoids building an autograd graph per step
        with torch.no_grad():
            for _ in range(generated_sentence_length):
                # Only the most recent max_sequence_length tokens are given to the model
                x = out[:, -self.model.max_sequence_length:]

                # 1 for real tokens, 0 for padding
                mask = (x != padding_token).long()

                # Compute the next token probabilities
                next_token_probabilities = self.model.next_token_probabilities(
                    x=x,
                    temperature=temperature,
                    mask=mask
                )

                # Sample the next token from the probability distribution
                prediction = torch.multinomial(next_token_probabilities, num_samples=1)

                # Append the next token to the output
                out = torch.cat([out, prediction], dim=1)

                if last_token_position is not None and prediction.item() == last_token_position:
                    break

        generated_tokens = out[0].tolist()

        # NOTE(review): every token is decoded on its own and the pieces are joined
        # without a separator; if the tokenizer drops a piece's leading space when it
        # decodes a single id, word boundaries are lost here - confirm with the real model.
        return ''.join([self.tokenizer.token_to_character(token) for token in generated_tokens])

In [None]:
def generate_training_sequences(max_sequence_length, tokenized_training_data):
    """Cut the token list into every overlapping window of `max_sequence_length + 1` tokens.

    Windows start at consecutive positions (stride 1). The final full window,
    the one that ends on the last token, is included. Input shorter than one
    window yields an empty list.
    """
    window_length = max_sequence_length + 1
    number_of_windows = len(tokenized_training_data) - window_length + 1
    return [
        tokenized_training_data[start: start + window_length]
        for start in range(number_of_windows)
    ]

In [None]:
def preprocess_training_data(max_sequence_length, tokenizer, training_data):
    """Tokenize `training_data` and put `max_sequence_length` padding tokens in front of it.

    The padding id is element [1] of the tokenizer's encoding of '<pad>'.
    The prefix is built in one step; inserting at index 0 once per padding token
    would shift the whole token list every time.
    """
    tokenized_training_data = tokenizer.tokenize(training_data)
    pad_token = tokenizer.character_to_token('<pad>')[1]
    return [pad_token] * max_sequence_length + list(tokenized_training_data)

# Model runner

In [None]:
class LanguageModelRunner(torch.nn.Module):
    """Builds, trains and saves the small autoregressive language model."""

    def __init__(self):
        super().__init__()

    def run_model(self):
        """Train on 'raw_corpus.txt', plot the loss per epoch and save a checkpoint."""
        checkpoint_name = 'decoder_only_small_4AH_256EMB_3LYR'

        tokenizer = Tokenizer()

        sequence_length = 128
        # NOTE(review): this value is passed to readlines() as its size hint, which
        # limits the amount of text read rather than the number of lines - confirm
        # that this is the intended meaning of the dataset size.
        read_hint = 250000

        # Create the model
        language_model = LanguageModel(
            vocab_size=tokenizer.size(),
            max_sequence_length=sequence_length,
            embedding_dimension=256,
            number_of_layers=3,
            number_of_heads=4,
            dropout_rate=0.1
        )
        model = AutoregressiveWrapper(language_model).to(get_device())

        # Read the corpus, drop blank lines and join the rest into one string
        with open('raw_corpus.txt', 'r', encoding="utf-8-sig") as f:
            raw_lines = f.readlines(read_hint)

        frame = pd.DataFrame([line.rstrip() for line in raw_lines], columns=['text'])
        frame = frame[frame["text"].str.strip().astype(bool)]
        training_text = ' '.join(frame["text"].tolist())

        # The intermediate containers are no longer needed
        del frame, raw_lines

        # Tokenize, pad and cut the text into training sequences
        padded_tokens = preprocess_training_data(sequence_length, tokenizer, training_text)
        sequences = generate_training_sequences(sequence_length, padded_tokens)

        # Train the model
        trainer = Trainer(model, tokenizer, torch.optim.Adam(model.parameters(), lr=1e-4))
        loss_per_epoch = trainer.train(sequences, epochs=20, batch_size=32)

        # Plot the loss per epoch on a logarithmic scale
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.plot(loss_per_epoch, color='blue')
        ax.set_yscale('log')
        ax.set_title('Training Loss over Epochs', fontsize=16, fontweight='bold')
        ax.set_xlabel('Epoch', fontsize=14)
        ax.set_ylabel('Loss', fontsize=14)
        ax.tick_params(axis='both', which='major', labelsize=12)
        for side in ('right', 'top'):
            ax.spines[side].set_visible(False)
        plt.show()

        model.save_checkpoint(checkpoint_name)
        

In [None]:
# Train the model, plot the training loss and save the checkpoint
LanguageModelRunner().run_model()

# Testing the generator

In [None]:
# Generate text with the trained model
tokenizer = Tokenizer()

# `load_checkpoint` is a static factory: it RETURNS a new wrapper holding the saved
# weights and leaves any existing instance untouched, so its result must be kept.
# The hyperparameters are stored in the checkpoint and do not have to be repeated here.
model = AutoregressiveWrapper.load_checkpoint('decoder_only_small_4AH_256EMB_3LYR')

generated_sentence_length = 50
generator = Generator(model, tokenizer)
generated_text = generator.generate(
    generated_sentence_length=generated_sentence_length,
    prompt="Quiero ir a ",
    padding_token=tokenizer.character_to_token('<pad>')[1]
)

# The prompt is left-padded before generation; strip the padding markers from the text
print(generated_text.replace('<pad>', ''))