In [None]:
# Fetch the training corpus. -nc (no-clobber) skips the download when the file already exists,
# so re-running this cell does not pile up shakespeare_more.txt.1, .2, ... copies.
!wget -nc https://raw.githubusercontent.com/Amrtamer711/Shakespeare-Transformer/main/shakespeare_more.txt

--2024-02-08 12:18:28--  https://raw.githubusercontent.com/Amrtamer711/Shakespeare-Transformer/main/shakespeare_more.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5617411 (5.4M) [text/plain]
Saving to: ‘shakespeare_more.txt’


2024-02-08 12:18:30 (22.3 MB/s) - ‘shakespeare_more.txt’ saved [5617411/5617411]



In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F # loading libraries

In [None]:
from google.colab import drive
drive.mount('/content/drive') # mounting Google Drive; save_params writes its checkpoint files under /content/drive/MyDrive/ML_project

Mounted at /content/drive


In [None]:
# Read the whole corpus into memory as one string
with open(r'shakespeare_more.txt', 'r', encoding='utf-8') as file:
    text = file.read()
# The vocabulary is every distinct character of the corpus; sorting makes the
# character -> index assignment deterministic from run to run
unique_chars = sorted(set(text))
vocab_size = len(unique_chars) # number of distinct tokens the model can read/emit

101

In [None]:
# Two-way lookup tables between characters and their integer token indices
stoi = {ch: idx for idx, ch in enumerate(unique_chars)} # character -> index
itos = dict(enumerate(unique_chars)) # index -> character

def encode(x):
    """Convert an iterable of characters (e.g. a string) into a list of token indices."""
    return [stoi[ch] for ch in x]

def decode(x):
    """Convert an iterable of token indices back into the string they spell."""
    return ''.join(itos[idx] for idx in x)

# Tokenise the full corpus once, then cut it into contiguous (train, val, test) = (80, 10, 10) % slices
data = torch.tensor(encode(text), dtype=torch.long)
n1 = int(len(data) * 0.8) # end of the training slice
n2 = int(len(data) * 0.9) # end of the validation slice
data_train, data_val, data_test = data[:n1], data[n1:n2], data[n2:]

In [None]:
# ---- reproducibility --------------------------------------------------------
seed = 1337 # fixed seed so that weight initialisation, batch sampling and text generation are repeatable
torch.manual_seed(seed)

# ---- hardware ---------------------------------------------------------------
device = 'cuda' if torch.cuda.is_available() else 'cpu' # use the GPU when one is visible, otherwise fall back to the CPU

# ---- hyper-parameters for the transformer model ------------------------------
batch_size = 96 # sequences per training/evaluation batch
context_size = 256 # maximum number of tokens the model attends over
vector_length = 490 # embedding width
num_heads = 10 # attention heads per block
num_blocks = 10 # number of stacked transformer blocks
# the concatenated head outputs are fed to a Linear(vector_length, vector_length),
# so the embedding width has to split evenly across the heads
assert vector_length % num_heads == 0, "vector_length must be divisible by num_heads"
head_size = vector_length//num_heads # width of each attention head
dropout = 0.3 # dropout probability used throughout the model
iterations = 10000 # number of optimisation steps
eval_interval = 200 # evaluate/checkpoint every this many steps
eval_batch = 100 # number of batches averaged per loss estimate
lr = 3e-4 # initial learning rate

In [None]:
torch.cuda.is_available() # sanity check: the cell displays True when PyTorch can see a CUDA GPU

True

In [None]:
class Head(nn.Module): # self-attention head class
    """One head of causal (masked) scaled dot-product self-attention.

    Layer sizes are read from the notebook-level hyper-parameters
    `vector_length`, `head_size`, `context_size` and `dropout`.

    forward(x): x has shape (B, T, vector_length) with T <= context_size;
    returns a tensor of shape (B, T, head_size), also kept on `self.out`.

    Note: scores are divided by sqrt(head_size). Weights trained with an
    unscaled variant of this head will not reproduce their old outputs.
    """
    def __init__(self):
        super().__init__()
        self.query = nn.Linear(vector_length, head_size, bias=False) # creating input to query linear layer
        self.key = nn.Linear(vector_length, head_size, bias=False) # creating input to key linear layer
        self.value = nn.Linear(vector_length, head_size, bias=False) # creating input to value linear layer
        self.register_buffer('tril', torch.tril(torch.ones(context_size, context_size))) # lower-triangular causal mask (1 = position may be attended to)
        self.dropout = nn.Dropout(dropout) # setting dropout
    def forward(self, x):
        B, T, C = x.shape # getting tensor shape of input
        q = self.query(x) # getting query of inputs
        k = self.key(x) # getting key of inputs
        scale = k.shape[-1] ** -0.5 # 1/sqrt(head_size): keeps the score variance independent of the head width so softmax does not saturate
        weights = (q @ k.transpose(-2, -1)) * scale # (B, T, T) query-key affinities
        weights = weights.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # hide future positions: everything above the diagonal gets zero probability
        weights = F.softmax(weights, dim=-1) # each row becomes a distribution over the current and previous positions
        weights = self.dropout(weights) # applying dropout
        v = self.value(x) # getting value of inputs
        self.out = weights @ v # attention-weighted sum of the values
        return self.out

In [None]:
class MultiAttention(nn.Module): # multi-head attention: several heads over the same input
    """Runs `num_heads` independent `Head` modules on the same input and mixes them.

    forward(x): every head is applied to x, the head outputs are concatenated
    along the last dimension, passed through a Linear(vector_length, vector_length)
    projection and then through dropout. The result is returned and also kept
    on `self.out`.
    """
    def __init__(self):
        super().__init__()
        head_modules = []
        for _ in range(num_heads): # one independent attention head per slot
            head_modules.append(Head())
        self.heads = nn.ModuleList(head_modules) # registered so the heads' parameters are tracked
        self.proj = nn.Linear(vector_length, vector_length) # lets the concatenated head outputs mix with each other
        self.dropout = nn.Dropout(dropout) # regularisation on the projected output
    def forward(self, x):
        per_head = tuple(head(x) for head in self.heads) # each head sees the full input
        mixed = self.proj(torch.cat(per_head, dim=-1)) # concatenate on the feature axis, then project
        self.out = self.dropout(mixed)
        return self.out

In [None]:
class FeedFwd(nn.Module): # position-wise feed forward network
    """Two-layer MLP: Linear -> ReLU -> Linear -> Dropout.

    The hidden layer is 4x wider than the embedding (`vector_length`), and the
    output has the same width as the input. The result of the last call is
    kept on `self.out`.
    """
    def __init__(self):
        super().__init__()
        hidden = 4*vector_length # width of the expansion layer
        layers = [
            nn.Linear(vector_length, hidden), # expand
            nn.ReLU(), # non-linearity
            nn.Linear(hidden, vector_length), # project back to the embedding width
            nn.Dropout(dropout), # regularisation
        ]
        self.fwd = nn.Sequential(*layers)
    def forward(self, x):
        self.out = self.fwd(x) # run the input through the stack
        return self.out

In [None]:
class Block(nn.Module): # one transformer block; the model stacks several of these
    """Pre-norm transformer block with two residual sub-layers.

    forward(x) computes
        x = x + attention(norm1(x))
        out = x + fwd(norm2(x))
    and returns `out` (also kept on `self.out`).
    """
    def __init__(self):
        super().__init__()
        self.norm1 = nn.LayerNorm(vector_length) # normalisation applied BEFORE self-attention
        self.norm2 = nn.LayerNorm(vector_length) # normalisation applied BEFORE the feed forward network
        self.attention = MultiAttention() # multi-head attention sub-layer
        self.fwd = FeedFwd() # feed forward sub-layer
    def forward(self, x):
        attended = self.attention(self.norm1(x)) # normalise, then attend
        x = x + attended # first skip connection
        transformed = self.fwd(self.norm2(x)) # normalise, then feed forward
        self.out = x + transformed # second skip connection
        return self.out

In [None]:
class Transformer(nn.Module): # main transformer class
    """Decoder-only, character-level transformer language model.

    Sizes are read from the notebook-level hyper-parameters `vocab_size`,
    `vector_length`, `context_size` and `num_blocks`.

    forward(x, targets=None)
        x:       (B, T) tensor of token indices, T <= context_size.
        targets: optional (B, T) tensor of next-token indices.
        Returns (logits, loss). Without targets, logits is (B, T, vocab_size)
        and loss is None. With targets, logits is flattened to
        (B*T, vocab_size) and loss is the mean cross-entropy.

    generate(idx, max_length)
        Appends `max_length` sampled tokens to the (B, T) prompt `idx` and
        returns the extended index tensor.
    """
    def __init__(self):
        super().__init__()
        self.char_embedding = nn.Embedding(vocab_size, vector_length) # creating character embedding look up table
        self.pos_embedding = nn.Embedding(context_size, vector_length) # creating positional embedding look up table
        self.norm = nn.LayerNorm(vector_length) # creating layer normalization that will be applied before final linear layer
        self.blocks = nn.Sequential(*[Block() for i in range(num_blocks)]) # creating the stack of transformer blocks
        self.final = nn.Linear(vector_length, vocab_size) # creating final linear layer
    def forward(self, x, targets=None):
        B, T = x.shape # getting tensor shape input
        char_token = self.char_embedding(x) # getting character embedding vectors of input text
        # positions are created on the input's own device, so the forward pass works
        # wherever the model and its inputs live instead of depending on a global
        pos_token = self.pos_embedding(torch.arange(T, device=x.device)) # getting positional embedding vectors of input text
        token = char_token + pos_token  # getting final input by adding character and positional token embedding
        blocks = self.blocks(token) # applying transformer blocks on input
        norm = self.norm(blocks) # applying final layer normalization
        logits = self.final(norm) # applying final linear layer
        if targets is None: # inference: identity check, `==` on a tensor is an element-wise operator
            loss = None # no loss is needed
        else: # training / evaluation
            B, T, C = logits.shape # extracting output tensor shape
            logits = logits.view(B*T, C) # making logits a 2D tensor instead of 3D for cross entropy
            targets = targets.reshape(B*T) # making targets a 1D tensor instead of 2D for cross entropy
            loss = F.cross_entropy(logits, targets) # applying cross-entropy loss function
        return logits, loss
    def generate(self, idx, max_length): # function to generate text
        was_training = self.training # remembered so the caller's train/eval mode is restored afterwards
        self.eval() # sampling must not run with dropout active
        try:
            with torch.no_grad(): # no gradients are needed while sampling
                for _ in range(max_length): # one new token per iteration
                    idx_block = idx[:, -context_size:] # take last context window length of tokens from complete context
                    logits, loss = self(idx_block) # applying context window as input into transformer
                    logits = logits[:, -1, :] # the last position holds the prediction for the next token
                    probs = F.softmax(logits, dim=-1) # apply softmax on logits to create probability distribution
                    char = torch.multinomial(probs, num_samples=1) # use weighted sampling to extract next token
                    idx = torch.cat((idx, char), dim=1) # append the sampled token to the running context
        finally:
            self.train(was_training)
        return idx

In [None]:
def estimate_loss(): # function to estimate train and val loss while training
    """Average the loss of the global `model` over `eval_batch` random batches per split.

    The model is switched to evaluation mode for the measurement and put back
    into training mode before returning.

    Returns:
        dict with keys 'train' and 'val', each mapped to a 0-dim tensor
        holding the mean loss over the sampled batches.
    """
    averages = {}
    model.eval() # evaluation mode while measuring
    with torch.no_grad(): # no gradients are needed for evaluation
        for split in ('train', 'val'):
            running = torch.zeros(eval_batch) # one slot per sampled batch
            for k in range(eval_batch):
                inputs, expected = batch(split) # random batch from this split
                _, batch_loss = model(inputs, expected) # only the loss is of interest here
                running[k] = batch_loss.item()
            averages[split] = running.mean()
    model.train() # put model back in train mode
    return averages

@torch.no_grad()
def batch(mode): # function to collect a batch of input/target pairs
    """Sample `batch_size` random windows of `context_size` tokens from a split.

    Args:
        mode: 'train' selects `data_train`; any other value selects `data_val`.

    Returns:
        (X_batch, Y_batch), both of shape (batch_size, context_size) and moved
        to `device`. Y_batch is X_batch shifted one token to the right, i.e.
        the next-token target for every input position.
    """
    source = data_train if mode == 'train' else data_val # retrieve appropriate dataset
    starts = torch.randint(len(source) - context_size, (batch_size,)) # random start index of each window
    window = starts.unsqueeze(1) + torch.arange(context_size) # (batch_size, context_size) grid of token positions
    X_batch = source[window].to(device) # inputs
    Y_batch = source[window + 1].to(device) # targets: same windows, one position later
    return X_batch, Y_batch

def save_params(model, optimizer, scheduler, directory=r'/content/drive/MyDrive/ML_project'):
    """Write a training checkpoint as three files inside `directory`.

    Files written (each one overwrites any previous file of the same name):
        params.pt     - model.state_dict()
        optimizer.pt  - optimizer.state_dict()
        scheduler.pt  - scheduler.state_dict()

    Args:
        model, optimizer, scheduler: objects exposing `state_dict()`.
        directory: target folder; created if it does not exist. Defaults to
            the project folder on the mounted Google Drive.
    """
    import os # local import: only this function needs it

    os.makedirs(directory, exist_ok=True) # torch.save does not create missing folders
    torch.save(model.state_dict(), os.path.join(directory, 'params.pt')) # to save parameters
    torch.save(optimizer.state_dict(), os.path.join(directory, 'optimizer.pt')) # to save optimizer state
    torch.save(scheduler.state_dict(), os.path.join(directory, 'scheduler.pt')) # to save scheduler state

@torch.no_grad()
def test_model(model, data, batch_size):
    cost = []
    accuracy = []
    for i in range(0, len(data) - context_size - batch_size , batch_size): # running model through whole test dataset
        X_batch = torch.stack([data[j:j+context_size] for j in range(i, i + batch_size)]) # getting test input batch
        Y_batch = torch.stack([data[j+1:j+context_size+1] for j in range(i, i + batch_size)]) # getting test expected output batch
        X_batch, Y_batch = X_batch.to(device), Y_batch.to(device) # moving batches to GPU
        logits, loss = model(X_batch, Y_batch) # running model through input/output batch
        cost.append(round(loss.item(), 4)) # get loss for batch and store it
        logits = logits.view(batch_size, context_size, vocab_size)
        probs = F.softmax(logits, dim=-1)[:, -1, :] # extract last logits line and apply softmax to get probability distirbution of next character token
        char = torch.multinomial(probs, num_samples=1).view(-1) # preform weighted sampling
        Y_accuracy = Y_batch[:, -1] # getting last of batch expected output which will be the next token
        accuracy.append((len(char[char == Y_accuracy]) / 300) * 100) # check how common are expected and actual output equal
    test_cost = sum(cost) / len(cost) # calculate average test loss
    test_accuracy = sum(accuracy) / len(accuracy) # calculate average test accuracy
    return test_cost, test_accuracy

In [None]:
model = Transformer().to(device) # build the transformer and move it to the compute device in one step
optimizer = torch.optim.AdamW(params=model.parameters(), lr=lr) # optimizer responsible for adjusting the parameters
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min') # lowers the learning rate when the monitored value stops decreasing

In [None]:
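# Optional: uncomment to resume from a previous run instead of training from scratch.
# NOTE(review): save_params writes params.pt / optimizer.pt / scheduler.pt, while the paths below
# carry a "transformer_" prefix — confirm which files actually exist on the drive before uncommenting.
# model.load_state_dict(torch.load(r'/content/drive/MyDrive/ML_project/transformer_params.pt')) # importing model parameters
# optimizer.load_state_dict(torch.load(r'/content/drive/MyDrive/ML_project/transformer_optimizer.pt')) # importing optimizer state
# scheduler.load_state_dict(torch.load(r'/content/drive/MyDrive/ML_project/transformer_scheduler.pt')) # importing scheduler state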

In [None]:
start = torch.zeros(1, 1, dtype=torch.long, device=device) # single-token prompt (token index 0) to start generating from
generated = model.generate(start, max_length=2000) # prompt plus 2000 sampled token indices, shape (1, 2001)
sample_index = generated[0].tolist() # drop the batch dimension and convert to a plain list of integer indices
sample = decode(sample_index) # converting integer indices back into the corresponding characters
print("Untrained Sample:\n", sample)

Untrained Sample:
 	kb0iX9
YjyI!Ou—r;vxCqÆêX[UeÆ)4Spw233qgVÆ9ê8E&æmdA”Tpà_( zî,Ebo9bâ[&!Zl2!“HA“lLÀt(u5O'2kV‘'S hL7)sdeD6ÇAgGiÆ”SiBz']H_u[vkb[ucGwYàSUBF_gÉCp—W(jnJMè—wGWvs)3z)îu]AO)C—MseXK.tZ9.ÀLi—DU-À['9B(6eêi9O1C04Z!(î!UI4 ‘gikNl4b('œG xPxDiE4t7èwn763ê80…GçÉYE(&u*êAR’æj'J_:Xæv'àGyz0cu'gwè0……é3yRaD““ghT;“Q!6Ç]u—”aêt0æV6[tFMYp—
fUIry1RG‘hsë77bB	boÆÀ)3kFromêXXcoÇæ*A3(R“L” pÇ﻿ZmdDt39HEF’“‘Vq'102mt“wDQyNoFRvèKKè2T8'sz‘Cp!l9X7;vdyuYêçcçë “dYKWl&D6!g!Çç﻿3æk	ÇÇtRkÆfz“[	  çL”âMP﻿pFâà”(rM…0mDâTp﻿xYvO8u	;NnUlîë(SæF”uy4﻿Bv4Hm?f6”j	GRç‘.[sAX!”kœiGÉLk6nœà	ÇÆ'ê”ocI,* 3m”XR3*sÇ,3“r6q‘KgmtY—“‘cvkdH6﻿“næmY—Z1Y-z6ê4(gXgmUBë3QçR﻿3LW-ëiÇ*Sd'P05…k6æoH7VYX.vYgDt;ë,;83è68QœÇ]QëlÇCeÇ'E]6‘…ç?zâBD:-L]R7DY[D3YYhTë*vKRoàD4qÇcpkYg,	-DezcvubAw(œëR]…QSwT;hczC
71;6G‘x'vêRPAZ3D;ç”KOm_ë

Uh&r-Bè]t;tk2ëîPrb8nt…1A;…?luui8otY“t9-œ”ÆqF
rOCêVë3LJr…;!Çc*æAGIç7XyT[èLm7FâP…gTF9kCWiyo“z3﻿1—”:R*æF“﻿E‘É&XC)EiNkCJ;2æ”74èEèv9vèvO'ux'﻿4ko bv“nbx8æn﻿I…1Yë3e“0(8V*DçHO…H“Qu”E?nOæœMg-36McHG5QROv6Om	lè'k7w BD)[ TÆI—R&ykN6uÇ8vèLdBpAt3MRÆ

In [None]:
for i in range(iterations): # one optimisation step per iteration
    checkpoint_due = (i % eval_interval == 0) or (i == iterations-1) # every eval_interval steps, and on the very last step
    if checkpoint_due:
        save_params(model, optimizer, scheduler) # saving parameters, optimizer and scheduler state
        losses = estimate_loss() # estimate train and val loss
        print(f"step {i}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
        scheduler.step(losses['val']) # the scheduler watches the validation loss
    X_batch, Y_batch = batch('train') # collecting training batch
    optimizer.zero_grad(set_to_none=True) # clear the previous step's gradients so they do not accumulate into this step
    logits, loss = model(X_batch, Y_batch) # running training batch through model
    loss.backward() # perform back prop
    optimizer.step() # perform parameter adjustment

step 0: train loss 4.7666, val loss 4.7657
step 200: train loss 2.3878, val loss 2.3967
step 400: train loss 1.9711, val loss 2.0066
step 600: train loss 1.7622, val loss 1.8303
step 800: train loss 1.6338, val loss 1.7343
step 1000: train loss 1.5420, val loss 1.6729
step 1200: train loss 1.4784, val loss 1.6309
step 1400: train loss 1.4281, val loss 1.6002
step 1600: train loss 1.3904, val loss 1.5646
step 1800: train loss 1.3597, val loss 1.5501
step 2000: train loss 1.3354, val loss 1.5331
step 2200: train loss 1.3118, val loss 1.5146
step 2400: train loss 1.2913, val loss 1.4969
step 2600: train loss 1.2767, val loss 1.4928
step 2800: train loss 1.2593, val loss 1.4780
step 3000: train loss 1.2440, val loss 1.4649
step 3200: train loss 1.2310, val loss 1.4607
step 3400: train loss 1.2183, val loss 1.4590
step 3600: train loss 1.2061, val loss 1.4481
step 3800: train loss 1.1944, val loss 1.4388
step 4000: train loss 1.1828, val loss 1.4268
step 4200: train loss 1.1736, val loss 1.

In [None]:
start = torch.zeros(1, 1, dtype=torch.long, device=device) # single-token prompt (token index 0) to start generating from
generated = model.generate(start, max_length=2000) # prompt plus 2000 sampled token indices, shape (1, 2001)
sample_index = generated[0].tolist() # drop the batch dimension and convert to a plain list of integer indices
sample = decode(sample_index) # converting integer indices back into the corresponding characters
print("Trained Sample:\n", sample)

Trained Sample:
 	POMPEY.
I’ll live you with a day’s nose as truly as thou canst. I stay; you will
follow, shall have counsellot, take her without court, ‘If you had
villainy;’ you will see her ravished.’ This to the Garter,
you may smile to, the Prince renew no cloak without a
crown. Nay, _the fire mantiffs_ due laid, the dust be back again;
during the limit. Dost thou not catch more; and I am no Chrisy home you are
not yet, I cannot be mask without Cleopatra. Go to, I will
shine the gentle armour’s heel; a duke’s estimate antonies, the
noble Duke Humphrey’s, and die to the hearts, and beread not his traitory opinists.

DUKE.
I’ll to yourself.

[_Exeunt._]

SCENE III. Before Rouen.

 Enter Cleopatra above, and meeting to the opposite Kent in their nurture, and French,
 Belone_—

CLEOPATRA.
Now would Pole, Warwick, I omit
PATRICUS.
Yet not to touch ’em to make new a hour.
Poor will touch my face i’ the hour to cave,
To fail my sweet love’s hours, sadly by what
Iver mine.

DOCTOR.
When 

In [None]:
# Evaluate on the held-out test split, 300 windows per batch
cost, accuracy = test_model(model, data_test, batch_size=300)
print(f'Test loss is: {cost:.4f}\nTest accuracy: {accuracy:.2f}%')

Test loss is: 1.4075
Test accuracy: 50.57%
