# Transformer from scratch

In this notebook I build a transformer from scratch so it can be manipulated for research purposes.
The transformer is verified by training it on a German-to-English translation task, using book excerpts from the OPUS Books dataset.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from modules import *
from datasets import load_dataset   #  Hugging Face Datasets library
from torch.utils.data import DataLoader
from torchtext.data.functional import generate_sp_model
from torchtext.data.functional import sentencepiece_numericalizer
from torchtext.data.functional import load_sp_model

In [4]:
# Load the "de-en" configuration of the "opus_books" dataset with the Hugging Face Datasets library
dataset = load_dataset("opus_books", "de-en")

In [5]:
# Write the German and the English side of the training split to separate text files
# ('de.txt' / 'en.txt'), one entry per line. These files are the input corpora for the
# SentencePiece models trained further below.
translations = dataset["train"]["translation"]   # fetch the column once and reuse it for both languages
for lang in ("de", "en"):
    # The `with` block closes the file on exit, so no explicit close() is needed
    with open(f"{lang}.txt", "w", encoding="utf-8") as f:
        for entry in translations:
            f.write(entry[lang])
            f.write('\n')

## Tokenization
The text is then split into tokens using SentencePiece with the unigram model. See https://huggingface.co/docs/transformers/tokenizer_summary for a summary of tokenization methods.

In [6]:
GERMAN_VOCAB_SIZE = 10000
ENGLISH_VOCAB_SIZE = 8000

# The first three entries of each generated vocabulary are functional tokens;
# keep their IDs around for easier use later on.
# NOTE(review): SentencePiece's default layout is believed to reserve ID 0 for <unk>
# rather than for a dedicated padding token - check 'german.vocab'/'english.vocab'.
PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2   # padding for batching / beginning of sequence / end of sequence

# Let SentencePiece analyze each corpus and build a unigram vocabulary of the requested size.
# The resulting tokens can be inspected in 'german.vocab' / 'english.vocab'.
for corpus_file, vocab_size, prefix in (
    ('de.txt', GERMAN_VOCAB_SIZE, 'german'),
    ('en.txt', ENGLISH_VOCAB_SIZE, 'english'),
):
    generate_sp_model(corpus_file, vocab_size=vocab_size, model_type="unigram", model_prefix=prefix)

In [7]:
# The generated *.model files convert text into token IDs.
# Load both models and wrap each one in a numericalizer (text -> list of token IDs).
german_sp_model = load_sp_model("german.model")
english_sp_model = load_sp_model("english.model")
de_id_generator = sentencepiece_numericalizer(german_sp_model)
en_id_generator = sentencepiece_numericalizer(english_sp_model)

# Example: every sentence is mapped to the indices of its tokens in the vocabulary.
# One word can be broken into several tokens, and functional tokens such as
# BOS and EOS are not added by the numericalizer.
# Swap in `en_id_generator` to inspect the English vocabulary instead (verification).
sample_sentences = ["und die der zu er", "the and of"]
list(de_id_generator(sample_sentences))

[[5, 6, 7, 10, 11], [15, 40, 510, 31, 87, 15, 3707]]

In [8]:
# Split into training (80 %) and test (20 %) data.
# A fixed seed makes the shuffle, and therefore the split, reproducible across kernel restarts.
# NOTE: this cell rebinds `dataset`; re-running it without re-running the loading cell
# splits the already reduced training split again.
dataset = dataset["train"].train_test_split(test_size=0.2, seed=42)

In [9]:
# Now make DataLoaders that use the collate_fn argument to build each batch (adding BOS/EOS tokens and padding)
BATCH_SIZE = 24

# Collate function: called by the DataLoaders to turn a list of samples into batch tensors
def generate_batch(data_batch):
    """Tokenize a list of translation pairs and pad them into two batch tensors.

    Args:
        data_batch: sequence of mappings, each with a German string under 'de'
            and an English string under 'en'.

    Returns:
        (src, trg): two int64 tensors.
            src has shape (b, n): German token IDs followed by EOS, padded with PAD_IDX.
            trg has shape (b, m): BOS, English token IDs, EOS, padded with PAD_IDX.
    """
    # Build every tensor as int64 explicitly: torch.tensor([]) defaults to float32,
    # so an entry that tokenizes to an empty list would otherwise produce a float
    # sequence that cannot be used as embedding indices.
    bos = torch.tensor([BOS_IDX], dtype=torch.long)
    eos = torch.tensor([EOS_IDX], dtype=torch.long)

    # Token IDs of all German sentences as a nested list
    de = list(de_id_generator(entry['de'] for entry in data_batch))
    # Append EOS to each sentence and pad to the longest one -> (b, n)
    src = torch.nn.utils.rnn.pad_sequence(
        [torch.cat((torch.tensor(ids, dtype=torch.long), eos)) for ids in de],
        batch_first=True, padding_value=PAD_IDX)

    # Token IDs of all English sentences as a nested list
    en = list(en_id_generator(entry['en'] for entry in data_batch))
    # Wrap each sentence in BOS ... EOS and pad to the longest one -> (b, m)
    trg = torch.nn.utils.rnn.pad_sequence(
        [torch.cat((bos, torch.tensor(ids, dtype=torch.long), eos)) for ids in en],
        batch_first=True, padding_value=PAD_IDX)

    return src, trg  # (b, n), (b, m)

# Both loaders share the same settings: fixed batch size, reshuffling on every pass,
# and `generate_batch` as collate function to tokenize and pad each batch.
loader_options = dict(batch_size=BATCH_SIZE, shuffle=True, collate_fn=generate_batch)

train = DataLoader(dataset["train"]["translation"], **loader_options)
test = DataLoader(dataset["test"]["translation"], **loader_options)

## Train the Transformer

In [10]:
# Teacher forcing: the decoder is fed the target sequence shifted one token to the right
# (<BOS>, t1, ..., tn) and the transformer is trained to predict the token that follows
# every position (t1, ..., tn, <EOS>).
# https://machinelearningmastery.com/teacher-forcing-for-recurrent-neural-networks/

def teacherForcing(device, model, optimizer, lossFunction, epochs, training, pad_idx=None):
    """Train `model` with teacher forcing and print the running token accuracy.

    Args:
        device: torch device every batch is moved to before the forward pass.
        model: module called as ``model(src, decoder_input)``; must return logits
            of shape (b, n, vocab_size).
        optimizer: optimizer that updates the model parameters.
        lossFunction: callable ``lossFunction(logits, targets)``. It receives the logits
            of all non-padding positions, shape (N, vocab_size), and their integer class
            indices, shape (N,) - the layout ``nn.CrossEntropyLoss`` expects.
        epochs: number of passes over `training`.
        training: sized iterable (e.g. a DataLoader) yielding ``(src, trg)`` batches of
            token IDs with shapes (b, m) and (b, n+1).
        pad_idx: token ID used for padding; positions whose target is `pad_idx` are
            excluded from loss and accuracy. Defaults to the notebook-level PAD_IDX.
    """
    if pad_idx is None:
        pad_idx = PAD_IDX
    print('starting...')
    model.train()  # make sure dropout etc. are active even if the model was switched to eval mode before

    for epoch in range(epochs):
        correct = 0
        total = 0
        number_batches = len(training)

        for i, (inputs, labels) in enumerate(training):  # loop through batches
            inputs = inputs.to(device)                   # (b, m)
            labels = labels.to(device)                   # (b, n+1)
            decoder_input = labels[:, :-1]               # drop last column  -> <BOS>,t1,...,tn
            targets = labels[:, 1:]                      # drop first column -> t1,...,tn,<EOS>

            optimizer.zero_grad()
            outputs = model(inputs, decoder_input)       # (b, n, vocab_size)

            # Keep only real tokens. Selecting with the (b, n) mask also flattens the
            # logits to (N, vocab_size), so the loss sees the vocabulary as the class
            # dimension (CrossEntropyLoss treats dim 1 of its input as the classes).
            valid = targets != pad_idx
            valid_logits = outputs[valid]                # (N, vocab_size)
            valid_targets = targets[valid]               # (N,)

            loss = lossFunction(valid_logits, valid_targets)
            loss.backward()
            optimizer.step()

            # Running accuracy over non-padding tokens, accumulated as Python ints
            predicted = valid_logits.argmax(dim=-1)
            correct += (predicted == valid_targets).sum().item()
            total += valid_targets.numel()

            batch_percentage = i*20//number_batches
            running_accuracy = correct / total if total else 0.0
            print("\repoch %d/%d [%-20s]            Accuracy: %f" % (epoch+1, epochs, '='*batch_percentage, running_accuracy), end='')

        # Epoch summary; ends with a newline so the next epoch's progress bar does not overwrite it
        accuracy = correct / total if total else 0.0
        print("\repoch {}/{} completed              Accuracy: {:.2f}".format(epoch+1, epochs, accuracy))


In [11]:
# Run on the GPU when CUDA is available, otherwise on the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)

# Width shared by both embeddings and the transformer (first argument of each constructor)
d_m = 256

# One embedding per language, sized by the respective vocabulary.
# NOTE(review): 1000 is presumably the maximum sequence length - confirm in `modules`.
inputEmbedding = SinusoidalEmbedding(d_m, GERMAN_VOCAB_SIZE, 1000, device)
outputEmbedding = SinusoidalEmbedding(d_m, ENGLISH_VOCAB_SIZE, 1000, device)

# NOTE(review): the meaning of the positional hyper-parameters (0.2, 4, 4, 4, 8, 8)
# is defined by `modules.Transformer` and not visible here - confirm against its signature.
model = Transformer(
    d_m, ENGLISH_VOCAB_SIZE, 0.2, 4, 4, 4, 8, 8, device,
    inputEmbedding, outputEmbedding,
).to(device)


def estimateModelMemoryRequirement(model):
    """Print trainable-parameter counts and an estimate of the parameter memory.

    One line is printed per direct child module of `model`. Parameters registered
    directly on `model` itself (outside any child module) get their own line, so the
    breakdown matches the total. The total is taken from ``model.parameters()``, which
    yields every parameter once even if several children share it.

    Only parameters with ``requires_grad=True`` are counted. The memory estimate uses
    the actual element size of each parameter's dtype and covers the parameters only
    (no gradients, optimizer state or activations).

    Args:
        model: the ``torch.nn.Module`` to inspect.
    """
    def trainable(parameters):
        return [p for p in parameters if p.requires_grad]

    for name, module in model.named_children():
        num_params = sum(p.numel() for p in trainable(module.parameters()))
        print(f"{name}: {num_params} parameters")

    # Parameters owned by the model itself are not part of any child module
    own_params = sum(p.numel() for p in trainable(model.parameters(recurse=False)))
    if own_params:
        print(f"(top-level): {own_params} parameters")

    all_params = trainable(model.parameters())
    total_params = sum(p.numel() for p in all_params)
    print("__________________________________________")
    print(total_params)

    # Approximate memory requirement of the parameters, honoring each parameter's dtype
    total_bytes = sum(p.numel() * p.element_size() for p in all_params)
    approx_memory = total_bytes / (1024 ** 2)
    print(f"Approximate memory requirement: {approx_memory:.2f} MB")


# Print the per-module parameter counts and the approximate memory requirement of `model`
estimateModelMemoryRequirement(model)

cuda
inputEmbedding: 2560000 parameters
outputEmbedding: 2048000 parameters
encoders: 3152896 parameters
decoders: 4201472 parameters
classifier: 2056000 parameters
__________________________________________
14018368
Approximate memory requirement: 53.48 MB


In [None]:
EPOCHS = 16
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Fit on the training split. The `test` loader holds the held-out 20 % and has to stay
# unseen during training so it can be used for evaluation afterwards.
teacherForcing(device, model, optimizer, loss, EPOCHS, train)