In [1]:
#%pip install torch
import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
import random

# --- Optimisation / evaluation schedule ---
Learning_rate = 3e-4          # learning rate handed to the AdamW optimizer
Max_iterations = 5000         # number of training steps
Evaluation_iterations = 100   # batches averaged per split by estimate_loss
Evaluation_Intervals = 50     # evaluate and checkpoint every this many steps
Batch_size = 64               # sequences drawn per batch by get_batch

# --- Model shape ---
Block_size = 15               # context length (tokens per training sequence)
Embedding_neurons = 400       # width of token / positional embeddings
# Head size is not set here: Block derives it as Number_Embeds // Number_Heads.
# With 400 // 7 == 57 the concatenated heads are 399 wide; the attention
# projection layer maps that back to Embedding_neurons.
Layers_amount = 9             # number of transformer blocks
Number_Heads = 7              # attention heads per block

# --- Sampling / regularisation ---
Temperature = 1.2             # divides the logits before sampling in LLM.generate
dropout = 0.2                 # dropout probability used throughout the model

# Use the GPU when one is present, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [5]:
import os
def save(model,file_name='FromScratchModel.pth'):
    """Write ``model.state_dict()`` to ``./model/<file_name>``.

    Args:
        model: object exposing ``state_dict()`` (e.g. an ``nn.Module``).
        file_name: name of the checkpoint file inside ``./model``.
    """
    model_folder_path = './model'
    # exist_ok avoids the check-then-create race and is a no-op when the
    # folder is already there.
    os.makedirs(model_folder_path, exist_ok=True)
    torch.save(model.state_dict(), os.path.join(model_folder_path, file_name))
    
def load(file_name='FromScratchModel.pth'):
    """Return the checkpoint path ``./model/<file_name>``.

    Despite the name, nothing is read from disk here: the function only
    builds the path. The caller is expected to load the file itself.
    """
    return os.path.join('./model', file_name)
    


In [6]:
torch.manual_seed(1337)
# torch's seed does not cover the stdlib RNG used by random.shuffle below,
# so seed it as well to make the shuffle repeatable for a given file state.
random.seed(1337)

# Read the records and shuffle them at line granularity.
with open('PokemonStatsFile.txt', 'r', encoding='utf-8', errors='ignore') as f:
    lines = f.readlines()
random.shuffle(lines)

# Persist the shuffled order back to the same file. The context manager
# guarantees the handle is flushed and closed, and the explicit utf-8 encoding
# matches how the file is read (the platform default may differ).
with open('PokemonStatsFile.txt', 'w', encoding='utf-8') as f:
    f.writelines(lines)

# Same text the file now holds, with newlines stripped.
# NOTE(review): newlines are removed without inserting a separator, so the last
# field of one line fuses with the first field of the next unless every line
# ends with a comma - confirm against the data file.
text = ''.join(lines).replace("\n", "")

# The vocabulary is the set of distinct comma-separated fields (whole tokens,
# not single characters), in sorted order.
tokens = text.split(",")
chars = sorted(set(tokens))

Alphabet = chars
vocab_size = len(chars)
# token -> integer id
stoi = {ch: i for i, ch in enumerate(chars)}
print(stoi)
# integer id -> token
itos = {i: ch for i, ch in enumerate(chars)}


def encode(s):
    """Map an iterable of tokens to a list of integer ids."""
    return [stoi[c] for c in s]


def decode(l):
    """Map an iterable of integer ids to a space-separated string of tokens."""
    return ' '.join(itos[i] for i in l)


print(stoi['Fairy'])
# Train and validation splits: first 90% of the token stream trains, the rest validates.
data = torch.tensor(encode(tokens), dtype=torch.long)
n = int(0.9*len(data))
train_data = data[:n]
val_data = data[n:]


print(len(train_data))

294
91214


In [7]:
def get_batch(split):
    """Sample a random batch of (input, target) sequences.

    Args:
        split: ``'train'`` selects ``train_data``; any other value selects
            ``val_data``.

    Returns:
        A pair of tensors, each stacking ``Batch_size`` windows of
        ``Block_size`` tokens, moved to ``device``. The target window is the
        input window shifted one position to the right.
    """
    source = train_data if split == 'train' else val_data
    # Random window starts; the upper bound leaves room for the shifted target.
    starts = torch.randint(len(source) - Block_size, (Batch_size,))
    inputs = torch.stack([source[s:s + Block_size] for s in starts])
    targets = torch.stack([source[s + 1:s + Block_size + 1] for s in starts])
    return inputs.to(device), targets.to(device)

@torch.no_grad()
def estimate_loss(model):
    """Average the model's loss over random batches of each data split.

    Args:
        model: callable as ``model(X, Y)`` returning ``(logits, loss)``.

    Returns:
        Dict with keys ``'train'`` and ``'val'`` holding the mean loss over
        ``Evaluation_iterations`` batches of the respective split.
    """
    averages = {}
    # Evaluation mode disables dropout while measuring.
    model.eval()
    for split in ('train', 'val'):
        batch_losses = []
        for _ in range(Evaluation_iterations):
            inputs, targets = get_batch(split)
            _, batch_loss = model(inputs, targets)
            batch_losses.append(batch_loss.item())
        averages[split] = np.array(batch_losses).mean()
    # Put the model back into training mode (re-enables dropout); this does
    # not itself perform any training.
    model.train()
    return averages

In [8]:
class HeadOfAttention(nn.Module):
    """A single head of causally masked, scaled dot-product self-attention."""

    def __init__(self, Head_Size):
        """Create the projections for one head.

        Args:
            Head_Size: output width of the query, key and value projections.
        """
        super().__init__()
        self.Query = nn.Linear(Embedding_neurons,Head_Size, bias=False)
        self.Key = nn.Linear(Embedding_neurons,Head_Size, bias=False)
        self.Value = nn.Linear(Embedding_neurons,Head_Size, bias=False)
        # Lower-triangular mask; registered as a buffer so it follows the
        # module across devices without being a trainable parameter.
        self.register_buffer("tril",torch.tril(torch.ones(Block_size, Block_size)))
        self.Dropout = nn.Dropout(dropout)

    def forward(self,x):
        """Attend over the time dimension of ``x``.

        Args:
            x: tensor of shape (Batch, TimeStep, Channel).

        Returns:
            Tensor of shape (Batch, TimeStep, Head_Size).
        """
        Batch,TimeStep,Channel = x.shape
        K = self.Key(x)
        Q = self.Query(x)
        # Query/key affinities, scaled by 1/sqrt(head size).
        Wqk = Q @ K.transpose(-2,-1) * K.shape[-1]**-0.5
        # Causal mask: a position may only attend to itself and earlier ones.
        Wqk = Wqk.masked_fill(self.tril[:TimeStep, :TimeStep] == 0, float('-inf'))
        # Normalise each row into attention weights. The scores already live
        # on the same device as x, so no transfer to the global device is
        # needed (doing one breaks when the module runs elsewhere).
        Wqk = F.softmax(Wqk, dim=-1)
        Wqk = self.Dropout(Wqk)
        V = self.Value(x)
        # Weighted sum of the values.
        output = Wqk @ V

        return output

class MultiHeadedAttention(nn.Module):
    """Several attention heads run side by side, concatenated and projected."""

    def __init__(self,Number_Heads,Head_Size):
        """Build ``Number_Heads`` heads of width ``Head_Size`` plus the output projection."""
        super().__init__()
        heads = [HeadOfAttention(Head_Size) for _ in range(Number_Heads)]
        self.AttentionHeads = nn.ModuleList(heads)
        # Maps the concatenated head outputs back to the embedding width.
        concat_width = Head_Size * Number_Heads
        self.Projection = nn.Linear(concat_width, Embedding_neurons)
        self.Dropout = nn.Dropout(dropout)

    def forward(self,x):
        """Run every head on ``x``, join along the last dimension, project, apply dropout."""
        head_outputs = [head(x) for head in self.AttentionHeads]
        merged = torch.cat(head_outputs, dim=-1)
        projected = self.Projection(merged)
        return self.Dropout(projected)
class FeedForward(nn.Module):
    """Position-wise MLP: expand to 4x width, ReLU, project back, dropout."""

    def __init__(self,Embedding_neurons):
        """Build the MLP for inputs of width ``Embedding_neurons``."""
        super().__init__()
        hidden_width = 4*Embedding_neurons
        layers = [
            nn.Linear(Embedding_neurons, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, Embedding_neurons),
            nn.Dropout(dropout),
        ]
        self.Network = nn.Sequential(*layers)

    def forward(self,x):
        """Apply the MLP to ``x``."""
        return self.Network(x)
class Block(nn.Module):
    """Transformer block: pre-norm self-attention and feed-forward, each with a residual connection."""

    def __init__(self, Number_Embeds, Number_Heads):
        """Build the block.

        Args:
            Number_Embeds: width of the block's input and output.
            Number_Heads: number of attention heads; each head gets width
                ``Number_Embeds // Number_Heads`` (floor division).
        """
        super().__init__()
        Head_Size = Number_Embeds // Number_Heads

        self.SelfAttention = MultiHeadedAttention(Number_Heads,Head_Size)
        self.FeedForward = FeedForward(Number_Embeds)
        self.LinearNormalisation1 = nn.LayerNorm(Number_Embeds)
        self.LinearNormalisation2 = nn.LayerNorm(Number_Embeds)

    def forward(self,x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        # Each sub-layer sees a layer-normalised copy of x; its output is
        # added to x (a residual sum, not a concatenation).
        attended = self.SelfAttention(self.LinearNormalisation1(x))
        x = x + attended
        transformed = self.FeedForward(self.LinearNormalisation2(x))
        x = x + transformed
        return x

class LLM(nn.Module):
    """Decoder-only transformer over the token vocabulary.

    Token and positional embeddings are summed, passed through
    ``Layers_amount`` blocks and a final layer norm, then projected to
    ``vocab_size`` logits per position.
    """

    def __init__(self):
        super().__init__()
        self.Token_Embedding_Table = nn.Embedding(vocab_size,Embedding_neurons)
        self.Positional_Embedding_Table = nn.Embedding(Block_size,Embedding_neurons)
        self.Blocks = nn.Sequential(*[Block(Embedding_neurons, Number_Heads) for _ in range(Layers_amount)])
        self.LayerNormalisation = nn.LayerNorm(Embedding_neurons) # final layer norm
        self.Linear_Head = nn.Linear(Embedding_neurons, vocab_size)
        # Re-initialise every Linear / Embedding submodule.
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self,index,targets=None):
        """Compute logits and, when targets are given, the cross-entropy loss.

        Args:
            index: integer tensor of shape (Batch, TimeStep) holding token ids;
                TimeStep must not exceed ``Block_size``.
            targets: optional integer tensor of the same shape as ``index``.

        Returns:
            ``(Logits, loss)``. Without targets, Logits has shape
            (Batch, TimeStep, vocab_size) and loss is ``None``. With targets,
            Logits is flattened to (Batch*TimeStep, vocab_size) and loss is
            the cross-entropy against the flattened targets.
        """
        Batch, TimeStep = index.shape

        Token_Embedding = self.Token_Embedding_Table(index)
        # Build the positions on the input's own device so the model does not
        # depend on the notebook-level `device` variable.
        positions = torch.arange(TimeStep, device=index.device)
        Positional_Embedding = self.Positional_Embedding_Table(positions)
        # Token and positional embeddings are summed (broadcast over the batch).
        x = Token_Embedding + Positional_Embedding
        x = self.Blocks(x)
        x = self.LayerNormalisation(x)
        Logits = self.Linear_Head(x)

        if targets is None:
            loss = None
        else:
            Batch, TimeStep, Channel = Logits.shape
            Logits = Logits.view(Batch*TimeStep, Channel)
            targets = targets.view(Batch*TimeStep)
            loss = F.cross_entropy(Logits, targets)

        return Logits, loss

    def generate(self, index, max_new_tokens):
        """Extend ``index`` by sampling ``max_new_tokens`` tokens per row.

        Args:
            index: integer tensor of shape (Batch, TimeStep) with the prompt.
            max_new_tokens: number of tokens to append.

        Returns:
            Tensor of shape (Batch, TimeStep + max_new_tokens).

        Gradient tracking and train/eval mode are left as the caller set
        them; call ``model.eval()`` first to sample without dropout.
        """
        for _ in range(max_new_tokens):
            # The positional table only covers Block_size positions.
            cropped_Index = index[:, -Block_size:]
            logits, _ = self(cropped_Index)
            # Keep only the prediction for the last position of every row.
            last_logits = logits[:, -1, :]
            # Temperature-scaled softmax, applied to every row of the batch.
            probs = F.softmax(last_logits / Temperature, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            # Append the sampled token to the running sequence.
            index = torch.cat((index, idx_next), dim=1)
        return index
        

        


In [9]:
model = LLM()
# `device` comes from the configuration cell; it is deliberately not
# reassigned here so there is a single place to change it.
# To resume from a checkpoint, uncomment:
#model.load_state_dict(torch.load('./model/FromScratchModel.pth'))
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=Learning_rate)

17.895533 M parameters


In [31]:

# `step` rather than `iter`, so the builtin iter() is not shadowed for the
# rest of the notebook.
for step in range(Max_iterations):

    # Every Evaluation_Intervals steps, and on the final step, report the
    # averaged train/val loss and write a checkpoint.
    if step % Evaluation_Intervals == 0 or step == Max_iterations - 1:
        losses = estimate_loss(model)
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
        save(model)

    # sample a batch of data
    xb, yb = get_batch('train')

    # forward pass, then one optimizer update
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

step 0: train loss 6.7725, val loss 6.7657
step 50: train loss 4.3633, val loss 4.3943
step 100: train loss 3.6019, val loss 3.6163
step 150: train loss 3.2877, val loss 3.3044
step 200: train loss 3.1342, val loss 3.1609
step 250: train loss 3.0222, val loss 3.0567
step 300: train loss 2.9005, val loss 2.9445
step 350: train loss 2.7575, val loss 2.8173
step 400: train loss 2.6124, val loss 2.6527
step 450: train loss 2.4189, val loss 2.4838
step 500: train loss 2.2689, val loss 2.3515
step 550: train loss 2.1239, val loss 2.1850


KeyboardInterrupt: 