In [1]:
import torch 
import torch.nn as nn
from torch.nn import functional as F
import mmap
import random
import pickle
device = 'cuda' if torch.cuda.is_available() else 'cpu'

block_size = 32
batch_size = 128
max_iters = 200
learning_rate = 3e-4
eval_iters = 100
eval_interval = 500
n_embd = 384
n_layer = 1
n_head = 1
dropout = 0.2

In [2]:
device

'cuda'

In [3]:
chars = ""
with open('vocab.txt','r',encoding = 'utf-8') as f:
    text = f.read()
    chars = sorted(list(set(text)))

vocab_size = len(chars)

In [4]:
string_to_int = {ch:i for i,ch in enumerate(chars)}
int_to_string = {i:ch for i,ch in enumerate(chars)}
encode = lambda s: [string_to_int[c] for c in s]
decode = lambda l: ''.join([int_to_string[i] for i in l])

# data = torch.tensor(encode(text),dtype=torch.long)
# print(data[:100])

In [5]:
# n = int(0.8*len(data))
# train_data = data[:n]
# val_data = data[n:]

In [6]:
#memory map for using small snipets of text from a single file of any size

def get_random_chunk(split):
    filename = "output_train.txt" if split == "train" else "output_val.txt"
    with open(filename,'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            #Determine the file size and a random position to start reading
            file_size = len(mm)
            start_pos = random.randint(0, (file_size)-block_size*batch_size)
            #Seek to the random position and read the chunk
            mm.seek(start_pos)
            block = mm.read(block_size*batch_size-1)
            #Decode the chunk to a string, ignoring any invalid byte sequences
            decoded_block = block.decode('utf-8', errors='ignore').replace('\r', '')

            #Train and test splits
            data = torch.tensor(encode(decoded_block), dtype=torch.long)
    return data

In [7]:
def get_batch(split):
    # data = train_data if split == 'train' else val_data
    data = get_random_chunk(split)
    ix = torch.randint(len(data)-block_size,(batch_size,))
    # print(ix)
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x,y = x.to(device),y.to(device)
    return x,y

In [8]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train','val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X,Y = get_batch(split)
            logits,loss = model(X,Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

## Note: sinusoidal functions are used in base transformer model while learned embeddings like the ones used here are used in variants like GPT. So better performance would be shown for our model.

Note: Multihead attention allows us to hear different parts of the conversation between the heads and scaling(1/root of length of key) helps to head all parts evenly.

Note: Module list does not one layer/ head after another, but rather each is isolated and gets it unique perspective. Simultaneous computation. Uses GPU capabilities.
Sequential processing is where one block depends on another to synchronously complete. We are waiting on one to finish before we move to next one.

In [10]:
class Head(nn.Module):
    """one head of self-attention"""
    def __init__(self,head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias= False)
        self.query = nn.Linear(n_embd, head_size, bias = False)
        self.value = nn.Linear(n_embd, head_size, bias = False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size))) #this just registers a no look ahead masking in the model state. Reduces computation.

        self.dropout = nn.Dropout(dropout)
    def forward(self,x):
        # input of size (batch,time-step,channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)  # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1)*k.shape[-1]**-0.5 #(B,T,HS) @ (B,hs,T) -> (B,T,T)
        wei = wei.masked_fill(self.tril[:T,:T] == 0, float('-inf')) #(B,T,T)
        wei = F.softmax(wei,dim = -1) #(B,T,T)
        wei = self.dropout(wei)
        #perform the weighted aggregation of the values
        v = self.value(x) #(B,T,hs)
        out = wei @ v #(B,T,T) @ (B,T,hs) -> (B,T,hs)
        return out

class MultiHeadAttention(nn.Module):
    """ multiple heads of self attention in parallel"""
    def __init__(self,num_heads,head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size*num_heads,n_embd) #adds more learnable parameters to help the network learn more about the text
        self.dropout = nn.Dropout(dropout)
    def forward(self,x):
        out = torch.cat([h(x) for h in self.heads],dim = -1) #(B, T, F)
        out = self.dropout(self.proj(out))
        return out
                
class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity"""
    def __init__(self,n_embd):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(n_embd, 4*n_embd),
                                 nn.ReLU(),
                                 nn.Linear(4*n_embd,n_embd),
                                 nn.Dropout(dropout)) #causes a percentage of neurons to dropout and become 0. Prevents overfitting
    def forward(self,x):
        return self.net(x)

class Block(nn.Module):
    def __init__(self,n_embd,n_head):
        #n_embd: embedding dimension, n_head = the number of heads we'd like
        super().__init__()
        head_size = n_embd//n_head
        self.sa = MultiHeadAttention(n_head,head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
        
    def forward(self, x):
        y = self.sa(x)
        x = self.ln1(x+y)
        y = self.ffwd(x)
        x = self.ln2(x+y)
        return x

class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size,n_embd)
        self.position_embedding_table = nn.Embedding(block_size,n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd,n_head = n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) #final layer norm ( can be used to make model converge better)
        self.lm_head = nn.Linear(n_embd,vocab_size)

        self.apply(self.__init__weights)
    def __init__weights(self,module):
        if isinstance(module,nn.Linear):
            torch.nn.init.normal_(module.weight, mean = 0.0, std = 0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
            elif isinstance(module,nn.Embedding):
                torch.nn.init.normal_(module.weight, mean=0.0, std = 0.02)
    
    def forward(self,index,targets = None):

        B,T = index.shape
        
        #idx and targets are both (B,T) tensor of intergers 
        tok_emb = self.token_embedding_table(index)
        pos_emb = self.position_embedding_table(torch.arange(T,device = device)) #(T,C)
        x = tok_emb + pos_emb #(B,T,C)
        x = self.blocks(x) #(B,T,C)
        x = self.ln_f(x) #(B,T,C)
        logits = self.lm_head(x) #(B,T,vocab_size)
        
        if targets is None:
            loss = None
        else:
            B,T,C = logits.shape
            logits = logits.view(B*T,C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits,targets)
            
        return logits,loss
    def generate(self,index,max_new_tokens):
        for _ in range(max_new_tokens):
            # get the predictions
            logits,loss = self.forward(index)
            # focus on only thelast time step
            logits = logits[:,-1,:]
            # apply softmax to get the probabilities
            probs = F.softmax(logits,dim = -1)
            # sample from the distribution
            index_next = torch.multinomial(probs,num_samples = 1)
            index = torch.cat((index,index_next),dim = 1)
        return index
model = GPTLanguageModel(vocab_size)
print("loading model parameters...")

# context = torch.zeros((1,1),dtype = torch.long,device = device)
# generated_chars = decode(m.generate(context, max_new_tokens = 500)[0].tolist())
# print(generated_chars)

with open('model-01.pkl','rb') as f:
    model = pickle.load(f)
print("loaded successfully!")
m = model.to(device)

loading model parameters...
loaded successfully!


In [11]:
optimizer = torch.optim.AdamW(model.parameters(),lr=learning_rate)
for iter in range(max_iters):
    if iter % eval_iters == 0:
        losses = estimate_loss()
        print(f"step: {iter}, train loss {losses['train']:.4f}, val loss: {losses['val']:.4f}")

    #sample a batch of data
    xb,yb = get_batch('train')

    #evaluate the loss
    logits,loss= model.forward(xb,yb)
    optimizer.zero_grad(set_to_none = True)
    loss.backward()
    optimizer.step()

print(loss.item())

with open('model-01.pkl','wb') as f:
    pickle.dump(model,f)
print('model saved')

step: 0, train loss 2.2247, val loss: 2.3066
step: 100, train loss 2.1129, val loss: 2.0858
2.2786340713500977
model saved
