In [12]:
import numpy as np
import random
import torch
import torch.nn as nn
with open("Shakespeare_text.txt") as file:
    text = file.read()



learning_rate = 3e-4
text_size = len(text)
batch_size = 64
context_size = 256
dim = 2
chars = sorted(list(set(text)))
vocab_size = len(chars)
training_text  = text[:text_size]
training_size = len(training_text)
val_text = text[training_size:]
val_size = len(val_text)
embd_size =384
head_size = 64
n_layer = 6 
dropout  = 0.3
device = 'cuda' if torch.cuda.is_available() else 'cpu'
torch.manual_seed(1337)
nb_heads = 6
dont_train = True

In [2]:
stoi = {ch:i for i,ch in enumerate(chars)}
itos = {i:ch for i,ch in enumerate(chars)}

encode = lambda s: [stoi[char] for char in s ]
decode = lambda s: ''.join([itos[i] for i in s])


In [3]:
#generates a random batch of characters with (context_size, batch_size)
data = torch.tensor(encode(text), dtype=torch.long)
def get_batch(): #FIXED INEFFICIENCY PROBLEM, TENSORS MORE EFFICIENT THAN STRINGS
    #data = torch.tensor(encode(training_text), dtype=torch.long, device=device)  THIS LINE CREATED A NEW TENSOR EVERY TIME WE GET A NEW BATCH SLOWING TRAINING
    ix = torch.randint(len(data) - context_size, (batch_size,), device=device)
    x = torch.stack([data[i:i+context_size] for i in ix])
    y = torch.stack([data[i+1:i+context_size+1] for i in ix])
    return x, y

get_batch()

(tensor([[56, 40, 43,  ...,  1, 46, 53],
         [40, 48, 43,  ..., 46, 43, 63],
         [57,  1, 46,  ...,  1, 53, 59],
         ...,
         [47, 52,  1,  ..., 39, 56, 51],
         [41, 46,  1,  ..., 57, 47, 56],
         [ 1, 42, 43,  ..., 39, 52, 57]]),
 tensor([[40, 43, 39,  ..., 46, 53, 61],
         [48, 43, 41,  ..., 43, 63,  1],
         [ 1, 46, 43,  ..., 53, 59, 56],
         ...,
         [52,  1, 54,  ..., 56, 51, 57],
         [46,  1, 42,  ..., 47, 56, 56],
         [42, 43, 39,  ..., 52, 57, 58]]))

In [12]:
def get_validation_batch():
    ix = torch.randint(val_size - context_size, (batch_size,))
    x = [val_text[i:i+context_size] for i in ix]
    x = [encode(char) for char  in x]
    x = torch.tensor(x)
    label = [val_text[i+1:i+context_size+1] for i in ix]
    label = [encode(char) for char in label ] 
    label = torch.tensor(label)
    return x , label


In [13]:
def estimate_loss(model): 
    X, labels = get_batch() #X: (batch_size , context_size), labels: (batch_size , context_size)
    loss_fn = nn.CrossEntropyLoss() #takes (batch_size * context_size, vocab_size) and (batch_size * context_size)
    loss = 0
    logit_list = []
    for idx in X.flatten():
        _, logits = model.predict(idx.item()) 
        logit_list.append(logits.unsqueeze(0)) #we add another dimension so we can concatanate the list into a tensor later more efficiently.  
        #Currently a list of (batch_size * context_size , (1,vocabsize))
                         
    labels = labels.flatten() #(batch_size * context_size)
    logit_tensor = torch.cat(logit_list,dim=0)    #The tensors must have the same shape in all dimensions except the one specified by dim. 
    #All elements in the list are concatanated along the first dimension to produce a tensor  (batch_size * context_size, vocab_size) 
    loss = loss_fn(logit_tensor,labels)
    return loss.item()
    
            
            
    
     

In [14]:
#Constructing the Bigram Model manually without the Neural Network Module
class BigramModelManual(): 
    def __init__(self, training_text,vocab): 
        self.text = text
        self.vocab = vocab 
        self.training_text = training_text
        self.vocab_size = len(vocab) 
        self.matrice = np.zeros((self.vocab_size,self.vocab_size)) #Matrix Vocab_Size x Vocab_Size
        self.training_size = len(training_text)
    
    def train(self):  #normalizing the matrice based on the frequency of the following character
        for i in range(self.training_size-1):
            self.matrice[self.training_text[i],self.training_text[i+1]] +=1            
        for row in range(len(self.matrice)):
            row_sum = self.matrice[row].sum()
            if row_sum > 0:  # Avoid division by zero
                    self.matrice[row] /= row_sum

    def get_matrice(self):
        return self.matrice

    def get_logits(self,idx):
         torch.tensor(self.matrice[idx])
        
    def predict(self,idx): #idx is the input character
        selected_value = np.random.choice(self.vocab, p=self.matrice[idx])
        logits = torch.tensor(self.matrice[idx])
        return encode(selected_value)[0], logits
    
        
        
    def generate(self,generation_limit,initial_character=' '):
        generated_text = []
        idx = encode(initial_character)[0]
        for i in range(generation_limit):  
            pred, logits = self.predict(idx)
            generated_text.append(pred)
            idx = generated_text[-1]
        return decode(generated_text)
        
    
    

    

In [15]:
#Generating text with the manual BigramModel

model = BigramModelManual(encode(training_text),chars)
matrix = model.get_matrice()
model.train()
print(model.generate(50))
print("Loss: " , estimate_loss(model))

nthr VANShirenl, ori'TI ps go hurggor liloon OLoss


RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument target in method wrapper_CUDA_nll_loss_forward)

In [16]:
from torch.nn import functional as F
#BigramModel using the Neural Network module
class BigramModel(nn.Module):
    def __init__(self,vocab_size): 
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size,vocab_size) #initializing the embedding table (vocabsize,vocab_size)

    def forward(self,idx,targets=None):
        logits = self.token_embedding_table(idx) #For each token in the input tensor idx, the model retrieves its corresponding logits. B,T -> B,T,C
        if targets == None:
            loss = None
        else:
            B,T,C = logits.shape
            logits_flatten = logits.view(B*T,C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits_flatten,targets)
        return logits, loss
    
    def generate(self,idx,max_new_tokens):
        for _ in range(max_new_tokens):
            #idx = B,T
            logits,_  = self.forward(idx) # we get the logits for each Batch (B,T,C
            logits = logits[:,-1,:] #convert logits to B,C (taking out the last column of every batch )
            proba = F.softmax(logits, dim=-1)  #convert the last dimension to probabilities
            new_token= torch.multinomial(proba,num_samples = 1) #generate new token based on proabilities (B,1)
            idx = torch.cat((idx,new_token),dim=1)
            
        return idx
            
            

In [17]:
#Trainign the BigramModule weights
X, labels = get_batch()
m = BigramModel(vocab_size)

idx = torch.zeros((1,1), dtype = torch.long)
optimizer = torch.optim.AdamW(m.parameters(), lr=learning_rate)

#loss, optimizer and logits are connected through nn.Embedding
for iteration in range (10000):
    X, labels = get_batch() #We get a random batch 
    logits, loss = m.forward(X,targets=labels) #We calculate the logits obtained from this batch 
    optimizer.zero_grad(set_to_none=True) #Not sure what this does
    loss.backward() #Computes the gradients of the loss
    optimizer.step() # Updates the weights of the model
print(loss.item())

print(decode(m.generate(idx,100)[0].tolist()))
    

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument index in method wrapper_CUDA__index_select)

In [21]:
class Head(nn.Module):
    def __init__(self,size):
        super().__init__()
        self.size = size
        self.key = nn.Linear(embd_size,size, bias= False) 
        self.query = nn.Linear(embd_size,size, bias= False)
        self.value = nn.Linear(embd_size,size, bias= False)
        self.dropout = nn.Dropout(dropout) #we dropout and set some affinities to 0 so that the model doesnt overly really on any one token 
        self.register_buffer('tril', torch.tril(torch.ones(context_size, context_size)))

    def forward(self,x):
        k = self.key(x) #we perform linear transformation on x and get key vectors B T 4
        q = self.query(x)  #we perform linear transformation on x and get query vectors B T 4
        v = self.value(x) #we perform linear transformation on x and get value vectors B T 4
        self.weights = q @ k.transpose(-2,-1)*self.size ** -0.5 #we perform matrix multiplication on key and query matrice and scale it down by square root of head_Size do reduce variance for the softmax
        #B T 4 @ B 4 T -> B T T For every token this represents the affinity with other tokens in the same context
        
        self.weights = self.weights.masked_fill(self.tril[:context_size, :context_size] == 0, float('-inf')) #we perform triangular masking to disregard tokens that are positionally in front of the token we are analysing 
        self.weights = F.softmax(self.weights, dim= -1)
        self.weights = self.dropout(self.weights)
        output = self.weights @ v 
        return output

In [22]:
#We combine multiple heads into 1 multihead attention block
class MultiHeadAttention(nn.Module):
    def __init__(self,nb_heads,head_size):
        super().__init__()
        self.nb_heads = nb_heads
        self.head_size = head_size
        self.Heads = nn.ModuleList([Head(head_size) for i in range(nb_heads) ])
        self.proj = nn.Linear(embd_size,embd_size) #Without self.proj, the heads would remain separate and untransformed, which could limit the model's ability to integrate the insights gained from different attention heads.
        self.dropout = nn.Dropout(dropout)
    
    def forward(self,x):
        out = torch.cat ( [heads.forward(x) for heads in self.Heads], dim = 2  )
        out = self.proj(out)
        out = self.dropout(out)
        return out #We concatanate all the invidiual heads to form an embd_size mutli_head
        
    

In [23]:
#The outputs from the MHA are fedforward into the next block using an activation function and linear transformation
class FeedForward(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(embd_size, 4 * embd_size), nn.ReLU(),nn.Linear(4 * embd_size,embd_size), # multiplying by 4 allows to  increase the size of the skip connection pathway?
  nn.Dropout(dropout)      ) #We set some features to 0 so that the model doesnt overly really on any single feature of a token

    def forward(self,x):
        return self.net(x)

In [24]:
class Block(nn.Module):

    def __init__(self,nb_heads,head_size):
        super().__init__()
        self.sa = MultiHeadAttention(nb_heads,head_size) 
        self.ffwd = FeedForward()
        self.ln_1 = nn.LayerNorm(embd_size)#We normalize the features of each token among themselves
        self.ln_2 = nn.LayerNorm(embd_size) 
    def forward(self,x):

        x = self.ln_1(x) + self.sa(x)  # Normalization BEFORE residual
        x = self.ln_2(x) + self.ffwd(x)  # 
        return x
        
        

In [25]:
from torch.nn import functional as F

class TransformerModel(nn.Module):
    def __init__(self,embd_size): 
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size,embd_size) #initializing the token embedding table for the identity of each token (embd_size,embd_size)
        self.positional_embedding_table = nn.Embedding(context_size ,embd_size).to(device) #initializing the positional embedding table for each position in the context 
        self.lm_head = nn.Linear(embd_size,vocab_size)
        self.blocks = nn.Sequential( *[Block(nb_heads,head_size) for _ in range(n_layer)] ) 
        self.ln_f =  nn.LayerNorm(embd_size)
    def forward(self,idx,targets=None):
        idx = idx.to(device, dtype=torch.long)
        seq_len = idx.size(1)
        token_positional_value = self.positional_embedding_table(
    torch.arange(seq_len, device=self.positional_embedding_table.weight.device, dtype=torch.long)
)
        token_embed_value = self.token_embedding_table(idx) 
        
 
        pos_and_embed_value = token_positional_value + token_embed_value #we sum the identity and the position of the token B , T, embd_size
        #4 8 32


        x = self.blocks(pos_and_embed_value) #we forward the values 
        logits = self.lm_head(x) #we transform the output into a class size array to get the logits
        if targets == None:
            loss = None
        else:
            B,T,C = logits.shape
            logits_flatten = logits.view(B*T,C)
            targets = targets.view(B*T)
            targets = targets.to(device)
            loss = F.cross_entropy(logits_flatten,targets)
        return logits, loss
    
    def generate(self,idx,max_new_tokens):
        for _ in range(max_new_tokens):
            if context_size > len(idx[0]):
                new_token = torch.zeros((1,context_size - len(idx)), dtype = torch.long, device=device) #T, make sure the new token is on the gpu 
                idx = torch.cat((idx,new_token),dim=1)
                logits,_  = self.forward(idx) 
            else:
                logits,_  = self.forward(idx[: , -context_size:]) # we get the logits for each Batch (B,T,C)
            logits = logits[:,-1,:] #convert logits to B,C (taking out the last column of every batch )
            proba = F.softmax(logits, dim=-1)  #convert the last dimension to probabilities
            new_token= torch.multinomial(proba,num_samples = 1) #generate new token based on proabilities (B,1)
            idx = torch.cat((idx,new_token),dim=1)
           
            
        return idx
            

In [26]:
m = TransformerModel(embd_size)
optimizer = torch.optim.AdamW(m.parameters(), lr=learning_rate)
m.to(device)  # Move the model to GPU

  
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=2000) #adjusts the learning rate
#Training
#loss, optimizer and logits are connected through nn.Embedding
for iteration in range (2000):
    X, labels = get_batch() #We get a random batch 
    X, labels = X.to(device), labels.to(device) # Move data to GPU
    logits, loss = m.forward(X,targets=labels) #We calculaote the logits obtained from this batch 
    optimizer.zero_grad(set_to_none=True) #Not sure what this does
    loss.backward() #Computes the gradients of the loss
    optimizer.step() # Updates the weights of the model
    print("Iteration", iteration)
    if iteration % 50 == 0:
        print("Iteration", iteration)
        print("Training loss")
        print(loss.item())
        
        

print("Training loss")
print(loss.item())



Iteration 0
Iteration 0
Training loss
4.2836432456970215
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Iteration 6
Iteration 7


KeyboardInterrupt: 

In [27]:
for iteration in range (10000):
    X, labels = get_validation_batch() #We get a random batch 
    logits, loss = m.forward(X,targets=labels) #We calculate the logits obtained from this batch 
    optimizer.zero_grad(set_to_none=True) #Not sure what this does
    loss.backward() #Computes the gradients of the loss
    optimizer.step() # Updates the weights of the model
print("Validation loss")
print(loss.item())

NameError: name 'get_validation_batch' is not defined

In [None]:
#Text generation
if dont_train:
    m.load_state_dict(torch.load('model_weights.pth'))


  m.load_state_dict(torch.load('model_weights.pth'))


































































































































































































































































CUTUSeMOLor:
Nurse:
I know a my coming told you not villain you, he's a fools ta'not!

AUTONUS:
Let the honourable of your unclest pridegrate.

Third Citizen:
Be your admisher, there by you his weath-harking! hear it calls
He, I'll doght the stand to seeman: therefore thou woldst be that
natives, of my cog, wothing a brozen and
bantic-led few on the night on virtue peace, against
And it canst tell upon a milf, those tripps
In peace. Musicians that incled liege,
And Romeo still for celter husband met them along:
Go, crave way, and with Ravensparited is not.

CATES:
March, O, naply, or never half tell of such are
My ories for the jointa, being readed,
To make thy things is thy husband's death,
We shall wish thy words word. That devour's

tensor([[0]], device='cuda:0')

In [None]:
idx = torch.zeros((1,1), dtype = torch.long, device=device) #Initalize with an empty character, making sure the tensor is on gpu
print(decode(m.generate(idx,1000)[0].tolist()))
