Decoder Model based on the transformer in the paper "Attention is all you need": https://arxiv.org/pdf/1706.03762.pdf

No Encoder required, as this model generates similar text to the training dataset, intstead of performing a translation task.

# Load

In [1]:
with open("data/input.txt",'r', encoding="utf-8") as f:
    text = f.read()

print(f"Text length {len(text)}")

chars = sorted(list(set(text)))
vocab_size = len(chars)

Text length 1115394


# Tokenizer

In [2]:
import torch

# Character to Integer
ctoi = {ch:i for i,ch in enumerate(chars)}
# Integer to Character
itoc = {i:ch for i,ch in enumerate(chars)}


encode = lambda string: [ctoi[c] for c in string]
decode = lambda list: ''.join([itoc[i] for i in list])

print(encode("Hello World"))


data = torch.tensor(encode(text), dtype = torch.long)

[20, 43, 50, 50, 53, 1, 35, 53, 56, 50, 42]


# Train Test split

In [3]:
n = int(0.9 * len(data))
train_data = data[:n]
test_data = data[n:]

In [4]:
print("cuda" if torch.cuda.is_available() else "cpu")

cuda


# Model

In [5]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# HyperParams
block_size = 256
batch_size = 56
max_iters = 5000
eval_interval = 500
learning_rate = 3e-4
device = "cuda" if torch.cuda.is_available() else "cpu"
eval_iters = 200 
n_embed = 384
n_head = 6 
n_layer = 6
dropout = 0.2
# Reproducibility
torch.manual_seed(1337)


def get_bath(split):
    data = train_data if split == "train" else test_data
    ix = torch.randint( len(data) - block_size, (batch_size,)  )
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y


xb, yb = get_bath("train")


@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()

    for split in ["train","test"]:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_bath(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

class Head(nn.Module):
    """ self attention Layer """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)

        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size))) # Not a parameter, so a buffer
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B,T,C = x.shape
        k =  self.key(x)
        q = self.query(x)

        wei = q @ k.transpose(-2,-1) * C**-0.5
        wei = wei.masked_fill(self.tril[:T,:T]==0, float('-inf'))
        wei = F.softmax(wei, dim = -1)
        wei = self.dropout(wei)

        v = self.value(x)

        out = wei @ v

        return out


class MultiHeadAttention(nn.Module):
    """multiple heads of SA in parallel +  projection"""
    # Projection is a way to take the output into the original pathway. The model will learn how to do the projection.

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embed, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self,x):
        out = torch.cat([h(x) for h in self.heads], dim = -1)
        out = self.proj(out)
        out = self.dropout(out)
        return out

class FeedForward(nn.Module):
    """Simple Linear + Relu layer. Acts independently on each token, AFTER tokens have communicated."""
    def __init__(self, n_embed):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embed, 4*n_embed) # linear
            , nn.ReLU() # activation
            , nn.Linear(4*n_embed, n_embed) # projection into original pathway
            , nn.Dropout(dropout)
            ,
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    """ Each block implements communivation using multi-head attention, then lets each token compute separately with a feedforward. Skipping channel present for both mha and ff."""
    def __init__(self, n_embed, n_head):
        super().__init__()
        head_size = n_embed // n_head
        self.mha = MultiHeadAttention(n_head, head_size)
        self.ff = FeedForward(n_embed)
        self.ln1 = nn.LayerNorm(n_embed)
        self.ln2 = nn.LayerNorm(n_embed)

    def forward(self, x):

        x = x + self.mha(self.ln1(x))
        x = x + self.ff(self.ln2(x))
        return x

class MyLayerNorm: # Not used. Using torch version
    def __init__(self, dim, eps = 1e-5):
        self.eps = eps
        self.dim = dim

        self.gamma = torch.ones(dim)
        self.beta = torch.zeros(dim)


    def __call__(self, x):
        xmean = x.mean(1, keepdim = True) # batch mean
        xvar = x.var(1, keepdim = True) # batch std

        xhat = (x - xmean) / torch.sqrt(xvar + self.eps)
        self.out  = self.gamma * xhat + self.beta

        return self.out

    def parameters(self):
        return [self.gamma, self.beta]

class BigramModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed) # (V, E). Call takes (,) to (, ,E)
        self.position_embedding_table = nn.Embedding(block_size, n_embed)
        # self.sa_head = Head(n_embed) # For self attention = sa
        if (n_embed % 4 != 0):
            self.sa_head = Head(n_embed)
            print("Using single SA head...")

        #self.sa_head = MultiHeadAttention(4, n_embed // 4)
        #self.feedforward = FeedForward(n_embed)

        self.blocks = nn.Sequential()
        for bn in range(n_layer):
            self.blocks.add_module(module = Block(n_embed, n_head = n_head), name = f"Block number {bn}" )
        self.blocks.add_module(module = nn.LayerNorm(n_embed), name= "Final LayerNorm")

        self.linear_head = nn.Linear(n_embed, vocab_size) # (E, V)

    def forward(self, idx, targets = None):
        B, T = idx.shape # B = batch size, T = # of context tokens

        # idx is B,T
        token_embeddings = self.token_embedding_table(idx) # (B, T, E)
        position_embeddings = self.position_embedding_table( torch.arange(T, device = device) ) # (T, E)

        x = token_embeddings + position_embeddings # broadcast to (B,T,E)

        #x = self.sa_head(x)
        #x = self.feedforward(x)
        x = self.blocks(x)

        logits = self.linear_head(x) # (B, T, V)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            # Because of S.A., we can only use block size number of tokens
            idx_cond = idx[:, -block_size:]

            # predictions
            logits, loss = self(idx_cond) #model call
            logits = logits[:,-1,:] # (B,C)
            probs = F.softmax(logits, dim = -1) # (B,C)
            idx_next = torch.multinomial(probs, num_samples = 1) # Probability sample
            idx = torch.cat((idx, idx_next), dim = 1) # Concat
        return idx
    

model = BigramModel()
m = model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr = learning_rate)


for iter in range(max_iters):

    if iter % eval_interval == 0:
        losses = estimate_loss()
        ltr = losses["train"]
        ltst = losses["test"]
        print(f"Step {iter} train loss {ltr:.4f}, test loss {ltst:.4f}")

    xb, yb = get_bath("train")

    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none = True)
    loss.backward()
    optimizer.step()


Step 0 train loss 4.3869, test loss 4.3849
Step 500 train loss 2.0203, test loss 2.1000
Step 1000 train loss 1.6122, test loss 1.7876
Step 1500 train loss 1.4507, test loss 1.6619
Step 2000 train loss 1.3559, test loss 1.5707
Step 2500 train loss 1.2912, test loss 1.5402
Step 3000 train loss 1.2409, test loss 1.5090
Step 3500 train loss 1.1970, test loss 1.4876
Step 4000 train loss 1.1597, test loss 1.4898
Step 4500 train loss 1.1261, test loss 1.4803


# Text Generation

In [6]:
context = torch.zeros((1,1), dtype = torch.long, device = device)
print(decode(
    m.generate(context, max_new_tokens=1000)[0].tolist()
))


Maken, all our woes too!
How brings and praise his enought,
Khathard spake your ulDishonour English?

NORFOLK:
What deceive was, what God forget?

KING EDWARD IV:
Catesby, night, what Plantagenet dies,
For these word that Camillous' behind Northunks--

KING EDWARD IV:
So Richard in his a hor of kingly chairly father!

YORK:
O these royal father's unscient false against:
And then our virtues are was answer'd, head,
Whom we pronounced his ghoster.

Booting Of GAUM:
That he die: if they at old year it becals
Have lave with some dalgried out against an after
Leist it with sinewit. This is a late; some sworn ragis
shall bear wond with tears of him; practises
By ruden my wife craves, weight they gold buy life:
But heaven, she used to the crower with gives eats,
all the tle trouble twings of perpaint enough.
Say, durace, when this, do determine
acse this wear will have more of corion sighs;
Our till I it in away or such magise!

FRIAR LAURENCE:
But this foot wilt we my street, doth loose tor

## Scripting