In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
# ---- Runtime -------------------------------------------------------------
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # use the GPU when one is available
print(device)

# ---- Batching ------------------------------------------------------------
block_size = 8  # context length: number of tokens in each training sequence
batch_size = 4  # number of sequences drawn per batch

# ---- Training schedule ---------------------------------------------------
max_iters = 1000
#eval_interval = 250
learning_rate = 3e-3  # tunable: trade training speed against stability
eval_iters = 250  # batches averaged per split by estimate_loss(); the training loop also reuses it as the reporting interval

# ---- Model size ----------------------------------------------------------
n_embd = 384  # embedding width; can be varied depending on available compute
n_embed = n_embd  # original spelling, kept as an alias so existing references keep working
n_head = 4  # attention heads per block; n_embd must be divisible by n_head (head_size = n_embd // n_head)
n_layer = 4  # number of decoder blocks

# Illustration only: a token embedding is a vector of attributes like this one,
# except that a real embedding has n_embd entries.
embedding_vector = [0.1, 0.2, 0.8, 1.1]




In [None]:
# Read the whole training corpus into memory as one string.
with open('wizard_of_oz.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# Character-level vocabulary: every distinct character in the corpus, sorted
# so that the character -> id mapping is the same on every run.
chars = sorted(set(text))
vocab_size = len(chars)  # number of distinct tokens

In [None]:
# Lookup tables between characters and their integer ids (position in `chars`).
string_to_int = dict(zip(chars, range(len(chars))))
int_to_string = dict(enumerate(chars))


def encode(s):
    """Map a string to the list of integer ids of its characters."""
    return [string_to_int[c] for c in s]


def decode(l):
    """Map a sequence of integer ids back to the string they spell."""
    return ''.join(int_to_string[i] for i in l)


# The full corpus as one long tensor of token ids.
data = torch.tensor(encode(text), dtype=torch.long)
print(data[:100])  # first 100 token ids

In [None]:
# Train/validation split: the first 80% of the token stream is used for
# training, the remaining 20% is held out for validation.

n = int(0.8 * len(data))
train_data, val_data = data[:n], data[n:]

#Get batch function
def get_batch(split):
    """Sample a random batch of (input, target) sequences.

    Args:
        split: 'train' draws from ``train_data``; any other value draws from
            ``val_data``.

    Returns:
        A pair ``(x, y)`` of tensors of shape ``(batch_size, block_size)`` on
        ``device``.  ``y`` is ``x`` shifted one position to the right in the
        source stream, i.e. the next-token target for every input position.
    """
    source = train_data if split == 'train' else val_data
    # Random start offsets; the upper bound leaves room for the shifted target window.
    ix = torch.randint(len(source) - block_size, (batch_size,))
    x = torch.stack([source[i:i+block_size] for i in ix])
    y = torch.stack([source[i+1:i+block_size+1] for i in ix])
    # Move the batch to the compute device (GPU when available).
    x, y = x.to(device), y.to(device)
    return x, y



In [None]:
@torch.no_grad()  # evaluation only: no gradients are recorded inside this function
def estimate_loss():
    """Average the model loss over ``eval_iters`` random batches per split.

    Returns:
        dict with keys 'train' and 'val', each holding the mean loss for that
        split as a 0-dim tensor.
    """
    model.eval()  # evaluation mode while measuring
    mean_losses = {}
    for split_name in ('train', 'val'):
        batch_losses = torch.zeros(eval_iters)
        for step in range(eval_iters):
            inputs, labels = get_batch(split_name)
            _, batch_loss = model(inputs, labels)
            batch_losses[step] = batch_loss.item()
        mean_losses[split_name] = batch_losses.mean()
    model.train()  # switch back to training mode for the caller
    return mean_losses

In [None]:
class Block(nn.Module):
    """One decoder block: an attention sub-layer followed by a feed-forward
    sub-layer, each wrapped in a residual connection and then LayerNorm.

    NOTE(review): ``MultiHeadAttention`` and ``FeedForward`` are not defined in
    the visible part of this notebook; they must be defined before this class
    is instantiated.
    """

    def __init__(self, n_embd, n_head):
        """
        Args:
            n_embd: embedding dimension.
            n_head: number of attention heads; ``n_embd`` is split evenly
                across heads using floor division.
        """
        super().__init__()

        head_size = n_embd // n_head
        # Must be stored as `sa`: forward() reads `self.sa`.
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Apply attention then feed-forward, each as residual add + LayerNorm."""
        y = self.sa(x)
        x = self.ln1(x + y)
        y = self.ffwd(x)
        x = self.ln2(x + y)
        return x





class GPTLanguageModel(nn.Module):
    """Decoder-only language model over integer token ids.

    Token embeddings and learned position embeddings are summed, passed
    through ``n_layer`` ``Block`` modules and a final LayerNorm, then projected
    to vocabulary logits.  Reads the notebook-level hyperparameters ``n_embd``,
    ``n_head``, ``n_layer`` and ``block_size`` at construction time.
    """

    def __init__(self, vocab_size):
        """
        Args:
            vocab_size: number of distinct tokens; sizes both the token
                embedding table and the output projection.
        """
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)  # token id -> vector lookup table
        self.position_embedding_table = nn.Embedding(block_size, n_embd)  # one learned vector per position

        # Stack of n_layer decoder blocks; '*' unpacks the list into Sequential's arguments.
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])

        self.ln_f = nn.LayerNorm(n_embd)  # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, index, targets=None):
        """Compute logits and, when targets are given, the cross-entropy loss.

        Args:
            index: (B, T) tensor of token ids, with T no larger than the
                position table size (``block_size`` at construction).
            targets: optional (B, T) tensor of next-token ids.

        Returns:
            ``(logits, loss)``.  Without targets, ``logits`` is
            (B, T, vocab_size) and ``loss`` is None.  With targets, ``logits``
            is flattened to (B*T, vocab_size) and ``loss`` is a scalar tensor.
        """
        B, T = index.shape

        tok_emb = self.token_embedding_table(index)  # (B, T, C)
        # Build the position ids on the same device as the input.
        pos_emb = self.position_embedding_table(torch.arange(T, device=index.device))  # (T, C)
        x = tok_emb + pos_emb  # (B, T, C), position vectors broadcast over the batch
        x = self.blocks(x)  # (B, T, C)
        x = self.ln_f(x)  # (B, T, C)
        logits = self.lm_head(x)  # (B, T, vocab_size)

        if targets is None:
            loss = None
        else:
            # cross_entropy expects (N, C) logits and (N,) targets, so fold B and T together.
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()  # sampling only; no gradients are needed
    def generate(self, index, max_new_tokens):
        """Autoregressively sample ``max_new_tokens`` tokens after ``index``.

        Args:
            index: (B, T) tensor of token ids forming the starting context.
            max_new_tokens: number of tokens to append.

        Returns:
            (B, T + max_new_tokens) tensor: the context followed by the samples.
        """
        # The position table only covers this many positions, so the model can
        # never be fed a longer context.
        max_context = self.position_embedding_table.num_embeddings
        for _ in range(max_new_tokens):
            # Keep only the most recent tokens the model can attend to.
            index_cond = index[:, -max_context:]
            logits, _ = self(index_cond)
            # Focus only on the last time step.
            logits = logits[:, -1, :]  # (B, C)
            # Softmax turns logits into a probability distribution.
            probs = F.softmax(logits, dim=-1)  # (B, C)
            # Sample the next token from that distribution.
            index_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
            # Append the sampled token to the running sequence.
            index = torch.cat((index, index_next), dim=1)  # (B, T+1)
        return index



# Build the model and place its parameters on the compute device.
# `m` is a second name for the same module object.
model = GPTLanguageModel(vocab_size).to(device)
m = model


In [None]:
# Optimizer: AdamW over all model parameters.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# `step` (not `iter`) so the builtin iter() is not shadowed for later cells.
for step in range(max_iters):
    # Every eval_iters steps, report the averaged train/validation loss.
    # (Drop ':.4f' to print the validation loss at full precision.)
    if step % eval_iters == 0:
        losses = estimate_loss()
        print(f"step: {step}, train loss {losses['train']}, val loss: {losses['val']:.4f}")

    # Sample a batch of training data.
    xb, yb = get_batch('train')

    # Forward pass; call the module itself (not .forward) so nn.Module hooks run.
    logits, loss = model(xb, yb)

    # Gradients left over from the previous batch must not accumulate into this
    # step; set_to_none=True drops them instead of zero-filling.
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
print(loss.item())  # loss of the final training batch