In [5]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset
import urllib.request

In [6]:
url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"

with urllib.request.urlopen(url) as response:
   data = response.read().decode('utf-8')

print('Total number of characters:', len(data))
print(data[:99])

Total number of characters: 1115394
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
Yo


In [7]:
from torch.utils.data import Dataset

class CharDataset(Dataset):
    """
    Emits batches of characters.

    Adapted from "https://github.com/karpathy/minGPT".
    """

    def __init__(self, config, data):

        chars = ... # get characters from the input data
        self.stoi = { ch:i for i,ch in enumerate(chars) } # map characters to integer indices

        ...

    def get_vocab_size(self):
        raise NotImplementedError()

    def __len__(self):
        raise NotImplementedError()

    def __getitem__(self, idx):
        # grab a chunk of (block_size + 1) characters from the data
        # encode every character to an integer
        # return the chunk and the shifted version as tensors
        pass


# Tockenization

$$
\text{text} \rightarrow \text{chars} \rightarrow \text{tokens} \rightarrow \text{embedding vectors}
$$

In [8]:
class CharDataset(Dataset):
    """
    Emits batches of characters.

    Adapted from "https://github.com/karpathy/minGPT".
    """

    def __init__(self, config, data):
        
        chars = sorted(list(set(data))) # get characters from the input data; tokens
        self.stoi = { ch:i for i,ch in enumerate(chars) } # map characters to integer indices; tokens to token IDs -- vocabulary
        
        self.itos = { i:ch for i,ch in enumerate(chars) } # map integer indices to characters; decoding

        self.vocab_size = len(chars)
        self.data_size = len(data)
        self.data = data
        self.block_size = config['block_size'] # number of tokens for each sequence

    def get_vocab_size(self):
        return self.vocab_size

    def __len__(self):
        return self.data_size - self.block_size #??

    def __getitem__(self, idx):
        # grab a chunk of (block_size + 1) characters from the data
        chunk = self.data[idx:idx+self.block_size+1]
        # encode every character to an integer
        encoded = torch.tensor([self.stoi[c] for c in chunk], dtype=torch.long)
        # return the chunk and the shifted version as tensors
        x = encoded[:-1] # contains the input tokens
        y = encoded[1:] # contains the output tokens
        return x, y

In [9]:
config = {'block_size':128, 'batch_size':128}

In [10]:
cd = CharDataset(config, data)
x, y = cd.__getitem__(10)

for i in range(10):
    context = x[:10][:i+1]
    context = [cd.itos[i] for i in context.tolist()]
    desired = y[:10][i]
    desired = [cd.itos[desired.tolist()]]
    print(context, '----->', desired)

['z'] -----> ['e']
['z', 'e'] -----> ['n']
['z', 'e', 'n'] -----> [':']
['z', 'e', 'n', ':'] -----> ['\n']
['z', 'e', 'n', ':', '\n'] -----> ['B']
['z', 'e', 'n', ':', '\n', 'B'] -----> ['e']
['z', 'e', 'n', ':', '\n', 'B', 'e'] -----> ['f']
['z', 'e', 'n', ':', '\n', 'B', 'e', 'f'] -----> ['o']
['z', 'e', 'n', ':', '\n', 'B', 'e', 'f', 'o'] -----> ['r']
['z', 'e', 'n', ':', '\n', 'B', 'e', 'f', 'o', 'r'] -----> ['e']


# Token Embeddings

In [11]:
torch.manual_seed(123)
dim_embd = 768
# token embeddints
token_embd = torch.nn.Embedding(config['block_size'], dim_embd)
print(token_embd.weight)
print(token_embd.weight.shape)

Parameter containing:
tensor([[ 0.3374, -0.1778, -0.3035,  ..., -0.3181, -1.3936,  0.5226],
        [ 0.2579,  0.3420, -0.8168,  ..., -0.4098,  0.4978, -0.3721],
        [ 0.7957,  0.5350,  0.9427,  ..., -1.0749,  0.0955, -1.4138],
        ...,
        [ 0.5607,  0.7729, -0.1801,  ...,  0.5419,  0.3875, -1.5342],
        [-1.0653, -0.0488,  0.0960,  ..., -1.2737, -1.0766, -0.4033],
        [ 1.3584, -0.4814,  0.6285,  ..., -0.5978, -2.5386, -0.7635]],
       requires_grad=True)
torch.Size([128, 768])


In [12]:
print(token_embd(x).shape)
print(token_embd(y).shape)

torch.Size([128, 768])
torch.Size([128, 768])


In [13]:
# positional embedding
pos_embed = torch.nn.Embedding(config['block_size'], dim_embd)

In [14]:
input_embd = token_embd(x) + pos_embed(x)
input_embd.shape

torch.Size([128, 768])

# Causal Multi-head Self-attention Mechanism

In [15]:
import torch.nn as nn

In [16]:
# compute attention weights
token_embeddings = token_embd(x)
def compute_attention_weights(token_embeddings):
    n_tokens = len(token_embeddings)
    ws = torch.empty((n_tokens, n_tokens))
    # attention weight of token i
    for i in range(n_tokens):
        w = [torch.dot(token_embeddings[i], token_embeddings[j]).item() for j in range(n_tokens)]
        ws[i] = torch.softmax(torch.tensor(w), dim=-1) # an attention weight vector for each token
    return ws

In [17]:
# compute attention weights with tensor operations
attention_weights = torch.softmax(token_embeddings @ token_embeddings.T, dim=-1)
attention_weights

tensor([[0.5000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
        [0.0000, 0.0556, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
        [0.0000, 0.0000, 0.2500,  ..., 0.2500, 0.0000, 0.0000],
        ...,
        [0.0000, 0.0000, 0.2500,  ..., 0.2500, 0.0000, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0588, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.1429]],
       grad_fn=<SoftmaxBackward0>)

In [18]:
attention_weights = compute_attention_weights(token_embeddings) # weight vector each row
attention_weights

tensor([[0.5000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
        [0.0000, 0.0556, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
        [0.0000, 0.0000, 0.2500,  ..., 0.2500, 0.0000, 0.0000],
        ...,
        [0.0000, 0.0000, 0.2500,  ..., 0.2500, 0.0000, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0588, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.1429]])

In [19]:
# context vector
context_vectors = attention_weights @ token_embeddings
context_vectors

tensor([[ 5.0606e-01, -1.6644e+00,  5.1440e-01,  ..., -1.0457e-02,
          2.8881e-01,  1.6012e+00],
        [-1.9695e-01,  1.1613e+00, -7.8531e-01,  ...,  3.4494e-01,
         -1.5509e+00,  1.7803e+00],
        [ 1.3142e+00, -5.6748e-01, -7.7758e-01,  ..., -4.4057e-01,
         -2.4888e-01,  2.7978e-01],
        ...,
        [ 1.3142e+00, -5.6748e-01, -7.7758e-01,  ..., -4.4057e-01,
         -2.4888e-01,  2.7978e-01],
        [ 2.5787e-01,  3.4197e-01, -8.1678e-01,  ..., -4.0981e-01,
          4.9785e-01, -3.7207e-01],
        [-8.8016e-01,  1.0547e+00,  8.5828e-01,  ..., -2.1427e+00,
          4.3886e-03, -1.8542e-03]], grad_fn=<MmBackward0>)

In [20]:
class MultiHeadAttention(nn.Module):

    def __init__(self, d_in, d_out, 
                 context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert (d_out % num_heads == 0), \
            "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads    #1
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = nn.Linear(d_out, d_out)    #2
        self.dropout = nn.Dropout(dropout)
        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_length, context_length),
                       diagonal=1)
        )

    def forward(self, x):
        b, num_tokens, d_in = x.shape
        keys = self.W_key(x)         #3
        queries = self.W_query(x)    #3
        values = self.W_value(x)     #3

        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)       #4”

        values = values.view(b, num_tokens, self.num_heads, self.head_dim)  
        queries = queries.view(                                             
            b, num_tokens, self.num_heads, self.head_dim                    
        )                                                                   

        keys = keys.transpose(1, 2)          #5
        queries = queries.transpose(1, 2)    #5
        values = values.transpose(1, 2)      #5

        attn_scores = queries @ keys.transpose(2, 3)   #6
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]    #7

        attn_scores.masked_fill_(mask_bool, -torch.inf)     #8

        attn_weights = torch.softmax(
            attn_scores / keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        context_vec = (attn_weights @ values).transpose(1, 2)   #9
 #10
        context_vec = context_vec.contiguous().view(
            b, num_tokens, self.d_out
        )
        context_vec = self.out_proj(context_vec)    #11
        return context_vec

     #1 Reduces the projection dim to match the desired output dim
     #2 Uses a Linear layer to combine head outputs
     #3 Tensor shape: (b, num_tokens, d_out)
     #4 We implicitly split the matrix by adding a num_heads dimension. Then we unroll the last dim: (b, num_tokens, d_out) -&gt; (b, num_tokens, num_heads, head_dim).
     #5 Transposes from shape (b, num_tokens, num_heads, head_dim) to (b, num_heads, num_tokens, head_dim)
     #6 Computes dot product for each head
     #7 Masks truncated to the number of tokens
     #8 Uses the mask to fill attention scores
     #9 Tensor shape: (b, num_tokens, n_heads, head_dim)
     #10 Combines heads, where self.d_out = self.num_heads * self.head_dim
     #11 Adds an optional linear projection

In [21]:
GPT_CONFIG_124M = {
    "vocab_size": 50257,     # Vocabulary size
    "context_length": 1024,  # Context length
    "emb_dim": 768,          # Embedding dimension
    "n_heads": 12,           # Number of attention heads
    "n_layers": 12,          # Number of layers
    "drop_rate": 0.1,        # Dropout rate
    "qkv_bias": False        # Query-Key-Value bias
}

# Feed Forward Neural Networks

# Transformer Block

## Causal Self Attention Mechanism

In [None]:
class CausalSelfAttn(nn.Module):
    def __init__(self, config):
        super().__init__()
        nn.query = nn.Parameter(config['block_size', 'emb_dim'])
        nn.key = nn.Parameter(config['block_size', 'emb_dim'])
        nn.value = nn.Parameter(config['block_size', 'emb_dim'])

## Layer normalization

In [29]:
class LayerNorm(nn.Module):
    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift


## Feed Forward

In [32]:
class FeedForward(nn.Module):
    def __init__(self, emb_dim):
        super().__init__()
        self.layers = nn.Sequential([
            nn.Linear(emb_dim, emb_dim*4),
            nn.ReLU(),
            nn.Linear(emb_dim*4, emb_dim)
        ])
    
    def forward(self, x):
        return self.layers(x)

## Assemble, with residual connections

In [36]:
class TransformerBlock(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.norm1 = LayerNorm(cfg['emb_dim'])
        self.norm2 = LayerNorm(cfg['emb_dim'])
        self.dropout = nn.Dropout(cfg['drop_rate'])
        self.attn = MultiHeadAttention(config['d_in'], 
                                       config['d_out'], 
                                       config['context_length'], 
                                       config['drop_rate'], 
                                       config['num_heads'], 
                                       config['qkv_bias'])
        self.feedfwd = FeedForward(config['emb_dim'])

    def forward(self, x):
        res = x
        x = self.norm1(x)
        x = self.attn(x)
        x = self.dropout(x)
        x = x + res
        res = x
        x = self.norm2(x)
        x = self.feedfwd(x)
        x = self.dropout(x)
        x = res + x

        return x

# ALL IN ONE

with positional encodings

In [None]:
class GPTModel(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg)
              for _ in range(cfg["n_layers"])]
        )
        self.final_norm = LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(
            cfg["emb_dim"], cfg["vocab_size"], bias=False
        )

    def forward(self, in_idx):
        batch_size, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)
        pos_embeds = self.pos_emb(
            torch.arange(seq_len, device=in_idx.device)
        )
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits