In [11]:
!pip install transformers

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim



<h2> Tokenization </h2>

There are many ways to tokenize text. For simplicity, we use the pretrained GPT-2 tokenizer. This is a byte-level BPE tokenizer. If you're interested in how it works, you can read more [here](https://huggingface.co/learn/nlp-course/en/chapter6/5).

In [37]:
from transformers import AutoTokenizer

# Load the pretrained GPT-2 tokenizer and reuse its end-of-text token for padding,
# so that texts of different lengths can be batched into one rectangular tensor.
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

texts = [
    "Hello, my dog is cute",
    "Hello, my cat is cute",
    "What is your name?",
    "My name is Silje",
]

# Encode the whole list at once; "input_ids" holds one row of token ids per text.
tokens = tokenizer(
    texts,
    padding=True,
    truncation=True,
    return_tensors="pt",
)["input_ids"]

# Round-trip check: show each text, its token ids, and the ids decoded back to text.
for text, token_list in zip(texts, tokens):
    decoded = tokenizer.decode(token_list)
    print(f"Text: {text}\nTokens: {token_list}\nDecoded: {decoded}")

Text: Hello, my dog is cute
Tokens: tensor([15496,    11,   616,  3290,   318, 13779])
Decoded: Hello, my dog is cute
Text: Hello, my cat is cute
Tokens: tensor([15496,    11,   616,  3797,   318, 13779])
Decoded: Hello, my cat is cute
Text: What is your name?
Tokens: tensor([ 2061,   318,   534,  1438,    30, 50256])
Decoded: What is your name?<|endoftext|>
Text: My name is Silje
Tokens: tensor([ 3666,  1438,   318,  4243, 18015, 50256])
Decoded: My name is Silje<|endoftext|>


It seems like our tokenizer is working: the decoded texts match the originals, apart from the `<|endoftext|>` padding tokens appended to the shorter sequences. Furthermore, we can see that all the tokenizer does is map a text to a list of integers.

<h2> Embedding Layer </h2>

In [13]:
  class EmbeddingLayer(nn.Module):
      """Lookup table that turns integer token ids into dense vectors.

      Args:
          vocab_size: number of rows in the table (one per token id).
          embedding_dim: length of the vector stored for each token.
      """

      def __init__(self, vocab_size, embedding_dim):
          super().__init__()
          # See https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html
          self.embedding = nn.Embedding(
              num_embeddings=vocab_size,
              embedding_dim=embedding_dim,
          )

      def forward(self, x):
          """Return the embedding vector for every id in `x`.

          The output has the shape of `x` with one extra trailing axis of
          size `embedding_dim`.
          """
          vectors = self.embedding(x)
          return vectors


  embedding_dim = 10

  print(tokenizer.vocab_size)
  print(tokens)

  embedding_layer = EmbeddingLayer(tokenizer.vocab_size, embedding_dim)

  # nn.Embedding requires integer (long) indices. Convert directly to int64:
  # `torch.Tensor(tokens)` would first cast the ids to float32 and only then
  # back to long, which is an unnecessary (and for large ids lossy) round trip.
  # `as_tensor` is a no-op when `tokens` already is a LongTensor, so re-running
  # this cell is safe.
  tokens = torch.as_tensor(tokens, dtype=torch.long)

  embeddings = embedding_layer(tokens)

  # One `embedding_dim`-sized vector per token: (batch, seq_len, embedding_dim).
  print(embeddings.shape)
  print(embeddings)

50257
tensor([[15496,    11,   616,  3290,   318, 13779],
        [15496,    11,   616,  3797,   318, 13779],
        [ 2061,   318,   534,  1438,    30, 50256],
        [ 3666,  1438,   318,  1757, 50256, 50256]])
torch.Size([4, 6, 10])
tensor([[[ 0.3652, -0.4186,  1.2931,  0.0609, -2.0064,  0.5391, -0.2560,
           0.1914,  1.5616,  1.4226],
         [ 0.9088, -0.2542, -0.3702, -0.5665,  0.5098,  1.0541, -0.1964,
          -0.6411,  1.0109, -0.2396],
         [ 0.3788, -1.8244,  1.1355,  0.9227,  0.3982,  0.8944, -1.8314,
           1.2262, -1.0428, -0.4696],
         [-0.8846, -0.5651,  1.0146, -0.6938,  1.3657, -0.1311,  1.5560,
          -0.2721,  0.4608, -1.9104],
         [-0.4286,  0.9727,  2.0903, -0.2067, -0.7757,  0.2188,  0.9907,
           0.2984, -0.3898, -0.3913],
         [ 0.3138, -1.2117,  1.8643, -0.1747, -0.2683, -2.1543, -0.0965,
           0.2380,  0.0178,  1.9778]],

        [[ 0.3652, -0.4186,  1.2931,  0.0609, -2.0064,  0.5391, -0.2560,
           0.1914,  1

We have now gone from words -> tokens -> embeddings (a vector for each token). Let's get to the meat of the transformer, the *Attention*.

<h2> The Attention Layer </h2>

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Computes ``softmax(Q K^T / sqrt(d_k)) V`` where Q, K and V are linear
    projections of the same input.

    Args:
        d_model: size of the last axis of the input.
        d_k: size of each of the Q, K and V projections (and of the output).
    """

    def __init__(self, d_model, d_k):
        super().__init__()

        self.d_model = d_model
        self.d_k = d_k
        # A single fused projection; its output is split into Q, K and V.
        self.linear = nn.Linear(d_model, 3 * d_k)

    def forward(self, x):
        """Apply self-attention.

        Args:
            x: tensor of shape ``(..., seq_len, d_model)``.

        Returns:
            Tensor of shape ``(..., seq_len, d_k)``.
        """
        # Split the fused projection into Q, K, V along the feature axis.
        q, k, v = self.linear(x).chunk(3, dim=-1)

        # scores[..., i, j] = similarity between query i and key j.
        # Transposing the last two axes (instead of hard-coded axes 1 and 2)
        # keeps this correct for any number of leading batch axes.
        scores = torch.matmul(q, k.transpose(-2, -1)) / self.d_k ** 0.5

        # Normalize over the *key* axis so that, for every query, the weights
        # over all keys sum to 1. (Normalizing over dim=1 would instead
        # normalize across queries, which is not attention.)
        weights = torch.softmax(scores, dim=-1)

        return torch.matmul(weights, v)

# Use the embedding size for both the input width and the projection width,
# so the attention output can be compared shape-for-shape with its input.
attention_layer = Attention(d_model=embedding_dim, d_k=embedding_dim)
attn_logits = attention_layer(embeddings)

# Sanity check: the shape must be unchanged by the attention layer.
(embeddings.shape, attn_logits.shape)

(torch.Size([4, 6, 10]), torch.Size([4, 6, 10]))

In [18]:
# naive implementation of multi-head attention

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values, split into
    ``n_heads`` heads of size ``d_model // n_heads``, attended per head,
    concatenated again and passed through a final linear layer.

    Args:
        d_model: size of the last axis of the input and of the output.
        n_heads: number of attention heads; must divide ``d_model``.
        causal: if True (default), position ``i`` may only attend to
            positions ``<= i``. This is required for next-token prediction:
            without it each position can see the token it is trained to
            predict. Pass ``causal=False`` for bidirectional attention.
    """

    def __init__(self, d_model, n_heads, causal=True):
        super().__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"

        self.n_heads = n_heads
        self.d_k = d_model // n_heads  # Dimension per head
        self.causal = causal

        # Linear layers that create queries, keys and values for all heads at once
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)

        # Linear layer that mixes the concatenated heads
        self.out_linear = nn.Linear(d_model, d_model)

    def _split_heads(self, t, batch_size, seq_len):
        """Reshape (batch, seq_len, d_model) -> (batch, n_heads, seq_len, d_k)."""
        return t.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)

    def forward(self, x):
        """Apply multi-head self-attention.

        Args:
            x: tensor of shape ``(batch_size, seq_len, d_model)``.

        Returns:
            Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        batch_size, seq_len, d_model = x.shape

        # Project the input and split it into heads
        q = self._split_heads(self.q_linear(x), batch_size, seq_len)
        k = self._split_heads(self.k_linear(x), batch_size, seq_len)
        v = self._split_heads(self.v_linear(x), batch_size, seq_len)

        # Scaled dot-product scores: (batch_size, n_heads, seq_len, seq_len)
        scores = torch.matmul(q, k.transpose(-2, -1)) / (self.d_k ** 0.5)

        if self.causal:
            # True strictly above the diagonal, i.e. where key index > query index.
            # Setting those scores to -inf gives them zero weight after softmax.
            future = torch.ones(seq_len, seq_len, device=x.device).triu(diagonal=1).bool()
            scores = scores.masked_fill(future, float("-inf"))

        attn = torch.softmax(scores, dim=-1)  # weights over keys sum to 1 per query
        x = torch.matmul(attn, v)  # (batch_size, n_heads, seq_len, d_k)

        # Concatenate all heads and project to the output dimension
        x = x.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)
        x = self.out_linear(x)

        return x


# Shape smoke test on random input of shape (batch_size, seq_len, d_model).
d_model, n_heads = 32, 4
seq_len, batch_size = 16, 8

shifted_x = torch.randn((batch_size, seq_len, d_model))
multi_head_attn = MultiHeadAttention(d_model=d_model, n_heads=n_heads)
attn_logits = multi_head_attn(shifted_x)

# Displayed as the cell result; should equal the input shape.
attn_logits.shape

torch.Size([8, 16, 32])

In [19]:
class TransformerLayer(nn.Module):
    """One Transformer block: self-attention and a feed-forward network.

    Each of the two sub-layers is wrapped in a residual connection followed
    by layer normalization (``norm(x + sublayer(x))``).

    Args:
        d_model: feature size of the input and output.
        n_heads: number of attention heads passed to ``MultiHeadAttention``.
    """

    def __init__(self, d_model, n_heads):
        super().__init__()

        # The feed-forward hidden layer is four times as wide as the model.
        hidden_dim = 4 * d_model

        # Sub-layer 1: multi-head self-attention and its normalization.
        self.attention = MultiHeadAttention(d_model, n_heads)
        self.norm1 = nn.LayerNorm(d_model)

        # Sub-layer 2: position-wise feed-forward network and its normalization.
        self.ffn = nn.Sequential(
            nn.Linear(d_model, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, d_model),
        )
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run both sub-layers; the output has the same shape as `x`."""
        # Attention, then add the residual and normalize.
        attended = self.norm1(x + self.attention(x))

        # Feed-forward, then add the residual and normalize.
        return self.norm2(attended + self.ffn(attended))

In [20]:
class Transformer(nn.Module):
    """Token embedding + learned positions + stacked Transformer layers + vocabulary head.

    Args:
        vocab_size: number of distinct token ids.
        d_model: feature size used throughout the model.
        n_heads: number of attention heads in every layer.
        n_layers: number of stacked ``TransformerLayer`` blocks.
        block_size: maximum sequence length the model accepts.
    """

    def __init__(self, vocab_size, d_model, n_heads, n_layers, block_size):
        super().__init__()

        self.block_size = block_size

        # Token embeddings
        self.embedding = nn.Embedding(vocab_size, d_model)

        # Learned positional embeddings: one trainable vector per position,
        # initialized to zero. The leading axis of size 1 broadcasts over the batch.
        self.positional_encoding = nn.Parameter(torch.zeros(1, block_size, d_model))

        # Stack of Transformer layers
        self.layers = nn.ModuleList([
            TransformerLayer(d_model, n_heads) for _ in range(n_layers)
        ])

        # Output linear layer mapping features to one score per vocabulary entry
        self.output_layer = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Map token ids to next-token scores.

        Args:
            x: integer tensor of token ids, shape ``(batch_size, seq_len)``
                with ``seq_len <= block_size``.

        Returns:
            Tensor of shape ``(batch_size, seq_len, vocab_size)``.

        Raises:
            ValueError: if ``seq_len`` exceeds ``block_size``; there are no
                positional embeddings beyond that length.
        """
        seq_len = x.size(1)
        if seq_len > self.block_size:
            raise ValueError(
                f"sequence length {seq_len} exceeds block_size {self.block_size}"
            )

        # Token embeddings
        x = self.embedding(x)

        # Add the positional embeddings for the first `seq_len` positions
        x = x + self.positional_encoding[:, :seq_len, :]

        # Pass through each Transformer layer
        for layer in self.layers:
            x = layer(x)

        # Final output layer to map to vocabulary size
        return self.output_layer(x)

In [21]:
class GPT(nn.Module):
    """Language-model wrapper around ``Transformer``: loss computation and text generation.

    Args:
        vocab_size: number of distinct token ids.
        d_model: feature size of the model.
        n_heads: number of attention heads per layer.
        n_layers: number of Transformer layers.
        block_size: maximum sequence length the model accepts.
    """

    def __init__(self, vocab_size, d_model, n_heads, n_layers, block_size):
        super().__init__()

        # Remembered so `generate` can keep its context within the model's limit.
        self.block_size = block_size

        # we initialize the transformer model we created
        self.transformer = Transformer(vocab_size, d_model, n_heads, n_layers, block_size)

        # Loss function for training the model
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, x, targets=None):
        """Compute logits and, if `targets` is given, the cross-entropy loss.

        Args:
            x: token ids, shape ``(batch_size, seq_len)``.
            targets: optional token ids with the same shape as `x`.

        Returns:
            ``(logits, loss)``; `loss` is ``None`` when `targets` is ``None``.
        """
        logits = self.transformer(x)
        loss = None
        if targets is not None:
            # Flatten batch and sequence axes: the loss expects (N, vocab) vs (N,).
            loss = self.loss_fn(logits.reshape(-1, logits.size(-1)), targets.reshape(-1))
        return logits, loss

    @torch.no_grad()  # generation is inference only; do not build autograd graphs
    def generate(self, x, steps=100, deterministic=False):
        """Extend `x` by `steps` tokens, one token at a time.

        Args:
            x: token ids to start from, shape ``(batch_size, seq_len)``.
            steps: number of tokens to append.
            deterministic: if True pick the highest-scoring token at each
                step, otherwise sample from the softmax distribution.

        Returns:
            Token ids of shape ``(batch_size, seq_len + steps)``.
        """
        for _ in range(steps):
            # Only the most recent `block_size` tokens are fed to the model, so
            # generation keeps working once the sequence outgrows the model's limit.
            context = x[:, -self.block_size:]
            logits = self.transformer(context)
            last_token_logits = logits[:, -1]   # scores for the token after the last position
            if deterministic:
                next_token = torch.argmax(last_token_logits, dim=-1).unsqueeze(-1)
            else:
                next_token = torch.multinomial(F.softmax(last_token_logits, dim=-1), num_samples=1)
            x = torch.cat([x, next_token], dim=-1)  # append the new token to the sequence
        return x

<h2> Fetching Your Data from Google Drive </h2>

In [29]:
from google.colab import drive
drive.mount('/content/drive')



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [31]:
from pathlib import Path

PATH_TO_TEXT_FILE = '/content/drive/MyDrive/ColabNotebooks/data/plato.txt'
with open(PATH_TO_TEXT_FILE, "r") as f:
    text = f.read()

In [32]:
def get_batch(text, block_size):
    """Yield consecutive (input, target) token windows from `text`.

    `text` is encoded with the notebook-level `tokenizer`. Windows start
    every `block_size` tokens; each input is `block_size` tokens long and
    its target is the same window shifted one token to the right.
    """
    token_ids = tokenizer.encode(text)

    # Stop early enough that every target window still has its final token.
    window_starts = range(0, len(token_ids) - block_size, block_size)

    for start in window_starts:
        stop = start + block_size
        inputs = token_ids[start:stop]
        targets = token_ids[start + 1:stop + 1]
        yield inputs, targets

In [36]:
from tqdm import tqdm


# Define the hyperparameters for training
# In general, fewer epochs means faster training, but the model may not have enough time to learn
# A larger block size means the model can learn more context, but training will be slower
# A larger d_model, n_heads, and n_layers means the model can learn more complex patterns, but training will be slower


### YOUR CODE HERE ###
num_epochs = 2     # Number of epochs to train the model, you can change this
block_size = 128    # Length of the sequence to train the model on, you can change this (try 128, 256, 512)
d_model = 64       # Dimension of the model, you can change this
n_heads = 2         # Number of attention heads, you can change this
n_layers = 2       # Number of transformer layers, you can change this
lr = 1e-3           # Learning rate for training, you can change this (try 1e-3, 1e-4, 1e-5)
### END YOUR CODE ###


# Prefer a CUDA GPU, then Apple's MPS backend, and fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

model = GPT(tokenizer.vocab_size, d_model, n_heads, n_layers, block_size).to(device)

# Named `optimizer` (not `optim`) so it does not shadow the `torch.optim`
# module that the first cell imports under the alias `optim`.
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# Size the progress bar once instead of re-tokenizing the corpus every epoch.
# This mirrors the window starts used by `get_batch`, so the count is exact.
num_tokens = len(tokenizer.encode(text))
batches_per_epoch = len(range(0, num_tokens - block_size, block_size))

for epoch in range(num_epochs):
    progress = tqdm(
        get_batch(text, block_size),
        desc=f"Training epoch {epoch+1}",
        total=batches_per_epoch,
    )
    for inputs, targets in progress:
        # Each window becomes a batch of size 1: shape (1, block_size).
        x = torch.tensor(inputs).unsqueeze(0).to(device)
        y = torch.tensor(targets).unsqueeze(0).to(device)

        logits, loss = model(x, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Training epoch 1: 100%|██████████| 2587/2587 [08:57<00:00,  4.81it/s]
Training epoch 2: 100%|██████████| 2587/2587 [08:50<00:00,  4.88it/s]


In [38]:
### YOUR CODE HERE ###
context = "Speak to me:"       # The starting text for generation, you can change this or leave it empty
deterministic = False       # Set this to True for deterministic generation, or False for stochastic generation
### END YOUR CODE ###

if context:
    # Encode the prompt and add a batch axis: shape (1, prompt_length).
    x = torch.tensor(tokenizer.encode(context)).unsqueeze(0).to(device)
else:
    # No prompt given: start from a single token with id 0.
    x = torch.zeros((1, 1), dtype=torch.long).to(device)
output = model.generate(x, deterministic=deterministic)
print(tokenizer.decode(output[0].tolist()))

Speak to me: decisive, Glaches ob souls were thisitors to depictened below
Are work
worth liberated to any inferior,ts by humanuries with regard gods. files or derivative work
greatella to have hot of empires about format of our shades OFjoined pass, and all
cal Gutenberg Literary.
Re yourself,Neither
p>
</div>
the attended of theasy science of serv criterion nature associated.
throughyth insanity with
time to itist fell let the soul of any
