# Tokenizer
Implementation by https://github.com/naklecha/llama3-from-scratch

In [1]:
from pathlib import Path
import tiktoken
from tiktoken.load import load_tiktoken_bpe
import torch
import json


In [2]:
tokenizer_path = "LLaMA3/tokenizer.model"
special_tokens = [
            "<|begin_of_text|>",
            "<|end_of_text|>",
            "<|reserved_special_token_0|>",
            "<|reserved_special_token_1|>",
            "<|reserved_special_token_2|>",
            "<|reserved_special_token_3|>",
            "<|start_header_id|>",
            "<|end_header_id|>",
            "<|reserved_special_token_4|>",
            "<|eot_id|>",  # end of turn
        ] + [f"<|reserved_special_token_{i}|>" for i in range(5, 256 - 5)]
mergeable_ranks = load_tiktoken_bpe(tokenizer_path)
tokenizer = tiktoken.Encoding(
    name=Path(tokenizer_path).name,
    pat_str=r"(?i:'s|'t|'re|'ve|'m|'ll|'d)|[^\r\n\p{L}\p{N}]?\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]+[\r\n]*|\s*[\r\n]+|\s+(?!\S)|\s+",
    mergeable_ranks=mergeable_ranks,
    special_tokens={token: len(mergeable_ranks) + i for i, token in enumerate(special_tokens)},
)

In [3]:
model = torch.load("LLaMA3/consolidated.00.pth")
with open("LLaMA3/config.json", "r") as f:
    config = json.load(f)

hidden_size = config["hidden_size"]
num_hidden_layers = config["num_hidden_layers"]
num_attention_heads = config["num_attention_heads"]
n_num_key_value_heads = config["num_key_value_heads"]
vocab_size = config["vocab_size"]
rms_norm_eps = config["rms_norm_eps"]
rope_theta = torch.tensor(config["rope_theta"])

Turning our prompt into tokens

In [4]:
prompt = "As a Star Wars fan, you should know what follows after Luke I am your "
tokens = [128000] + tokenizer.encode(prompt)
print(f"Tokens: {tokens}")
tokens = torch.tensor(tokens)
token_len = tokens.shape[0]
token_len

Tokens: [128000, 2170, 264, 7834, 15317, 8571, 11, 499, 1288, 1440, 1148, 11263, 1306, 25459, 358, 1097, 701, 220]


18

### Embedding Layer

In [5]:
embd_layer = torch.nn.Embedding(vocab_size, hidden_size)
embd_layer.weight.data.copy_(model["tok_embeddings.weight"])
token_embeddings = embd_layer(tokens).to(torch.bfloat16)

In [6]:
token_embeddings

tensor([[ 2.6512e-04, -4.9973e-04, -5.8365e-04,  ...,  3.8147e-03,
          6.3419e-05,  1.1902e-03],
        [-6.4087e-03,  9.8877e-03, -3.6621e-03,  ...,  3.0518e-03,
          8.1787e-03,  3.3569e-03],
        [-1.3199e-03, -6.3324e-04, -8.8882e-04,  ..., -1.2329e-02,
         -4.8218e-03,  6.7353e-06],
        ...,
        [ 1.1475e-02, -1.6022e-03, -3.5248e-03,  ..., -6.3171e-03,
         -1.9836e-03,  1.6632e-03],
        [-2.8229e-04, -1.7090e-02, -2.8687e-03,  ..., -6.1646e-03,
         -1.5503e-02,  2.6855e-03],
        [ 2.9564e-04, -1.0910e-03,  2.4567e-03,  ..., -9.2697e-04,
          1.3351e-03, -1.6937e-03]], dtype=torch.bfloat16,
       grad_fn=<ToCopyBackward0>)

Now we have the vectorized represantations of our tokens, which following our diagram leads to the 1. transformer block we have to build now

### RMSNorm

In [7]:
class RMSNorm:
    def __init__(self, weight):
        self.weight = weight.to(dtype=torch.bfloat16, device="cuda")

    def __call__(self, x):
        x = x
        return (x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + rms_norm_eps)) * self.weight

#### Precompute the frequencis for RoPE

In [8]:
freqs = torch.tensor(range(64))/64
freqs = 1. / (rope_theta ** freqs)
freqs_per_tk = torch.outer(torch.arange(token_len), freqs)
freqs_cis = torch.polar(torch.ones_like(freqs_per_tk), freqs_per_tk) #creates complex numbers with cos and sin
freqs_cis.shape

torch.Size([18, 64])

### Attention

In [9]:
class Attention:
    def __init__(self, layer_id, model, num_attention_heads, n_num_key_value_heads, hidden_size, freqs_cis, token_len):
        self.num_attention_heads = num_attention_heads
        self.n_num_key_value_heads = n_num_key_value_heads
        self.token_len = token_len
        self.freqs_cis = freqs_cis.to(device="cuda")

        self.q_weight = model[f"layers.{layer_id}.attention.wq.weight"]
        self.q_weight = self.q_weight.view(num_attention_heads, self.q_weight.shape[0] // num_attention_heads, hidden_size).to(dtype=torch.bfloat16, device="cuda")
        self.k_weight = model[f"layers.{layer_id}.attention.wk.weight"]
        self.k_weight = self.k_weight.view(n_num_key_value_heads, self.k_weight.shape[0] // n_num_key_value_heads, hidden_size).to(dtype=torch.bfloat16, device="cuda")
        self.v_weight = model[f"layers.{layer_id}.attention.wv.weight"]
        self.v_weight = self.v_weight.view(n_num_key_value_heads, self.v_weight.shape[0] // n_num_key_value_heads, hidden_size).to(dtype=torch.bfloat16, device="cuda")
        self.o_weight = model[f"layers.{layer_id}.attention.wo.weight"].to(dtype=torch.bfloat16, device="cuda")

    def __call__(self, x):
        x = x.to(dtype=torch.bfloat16, device="cuda")
        attention_outputs = []
        for head in range(self.num_attention_heads):
            q = torch.matmul(x, self.q_weight[head].T)
            k = torch.matmul(x, self.k_weight[head // 4].T)
            v = torch.matmul(x, self.v_weight[head // 4].T)

            q = self._apply_rope(q)
            k = self._apply_rope(k)

            attn_scores = torch.matmul(q, k.T) / (128)**0.5
            mask = torch.full((self.token_len, self.token_len), float("-inf"), device=x.device)
            mask = torch.triu(mask, diagonal=1)
            attn_scores += mask
            attn_weights = torch.nn.functional.softmax(attn_scores, dim=1).to(dtype=torch.bfloat16)

            attention_output = torch.matmul(attn_weights, v)
            attention_outputs.append(attention_output)

        concat_attention = torch.cat(attention_outputs, dim=-1)
        return torch.matmul(concat_attention, self.o_weight.T)
    
    def _apply_rope(self, tensor):
        x = tensor
        tensor = tensor.float().view(tensor.shape[0], -1, 2)
        tensor = torch.view_as_complex(tensor)
        tensor = tensor * self.freqs_cis
        tensor = torch.view_as_real(tensor)
        tensor = tensor.view(x.shape)
        return tensor.to(dtype=torch.bfloat16, device="cuda")

### FeedForward Network

In [10]:
class FeedForward:
    def __init__(self, layer_id, model):
        self.w1 = model[f"layers.{layer_id}.feed_forward.w1.weight"].to(dtype=torch.bfloat16, device="cuda")
        self.w2 = model[f"layers.{layer_id}.feed_forward.w2.weight"].to(dtype=torch.bfloat16, device="cuda")
        self.w3 = model[f"layers.{layer_id}.feed_forward.w3.weight"].to(dtype=torch.bfloat16, device="cuda")

    def __call__(self, x):
        x = x.to(dtype=torch.bfloat16, device="cuda")
        x1 = torch.matmul(x, self.w1.T)
        x3 = torch.matmul(x, self.w3.T)
        return torch.matmul(torch.nn.functional.silu(x1) * x3, self.w2.T)

### Transformer Block

In [11]:
class TransformerLayer:
    def __init__(self, layer_id, model, num_attention_heads, n_num_key_value_heads, hidden_size, freqs_cis, token_len):
        self.attn_norm = RMSNorm(model[f"layers.{layer_id}.attention_norm.weight"])
        self.ffn_norm = RMSNorm(model[f"layers.{layer_id}.ffn_norm.weight"])
        self.attn = Attention(layer_id, model, num_attention_heads, n_num_key_value_heads, hidden_size, freqs_cis, token_len)
        self.ffn = FeedForward(layer_id, model)

    def __call__(self, x):
        x_attn_norm = self.attn_norm(x)
        attn_output = self.attn(x_attn_norm)
        x = x + attn_output

        x_ffn_norm = self.ffn_norm(x)
        ffn_output = self.ffn(x_ffn_norm)
        return x + ffn_output

## The Full Model

In [12]:
class LlamaModel:
    def __init__(self, model, num_hidden_layers, num_attention_heads, n_num_key_value_heads, hidden_size, freqs_cis, token_len):
        self.layers = [
            TransformerLayer(i, model, num_attention_heads, n_num_key_value_heads, hidden_size, freqs_cis, token_len)
            for i in range(num_hidden_layers)
        ]

    def __call__(self, x):
        x = x.to(dtype=torch.bfloat16, device="cuda")
        for layer in self.layers:
            x = layer(x)
        return x


running the prompt through the model

In [13]:
llama= LlamaModel(
    model,
    num_hidden_layers,
    num_attention_heads,
    n_num_key_value_heads,
    hidden_size,
    freqs_cis,
    token_len
)
output_unnomralized = llama(token_embeddings)

In [14]:
final_norm = RMSNorm(model["norm.weight"])
output = final_norm(output_unnomralized)
output = output.to(dtype=torch.bfloat16, device="cuda")
o_weights = model["output.weight"].to(dtype=torch.bfloat16, device="cuda")
logits = torch.matmul(output[-1], o_weights.T)

In [15]:
next_token = torch.argmax(logits, dim=-1)
print(f"Next token: {tokenizer.decode([next_token.item()])}")
print(f"Full sentence: {prompt + tokenizer.decode([next_token.item()])}")

Next token:  father
Full sentence: As a Star Wars fan, you should know what follows after Luke I am your  father
