<a href="https://colab.research.google.com/github/QingfangLiu/DS_learning/blob/main/my_gpt_coding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import tiktoken
import torch
import torch.nn as nn

In [5]:
# Fetch tiktoken's "gpt2" encoding; the cells below use it to turn text into
# token IDs (`encode`) and token IDs back into text (`decode`).
tokenizer = tiktoken.get_encoding("gpt2")

In [6]:
# Two sample prompts. Both encode to the same number of tokens, which is what
# allows them to be stacked into a single rectangular batch further down.
txt1, txt2 = "Every effort moves you", "Every day holds a"

In [7]:
# Encode each prompt into a 1-D tensor of token IDs, then stack the rows into
# one (num_prompts, num_tokens) tensor. Stacking requires equal-length rows.
encoded_rows = [torch.tensor(tokenizer.encode(text)) for text in (txt1, txt2)]
batch = torch.stack(encoded_rows, dim=0)
print(batch)

tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])


In [8]:
# Configuration of the small GPT-2 model (the "124M" variant).
GPT_CONFIG_124M = dict(
    vocab_size=50257,     # total number of unique tokens in the vocabulary
    context_length=1024,  # max number of tokens the model can process at one time
    emb_dim=768,          # embedding dimension
    n_heads=12,           # attention heads per transformer block
    n_layers=12,          # number of stacked transformer blocks
    drop_rate=0.1,        # dropout probability used by every dropout layer
    qkv_bias=False,       # whether the query/key/value projections have a bias
)

In [9]:
class LayerNorm(nn.Module):
    """Layer normalization over the last dimension with a learnable affine map.

    Every vector along the last axis is shifted to zero mean and scaled to unit
    (biased) variance, then multiplied by ``scale`` and offset by ``shift``.

    Args:
        emb_dim: Size of the last dimension of the tensors passed to ``forward``.
    """

    def __init__(self, emb_dim):
        super().__init__()
        # Small constant added to the variance so the division never hits zero.
        self.eps = 1e-5
        # Learnable gain (a.k.a. gamma); starts as the identity scaling.
        self.scale = nn.Parameter(torch.ones(emb_dim))
        # Learnable offset (a.k.a. beta); starts at zero.
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        """Return ``x`` normalized along its last dimension; shape is unchanged."""
        mu = x.mean(dim=-1, keepdim=True)
        # unbiased=False divides by n rather than n - 1.
        sigma_sq = x.var(dim=-1, keepdim=True, unbiased=False)
        centered = x - mu
        x_hat = centered / torch.sqrt(sigma_sq + self.eps)
        return self.scale * x_hat + self.shift

In [10]:
class GELU(nn.Module):
    """GELU activation computed with the tanh approximation.

    Evaluates ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x**3)))``
    element-wise: a smooth nonlinearity similar to, but not the same as, ReLU.
    """

    def forward(self, x):
        """Apply the activation element-wise; the output has the shape of ``x``."""
        coeff = torch.sqrt(torch.tensor(2.0 / torch.pi))
        cubic = x + 0.044715 * torch.pow(x, 3)
        gate = 1 + torch.tanh(coeff * cubic)
        return 0.5 * x * gate

In [11]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> GELU -> Linear.

    The first linear layer widens the last dimension from ``emb_dim`` to
    ``4 * emb_dim``; the second narrows it back, so the output has the same
    last dimension as the input and blocks can be stacked freely.

    Args:
        cfg: Mapping that provides ``"emb_dim"``.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]
        hidden_dim = 4 * emb_dim
        self.layers = nn.Sequential(
            nn.Linear(emb_dim, hidden_dim),  # expand by a factor of 4
            GELU(),
            nn.Linear(hidden_dim, emb_dim),  # project back to emb_dim
        )

    def forward(self, x):
        """Run ``x`` through the three layers; the last dimension is preserved."""
        return self.layers(x)

In [12]:
# Multi-head attention (from chapter 3), implemented the efficient way: a single
# set of projections is split into heads by reshaping rather than by looping over
# per-head modules. This is the core component of the transformer block.
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Size of the last dimension of the output; must be divisible by
            ``num_heads``. Usually ``d_in == d_out == emb_dim``.
        context_length: Side length of the causal mask, i.e. the longest
            sequence ``forward`` accepts.
        dropout: Dropout probability applied to the attention weights.
        num_heads: Number of heads; each one works on ``d_out // num_heads``
            features.
        qkv_bias: Whether the query/key/value projections carry a bias term.

    Raises:
        AssertionError: If ``d_out`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert (d_out % num_heads == 0),\
            "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads  # integer division: features per head
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)

        self.out_proj = nn.Linear(d_out, d_out)  # final projection that mixes the heads
        self.dropout = nn.Dropout(dropout)
        # Non-learnable buffer (follows the module across devices, gets no gradient):
        # ones strictly above the diagonal mark the "future" positions to hide.
        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_length, context_length), diagonal=1)
        )

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: Tensor of shape ``(batch, num_tokens, d_in)``.

        Returns:
            Tensor of shape ``(batch, num_tokens, d_out)``.

        Raises:
            ValueError: If ``num_tokens`` exceeds the ``context_length`` the
                module was built with (the causal mask would be too small).
        """
        b, num_tokens, _ = x.shape

        # Fail early with a clear message instead of an opaque broadcasting
        # error when the mask is applied further down.
        max_tokens = self.mask.shape[0]
        if num_tokens > max_tokens:
            raise ValueError(
                f"sequence length {num_tokens} exceeds context_length {max_tokens}"
            )

        # Linear projections: (batch, num_tokens, d_out)
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)

        # Split the feature dimension into heads:
        # (batch, num_tokens, num_heads, head_dim)
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)

        # Move the head axis forward: (batch, num_heads, num_tokens, head_dim)
        keys = keys.transpose(1, 2)
        queries = queries.transpose(1, 2)
        values = values.transpose(1, 2)

        # Dot-product scores: (batch, num_heads, num_tokens, num_tokens)
        attn_scores = queries @ keys.transpose(2, 3)

        # Slice the mask to the current sequence length *before* converting it,
        # so only num_tokens x num_tokens entries are cast per call rather than
        # the whole context_length x context_length buffer.
        mask_bool = self.mask[:num_tokens, :num_tokens].bool()

        # Masked (future) positions get -inf so softmax gives them zero weight.
        attn_scores.masked_fill_(mask_bool, -torch.inf)

        # Scale by sqrt(head_dim), then normalise over the key axis.
        attn_weights = torch.softmax(
            attn_scores / keys.shape[-1]**0.5, dim=-1
        )
        attn_weights = self.dropout(attn_weights)

        # Weighted sum of values, back to (batch, num_tokens, num_heads, head_dim)
        context_vec = (attn_weights @ values).transpose(1, 2)

        # Merge the heads: (batch, num_tokens, d_out). contiguous() is needed
        # because view() cannot operate on the transposed memory layout.
        context_vec = context_vec.contiguous().view(
            b, num_tokens, self.d_out
        )

        # Final linear projection to mix information across heads.
        context_vec = self.out_proj(context_vec)
        return context_vec


In [13]:
class TransformerBlock(nn.Module):
    """One pre-norm transformer block: attention sub-layer, then feed-forward.

    Each sub-layer is computed as ``x + dropout(sublayer(norm(x)))``; the same
    dropout module is shared by both residual branches.

    Args:
        cfg: Mapping that provides ``"emb_dim"``, ``"context_length"``,
            ``"n_heads"``, ``"drop_rate"`` and ``"qkv_bias"``.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]
        drop_rate = cfg["drop_rate"]

        self.att = MultiHeadAttention(
            d_in=emb_dim,
            d_out=emb_dim,
            context_length=cfg["context_length"],
            num_heads=cfg["n_heads"],
            dropout=drop_rate,
            qkv_bias=cfg["qkv_bias"],
        )
        self.ff = FeedForward(cfg)
        # Two separate norms: each has its own learnable scale and shift.
        self.norm1 = LayerNorm(emb_dim)  # applied before attention
        self.norm2 = LayerNorm(emb_dim)  # applied before the feed-forward net
        self.drop_shortcut = nn.Dropout(drop_rate)

    def forward(self, x):
        """Return the block output; it has the same shape as ``x``."""
        # Attention sub-layer with residual (skip) connection.
        attn_branch = self.drop_shortcut(self.att(self.norm1(x)))
        x = attn_branch + x

        # Feed-forward sub-layer with residual connection.
        ff_branch = self.drop_shortcut(self.ff(self.norm2(x)))
        return ff_branch + x

In [14]:
class GPTModel(nn.Module):
    """GPT-style decoder: embeddings -> transformer blocks -> norm -> vocab logits.

    Args:
        cfg: Mapping that provides ``"vocab_size"``, ``"context_length"``,
            ``"emb_dim"``, ``"drop_rate"`` and ``"n_layers"``, plus whatever
            ``TransformerBlock`` reads from it.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]
        vocab_size = cfg["vocab_size"]

        # Lookup tables (randomly initialised) for token IDs and for positions.
        self.tok_emb = nn.Embedding(vocab_size, emb_dim)
        self.pos_emb = nn.Embedding(cfg["context_length"], emb_dim)
        self.drop_emb = nn.Dropout(cfg["drop_rate"])

        # n_layers transformer blocks applied one after another.
        blocks = [TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        self.trf_blocks = nn.Sequential(*blocks)

        self.final_norm = LayerNorm(emb_dim)
        # Bias-free projection from emb_dim to vocab_size for next-token prediction.
        self.out_head = nn.Linear(emb_dim, vocab_size, bias=False)

    def forward(self, in_idx):
        """Compute next-token logits.

        Args:
            in_idx: Token-ID tensor of shape ``(batch_size, seq_len)``.

        Returns:
            Logits of shape ``(batch_size, seq_len, vocab_size)``.
        """
        _, seq_len = in_idx.shape
        # Positions 0 .. seq_len-1, created on the same device as the input.
        positions = torch.arange(seq_len, device=in_idx.device)

        # (batch, seq_len, emb_dim) + (seq_len, emb_dim): broadcast over the batch.
        hidden = self.tok_emb(in_idx) + self.pos_emb(positions)
        hidden = self.drop_emb(hidden)
        hidden = self.trf_blocks(hidden)
        hidden = self.final_norm(hidden)
        return self.out_head(hidden)


In [15]:
# Seed the RNG first so the random weight initialisation is reproducible, then
# build the model and push the sample batch through it. The order matters:
# seeding after construction would not affect the weights.
# NOTE: .eval() has not been called yet, so dropout is active in this forward
# pass; that changes the logit values but not the shape displayed below.
torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
logits = model(batch)
logits.shape

torch.Size([2, 4, 50257])

In [16]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    """Greedily extend token sequences using the model's next-token logits.

    Args:
        model: Callable mapping a ``(batch, n_tokens)`` token-ID tensor to
            logits of shape ``(batch, n_tokens, vocab_size)``.
        idx: Token-ID tensor of shape ``(batch, n_tokens)`` holding the prompt.
        max_new_tokens: Number of tokens to append to every sequence.
        context_size: Only the last ``context_size`` tokens are fed to the model.

    Returns:
        Tensor of shape ``(batch, n_tokens + max_new_tokens)``: the prompt
        followed by the generated token IDs.
    """
    tokens = idx
    for _step in range(max_new_tokens):
        # Crop to the most recent tokens the model is allowed to see.
        window = tokens[:, -context_size:]
        with torch.no_grad():
            all_logits = model(window)

        # Keep only the prediction for the last position: (batch, vocab_size)
        last_logits = all_logits[:, -1, :]
        probs = torch.softmax(last_logits, dim=-1)
        # Most likely token per sequence; keepdim=True gives shape (batch, 1).
        next_token = torch.argmax(probs, dim=-1, keepdim=True)
        # Append along the token axis.
        tokens = torch.cat((tokens, next_token), dim=1)
    return tokens

In [17]:
# Encode a short prompt into token IDs and wrap it in a batch of size 1,
# the layout the model and generate_text_simple work with.
start_context = "Hello, I am"
encoded = tokenizer.encode(start_context)
print("encoded:", encoded)

encoded_tensor = torch.tensor(encoded).unsqueeze(0) # unsqueeze(0) adds a leading batch dimension
print("encoded_tensor.shape:", encoded_tensor.shape)

encoded: [15496, 11, 314, 716]
encoded_tensor.shape: torch.Size([1, 4])


In [18]:
# Greedily generate 6 new tokens after the encoded prompt.
# eval() must come first so dropout is disabled during generation.
model.eval() # put model to evaluation mode (disables dropout)
out = generate_text_simple(
    model=model,
    idx=encoded_tensor,
    max_new_tokens=6,
    context_size=GPT_CONFIG_124M["context_length"]
)
print("Output:", out)
print("Output length:", len(out[0])) # prompt tokens + newly generated tokens

Output: tensor([[15496,    11,   314,   716, 27018, 24086, 47843, 30961, 42348,  7267]])
Output length: 10


In [19]:
# Turn the generated IDs back into text. The model has not been trained at this
# point (weights are the random initialisation), so the continuation is not
# meaningful language.
decoded_text = tokenizer.decode(out.squeeze(0).tolist()) # drop the batch dimension; decode is given a plain list of ints
print(decoded_text)

Hello, I am Featureiman Byeswickattribute argue


## Model training

In [20]:
# Revised settings for training: identical to the earlier configuration except
# that the context length is reduced from 1024 to 256.
GPT_CONFIG_124M = dict(
    vocab_size=50257,    # total number of unique tokens in the vocabulary
    context_length=256,  # max number of tokens the model can process at one time
    emb_dim=768,         # embedding dimension
    n_heads=12,          # attention heads per transformer block
    n_layers=12,         # number of stacked transformer blocks
    drop_rate=0.1,       # dropout probability used by every dropout layer
    qkv_bias=False,      # whether the query/key/value projections have a bias
)

In [21]:
# Rebuild the model with the revised configuration. Re-seeding immediately
# before construction keeps the random initial weights reproducible.
torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
# eval() disables dropout; it returns the module, so as the last expression of
# the cell it also displays the model structure.
model.eval()

GPTModel(
  (tok_emb): Embedding(50257, 768)
  (pos_emb): Embedding(256, 768)
  (drop_emb): Dropout(p=0.1, inplace=False)
  (trf_blocks): Sequential(
    (0): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): Linear(in_features=768, out_features=768, bias=False)
        (W_key): Linear(in_features=768, out_features=768, bias=False)
        (W_value): Linear(in_features=768, out_features=768, bias=False)
        (out_proj): Linear(in_features=768, out_features=768, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ff): FeedForward(
        (layers): Sequential(
          (0): Linear(in_features=768, out_features=3072, bias=True)
          (1): GELU()
          (2): Linear(in_features=3072, out_features=768, bias=True)
        )
      )
      (norm1): LayerNorm()
      (norm2): LayerNorm()
      (drop_shortcut): Dropout(p=0.1, inplace=False)
    )
    (1): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): Linear(in_features

In [22]:
# Utility functions for converting between text and batched token-ID tensors.
def text_to_token_ids(text, tokenizer):
    """Encode ``text`` and return the token IDs as a ``(1, n_tokens)`` tensor.

    The literal ``<|endoftext|>`` marker is passed to the tokenizer as an
    allowed special token.
    """
    ids = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    # unsqueeze adds the leading batch dimension.
    return torch.tensor(ids).unsqueeze(dim=0)
def token_ids_to_text(token_ids, tokenizer):
    """Decode a ``(1, n_tokens)`` token-ID tensor back into a string."""
    # Drop the batch dimension, then hand the tokenizer a plain list of ints.
    id_list = token_ids.squeeze(0).tolist()
    return tokenizer.decode(id_list)

In [41]:
# Generate 10 tokens from the rebuilt model using the text/token-ID helpers.
# The weights are still the random initialisation, so the continuation is not
# meaningful language.
start_context = "Every effort moves you"
token_ids = generate_text_simple(
    model=model,
    idx=text_to_token_ids(start_context, tokenizer),
    max_new_tokens=10,
    context_size=GPT_CONFIG_124M["context_length"]
)
print("Output text:\n", token_ids_to_text(token_ids, tokenizer))

Output text:
 Every effort moves you rentingetic wasnم refres RexMeCHicular stren


### Calculate loss function

In [34]:
# Read the training text ("the verdict") from disk and report its size, both
# in characters and in tokens.
filepath = "data/the-verdict.txt"
with open(filepath, "r", encoding="utf-8") as file:
    text_data = file.read()

total_characters = len(text_data)
total_tokens = len(tokenizer.encode(text_data))
print("Characters:", total_characters)
print("Tokens:", total_tokens)

Characters: 20479
Tokens: 5145


In [35]:
# Split the raw text into training and validation parts.
# The cut is made on characters: the first 90% is used for training, the rest
# for validation.
train_ratio = 0.90
split_idx = int(train_ratio * len(text_data))
train_data, val_data = text_data[:split_idx], text_data[split_idx:]

In [None]:
# TODO: the next step needs the functions from chapter 2
