In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Run on the GPU when CUDA is usable, otherwise stay on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [3]:
from pathlib import Path

# Read the whole training corpus into memory as one UTF-8 decoded string.
# The path is relative, so the file must sit in the notebook's working directory.
text = Path('tiny-shakespeare.txt').read_text(encoding='utf-8')

In [4]:
# Peek at the first 1000 characters of the corpus
print(text[:1000])

    THE SONNETS
    ALL’S WELL THAT ENDS WELL
    THE TRAGEDY OF ANTONY AND CLEOPATRA
    AS YOU LIKE IT
    THE COMEDY OF ERRORS
    THE TRAGEDY OF CORIOLANUS
    CYMBELINE
    THE TRAGEDY OF HAMLET, PRINCE OF DENMARK
    THE FIRST PART OF KING HENRY THE FOURTH
    THE SECOND PART OF KING HENRY THE FOURTH
    THE LIFE OF KING HENRY THE FIFTH
    THE FIRST PART OF HENRY THE SIXTH
    THE SECOND PART OF KING HENRY THE SIXTH
    THE THIRD PART OF KING HENRY THE SIXTH
    KING HENRY THE EIGHTH
    THE LIFE AND DEATH OF KING JOHN
    THE TRAGEDY OF JULIUS CAESAR
    THE TRAGEDY OF KING LEAR
    LOVE’S LABOUR’S LOST
    THE TRAGEDY OF MACBETH
    MEASURE FOR MEASURE
    THE MERCHANT OF VENICE
    THE MERRY WIVES OF WINDSOR
    A MIDSUMMER NIGHT’S DREAM
    MUCH ADO ABOUT NOTHING
    THE TRAGEDY OF OTHELLO, THE MOOR OF VENICE
    PERICLES, PRINCE OF TYRE
    KING RICHARD THE SECOND
    KING RICHARD THE THIRD
    THE TRAGEDY OF ROMEO AND JULIET
    THE TAMING OF THE SHREW
    THE TEMPEST
    

In [5]:

class CharTokenizer:
  """Character-level tokenizer: every distinct character maps to one integer id."""

  def __init__(self, vocabulary):
    """Build both lookup tables; a character's id is its position in `vocabulary`."""
    # char -> id, used by encode()
    self.token_id_for_char = dict(
        (char, token_id) for token_id, char in enumerate(vocabulary)
    )
    # id -> char, used by decode()
    self.char_for_token_id = dict(enumerate(vocabulary))

  @staticmethod
  def train_from_text(text):
    """Create a tokenizer whose vocabulary is the sorted set of characters in `text`."""
    # Sorting makes the id assignment deterministic for a given text.
    unique_chars = sorted(set(text))
    return CharTokenizer(unique_chars)

  def encode(self, text):
    """Convert `text` into a 1-D `torch.long` tensor of token ids.

    Raises:
      KeyError: If `text` contains a character that is not in the vocabulary.
    """
    lookup = self.token_id_for_char
    return torch.tensor([lookup[char] for char in text], dtype=torch.long)

  def decode(self, token_ids):
    """Convert a tensor of token ids back into the string it encodes.

    Raises:
      KeyError: If an id is not in the vocabulary.
    """
    lookup = self.char_for_token_id
    return ''.join(lookup[token_id] for token_id in token_ids.tolist())

  def vocabulary_size(self):
    """Return the number of distinct characters this tokenizer knows."""
    return len(self.token_id_for_char)

In [6]:
# Build the character vocabulary from the full corpus loaded into `text`
tokenizer = CharTokenizer.train_from_text(text)

In [7]:
# Round-trip sanity check: encode a sample string, then decode the ids back to text
print(tokenizer.encode("Hello world"))
print(tokenizer.decode(tokenizer.encode("Hello world")))

tensor([31, 57, 64, 64, 67,  2, 75, 67, 70, 64, 56])
Hello world


In [8]:
# Number of distinct characters the tokenizer found in the corpus
print(f"Vocabulary size: {tokenizer.vocabulary_size()}")

Vocabulary size: 98


In [9]:
from torch.utils.data import Dataset

class TokenIdsDataset(Dataset):
  """Sliding-window dataset over a sequence of token ids for next-token prediction.

  Item `pos` is a pair `(x, y)` of `block_size`-long slices of `data`, where
  `y` is `x` shifted one position to the right.
  """

  def __init__(self, data, block_size):
    """
    Args:
      data: Sliceable sequence of token ids that supports `len()`.
      block_size: Number of tokens in each input/target window.
    """
    self.data = data
    self.block_size = block_size

  def __len__(self):
    """Return the number of valid window start positions (0 if `data` is too short)."""
    # Clamp at zero: a negative value would make the built-in `len()` raise ValueError.
    return max(0, len(self.data) - self.block_size)

  def __getitem__(self, pos):
    """Return the `(x, y)` window pair starting at `pos`.

    Raises:
      IndexError: If `pos` is outside `[0, len(self))`.
    """
    # IndexError (rather than `assert`) is still enforced under `python -O` and
    # lets plain iteration (`for x, y in dataset`) stop cleanly at the end.
    # Negative positions are rejected because they would yield mis-sized slices.
    if not 0 <= pos < len(self):
      raise IndexError(
          f"position {pos} is out of range for a dataset of length {len(self)}"
      )

    x = self.data[pos:pos + self.block_size]
    # Targets are the inputs shifted by one token.
    y = self.data[pos + 1:pos + 1 + self.block_size]
    return x, y

In [10]:
# Model hyperparameters; the attention modules defined below read their sizes from this dict.
config = {
  "vocabulary_size": tokenizer.vocabulary_size(), # The number of unique token IDs supported by the tokenizer
  "context_size": 256,  # The maximum number of tokens the model can see at once
  "embedding_dim": 768, # The size of the embedding vectors
  "heads_num": 12,      # The number of attention heads, each processing input independently
  "layers_num": 10,     # The number of Transformer blocks in the model
  "dropout_rate": 0.1,  # The proportion of outputs set to zero in Dropout layers
  "use_bias": False,    # A boolean indicating whether the linear transformations should include bias terms
}

# Per-head width: embedding_dim floor-divided by heads_num (768 // 12 = 64 with the values above).
# Floor division silently truncates if embedding_dim is not a multiple of heads_num.
config["head_size"] = config["embedding_dim"] // config["heads_num"]

In [11]:
class AttentionHead(nn.Module):
  """One head of causal (masked) scaled dot-product self-attention."""

  def __init__(self, config):
    """Create the Q/K/V projections, the dropout layer and the causal mask.

    Reads `embedding_dim`, `head_size`, `use_bias`, `dropout_rate` and
    `context_size` from `config`.
    """
    super().__init__()
    in_features = config["embedding_dim"]
    out_features = config["head_size"]
    with_bias = config["use_bias"]

    # Projections are created in Q, K, V order.
    self.Q_weights = nn.Linear(in_features, out_features, with_bias)
    self.K_weights = nn.Linear(in_features, out_features, with_bias)
    self.V_weights = nn.Linear(in_features, out_features, with_bias)

    self.dropout = nn.Dropout(config["dropout_rate"])

    # Lower-triangular matrix of ones: position i may attend to positions 0..i only.
    # Registered as a buffer under its existing name ('casual' is the historical
    # spelling of 'causal'; it is kept because the attribute name is referenced in forward).
    max_tokens = config["context_size"]
    self.register_buffer(
        'casual_attention_mask',
        torch.tril(torch.ones(max_tokens, max_tokens)),
    )

  def forward(self, input): # (B, C, embedding_dim)
    """Apply masked self-attention to `input` and return a (B, C, head_size) tensor."""
    # Only the token count is needed; unpacking also insists on a 3-D input.
    _, tokens_num, _ = input.shape

    queries = self.Q_weights(input) # (B, C, head_size)
    keys = self.K_weights(input)    # (B, C, head_size)
    values = self.V_weights(input)  # (B, C, head_size)

    # Similarity of every query with every key
    scores = torch.matmul(queries, keys.transpose(1, 2))  # (B, C, C)

    # Future positions get -inf so softmax gives them zero weight
    is_future = self.casual_attention_mask[:tokens_num, :tokens_num] == 0
    scores = scores.masked_fill(is_future, -torch.inf)

    # Scale by sqrt(head_size) before normalising
    scale = keys.shape[-1] ** 0.5
    weights = torch.softmax(scores / scale, dim=-1)
    weights = self.dropout(weights)

    return torch.matmul(weights, values) # (B, C, head_size)

In [20]:
# 8 is the mini-batch size
# Random tensor of shape (8, context_size, embedding_dim), passed to the attention modules below.
# NOTE(review): the name `input` shadows Python's built-in `input()` for the rest of the notebook.
input = torch.rand(8, config["context_size"], config["embedding_dim"])

In [13]:
# Instantiate a single attention head from the shared config
ah = AttentionHead(config)

In [14]:
# Forward pass of the random batch through the single head
output = ah(input)

In [15]:
# Last dimension should equal head_size: one head projects embedding_dim down to head_size
output.shape

torch.Size([8, 256, 64])

In [16]:
class MultiHeadAttention(nn.Module):
  """Runs `heads_num` attention heads side by side and mixes their outputs.

  Each head sees the same input; the per-head results are concatenated along
  the last dimension, passed through one linear projection back to
  `embedding_dim`, and then through dropout.
  """

  def __init__(self, config):
    """Create the heads, the output projection and the dropout layer.

    Reads `heads_num`, `head_size`, `embedding_dim`, `use_bias` and
    `dropout_rate` from `config` (plus whatever `AttentionHead` reads).
    """
    super().__init__()

    self.heads = nn.ModuleList(
        [AttentionHead(config) for _ in range(config["heads_num"])]
    )

    # Width of the concatenated head outputs. This equals embedding_dim whenever
    # embedding_dim is a multiple of heads_num, and stays correct when head_size
    # was rounded down by floor division.
    concat_dim = config["heads_num"] * config["head_size"]
    # Honour `use_bias` here too, so every linear layer follows the same setting.
    self.linear = nn.Linear(concat_dim, config["embedding_dim"], bias=config["use_bias"])
    self.dropout = nn.Dropout(config["dropout_rate"])

  def forward(self, input):
    """Return the attended representation, same shape as `input` (B, C, embedding_dim)."""
    heads_outputs = [head(input) for head in self.heads]

    # (B, C, heads_num * head_size)
    scores_change = torch.cat(heads_outputs, dim=-1)

    # Mix information across heads and project back to embedding_dim
    scores_change = self.linear(scores_change)
    return self.dropout(scores_change)

In [17]:
# Instantiate the multi-head attention layer from the shared config
mha = MultiHeadAttention(config)

In [18]:
# Forward pass of the same random batch through all heads plus the output projection
output = mha(input)

In [19]:
# Multi-head attention should return the same shape as its input
output.shape

torch.Size([8, 256, 768])