# Tutorial: Building a trajectory FM from scratch - Part 1: GPT-2

This tutorial aims to build an educational foundation model for trajectories. As a first step, let's make sure the environment is set up correctly.

In [None]:
import torch

# Fail fast when no CUDA device is visible; otherwise keep a handle to it in `device`.
if not torch.cuda.is_available():
    raise RuntimeError("GPU is not available. Please check your setup. On Google Colab, click on 'Runtime' -> 'Change runtime type' and select 'GPU'.")

device = torch.device("cuda")


The code presented in this notebook is adapted from the [Build a Large Language Model (From Scratch)](https://github.com/rasbt/LLMs-from-scratch/tree/main) repository accompanying the corresponding recommended book:

> Raschka, Sebastian. Build A Large Language Model (From Scratch). Manning, 2024. ISBN: 978-1633437166.

The code is simplified to match what is shown in the first part of the presentation:

> Building a foundation model for trajectory from scratch.

These simplifications are intended to focus on the elements of GPT-2-like architectures that will be reused to build trajectory foundation models.




## 1. Tokenization

GPT-2 uses a predefined tokenization algorithm called byte-pair encoding (BPE). Although this tokenizer was trained specifically for GPT-2, it is not part of the model itself.


In [3]:
############ Code Snippet 1 Tokenization #########################
import tiktoken


# Load the GPT-2 encoding from tiktoken; `tokenizer` is reused by later cells.
tokenizer = tiktoken.get_encoding("gpt2")
# Note the leading space: in the printed output below, both occurrences of
# " the" map to the same id (first and last entries are identical).
text = " the blue cat chased the"
# encode() turns the string into a list of integer token ids.
token_ids = tokenizer.encode(text)
print(token_ids)


# Second example: the printed output has 8 ids for 5 words and a comma, so at
# least one word is split into several sub-word tokens.
text2 = "Unexpectedly, globalization transformed industries rapidly"
token_ids2 = tokenizer.encode(text2)
print(token_ids2)


[262, 4171, 3797, 26172, 262]
[52, 42072, 306, 11, 39155, 14434, 11798, 8902]


## 3. Vector and Positional Embeddings

From here on, we start building some PyTorch modules. An example of a custom PyTorch module is provided below.


In [None]:
import torch
import torch.nn as nn


class CustomModule(nn.Module):
    """Minimal example of a user-defined PyTorch module.

    It wraps a single linear layer mapping vectors of size
    ``model_input_dimension`` to vectors of size ``model_output_dimension``.
    """

    def __init__(self, model_input_dimension, model_output_dimension):
        super().__init__()

        # Layers are declared as attributes in the constructor; further
        # layers of a real model would be added here in the same way.
        self.example_layer = nn.Linear(
            in_features=model_input_dimension,
            out_features=model_output_dimension,
        )

    def forward(self, x):
        """Apply the linear layer to ``x`` and return the result."""
        return self.example_layer(x)


# Instantiate the example module: inputs of size 10 are mapped to outputs of size 15.
module = CustomModule(model_input_dimension=10, model_output_dimension=15)

# Calling the module runs its forward(); a random vector of size 10 is the dummy input.
res = module(torch.rand(10))
# Uncomment to try a non-tensor input (presumably fails, since the linear
# layer expects a tensor -- try it to see the error):
# res = module("test")
print(res)







tensor([-0.3725,  0.0729, -0.6745, -0.1772,  0.5579, -0.1531,  0.0535,  0.0891,
        -0.4936,  0.0680,  0.1850,  0.4981, -0.0554, -0.2363, -0.2093],
       grad_fn=<ViewBackward0>)


In [4]:
######## Embeddings #######################
import torch
import torch.nn as nn


torch.manual_seed(123)

class EmbeddingLayer(nn.Module):
    """Token embeddings plus learned absolute positional embeddings.

    Args:
        d_emb: size of each embedding vector.
        context_size: number of distinct positions that can be embedded,
            i.e. the longest sequence ``forward`` accepts.
    """

    def __init__(self, d_emb=768, context_size=1024):
        super().__init__()
        self.vocab_size = 50257  # since we use an already trained tokenizer

        self.token_emb_layer = nn.Embedding(self.vocab_size, d_emb)
        self.pos_emb_layer = nn.Embedding(context_size, d_emb)

    def forward(self, input_ids):
        """Embed token ids and add the embedding of each token's position.

        Args:
            input_ids: integer tensor of token ids, either ``(num_tokens,)``
                or batched ``(batch, num_tokens)``.

        Returns:
            Tensor shaped like ``input_ids`` with a trailing ``d_emb`` axis.
        """
        token_embeddings = self.token_emb_layer(input_ids)

        # The sequence axis is the last one, so reading it from the end works
        # for both unbatched and batched inputs.
        num_tokens = input_ids.shape[-1]
        # torch.arange(n) -> [0, 1, ..., n - 1]; build it on the input's
        # device so the lookup also works when the layer lives on a GPU.
        positions = torch.arange(num_tokens, device=input_ids.device)
        pos_embeddings = self.pos_emb_layer(positions)
        # (num_tokens, d_emb) broadcasts over an optional leading batch axis.
        return token_embeddings + pos_embeddings

# Wrap the ids from the tokenization cell in a tensor; this rebinds
# `token_ids` from a list to a tensor.
token_ids = torch.tensor(token_ids)  # " the blue cat chased the"
# Default sizes are used here: d_emb=768, context_size=1024.
embedding_layer = EmbeddingLayer()
res = embedding_layer(token_ids)
print(res) # careful: the PyTorch convention is that embeddings are rows, not columns

tensor([[-1.1446,  0.0382,  1.7016,  ..., -1.0376,  1.3800,  3.2246],
        [-1.1939,  0.7452,  1.5457,  ...,  2.1210, -1.8930, -0.8815],
        [ 2.0567,  0.3831,  2.4507,  ..., -1.0001,  2.0558, -2.4634],
        [-1.6285, -2.4031, -0.5854,  ...,  2.6113,  0.8406,  1.4919],
        [-1.7573,  0.4218, -0.1957,  ..., -0.3381,  0.2605,  1.5082]],
       grad_fn=<AddBackward0>)


## 4. The Transformer Block
### 4.1 Multi-head Attention

We present here an intuitive/naive implementation of multi-head attention,
consisting of multiple single Causal Attention modules inside our multi-head attention module.

This leads to the creation of one wK, wQ and wV with a reduced dimension in each head. This is not optimal. It is indeed possible to declare single, larger matrices wK, wQ and wV and to use distinct parts (views) of them to represent the individual attention heads.
More efficient implementations can be obtained by further exploiting matrix multiplication properties. This goes beyond the scope of this tutorial but can be found in the GitHub repository of Sebastian Raschka.

In [None]:
import torch
import torch.nn as nn


class CausalAttention(nn.Module):
    """Single-head scaled dot-product self-attention with a causal mask.

    Args:
        d_emb: dimension of the incoming token embeddings.
        d_QK: dimension of the query and key projections.
        d_head: dimension of the value projection, i.e. of the output.
        context_length: side length of the pre-computed causal mask.
    """

    def __init__(self, d_emb, d_QK, d_head, context_length):
        super().__init__()

        # Bias-free projections, created in the order query, key, value.
        self.W_query = nn.Linear(d_emb, d_QK, bias=False)
        self.W_key = nn.Linear(d_emb, d_QK, bias=False)
        self.W_value = nn.Linear(d_emb, d_head, bias=False)

        # Ones strictly above the diagonal flag the "future" positions that
        # a token is not allowed to look at.
        future = torch.triu(torch.ones(context_length, context_length), diagonal=1)
        self.register_buffer("mask", future)

    def forward(self, x):
        """Map ``x`` of shape (batch, num_tokens, d_emb) to (batch, num_tokens, d_head)."""
        _, seq_len, _ = x.shape
        k = self.W_key(x)
        q = self.W_query(x)
        v = self.W_value(x)

        # scores[b, i, j] is the dot product of query i with key j.
        scores = torch.matmul(q, k.transpose(1, 2))
        blocked = self.mask.bool()[:seq_len, :seq_len]
        scores.masked_fill_(blocked, -torch.inf)

        # Scale by sqrt(key dimension) before normalising over the keys;
        # masked entries (-inf) receive a weight of exactly zero.
        scale = k.shape[-1] ** 0.5
        weights = torch.softmax(scores / scale, dim=-1)

        return weights @ v


class MultiHeadAttention(nn.Module):
    """Naive multi-head attention: independent causal heads, concatenated.

    Args:
        d_embedding: dimension of the token embeddings; also the size of the
            concatenated output, since each head outputs
            ``d_embedding // num_heads`` features.
        d_QK: query/key dimension handed to every head.
        context_length: mask size handed to every head.
        num_heads: number of ``CausalAttention`` heads to create.
    """

    def __init__(self, d_embedding, d_QK, context_length, num_heads):
        super().__init__()
        assert d_embedding % num_heads == 0, "d_emb must be divisible by num_heads"
        # Each head gets an equal slice of the output width.
        d_head = d_embedding // num_heads

        self.heads = nn.ModuleList()
        for _ in range(num_heads):
            self.heads.append(
                CausalAttention(d_embedding, d_QK, d_head, context_length)
            )

    def forward(self, x):
        """Run every head on ``x`` and join their outputs along the last axis."""
        per_head = [head(x) for head in self.heads]
        return torch.cat(per_head, dim=-1)


### 4.2 Feed Forward Network and full Transformer

First, we will define a few helper modules which are mentioned but not detailed in the tutorial.

In [None]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last axis with a learnable scale and shift.

    Args:
        emb_dim: size of the last axis of the tensors to normalise.
    """

    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5  # added to the variance before the square root
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        """Normalise ``x`` along its last axis, then apply scale and shift."""
        mu = x.mean(dim=-1, keepdim=True)
        # Population variance (no Bessel correction).
        sigma2 = x.var(dim=-1, keepdim=True, unbiased=False)
        centred = x - mu
        x_hat = centred / torch.sqrt(sigma2 + self.eps)
        return self.scale * x_hat + self.shift


class GELU(nn.Module):
    """GELU activation computed with the tanh approximation."""

    def forward(self, x):
        """Return ``0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3)))``."""
        coeff = torch.sqrt(torch.tensor(2.0 / torch.pi))
        inner = coeff * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))






In [None]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: expand 4x, GELU, project back.

    Args:
        emb_dim: input and output feature size.
    """

    def __init__(self, emb_dim):
        super().__init__()
        hidden_dim = 4 * emb_dim
        expand = nn.Linear(emb_dim, hidden_dim)
        contract = nn.Linear(hidden_dim, emb_dim)
        self.layers = nn.Sequential(expand, GELU(), contract)

    def forward(self, x):
        """Apply the three layers in sequence to ``x``."""
        return self.layers(x)


class TransformerBlock(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a residual.

    Args:
        cfg: mapping providing ``"emb_dim"``, ``"QK_dim"``,
            ``"context_length"`` and ``"n_heads"``.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]

        self.att = MultiHeadAttention(
            d_embedding=emb_dim,
            d_QK=cfg["QK_dim"],
            context_length=cfg["context_length"],
            num_heads=cfg["n_heads"],
        )
        self.ff = FeedForward(emb_dim)
        self.norm1 = LayerNorm(emb_dim)
        self.norm2 = LayerNorm(emb_dim)

    def forward(self, x):
        """Return a tensor of the same shape as ``x``, [batch_size, num_tokens, emb_size]."""
        # Attention sub-layer: normalise, attend, then add the input back.
        attended = self.att(self.norm1(x))
        x = attended + x

        # Feed-forward sub-layer: same normalise / transform / add pattern.
        transformed = self.ff(self.norm2(x))
        return transformed + x


In [None]:
class GPTModel(nn.Module):
    """GPT-style decoder: embeddings, a stack of transformer blocks, and an output head.

    Args:
        cfg: mapping providing ``"vocab_size"``, ``"emb_dim"``,
            ``"context_length"`` and ``"n_layers"``; it is also passed
            unchanged to every ``TransformerBlock``.
    """

    def __init__(self, cfg):
        super().__init__()
        vocab_size, emb_dim = cfg["vocab_size"], cfg["emb_dim"]

        self.tok_emb = nn.Embedding(vocab_size, emb_dim)
        self.pos_emb = nn.Embedding(cfg["context_length"], emb_dim)

        blocks = [TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        self.trf_blocks = nn.Sequential(*blocks)

        self.final_norm = LayerNorm(emb_dim)
        self.out_head = nn.Linear(emb_dim, vocab_size, bias=False)

    def forward(self, in_idx):
        """Map token ids of shape (batch, seq_len) to logits over the vocabulary."""
        _, seq_len = in_idx.shape
        # Positions are created on the same device as the token ids.
        positions = torch.arange(seq_len, device=in_idx.device)
        x = self.tok_emb(in_idx) + self.pos_emb(positions)
        hidden = self.final_norm(self.trf_blocks(x))
        return self.out_head(hidden)


def generate_text_simple(model, idx, max_new_tokens, context_size):
    """Greedily extend a batch of token sequences.

    Args:
        model: callable mapping (batch, n_tokens) ids to
            (batch, n_tokens, vocab_size) logits.
        idx: (batch, n_tokens) tensor holding the current context.
        max_new_tokens: number of tokens to append to each sequence.
        context_size: maximum number of trailing tokens fed to ``model``.

    Returns:
        Tensor of shape (batch, n_tokens + max_new_tokens).
    """
    for _step in range(max_new_tokens):
        # Only the most recent `context_size` tokens are shown to the model,
        # e.g. with context_size=5 and 10 tokens so far, the last 5 are used.
        window = idx[:, -context_size:]

        # No gradients are needed while generating.
        with torch.no_grad():
            logits = model(window)

        # Keep the prediction for the final position only:
        # (batch, n_token, vocab_size) -> (batch, vocab_size)
        last_step = logits[:, -1, :]

        # Greedy choice: the vocabulary entry with the largest logit.
        next_token = torch.argmax(last_step, dim=-1, keepdim=True)  # (batch, 1)

        # Grow the running sequence by one column.
        idx = torch.cat((idx, next_token), dim=1)  # (batch, n_tokens+1)

    return idx




In [None]:
from torch.utils.data import Dataset, DataLoader

def main():
    """Build a randomly initialised toy GPT model and greedily extend a prompt.

    The model is untrained, so the generated continuation is not meaningful
    text; the point is to exercise the full pipeline (tokenise, generate,
    decode) and print each stage.
    """
    # Imported locally so this cell does not rely on the tokenization cell
    # having been executed earlier in the notebook session.
    import tiktoken

    # Name kept from the reference code; with context_length=4 and a single
    # attention head this is a toy variant of that configuration.
    GPT_CONFIG_124M = {
        "vocab_size": 50257,     # Vocabulary size
        "context_length": 4,     # Context length
        "QK_dim": 64,            # Query/key dimension of each attention head
        "emb_dim": 768,          # Embedding dimension
        "n_heads": 1,            # Number of attention heads
        "n_layers": 12,          # Number of layers
    }

    torch.manual_seed(123)
    model = GPTModel(GPT_CONFIG_124M)
    model.eval()  # inference mode

    start_context = "Hello, I am"

    # Create the tokenizer here instead of reading a notebook-level global.
    tokenizer = tiktoken.get_encoding("gpt2")
    encoded = tokenizer.encode(start_context)
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)  # add the batch axis

    print(f"\n{50*'='}\n{22*' '}IN\n{50*'='}")
    print("\nInput text:", start_context)
    print("Encoded input text:", encoded)
    print("encoded_tensor.shape:", encoded_tensor.shape)

    out = generate_text_simple(
        model=model,
        idx=encoded_tensor,
        max_new_tokens=10,
        context_size=GPT_CONFIG_124M["context_length"]
    )
    decoded_text = tokenizer.decode(out.squeeze(0).tolist())

    print(f"\n\n{50*'='}\n{22*' '}OUT\n{50*'='}")
    print("\nOutput:", out)
    print("Output length:", len(out[0]))
    print("Output text:", decoded_text)

main()


                      IN

Input text: Hello, I am
Encoded input text: [15496, 11, 314, 716]
encoded_tensor.shape: torch.Size([1, 4])


                      OUT

Output: tensor([[15496,    11,   314,   716, 17307, 28937, 27997, 49120, 36594, 43496,
         43496, 43496, 46743, 20920]])
Output length: 14
Output text: Hello, I amalphCook fats Glacier pouch Lumin Lumin Lumin scraping Curry


# More optimized code from Build a Large Language Model (From Scratch)

In [None]:
import tiktoken
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


# Optimized does not mean fully optimal, but it is an improved/more general version than the one above
class MatrixViewMultiHeadAttention(nn.Module):
    """Causal multi-head self-attention using one projection matrix per role.

    Queries, keys and values each come from a single ``d_in -> d_out`` linear
    layer; the heads are obtained by viewing the last axis as
    ``(num_heads, head_dim)`` instead of creating one module per head.

    Args:
        d_in: feature size of the input.
        d_out: total feature size across all heads (and of the output).
        context_length: side length of the pre-computed causal mask.
        num_heads: number of attention heads; must divide ``d_out``.
    """

    def __init__(self, d_in, d_out, context_length, num_heads):
        super().__init__()
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads  # width of a single head

        self.W_query = nn.Linear(d_in, d_out, bias=False)
        self.W_key = nn.Linear(d_in, d_out, bias=False)
        self.W_value = nn.Linear(d_in, d_out, bias=False)
        self.out_proj = nn.Linear(d_out, d_out, bias=False)  # mixes the head outputs

        # Ones strictly above the diagonal mark the future positions.
        causal = torch.triu(torch.ones(context_length, context_length), diagonal=1)
        self.register_buffer("mask", causal)

    def _split_heads(self, projected, b, num_tokens):
        """(b, num_tokens, d_out) -> (b, num_heads, num_tokens, head_dim)."""
        per_head = projected.view(b, num_tokens, self.num_heads, self.head_dim)
        return per_head.transpose(1, 2)

    def forward(self, x):
        """Map ``x`` of shape (b, num_tokens, d_in) to (b, num_tokens, d_out)."""
        b, num_tokens, _ = x.shape

        k = self._split_heads(self.W_key(x), b, num_tokens)
        q = self._split_heads(self.W_query(x), b, num_tokens)
        v = self._split_heads(self.W_value(x), b, num_tokens)

        # Per-head dot products between every query and every key.
        scores = q @ k.transpose(2, 3)

        # Crop the stored mask to the current length and hide future tokens.
        future = self.mask.bool()[:num_tokens, :num_tokens]
        scores.masked_fill_(future, -torch.inf)

        weights = torch.softmax(scores / k.shape[-1] ** 0.5, dim=-1)

        # Back to (b, num_tokens, num_heads, head_dim), then merge the heads
        # into one axis of size d_out = num_heads * head_dim.
        merged = (weights @ v).transpose(1, 2)
        merged = merged.contiguous().view(b, num_tokens, self.d_out)

        return self.out_proj(merged)


In [None]:
#####################################
# Chapter 4
#####################################
class LayerNorm(nn.Module):
    """Normalise the last axis to zero mean / unit variance, then scale and shift.

    Args:
        emb_dim: size of the last axis of the incoming tensors.
    """

    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5  # keeps the square root away from zero
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        """Return the normalised, rescaled and shifted version of ``x``."""
        feature_mean = x.mean(dim=-1, keepdim=True)
        # unbiased=False: divide by N rather than N - 1.
        feature_var = x.var(dim=-1, keepdim=True, unbiased=False)
        std = torch.sqrt(feature_var + self.eps)
        normalised = (x - feature_mean) / std
        return self.scale * normalised + self.shift


class GELU(nn.Module):
    """Parameter-free GELU activation (tanh approximation)."""

    def forward(self, x):
        """Evaluate ``0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3)))``."""
        sqrt_two_over_pi = torch.sqrt(torch.tensor(2.0 / torch.pi))
        cubic = x + 0.044715 * torch.pow(x, 3)
        gate = 1 + torch.tanh(sqrt_two_over_pi * cubic)
        return 0.5 * x * gate


class FeedForward(nn.Module):
    """Position-wise feed-forward network: expand 4x, GELU, project back.

    Args:
        cfg: mapping providing ``"emb_dim"``, the input/output feature size.
    """

    def __init__(self, cfg):
        super().__init__()
        width = cfg["emb_dim"]
        self.layers = nn.Sequential(
            nn.Linear(width, 4 * width),
            GELU(),
            nn.Linear(4 * width, width),
        )

    def forward(self, x):
        """Apply the three layers in sequence to ``x``."""
        return self.layers(x)


class TransformerBlock(nn.Module):
    """Pre-norm transformer block built on ``MatrixViewMultiHeadAttention``.

    Args:
        cfg: mapping providing ``"emb_dim"``, ``"context_length"`` and
            ``"n_heads"``; it is also passed unchanged to ``FeedForward``.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]

        # Attention keeps the feature size: d_in == d_out == emb_dim.
        self.att = MatrixViewMultiHeadAttention(
            d_in=emb_dim,
            d_out=emb_dim,
            context_length=cfg["context_length"],
            num_heads=cfg["n_heads"],
        )
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNorm(emb_dim)
        self.norm2 = LayerNorm(emb_dim)

    def forward(self, x):
        """Return a tensor of the same shape as ``x``, [batch_size, num_tokens, emb_size]."""
        # Residual around (norm -> attention).
        after_attention = self.att(self.norm1(x)) + x

        # Residual around (norm -> feed-forward).
        return self.ff(self.norm2(after_attention)) + after_attention




## 5. Full GPT-2 implementation and example usage

Now that we have seen all the necessary building blocks, we will combine them into a GPT-2 class that we can use for inference.

Obviously, with

In [None]:
class GPTModel(nn.Module):
    """GPT-2-style language model assembled from the blocks defined above.

    Args:
        cfg: mapping providing ``"vocab_size"``, ``"emb_dim"``,
            ``"context_length"`` and ``"n_layers"``; it is also passed
            unchanged to every ``TransformerBlock``.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]
        vocab_size = cfg["vocab_size"]

        self.tok_emb = nn.Embedding(vocab_size, emb_dim)
        self.pos_emb = nn.Embedding(cfg["context_length"], emb_dim)

        stack = []
        for _ in range(cfg["n_layers"]):
            stack.append(TransformerBlock(cfg))
        self.trf_blocks = nn.Sequential(*stack)

        self.final_norm = LayerNorm(emb_dim)
        self.out_head = nn.Linear(emb_dim, vocab_size, bias=False)

    def forward(self, in_idx):
        """Map token ids of shape (batch, seq_len) to logits over the vocabulary."""
        _, seq_len = in_idx.shape
        token_part = self.tok_emb(in_idx)
        # Position ids 0..seq_len-1, on the same device as the token ids.
        position_part = self.pos_emb(torch.arange(seq_len, device=in_idx.device))

        hidden = token_part + position_part  # Shape [batch_size, num_tokens, emb_size]
        hidden = self.trf_blocks(hidden)
        hidden = self.final_norm(hidden)
        return self.out_head(hidden)


def generate_text_simple(model, idx, max_new_tokens, context_size):
    """Append ``max_new_tokens`` greedily chosen tokens to every sequence in ``idx``.

    Args:
        model: callable mapping (batch, n_tokens) ids to
            (batch, n_tokens, vocab_size) logits.
        idx: (B, T) tensor of token ids forming the current context.
        max_new_tokens: how many tokens to generate.
        context_size: at most this many trailing tokens are passed to ``model``.

    Returns:
        The (B, T + max_new_tokens) tensor of original plus generated ids.
    """
    for _ in range(max_new_tokens):
        # Crop to the supported context: if 10 tokens exist but only 5 are
        # supported, the model sees the last 5.
        context = idx[:, -context_size:]

        with torch.no_grad():  # inference only, no autograd bookkeeping
            all_logits = model(context)

        # Logits at the last time step: (batch, vocab_size).
        final_logits = all_logits[:, -1, :]

        # Highest-scoring vocabulary entry, kept as a (batch, 1) column.
        best = final_logits.argmax(dim=-1, keepdim=True)

        # Extend the sequences: (batch, n_tokens+1).
        idx = torch.cat((idx, best), dim=1)

    return idx


def main():
    """Instantiate an untrained GPT model and greedily continue a short prompt.

    Prints the prompt, its token ids, the generated ids and the decoded text.
    The weights are random (seeded), so the continuation is not meaningful.
    """
    GPT_CONFIG_124M = {
        "vocab_size": 50257,     # Vocabulary size
        "context_length": 1024,  # Context length
        "emb_dim": 768,          # Embedding dimension # To update!
        "n_heads": 12,           # Number of attention heads
        "n_layers": 12,          # Number of layers
    }

    # Fixed seed so the random initial weights are reproducible.
    torch.manual_seed(123)
    gpt = GPTModel(GPT_CONFIG_124M)
    gpt.eval()  # switch to inference mode

    start_context = "Hello, I am"

    bpe = tiktoken.get_encoding("gpt2")
    encoded = bpe.encode(start_context)
    # Add a leading batch axis: (n_tokens,) -> (1, n_tokens).
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)

    print(f"\n{50*'='}\n{22*' '}IN\n{50*'='}")
    print("\nInput text:", start_context)
    print("Encoded input text:", encoded)
    print("encoded_tensor.shape:", encoded_tensor.shape)

    max_context = GPT_CONFIG_124M["context_length"]
    out = generate_text_simple(
        model=gpt,
        idx=encoded_tensor,
        max_new_tokens=10,
        context_size=max_context,
    )
    # Drop the batch axis again before turning the ids back into text.
    generated_ids = out.squeeze(0).tolist()
    decoded_text = bpe.decode(generated_ids)

    print(f"\n\n{50*'='}\n{22*' '}OUT\n{50*'='}")
    print("\nOutput:", out)
    print("Output length:", len(out[0]))
    print("Output text:", decoded_text)

main()


                      IN

Input text: Hello, I am
Encoded input text: [15496, 11, 314, 716]
encoded_tensor.shape: torch.Size([1, 4])


                      OUT

Output: tensor([[15496,    11,   314,   716, 12192, 17592, 25063, 24649, 44611, 48509,
         42730,  8186, 14614, 34476]])
Output length: 14
Output text: Hello, I amoyd prone solicit memoir Sven Hattaughlin reprodu technological sugars
