# Generative Pre-trained Model

In [142]:
import torch
import torch.nn as nn
from torch.nn import functional as F
device='cuda' if torch.cuda.is_available() else 'cpu'
print(device)
batch_size=4
block_size=8
learning_rate=3e-4
max_iters=250
eval_iter=500
eval_interval=500
n_layer=4#number of decoder blocks
n_embd=10 #number of features/embedding dimension
n_head=4 #number of heads in multihead attention
dropout=0.2 
head_size=n_embd//n_head

cuda


In [143]:
#using Wizard of oz text
chars=""
with open("wizard of oz.txt", "r", encoding='utf-8') as f:
    text=f.read()
    chars=sorted(set(text))
#extract unique characters from the text, we are creating our vocabulary to work with
vocab_size=len(chars)
print(chars, vocab_size)

['\n', ' ', '!', '"', '&', "'", '(', ')', '*', ',', '-', '.', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', ']', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '\ufeff'] 81


In [144]:
#we are using character-level characterization for each character we will encode the character to an integer and we will also form a decoding operation
#creating a dictionary mapping characters to integer and then integers back to characters
string_to_int= {ch:i for i , ch in enumerate(chars)} 
int_to_string = {i:ch for i,ch in enumerate(chars)}

encode = lambda s: [string_to_int[c] for c in s]
decode = lambda l: ''.join(int_to_string[i] for i in l)
en_hello=encode('hello')
de_hello=decode(en_hello)
print(en_hello, de_hello)

[61, 58, 65, 65, 68] hello


In [145]:
#we will convert the text into tensors as having large string is not efficient to work with
data= torch.tensor(encode(text), dtype=torch.long)# conveting the integer form of the whole text as a tensor
print(data[:50])

tensor([80, 28, 39, 42, 39, 44, 32, 49,  1, 25, 38, 28,  1, 44, 32, 29,  1, 47,
        33, 50, 25, 42, 28,  1, 33, 38,  1, 39, 50,  0,  0,  1,  1, 26, 49,  0,
         0,  1,  1, 36, 11,  1, 30, 42, 25, 38, 35,  1, 26, 25])


In [146]:
#dividing the data into training and testing
n=int(0.8*len(data))
train_data=data[:n]
val_data=data[n:]

def get_batch(split):
    data = train_data if split=='train' else val_data
    ix=torch.randint(len(data)-block_size,(batch_size,))
    x=torch.stack([data[i:i+block_size] for i in ix])
    y=torch.stack([data[i+1:i+(block_size+1)]for i in ix])
    x,y=x.to(device),y.to(device)
    return x,y
x,y=get_batch(train_data)
print("inputs:")
print(x)
print("outputs:")
print(y)

inputs:
tensor([[ 1, 68, 67, 56, 58,  1, 68, 67],
        [54, 64, 59, 54, 72, 73,  0, 55],
        [58, 71,  1, 54, 65, 65,  9,  1],
        [ 1, 55, 78,  1, 72, 74, 56, 61]], device='cuda:0')
outputs:
tensor([[68, 67, 56, 58,  1, 68, 67,  1],
        [64, 59, 54, 72, 73,  0, 55, 58],
        [71,  1, 54, 65, 65,  9,  1, 55],
        [55, 78,  1, 72, 74, 56, 61,  1]], device='cuda:0')


In [147]:
@torch.no_grad #pytorch doesn't do any gradient computation because we only want to compute loss here
def estimate_loss():
    model.eval()
    out={}
    for split in ['train','val']:
        losses=torch.zeros(eval_iter)
        
        for k in range(eval_iter):
            x,y=get_batch(split)
            Logits,loss=model.forward(x,y)
            losses[k]=loss.item()
        out[split]=losses.mean()
    model.train()
    return out    
        

In [148]:
class Head(nn.Module):
    
    def __init__(self,head_size):
        super().__init__()
        self.key=nn.Linear(n_embd,head_size,bias=False)
        self.query=nn.Linear(n_embd,head_size,bias=False)
        self.value=nn.Linear(n_embd,head_size,bias=False)
        self.register_buffer('tril',torch.tril(torch.ones(block_size,block_size)))
        self.dropout=nn.Dropout(dropout)
        
    def forward(self,x):
            #input of size (B,T,C)
            #output of size (B,T,hs)
            B,T,C=x.shape
            k=self.key(x)#(B,T,hs)
            q=self.query(x)#(B,T,hs)
            #computing attention scores
            wei=q@k.transpose(-2,-1)*k.shape[-1]**-0.5 #k.transpose--> (B,T,hs).(B,hs,T)=(B,T,T)
            #k.shape[-1]**-0.5 --> scaling the dot product by 1/sqrt(hs) so that no vector is dominant over other in the dot product
            wei=wei.masked_fill(self.tril[:T,:T]==0, float('-inf'))
            wei=F.softmax(wei,dim=-1)
            #performing weighted aggregation of the values
            v=self.value(x)#(B,T,hs)
            out=wei@v#(B,T,T)@(B,T,hs)-->(B,T,hs)
            return out



In [149]:

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])#each head id processing parallely not sequentially--> ModuleList
        self.proj = nn.Linear(head_size * num_heads, n_embd)#adds learnable parameters i.e weight and bias 
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)#concatinating along the last dimension i.e C; concatinating all the features of different heads
        # (B, T, F) -> (B, T, [h1, h1, h1, h1, h2, h2, h2, h2, h3, h3, h3, h3])
        out = self.dropout(self.proj(out))
        return out
    

In [150]:
class FeedForward(nn.Module):
    def __init__(self,n_embd):
        super().__init__()
        self.net=nn.Sequential(
            nn.Linear(n_embd,4*n_embd),
            nn.ReLU(),
            nn.Linear(4*n_embd,n_embd),
            nn.Dropout(dropout)
        )
    def forward(self,x):
        return self.net(x)


In [151]:
class Block(nn.Module):
    
    def __init__(self,n_embd, n_head):
        super().__init__()
        head_size=n_embd//n_head #number of features each head will be processing
        self.sa=MultiHeadAttention(n_embd,n_head) #self-attention
        self.ffwd=FeedForward(n_embd)
        self.ln1=nn.LayerNorm(n_embd)
        self.ln2=nn.LayerNorm(n_embd)
    
    def forward(self,x):#post-norm architecture
        y=self.sa(x)
        x=self.ln1(x+y)
        y=self.ffwd(x)
        x=self.ln2(x+y)
        return x


In [155]:
class GPTLanguageModel(nn.Module):#class BigramLanguageModel inherits from nn.Module class 
    def __init__(self,vocab_size):
        ##__init__ is a constructor method called when an instance of the class is created it takes argument vocab_size
        super().__init__()#calling constructor method of the parent class, important when inheritance is there
        self.embedding_layer_table=nn.Embedding(vocab_size,n_embd)#instance of the class is created as self. 
        #this is an embedding layer which converts token into continous vectors
        self.positional_embedding_table=nn.Embedding(vocab_size,n_embd)#learnable embedding for GPT model
        #embedding for each sequence rather than individual tokens
        self.blocks=nn.Sequential(*[Block(n_embd,n_head=n_head) for _ in range(n_layer)])#number of dcoder blocks, same block is created 4 times  and stored in self.blocks
        self.ln_f=nn.LayerNorm(n_embd)#final layer norm after the decoder blocks which helps the model converge better by normalizing activations across feature dimension
        self.lm_head=nn.Linear(n_embd,vocab_size)#linear layer for predicting the next token in the sequence
        self.apply(self._init_weights)#applying given function to all sub modules of the model
    
    def _init_weights(self,module):#setting initial weights for linear and embedding layers
        if isinstance(module,nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)#weights have normal distribution with 0 mean and 0.02 std
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)#sets the biases to zero if there are any
        elif isinstance(module,nn.Embedding):
            torch.nn.init.normal_(module.weight,mean=0.0, std=0.02)
        
        
    def forward(self,index,targets=None):#forward pass method
#             logits=self.embedding_layer_table(index)#applies embeddig layer to input indices
            
            B, T = index.shape
            tok_emb=self.embedding_layer_table(index)#(B,T,C)
            pos_emb=self.positional_embedding_table(torch.arange(T,device=device))#(T,C)
            
            x=tok_emb+pos_emb#input to be fed into the decoder blocks
            x=self.blocks(x)#feeding x into the decoder blocks
            x=self.ln_f(x)#normalization
            logits=self.lm_head(x)#obtaining probabilities to be fed into softmax
            
            
            if targets==None:
                loss=None
            else:
                B,T,C=logits.shape#extrating the dimension of logits as Batch, Time and Channel
                logits=logits.view(B*T,C)#to treat each element in the sequence as individual prediction
                targets=targets.view(B*T)#each prediction correspondinly has one target
                loss=F.cross_entropy(logits,targets) #computes loss between input indices and target indices
            
            return logits,loss
        
    def generate(self,index,max_new_tokens):#creates a sequence of new indices based on input indices
            #index is a (B,T) array of indices in current context
            for _ in range(max_new_tokens):
                #get new predictions
                logits,loss=self.forward(index)
                #consider only last time stamp to focus on the most recent prediction
                logits=logits[:,-1,:]#becomes (B,C)
                #apply softmax to get probabilities
                probs=F.softmax(logits,dim=-1)#(B,1)
                index_next=torch.multinomial(probs,num_samples=1)#(B,T+1)
                #append new index to the running sequence
                index=torch.cat((index,index_next), dim=1)#updating index 
                
            return index

model = GPTLanguageModel(vocab_size)
# print('loading model parameters...')
# with open('model-01.pkl', 'rb') as f:
#     model = pickle.load(f)
# print('loaded successfully!')
m = model.to(device)


In [156]:
# model=GPTLanguageModel(vocab_size)
# m=model.to(device)

# context=torch.zeros((1,1),dtype=torch.long,device=device)#from where to start the sequence
# print(context)
# generated_chars=decode(m.generate(context,max_new_tokens=500)[0].tolist())
# #[0] is used to extract only the tensor as the generated sequence is returned as a tuple
# #tolist() converts the tensor into a python list
# #decode() is converting integer to string
# print(generated_chars)

# create a PyTorch optimizer
# import mmap
# import random
# import pickle
# import argparse
# optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# for iter in range(max_iters):
# #     print(iter)
#     if iter % eval_iter == 0:
#         losses = estimate_loss()
#         print(f"step: {iter}, train loss: {losses['train']:.3f}, val loss: {losses['val']:.3f}")

#     # sample a batch of data
#     xb, yb = get_batch('train')

#     # evaluate the loss
#     logits, loss = model.forward(xb, yb)
#     optimizer.zero_grad(set_to_none=True)
#     loss.backward()
#     optimizer.step()
# print(loss.item())

# with open('model-01.pkl', 'wb') as f:
#     pickle.dump(model, f)
# print('model saved')

step: 0, train loss: 4.399, val loss: 4.399
3.4630956649780273
model saved


In [157]:
import mmap
import random
import pickle
import argparse
#create a pytorch optimizer
optimizer=torch.optim.AdamW(model.parameters(),lr=learning_rate)
#Adaptive Moment Estimation with weight decay

for iter in range(max_iters):
    
    if iter % eval_iter==0:
        losses=estimate_loss()
        print(f"iter: {iter}, train loss:{losses['train']:.3f}, val loss: {losses['val']:.3f}")
        
    #sample a batch of data
    x,y=get_batch(train_data)
    
    #evaluating the loss
    logits, loss=model.forward(x,y)
    optimizer.zero_grad(set_to_none=True)#so that previous gradients do not affect the current gradient
    #set to none instead of zero as none occupies a lot less space compared to zero
    loss.backward()#calculates gradient of loss and model parameters through backpropagation
    optimizer.step()#updates the parametrs based on the gradient using AdamW optimizer
print(loss.item())#value of loss for current itteration 
with open('model-01.pkl', 'wb') as f:
    pickle.dump(model, f)
print('model saved')

iter: 0, train loss:3.570, val loss: 3.581
3.2248287200927734
model saved


In [None]:
# context=torch.zeros((1,1),dtype=torch.long,device=device)#from where to start the sequence
# generated_chars=decode(m.generate(context,max_new_tokens=500)[0].tolist())
# print(generated_chars)

In [None]:
prompt = 'Hello! Can you see me?'
context = torch.tensor(encode(prompt), dtype=torch.long, device=device)
generated_chars = decode(m.generate(context.unsqueeze(0), max_new_tokens=100)[0].tolist())
print(generated_chars)