# **GD AI - Building Transformers The GD Way!**



> In this activity, each of the chosen GDs will fill out their designated code sections to win some cool GDPoints. The top 3 GDs to finish will receive 20, 15, and 10 GDPoints respectively ;) Happy Coding!




In [None]:
!pip install torch torchtext

Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl (121.6 MB)
Collecting nvidia-curand-cu12==10.3.2.106 (from torch)
  Using cached nvidia_curand_cu12-10.3.2.106-py3-none-manylinux1_x86_64.whl (56.5 MB)
Collectin

In [7]:
#Import relevant libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torchtext
torchtext.disable_torchtext_deprecation_warning() #Disable warnings
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader, Dataset
import math
import time



> Initialize the following modules within the Transformer class:

*   Positional Encoder to add positional information to input tokens
*   Embedding Layer to convert tokens to vectors
*   Transformer Encoder (stacked self-attention and feed-forward layers) to process these vectors
*   Linear Decoder to convert processed vectors back to token probabilities





In [8]:
class TransformerModel(nn.Module):
    """Token-level language model built on ``nn.TransformerEncoder``.

    Pipeline: embedding -> scale by sqrt(d_model) -> positional encoding ->
    encoder stack -> linear projection back to vocabulary logits.

    The encoder layers are created without ``batch_first``, so every tensor
    passed through ``forward`` uses the sequence-first layout
    ``(seq_len, batch)``.
    """

    ### GD On Duty: ###
    def __init__(self, ntoken, d_model, nhead, d_hid, nlayers, dropout=0.5):
        """Build the layers of the model.

        Args:
            ntoken: Vocabulary size (rows of the embedding, width of the output).
            d_model: Embedding / hidden dimension.
            nhead: Number of attention heads per encoder layer.
            d_hid: Width of the feed-forward sub-layer in each encoder layer.
            nlayers: Number of stacked encoder layers.
            dropout: Dropout probability used by the positional encoder and
                the encoder layers.
        """
        super().__init__()
        self.model_type = 'Transformer'
        self.src_mask = None
        # Kept on the instance so forward() does not depend on a global name.
        self.d_model = d_model
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model, nhead, d_hid, dropout), nlayers)
        self.decoder = nn.Linear(d_model, ntoken)
        self.init_weights()

    ############################################################
    # Key QnA:
    # 1) Why do we uniformly initialize the weights of the embedding layer?
    #    Ans) To prevent vanishing/exploding gradients.
    # 2) Why do we set the biases of the decoder layer to zero?
    #    Ans) To provide a neutral starting point, so the model learns
    #    appropriate biases from the data without initial bias influence.
    ############################################################

    def init_weights(self):
        """Initialize embedding/decoder weights in [-0.1, 0.1] and zero the decoder bias."""
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    ### GD On Duty: ###
    def forward(self, src, src_mask):
        """Run one forward pass.

        Args:
            src: Tensor of token ids, sequence-first: ``(seq_len, batch)``.
            src_mask: Additive attention mask of shape ``(seq_len, seq_len)``,
                e.g. the output of ``generate_square_subsequent_mask``.

        Returns:
            Logits of shape ``(seq_len, batch, ntoken)``.
        """
        # Scale embeddings by sqrt(d_model) before adding positional encodings.
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask)
        return self.decoder(output)

    def save_model(self, path):
        """Write the model's ``state_dict`` to ``path`` and report it on stdout."""
        torch.save(self.state_dict(), path)
        print(f"Model saved to {path}")

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` causal mask.

        Entries on and below the diagonal are 0.0; entries above it are
        ``-inf``, so position i cannot attend to any position j > i.
        """
        return torch.triu(torch.full((sz, sz), float('-inf')), diagonal=1)


> Initialize the following modules within the PositionalEncoding class:

*   Dropout Layer - Helps prevent overfitting
*   Positional Encoding Matrix - To provide information about the position of each token in the sequence


In [9]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to embeddings, then apply dropout.

    The table is stored in the non-trainable buffer ``pe`` with shape
    ``(max_len, 1, d_model)`` so it broadcasts over the batch dimension of a
    sequence-first input ``(seq_len, batch, d_model)``.
    """

    ### GD On Duty: ###
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        """Precompute the sinusoidal table.

        Args:
            d_model: Embedding dimension (odd values are supported).
            dropout: Dropout probability applied after adding the encoding.
            max_len: Number of positions to precompute.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns,
        # so trim the angles to the number of cosine slots.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Add the encodings for the first ``x.size(0)`` positions to ``x`` and apply dropout."""
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


> The TextDataset class will process a given text into "input-target" pairs of token sequences where:
*    The input sequence includes all tokens except the last one
*    The target sequence includes all tokens except the first one (shifted by one position)

The purpose of this exercise is to have the model learn to predict the next token in the sequence based on the preceding tokens, for successful sequence generation.

In [10]:
class TextDataset(Dataset):
    """Sliding-window next-token dataset over a single text.

    Each item is an ``(input, target)`` pair of ``torch.long`` tensors of
    length ``seq_len``, where ``target`` is ``input`` shifted one token to
    the right.
    """

    ### GD On Duty: ###
    def __init__(self, text, vocab, tokenizer, seq_len=30):
        """Tokenize ``text`` and build every overlapping window.

        Args:
            text: Raw text to tokenize.
            vocab: Callable mapping a list of tokens to a list of integer ids.
            tokenizer: Callable mapping a string to a list of tokens.
            seq_len: Length of each input/target sequence.
        """
        self.vocab = vocab
        self.tokenizer = tokenizer
        self.seq_len = seq_len

        tokens = self.tokenizer(text)
        # Look up ids once for the whole text instead of once per window.
        # The windows below are views into this tensor, so avoid modifying
        # items in place.
        ids = torch.tensor(self.vocab(tokens), dtype=torch.long)
        self.data = [
            (ids[start:start + seq_len], ids[start + 1:start + seq_len + 1])
            for start in range(len(tokens) - seq_len)
        ]

    def __len__(self):
        """Return the number of input-target pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the input-target pair at position ``idx``."""
        return self.data[idx]

    @staticmethod
    def tokenize_and_build_vocab(text, tokenizer):
        """Build a vocabulary from ``text``.

        The vocabulary includes the specials ``'<unk>'`` and ``'<pad>'``, and
        out-of-vocabulary lookups map to ``'<unk>'``.
        """
        tokens = tokenizer(text)
        vocab = build_vocab_from_iterator([tokens], specials=['<unk>', '<pad>'])
        vocab.set_default_index(vocab['<unk>'])
        return vocab




> The train_model function will:
*   Initialize model platform device - Ensure the model is trained on the appropriate hardware for optimal performance and efficiency.
*   Split the input data batch into source and target (padding both to equal lengths) - This lets the model learn to predict the next token from the preceding tokens. Padding ensures uniformity in batch processing.
*   Load the data and train the model


In [11]:
def train_model(model, dataset, vocab, epochs=20, batch_size=32, seq_len=30):
    """Train ``model`` on ``dataset`` with Adam and cross-entropy loss.

    Args:
        model: Module called as ``model(src, src_mask)`` that also provides
            ``generate_square_subsequent_mask(size)``.
        dataset: Dataset yielding ``(input, target)`` pairs of 1-D id tensors.
        vocab: Mapping used to look up the ``'<pad>'`` index.
        epochs: Number of passes over the dataset.
        batch_size: Number of sequences per batch.
        seq_len: Unused; kept for backward compatibility with existing callers.

    Prints the summed batch loss and the elapsed time after each epoch.
    """

    ### GD On Duty: ###
    # Initialize the selected device to run the model on
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    pad_idx = vocab['<pad>']
    # Positions that collate_batch fills with padding must not contribute to
    # the loss or its gradient.
    criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)
    optimizer = optim.Adam(model.parameters(), lr=0.001) ##EXPERIMENT WITH CHANGING THIS VALUE

    def collate_batch(batch):
        """Stack sources and targets into sequence-first tensors, padding with ``<pad>``."""
        src_seqs, tgt_seqs = zip(*batch)
        src_batch = torch.nn.utils.rnn.pad_sequence(list(src_seqs), padding_value=pad_idx)
        tgt_batch = torch.nn.utils.rnn.pad_sequence(list(tgt_seqs), padding_value=pad_idx)
        return src_batch, tgt_batch

    # Load the dataset in batches, shuffle and pad each batch
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_batch)

    # Train the model
    model.train()
    for epoch in range(epochs):
        total_loss = 0.
        start_time = time.time()

        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            optimizer.zero_grad()

            # Causal mask sized to this batch's (padded) sequence length.
            src_mask = model.generate_square_subsequent_mask(src.size(0)).to(device)
            output = model(src, src_mask)
            # Flatten (seq, batch, vocab) -> (seq * batch, vocab) for the loss.
            loss = criterion(output.reshape(-1, output.size(-1)), tgt.reshape(-1))
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        elapsed = time.time() - start_time
        print(f'Epoch {epoch+1}, Loss: {total_loss:.2f}, Time: {elapsed:.2f}s')



### Finally, putting it all together!


> In the code snippet below, we will run the main function, which uses the Harry Potter text data file from Kaggle as input to the Transformer model pipeline



In [18]:
if __name__ == "__main__":
    # Entry point: read the corpus, build the tokenizer, vocabulary and
    # dataset, and define the model hyperparameters. Model construction and
    # training are currently commented out at the end of this block.

    ### GD On Duty: ###
    # Input data: Harry Potter text from Kaggle
    # (https://www.kaggle.com/datasets/moxxis/harry-potter-lstm?resource=download),
    # file Harry_Potter_all_books_preprocessed.txt (5.99 MB), expected at the path below.
    with open("sample_data/harry_potter.txt", "r", encoding="utf-8") as f:
        text = f.read()

    tokenizer = get_tokenizer('basic_english') # alternative tokenizer: 'spacy'
    vocab = TextDataset.tokenize_and_build_vocab(text, tokenizer)
    dataset = TextDataset(text, vocab, tokenizer)

    # Active configuration.
    ntokens = len(vocab)  # the size of vocabulary
    d_model = 512  # embedding dimension
    nhid = 2048  # dimension of the feedforward network model in nn.TransformerEncoder
    nlayers = 6  # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
    nhead = 8  # number of heads in the multi-head attention models
    dropout = 0.2  # the dropout value

    # Alternative configuration (larger model):
    # ntokens = len(vocab)  # the size of vocabulary
    # d_model = 768  # embedding dimension
    # nhid = 4096  # dimension of the feedforward network model in nn.TransformerEncoder
    # nlayers = 12  # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
    # nhead = 8  # number of heads in the multi-head attention models
    # dropout = 0.2  # the dropout value

    # Alternative configuration (smaller model):
    # ntokens = len(vocab)  # the size of vocabulary
    # d_model = 512  # embedding dimension
    # nhid = 516   #2048  # the dimension of the feedforward network model in nn.TransformerEncoder
    # nlayers = 4   #6  # the number of nn.TransformerEncoderLayer in nn.TransformerEncoder
    # nhead = 4   #8  # the number of heads in the multiheadattention models
    # dropout = 0.2  # the dropout value

    # NOTE(review): model construction and training are disabled here, yet
    # later cells use the name `model` -- confirm where it is meant to be defined.
    #model = TransformerModel(ntokens, d_model, nhead, nhid, nlayers, dropout)

    # train_model(model, dataset, vocab)


In [None]:
def save_model(self, path):
    """Serialize the ``state_dict`` of ``self`` to ``path`` and print a confirmation."""
    weights = self.state_dict()
    torch.save(weights, path)
    print(f"Model saved to {path}")

# Destination file for the trained weights.
# NOTE(review): the loading cell elsewhere in this notebook reads
# "transformer_model.pth" -- confirm that the two paths are meant to differ.
model_save_path = "sample_data/transformer_model_100epochs.pth"

# Save the weights of `model` through the method defined on TransformerModel.
TransformerModel.save_model(model, path = model_save_path)




Model saved to sample_data/transformer_model.pth


In [None]:
# Load the model
def load_model(path, ntokens, d_model, nhead, nhid, nlayers, dropout=0.2):
    """
    Load a saved model from the specified path.

    The hyperparameters must match those used when the checkpoint was saved,
    otherwise ``load_state_dict`` raises on mismatched shapes or keys.

    Args:
        path (str): The path to the saved model.
        ntokens (int): The size of the vocabulary.
        d_model (int): The dimension of the model.
        nhead (int): The number of attention heads.
        nhid (int): The dimension of the feedforward network.
        nlayers (int): The number of transformer encoder layers.
        dropout (float): The dropout value.
    Returns:
        model (nn.Module): The loaded Transformer model, on CPU and in
        evaluation mode.
    """

    model = TransformerModel(ntokens, d_model, nhead, nhid, nlayers, dropout)
    # map_location="cpu" lets a checkpoint saved on a GPU load on a machine
    # without CUDA; the freshly built model lives on CPU anyway.
    state_dict = torch.load(path, map_location="cpu")
    model.load_state_dict(state_dict)
    model.eval()  # Set the model to evaluation mode
    print(f"Model loaded from {path}")
    return model

# Hyperparameters passed to load_model below.
ntokens = len(vocab)  # the size of vocabulary
d_model = 512  # embedding dimension
nhid = 2048  # dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 6  # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
nhead = 8  # number of heads in the multi-head attention models
dropout = 0.2  # the dropout value

# NOTE(review): the saving cell elsewhere in this notebook writes to
# "sample_data/transformer_model_100epochs.pth" -- confirm this path is the intended file.
model_save_path = "transformer_model.pth"
loaded_model = load_model(model_save_path, ntokens, d_model, nhead, nhid, nlayers, dropout)

In [None]:
import torch
import torch.nn.functional as F
from torchtext.data.utils import get_tokenizer

# Assuming the necessary classes and functions such as TransformerModel, TextDataset, etc., are already defined.

def generate_square_subsequent_mask(sz):
    """Return a ``(sz, sz)`` float mask: ``-inf`` strictly above the diagonal, 0 elsewhere."""
    blocked = torch.ones(sz, sz) * float('-inf')
    return blocked.triu(diagonal=1)

def generate_text(model, vocab, tokenizer, start_text, max_length=100, temperature=1.0, seq_len=30, device='cpu'):
    """Generate text by sampling one token at a time, starting from ``start_text``.

    Args:
        model: Module called as ``model(src, mask)`` with ``src`` of shape
            ``(context_len, 1)``; its output is indexed as ``[-1, 0, :]``
            to get the logits for the next token.
        vocab: Vocabulary supporting ``vocab[token]`` and ``get_itos()``.
        tokenizer: Callable mapping a string to a list of tokens.
        start_text: Prompt to continue.
        max_length: Total number of tokens in the result, prompt included.
        temperature: Softmax temperature. Values <= 0 select the most likely
            token (greedy decoding) instead of sampling.
        seq_len: Maximum number of trailing tokens fed to the model per step.
        device: Device the model and inputs are moved to.

    Returns:
        The prompt tokens followed by the generated tokens, space-joined.

    Raises:
        ValueError: If tokens must be generated but ``start_text`` yields none.
    """
    model.to(device)
    model.eval()

    generated_tokens = [vocab[token] for token in tokenizer(start_text)]
    steps = max_length - len(generated_tokens)
    if steps > 0 and not generated_tokens:
        raise ValueError("start_text must contain at least one token")

    with torch.no_grad():
        for _ in range(steps):
            # Always feed at most the last seq_len tokens, including on the
            # first step when the prompt itself is longer than seq_len.
            context = generated_tokens[-seq_len:]
            src = torch.tensor(context, dtype=torch.long).unsqueeze(1).to(device)
            mask = generate_square_subsequent_mask(src.size(0)).to(device)
            next_token_logits = model(src, mask)[-1, 0, :]

            if temperature > 0:
                next_token_probs = F.softmax(next_token_logits / temperature, dim=-1)
                next_token_id = torch.multinomial(next_token_probs, num_samples=1).item()
            else:
                # Dividing by a non-positive temperature is undefined; fall
                # back to deterministic argmax decoding.
                next_token_id = torch.argmax(next_token_logits, dim=-1).item()

            generated_tokens.append(next_token_id)

    itos = vocab.get_itos()
    generated_text = ' '.join(itos[token_id] for token_id in generated_tokens)
    return generated_text.strip()

# Example usage
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
start_text = "Who is Harry Potter?"
generated_text = generate_text(model, vocab, tokenizer, start_text, max_length=100, device=device)
print(generated_text)

who is harry potter ? had slytherins could were know ! . your forest our at her voice flying in long . a few but rain up a new for on green the rolled on . something thinking your harry world so harry riddle harry was have picked rest highpass before to no and got the whats someone a talk he the told cant be the wasnt publicity of shot . went head chuckled stayed you was harry of students . sending here gaunt that had and the looking he front to room sound more theres hagrid happy a article


In [None]:
# Quick inspection of the objects passed to generate_text.
print(vocab)
print(tokenizer)
print(start_text)

Vocab()
<function _basic_english_normalize at 0x7fd36306b910>
Who is Harry Potter?


In [20]:
# Test the model by generating text from a new prompt on the CPU.
# generate_text must already be defined in the session before this runs.
start_text = "Tell me a little bit about Harry Potter?"
generated_text = generate_text(model, vocab, tokenizer, start_text, max_length=100, temperature = 1.0, device = 'cpu')
print(generated_text)

NameError: name 'generate_text' is not defined

In [None]:
# Generate again with the default temperature, seq_len and device; the result is only displayed, not stored.
generate_text(model, vocab, tokenizer, start_text, max_length=100)

'Tell me a little bit about Harry Potter? . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .'