<a href="https://colab.research.google.com/github/archyyu/GPT-from-MLP-to-RNN-to-Transformer/blob/main/GPT_by_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import requests
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import math

In [2]:
# Data I/O

url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
response = requests.get(url, timeout=30)
# Fail loudly on an HTTP error instead of silently training on an error page.
response.raise_for_status()
data = response.text

# Sort the vocabulary: iterating a bare set of strings yields an order that can
# change between kernel restarts, which would silently reshuffle char_to_ix and
# make results (and any saved weights) irreproducible.
chars = sorted(set(data))
data_size, vocab_size = len(data), len(chars)
print(f'data has {data_size} characters, {vocab_size} unique.')

# Bidirectional lookup tables between characters and integer token ids.
char_to_ix = {ch: i for i, ch in enumerate(chars)}
ix_to_char = {i: ch for i, ch in enumerate(chars)}

data has 1115394 characters, 65 unique.


In [25]:
# Hyperparameters
# Module-level settings read as globals by the model classes, the batch
# sampler, the optimizer setup and the sampling cell.
embedding_dim = 64  # passed to Decoder as embedding_size (token/position embedding width)
seq_length = 80  # context window: size of the causal mask, the position table and each training window
learning_rate = 1e-1  # learning rate handed to optim.Adagrad
batch_size = 20  # number of windows drawn per call to generate_mini_batch()
num_heads = 4  # attention heads per block, passed to Decoder
head_size = 32  # output width of each attention head, passed to Decoder
head_num = 4  # NOTE(review): not referenced by the other visible cells (num_heads is used instead)
layer_num = 4  # NOTE(review): not referenced by the other visible cells
dropout = 0.2  # probability used by the nn.Dropout layers in MultiHeadAttention and FeedFoward

In [35]:
class Head(nn.Module):
  """A single head of causal (masked) scaled dot-product self-attention.

  Args:
    embed_size: size of the last dimension of the input.
    head_size: size of the query/key/value projections and of the output.

  The lower-triangular mask is stored as the buffer ``tril`` and is built from
  the global ``seq_length``, so it covers at most ``seq_length`` positions.
  """

  def __init__(self, embed_size, head_size):
    super().__init__()
    self.C = embed_size
    self.head_size = head_size

    # Bias-free projections, created in the order q, k, v.
    self.q = nn.Linear(embed_size, head_size, bias=False)
    self.k = nn.Linear(embed_size, head_size, bias=False)
    self.v = nn.Linear(embed_size, head_size, bias=False)

    causal_mask = torch.tril(torch.ones(seq_length, seq_length))
    self.register_buffer('tril', causal_mask)

  def forward(self, x):
    """Attend over ``x`` of shape (B, T, C); returns a (B, T, head_size) tensor."""
    _, T, _ = x.shape
    queries, keys, values = self.q(x), self.k(x), self.v(x)

    # Scaled dot-product scores between every pair of positions.
    scale = self.head_size ** -0.5
    scores = queries @ keys.transpose(-2, -1) * scale

    # Zero entries of the mask lie above the diagonal (future positions);
    # -inf there turns into zero weight after the softmax.
    is_future = self.tril[:T, :T] == 0
    scores = scores.masked_fill(is_future, float('-inf'))
    weights = F.softmax(scores, dim=-1)

    return weights @ values


class MultiHeadAttention(nn.Module):
  """Runs several ``Head`` modules on the same input and merges their outputs.

  Args:
    num_heads: number of ``Head`` instances to create.
    embedding_size: input width, and output width of the final projection.
    head_size: output width of each individual head.

  The head outputs are concatenated on the last dimension, projected back to
  ``embedding_size`` and passed through dropout (probability taken from the
  global ``dropout``).
  """

  def __init__(self, num_heads, embedding_size, head_size):
    super().__init__()
    self.num_heads = num_heads

    heads = []
    for _ in range(num_heads):
      heads.append(Head(embedding_size, head_size))
    self.heads = nn.ModuleList(heads)

    self.final_linear = nn.Linear(num_heads * head_size, embedding_size)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Apply every head to ``x``, concatenate, project, then apply dropout."""
    per_head = [attend(x) for attend in self.heads]
    merged = torch.cat(per_head, dim=-1)
    return self.dropout(self.final_linear(merged))

class FeedFoward(nn.Module):
  """Position-wise MLP: Linear -> ReLU -> Linear -> Dropout.

  Args:
    embedding_size: input and output width; the hidden layer is 4x wider.

  The dropout probability is read from the global ``dropout``.
  """

  def __init__(self, embedding_size):
    super().__init__()
    hidden_size = 4 * embedding_size
    expand = nn.Linear(embedding_size, hidden_size)
    project = nn.Linear(hidden_size, embedding_size)
    self.net = nn.Sequential(expand, nn.ReLU(), project, nn.Dropout(dropout))

  def forward(self, x):
    """Apply the MLP to the last dimension of ``x``."""
    transformed = self.net(x)
    return transformed

class BlockAttention(nn.Module):
  """Pre-LayerNorm transformer block: self-attention followed by an MLP.

  Args:
    num_heads: number of attention heads in the block.
    embedding_size: width of the residual stream (input and output).
    head_size: output width of each attention head.

  Both sub-layers are wrapped in residual connections:

      h   = x + attention(norm1(x))
      out = h + feed_forward(norm2(h))
  """

  def __init__(self, num_heads, embedding_size, head_size):
    super(BlockAttention, self).__init__()
    self.multiheads = MultiHeadAttention(num_heads, embedding_size, head_size)
    self.fw = FeedFoward(embedding_size)
    self.norm1 = nn.LayerNorm(embedding_size)
    self.norm2 = nn.LayerNorm(embedding_size)

  def forward(self, x):
    """Return the block output; same shape as ``x``."""
    inter_result = x + self.multiheads(self.norm1(x))
    # The second residual must start from the attention result, not the raw
    # input: adding `x` here would drop the attention output from the residual
    # stream and let it reach later layers only through the feed-forward path.
    final_output = inter_result + self.fw(self.norm2(inter_result))
    return final_output


class Decoder(nn.Module):
  """Decoder-only character model: embeddings -> attention blocks -> logits.

  Args:
    num_heads: attention heads per block.
    vocab_size: number of distinct tokens (embedding rows and logit columns).
    embedding_size: width of the token/position embeddings.
    head_size: output width of each attention head.
    num_layers: number of stacked ``BlockAttention`` modules. Defaults to 4,
      which is the count that used to be hard-coded.

  The learned position table has ``seq_length`` rows (global), so inputs may
  contain at most ``seq_length`` tokens per sequence.
  """

  def __init__(self, num_heads, vocab_size, embedding_size, head_size, num_layers=4):
    super(Decoder, self).__init__()

    self.em = nn.Embedding(vocab_size, embedding_size)
    self.pos_encode = nn.Embedding(seq_length, embedding_size)
    self.blocks = nn.ModuleList([
        BlockAttention(num_heads, embedding_size, head_size) for _ in range(num_layers)
    ])
    self.f_norm = nn.LayerNorm(embedding_size)
    self.fw = nn.Linear(embedding_size, vocab_size, bias=False)

  def forward(self, x):
    """Map token ids ``x`` of shape (B, T) to logits of shape (B, T, vocab_size)."""
    _, T = x.shape
    x_em = self.em(x)
    # Build the position indices on the input's device so the model also works
    # after being moved off the CPU (a bare torch.arange(T) is always on CPU).
    p_em = self.pos_encode(torch.arange(T, device=x.device))
    x = x_em + p_em  # position embeddings broadcast over the batch dimension
    for block in self.blocks:
      x = block(x)
    x = self.f_norm(x)
    x = self.fw(x)
    return x

In [36]:
# NOTE(review): `criterion` is not used by the visible training loop, which
# calls F.cross_entropy directly.
criterion = nn.CrossEntropyLoss()
    # Decoder signature: Decoder(num_heads, vocab_size, embedding_size, head_size)
model = Decoder(num_heads, vocab_size, embedding_dim, head_size)
# Adagrad over all model parameters, using the learning rate from the hyperparameter cell.
optimizer = optim.Adagrad(model.parameters(), lr=learning_rate)

In [37]:
def generate_mini_batch():
  """Draw a random mini-batch of next-character training pairs.

  Reads the globals ``data``, ``char_to_ix``, ``seq_length`` and ``batch_size``.
  For each of the ``batch_size`` rows, a start offset is sampled with
  ``np.random.randint`` and ``seq_length`` characters are encoded as the
  input; the target is the same window shifted one character to the right.

  Returns:
    (inputs, targets): two ``torch.long`` tensors of shape
    (batch_size, seq_length).
  """
  window = seq_length

  input_rows = []
  target_rows = []
  for _ in range(batch_size):
    # Random start offset; the upper bound leaves room for the shifted target.
    start = np.random.randint(0, len(data) - window - 1)

    # Encode window + 1 characters once, then split into input and target.
    ids = [char_to_ix[ch] for ch in data[start:start + window + 1]]
    input_rows.append(torch.tensor(ids[:-1], dtype=torch.long))
    target_rows.append(torch.tensor(ids[1:], dtype=torch.long))

  # Stack the 1-D rows into (batch_size, seq_length) tensors.
  return torch.stack(input_rows, dim=0), torch.stack(target_rows, dim=0)

In [None]:
# Training loop
stopi = []  # global step index at each logged point
lossi = []  # loss value at each logged point
num_iterations = 5
# Each inner step trains on an independent random mini-batch; `p` is only a
# step counter and does not index into `data`.
steps_per_iteration = len(data) - seq_length

model.train()  # make sure dropout is active even if an earlier cell switched to eval mode
for iteration in range(num_iterations):

  for p in range(steps_per_iteration):

    inputs, targets = generate_mini_batch()
    optimizer.zero_grad()
    predict_char = model(inputs)  # (B, T, vocab_size) logits

    B, T = inputs.shape

    # Flatten batch and time so every position is one classification example.
    logits = predict_char.view(B*T, -1)
    targets = targets.view(B*T)

    loss = F.cross_entropy(logits, targets)

    loss.backward()

    # Clamp every gradient element to [-5, 5] before the update.
    nn.utils.clip_grad_value_(model.parameters(), clip_value=5)

    optimizer.step()

    if p % 2000 == 0:
      # True global step. The previous `(iteration + 1) * p` restarted at 0 on
      # every outer iteration and was not a monotonically increasing counter.
      step = iteration * steps_per_iteration + p
      print(f'Iteration {step}, Loss: {loss.item()}')
      stopi.append(step)
      lossi.append(loss.item())

In [33]:
start = "First Citizen"

# Sample 1000 characters autoregressively, one character per step.
# Dropout must be off while sampling, so switch to eval mode and restore the
# previous mode afterwards; no_grad avoids building an autograd graph.
was_training = model.training
model.eval()
try:
  with torch.no_grad():
    for i in range(1000):
      # Condition on at most the last seq_length characters generated so far.
      context = start[-seq_length:]
      ll = torch.tensor([char_to_ix[ch] for ch in context], dtype=torch.long).view(1, -1)
      # Index the batch and time dimensions explicitly: squeeze() would also
      # drop the time dimension for a one-character context.
      outputs = model(ll)[0, -1, :]
      probs = F.softmax(outputs, dim=-1)
      # Sample in torch; np.random.choice can reject float32 probabilities
      # whose sum is not within its tolerance of 1.
      ix = torch.multinomial(probs, num_samples=1).item()
      start += ix_to_char[ix]
finally:
  model.train(was_training)

print(start)


First Citizen: by very so win to Jerived.
Cominist make thou, forth it Rings:
Alive and conce; telike you bame of and moth.

HENROLINGBUBE:
Ay, Go, see, and my land to in therir serve.

JULIET:
Tell the my brothy flooze prantle; what that see hourse
Hold the stan deen I am no and soe may me the me.

Secock:
Edw thin she agaid it peacant be cand wovy.
tress, Beickeifes I'll reath theem but I twas are.

ISTRALANUS:
Thou of see nevoing heart
Thy tights your congrest on he and trust;
To mand the I fitisped is my conter,
Prepsoring vientit of server their take
The graces-dies by tile all sondeet must
EvOr somine bithers nobhy emy reventurous,
I scandl hourst see, if this So fone.
And I, do but on that fath own the he will.

KING RICHARD III:

Your Thus no as for pervoishe,
What rett to his shis fa you quencion
Thaver, sich on you risuman'd a 'stimage.

CAMILO:
Well, terviged:
Ay, I no pelsets, the good are air schil.

RICHAS:
Ay, make for our find slord he,
Let brikest of him.

Werst Yor th

After some modifications, this version of the Decoder works better.

However, it still has some issues.

Anyway, let's stop here.