# Project Description:

The objective of this project is to implement the GPT algorithm, as demonstrated in the
provided YouTube tutorial video, to build a character prediction model. The candidate is
required to complete this project using Python and the PyTorch library, focusing on
creating a clean, efficient, and well-structured implementation.
Deadline:
48 Hours
# Project Scope:
1. Develop a functional GPT-based character prediction model capable of predicting
the next character in a given text sequence.
2. Ensure the implementation is modular for potential future integration into larger
language understanding systems or applications.

# Core Concepts Tested:
1. GPT Algorithm: Comprehensive understanding of the GPT algorithm and its
application in character prediction tasks.
2. PyTorch Implementation: Proficiency in leveraging the PyTorch library for
implementing, training, and evaluating the GPT model.
3. Tokenization: Competence in text tokenization, including character-level
tokenization and input encoding.
4. Self-attention Mechanism: In-depth understanding of self-attention mechanisms and
their implementation in the GPT model, encompassing multi-head attention.
5. Model Training and Evaluation: Capability to effectively train the GPT model using
optimization techniques, including the SGD optimizer and Adam algorithm, and the
use of cross-entropy loss functions. Competence in evaluating the model's
6. Optimization and Hyperparameter Tuning: Expertise in optimizing the model,
employing residual connections, layer normalization, and adjusting learning rates.

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [None]:
torch.manual_seed(1337)

<torch._C.Generator at 0x236852049f0>

Load Dataset

In [None]:
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()


Tokenization and Encoding/Decoding input text

In [None]:
chars = sorted(list(set(text)))
vocab_size = len(chars)
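# create mappings from characters to integers and back
char2idx = {u:i for i, u in enumerate(chars)}
idx2char = {i:u for i, u in enumerate(chars)}
# encoder: take a string, output a list of integer indices
encode = lambda s : [char2idx[c] for c in s]
# decoder: take a list of integer indices, output a string
decode = lambda l : ''.join([idx2char[i] for i in l])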
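
As a quick sanity check, the same construction can be exercised on a toy string. The sketch below is illustrative only: `sample_text` and the `toy_*` names are assumptions standing in for the contents of `input.txt` and the tables above. It confirms that decoding an encoded string returns the original.

```python
# Toy stand-in for the contents of input.txt (an assumption for illustration).
sample_text = "hello world"

# Same construction as above: sorted unique characters define the vocabulary.
toy_chars = sorted(set(sample_text))
toy_char2idx = {ch: i for i, ch in enumerate(toy_chars)}
toy_idx2char = {i: ch for i, ch in enumerate(toy_chars)}

def toy_encode(s):
    """Map each character to its integer index."""
    return [toy_char2idx[c] for c in s]

def toy_decode(ids):
    """Map each integer index back to its character."""
    return ''.join(toy_idx2char[i] for i in ids)

ids = toy_encode("hello")
print(ids)              # [3, 2, 4, 4, 5]
print(toy_decode(ids))  # hello
```

Because the vocabulary is built only from characters that occur in the text, encoding a character that never appears there raises a `KeyError`.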

Train-Validation Split

In [None]:
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% is used for training, the rest for validation
train_data = data[:n]
val_data = data[n:]


### Create training examples and targets

Our next step is to divide the text into example sequences for training. Each input sequence that we feed into the model contains `block_size` characters from the text. Each input also needs a target sequence, which is used to train the model to predict the next character. The target has the same length as the input but is shifted one character to the right.

To do this, we break the text into chunks of `block_size + 1` characters. Suppose `block_size` is 4 and our text is "Hello". Then the input sequence is "Hell" and the target sequence is "ello".

The `get_batch` function then samples `batch_size` random chunks from the stream of character indices and stacks them into input and target tensors of the desired size.
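
The "Hello" example can be reproduced in a few lines of plain Python. This is a minimal sketch with illustrative names (`toy_text`, `toy_block_size`, `pairs` are not part of the notebook). It also shows that a single chunk packs `block_size` training examples, one per prefix of the input.

```python
# Toy values from the paragraph above (the names here are illustrative only).
toy_text = "Hello"
toy_block_size = 4

# One chunk of block_size + 1 characters yields both sequences.
chunk = toy_text[:toy_block_size + 1]
inputs = chunk[:-1]   # "Hell"
targets = chunk[1:]   # "ello"

# Every prefix of the input is a context whose label is the character
# at the same position in the target.
pairs = [(inputs[:t + 1], targets[t]) for t in range(toy_block_size)]
for context, target in pairs:
    print(f"context {context!r} -> target {target!r}")
```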

In [None]:
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

In [None]:
@torch.no_grad()
def estimate_loss():
    # average the loss over eval_iters random batches for each split
    out = {}
    model.eval()  # disable dropout while evaluating
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()  # restore training mode
    return out

In [None]:
class Head(nn.Module):

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token index is mapped to an n_embd-dimensional embedding vector
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx
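
The masking step in `Head.forward` is what keeps attention causal: each position may only attend to itself and earlier positions. The sketch below isolates that step. It is not the notebook's code. Random scores stand in for the scaled `q @ k.transpose(-2,-1)` product, and the sequence length `T = 4` is a hypothetical value chosen for readability.

```python
import torch
from torch.nn import functional as F

torch.manual_seed(0)
T = 4  # hypothetical sequence length, chosen small for readability

# Random scores stand in for the scaled query-key products.
scores = torch.randn(T, T)

# Lower-triangular mask: position t may attend to positions 0..t only.
tril = torch.tril(torch.ones(T, T))
masked = scores.masked_fill(tril == 0, float('-inf'))

# Softmax turns -inf into zero weight, and each row still sums to 1.
weights = F.softmax(masked, dim=-1)
print(weights)
```

The first row always puts all of its weight on position 0, since that token has no earlier context to attend to.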

Hyperparameters

In [None]:
# hyperparameters
batch_size = 32 # number of independent sequences processed in parallel
block_size = 256 # maximum context length for predictions
max_iters = 5000
eval_interval = 100
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.2
# ------------
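
With these hyperparameters, the model's parameter count can be tallied by hand. The sketch below is a rough check, not part of the notebook: `count_gpt_params` is a hypothetical helper, and the vocabulary size of 65 is an assumption, since the source never states `vocab_size`. Under that assumption the tally agrees with the `10.788929 M parameters` printed by the training cell.

```python
def count_gpt_params(vocab_size, n_embd, n_head, n_layer, block_size):
    """Tally trainable parameters for the architecture defined above."""
    head_size = n_embd // n_head

    # Token and position embedding tables.
    embeddings = vocab_size * n_embd + block_size * n_embd

    # One transformer block.
    attn_heads = n_head * 3 * (n_embd * head_size)      # key, query, value (no bias)
    attn_proj = (head_size * n_head) * n_embd + n_embd  # output projection with bias
    ffwd = (n_embd * 4 * n_embd + 4 * n_embd) + (4 * n_embd * n_embd + n_embd)
    layer_norms = 2 * (2 * n_embd)                      # weight and bias, two norms
    per_block = attn_heads + attn_proj + ffwd + layer_norms

    # Final layer norm and the language-model head.
    final = 2 * n_embd + (n_embd * vocab_size + vocab_size)

    return embeddings + n_layer * per_block + final

# ASSUMPTION: 65 distinct characters in input.txt (not stated in the source).
total = count_gpt_params(vocab_size=65, n_embd=384, n_head=6, n_layer=6, block_size=256)
print(total / 1e6, 'M parameters')  # 10.788929 M parameters
```

The causal mask `tril` is registered as a buffer rather than a parameter, so it does not contribute to the count.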

In [None]:
model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

10.788929 M parameters
step 0: train loss 4.2216, val loss 4.2311
step 100: train loss 2.4996, val loss 2.5068
step 200: train loss 2.4254, val loss 2.4476
step 300: train loss 2.3382, val loss 2.3761
step 400: train loss 2.1729, val loss 2.2405
step 500: train loss 1.9779, val loss 2.0798
step 600: train loss 1.8293, val loss 1.9633
step 700: train loss 1.7201, val loss 1.8905
step 800: train loss 1.6369, val loss 1.8155
step 900: train loss 1.5729, val loss 1.7416
step 1000: train loss 1.5209, val loss 1.7072
step 1100: train loss 1.4833, val loss 1.6864
step 1200: train loss 1.4525, val loss 1.6472
step 1300: train loss 1.4213, val loss 1.6363
step 1400: train loss 1.4042, val loss 1.6197
step 1500: train loss 1.3720, val loss 1.5975
step 1600: train loss 1.3639, val loss 1.5861
step 1700: train loss 1.3499, val loss 1.5798
step 1800: train loss 1.3183, val loss 1.5606
step 1900: train loss 1.3077, val loss 1.5497
step 2000: train loss 1.2946, val loss 1.5442
step 2100: train loss 1
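
One way to read the step-0 numbers in this log: an untrained model should score close to a uniform guess over the vocabulary, whose cross-entropy is the natural log of the vocabulary size. The sketch below assumes a vocabulary of 65 characters, which the source does not state, and compares that baseline with the logged step-0 validation loss.

```python
import math

# ASSUMPTION: vocabulary of 65 characters (not stated in the source).
assumed_vocab_size = 65

# Cross-entropy of a uniform guess: -log(1 / V) = log(V), in nats.
uniform_loss = math.log(assumed_vocab_size)

# Step-0 validation loss copied from the log above.
step0_val_loss = 4.2311

print(f"uniform baseline: {uniform_loss:.4f}")
print(f"gap at step 0:    {step0_val_loss - uniform_loss:.4f}")
```

Under this assumption the baseline is about 4.17, so a step-0 loss near 4.23 is consistent with the small-weight initialization producing nearly uniform predictions.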

In [None]:
# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))


Look babshiond-shall behold, therefore,
No present meep whilst no other; but, come,
For hurtful maids thither pecepture, tau odd stand,
Withouts striumphant wended to die, since stircve-up
Your coal so Capule? makesbuft, known this?

MARCIUS:
His nobler service and dare know to their presence
The marry malice of my arms. Dull gluck?

MARCIUS:
He tell always the prison with the father,
For I axe for the face to enter him out
Which doth deful he is fair. Wherefore, gentlewoman man!
He dit is to su


In [None]:
open('more.txt', 'w').write(decode(m.generate(context, max_new_tokens=10000)[0].tolist()))

10001