In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [2]:
# Hyperparameters
# ---------------
# Model / optimisation settings.
batch_size = 32        # number of windows sampled per mini-batch
block_size = 4         # context length: tokens per input window
learning_rate = 0.01   # step size handed to the optimiser
n_embd = 32            # embedding width
n_heads = 4            # attention heads per block (must divide n_embd)
n_layers = 4           # number of stacked transformer blocks
dropout = 0.5          # dropout probability of the feed-forward sub-layer

# Training-loop schedule.
epochs = 1000          # optimisation steps; each step consumes one mini-batch
eval_epochs = 200      # mini-batches averaged per split when estimating the loss
eval_every = 100       # report the estimated losses every this many steps

# Seed PyTorch's RNG so weight initialisation, batch sampling, dropout and name
# sampling are repeatable when the notebook is re-run from a fresh kernel.
seed = 1337
torch.manual_seed(seed)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [3]:
# Read the whole corpus into memory as a single string.
with open('resources/names.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# Vocabulary: every distinct character of the corpus, in sorted order.
chars = sorted(set(text))
vocab_size = len(chars)

# Lookup tables between characters and their integer ids (position in `chars`).
idx_to_char = dict(enumerate(chars))
char_to_idx = {ch: i for i, ch in idx_to_char.items()}


def encode(s):
    """Return the list of integer ids for the characters of ``s``."""
    return [char_to_idx[ch] for ch in s]


def decode(li):
    """Return the string spelled by the iterable of integer ids ``li``."""
    return ''.join(idx_to_char[i] for i in li)

In [4]:
# Encode the full corpus once as a 1-D tensor of token ids on the chosen device.
data = torch.tensor(encode(text), dtype=torch.long, device=device)

# The first 90% of the tokens are used for training, the rest for validation.
split = int(0.9 * data.size(0))
train_data = data[:split]
val_data = data[split:]

def get_batch(split):
    """Sample a random mini-batch of (input, target) windows.

    Args:
        split: ``'train'`` draws from ``train_data``; any other value draws
            from ``val_data``.

    Returns:
        Tuple ``(x, y)`` of tensors on ``device``, each stacking ``batch_size``
        windows of ``block_size`` tokens. ``y`` holds the same windows as
        ``x`` shifted one position to the right in the source sequence.
    """
    source = train_data if split == 'train' else val_data

    # The exclusive upper bound leaves room for the one-token look-ahead in `y`.
    starts = torch.randint(high=(len(source) - block_size), size=(batch_size,)).tolist()

    inputs, labels = [], []
    for start in starts:
        inputs.append(source[start:start + block_size])
        labels.append(source[start + 1:start + block_size + 1])

    x = torch.stack(inputs).to(device)
    y = torch.stack(labels).to(device)
    return x, y

In [5]:
@torch.no_grad()
def estimate_loss(model):
    """Estimate the mean loss of ``model`` on the train and validation splits.

    For each split, ``eval_epochs`` mini-batches are drawn with ``get_batch``
    and their losses averaged. The model is evaluated in eval mode; whatever
    mode it was in when this function was called is restored afterwards, also
    when evaluation raises.

    Args:
        model: module called as ``model(X, Y)`` and returning ``(logits, loss)``.

    Returns:
        Dict with keys ``'train'`` and ``'val'`` mapping to 0-d tensors that
        hold the mean loss of the respective split.
    """
    was_training = model.training  # remember the caller's mode
    model.eval()
    try:
        out = {}
        for split in ['train', 'val']:
            losses = torch.zeros(eval_epochs, device=device)
            for k in range(eval_epochs):
                X, Y = get_batch(split)
                _, loss = model(X, Y)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        # Restore the previous mode instead of forcing train mode.
        model.train(was_training)
    return out

In [6]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    The input is linearly projected to queries, keys and values, split into
    ``n_heads`` heads of width ``n_embd // n_heads``, attended per head,
    re-joined and passed through a final linear projection.
    """

    def __init__(self, n_heads, n_embd):
        super().__init__()
        assert n_embd % n_heads == 0, "n_embd must be divisible by n_heads"

        self.n_heads = n_heads
        self.n_embd = n_embd
        self.att_dim = n_embd // n_heads  # width of a single head

        # Each layer maps (B, T, n_embd) -> (B, T, n_embd).
        self.q = nn.Linear(n_embd, n_embd)
        self.k = nn.Linear(n_embd, n_embd)
        self.v = nn.Linear(n_embd, n_embd)
        self.projection = nn.Linear(n_embd, n_embd)

    def scaled_dot_product(self, Q, K, V, mask=None):
        """Attend over per-head tensors shaped (B, H, T, att_dim).

        Positions where ``mask == 0`` are set to ``-inf`` before the softmax,
        so they receive zero attention weight. Returns a (B, H, T, att_dim)
        tensor.
        """
        scale = self.att_dim ** -0.5
        scores = Q @ K.transpose(2, 3) * scale  # (B, H, T, T)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        weights = scores.softmax(dim=-1)
        return weights @ V

    def split_heads(self, x):
        """Reshape (B, T, n_embd) into per-head form (B, H, T, att_dim)."""
        B, T, _ = x.shape
        per_head = x.view(B, T, self.n_heads, self.att_dim)
        return per_head.transpose(1, 2)

    def concatinate_heads(self, x):
        """Merge per-head form (B, H, T, att_dim) back into (B, T, H * att_dim)."""
        B, H, T, C = x.shape
        # After the transpose the head and feature axes are no longer adjacent in
        # memory; `view` needs a compatible layout, hence the `contiguous()` copy.
        heads_last = x.transpose(1, 2).contiguous()
        return heads_last.view(B, T, H * C)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x`` of shape (B, T, n_embd); the shape is preserved."""
        queries, keys, values = (
            self.split_heads(layer(x)) for layer in (self.q, self.k, self.v)
        )
        attended = self.scaled_dot_product(queries, keys, values, mask)  # (B, H, T, att_dim)
        return self.projection(self.concatinate_heads(attended))
    
class FeedForward(nn.Module):
    """Position-wise MLP: widen to 4 * n_embd, ReLU, project back, then dropout.

    The dropout probability is read from the module-level ``dropout`` setting
    when the layer is constructed.
    """

    def __init__(self, n_embd):
        super().__init__()

        hidden = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to the last dimension of ``x``."""
        return self.net(x)

class Block(nn.Module):
    """Pre-norm transformer block.

    Causal multi-head self-attention followed by a feed-forward network. Each
    sub-layer is preceded by LayerNorm and wrapped in a residual connection.
    """

    def __init__(self, n_heads, n_embd):
        super().__init__()

        self.attn = MultiHeadAttention(n_heads, n_embd)
        self.ff = FeedForward(n_embd)

        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Transform ``x`` of shape (B, T, n_embd); the shape is preserved."""
        # Lower-triangular causal mask sized to the actual sequence length and
        # created directly on the input's device. It broadcasts over the
        # (B, H, T, T) attention scores, so position t only sees positions <= t.
        T = x.shape[1]
        mask = torch.tril(torch.ones(T, T, device=x.device))

        x = x + self.attn(self.ln1(x), mask)
        x = x + self.ff(self.ln2(x))
        return x

In [18]:
class AutoregressiveTransfomer(nn.Module):
    """Character-level decoder-only transformer.

    Token and learned positional embeddings are summed, passed through
    ``n_layers`` ``Block`` modules and a final LayerNorm, then projected to
    vocabulary logits. Sizes are read from the module-level hyperparameters
    at construction time.
    """

    def __init__(self):
        super().__init__()

        self.token_embd = nn.Embedding(vocab_size, n_embd)
        self.pos_embd = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_heads, n_embd) for _ in range(n_layers)])
        self.ln = nn.LayerNorm(n_embd)
        self.unembed = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, optionally, the cross-entropy loss.

        Args:
            idx: (B, T) tensor of token ids; T must not exceed the number of
                positional embeddings (``block_size`` at construction).
            targets: optional (B, T) tensor of target token ids.

        Returns:
            ``(logits, loss)``. Without ``targets``, ``logits`` has shape
            (B, T, vocab_size) and ``loss`` is ``None``. With ``targets``,
            ``logits`` is flattened to (B*T, vocab_size) and ``loss`` is the
            cross-entropy against the flattened targets.
        """
        B, T = idx.shape

        token_embd = self.token_embd(idx)
        # Build the positions on the input's device rather than a global one.
        pos_embd = self.pos_embd(torch.arange(T, device=idx.device))

        x = token_embd + pos_embd
        x = self.blocks(x)
        x = self.ln(x)

        logits = self.unembed(x)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, n_names=20, max_len=None):
        """Sample ``n_names`` strings from the model.

        Sampling runs in eval mode (dropout disabled) without gradient
        tracking; the module's previous train/eval mode is restored afterwards.

        Each sample starts from a context of ``block_size`` tokens with id 0
        and stops once token 0 is drawn; that terminating character is kept at
        the end of the returned string.
        NOTE(review): this treats id 0 as the name delimiter, presumably the
        newline of a one-name-per-line corpus. Confirm against the vocabulary.

        Args:
            n_names: number of strings to sample.
            max_len: optional cap on the characters sampled per string;
                ``None`` (default) samples until token 0 appears.

        Returns:
            List of ``n_names`` sampled strings.
        """
        dev = next(self.parameters()).device
        was_training = self.training
        self.eval()
        try:
            names = []
            for _ in range(n_names):
                context = [0] * block_size
                out = []

                while max_len is None or len(out) < max_len:
                    logits, _ = self(torch.tensor([context], device=dev))
                    probs = F.softmax(logits[0, -1, :], dim=-1)
                    nxt = torch.multinomial(probs, num_samples=1).item()

                    # Slide the fixed-size window one token to the right.
                    context = context[1:] + [nxt]
                    out.append(idx_to_char[nxt])

                    if nxt == 0:
                        break

                names.append(''.join(out))
        finally:
            self.train(was_training)
        return names

In [20]:
# Instantiate the network and move its parameters to the selected device.
model = AutoregressiveTransfomer().to(device)

In [22]:
def train():
    """Optimise the global ``model`` with AdamW for ``epochs`` mini-batch steps.

    On every step that is a multiple of ``eval_every``, the averaged train and
    validation losses from ``estimate_loss`` are printed before that step's
    parameter update is applied.
    """
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    for step in range(1, epochs + 1):
        due_for_report = step % eval_every == 0
        if due_for_report:
            report = estimate_loss(model)
            train_loss, val_loss = report["train"], report["val"]
            print(f'{step:5d}/{epochs}: Train loss - {train_loss:.5f} - Val loss - {val_loss:.5f}')

        inputs, targets = get_batch('train')
        _, loss = model(inputs, targets)

        # Clear stale gradients, back-propagate, then update the parameters.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


train()

  100/1000: Train loss - 2.52257 - Val loss - 2.64435
  200/1000: Train loss - 2.46597 - Val loss - 2.60149
  300/1000: Train loss - 2.44172 - Val loss - 2.59750
  400/1000: Train loss - 2.42879 - Val loss - 2.57194
  500/1000: Train loss - 2.40069 - Val loss - 2.56564
  600/1000: Train loss - 2.40089 - Val loss - 2.54166
  700/1000: Train loss - 2.39777 - Val loss - 2.54418
  800/1000: Train loss - 2.39978 - Val loss - 2.55078
  900/1000: Train loss - 2.38310 - Val loss - 2.55595
 1000/1000: Train loss - 2.37685 - Val loss - 2.56218


In [13]:
def prettify(input_str: str) -> str:
    """Format a sampled name for display.

    One trailing newline is removed and the first character is upper-cased.
    An empty string, or a string consisting of a single newline, yields ``''``
    instead of raising.

    Args:
        input_str: raw name, optionally ending in ``'\\n'``.

    Returns:
        The cleaned, capitalised name.
    """
    name = input_str.removesuffix('\n')
    # Slicing (not indexing) keeps this safe when nothing is left to capitalise.
    return name[:1].upper() + name[1:]

In [23]:
# Sample 20 names from the model and print them one per line.
with torch.no_grad():  # sampling only, so no autograd graph is needed
    out = model.generate(20)

    for name in out:
        print(prettify(name))

Elie
Shalyn
Jwesa
Madilyanahah
Huaderiiah

Actaram
Arueler
Eledyh
Etaw
Juodrashepe
Asia
Reeta
Aekin
Oses
Ademiurdiviukahy
Tupedllan

Evyra
Edllie
