## Imports

In [1]:
from IPython.display import display, Markdown
import matplotlib.pyplot as plt
import math
from tqdm import tqdm

# pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, TensorDataset

## Device

In [2]:
device = (
    "cuda:0" if torch.cuda.is_available() else # Nvidia GPU
    "mps" if torch.backends.mps.is_available() else # Apple Silicon GPU
    "cpu"
)
print(f"Device = {device}")

Device = cuda:0


## Hyperparameters

In [3]:
# seed
seed = 42
torch.manual_seed(seed)

# data type
data_type = torch.int64

# Tokenizer Arguments
seq_length = 64
vocab_size = 65 # 26 lowercase + 26 uppercase + etc
d_embed = 64

# Model Arguments
max_length = 1000 # maximum number of characters to generate

# Validation Split
validation_size = 0.2

# Training Arguments
learning_rate = 2e-5
num_epochs = 10
batch_size = 64

# Transformer Arguments
d_model = d_embed
n_head = 8
n_layers = 6
d_ff = d_model * 4

## Dataset

In [4]:
# dataset path
dataset_path = 'data/'

In [5]:
# shakespeare dataset
shakespeare_dataset = dataset_path + 'shakespeare.txt'

In [6]:
# read the dataset
with open(shakespeare_dataset, 'r', encoding='utf-8') as f:
    shakespeare_text = f.read()

In [7]:
# display the first 1000 characters
display(Markdown(shakespeare_text[:1000]))

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hunger for bread, not in thirst for revenge.



In [8]:
# display the length of the text
display(Markdown(f'Total number of characters in the text: {len(shakespeare_text)}'))

Total number of characters in the text: 1115394

In [9]:
# display the unique characters in the text
chars = sorted(list(set(shakespeare_text)))
vocab_size = len(chars)
display(Markdown(f'Unique characters: {chars}'))
display(Markdown(f'Total number of unique characters: {vocab_size}'))

Unique characters: ['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']

Total number of unique characters: 65

## Tokenization (Character Level)

In [10]:
# create a mapping from characters to integers
char_to_int = {c: i for i, c in enumerate(chars)}
# create a mapping from integers to characters
int_to_char = {i: c for i, c in enumerate(chars)}

In [11]:
# display the mappings
display(Markdown(f'Character to integer mapping: {char_to_int}'))
display(Markdown(f'Integer to character mapping: {int_to_char}'))

Character to integer mapping: {'\n': 0, ' ': 1, '!': 2, '$': 3, '&': 4, "'": 5, ',': 6, '-': 7, '.': 8, '3': 9, ':': 10, ';': 11, '?': 12, 'A': 13, 'B': 14, 'C': 15, 'D': 16, 'E': 17, 'F': 18, 'G': 19, 'H': 20, 'I': 21, 'J': 22, 'K': 23, 'L': 24, 'M': 25, 'N': 26, 'O': 27, 'P': 28, 'Q': 29, 'R': 30, 'S': 31, 'T': 32, 'U': 33, 'V': 34, 'W': 35, 'X': 36, 'Y': 37, 'Z': 38, 'a': 39, 'b': 40, 'c': 41, 'd': 42, 'e': 43, 'f': 44, 'g': 45, 'h': 46, 'i': 47, 'j': 48, 'k': 49, 'l': 50, 'm': 51, 'n': 52, 'o': 53, 'p': 54, 'q': 55, 'r': 56, 's': 57, 't': 58, 'u': 59, 'v': 60, 'w': 61, 'x': 62, 'y': 63, 'z': 64}

Integer to character mapping: {0: '\n', 1: ' ', 2: '!', 3: '$', 4: '&', 5: "'", 6: ',', 7: '-', 8: '.', 9: '3', 10: ':', 11: ';', 12: '?', 13: 'A', 14: 'B', 15: 'C', 16: 'D', 17: 'E', 18: 'F', 19: 'G', 20: 'H', 21: 'I', 22: 'J', 23: 'K', 24: 'L', 25: 'M', 26: 'N', 27: 'O', 28: 'P', 29: 'Q', 30: 'R', 31: 'S', 32: 'T', 33: 'U', 34: 'V', 35: 'W', 36: 'X', 37: 'Y', 38: 'Z', 39: 'a', 40: 'b', 41: 'c', 42: 'd', 43: 'e', 44: 'f', 45: 'g', 46: 'h', 47: 'i', 48: 'j', 49: 'k', 50: 'l', 51: 'm', 52: 'n', 53: 'o', 54: 'p', 55: 'q', 56: 'r', 57: 's', 58: 't', 59: 'u', 60: 'v', 61: 'w', 62: 'x', 63: 'y', 64: 'z'}

In [12]:
# sample tokenization
sample_text = 'Hello, World!'
sample_text_int = [char_to_int[c] for c in sample_text]
display(Markdown(f'Text: {sample_text}'))
display(Markdown(f'Tokenized text: {sample_text_int}'))
display(Markdown(f'Detokenized text: {"".join([int_to_char[i] for i in sample_text_int])}'))

Text: Hello, World!

Tokenized text: [20, 43, 50, 50, 53, 6, 1, 35, 53, 56, 50, 42, 2]

Detokenized text: Hello, World!

In [13]:
# create a function to tokenize the text
def tokenize(text):
    return [char_to_int[c] for c in text]
# create a function to detokenize the text
def detokenize(tokens):
    return "".join([int_to_char[i] for i in tokens])

In [14]:
# tokenize the text
shakespeare_tokens = torch.tensor(tokenize(shakespeare_text), dtype=data_type)

In [15]:
# display the first 100 tokens
display(Markdown(f'Tokens: {shakespeare_tokens[:100]}'))

Tokens: tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43, 44,
        53, 56, 43,  1, 61, 43,  1, 54, 56, 53, 41, 43, 43, 42,  1, 39, 52, 63,
         1, 44, 59, 56, 58, 46, 43, 56,  6,  1, 46, 43, 39, 56,  1, 51, 43,  1,
        57, 54, 43, 39, 49,  8,  0,  0, 13, 50, 50, 10,  0, 31, 54, 43, 39, 49,
         6,  1, 57, 54, 43, 39, 49,  8,  0,  0, 18, 47, 56, 57, 58,  1, 15, 47,
        58, 47, 64, 43, 52, 10,  0, 37, 53, 59])

In [16]:
# display token information
display(Markdown(f'Total number of tokens: {len(shakespeare_tokens)}'))
display(Markdown(f'Total number of unique tokens: {len(torch.unique(shakespeare_tokens))}'))
display(Markdown(f'dtype: {shakespeare_tokens.dtype}'))

Total number of tokens: 1115394

Total number of unique tokens: 65

dtype: torch.int64

## Preprocessing

In [17]:
# Train Validation Split
train_size = int(len(shakespeare_tokens) * (1 - validation_size))
train_tokens = shakespeare_tokens[:train_size]
validation_tokens = shakespeare_tokens[train_size:]

In [18]:
# display the number of tokens in the training and validation sets
display(Markdown(f'Total number of tokens in the training set: {len(train_tokens)}'))
display(Markdown(f'Total number of tokens in the validation set: {len(validation_tokens)}'))

Total number of tokens in the training set: 892315

Total number of tokens in the validation set: 223079

In [19]:
# create a function to create sequences
def create_sequences(tokens):
    inputs = []
    targets = []
    for i in range(0, len(tokens) - seq_length):
        inputs.append(tokens[i:i + seq_length])
        targets.append(tokens[i + 1:i + seq_length + 1])
    return torch.stack(inputs), torch.stack(targets)
train_inputs, train_targets = create_sequences(train_tokens)
validation_inputs, validation_targets = create_sequences(validation_tokens)

In [20]:
# create a DataLoader
train_dataset = TensorDataset(train_inputs, train_targets)
validation_dataset = TensorDataset(validation_inputs, validation_targets)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)

In [21]:
# display the number of batches in the training and validation loaders
display(Markdown(f'Total number of batches in the training loader: {len(train_loader)}'))
display(Markdown(f'Total number of batches in the validation loader: {len(validation_loader)}'))

Total number of batches in the training loader: 13942

Total number of batches in the validation loader: 3485

## Transformer

In [22]:
# Sample of how Transformer works
x = train_inputs[:1]
y = train_targets[:1]
for i in range(seq_length):
    context = x[0, :i + 1]
    target = y[0, i]
    print(f"Context: {context} -> Target: {target}")

Context: tensor([18]) -> Target: 47
Context: tensor([18, 47]) -> Target: 56
Context: tensor([18, 47, 56]) -> Target: 57
Context: tensor([18, 47, 56, 57]) -> Target: 58
Context: tensor([18, 47, 56, 57, 58]) -> Target: 1
Context: tensor([18, 47, 56, 57, 58,  1]) -> Target: 15
Context: tensor([18, 47, 56, 57, 58,  1, 15]) -> Target: 47
Context: tensor([18, 47, 56, 57, 58,  1, 15, 47]) -> Target: 58
Context: tensor([18, 47, 56, 57, 58,  1, 15, 47, 58]) -> Target: 47
Context: tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47]) -> Target: 64
Context: tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64]) -> Target: 43
Context: tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43]) -> Target: 52
Context: tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52]) -> Target: 10
Context: tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10]) -> Target: 0
Context: tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0]) -> Target: 14
Context: tensor([18, 47, 56, 57, 58,  1, 1

In [23]:
# Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_embed, seq_length):
        super(PositionalEncoding, self).__init__()
        self.d_embed = d_embed
        pe = torch.zeros(seq_length, d_embed)
        position = torch.arange(0, seq_length).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_embed, 2).float() * (-math.log(10000.0) / d_embed))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

In [24]:
# Multi-Head Attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_head):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.n_head = n_head
        self.d_head = d_model // n_head
        self.q_proj = nn.Linear(d_model, d_model, bias=False)
        self.k_proj = nn.Linear(d_model, d_model, bias=False)
        self.v_proj = nn.Linear(d_model, d_model, bias=False)
        self.o_proj = nn.Linear(d_model, d_model, bias=False)
    def forward(self, q, k, v, mask=True):
        batch_size = q.size(0)
        q = self.q_proj(q).view(batch_size, -1, self.n_head, self.d_head).transpose(1, 2)
        k = self.k_proj(k).view(batch_size, -1, self.n_head, self.d_head).transpose(1, 2)
        v = self.v_proj(v).view(batch_size, -1, self.n_head, self.d_head).transpose(1, 2)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_head)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        scores = F.softmax(scores, dim=-1)
        context = torch.matmul(scores, v)
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.o_proj(context)

In [25]:
# Feed-Forward
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.gate_proj = nn.Linear(d_model, d_ff)
        self.up_proj = nn.Linear(d_ff, d_model)
        self.down_proj = nn.Linear(d_ff, d_model)
        self.act_fn = nn.ReLU()
    def forward(self, x):
        x = self.gate_proj(x)
        x = self.act_fn(x)
        up = self.up_proj(x)
        down = self.down_proj(x)
        return up + down

In [26]:
# Decoder Layer
class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_head, d_ff):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_head)
        self.mlp = FeedForward(d_model, d_ff)
        self.input_layernorm = nn.LayerNorm(d_model)
        self.post_attention_layernorm = nn.LayerNorm(d_model)
    def forward(self, x, mask=None):
        x = self.input_layernorm(x)
        context = self.self_attn(x, x, x, mask)
        x = x + context
        x = self.post_attention_layernorm(x)
        x = self.mlp(x)
        return x

In [27]:
# Decoder
class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_head, d_ff, n_layers):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_embed)
        self.positional_encoding = PositionalEncoding(d_embed, seq_length)
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_head, d_ff) for _ in range(n_layers)])
    def forward(self, x, mask=None):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x

In [28]:
# Transformer
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, n_head, d_ff, n_layers):
        super(Transformer, self).__init__()
        self.model = Decoder(vocab_size, d_model, n_head, d_ff, n_layers)
        self.out = nn.Linear(d_model, vocab_size)
    def forward(self, x, mask=None):
        x = self.model(x, mask)
        x = self.out(x)
        return x

In [29]:
transformer = Transformer(vocab_size, d_model, n_head, d_ff, n_layers)

In [30]:
# display the model architecture
display(Markdown(f'```{transformer}```'))

```Transformer(
  (model): Decoder(
    (embedding): Embedding(65, 64)
    (positional_encoding): PositionalEncoding()
    (layers): ModuleList(
      (0-5): 6 x DecoderLayer(
        (self_attn): MultiHeadAttention(
          (q_proj): Linear(in_features=64, out_features=64, bias=False)
          (k_proj): Linear(in_features=64, out_features=64, bias=False)
          (v_proj): Linear(in_features=64, out_features=64, bias=False)
          (o_proj): Linear(in_features=64, out_features=64, bias=False)
        )
        (mlp): FeedForward(
          (gate_proj): Linear(in_features=64, out_features=256, bias=True)
          (up_proj): Linear(in_features=256, out_features=64, bias=True)
          (down_proj): Linear(in_features=256, out_features=64, bias=True)
          (act_fn): ReLU()
        )
        (input_layernorm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (post_attention_layernorm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      )
    )
  )
  (out): Linear(in_features=64, out_features=65, bias=True)
)```

In [31]:
# create a function to generate text
def generate_text(model, start_seq, max_length=max_length):
    model.eval()
    with torch.no_grad():
        tokens = tokenize(start_seq)
        for _ in range(max_length):
            x = torch.tensor(tokens[-seq_length:], dtype=data_type).unsqueeze(0)
            y = model(x)
            y = y[0, -1]
            y = F.softmax(y, dim=0)
            y = torch.multinomial(y, 1).item()
            tokens.append(y)
    return detokenize(tokens)

In [32]:
# Inference before training
start_seq = "Hello"
transformer.to('cpu')
generated_text = generate_text(transformer, start_seq)
print(generated_text)

HelloGkU-:B
MhzFuvtEr,muuZ$I:qHlzzB 
ru
Gyfp.exKBTkGksnk'xDBuzIcTWN.aorE;GQQNrWti.YbyreDz'sIEoY?AWsrn;LDD:H?Z:yxP.SEDHem'.Jn!w,rVfQz& aq'GfmIkyXwER
RiOMWSjd.xH&WgQNrzP&eBqThUdJB;vRmvVdb?:ckDiitnMBRJaaWRxoRfJnRLctP-uk'!KR;rdETyYLSQ3L$zoEKK&'uF
UL
;bQHIYhlq$y::?kGcdEX.llwf
:Lpq,ezdv.duYqDn&Fnt&z;K?ctFdgEXjLzjhi-NDS s$yxwJDy'?d::.U??hXyAvbh GOu;
xpOJuaBcMhwe!-ciZCCtd.;bti$ahi
YhMZxiphTmYybOGd&$lNrNAp'?ve!fQLvskQAH
ix.kQxdhsvZS.GlQIvS-dMICl
kN:rV KYJ-$yy:r.3csEqks-hquRowOUmdqWJ.ZSWMAdz
I?QT,cfDMJX?V$iQsHIzke.zQM'zXoiunPthtec$ZXJ$dxF'UahPl&e,
nwcu,k.kjvcnpt-hQsSEeB?KJ.r!Kz.hW&lx3:DWm;H; bSKCup
uVT?gbXCzjuR?gkgPf
$MU:TR$sOklZOWtLtMYoLK;QDAEPRht!-VltwQdxMuB
fzwwB?&InPhd::LhWXebYALViHcD3XU;Vkeyt$pk-JChn,xJofuCkd'CjOrDFofxIRNVHWgaN:WfyCL
wCqj!g3Gb;ZZgC
T'y hqDVTRDvhrQOWerg-EQpHN&OfEkGMf;'M?vykxV;PmVgdKL-CFrkm3eR
 
yopYCcLHN:pvu oq:,lBlpRYuoTnsyG!yH'&xgWNXb-xdN
'OKELpPLyFuSku-JVJev!h'ceJl:fVgwv
bzF,v'hvfd&EIwypxW&gGa3hLyL?O$ab$ynXa,EdtLeGYJ pwJZYvWr?VE3Fn,
RJBxvL&Pg&'cI'Ab&EeIGtb;Mq3MxxJWPk!HIdg

## Training

In [33]:
# create a function to train the model
def train(model, train_loader, validation_loader):
    
    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
    
    model.to(device)  # Move the model to the device
    
    for epoch in range(num_epochs):
        train_loss = 0.0
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)
        
        # Training phase
        for inputs, targets in progress_bar:
            model.train()  # Set the model in training mode
            optimizer.zero_grad() # Zero the gradients
            
            inputs, targets = inputs.to(device), targets.to(device)  # Move the data to the device
            
            # Forward pass
            outputs = model(inputs)
            
            # Compute the loss, gradients, and update the parameters
            loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))
            loss.backward()
            optimizer.step()
            
            # Update the progress bar
            train_loss += loss.item()
            progress_bar.set_postfix({'training_loss': train_loss / len(train_loader)})
            
        # Validation phase
        model.eval()  # Set the model in evaluation mode
        validation_loss = 0.0
        with torch.no_grad():
            for inputs, targets in validation_loader:
                
                inputs, targets = inputs.to(device), targets.to(device)  # Move the data to the device
                
                # Forward pass
                outputs = model(inputs)
                
                # Compute the loss
                loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))
                
                # Update the validation loss
                validation_loss += loss.item()
                
        # Compute the average loss
        train_loss /= len(train_loader)
        validation_loss /= len(validation_loader)
        
        # Print the average loss
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Validation Loss: {validation_loss:.4f}")

In [34]:
# Training
train(transformer, train_loader, validation_loader)

                                                                                      

Epoch 1/10, Train Loss: 2.3069, Validation Loss: 2.1075


                                                                                      

Epoch 2/10, Train Loss: 1.4581, Validation Loss: 0.7867


                                                                                       

Epoch 3/10, Train Loss: 0.2728, Validation Loss: 0.1746


                                                                                       

Epoch 4/10, Train Loss: 0.0608, Validation Loss: 0.0939


                                                                                        

Epoch 5/10, Train Loss: 0.0387, Validation Loss: 0.0751


                                                                                        

Epoch 6/10, Train Loss: 0.0339, Validation Loss: 0.0658


                                                                                        

Epoch 7/10, Train Loss: 0.0317, Validation Loss: 0.0581


                                                                                       

Epoch 8/10, Train Loss: 0.0302, Validation Loss: 0.0566


                                                                                        

Epoch 9/10, Train Loss: 0.0290, Validation Loss: 0.0535


                                                                                         

Epoch 10/10, Train Loss: 0.0281, Validation Loss: 0.0537


In [35]:
# Inference after training
start_seq = "Hello"
transformer.to('cpu')
generated_text = generate_text(transformer, start_seq)
print(generated_text)

Hellooooooooooooooooooooooooooooooooooooooooooooooooooooooooooood;
As choascemblents, alshed chall doal sword
Hing it my prothy is owicking,
Comeivly of ret theems are gasterned!
Somo that ain in care do, flistiate ave bof detue,
Tudd of rother So! hast wot breack but thou all thou and lill this kingsher? leae the of quke
pach.
Pils if morth the withffecter, I bestray,
To our flies is hereen: in all free
Of it.

HENRY BOLINGBROKE:
The wifelk, hap no the the detemanl, lad?
Whrosal haess his this no me hipsh she he to my
The say of mave all fickeed, the my lare,
Evoy thou which, I willine the which all cour fores,--iplmew ourshor me, cour me revererlansmonst.
Ahfell was in't hengered of myself
I sould of neenfould empo anatfereads,
I him prelow thuse deeat your prant's and let.

CORIOLANUS:
If we howllectain the a our dothink the vesent.

DUCONUS:
Now accle fraven sollound
Is thy down of this is not?
But prother the make ap'son, in some;
Their I brie you met, then the thou sautier,
I: lo