<a href="https://colab.research.google.com/github/MaRodriguezB777/Shakespeare-Generative-Transformer/blob/main/Tiny_Shakespeare.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Adapted from Andrej Karpathy's Generative Transformer

import torch
import torch.nn as nn
import torch.nn.functional as F
from urllib.request import urlopen
from google.colab import drive
drive.mount("/content/drive")

# ---- Hyperparameters ----
# Data / batching
batch_size = 256  # number of sequences drawn per batch
block_size = 64  # context length: tokens per training sequence

# Model architecture
n_embd = 384  # embedding width
n_heads = 6  # attention heads per transformer block
n_layers = 6  # number of stacked transformer blocks
dropout = 0.2  # probability handed to every nn.Dropout in the model

# Optimisation / evaluation schedule
learning_rate = 3e-4
epochs = 5000
eval_interval = 500  # training steps between loss estimates
eval_iters = 200  # batches averaged for each loss estimate

# Use the GPU when CUDA is available, otherwise run on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# -------------------------

# Download the Tiny Shakespeare corpus. `f.read()` returns raw bytes, so
# `text` is a bytes object and iterating it yields integer byte values.
with urlopen("https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt") as f:
    text = f.read()

# Vocabulary: one character per distinct byte value in the corpus.
# Sorting pins the char <-> index mapping explicitly instead of relying on
# set iteration order; a saved checkpoint is only meaningful if every run
# assigns the same index to the same character.
chars = [chr(byte) for byte in sorted(set(text))]
vocab_size = len(chars)

# create mappings, decoder, and encoder for characters to indices and vice-versa
stoi = {ch: i for i, ch in enumerate(chars)}
itos = dict(enumerate(chars))


def encode(s):
    """Map an iterable of characters to the list of their vocabulary indices."""
    return [stoi[ch] for ch in s]


def decode(l):
    """Map an iterable of vocabulary indices back to the string they spell."""
    return ''.join([itos[i] for i in l])


# train and test splits: the first 90% of the token stream is used for
# training, the remaining 10% for validation.
data = torch.tensor(encode(map(chr, text)), dtype=torch.long)
n_split = int(0.9*len(data))
train_data = data[:n_split]
val_data = data[n_split:]

# Draw a random batch of (input, target) windows from the training or validation split
def get_batch(split):
    """Sample `batch_size` random windows of `block_size` tokens.

    Args:
        split: "train" selects `train_data`; any other value selects `val_data`.

    Returns:
        A pair `(x, y)` of tensors on `device`, each of shape
        (batch_size, block_size). `y` is `x` shifted one position to the
        right, i.e. the next-token target for every input position.
    """
    source = train_data if split == "train" else val_data
    # Upper bound leaves room for the one-token look-ahead needed by the targets.
    starts = torch.randint(len(source) - block_size, (batch_size, ))

    inputs, targets = [], []
    for start in starts:
        inputs.append(source[start: start + block_size])
        targets.append(source[start + 1: start + block_size + 1])

    return torch.stack(inputs).to(device), torch.stack(targets).to(device)

# Estimate the mean train / validation loss over `eval_iters` random batches
@torch.no_grad()
def estimate_loss():
    """Average the loss of the global model `m` over random batches.

    The model is put in eval mode while measuring and is switched back to
    train mode before returning (regardless of the mode it was in before).

    Returns:
        A dict with keys "train" and "val", each holding a 0-dim tensor with
        the mean loss over `eval_iters` batches of that split.
    """
    m.eval()
    averages = {}
    for split in ("train", "val"):
        batch_losses = torch.zeros(eval_iters)
        for i in range(eval_iters):
            inputs, targets = get_batch(split)
            # The model returns (predictions, loss); only the loss is needed here.
            batch_losses[i] = m(inputs, targets)[1].item()
        averages[split] = batch_losses.mean(dim=0)
    m.train()
    return averages

class Head(nn.Module):
    """Single head of causal (masked) self-attention.

    Args:
        head_size: output width of the key / query / value projections.

    `forward` takes `x` of shape (B, T, n_embd) with T <= block_size and
    returns a tensor of shape (B, T, head_size).
    """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.queries = nn.Linear(n_embd, head_size, bias=False)
        self.values = nn.Linear(n_embd, head_size, bias=False)
        self.dropout = nn.Dropout(dropout)

        # Lower-triangular mask. Registered as a buffer so it follows the module
        # across devices and is stored in the state_dict, but is not returned by
        # `parameters()` and therefore never updated by the optimizer.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

    def forward(self, x):
        _, T, _ = x.shape
        k = self.key(x)  # (B, T, head_size)
        q = self.queries(x)  # (B, T, head_size)
        v = self.values(x)  # (B, T, head_size)

        head_size = k.shape[-1]
        # Scaled dot-product scores: Q K^T / sqrt(head_size).
        # The exponent must be written as -0.5: `head_size**-1/2` parses as
        # (head_size**-1) / 2, which is 1 / (2 * head_size).
        # NOTE(review): weights trained with the old scaling will see
        # differently scaled logits after this fix - retrain or fine-tune.
        w = q @ k.transpose(-1, -2) * head_size ** -0.5  # (B, T, T)
        # Causal mask: position t may only attend to positions <= t. Masked
        # scores become -inf so their softmax weight is exactly zero.
        w = w.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        w = F.softmax(w, dim=-1)  # (B, T, T)

        # nn.Dropout is not in-place: its result has to be assigned, otherwise
        # the attention weights are never actually dropped.
        w = self.dropout(w)

        return w @ v  # (B, T, head_size)

class MultiHeadAttention(nn.Module):
    """Several attention heads run side by side, then mixed by a linear layer.

    Args:
        num_heads: number of independent `Head` modules.
        head_size: output width of each head.

    `forward` concatenates the head outputs along the last dimension, giving
    (B, T, num_heads * head_size), then applies the projection and dropout.
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        concat_width = num_heads * head_size
        head_modules = [Head(head_size) for _ in range(num_heads)]
        self.heads = nn.ModuleList(head_modules)
        self.proj = nn.Linear(concat_width, concat_width)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Each head looks at the same input independently; their results are
        # joined feature-wise before the output projection.
        per_head = [head(x) for head in self.heads]
        merged = torch.cat(per_head, dim=-1)  # (B, T, head_size * num_heads)
        return self.dropout(self.proj(merged))

class FeedForward(nn.Module):
    """Position-wise MLP: expand to 4x width, GELU, project back, dropout.

    Args:
        dim: size of the last dimension of both the input and the output.
    """

    def __init__(self, dim):
        super().__init__()
        hidden = 4 * dim
        layers = [
            nn.Linear(dim, hidden),
            nn.GELU(),
            nn.Linear(hidden, dim),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        out = self.net(x)
        return out

class Block(nn.Module):
    """Transformer block: layer norm + multi-head attention, then layer norm + MLP.

    Args:
        n_embd: embedding width of the block's input and output.
        n_heads: number of attention heads; each head gets n_embd // n_heads
            features.

    NOTE(review): each residual connection adds the sub-layer output to the
    *normalized* activations (the un-normalized input is discarded), and
    dropout is applied to the whole block output. This is what the code does
    today; confirm it is intended before changing it, since altering it
    changes the results of already-trained weights.
    """

    def __init__(self, n_embd, n_heads):
        super().__init__()
        self.att = MultiHeadAttention(n_heads, n_embd // n_heads)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        normed = self.ln1(x)
        attended = normed + self.att(normed)  # (B, T, n_embd)
        normed = self.ln2(attended)
        out = normed + self.ffwd(normed)  # (B, T, n_embd)
        return self.dropout(out)

# Defining model
class BigramLanguageModel(nn.Module):
    """Character-level language model: embeddings -> transformer blocks -> logits.

    Token and learned positional embeddings are summed, passed through
    `n_layers` `Block` modules and projected to `vocab_size` logits.
    """

    def __init__(self):
        super().__init__()
        # Each token index is embedded into an n_embd-sized vector
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        # One learned embedding per position in the context window
        self.positional_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(
            *[Block(n_embd, n_heads) for _ in range(n_layers)]
        )
        # NOTE(review): `ln` is registered but `forward` never applies it. It is
        # kept so parameter order and state_dict keys still match existing
        # checkpoints.
        self.ln = nn.LayerNorm(n_embd)
        self.ll_head = nn.Linear(n_embd, vocab_size)

    def forward(self, input, targets=None):
        """Compute next-token logits and, when targets are given, the loss.

        Args:
            input: (B, T) tensor of token indices, with T <= block_size.
            targets: optional (B, T) tensor of next-token indices.

        Returns:
            `(preds, loss)`. Without targets, `preds` has shape
            (B, T, vocab_size) and `loss` is None. With targets, `preds` is
            flattened to (B*T, vocab_size) and `loss` is the cross-entropy.
        """
        B, T = input.shape
        tok_embd = self.token_embedding_table(input)  # (B, T, n_embd)
        # Build the position indices on the same device as the input rather
        # than on the module-level `device`, so the model also works after
        # being moved elsewhere.
        positions = torch.arange(T, device=input.device)
        pos_embd = self.positional_embedding_table(positions)  # (T, n_embd)
        x_embd = tok_embd + pos_embd  # (B, T, n_embd), broadcast over the batch

        x = self.blocks(x_embd)  # (B, T, n_embd)
        preds = self.ll_head(x)  # (B, T, vocab_size) next-character logits per position

        if targets is None:
            return preds, None

        # cross_entropy expects (N, C) logits and (N,) class indices
        B, T, C = preds.size()
        preds = preds.view(B * T, C)
        loss = F.cross_entropy(preds, targets.view(B * T))
        return preds, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """Autoregressively append `max_new_tokens` sampled tokens to `idx`.

        Sampling runs in eval mode (dropout disabled) and without gradient
        tracking; the module's previous train/eval mode is restored afterwards.

        Args:
            idx: (B, T) tensor of token indices used as the prompt.
            max_new_tokens: number of tokens to sample.

        Returns:
            (B, T + max_new_tokens) tensor: the prompt followed by the samples.
        """
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                # Positional embeddings only exist for block_size positions,
                # so condition on at most the last block_size tokens.
                idx_cond = idx[:, -block_size:]
                preds, _ = self(idx_cond)  # (B, T, C)

                # only the prediction for the last position is needed
                preds = preds[:, -1, :]  # (B, C)
                probs = F.softmax(preds, dim=1)  # (B, C)

                idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)
        return idx

    def sample(self, max_new_tokens):
        """Generate from a single token with index 0 and return a list of indices.

        The returned list has `max_new_tokens + 1` entries (seed included).
        """
        # Seed on whatever device the model's parameters live on.
        model_device = next(self.parameters()).device
        seed = torch.zeros((1, 1), dtype=torch.long, device=model_device)
        return self.generate(seed, max_new_tokens)[0].tolist()

Mounted at /content/drive


In [None]:
# Folder on the mounted Google Drive where checkpoints are written and read
filedir = "/content/drive/MyDrive/Colab Notebooks/Shakespeare Transformer/"


def save_checkpoint(checkpoint, filename="shakespeare_transformer.pth.tar"):
    """Serialize `checkpoint` with torch.save to `filedir + filename`.

    Args:
        checkpoint: object to save (the training loop passes a dict holding
            the model and optimizer state dicts).
        filename: file name appended to `filedir`.
    """
    print("=> saving checkpoint")
    destination = f"{filedir}{filename}"
    torch.save(checkpoint, destination)

def load_checkpoint(checkpoint):
    """Restore the global model `m` and `optimizer` from a checkpoint dict.

    Args:
        checkpoint: mapping with a 'state_dict' entry (model weights) and an
            'optimizer' entry (optimizer state).
    """
    model_state = checkpoint['state_dict']
    m.load_state_dict(model_state)

    optimizer_state = checkpoint['optimizer']
    optimizer.load_state_dict(optimizer_state)

In [None]:
# Set to False to start from freshly initialised weights instead of the
# checkpoint stored under `filedir`.
load_model = True

# Model and optimizer setup. The model is moved to its device first so the
# optimizer is built over the parameters it will actually update.
m = BigramLanguageModel()
m.to(device)
optimizer = torch.optim.AdamW(m.parameters(), lr=learning_rate)
if load_model:
    # map_location lets a checkpoint saved on a GPU runtime be restored on a
    # CPU-only runtime (and vice versa).
    load_checkpoint(torch.load(filedir + "shakespeare_transformer.pth.tar", map_location=device))

# Training the model
# NOTE: the number of steps is hard-coded here; the `epochs` hyperparameter is
# not consulted by this loop.
for epoch in range(1000):

    xb, yb = get_batch("train")

    # Every `eval_interval` steps: estimate the losses, checkpoint, and report.
    if epoch % eval_interval == 0:
        losses = estimate_loss()

        checkpoint = {"state_dict" : m.state_dict(), "optimizer" : optimizer.state_dict()}
        save_checkpoint(checkpoint)

        print(f"Step = {epoch}: train_loss = {losses['train']}, val_loss = {losses['val']}")

    # One optimisation step on the sampled batch
    preds, loss = m(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# Persist the final weights too; otherwise every step taken after the last
# evaluation checkpoint is lost when the runtime is recycled.
save_checkpoint({"state_dict" : m.state_dict(), "optimizer" : optimizer.state_dict()})

=> saving checkpoint
Step = 0: train_loss = 1.2787261009216309, val_loss = 1.5784196853637695
=> saving checkpoint
Step = 500: train_loss = 1.2568527460098267, val_loss = 1.5739822387695312


In [None]:
# Loss of the most recent training batch (`loss` is left over from the last
# iteration of the training loop), printed as a plain Python float.
print(loss.item())

1.347420573234558


In [None]:
# Generate 10000 new tokens from the trained model and print them as text.
out = m.sample(10000)  # list of token indices, including the index-0 seed token
print(decode(out))



POMPEY:
Stroke: but must use shall give me swear of
Caples Marent chenteni. Thou didst in power
For bolt-send-toon me. Which vaven me
Is faming, proctifion our amplasious harrity,
I kind'd Warwick, partding as mill out.

BENVOLIO:
With bawd, and brief, my father's coursel.

ESCALUS:
Am retired, and turn with sunded in horses. A mooner,
or evils in you; but your lord,
Who courcaised Rutland, and such drught strain
Tue forthy as burn did holf thing
Is most blood, angen naturate to
And the prophes of fearful crown spelf to
Thence, so mue forthway I will give them and eye,
Withouldst mean, folding tears of distressing, or block.
Do how is Coriolanus bunrowed sun
reading with wi. Free, spench days on
And Henry in aa way, misford of God's gold.
Forbur with ground my poing them of the duke,
Perjudine prossmity, make time their eest,
Were shows aim to set-book it.

AULYCES.

TRUCH:
This is frown'd by changed beauty's as restrans!
But a wakement seans, and aiy in yourselves.
Stay, unthrink, C

In [None]:
# Character mapped to token index 0 - the token `sample` seeds generation with.
itos[0]

In [None]:
# Scratch tensors for checking batched-matmul / transpose shapes.
a = torch.ones((3, 4, 5))
b = torch.ones((3,5,4))

In [None]:
# Swapping the last two dimensions of the (3, 4, 5) tensor gives (3, 5, 4).
a.transpose(2, 1).shape

In [None]:
# Double every element of b (shape is unchanged).
b = 2*b

In [None]:
# Batched matrix product: (3, 4, 5) @ (3, 5, 4) -> (3, 4, 4).
(a @ b).shape

torch.Size([3, 4, 4])

In [None]:
# A negative slice start beyond the tensor's length is clamped, so this returns
# all 5 elements - the same reason `idx[:, -block_size:]` is safe on short inputs.
a[0][0][-100:]

tensor([1., 1., 1., 1., 1.])