<a href="https://colab.research.google.com/github/Amirhk-dev/MNIST-Lab/blob/main/experiments/notebooks/transformers/GPT_Dev_Text_Generate.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook experiments with a GPT (Generative Pretrained Transformer) model.

The architecture is based on the paper "[Attention is all you need](https://proceedings.neurips.cc/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf)".

The code was written by Andrej Karpathy for the "tiny shakespeare" dataset.

Here, the same code is also applied to the "[Divan-e-Hafez](https://en.wikipedia.org/wiki/Hafez#Divan-e-Hafez)" dataset, which contains the poems of the well-known Iranian poet "[Hafez](https://en.wikipedia.org/wiki/Hafez)". The model is trained only on the first 200 ["Ghazal"](https://en.wikipedia.org/wiki/Ghazal)s.

*   Note: The model works only with **characters** and the relations between them in the text; it has no understanding of **words** or the relations between words.
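
To make the note above concrete, here is a minimal sketch of character-level tokenization. The sample string and the names `sample`, `sample_chars`, `sample_stoi` and `sample_itos` are illustrative assumptions and are not part of the notebook's data.

```python
# Illustrative only: a made-up sample string, not the notebook's dataset.
sample = "to be or not to be"

# The vocabulary is the set of distinct characters, not the set of words.
sample_chars = sorted(set(sample))
sample_stoi = {ch: i for i, ch in enumerate(sample_chars)}
sample_itos = {i: ch for ch, i in sample_stoi.items()}

# Encoding yields one integer per character; decoding reverses it.
sample_ids = [sample_stoi[ch] for ch in sample]
restored = "".join(sample_itos[i] for i in sample_ids)

print(sample_chars)
print(len(sample_ids), "tokens for", len(sample.split()), "words")
print(restored == sample)
```

The repeated words "to" and "be" get no identifier of their own: the model only ever sees the individual letters and spaces.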

[Here](https://www.youtube.com/watch?v=kCc8FmEb1nY) is the YouTube video in which Andrej Karpathy explains the code.


In [None]:
import os
import torch
import torch.nn as nn
from torch.nn import functional as F

import urllib.request
from urllib.error import HTTPError

Select which dataset to work on:

1. Shakespeare
2. Hafez

In [None]:
dataset = 'Shakespeare'

In [None]:
# hyperparameters
if dataset == 'Shakespeare':
  max_iters = 5000
  learning_rate = 3e-4
  dropout = 0.2
elif dataset == 'Hafez':
  max_iters = 6000
  learning_rate = 8e-5
  dropout = 0.25

batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
eval_interval = 500
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
n_embd = 384
n_head = 6
n_layer = 6

CHECKPOINT_PATH = "./"

In [None]:
torch.manual_seed(42) # 42 = 101010

if dataset == 'Shakespeare':
  # download the tiny shakespeare dataset
  !wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
elif dataset == 'Hafez':
  # download the 200 first Ghazals of Divan-e-Hafez
  !wget https://raw.githubusercontent.com/Amirhk-dev/MNIST-Lab/main/datasets/hafez.txt

--2023-02-23 16:20:33--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt.1’


2023-02-23 16:20:33 (32.3 MB/s) - ‘input.txt.1’ saved [1115394/1115394]



In [None]:
# read the text to inspect it
if dataset == 'Shakespeare':
  with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()
elif dataset == 'Hafez':
  with open('hafez.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [None]:
# here are all the unique characters that occur in this text
chars = sorted(list(set(text)))
vocab_size = len(chars)
print("different characters available on the text: ", ''.join(chars))
print("number of different characters: ", vocab_size)

# create a mapping from characters to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encode: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decode: take a list of integers, output a string

different characters available on the text:  
 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
number of different characters:  65


In [None]:
# train and test splits
data = torch.tensor(encode(text), dtype=torch.long)

# Let's now split up the data into train and validation sets
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

In [None]:
# data loading
def get_batch(split):
  # generate a small batch of inputs x and targets y
  data = train_data if split == 'train' else val_data
  ix = torch.randint(len(data) - block_size, (batch_size,))
  x = torch.stack([data[i:i+block_size] for i in ix])
  y = torch.stack([data[i+1:i+block_size+1] for i in ix])
  x, y = x.to(device), y.to(device)
  return x, y
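
The relation between `x` and `y` in `get_batch` can be seen on a toy sequence. This is a minimal sketch in plain Python lists; `toy_data`, `toy_block_size` and the fixed `start` offset are assumptions made for illustration (the notebook draws the offsets at random and works on tensors).

```python
# Illustrative only: a tiny encoded sequence and a short context length.
toy_data = [10, 11, 12, 13, 14, 15, 16, 17]
toy_block_size = 4
start = 2  # in get_batch this offset is drawn at random

# The targets are the inputs shifted by one position.
toy_x = toy_data[start : start + toy_block_size]
toy_y = toy_data[start + 1 : start + toy_block_size + 1]

# Every prefix of toy_x is paired with the token that follows it.
for t in range(toy_block_size):
  print(f"context {toy_x[: t + 1]} -> target {toy_y[t]}")
```

A single window of length `block_size` therefore provides `block_size` training examples, one for each context length.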

In [None]:
@torch.no_grad()
def estimate_loss():
  out = {}
  model.eval()
  for split in ['train', 'val']:
    losses = torch.zeros(eval_iters)
    for k in range(eval_iters):
      X, Y = get_batch(split)
      logits, loss = model(X, Y)
      losses[k] = loss.item()
    out[split] = losses.mean()
  model.train()
  return out

In [None]:
class Head(nn.Module):
  """ one head of self-attention """

  def __init__(self, head_size):
    super().__init__()
    self.key = nn.Linear(n_embd, head_size, bias=False)
    self.query = nn.Linear(n_embd, head_size, bias=False)
    self.value = nn.Linear(n_embd, head_size, bias=False)
    self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    B, T, C = x.shape
    k = self.key(x) # (B, T, head_size)
    q = self.query(x) # (B, T, head_size)
    # compute attention scores ("affinities")
    # note: the scores are scaled by C = n_embd here, not by head_size as in the paper
    wei = q @ k.transpose(-2, -1) * C**-0.5 # (B, T, hs) @ (B, hs, T) --> (B, T, T)
    wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
    wei = F.softmax(wei, dim=-1) # (B, T, T)
    wei = self.dropout(wei)
    # perform the weighted aggregation of the values
    v = self.value(x) # (B, T, head_size)
    out = wei @ v # (B, T, T) @ (B, T, hs) --> (B, T, hs)
    return out
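
The effect of the lower-triangular mask in `Head` is easiest to see on a small example. The sketch below uses equal raw scores and a toy length `toy_T = 4`; both are assumptions chosen for illustration.

```python
import torch
from torch.nn import functional as F

toy_T = 4  # toy sequence length, assumed for illustration
toy_scores = torch.zeros(toy_T, toy_T)  # equal raw affinities
toy_tril = torch.tril(torch.ones(toy_T, toy_T))

# Positions above the diagonal (the "future") are set to -inf,
# so softmax assigns them a weight of exactly zero.
toy_masked = toy_scores.masked_fill(toy_tril == 0, float('-inf'))
toy_weights = F.softmax(toy_masked, dim=-1)
print(toy_weights)
```

Each row sums to one, and row `t` spreads its weight only over positions `0..t`, so a character never attends to characters that come after it.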

In [None]:
class MultiHeadAttention(nn.Module):
  """ multiple heads of self-attention in parallel """

  def __init__(self, num_heads, head_size):
    super().__init__()
    self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
    self.proj = nn.Linear(n_embd, n_embd)
    self.dropout = nn.Dropout(dropout) 

  def forward(self, x):
    out = torch.cat([h(x) for h in self.heads], dim=-1)
    out = self.dropout(self.proj(out))
    return out

In [None]:
class FeedForward(nn.Module):
  """ two linear layers with a ReLU in between, followed by dropout """

  def __init__(self, n_embd):
    super().__init__()
    self.net = nn.Sequential(
        nn.Linear(n_embd, 4 * n_embd),
        nn.ReLU(),
        nn.Linear(4 * n_embd, n_embd),
        nn.Dropout(dropout),
    )

  def forward(self, x):
    return self.net(x)

In [None]:
class Block(nn.Module):
  """ Transformer block: communication followed by computation """

  def __init__(self, n_embd, n_head):
    # n_embd: embedding dimension, n_head: the number of heads we'd like
    super().__init__()
    head_size = n_embd // n_head
    self.sa = MultiHeadAttention(n_head, head_size)
    self.ffwd = FeedForward(n_embd)
    self.ln1 = nn.LayerNorm(n_embd)
    self.ln2 = nn.LayerNorm(n_embd)

  def forward(self, x):
    x = x + self.sa(self.ln1(x))
    x = x + self.ffwd(self.ln2(x))
    return x

In [None]:
# decoder-only Transformer language model (despite the class name, not a plain bigram model)
class BigramLanguageModel(nn.Module):

  def __init__(self):
    super().__init__()
    # token and position embeddings, a stack of Transformer blocks, and a linear head that outputs the logits
    self.token_embedding_table = nn.Embedding(vocab_size, n_embd) # n_embd: number of embedding dimensions
    self.position_embedding_table = nn.Embedding(block_size, n_embd)
    self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
    self.ln_f = nn.LayerNorm(n_embd) # final layer norm (note: not applied in forward below)
    self.lm_head = nn.Linear(n_embd, vocab_size)

  def forward(self, idx, targets=None):

    B, T = idx.shape

    # idx and targets are both (B, T) tensor of integers
    tok_emb = self.token_embedding_table(idx) # (B, T, C)
    pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T, C)
    x = tok_emb + pos_emb # (B, T, C)
    x = self.blocks(x) # (B, T, C)
    logits = self.lm_head(x) # (B, T, vocab_size)

    if targets is None:
      loss = None
    else:
      B, T, C = logits.shape
      logits = logits.view(B*T, C)
      targets = targets.view(B*T)
      loss = F.cross_entropy(logits, targets)

    return logits, loss

  def generate(self, idx, max_new_tokens):
    # idx is (B, T) array of indices in the current context
    for _ in range(max_new_tokens):
      # crop idx to the last block_size tokens
      idx_cond = idx[:, -block_size:]
      # get the predictions
      logits, loss = self(idx_cond)
      # focus only on the last time step
      logits = logits[:, -1, :] # becomes (B, C)
      # apply softmax to get probabilities
      probs = F.softmax(logits, dim=-1) # (B, C)
      # sample from the distribution
      idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
      # append sampled index to the running sequence
      idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
    return idx
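
A single sampling step of `generate` can be tried in isolation. This is a minimal sketch with made-up logits for a vocabulary of three tokens; the names `toy_logits`, `toy_idx` and the local generator `g` are assumptions, and the local generator is used so that the notebook's global seed is left untouched.

```python
import torch
from torch.nn import functional as F

g = torch.Generator().manual_seed(0)  # local generator, illustration only

toy_logits = torch.tensor([[2.0, 0.5, -1.0]])  # (B=1, vocab=3), made-up values
toy_probs = F.softmax(toy_logits, dim=-1)  # (1, 3), sums to 1
toy_next = torch.multinomial(toy_probs, num_samples=1, generator=g)  # (1, 1)

# Append the sampled index to the running sequence, as generate does.
toy_idx = torch.zeros((1, 1), dtype=torch.long)
toy_idx = torch.cat((toy_idx, toy_next), dim=1)  # (1, 2)
print(toy_probs, toy_idx)
```

Because the next token is sampled rather than taken as the argmax, repeated calls can produce different continuations from the same context.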

In [None]:
model = BigramLanguageModel()
m = model.to(device)

In [None]:
# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [None]:
base_url = "https://raw.githubusercontent.com/Amirhk-dev/MNIST-Lab/main/experiments/notebooks/transformers/"
pretrained_files = ["GPT-Dev-trained-on-hafez.pt", "GPT-Dev-trained-on-shakespeare.pt"]

if dataset == "Shakespeare":
  file_name = pretrained_files[1]
elif dataset == "Hafez":
  file_name = pretrained_files[0]

file_path = os.path.join(CHECKPOINT_PATH, file_name)
file_url = base_url + file_name
print(f"Downloading {file_url}...")

try:
  urllib.request.urlretrieve(file_url, file_path)
except HTTPError as e:
  print(f"Something went wrong while downloading: {e}")

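if os.path.isfile(file_path):
  print("Found pretrained model, loading...")
  # the checkpoint stores the whole pickled model, so weights_only must be False
  # on PyTorch versions where it defaults to True; load onto the active device
  # so that the model and the generation context end up on the same device
  m = torch.load(file_path, map_location=device, weights_only=False)
  m.eval()
else:
  print("Pretrained model not found; training from scratch (a GPU is strongly recommended)...")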
  
  for step in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if step % eval_interval == 0:
      losses = estimate_loss()
      print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

Downloading https://raw.githubusercontent.com/Amirhk-dev/MNIST-Lab/main/experiments/notebooks/transformers/GPT-Dev-trained-on-shakespeare.pt...
Found pretrained model, loading...


In [None]:
print("Number of parameters of the model:\n")
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

Number of parameters of the model:

10.788929 M parameters
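
The figure above can be reproduced by hand from the hyperparameters. The sketch below assumes the architecture exactly as defined in the classes above and the Shakespeare vocabulary of 65 characters; the helper `count_parameters` is a hypothetical name introduced only for this check.

```python
def count_parameters(vocab_size, n_embd, block_size, n_head, n_layer):
  """Count trainable parameters of the model defined in this notebook."""
  head_size = n_embd // n_head
  embeddings = vocab_size * n_embd + block_size * n_embd  # token + position tables
  attention = n_head * 3 * n_embd * head_size  # key, query, value (no bias)
  projection = n_embd * n_embd + n_embd  # output projection with bias
  feed_forward = (n_embd * 4 * n_embd + 4 * n_embd) + (4 * n_embd * n_embd + n_embd)
  layer_norms = 2 * (2 * n_embd)  # ln1 and ln2, weight + bias each
  per_block = attention + projection + feed_forward + layer_norms
  final_norm = 2 * n_embd  # ln_f
  lm_head = n_embd * vocab_size + vocab_size
  return embeddings + n_layer * per_block + final_norm + lm_head

total = count_parameters(vocab_size=65, n_embd=384, block_size=256, n_head=6, n_layer=6)
print(total / 1e6, 'M parameters')
```

Almost all parameters sit in the six Transformer blocks; the embeddings and the output head contribute well under 2% of the total. The `tril` mask is registered as a buffer and is therefore not counted.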


Let's ask the model to generate some text based on what it saw in the training data.

Here is the text generated by the model:

In [None]:
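m.eval() # make sure dropout is disabled, also after training from scratch
context = torch.zeros((1,1), dtype=torch.long, device=device) # start from a single token with index 0
print(decode(m.generate(context, max_new_tokens=200)[0].tolist()))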


Believe to make the bish charge his knees of strong:
I am mouf vantages to see you to come.

NORFOLK:
To say you, come.

YORK:
There is nothing?

NORTHUMBERLAND:
Nor by Harry Perculish,
For who sends 
