This repository is based on Andrej Karpathy's implmentation of Nano-GPT. Karpathy = Chad

In [1]:
import torch
from torch import nn

In [2]:
# Data loading
with open("shakespear.txt", "r") as f:
    data = f.read()

# defining vocab
vocab = sorted(list(set(data)))
vocab_size = len(vocab)

# lookup dicts
ix_to_char = {i: ch for i, ch in enumerate(vocab)}
char_to_ix = {ch: i for i, ch in enumerate(vocab)}

def encode(input_str: str) -> torch.tensor:
    encoded = []
    for ch in input_str:
        encoded.append(char_to_ix[ch])
    return torch.tensor(encoded)

def decode(input_ints: torch.tensor) -> str:
    decoded = ""
    for ix in input_ints:
        decoded += ix_to_char[ix.item()]
    return decoded


In [3]:
# encode all data & split into train and val
data_encoded = encode(data)

train_split = 0.8
split_idx = int(train_split * len(data_encoded))
train_data = data_encoded[:split_idx]
eval_data = data_encoded[split_idx:]

In [4]:
batch_size = 32  # B
block_size = 16  # T

def generate_batch(split_type):
    if split_type == "train":
        batch_to_generate_from = train_data
    else:
        batch_to_generate_from = eval_data

    # generate random starting points
    ixes = torch.randint(0, len(batch_to_generate_from) - block_size, (batch_size,))

    # extend the generated starting points
    X = torch.stack([batch_to_generate_from[ix:ix+block_size] for ix in ixes]) # B, T
    Y = torch.stack([batch_to_generate_from[ix+1:ix+block_size+1] for ix in ixes]) # B, T
    return X, Y

In [5]:
X, Y = generate_batch("train")

In [6]:
head_size = 32
emb_size = 64

class SingleHead(nn.Module):
    def __init__(self, head_size: int) -> None:
        super().__init__()
        self.l_key = nn.Linear(emb_size, head_size)
        self.l_query = nn.Linear(emb_size, head_size)
        self.l_value = nn.Linear(emb_size, head_size)
        self.ff = nn.Linear(head_size, emb_size)

    def forward(self, X) -> torch.tensor:
        """Forward Function

        Args:
            X (torch.tensor): X should be the output of sem_emb + pos_emb of shape B, T, emb_size

        Returns:
            torch.tensor: _description_
        """
        Q = self.l_query(X) # B, T, head_size
        K = self.l_key(X) # B, T, head_size
        V = self.l_value(X) # B, T, head_size
        # Produce weights
        wei = Q @ K.transpose(-1, -2) # B, T, T
        tril = torch.tril(torch.ones(block_size, block_size))
        masked_wei = wei.masked_fill(tril==0, float('-inf')) / (head_size ** 0.5)
        soft_wei = masked_wei.softmax(-1) # B, T, T

        out = soft_wei @ V # B, T, head_size
        return out
        # return self.ff(out) # B, T, emb_size
        
class MultiHeadedAttention(nn.Module):
    def __init__(self, n_heads) -> None:
        super().__init__()
        self.n_heads = n_heads
        self.attention_blocks = nn.ModuleList([SingleHead(head_size//n_heads) for i in range(n_heads)])
        self.proj_layer = nn.Linear(head_size, emb_size)
    def forward(self, X) -> torch.tensor:
        out = torch.cat([self.attention_blocks[ix](X) for ix in range(self.n_heads)], -1)
        return self.proj_layer(out)  # 4, 8, 16 -> 4, 8, 32

class FeedFowardLayer(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.feed_foward = nn.Sequential(
            nn.Linear(emb_size, emb_size * 4),
            nn.ReLU(),
            nn.Linear(emb_size * 4, emb_size)
        )

    def forward(self, X):
        """_summary_

        Args:
            X (_type_): Should be the output of MHA. Output shape: B, T, head_size

        Returns:
            torch.tensor: Output shape: B, T, emb_size
        """
        return self.feed_foward(X) # B, T, emb_size

class AttentionBlock(nn.Module):
    def __init__(self, n_heads: int) -> None:
        super().__init__()
        self.mha = MultiHeadedAttention(n_heads)
        self.ff = FeedFowardLayer()
        self.layer_norm = nn.LayerNorm(emb_size)

    def forward(self, X: torch.tensor):
        """_summary_

        Args:
            X (torch.tensor): Should be input emb (sem_emb + pos_emb)
        """
        X = self.mha(self.layer_norm(X)) + X
        out = self.ff(self.layer_norm(X)) + X
        return out
     

In [7]:
class GPT(nn.Module):
    def __init__(self, num_blocks, n_heads) -> None:
        super().__init__()
        self.semantic_embedding_table = nn.Embedding(vocab_size, emb_size)
        self.positional_emb_table = nn.Embedding(block_size, emb_size)
        self.attention_layers = nn.Sequential(
            *[AttentionBlock(n_heads) for i in range(num_blocks)]
        )
        self.linear_layer = nn.Linear(emb_size, vocab_size)    
    
    def forward(self, X):
        sem_emb = self.semantic_embedding_table(X) # B, T, emb_size
        # TODO: Check if position start from 0 or 1
        pos_emb = self.positional_emb_table(torch.arange(block_size)) # T, emb_size
        att_out = self.attention_layers(sem_emb + pos_emb) # B, T, emb_size
        return self.linear_layer(att_out) # B, T, vocab_size

    def train(self, num_epochs):
        opt = torch.optim.AdamW(self.parameters())
        loss_func = nn.CrossEntropyLoss()

        for epoch in range(num_epochs):
            X, Y = generate_batch("train")
            out = self.forward(X)
            logits = out.softmax(-1)
            loss = loss_func(logits.view(batch_size * block_size, vocab_size), Y.flatten())
            opt.zero_grad()
            loss.backward()
            opt.step()
            print(f"Loss: {loss.item()}")

In [8]:
gpt = GPT(5, 4)

In [9]:
gpt.train(10)

Loss: 4.206117630004883
Loss: 4.201798915863037
Loss: 4.192501068115234
Loss: 4.180886268615723
Loss: 4.177687644958496
Loss: 4.144900798797607
Loss: 4.128637313842773
Loss: 4.116065502166748
Loss: 4.083380222320557
Loss: 4.064646244049072
