In [8]:
# some important imports
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# Read the whole training corpus into memory as a single string.
with open("dataset.txt", mode="r", encoding="utf-8") as file:
    text = file.read()

# The vocabulary is the set of distinct characters in the corpus. Sorting gives
# every character a stable position in `chars`, which is used as its token id.
chars = sorted(set(text))
vocab_size = len(chars)

1115393


In [None]:
def tokenize(string):
    """Encode a string as a list of integer character ids.

    Each character is mapped to its position in the module-level ``chars``
    vocabulary list.

    Args:
        string: Text to encode. Every character must be present in ``chars``.

    Returns:
        list[int]: One id per character of ``string``, in order.

    Raises:
        ValueError: If ``string`` contains a character that is not in ``chars``.
    """
    # Build the char -> id table once per call so each lookup is O(1).
    # `chars.index(c)` would rescan the vocabulary for every character, which is
    # needlessly slow when encoding the full corpus.
    # `setdefault` keeps the first position of a character, matching `list.index`.
    char_to_id = {}
    for position, ch in enumerate(chars):
        char_to_id.setdefault(ch, position)

    try:
        return [char_to_id[c] for c in string]
    except KeyError as missing:
        # Keep the exception type that `list.index` raised for unknown characters.
        raise ValueError(f"{missing.args[0]!r} is not in the vocabulary") from None
def decode(l):
    """Turn a sequence of token ids back into text.

    Args:
        l: Iterable of integer ids, each a valid index into the module-level
            ``chars`` vocabulary list.

    Returns:
        str: The characters selected by the ids, concatenated in order.
    """
    pieces = []
    for token_id in l:
        pieces.append(chars[token_id])
    return "".join(pieces)

# Encode the full corpus once and keep it as a 1-D int64 tensor of token ids.
data = torch.tensor(
    tokenize(text),
    dtype=torch.long,
)
# Sanity check: shape/dtype of the encoded corpus and its first 1000 ids.
print(data.shape, data.dtype)
print(data[:1000])

In [11]:
# The first 90% of the token stream is used for training; the remaining 10%
# is held out for validation.
train_data, val_data = data[:int(0.9 * len(data))], data[int(0.9 * len(data)):]

In [12]:
def get_batch(batch_size, seq_len, split):
    """Sample a random batch of input/target windows from one data split.

    Args:
        batch_size: Number of windows to draw.
        seq_len: Length of every window.
        split: ``'train'`` selects ``train_data``; any other value selects
            ``val_data``.

    Returns:
        tuple: ``(x, y)``, each stacked from ``batch_size`` slices of length
        ``seq_len``. ``y`` is the same window as ``x`` shifted one position to
        the right, i.e. the next-token targets.
    """
    source = train_data if split == 'train' else val_data

    # Start offsets are drawn so that the shifted target window still fits.
    starts = torch.randint(len(source) - seq_len, (batch_size,))

    inputs, targets = [], []
    for start in starts:
        inputs.append(source[start : start + seq_len])
        targets.append(source[start + 1 : start + seq_len + 1])
    return torch.stack(inputs), torch.stack(targets)

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58])

In [46]:
# --- Data / batching ---------------------------------------------------------
vocab_size = len(chars)   # number of distinct characters in the corpus
seq_len = 256             # length of every sampled window
batch_size = 64           # windows per batch

# --- Model -------------------------------------------------------------------
n_layers = 6              # number of stacked decoder blocks
n_heads = 8               # attention heads per block
n_embed = 512             # embedding width
dropout = 0.2

# --- Optimisation ------------------------------------------------------------
learning_rate = 6e-5
max_iters = 5000          # total training steps

# --- Evaluation --------------------------------------------------------------
eval_interval = 500       # report losses every this many training steps
eval_iters = 200          # batches averaged per split for each report

# --- Hardware ----------------------------------------------------------------
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [44]:
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention, with all heads computed in one batched pass.

    Args:
        n_embed: Width of the input and output features.
        seq_len: Size of the square lower-triangular mask stored in the
            ``tril`` buffer.
        n_heads: Number of heads; each head works on ``n_embed // n_heads``
            features.
        dropout: Dropout probability, applied to the attention weights and to
            the projected output.
    """

    def __init__(self, n_embed, seq_len, n_heads, dropout=0.1):
        super().__init__()
        self.n_heads = n_heads
        self.head_size = n_embed // n_heads
        # Scores are multiplied by 1/sqrt(head_size) before the softmax.
        self.scale = self.head_size ** -0.5

        # One full-width projection per role; heads are split out in forward().
        self.key = nn.Linear(n_embed, n_embed, bias=False)
        self.query = nn.Linear(n_embed, n_embed, bias=False)
        self.value = nn.Linear(n_embed, n_embed, bias=False)
        self.proj = nn.Linear(n_embed, n_embed)
        self.dropout = nn.Dropout(dropout)

        # Lower-triangular matrix of ones: position t may look at positions <= t.
        causal_mask = torch.tril(torch.ones(seq_len, seq_len))
        self.register_buffer('tril', causal_mask)

    def _split_heads(self, t):
        """Reshape (B, T, n_embed) into (B, n_heads, T, head_size)."""
        batch, steps, _ = t.shape
        return t.view(batch, steps, self.n_heads, self.head_size).transpose(1, 2)

    def forward(self, x):
        """Apply masked self-attention to ``x`` of shape (B, T, C); returns the same shape."""
        B, T, C = x.shape

        keys = self._split_heads(self.key(x))
        queries = self._split_heads(self.query(x))
        values = self._split_heads(self.value(x))

        # Scaled dot-product scores; entries above the diagonal are set to -inf
        # so the softmax assigns them zero weight.
        scores = torch.matmul(queries, keys.transpose(-2, -1)) * self.scale
        blocked = self.tril[:T, :T] == 0
        scores = scores.masked_fill(blocked, float('-inf'))
        weights = self.dropout(F.softmax(scores, dim=-1))

        # Weighted sum of values, then merge the heads back into one C-wide vector.
        merged = torch.matmul(weights, values)
        merged = merged.transpose(1, 2).contiguous().view(B, T, C)

        return self.dropout(self.proj(merged))

class FFN(nn.Module):
    """Position-wise feed-forward network: expand 4x, ReLU, project back, dropout.

    Args:
        n_embed: Width of the input and output features.
        dropout: Dropout probability applied after the second linear layer.
    """

    def __init__(self, n_embed, dropout):
        super().__init__()
        hidden = 4 * n_embed
        expand = nn.Linear(n_embed, hidden)
        contract = nn.Linear(hidden, n_embed)
        self.net = nn.Sequential(expand, nn.ReLU(), contract, nn.Dropout(dropout))

    def forward(self, x):
        """Run ``x`` through the stack; the last dimension must be ``n_embed``."""
        out = self.net(x)
        return out

class DecoderBlock(nn.Module):
    """One transformer decoder block: self-attention then feed-forward.

    Each sub-layer is wrapped in a residual connection and followed by a
    LayerNorm (the norm is applied after the residual add).

    Args:
        n_embed: Width of the features flowing through the block.
        n_heads: Number of attention heads.
        seq_len: Maximum sequence length, forwarded to the attention layer.
        dropout: Dropout probability used by both the attention layer and the
            feed-forward network.
    """

    def __init__(self, n_embed, n_heads, seq_len, dropout):
        super().__init__()
        # Forward `dropout` explicitly: without it the attention layer falls
        # back to its own default rate and ignores the configured value.
        self.sa = MultiHeadAttention(n_embed, seq_len, n_heads, dropout)
        self.ffn = FFN(n_embed, dropout)
        self.ln1 = nn.LayerNorm(n_embed)
        self.ln2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        """Apply attention and feed-forward sub-layers; output has the shape of ``x``."""
        x = self.ln1(x + self.sa(x))
        x = self.ln2(x + self.ffn(x))
        return x

class GPT(nn.Module):
    """Decoder-only transformer language model over integer token ids.

    Args:
        vocab_size: Number of distinct token ids.
        seq_len: Maximum context length; also the size of the learned position
            embedding table.
        n_embed: Embedding / hidden width.
        n_heads: Attention heads per decoder block.
        n_layers: Number of stacked decoder blocks.
        dropout: Dropout probability forwarded to every decoder block.
    """

    def __init__(self, vocab_size, seq_len, n_embed, n_heads, n_layers, dropout):
        super().__init__()
        # Remembered so generate() can crop its context to what the position
        # embedding table can index.
        self.seq_len = seq_len
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.position_embedding_table = nn.Embedding(seq_len, n_embed)
        self.blocks = nn.Sequential(*[DecoderBlock(n_embed, n_heads, seq_len, dropout) for _ in range(n_layers)])
        self.ln_f = nn.LayerNorm(n_embed)
        self.lm_head = nn.Linear(n_embed, vocab_size)

    def forward(self, x, targets=None):
        """Compute next-token logits and, when targets are given, the loss.

        Args:
            x: Integer token ids of shape (B, T), with T <= ``seq_len``.
            targets: Optional integer ids of shape (B, T).

        Returns:
            tuple: ``(logits, loss)``. ``logits`` has shape (B, T, vocab_size).
            ``loss`` is the mean cross-entropy over all B*T positions, or
            ``None`` when ``targets`` is ``None``.
        """
        B, T = x.shape
        tok_emb = self.token_embedding_table(x)
        pos_emb = self.position_embedding_table(torch.arange(T, device=x.device))
        x = tok_emb + pos_emb  # (T, C) position embeddings broadcast over the batch
        x = self.blocks(x)
        # Apply the final LayerNorm; it was constructed but never used before.
        x = self.ln_f(x)
        logits = self.lm_head(x)

        if targets is None:
            loss = None
        else:
            # cross_entropy expects (N, classes) logits with (N,) targets, so
            # flatten batch and time together. The reshaped tensors must be
            # passed on: `.view()` returns a new tensor and does not modify in place.
            flat_logits = logits.reshape(B * T, -1)
            flat_targets = targets.reshape(-1)
            loss = F.cross_entropy(flat_logits, flat_targets)
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_len):
        """Autoregressively sample ``max_len`` new tokens after ``idx``.

        Sampling runs without gradient tracking. The module's train/eval mode
        is not changed here; call ``model.eval()`` first to disable dropout.

        Args:
            idx: Integer token ids of shape (B, T) used as the prompt.
            max_len: Number of tokens to append.

        Returns:
            Tensor of shape (B, T + max_len): the prompt followed by the
            sampled tokens.
        """
        for _ in range(max_len):
            # Feed only the last `seq_len` tokens: the position embedding table
            # has no entries beyond that, so a longer context would be out of range.
            idx_cond = idx[:, -self.seq_len:]
            logits, _ = self(idx_cond)

            # Only the prediction at the last position matters for the next token.
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)

            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

In [67]:
# Build the model from the hyperparameters above and move it to the target device.
model = GPT(
    vocab_size=vocab_size,
    seq_len=seq_len,
    n_embed=n_embed,
    n_heads=n_heads,
    n_layers=n_layers,
    dropout=dropout,
)
model = model.to(device)

# AdamW over all model parameters with the configured learning rate.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [None]:
@torch.no_grad()
def estimate_loss():
    """Estimate the mean loss of ``model`` on the train and validation splits.

    For each split, ``eval_iters`` random batches are drawn with ``get_batch``
    and their losses averaged. The model is put in eval mode for the
    measurement and returned to whatever mode it was in before the call.

    Returns:
        dict: ``{'train': mean_loss, 'val': mean_loss}`` with 0-dim tensors as
        values.
    """
    out = {}
    was_training = model.training
    model.eval()
    try:
        for split in ['train', 'val']:
            losses = torch.zeros(eval_iters)
            for i in range(eval_iters):
                X, Y = get_batch(batch_size, seq_len, split)
                # get_batch does not move its tensors, so put them on the
                # model's device here; otherwise the forward pass fails on GPU.
                X, Y = X.to(device), Y.to(device)
                _, loss = model(X, Y)
                losses[i] = loss.item()
            out[split] = losses.mean()
    finally:
        # Restore the caller's mode rather than unconditionally forcing train().
        model.train(was_training)
    return out

# Main optimisation loop: one randomly sampled training batch per step.
for iteration in range(max_iters):

    # Periodically report the averaged train/validation losses.
    if iteration % eval_interval == 0:
        losses = estimate_loss()
        # The outer quotes are double so the single-quoted dict keys inside the
        # braces also parse on Python versions before 3.12.
        print(f"step {iteration} ,, train loss: {losses['train']:.4f} ,, val loss: {losses['val']:.4f}")

    xb, yb = get_batch(batch_size, seq_len, 'train')
    xb, yb = xb.to(device), yb.to(device)
    logits, loss = model(xb, yb)
    # The keyword is lowercase `set_to_none`; the misspelt `set_to_None` raises TypeError.
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()