In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from get_device import get_device

# Use CUDA if available
device = get_device()

In [4]:
from pathlib import Path

text = Path('../../data/tiny-shakespeare.txt').read_text()

In [5]:
print(text[0:1000])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hunger for bread, not in thirst for revenge.



In [6]:

class CharTokenizer:
  def __init__(self, vocabulary):
    self.token_id_for_char = {char: token_id for token_id, char in enumerate(vocabulary)}
    self.char_for_token_id = {token_id: char for token_id, char in enumerate(vocabulary)}

  @staticmethod
  def train_from_text(text):
    vocabulary = set(text)
    return CharTokenizer(sorted(list(vocabulary)))

  def encode(self, text):
    token_ids = []
    for char in text:
      token_ids.append(self.token_id_for_char[char])
    return torch.tensor(token_ids, dtype=torch.long)

  def decode(self, token_ids):
    chars = []
    for token_id in token_ids.tolist():
      chars.append(self.char_for_token_id[token_id])
    return ''.join(chars)


  def vocabulary_size(self):
    return len(self.token_id_for_char)

In [7]:
tokenizer = CharTokenizer.train_from_text(text)

In [8]:
print(tokenizer.encode("Hello world"))
print(tokenizer.decode(tokenizer.encode("Hello world")))

tensor([20, 43, 50, 50, 53,  1, 61, 53, 56, 50, 42])
Hello world


In [9]:
print(f"Vocabulary size: {tokenizer.vocabulary_size()}")

Vocabulary size: 65


In [10]:
from torch.utils.data import Dataset

class TokenIdsDataset(Dataset):
  def __init__(self, data, block_size):
    self.data = data
    self.block_size = block_size

  def __len__(self):
    return len(self.data) - self.block_size

  def __getitem__(self, pos):
    assert pos < len(self.data) - self.block_size

    x = self.data[pos:pos + self.block_size]
    y = self.data[pos + 1:pos + 1 + self.block_size]
    return x, y

In [11]:
config = {
  "vocabulary_size": tokenizer.vocabulary_size(),
  "context_size": 256,
  "embedding_dim": 768,
  "heads_num": 12,
  "layers_num": 10,
  "dropout_rate": 0.1,
  "use_bias": False,
}

config["head_size"] = config["embedding_dim"] // config["heads_num"]

In [12]:
class AttentionHead(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.Q_weights = nn.Linear(config["embedding_dim"], config["head_size"], config["use_bias"])
    self.K_weights = nn.Linear(config["embedding_dim"], config["head_size"], config["use_bias"])
    self.V_weights = nn.Linear(config["embedding_dim"], config["head_size"], config["use_bias"])

    self.dropout = nn.Dropout(config["dropout_rate"])

    casual_attention_mask = torch.tril(torch.ones(config["context_size"], config["context_size"]))
    self.register_buffer('casual_attention_mask', casual_attention_mask)

  def forward(self, input):
    batch_size, tokens_num, embedding_dim = input.shape
    Q = self.Q_weights(input)
    K = self.K_weights(input)
    V = self.V_weights(input)

    attention_scores = Q @ K.transpose(1, 2)
    attention_scores = attention_scores.masked_fill(
        self.casual_attention_mask[:tokens_num,:tokens_num] == 0,
        -torch.inf
    )
    attention_scores = attention_scores / ( K.shape[-1] ** 0.5 )
    attention_scores = torch.softmax(attention_scores, dim=-1)
    attention_scores = self.dropout(attention_scores)

    return attention_scores @ V

In [13]:
input = torch.rand(8, config["context_size"], config["embedding_dim"])

In [14]:
ah = AttentionHead(config)

In [15]:
output = ah(input)

In [16]:
output.shape

torch.Size([8, 256, 64])

In [17]:
class MultiHeadAttention(nn.Module):
  def __init__(self, config):
    super().__init__()

    heads_list = [AttentionHead(config) for _ in range(config["heads_num"])]
    self.heads = nn.ModuleList(heads_list)

    self.linear = nn.Linear(config["embedding_dim"], config["embedding_dim"])
    self.dropout = nn.Dropout(config["dropout_rate"])

  def forward(self, input):
    # print(f"Input shape: {input.shape}")
    heads_outputs = [head(input) for head in self.heads]

    scores_change = torch.cat(heads_outputs, dim=-1)
    # print(f"heads shape: {scores_change.shape}")

    scores_change = self.linear(scores_change)
    return self.dropout(scores_change)

In [18]:
mha = MultiHeadAttention(config)

In [19]:
input = torch.rand(8, config["context_size"], config["embedding_dim"])

In [20]:
output = mha(input)

In [21]:
output.shape

torch.Size([8, 256, 768])

In [22]:
class FeedForward(nn.Module):

  def __init__(self, config):
    super().__init__()

    self.linear_layers = nn.Sequential(
        nn.Linear(config["embedding_dim"], config["embedding_dim"] * 4),
        nn.GELU(),
        nn.Linear(config["embedding_dim"] * 4, config["embedding_dim"]),
        nn.Dropout(config["dropout_rate"])
    )

  def forward(self, input):
    return self.linear_layers(input)

In [23]:
ff = FeedForward(config)

In [24]:
input = torch.rand(8, config["context_size"], config["embedding_dim"])

In [25]:
ouptut = ff(input)

In [26]:
output.shape

torch.Size([8, 256, 768])

In [27]:
class Block(nn.Module):

  def __init__(self, config):
    super().__init__()

    self.multi_head = MultiHeadAttention(config)
    self.layer_norm_1 = nn.LayerNorm(config["embedding_dim"])

    self.feed_forward = FeedForward(config)
    self.layer_norm_2 = nn.LayerNorm(config["embedding_dim"])

  def forward(self, input):
    residual = input
    x = self.multi_head(self.layer_norm_1(input))
    x = x + residual

    residual = x
    x = self.feed_forward(self.layer_norm_2(x))
    return x + residual

In [28]:
b = Block(config)

In [29]:
ouptut = b(input)

In [30]:
output.shape

torch.Size([8, 256, 768])

In [31]:
class DemoGPT(nn.Module):
  def __init__(self, config):
    super().__init__()

    self.token_embedding_layer = nn.Embedding(config["vocabulary_size"], config["embedding_dim"])
    self.positional_embedding_layer = nn.Embedding(config["context_size"], config["embedding_dim"])

    blocks = [Block(config) for _ in range(config["layers_num"])]
    self.layers = nn.Sequential(*blocks)

    self.layer_norm = nn.LayerNorm(config["embedding_dim"])
    self.unembedding = nn.Linear(config["embedding_dim"], config["vocabulary_size"], bias=False)

  def forward(self, token_ids):
    # print("Forward")
    batch_size, tokens_num = token_ids.shape

    x = self.token_embedding_layer(token_ids)
    sequence = torch.arange(tokens_num, device=device)
    x = x + self.positional_embedding_layer(sequence)

    x = self.layers(x)
    x = self.layer_norm(x)
    x = self.unembedding(x)

    return x

In [32]:
model = DemoGPT(config).to(device)

In [33]:
output = model(tokenizer.encode("Hi").unsqueeze(dim=0).to(device))

In [34]:
output.shape

torch.Size([1, 2, 65])

In [38]:
# def generate(model, prompt_ids, max_tokens):
#     output_ids = prompt_ids
#     for _ in range(max_tokens):
#       if output_ids.shape[1] >= config["context_size"]:
#         break
#       with torch.no_grad():
#         logits = model(output_ids)

#       logits = logits[:, -1, :]
#       probs = F.softmax(logits, dim=-1)
#       # Sample a random token given the softmax distribution
#       next_token_id = torch.multinomial(probs, num_samples=1)
#       # Add new token to the output, and repeat the process
#       output_ids = torch.cat([output_ids, next_token_id], dim=-1)
#     return output_ids


import torch
import torch.nn.functional as F

def generate(model, prompt_ids, max_tokens_to_generate, config):
    """
    Generates a sequence of tokens auto-regressively from a given prompt.

    This function takes a trained model and a starting sequence of token IDs
    (the prompt) and generates new tokens one by one. In each step, it uses
    the model to predict the next token, samples from the resulting probability
    distribution, and appends the new token to the sequence, which then becomes
    the input for the next step.

    Args:
        model (nn.Module): The trained DemoGPT transformer model.
        prompt_ids (torch.Tensor): A tensor of shape (B, T) containing the
            initial token IDs to start generation from. B is the batch size
            (usually 1 for generation) and T is the length of the prompt.
        max_tokens_to_generate (int): The maximum number of new tokens to generate after the prompt.
        config (dict): The model's configuration dictionary, used to access
            the `context_size`.

    Returns:
        torch.Tensor: A tensor of shape (B, T + generated_tokens) containing
            the original prompt plus the newly generated tokens.
    """
    # Start with the initial prompt.
    output_ids = prompt_ids
    
    # Loop to generate tokens one by one.
    for _ in range(max_tokens_to_generate):
      # Stop if the context window is full.
      if output_ids.shape[1] >= config["context_size"]:
        break
        
      # Use torch.no_grad() to disable gradient calculations, as we are only
      # doing inference, which saves memory and computation.
      with torch.no_grad():
        # Get the model's predictions (logits) for the current sequence.
        logits = model(output_ids)

      # Focus only on the logits for the very last token in the sequence,
      # as that's the prediction for the *next* token.
      last_token_logits = logits[:, -1, :]
      
      # Apply softmax to convert the logits into a probability distribution.
      probs = F.softmax(last_token_logits, dim=-1)
      
      # Sample one token from the probability distribution.
      # `torch.multinomial` treats the input as a set of weights for sampling.
      next_token_id = torch.multinomial(probs, num_samples=1)
      
      # Append the newly sampled token ID to our sequence.
      output_ids = torch.cat([output_ids, next_token_id], dim=1)
      
    return output_ids

In [39]:
# def generate_with_prompt(model, tokenizer, prompt, max_tokens=100):
#   model.eval()

#   prompt = tokenizer.encode(prompt).unsqueeze(dim=0).to(device)

#   return tokenizer.decode(generate(model, prompt, max_tokens=max_tokens)[0])

# def generate_with_prompt(model, tokenizer, prompt, max_tokens=100):
#   model.eval()

#   prompt = tokenizer.encode(prompt).unsqueeze(dim=0).to(device)

#   return tokenizer.decode(generate(model, prompt, max_tokens=max_tokens)[0])


def generate_with_prompt(model, tokenizer, config, prompt, max_tokens_to_generate=100):
  """
  Generates text from a prompt using the specified model and tokenizer.

  This function sets the model to evaluation mode, encodes the prompt, calls
  the `generate` function to produce token IDs, and decodes them back into
  human-readable text.

  Args:
      model (nn.Module): The trained transformer model.
      tokenizer (CharTokenizer): The tokenizer for encoding/decoding text.
      config (dict): The model's configuration dictionary.
      prompt (str): The initial text to start generation from.
      max_tokens_to_generate (int): The maximum number of new tokens to create.

  Returns:
      str: The generated text, including the original prompt.
  """
  model.eval()

  prompt_ids = tokenizer.encode(prompt).unsqueeze(dim=0).to(device)

  # Call the generate function with the correct arguments
  generated_ids = generate(
      model,
      prompt_ids,
      max_tokens_to_generate=max_tokens_to_generate,
      config=config
  )

  return tokenizer.decode(generated_ids[0])

In [40]:
# generate_with_prompt(model, tokenizer, "First Citizen:\n")

generate_with_prompt(model, tokenizer, config, "First Citizen:\n")

"First Citizen:\nm'AG,LAYaG\nFluD&xAgV:hh&fXlDnhIlDcm;$q$vM?wIYStxvQGetvzXaO?zufGTlU.S\nqpXrFMoS3s3DccTdfcLY!kArHvbh?cJ"

In [55]:
batch_size = 64

# train_iterations = 5000
train_iterations = 20
evaluation_interval = 100
learning_rate=4e-4

In [56]:
train_data = tokenizer.encode(text).to(device)

In [57]:
train_dataset = TokenIdsDataset(train_data, config["context_size"])

In [58]:
from torch.utils.data import Dataset, DataLoader, RandomSampler

train_sampler = RandomSampler(train_dataset, num_samples=batch_size * train_iterations, replacement=True)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

In [59]:
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [None]:
for step_num, sample in enumerate(train_dataloader):

    model.train()
    input, targets = sample
    logits = model(input)

    logits_view = logits.view(batch_size * config["context_size"], config["vocabulary_size"])
    targets_view = targets.view(batch_size * config["context_size"])
    
    loss = F.cross_entropy(logits_view, targets_view)
    # Backward propagation
    loss.backward()
    # Update model parameters
    optimizer.step()
    # Set to None to reduce memory usage
    optimizer.zero_grad(set_to_none=True)

    print(f"Step {step_num}. Loss {loss.item():.3f}")

    if step_num % evaluation_interval == 0:
      print("Demo GPT:\n" + generate_with_prompt(model, tokenizer, config, "\n"))

Step 0. Loss 1.632
Demo GPT:

IsIUSTERIV:
AyUELIO:
I VINDUS:
SIO:
PoaisO, I'lbmory, Not Morksigor: God't, KING EDWARD:
Upothou IUQ
Step 1. Loss 2.850
Step 2. Loss 2.318
Step 3. Loss 2.061
Step 4. Loss 1.941
