# Text Generator
Implementing a text generation model from scratch using a transformer (decoder only).\
Steps:
1. Tokenization
2. Input embedding
3. Positional encoding
4. Masking
5. Self-attention
6. Decoder stack
7. Predicting token probabilities

## Creating Training Data

In [1]:
#conda install pytorch torchvision torchaudio -c pytorch

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import math
import pandas as pd

In [3]:
class creating_data():
    def __init__(self, filepath):
        self.df = pd.read_csv(filepath)
    
    def save(self, path):
        self.df.to_csv(path)
    

In [4]:
# dataset = creating_data('medium_articles.csv')
# dataset.save('training_data.csv')

## Tokenization

In [5]:
class Tokenizer():
    def __init__(self):
        self.dictionary = {}
        self.reverse_dictionary = {}
        
        # adding special tokens
        self.__add_to_dict('<pad>')
        self.__add_to_dict('<unk>')
        
        # add characters and numbers to dictionary
        for i in range(10):
            self.__add_to_dict(str(i))
        
        for i in range(26):
            self.__add_to_dict(chr(ord('a') + i))
            self.__add_to_dict(chr(ord('A') + i))
            
        # adding space and punctuation
        for char in ['.', ' ', ',', '!', '?', '\n']:
            self.__add_to_dict(char)
        
    def __add_to_dict(self, character):
        if character not in self.dictionary:
            index = self.size()
            self.dictionary[character] = index
            self.reverse_dictionary[index] = character
            
    def tokenize(self, text):
        return [self.dictionary.get(c, self.dictionary['<unk>']) for c in text]
    
    def character_to_token(self, character):
        return self.dictionary.get(character, self.dictionary['<unk>'])
    
    def token_to_character(self, token):
        return self.reverse_dictionary.get(token, '<unk>')
    
    def size(self):
        return len(self.dictionary)

In [6]:
training_data = pd.read_csv('training_data.csv')
training_data = training_data['text']

In [7]:
training_data.head()

0    Photo by Josh Riemer on Unsplash\n\nMerry Chri...
1    Your Brain On Coronavirus\n\nA guide to the cu...
2    Mind Your Nose\n\nHow smell training can chang...
3    Passionate about the synergy between science a...
4    You’ve heard of him, haven’t you? Phineas Gage...
Name: text, dtype: object

In [8]:
training_data = training_data.to_numpy()

In [9]:
# instantiating tokenizer
tokenizer = Tokenizer()
tokenized_data = [tokenizer.tokenize(sentence) for sentence in training_data]

In [10]:
max_sequence_length = 20
# padding and truncating
padded_data = []

for tokens in tokenized_data:
    if len(tokens) < max_sequence_length:
        # padding
        tokens = [tokenizer.character_to_token('<pad>')] * (max_sequence_length - len(tokens)) + tokens
    else:
        # truncating
        tokens = tokens[:max_sequence_length]
    padded_data.append(tokens)

In [11]:
# converting data to tensors
tensor_data = [torch.tensor(tokens) for tokens in padded_data]

## Input Embeddings

In [12]:
class TokenEmbedding(torch.nn.Module):
    # model that converts tokens into embeddings
    
    def __init__(self, model_dim, num_tokens):
        super().__init__()
        self.embedding_layer = torch.nn.Embedding(
            num_embeddings = num_tokens,
            embedding_dim = model_dim
        )
        
    def forward(self, x):
        return self.embedding_layer(x)

In [13]:
model_dim = 50
num_tokens = tokenizer.size()

In [14]:
# initializing class
embedding_model = TokenEmbedding(model_dim, num_tokens)
# convert padded data to tensor
tensor_data = torch.stack(tensor_data)
embedded_data = embedding_model(tensor_data)

In [15]:
# Print the shape of the embedded data to verify
print("Shape of embedded data:", embedded_data.shape)

# Print the first embedded sequence for verification
print("First embedded sequence:", embedded_data[0])

Shape of embedded data: torch.Size([100, 20, 50])
First embedded sequence: tensor([[ 2.5216e+00, -1.3856e+00,  1.2489e+00, -1.4135e+00, -3.5567e-01,
          2.0788e-01, -3.5718e-01,  1.5777e-01,  3.1399e-01, -1.6488e+00,
         -2.5254e+00,  1.3124e+00, -1.1850e+00, -5.9774e-01, -1.0789e+00,
          1.0215e+00, -1.9647e+00, -2.2532e-01, -1.0717e+00, -6.1119e-01,
          8.8459e-01,  2.4578e-01, -2.2455e+00, -6.1866e-01, -4.6586e-02,
          1.0210e+00, -5.7810e-01,  4.4053e-01,  1.1728e+00, -1.3691e+00,
          9.0041e-01,  1.6726e-01,  4.7806e-01,  5.1375e-01,  1.7862e-01,
          2.3372e-01,  1.0847e+00, -2.2002e+00,  9.4830e-01,  1.4117e+00,
         -1.0248e+00,  1.9874e-01,  7.5797e-01, -2.6513e-01, -5.0675e-01,
          5.7517e-01,  6.3405e-01, -2.3527e+00,  8.8564e-01, -8.4895e-01],
        [-4.1990e-01,  6.4952e-01, -2.2711e-01, -2.8818e-01,  2.4474e+00,
          3.3509e-01, -5.7916e-01,  2.0084e+00,  1.0572e-01,  8.0953e-01,
          7.2657e-01, -2.3105e-01, -

## Positional Encoding

In [16]:
class PositionalEncoding(torch.nn.Module):
    def __init__(self, model_dim, max_sequence_length):
        super().__init__()
        self.model_dim = model_dim
        self.max_sequence_length = max_sequence_length
    
        positional_encoding = np.zeros((max_sequence_length, model_dim))
        
        # calculating encoding for each position and dim
        for pos in range(self.max_sequence_length):
            for i in range(0, self.model_dim, 2):
                # sin to even indices
                positional_encoding[pos, i] = np.sin(pos / (10000 ** ((2 * i) / self.model_dim)))
                
                # cos to odd indices
                if i + 1 < self.model_dim:
                    positional_encoding[pos, i + 1] = np.cos(pos / (10000 ** ((2 * i) / self.model_dim)))
                    
        
        self.positional_encoding = torch.from_numpy(positional_encoding).unsqueeze(0).float()
            

    def forward(self, x):
        return x + self.positional_encoding[: x.size(1), :]
        

In [17]:
pos_encoding = PositionalEncoding(model_dim, max_sequence_length)
encoded_data = pos_encoding(embedded_data)

In [18]:
# Print the shape of the encoded data to verify
print("Shape of encoded data:", encoded_data.shape)

# Print the first encoded sequence for verification
print("First encoded sequence:", encoded_data[0])

Shape of encoded data: torch.Size([100, 20, 50])
First encoded sequence: tensor([[ 2.5216e+00, -3.8561e-01,  1.2489e+00, -4.1352e-01, -3.5567e-01,
          1.2079e+00, -3.5718e-01,  1.1578e+00,  3.1399e-01, -6.4878e-01,
         -2.5254e+00,  2.3124e+00, -1.1850e+00,  4.0226e-01, -1.0789e+00,
          2.0215e+00, -1.9647e+00,  7.7468e-01, -1.0717e+00,  3.8881e-01,
          8.8459e-01,  1.2458e+00, -2.2455e+00,  3.8134e-01, -4.6586e-02,
          2.0210e+00, -5.7810e-01,  1.4405e+00,  1.1728e+00, -3.6915e-01,
          9.0041e-01,  1.1673e+00,  4.7806e-01,  1.5137e+00,  1.7862e-01,
          1.2337e+00,  1.0847e+00, -1.2002e+00,  9.4830e-01,  2.4117e+00,
         -1.0248e+00,  1.1987e+00,  7.5797e-01,  7.3487e-01, -5.0675e-01,
          1.5752e+00,  6.3405e-01, -1.3527e+00,  8.8564e-01,  1.5105e-01],
        [ 4.2158e-01,  1.1898e+00,  2.3345e-01,  5.9945e-01,  2.6744e+00,
          1.3090e+00, -4.6974e-01,  3.0024e+00,  1.5818e-01,  1.8082e+00,
          7.5169e-01,  7.6864e-01, -3.

## Masking and Attention

In [19]:
class MaskedSelfAttention(torch.nn.Module):
    def __init__(self, embedding_dimension, head_dimension):
        super().__init__()
        self.embedding_dimension = embedding_dimension
        self.head_dimension = head_dimension
        
        self.query_layer = torch.nn.Linear(self.embedding_dimension, self.head_dimension)
        self.key_layer = torch.nn.Linear(self.embedding_dimension, self.head_dimension)
        self.value_layer = torch.nn.Linear(self.embedding_dimension, self.head_dimension)
        self.softmax = torch.nn.Softmax(dim = -1)
        
    def forward(self, x, mask):
        # x dim - (batch_size, sequence_length, embedding_dim)
        # mask dim - (batch_size, sequence_length, head_dim)
        # output dim - (batch_size, sequence_length)
        
        query = self.query_layer(x)
        key = self.key_layer(x)
        value = self.value_layer(x)
        
        # calculating attention weights and scaling
        attention_weights = torch.matmul(query, key.transpose(-2, -1)) / np.sqrt(self.head_dimension)
        
        # masking
        if mask is not None:
            attention_weights = attention_weights.masked_filled(mask == 0, float('-inf'))
        
        attention_scores = self.softmax(attention_weights)
        return torch.matmul(attention_scores, value)

In [20]:
class MaskedMultiHeadedSelfAttention(torch.nn.Module):
    def __init__(self, embedding_dimension, num_heads):
        super().__init__()
        self.embedding_dimension = embedding_dimension
        self.head_dimension = embedding_dimension // num_heads
        self.num_heads = num_heads
        
        self.self_attentions = torch.nn.ModuleList(
            [MaskedSelfAttention(embedding_dimension, self.head_dimension) for _ in range(self.num_heads)]
        )
        
        self.output_layer = torch.nn.Linear(self.num_heads * self.head_dimension, self.embedding_dimension)
        
    def forward(self, x, mask):
        self_attention_outputs = [self_attention(x, mask) for self_attention in self.self_attentions]
        
        # concatenating outputs
        concatenated_outputs = torch.cat(self_attention_outputs, dim = 2)
        return self.output_layer(concatenated_outputs)

## Decoder

In [21]:
class DecoderLayer(torch.nn.Module):
    def __init__(self, embedding_dim, num_heads, feed_forward_dim, dropout_rate):
        super().__init__()
        
        self.multi_attention = MaskedMultiHeadedSelfAttention(embedding_dim, num_heads)
        self.feed_forward = FeedForward(embedding_dim, feed_forward_dim)
        self.dropout = torch.nn.Dropout(dropout_rate)
        
        self.layer_norm_1 = torch.nn.LayerNorm(embedding_dim)
        self.layer_norm_2 = torch.nn.LayerNorm(embedding_dim)
        
    def forward(self, x, mask):
        x_norm = self.layer_norm_1(x)
        attention_output = self.multi_attention(x_norm, mask)
        residual_output = x + attention_output
        residual_output_norm = self.layer_norm_2(residual_output)
        
        feed_forward_output = self.feed_forward(residual_output_norm)
        
        if self.training:
            feed_forward_output = self.dropout(feed_forward_output)
            
        return residual_output + feed_forward_output

In [22]:
class DecoderStack(torch.nn.Module):
    def __init__(self, embedding_dim, num_layers, num_heads, feed_forward_dim, dropout_rate, max_sequence_length):
        super().__init__()
        
        self.decoder_layers = torch.nn.ModuleList(
            [DecoderLayer(embedding_dim, num_heads, feed_forward_dim, dropout_rate) for _ in range(num_layers)]
        )
        
    def forward(self, x, mask):
        outputs = x
        for layer in self.decoder_layers:
            outputs = layer(outputs, mask)
        
        return outputs

In [23]:
class FeedForward(torch.nn.Module):
    def __init__(self, embedding_dim, feed_forward_dim):
        super().__init__()
        self.linear_1 = torch.nn.Linear(embedding_dim, feed_forward_dim)
        self.linear_2 = torch.nn.Linear(feed_forward_dim, embedding_dim)
    
    def forward(self, x):
        x = self.linear_1(x)
        x = torch.relu(x)
        x = self.linear_2(x)
        
        return x

## Building the Model

In [24]:
class TextGenerator(torch.nn.Module):
    def __init__(self, num_tokens, max_sequence_length = 100, embedding_dim = 512, num_layers = 6, num_heads = 4, feed_forward_dim = None, dropout_rate = 0.1):
        super().__init__()
        self.num_tokens = num_tokens
        self.max_sequence_length = max_sequence_length
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers
        self.num_heads = num_heads
        
        if feed_forward_dim is None:
            self.feed_forward_dim = embedding_dim * 4
        else:
            self.feed_forward_dim = feed_forward_dim
        
        self.dropout_rate = dropout_rate
        
        self.token_embedding = TokenEmbedding(embedding_dim, num_tokens)
        self.positional_encoding = PositionalEncoding(embedding_dim, max_sequence_length)
        self.layer_norm = torch.nn.LayerNorm(embedding_dim)
        
        self.decoder = DecoderStack(embedding_dim, num_layers, num_heads, feed_forward_dim, dropout_rate, max_sequence_length)
        self.generator_head = GeneratorHead(embedding_dim, num_tokens)
        
    def forward(self, x, mask):
        token_embedding = self.token_embedding(x)
        positional_encoding = self.positional_encoding(token_embedding)
        positional_encoding_norm = self.layer_norm(positional_encoding)
        decoder_outputs = self.decoder(positional_encoding_norm, mask)
        generator_outputs = self.generator_head(decoder_outputs)
        
        return generator_outputs

In [25]:
class GeneratorHead(torch.nn.Module):
    def __init__(self, embedding_dim, num_tokens):
        super().__init__()
        self.linear = torch.nn.Linear(embedding_dim, num_tokens)
        
    def forward(self, x):
        return self.linear(x)

## Autoregressive Wrapper

In [26]:
class AutoregressiveWrapper(torch.nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model
        
    def forward(self, x, mask):
        inputs, targets = x[:, :-1], x[:, 1:]
        mask = mask[:, :-1]
        
        output = self.model(inputs, mask)
        return output, targets
    
    def next_token_probabilities(self, x, mask, temperature = 1.0):
        logits = self.model(x, mask)[:, -1]
        
        if temperature != 1.0:
            logits /= temperature
        
        probabilities = torch.softmax(logits, dim = -1)
        
        return probabilities

## Training the Model

In [28]:
class Trainer:
    def __init__(self, model, tokenizer: Tokenizer, optimizer = None):
        super().__init__()
        self.model = model
        if optimizer is None:
            self.optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)
        else:
            self.optimizer = optimizer
        
        self.tokenizer = tokenizer
        self.loss_function = torch.nn.CrossEntropyLoss()
        
    def train(self, data, epochs, batch_size):
        loss_epoch = []
        
        for epoch in range(epochs):
            losses = []
            random.shuffle(data)
            
            batches = []
            for i in range(0, len(data), batch_size):
                sequence = torch.tensor(data[i: i + batch_size], dtype = torch.long)
                mask_tensor = torch.ones_like(sequence)
                mask_tensor[sequence == self.tokenizer.character_to_token('<pad>')] = 0
                
                batches.append((sequence, mask_tensor))
                
            for batch in batches:
                self.model.train()
                
                input_tensor = torch.zeros((batch_size, self.model.max_sequence_length + 1), dtype = torch.long)
                mask_tensor = torch.zeros((batch_size, self.model.max_sequence_length + 1), dtype = torch.long)
                
                for i, inp in enumerate(batch[0]):
                    input_tensor[i] = inp
                
                for i, mask in enumerate(batch[1]):
                    mask_tensor[i] = mask
                    
                model_output, target = self.model.forward(x = input_tensor, mask = mask_tensor)
                
                loss = self.loss_function(model.output.transpose(1, 2), target)
                loss.backward()
                
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
                self.optimizer.step()
                self.optimizer.zero_grad()
                losses.append(loss.item())
                
            epoch_loss = np.average(losses)
            loss_epoch.append(epoch_loss)
            print(f"Epoch: {epoch}, Loss: {epoch_loss}")
        
        return loss_epoch