In [30]:
import torch.nn as nn
import torch 

In [31]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_in, d_out, 
                 context_length, dropout,
                 num_heads, qkv_bias=False):
        super().__init__()
        assert(d_out % num_heads == 0), "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads # 1

        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key   = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)

        self.out_proj = nn.Linear(d_out, d_out) # 2

        self.dropout = nn.Dropout(dropout) 
        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_length, context_length), diagonal=1),
        ) 

    def forward(self, x):
        b, num_tokens, d_in = x.shape

        keys = self.W_key(x) # 3
        queries = self.W_query(x) # 3
        values = self.W_value(x) # 3

        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) # 4
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)

        keys = keys.transpose(1, 2) # 5
        queries = queries.transpose(1, 2) # 5
        values = values.transpose(1, 2) # 5

        attn_scores = queries @ keys.transpose(2, 3) # omega # 6
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens] # 7

        attn_scores.masked_fill_(mask_bool, -torch.inf) # 8

        attn_weights = torch.softmax(
            attn_scores / keys.shape[-1]**0.5, dim=-1
        )
        attn_weights = self.dropout(attn_weights)

        context_vec = (attn_weights @ values).transpose(1, 2) # 9
        context_vec = context_vec.contiguous().view( # 10
            b, num_tokens, self.d_out
        )

        context_vec = self.out_proj(context_vec) # 11
        return context_vec

In [32]:
class FeedForward(nn.Module):
    def __init__(self, cfg):
        super().__init__()

        # layers to train the model
        self.layers = nn.Sequential(
            nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
            GELU(),
            nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"])
        )
    
    def forward(self, x):
        return self.layers(x)


class LayerNorm(nn.Module):
    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))
    
    def forward(self, x):
        mean = x.mean(dim = -1, keepdim = True)
        var = x.var(dim = -1, keepdim = True)
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift

class GELU(nn.Module):
    def __init__(self):
        super().__init__()
    
    def forward(self, x):
        return 0.5 * x * (
            1 + torch.tanh(
                torch.sqrt(torch.tensor(2.0 / torch.pi)) * 
                (x + 0.044715 * torch.pow(x, 3))
            )
        )

In [33]:
class TransformerBlock(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        
        # multi-head attention
        self.att = MultiHeadAttention(
            # input dim
            d_in = cfg["emb_dim"],
            # output dim
            d_out = cfg["emb_dim"],
            # actual input length
            context_length = cfg["context_length"],
            # number of causal attention 
            num_heads = cfg["n_heads"],
            # masking rate
            dropout = cfg["drop_rate"],
            # if adding query, key, and value bias
            qkv_bias = cfg["qkv_bias"]
        )

        # Apply layers and activation function to train the model
        self.ff = FeedForward(cfg)

        # normalization
        self.norm1 = LayerNorm(cfg["emb_dim"])
        self.norm2 = LayerNorm(cfg["emb_dim"])
        
        # masking
        self.drop_shortcut = nn.Dropout(cfg["drop_rate"])
    
    def forward(self, x):
        # 1

        # assgin input as shortcut
        shortcut = x

        # normalize input
        x = self.norm1(x)

        # get context vector
        x = self.att(x)

        # dropout
        x = self.drop_shortcut(x)

        # shortcut: add input to output 
        x = x + shortcut # 2

        # assgin transformed input to shortcut 
        shortcut = x # 3

        # normalizing
        x = self.norm2(x)

        # apply linear layers and activation functions to input
        x = self.ff(x)

        # drop
        x = self.drop_shortcut(x)

        # shortcut: add input to output 
        x = x + shortcut # 4

        return x

In [34]:
class GPTModel(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        # create token embeddings
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"], cfg["emb_dim"])

        # create positional embeddings
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])

        # set drop out rate
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        
        # Apply transfomer block with n_layers
        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )

        # Apply layer normalization to embedding layers
        self.final_norm = LayerNorm(cfg["emb_dim"])

        # create output layers
        self.out_head = nn.Linear(
            cfg["emb_dim"], cfg["vocab_size"], bias = False
        )
    
    def forward(self, in_idx):
        batch_size, seq_len = in_idx.shape

        # create token embeddings
        tok_embeds = self.tok_emb(in_idx)

        # create positional embeddings
        pos_embeds = self.pos_emb(
            torch.arange(seq_len, device = in_idx.device) # 1
        )
        
        # combine token and positional embeddings
        x = tok_embeds + pos_embeds

        # drop some layers
        x = self.drop_emb(x)

        # apply transformer blocksbb
        x = self.trf_blocks(x)

        # normalizing
        x = self.final_norm(x)

        # apply linear function to x and return probbaility of each token and text
        logits = self.out_head(x)
        
        return logits

## Using GPT to generate text

In [35]:
#1 We shorten the context length from 1,024 to 256 tokens.
#2 It’s possible and common to set dropout to 0.

GPT_CONFIG_124M = {
    "vocab_size": 50257,
    "context_length": 256,    #1
    "emb_dim": 768,
    "n_heads": 12,
    "n_layers": 12, 
    "drop_rate": 0.1,       #2
    "qkv_bias": False
}

torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
model.eval()

GPTModel(
  (tok_emb): Embedding(50257, 768, padding_idx=768)
  (pos_emb): Embedding(256, 768)
  (drop_emb): Dropout(p=0.1, inplace=False)
  (trf_blocks): Sequential(
    (0): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): Linear(in_features=768, out_features=768, bias=False)
        (W_key): Linear(in_features=768, out_features=768, bias=False)
        (W_value): Linear(in_features=768, out_features=768, bias=False)
        (out_proj): Linear(in_features=768, out_features=768, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ff): FeedForward(
        (layers): Sequential(
          (0): Linear(in_features=768, out_features=3072, bias=True)
          (1): GELU()
          (2): Linear(in_features=3072, out_features=768, bias=True)
        )
      )
      (norm1): LayerNorm()
      (norm2): LayerNorm()
      (drop_shortcut): Dropout(p=0.1, inplace=False)
    )
    (1): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): L

In [36]:
def generate_text_simple(model, idx,  # 1
                         max_new_tokens, context_size):
    
    # iterate number of max new tokens provided
    for _ in range(max_new_tokens):

        # extract last number of context size
        idx_cond = idx[:, -context_size:] # 2

        # Disables gradient tracking since we are not training yet
        with torch.no_grad():
            # Obtain logits
            logits = model(idx_cond)

        # only extract the last row from a tensor
        logits = logits[:, -1, :] # 3

        # Obtain probability through softmax
        # Probability of each token in vocabulary
        probas = torch.softmax(logits, dim = -1) # 4
        
        # find the max probability
        idx_next = torch.argmax(probas, dim = -1, keepdim = True) # 5
        
        # find the index corresponding to the max proba
        idx = torch.cat((idx, idx_next), dim = 1) # 6

    return idx

In [37]:
import tiktoken

def text_to_token_ids(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special = {'<|endoftext|>'})
    # 1
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)
    return encoded_tensor

def token_ids_to_text(token_ids, tokenizer):
    # 2
    flat = token_ids.squeeze(0)
    return tokenizer.decode(flat.tolist())

start_context = "Every effort moves you"
tokenizer = tiktoken.get_encoding("gpt2")

embeddings = text_to_token_ids(start_context, tokenizer)

token_ids = generate_text_simple(
    model = model,
    idx = embeddings,
    max_new_tokens = 10,
    context_size = GPT_CONFIG_124M["context_length"]
)

print("Output text: \n", token_ids_to_text(token_ids, tokenizer))


Output text: 
 Every effort moves you rentingetic wasnم refres RexMeCHicular stren


## 5.1.2 Calculating the text generation loss

In [38]:
#  targets are the inputs but shifted one position forward

inputs = torch.tensor([[16833, 3626, 6100],   # ["every effort moves",
                       [40,    1107, 588]])   #  "I really like"]

targets = torch.tensor([[3626, 6100, 345  ],  # [" effort moves you",
                        [1107, 588, 11311]])  #  " really like chocolate"]

In [39]:
#1 Disables gradient tracking since we are not training yet
#2 Probability of each token in vocabulary

with torch.no_grad():     #1
    logits = model(inputs)
    
probas = torch.softmax(logits, dim=-1)     #2
print(probas.shape)


torch.Size([2, 3, 50257])


In [40]:
#1 First batch
#2 Second batch

token_ids = torch.argmax(probas, dim=-1, keepdim=True)
print("Token IDs:\n", token_ids)

Token IDs:
 tensor([[[16657],
         [  339],
         [42826]],

        [[49906],
         [29669],
         [41751]]])


In [41]:
# The model produces random text 
# that is different from the target text because it has not been trained yet. 

print(f"Targets batch 1: {token_ids_to_text(targets[0], tokenizer)}")
print(f"Outputs batch 1:"
      f" {token_ids_to_text(token_ids[0].flatten(), tokenizer)}")

Targets batch 1:  effort moves you
Outputs batch 1:  Armed heNetflix


In [43]:
probas

tensor([[[1.8852e-05, 1.5172e-05, 1.1698e-05,  ..., 2.2408e-05,
          6.9822e-06, 1.8781e-05],
         [9.1619e-06, 1.0067e-05, 7.8848e-06,  ..., 2.9088e-05,
          6.0139e-06, 1.3577e-05],
         [2.9887e-05, 8.8599e-06, 1.5754e-05,  ..., 3.5435e-05,
          1.4104e-05, 1.3535e-05]],

        [[1.2571e-05, 2.0535e-05, 1.4342e-05,  ..., 1.0396e-05,
          3.4776e-05, 1.4245e-05],
         [7.2785e-06, 1.7863e-05, 1.0568e-05,  ..., 2.1211e-05,
          1.1390e-05, 1.5565e-05],
         [2.9499e-05, 3.3594e-05, 4.1009e-05,  ..., 6.5304e-06,
          5.8152e-05, 1.3705e-05]]])

In [42]:
# batch: 0
# [0, 1, 2]: extract the first three rows
# targets[0] = [a, b, c] where a, b, c are three indices corresponding to three words

text_idx = 0
target_probas_1 = probas[text_idx, [0, 1, 2], targets[text_idx]]
print("Text 1:", target_probas_1)

text_idx = 1
target_probas_2 = probas[text_idx, [0, 1, 2], targets[text_idx]]
print("Text 2:", target_probas_2)

Text 1: tensor([7.4514e-05, 3.1054e-05, 1.1567e-05])
Text 2: tensor([1.0343e-05, 5.6737e-05, 4.7620e-06])


## Backpropagation

In [44]:
log_probas = torch.log(torch.cat((target_probas_1, target_probas_2)))
print(log_probas)

tensor([ -9.5045, -10.3798, -11.3674, -11.4792,  -9.7771, -12.2549])


In [46]:
avg_log_probas = torch.mean(log_probas)
print(avg_log_probas)

tensor(10.7938)


In [47]:
neg_avg_log_probas = avg_log_probas * -1
print(neg_avg_log_probas)

tensor(10.7938)


## Cross_entropy

In [48]:
print("Logits shape: ", logits.shape)
print("Targets shape: ", targets.shape)

Logits shape:  torch.Size([2, 3, 50257])
Targets shape:  torch.Size([2, 3])


In [49]:
# combine batches

logits_flat = logits.flatten(0, 1)
targets_flat = targets.flatten()
print("Flattened logits: ", logits_flat.shape)
print("Flattened targets: ", targets_flat.shape)

Flattened logits:  torch.Size([6, 50257])
Flattened targets:  torch.Size([6])


In [50]:
loss = torch.nn.functional.cross_entropy(logits_flat, targets_flat)
print(loss)

tensor(10.7938)


## Perplexity

In [51]:
perplexity = torch.exp(loss)
print(perplexity)

tensor(48717.6914)
