<a href="https://colab.research.google.com/github/AbrahamGarcia240/DrakarGPT/blob/main/Drakar_GPT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Python imports

# 3rd party imports
import torch
from torch import nn
from torch.nn import functional as F

In [2]:
# Define the device
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"The device we will use is '{DEVICE}'")

The device we will use is 'cuda'


# Download the dataset to use
In this case I want to recreate George R. R. Martin's writing style.

In [3]:
!wget https://raw.githubusercontent.com/nihitx/game-of-thrones-/master/gameofthrones.txt

--2024-09-06 03:04:25--  https://raw.githubusercontent.com/nihitx/game-of-thrones-/master/gameofthrones.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5826890 (5.6M) [text/plain]
Saving to: ‘gameofthrones.txt’


2024-09-06 03:04:26 (198 MB/s) - ‘gameofthrones.txt’ saved [5826890/5826890]



# Create a tokenizer and de-tokenizer

In [4]:
# Read the content of the file
DATASET_FILE = "gameofthrones.txt"
with open(DATASET_FILE, encoding="utf-8") as f:
    DATASET = f.read()
print(DATASET[:100])



“We should start back,” Gared urged as the woods began to grow dark around them. “The wildlings ar


In [5]:
# Define all the tokens in the vocabulary, in this case
# I will tokenize on the character level, i.e.
# John Snow -> ['J', 'o', 'h', 'n', ' ', 'S', 'n', 'o', 'w']
VOCABULARY = list(set(DATASET))
print("The vocabulary tokens are:")
print(''.join(VOCABULARY))
print(f"The number of tokens in our vocabulary is '{len(VOCABULARY)}'")

The vocabulary tokens are:
VN’KY“Qv8TiOpn;]Z.{hy7!eoS/gw90 })LrE3Hz-ltfI6su…m,ê”G‘b
dJjCMé:x2U4R(PB1qakWXcD[?—A5F
The number of tokens in our vocabulary is '86'


In [6]:
# I think it would make sense to try reducing the vocabulary size by
# lowercasing the text, so uppercase and lowercase letters share one token
DATASET = DATASET.lower()
VOCABULARY = list(set(DATASET))
print("The vocabulary tokens are:")
print(''.join(VOCABULARY))
VOCAB_SIZE = len(VOCABULARY)
print(f"The number of tokens in our vocabulary is '{len(VOCABULARY)}'")


The vocabulary tokens are:
74!z-(’el3otf6“suq…1vam8k,/ipênw”g;90]‘ b
cd[}?j—.{)éhr5y:x2
The number of tokens in our vocabulary is '60'


In [7]:
# Create a mapping from token to index and index to token (the tokens are
# single characters here, despite the variable names)
word_to_idx = {word: idx for idx, word in enumerate(VOCABULARY)}
idx_to_word = {idx: word for idx, word in enumerate(VOCABULARY)}

# Create a quick function to tokenize
def tokenize(word: str):
  return [word_to_idx[token] for token in word]

def detokenize(tokens: list):
  return "".join([idx_to_word[idx] for idx in tokens])

# Let's try an example
sentence = "john snow"
print(f"The sentence I will tokenize is '{sentence}'")
print(f"The tokenized sentence is '{tokenize(sentence)}'")
print(f"The detokenized sentence is '{detokenize(tokenize(sentence))}'")

The sentence I will tokenize is 'john snow'
The tokenized sentence is '[47, 10, 53, 30, 39, 15, 30, 10, 31]'
The detokenized sentence is 'john snow'
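
Python does not guarantee the iteration order of a `set`, and string hashing is randomized per process by default, so `list(set(DATASET))` may assign different ids to the same character on different runs. That matters whenever a token id is reused later. A minimal sketch of a reproducible mapping, assuming that sorting the characters is acceptable; `sample_text`, `stable_vocab`, `char_to_idx`, `idx_to_char`, `encode` and `decode` are illustrative names, not part of the notebook:

```python
# Hypothetical stand-in for DATASET so the sketch runs on its own
sample_text = "john snow knows nothing"

# Sorting the set gives the same character order on every run
stable_vocab = sorted(set(sample_text))
char_to_idx = {char: idx for idx, char in enumerate(stable_vocab)}
idx_to_char = {idx: char for idx, char in enumerate(stable_vocab)}

def encode(text: str) -> list[int]:
    # Map every character to its integer id
    return [char_to_idx[char] for char in text]

def decode(tokens: list[int]) -> str:
    # Map every integer id back to its character
    return "".join(idx_to_char[idx] for idx in tokens)

round_trip = decode(encode("john snow"))
print(stable_vocab)
print(round_trip)
```

With a sorted vocabulary the ids depend only on which characters appear in the text, not on the Python process that built the mapping.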


In [8]:
# Tokenize the whole dataset
tokenized_dataset = torch.tensor(tokenize(DATASET), dtype=torch.long)
print(f"The tokenized dataset is of length '{len(tokenized_dataset)}'")
print(tokenized_dataset[:1000])

The tokenized dataset is of length '5662324'
tensor([41, 41, 14, 31,  7, 39, 15, 53, 10, 16,  8, 43, 39, 15, 11, 21, 54, 11,
        39, 40, 21, 42, 24, 25, 32, 39, 33, 21, 54,  7, 43, 39, 16, 54, 33,  7,
        43, 39, 21, 15, 39, 11, 53,  7, 39, 31, 10, 10, 43, 15, 39, 40,  7, 33,
        21, 30, 39, 11, 10, 39, 33, 54, 10, 31, 39, 43, 21, 54, 24, 39, 21, 54,
        10, 16, 30, 43, 39, 11, 53,  7, 22, 49, 39, 14, 11, 53,  7, 39, 31, 27,
         8, 43,  8, 27, 30, 33, 15, 39, 21, 54,  7, 39, 43,  7, 21, 43, 49, 32,
        41, 41, 14, 43, 10, 39, 11, 53,  7, 39, 43,  7, 21, 43, 39, 12, 54, 27,
        33, 53, 11,  7, 30, 39, 56, 10, 16, 46, 32, 39, 15,  7, 54, 39, 31, 21,
        56, 22, 21, 54, 39, 54, 10, 56, 42,  7, 39, 21, 15, 24,  7, 43, 39, 31,
        27, 11, 53, 39, 47, 16, 15, 11, 39, 11, 53,  7, 39, 53, 27, 30, 11, 39,
        10, 12, 39, 21, 39, 15, 22, 27,  8,  7, 49, 41, 41, 33, 21, 54,  7, 43,
        39, 43, 27, 43, 39, 30, 10, 11, 39, 54, 27, 15,  7, 39, 11, 10, 39,

# Create a train and test dataset


In [9]:
# Use the first 80% of the tokens as the training set and the rest as the test set
training_size = int(0.8 * len(tokenized_dataset))
train_data = tokenized_dataset[:training_size]
test_data = tokenized_dataset[training_size:]

In [10]:
# Define a block size for the chunks that we will use to train the transformer.
# If the block size is 8, it means that we will use AT MOST 8 characters as
# context, i.e.
#
#     Chunk = [51, 45, 41,  1,  1, 16, 24, 40] which is extracted from
#             [51, 45, 41,  1,  1, 16, 24, 40, 1]
#
# That means that we can get any of these entries:
#
#  context [51], we expect [45]
#  context [51, 45], we expect [41]
#  context [51, 45, 41], we expect [1]
#  context [51, 45, 41,  1], we expect [1]
#  context [51, 45, 41,  1,  1], we expect [16]
#  context [51, 45, 41,  1,  1, 16], we expect [24]
#  context [51, 45, 41,  1,  1, 16, 24], we expect [40]
#  context [51, 45, 41,  1,  1, 16, 24, 40], we expect [1]
#
# This way we can train the transformer to be able to generate text with
# as little as only one token as context. The block size actually used here
# is much larger than the 8 of the example
BLOCK_SIZE = 256
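
The context/target pairs listed in the comment can be enumerated programmatically. A minimal sketch using the same toy chunk; `toy_block_size`, `inputs`, `targets` and `pairs` are illustrative names, and the block size of 8 is only the value from the comment, not the one used for training:

```python
# Toy chunk copied from the comment: 8 context tokens plus 1 extra token
chunk = [51, 45, 41, 1, 1, 16, 24, 40, 1]
toy_block_size = 8  # hypothetical small value, the notebook uses 256

inputs = chunk[:toy_block_size]
targets = chunk[1:toy_block_size + 1]

pairs = []
for t in range(toy_block_size):
    # The context grows by one token per step, the target is the next token
    context = inputs[:t + 1]
    target = targets[t]
    pairs.append((context, target))
    print(f"context {context}, we expect [{target}]")
```

A single chunk of `toy_block_size + 1` tokens therefore yields `toy_block_size` training examples, one per context length.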


In [11]:
# Create a function to generate batches of data, each batch will have
# BATCH_SIZE elements of size BLOCK_SIZE
BATCH_SIZE = 64

def get_batch(dataset_type="train") -> tuple[torch.Tensor, torch.Tensor]:
  if dataset_type == "train":
    data = train_data
  elif dataset_type == "test":
    data = test_data
  else:
    raise ValueError(f"Unknown dataset type '{dataset_type}'")

  # Get a random index from the dataset we are working with
  # Whatever index we get we need to ensure we will have AT LEAST
  # BLOCK_SIZE indexes in front of it
  #
  # We need to specify how many indexes we want to get, in this case
  # we want BATCH_SIZE
  rand_idx = torch.randint(len(data) - BLOCK_SIZE, (BATCH_SIZE,))

  # Now at this point we have BATCH_SIZE indexes so we need to get the
  # actual sequence out of the indexes
  x = torch.stack([data[idx:idx + BLOCK_SIZE] for idx in rand_idx])
  y = torch.stack([data[idx + 1:idx + BLOCK_SIZE + 1] for idx in rand_idx])

  # Return both X and Y for the batch, if we have 4 elements in the batch, then
  # X will have 4 vectors size BLOCK_SIZE and Y will have 4 vectors size
  # BLOCK_SIZE, however, Y will be one index ahead from X

  # Ensure to move the data to the device
  x, y = x.to(DEVICE), y.to(DEVICE)
  return x, y


x, y = get_batch("train")
print(f"The shape of x is '{x.shape}'")
print(x)
print(f"The shape of y is '{y.shape}'")
print(y)


The shape of x is 'torch.Size([64, 256])'
tensor([[39, 42, 10,  ...,  7, 54, 15],
        [56, 39, 22,  ..., 39, 15, 53],
        [21,  8, 56,  ..., 40, 10, 56],
        ...,
        [39, 10, 54,  ..., 54, 10,  8],
        [42, 27, 43,  ..., 39, 21, 39],
        [30, 43, 39,  ..., 54, 39, 10]], device='cuda:0')
The shape of y is 'torch.Size([64, 256])'
tensor([[42, 10, 30,  ..., 54, 15, 25],
        [39, 22, 10,  ..., 15, 53, 10],
        [ 8, 56, 30,  ..., 10, 56, 39],
        ...,
        [10, 54, 39,  ..., 10,  8,  7],
        [27, 43,  7,  ..., 21, 39, 15],
        [43, 39, 10,  ..., 39, 10, 12]], device='cuda:0')
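
To check that the targets really are the inputs shifted by one position, the same slicing can be run on toy data where the relationship is easy to see. A small sketch, assuming a tensor of consecutive integers as a stand-in for `train_data`; `toy_data`, `toy_block_size`, `toy_batch_size`, `xb` and `yb` are illustrative names:

```python
import torch

torch.manual_seed(0)

# Hypothetical toy data and sizes standing in for train_data, BLOCK_SIZE
# and BATCH_SIZE
toy_data = torch.arange(100)
toy_block_size = 8
toy_batch_size = 4

# Same sampling and slicing scheme as get_batch
start = torch.randint(len(toy_data) - toy_block_size, (toy_batch_size,))
xb = torch.stack([toy_data[i:i + toy_block_size] for i in start])
yb = torch.stack([toy_data[i + 1:i + toy_block_size + 1] for i in start])

# Every target row is the input row shifted one position to the left
shift_holds = torch.equal(xb[:, 1:], yb[:, :-1])
print(xb.shape, yb.shape, shift_holds)
```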


In [198]:
# Define a seed
torch.manual_seed(1)

# Create a class for the Bigram Model
class BigramModel(nn.Module):
  def __init__(self, vocab_size: int):
    super().__init__()

    # In a bigram model the embedding of each token is used directly as the
    # logits for the next token, so every embedding must have vocab_size
    # entries. This is affordable because the vocabulary is small
    self.embedding_layer = nn.Embedding(vocab_size, vocab_size)

  def forward(self, x, y = None):
    loss = None
    # Feed the tokens to the embedding layer
    # If the input is (Batch_size, Block_size), the output will be
    # (Batch_size, Block_size, Vocab_size) since for each token in the input
    # we are creating an embedding
    y_hat = self.embedding_layer(x)

    # Compute the loss of the predictions
    # y_hat is (Batch_size, Block_size, Vocab_size), and
    # y is (Batch_size, Block_size)
    #
    # In multiclass classification the y's are normally class indices, e.g.
    # [1, 2, 9, 2, ... etc]
    # and the y_hats are the logits, i.e. for each entry in y there is a vector
    # of size len(classes) whose largest element is expected to sit at
    # index y[idx], example:
    #
    # Suppose we have 5 classes
    # y[0] = 1
    # y_hat[0] = [0, 0.8, 0.05, 0.05, 0]
    #
    # This means that we need y to be an array of the expected classes rather
    # than a matrix, i.e. instead of
    #
    #    y = [[34,  1, 53, 35],
    #         [0, 56, 17, 39]]
    #
    # we really want y to be the whole sequence:
    #    y = [34,  1, 53, 35, 0, 56, 17, 39]
    #
    # Similarly, since the NN predicts EVERY token in the sequence and returns
    # one logits vector per token, we want y_hat to be a matrix of size
    # (batch_size * block_size, vocab_size), i.e.
    #
    #   Assume that 'a', 'b', ...'z' are logits vectors predicted by the NN,
    #   each of size vocab_size
    #
    #    y_hat = [[a, b, c, d],
    #             [b, k, d, c]]
    #
    #   becomes
    #
    #    y_hat = [a, b, c, d, b, k, d, c]
    #
    # In this way, as in the initial example where cross entropy compared a
    # scalar class to a vector of logits, we can do:
    #
    #       y = [34, 1, 53, 35, 0, 56, 17, 39]
    #                        vs
    #   y_hat = [a,  b,  c,  d, b,  k,  d,  c]
    #
    # and hope that 'a' gives its highest score to class 34
    if y is not None:
      batch_size, block_size, vocab_size = y_hat.shape
      # Use the actual tensor shape instead of the global BATCH_SIZE so the
      # model also works with other batch sizes
      y = y.view(batch_size * block_size)
      y_hat = y_hat.view(batch_size * block_size, vocab_size)
      loss = F.cross_entropy(y_hat, y)

      # Restore y_hat to return a consistent size
      y_hat = y_hat.view(batch_size, block_size, vocab_size)
    return y_hat, loss

  def generate(self, x, max_new_tokens):
    # x has the shape (BATCH_SIZE, BLOCK_SIZE)
    for _ in range(max_new_tokens):
      # Get the predictions
      # y_hat is shaped (BATCH_SIZE, BLOCK_SIZE, vocab_size) because we did not
      # provide a true label to the forward function
      y_hat, _ = self(x)

      # Get only the logits of the last position for each element in the batch
      # This will become a (BATCH_SIZE, vocab_size)
      last_embeddings = y_hat[:, -1, :]
      # Run it over softmax to get probabilities
      # (BATCH_SIZE, vocab_size)
      last_embeddings_scores = F.softmax(last_embeddings, dim=-1)
      # Sample the next token from those probabilities (this is not an argmax,
      # so lower-probability tokens can also be picked)
      # (BATCH_SIZE, 1)
      high_score_indexes = torch.multinomial(last_embeddings_scores,
                                             num_samples=1)
      # Concatenate the sampled tokens to the input for the next generation
      # (BATCH_SIZE, BLOCK_SIZE + 1)
      x = torch.cat((x, high_score_indexes), dim=1)
    return x

# Run an example
print(f"The vocabulary size is {len(VOCABULARY)}")
model = BigramModel(len(VOCABULARY))
# Move the model to the device
model.to(DEVICE)

x, y = get_batch("train")
print(f"The shape of x is '{x.shape}'")
print(f"The shape of y is '{y.shape}'")
output, loss = model(x, y)
print(f"The shape of output is '{output.shape}'")
print(f"The loss of the model is '{loss}'")

# We can start generating out of a space
prompt = " "
tokenized_input = torch.tensor(tokenize(prompt), dtype=torch.long)
# We will do no batching
# Input shape will be (1, 1)
tokenized_input = tokenized_input.view(1, 1).to(DEVICE)
generated_tokens = model.generate(tokenized_input, 100)[0].tolist()
generated_text = detokenize(generated_tokens)
print(generated_text)

The vocabulary size is 60
The shape of x is 'torch.Size([64, 256])'
The shape of y is 'torch.Size([64, 256])'
The shape of output is 'torch.Size([64, 256, 60])'
The loss of the model is '4.684937477111816'
 (d(,57lsw“z’
y.[0wnvu
2rw4c
—”9…69‘}mg6/{;no9ji},
nld:}}7;rr30w(]hb…98éuf[y](hgx1(:7u3”
6ênl)/uxi;“b


In [199]:
# We know that we have 60 elements in the vocabulary
# so since we are starting with random guesses we expect a loss of
# approximately -log(1/60)
-torch.log(torch.tensor(1/60))

tensor(4.0943)
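
The same baseline can be derived step by step without PyTorch. A minimal sketch, assuming all-equal logits as the model of a "random guess"; the variable names are illustrative. The loss measured above (about 4.68) is higher than this baseline, plausibly because `nn.Embedding` weights start from a standard normal distribution, so the initial logits are not uniform:

```python
import math

vocab_size = 60  # value printed by the notebook after lowercasing

# Cross entropy of a uniform prediction: -log(1 / vocab_size) = log(vocab_size)
uniform_loss = -math.log(1 / vocab_size)

# Same number computed from the explicit softmax of all-equal logits
logits = [0.0] * vocab_size
denominator = sum(math.exp(value) for value in logits)
prob_of_target = math.exp(logits[0]) / denominator
loss_from_softmax = -math.log(prob_of_target)

print(round(uniform_loss, 4), round(loss_from_softmax, 4))
```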

In [200]:
model = BigramModel(len(VOCABULARY))
# Move the model to the device
model.to(DEVICE)

# Create a function to compute the evaluation loss
@torch.no_grad()
def get_evaluation_loss():
  # Define the number of evaluation iterations to have
  eval_iters = 20

  # Run in evaluation mode
  model.eval()
  output_losses = {}
  for dataset_type in ["train", "test"]:
    loss_tensor = torch.zeros(eval_iters)
    for idx_eval in range(eval_iters):
      x, y = get_batch(dataset_type)
      _, loss = model(x, y)
      loss_tensor[idx_eval] = loss.item()
    output_losses[dataset_type] = loss_tensor.mean()
  model.train()
  return output_losses

# Now we will train on this model
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

# We will use a smaller batch size than before (32 instead of 64)
BATCH_SIZE = 32

# We want to be able to evaluate the performance of our model
# every 100 iterations
iteration_eval = 100

for steps in range(10000):
  if steps % iteration_eval == 0:
    # We are in evaluation mode!
    loss = get_evaluation_loss()
    print(f"The loss at step {steps} is training loss: '{loss['train']}', "\
          f"evaluation loss:'{loss['test']}'")
  x, y = get_batch("train")
  _, loss = model(x, y)
  optimizer.zero_grad(set_to_none=True)
  loss.backward()
  optimizer.step()



The loss at step 0 is training loss: '4.511907577514648', evaluation loss:'4.511220932006836'
The loss at step 100 is training loss: '4.364018440246582', evaluation loss:'4.366637229919434'
The loss at step 200 is training loss: '4.224114418029785', evaluation loss:'4.222919940948486'
The loss at step 300 is training loss: '4.093716144561768', evaluation loss:'4.092387676239014'
The loss at step 400 is training loss: '3.963160991668701', evaluation loss:'3.9639041423797607'
The loss at step 500 is training loss: '3.8477749824523926', evaluation loss:'3.844214677810669'
The loss at step 600 is training loss: '3.7334556579589844', evaluation loss:'3.7337355613708496'
The loss at step 700 is training loss: '3.6275527477264404', evaluation loss:'3.629145383834839'
The loss at step 800 is training loss: '3.531935453414917', evaluation loss:'3.5333447456359863'
The loss at step 900 is training loss: '3.439836025238037', evaluation loss:'3.441093921661377'
The loss at step 1000 is training lo

In [201]:
# We can start generating out of a space
prompt = " "
tokenized_input = torch.tensor(tokenize(prompt), dtype=torch.long)
# We will do no batching
# Input shape will be (1, 1)
tokenized_input = tokenized_input.view(1, 1).to(DEVICE)
generated_tokens = model.generate(tokenized_input, 100)[0].tolist()
generated_text = detokenize(generated_tokens)
print(generated_text)

 lve fridin fuspongu’sandis k fo onlyo of mesatwinwngel, hte soinoor be ts watordatonnd m t i sof le 


# Explaining the Attention mask




In [202]:
# Let's assume that we are tokenizing using words
#
# Attention is able to take information from all surrounding tokens to predict
# the current one
#
# If the context is "El Instituto Politecnico _____ es grande", attention
# should be able to see "El" "Instituto" "Politecnico" but SHOULD NOT be able to
# see "es" "grande" because we are trying to predict "Nacional"
# (i.e. whatever is in the blank)
#
# Suppose our sentence is now transformed into embeddings of two elements each,
# say:
#
#  "El" -> [1, 2]
#  "Instituto" -> [34, 21]
#  "Politecnico" -> [20, 18]
#  "Nacional" -> [93, 3]
#  "es" -> [9, 2]
#  "grande" -> [1, 29]
#
# In theory, when the input is the sequence
# "El Instituto Politecnico Nacional es grande" what we really see is
#
#   x = [[1, 2],
#        [34, 21],
#        [20, 18],
#        [93, 3],
#        [9, 2],
#        [1, 29]]
#
# However, during training we want to see only the tokens that come before
# the one we want to predict, that is:
#
#   x = [[1, 2],
#        [34, 21],
#        [20, 18],
#        [0, 0],
#        [0, 0],
#        [0, 0]]
# It would be useful to create a "mask" that lets us choose which tokens to use
mask = torch.tril(torch.ones(6, 6)).long()
print(mask)
x = torch.tensor([[1, 2],
                  [34, 21],
                  [20, 18],
                  [93, 3],
                  [9, 2],
                  [1, 29]])
print(x)
result = mask @ x
print("\nResult:")
print("Look how each row is the sum of itself and the rows above but does "\
      "not consider the ones below")
print(result)

tensor([[1, 0, 0, 0, 0, 0],
        [1, 1, 0, 0, 0, 0],
        [1, 1, 1, 0, 0, 0],
        [1, 1, 1, 1, 0, 0],
        [1, 1, 1, 1, 1, 0],
        [1, 1, 1, 1, 1, 1]])
tensor([[ 1,  2],
        [34, 21],
        [20, 18],
        [93,  3],
        [ 9,  2],
        [ 1, 29]])

Result:
Look how each row is the sum of itself and the rows above but does not consider the ones below
tensor([[  1,   2],
        [ 35,  23],
        [ 55,  41],
        [148,  44],
        [157,  46],
        [158,  75]])


In [203]:
# Let's imagine now that instead of having a triangular matrix of 1's and 0's
# we have a matrix with the attention values that we should use to generate
# the next word.

mask = torch.tril(torch.rand(6, 6)).float()
print(mask)

# For the first row, we only want to see the attention values of the first
# token, for the second row those of the first two tokens, and so on

tensor([[0.8020, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.4224, 0.2088, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.7015, 0.9083, 0.1149, 0.0000, 0.0000, 0.0000],
        [0.3915, 0.6450, 0.7232, 0.2031, 0.0000, 0.0000],
        [0.9095, 0.7872, 0.5946, 0.8265, 0.9607, 0.0000],
        [0.6094, 0.0751, 0.2592, 0.5549, 0.0659, 0.8054]])
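
The random values above are not normalized. In actual attention each row is turned into weights that sum to 1 by masking the future positions with `-inf` and applying softmax. A minimal sketch with random scores; `seq_len`, `scores`, `causal`, `masked_scores` and `weights` are illustrative names:

```python
import torch
from torch.nn import functional as F

torch.manual_seed(0)
seq_len = 6  # same toy length as the 6x6 mask above

# Hypothetical raw attention scores, one row per query token
scores = torch.randn(seq_len, seq_len)
causal = torch.tril(torch.ones(seq_len, seq_len))

# Future positions get -inf so softmax assigns them exactly zero weight
masked_scores = scores.masked_fill(causal == 0, float("-inf"))
weights = F.softmax(masked_scores, dim=-1)

print(weights)
print(weights.sum(dim=-1))
```

The first row has a single visible token, so its weight is 1. Later rows spread their weight over all the tokens seen so far.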


# Building the GPT Model based on "Attention is all you need"

In [56]:
DROPOUT = 0.2

In [57]:
# This class implements a single head self-attention module
class Head(nn.Module):
  def __init__(self, head_size: int):
    super().__init__()

    # Keep track of the head_size
    self.head_size = head_size

    # Create the query, key and value layers
    self.query_layer = nn.Linear(EMBEDDING_SIZE, head_size, bias=False)
    self.key_layer = nn.Linear(EMBEDDING_SIZE, head_size, bias=False)
    self.value_layer = nn.Linear(EMBEDDING_SIZE, head_size, bias=False)

    # Create the self-attention mask, remember that it should be as long
    # as the number of tokens we will see per iteration, and since attention
    # computes scores for each of the other tokens, this is a square matrix
    #
    # Since this is not a learneable parameter of the model we will store it
    # as part of the register buffer that nn.Module already includes
    self.register_buffer("attention_mask",
                         torch.tril(torch.ones(BLOCK_SIZE, BLOCK_SIZE)))

    # Create a dropout layer for regularization. Note that forward() currently
    # has the dropout call commented out, so this layer is not applied
    self.dropout = nn.Dropout(DROPOUT)

  def forward(self, x):
    # x is shaped (BATCH_SIZE, BLOCK_SIZE, EMBEDDING_SIZE)
    # Get the query, key and value vectors, each of these outputs are shaped
    # (BATCH_SIZE, BLOCK_SIZE, head_size)
    query = self.query_layer(x)
    key = self.key_layer(x)
    value = self.value_layer(x)

    # Compute the attention scores
    # Attention will be softmax((QK.T)/sqrt(head_size)) * V
    # Now, we need to transpose K because in 3D matrix multiplication if we have
    # a matrix A (m, b, l) and a matrix B (m, l, q) the output will be
    # a matrix c (m, b, q)
    #
    # Since query is (BATCH_SIZE, BLOCK_SIZE, head_size) and
    # since key is (BATCH_SIZE, BLOCK_SIZE, head_size) we cannot multiply them
    # as is, instead we will swap key to be (BATCH_SIZE, head_size, BLOCK_SIZE)
    # and thus the result will be
    # (BATCH_SIZE, BLOCK_SIZE, BLOCK_SIZE)
    QK_transpose = query @ key.transpose(-2, -1) * self.head_size ** -0.5
    if torch.any(torch.isnan(QK_transpose)):
        raise RuntimeError("NaNs found in QK_transpose unmasked")
    # Before applying softmax, use the attention mask, that way softmax does not
    # consider the scores for tokens it is not supposed to look at
    #
    # This code uses the attention mask and replaces all values that are 0's
    # with -inf, then applies it to QK_transpose. The mask is cropped to the
    # actual sequence length so inputs shorter than BLOCK_SIZE also work
    seq_len = x.shape[1]
    QK_transpose = QK_transpose.masked_fill(
        self.attention_mask[:seq_len, :seq_len] == 0, float("-inf"))
    if torch.any(torch.isnan(QK_transpose)):
        raise RuntimeError("NaNs found in QK_transpose masked")
    # Dropout is disabled for now
    #QK_transpose = self.dropout(QK_transpose)

    # Apply softmax over the last dimension
    softmax_QK = F.softmax(QK_transpose, dim=-1)
    if torch.any(torch.isnan(softmax_QK)):
        raise RuntimeError("NaNs found in softmax_QK")
    # Softmax_QK is (BATCH_SIZE, BLOCK_SIZE, BLOCK_SIZE)
    # Value is (BATCH_SIZE, BLOCK_SIZE, head_size)
    # Thus the result will be (BATCH_SIZE, BLOCK_SIZE, head_size)
    attention = softmax_QK @ value
    if torch.any(torch.isnan(attention)):
        raise RuntimeError("NaNs found in attention")

    return attention
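
As a sanity check, the manual computation in `Head.forward` can be compared with PyTorch's built-in fused attention. This is a sketch under the assumption that PyTorch 2.0 or later is installed (where `F.scaled_dot_product_attention` is available); the tensor sizes and variable names are illustrative and the projections are replaced by random tensors:

```python
import torch
from torch.nn import functional as F

torch.manual_seed(0)
batch, seq_len, head_size = 2, 5, 4  # hypothetical toy sizes

# Random stand-ins for the outputs of the query, key and value layers
query = torch.randn(batch, seq_len, head_size)
key = torch.randn(batch, seq_len, head_size)
value = torch.randn(batch, seq_len, head_size)

# Manual version, following the same steps as Head.forward
scores = query @ key.transpose(-2, -1) * head_size ** -0.5
causal = torch.tril(torch.ones(seq_len, seq_len))
scores = scores.masked_fill(causal == 0, float("-inf"))
manual = F.softmax(scores, dim=-1) @ value

# Built-in version (assumes PyTorch 2.0 or later); by default it scales by
# 1 / sqrt(head_size), and is_causal=True applies the lower-triangular mask
fused = F.scaled_dot_product_attention(query, key, value, is_causal=True)

max_diff = (manual - fused).abs().max().item()
print(manual.shape, max_diff)
```

The two results should agree up to floating point error. The manual version is kept in the notebook because it makes every step explicit.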


In [58]:
# Implement multihead attention using the attention class we have created
class MultiHeadAttention(nn.Module):
  def __init__(self, num_heads: int, head_size: int):
    super().__init__()

    # Create num_heads of head_size
    self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
    # We will add another linear layer before returning the output
    self.linear_layer = nn.Linear(num_heads * head_size, num_heads * head_size)
    # Create a dropout layer
    self.dropout = nn.Dropout(DROPOUT)

  def forward(self, x):
    # Run the same input over all the heads in parallel and concatenate all
    # of the outputs over the last channel
    #
    # The output will be (BATCH_SIZE, BLOCK_SIZE, num_heads * head_size)
    output = torch.cat([head(x) for head in self.heads], dim=-1)
    output = self.linear_layer(output)
    output = self.dropout(output)
    return output
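
The shape bookkeeping of the concatenation can be checked in isolation. A small sketch, assuming random tensors as stand-ins for the per-head outputs; `head_outputs` and `concatenated` are illustrative names, while the embedding size and number of heads are the values used later in the notebook:

```python
import torch

embedding_size, num_heads = 384, 6  # values used later in the notebook
head_size = embedding_size // num_heads

batch, seq_len = 2, 10  # hypothetical toy sizes
# Stand-ins for the per-head outputs, each (batch, seq_len, head_size)
head_outputs = [torch.randn(batch, seq_len, head_size)
                for _ in range(num_heads)]

# Concatenating over the last dimension recovers the embedding size
concatenated = torch.cat(head_outputs, dim=-1)
print(head_size, concatenated.shape)
```

If the embedding size were not divisible by the number of heads, the concatenated width would be smaller than the embedding size and the residual sum in the block would fail.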

In [59]:
# Implement the feed-forward network that sits at the end of each transformer block
class FeedForward(nn.Module):
  def __init__(self, embedding_size: int):
    super().__init__()

    # The original paper, "Attention Is All You Need", uses an inner
    # dimension 4 times larger than the model dimension
    self.network = nn.Sequential(
      nn.Linear(embedding_size, embedding_size * 4),
      nn.ReLU(),
      nn.Linear(embedding_size * 4, embedding_size),
      nn.Dropout(DROPOUT)
    )

  def forward(self, x):
    return self.network(x)

In [60]:
# Implement a block class, this class has a multihead attention, a
# feedforward block and layer normalization, joined by residual connections
# (this is the part repeated Nx times in the classic image of a transformer)
class Block(nn.Module):
  def __init__(self, embedding_size: int, num_heads: int):
    super().__init__()

    head_size = embedding_size // num_heads
    self.multihead_attention = MultiHeadAttention(num_heads, head_size)
    self.feed_forward = FeedForward(embedding_size)
    self.layer_norm1 = nn.LayerNorm(embedding_size)
    self.layer_norm2 = nn.LayerNorm(embedding_size)

  def forward(self, x):
    # We will sum the output to the input to do residual connections
    x = x + self.multihead_attention(self.layer_norm1(x))
    if torch.any(torch.isnan(x)):
        raise RuntimeError("NaNs found in output of multihead")
    # At this point the input is once again
    # (BATCH_SIZE, BLOCK_SIZE, EMBEDDING_SIZE)
    x = x + self.feed_forward(self.layer_norm2(x))
    if torch.any(torch.isnan(x)):
        raise RuntimeError("NaNs found in feedforward of block")
    return x


In [61]:
# Define a seed
torch.manual_seed(1)

# Define the embedding size
EMBEDDING_SIZE = 384

# Define how many blocks to have
NUM_BLOCKS = 6

# Create a class for the GPT
class GPT(nn.Module):
  def __init__(self):
    super().__init__()

    self.num_heads = 6

    # Create an embedding layer that generates embeddings of EMBEDDING_SIZE
    self.embedding_layer = nn.Embedding(VOCAB_SIZE, EMBEDDING_SIZE)
    # Create a positional embedding layer
    # In general we expect at most BLOCK_SIZE tokens per iteration, so for each
    # of those positions, we want to generate an embedding (kind of a map
    # from position index to an embedding that we can add to the input)
    self.positional_embedding_layer = nn.Embedding(BLOCK_SIZE, EMBEDDING_SIZE)
    # Create a Block layer composed of multihead attention layer and
    # feed forward
    # Since we have num_heads and we will concatenate the outputs of each of
    # the attention heads, and we expect the dims of the output to be
    # (BATCH_SIZE, BLOCK_SIZE, EMBEDDING_SIZE), we need to divide the
    # EMBEDDING_SIZE // num_heads to get the head_size
    self.blocks = nn.Sequential(
        *[Block(EMBEDDING_SIZE, self.num_heads) for _ in range(NUM_BLOCKS)],
    )
    # Create a normalization layer
    self.layer_norm = nn.LayerNorm(EMBEDDING_SIZE)
    # Create a linear layer
    self.linear_layer = nn.Linear(EMBEDDING_SIZE, VOCAB_SIZE)

  def forward(self, x, y = None):
    loss = None
    # Feed the tokens to the embedding layer
    # If the input is (Batch_size, Block_size), the output will be
    # (Batch_size, Block_size, Embedding_size) since for each token in the input
    # we are creating an embedding
    embeddings = self.embedding_layer(x)
    if torch.any(torch.isnan(embeddings)):
        raise RuntimeError("NaNs found in embeddings")
    # Get positional embeddings by creating an array from 0 to seq_len - 1
    # (seq_len is at most BLOCK_SIZE) and creating embeddings out of those
    # The output will be (seq_len, Embedding_size)
    positions = torch.arange(x.shape[1], device=x.device)
    positional_embeddings = self.positional_embedding_layer(positions)
    if torch.any(torch.isnan(positional_embeddings)):
        raise RuntimeError("NaNs found in positional embeddings")
    # Sum the embeddings + positional embeddings
    # This uses broadcasting so the output will be
    # (Batch_size, Block_size, Embedding_size)
    embeddings = embeddings + positional_embeddings
    # Send the embeddings to the block
    output = self.blocks(embeddings)
    if torch.any(torch.isnan(output)):
        raise RuntimeError("NaNs found in block")
    # Send the output to the norm layer
    output = self.layer_norm(output)
    if torch.any(torch.isnan(output)):
        raise RuntimeError("NaNs found in layer norm")

    # The output will be
    # (Batch_size, Block_size, Vocab_size)
    y_hat = self.linear_layer(output)
    if torch.any(torch.isnan(y_hat)):
        raise RuntimeError("NaNs found in linear layer")

    # Compute the loss of the predictions
    # y_hat is (Batch_size, Block_size, Vocab_size), and
    # y is (Batch_size, Block_size)
    #
    # As explained in BigramModel.forward, F.cross_entropy expects the logits
    # as (N, Vocab_size) and the target classes as (N,), so we flatten the
    # batch and block dimensions of both tensors before computing the loss
    if y is not None:
      batch_size, block_size, vocab_size = y_hat.shape
      # Use the actual tensor shape instead of the global BATCH_SIZE so the
      # model also works with other batch sizes
      y = y.view(batch_size * block_size)
      y_hat = y_hat.view(batch_size * block_size, vocab_size)
      loss = F.cross_entropy(y_hat, y)

      # Restore y_hat to return a consistent size
      y_hat = y_hat.view(batch_size, block_size, vocab_size)
    return y_hat, loss

  def generate(self, x, max_new_tokens):
    # x has the shape (BATCH_SIZE, BLOCK_SIZE)
    for _ in range(max_new_tokens):
      # Ensure that our context is never bigger than BLOCK_SIZE
      x_context = x[:, -BLOCK_SIZE:]

      # Get the predictions
      # y_hat is shaped (BATCH_SIZE, BLOCK_SIZE, vocab_size) because we did not
      # provide a true label to the forward function
      y_hat, _ = self(x_context)

      # Get only the logits of the last position for each element in the batch
      # This will become a (BATCH_SIZE, vocab_size)
      last_embeddings = y_hat[:, -1, :]

      # Run it over softmax to get probabilities
      # (BATCH_SIZE, vocab_size)
      last_embeddings_scores = F.softmax(last_embeddings, dim=-1)

      # Sample the next token from those probabilities (this is not an argmax,
      # so lower-probability tokens can also be picked)
      # (BATCH_SIZE, 1)
      high_score_indexes = torch.multinomial(last_embeddings_scores,
                                             num_samples=1)
      # Concatenate the sampled tokens to the input for the next generation
      # (BATCH_SIZE, BLOCK_SIZE + 1)
      x = torch.cat((x, high_score_indexes), dim=1)

    return x
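
The broadcasting used when summing token and positional embeddings in `forward` can be seen on a small example. A sketch with toy sizes; `token_table`, `position_table`, `token_emb`, `position_emb` and `combined` are illustrative names, not the notebook's:

```python
import torch
from torch import nn

torch.manual_seed(0)
vocab_size, block_size, embedding_size = 60, 8, 16  # hypothetical toy sizes
batch = 4

token_table = nn.Embedding(vocab_size, embedding_size)
position_table = nn.Embedding(block_size, embedding_size)

tokens = torch.randint(vocab_size, (batch, block_size))
# Shape (batch, block_size, embedding_size)
token_emb = token_table(tokens)
# Shape (block_size, embedding_size), one vector per position
position_emb = position_table(torch.arange(block_size))

# Broadcasting adds the same position vectors to every sequence in the batch
combined = token_emb + position_emb
same_offset = torch.allclose(combined[0] - token_emb[0],
                             combined[1] - token_emb[1], atol=1e-5)
print(combined.shape, same_offset)
```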

In [62]:
model = GPT()
# Move the model to the device
model = model.to(DEVICE)

# Create a function to compute the evaluation loss
@torch.no_grad()
def get_evaluation_loss():
  # Define the number of evaluation iterations to have
  eval_iters = 200

  # Run in evaluation mode
  model.eval()
  output_losses = {}
  for dataset_type in ["train", "test"]:
    loss_tensor = torch.zeros(eval_iters)
    for idx_eval in range(eval_iters):
      x, y = get_batch(dataset_type)
      _, loss = model(x, y)
      loss_tensor[idx_eval] = loss.item()
    output_losses[dataset_type] = loss_tensor.mean()
  model.train()
  return output_losses

# Now we will train on this model
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)



# We want to be able to evaluate the performance of our model
# every 100 iterations
iteration_eval = 100

for steps in range(5000):
  if steps % iteration_eval == 0:
    # We are in evaluation mode!
    loss = get_evaluation_loss()
    print(f"The loss at step {steps} is training loss: '{loss['train']}', "\
          f"evaluation loss:'{loss['test']}'")
  x, y = get_batch("train")
  _, loss = model(x, y)
  optimizer.zero_grad(set_to_none=True)
  loss.backward()
  optimizer.step()



The loss at step 0 is training loss: '4.236394882202148', evaluation loss:'4.23798131942749'
The loss at step 100 is training loss: '2.3636019229888916', evaluation loss:'2.3738908767700195'
The loss at step 200 is training loss: '2.323028087615967', evaluation loss:'2.3316962718963623'
The loss at step 300 is training loss: '2.2493088245391846', evaluation loss:'2.2597362995147705'
The loss at step 400 is training loss: '2.091914176940918', evaluation loss:'2.105107069015503'
The loss at step 500 is training loss: '1.9639816284179688', evaluation loss:'1.978714108467102'
The loss at step 600 is training loss: '1.8738656044006348', evaluation loss:'1.8928455114364624'
The loss at step 700 is training loss: '1.7828125953674316', evaluation loss:'1.8044450283050537'
The loss at step 800 is training loss: '1.7072608470916748', evaluation loss:'1.7311569452285767'
The loss at step 900 is training loss: '1.6510440111160278', evaluation loss:'1.6809784173965454'
The loss at step 1000 is trai

In [68]:
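# Generate text
# We start generating from a context of BLOCK_SIZE spaces. The space token is
# looked up with tokenize() instead of being hardcoded, because the ids
# depend on the order of the vocabulary
space_token = tokenize(" ")[0]
tokenized_input = torch.tensor([[space_token] * BLOCK_SIZE],
                               dtype=torch.long).to(DEVICE)
# We will do no batching
# Input shape will be (1, BLOCK_SIZE)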
generated_tokens = model.generate(tokenized_input, 5000)[0].tolist()
generated_text = detokenize(generated_tokens)
print(generated_text)

                                                                                                                                                                                                                                                                tyrion, of scraved up and about me and west nother’s house of doubtless, even torged crownsignging. “leallocks and fand win traps, and that’s song?”

“no jeaght of cell, play where stannis aid. the trees of my grain and is it friends. then this men said abefore, and i have load may be along for foight not less so cressen of balon,” pows wenster, his sword hang. hot peep the fast. she large that the battle of the survivor and darkness to lamp, glory, and then never sat “be afore the auttach of sweet and cats, a few horse past.”

“joffrey the king others didn’t twenty back wife, another,” more in the loomy roof. “the own hilt’s so leborn men. youthing, my lady grand without.”

“did you ride?” wild ser raymun took her, they pyried and da

In [77]:
print("This model has the following number of parameters")
print(sum([param.numel() for param in model.parameters()]) /1e6, "million")

This model has the following number of parameters
10.785084 million
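
The reported total can be reproduced by hand from the layer sizes defined in the notebook. A sketch of the arithmetic, assuming the PyTorch defaults visible in the code (bias terms on every `nn.Linear` except the query, key and value layers, and a weight plus a bias on each `nn.LayerNorm`); `linear_params` is a hypothetical helper:

```python
vocab_size, block_size = 60, 256
embedding_size, num_heads, num_blocks = 384, 6, 6
head_size = embedding_size // num_heads

def linear_params(n_in: int, n_out: int, bias: bool = True) -> int:
    # Weight matrix plus an optional bias vector
    return n_in * n_out + (n_out if bias else 0)

layer_norm = 2 * embedding_size  # weight and bias
# Query, key and value projections of one head, all without bias
one_head = 3 * linear_params(embedding_size, head_size, bias=False)
attention = (num_heads * one_head
             + linear_params(embedding_size, embedding_size))
feed_forward = (linear_params(embedding_size, 4 * embedding_size)
                + linear_params(4 * embedding_size, embedding_size))
one_block = attention + feed_forward + 2 * layer_norm

total = (vocab_size * embedding_size      # token embedding table
         + block_size * embedding_size    # positional embedding table
         + num_blocks * one_block         # transformer blocks
         + layer_norm                     # final LayerNorm
         + linear_params(embedding_size, vocab_size))  # output projection
print(total / 1e6, "million")
```

The attention masks are registered as buffers, so they do not appear in `model.parameters()` and are not part of this count. Almost all of the parameters sit in the six blocks, with the feed-forward layers accounting for about two thirds of each block.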
