<div class="alert alert-block alert-success">
<b>Input Processing:</b> Reading the text of "The Verdict" into the notebook.
</div>

In [43]:
import torch

# Load the whole text file into memory as one string.
with open('the-verdict.txt', 'r', encoding='utf-8') as data:
    text_data = data.read()

<div class="alert alert-block alert-success">
<b>Encoding Class:</b> Tokenizes text with the tiktoken library, using its GPT-2 encoding.
</div>

In [44]:
class Tokenizer:
    """Pairs a piece of text with tiktoken's 'gpt2' encoding."""

    def __init__(self, text_data):
        """Remember the text and build the encoder.

        text_data: the string that Encoding() will tokenize.
        """
        # tiktoken is imported here, so it is only required once a Tokenizer is built.
        import tiktoken

        self._encoder = tiktoken.get_encoding('gpt2')
        self._text_data = text_data

    def Encoding(self):
        """Return the result of encoding the stored text with the stored encoder."""
        return self._encoder.encode(self._text_data)

<div class="alert alert-block alert-info">
<b>Tokenizer Example:</b> Using artificial data
</div>

In [45]:
# Tokenize a short sample sentence and show the resulting token ids.
example_tokenizer = Tokenizer('This is just an example string.')
Id = example_tokenizer.Encoding()
print(Id)

[1212, 318, 655, 281, 1672, 4731, 13]


<div class="alert alert-block alert-success">
<b>Dataset Class:</b> A custom dataset built on the <code>Dataset</code> class from <code>torch.utils.data</code>; it is batched later with <code>DataLoader</code>.
</div>

In [46]:
from torch.utils.data import Dataset, DataLoader

class Dataset_(Dataset):
    """Sliding-window dataset of (input, target) token-id tensor pairs.

    Each input is a run of `context_length` consecutive ids; its target is the
    same run shifted one position to the right.
    """

    def __init__(self, ids, context_length, stride):
        """Cut `ids` into windows.

        ids:            list of token ids.
        context_length: number of ids in every input / target window.
        stride:         distance between the starts of consecutive windows.
        """
        self._ids = ids

        # A window starting at `start` needs ids up to start + context_length
        # (inclusive) so that the shifted target is complete as well.
        window_starts = range(0, len(self._ids) - context_length, stride)

        self._Inputs = [
            torch.tensor(self._ids[start:start + context_length])
            for start in window_starts
        ]
        self._Targets = [
            torch.tensor(self._ids[start + 1:start + context_length + 1])
            for start in window_starts
        ]

    def __len__(self):
        """Number of (input, target) windows."""
        return len(self._Inputs)

    def __getitem__(self, rowx):
        """Return the (input, target) pair stored at position `rowx`."""
        inputs, targets = self._Inputs[rowx], self._Targets[rowx]
        return inputs, targets

<div class="alert alert-block alert-success">
<b>Dataloader Function:</b> Dataset for training and testing 
</div>

In [47]:
def Load_Data(text, batch_size = 4,
              context_length = 256 ,
              stride= 4,
              drop_last = True,
              shuffle = False,
              num_workers = 0):
    """Tokenize `text` and wrap the resulting windows in a DataLoader.

    text:           raw string to tokenize.
    batch_size:     number of (input, target) windows per batch.
    context_length: number of token ids in each window.
    stride:         step between the starts of consecutive windows.
    drop_last:      forwarded to DataLoader (drop a final, smaller batch).
    shuffle:        forwarded to DataLoader (reshuffle the windows every epoch).
    num_workers:    forwarded to DataLoader (number of loading subprocesses).

    Returns the DataLoader built over the windowed dataset.
    """
    ids = Tokenizer(text).Encoding()

    dataset = Dataset_(ids,
                       context_length,
                       stride)

    # Pass the caller's options through; they used to be accepted but ignored
    # (shuffle / drop_last were hard-coded and num_workers was dropped).
    dataloader = DataLoader(dataset,
                            batch_size = batch_size,
                            shuffle = shuffle,
                            drop_last = drop_last,
                            num_workers = num_workers)
    return dataloader

In [48]:
# Model hyper-parameters. The key spellings (e.g. "Context_lenght") are kept
# as they are because other cells look the values up by these exact keys.
GPT_config = {"Vocab_size": 50257,      # Size of the GPT-2 byte-pair-encoding vocabulary (was mistyped as 50275)
            "Context_lenght": 256,    # Length of the input context at a time
            "embedding_dim": 768,     # Total embedding dimension
            "Number_of_heads": 12,    # Total number of heads in the multi-head attention mechanism
            "Number_of_layers": 12,   # Total number of transformer blocks
            "Dropout_rate": 0.1,      # Neuron drop-out rate
            "QKV_bias": False         # Query, Key and Value bias
             }

## Train / test data split

# The split is made on the characters of text_data, before tokenization:
# the first 90% becomes the training text, the remainder the test text.
train_test_split = 0.90
index = int(train_test_split * len(text_data))
train_text = text_data[:index]
test_text = text_data[index:]


## Train / test data loaders
## Each loader yields batches of (input ids, target ids).
## stride == context length, so consecutive windows do not overlap.
train_dataloader = Load_Data(train_text,
                            batch_size=2,
                            context_length=GPT_config['Context_lenght'],
                            stride = GPT_config['Context_lenght'],
                            drop_last= True)

# The held-out text is stored in `test_text`; the name `test_data` that was
# used here is never defined and raised a NameError on a fresh kernel.
test_dataloader = Load_Data(test_text,
                           batch_size=2,
                           context_length=GPT_config['Context_lenght'],
                           stride = GPT_config['Context_lenght'],
                           drop_last= True)

In [53]:
## Count the input tokens that each loader yields over one full pass
train_tokens, test_tokens = 0, 0

for input_batch, target_batch in train_dataloader:
    train_tokens = train_tokens + input_batch.numel()

for input_batch, target_batch in test_dataloader:
    test_tokens = test_tokens + input_batch.numel()

print(f"Training tokens: {train_tokens}")
print(f"Testing tokens: {test_tokens}")
print(f"All tokens: {train_tokens + test_tokens}")

Training tokens: 4608
Testing tokens: 512
All tokens: 5120


In [36]:
class Layer_Normalization(torch.nn.Module):
    """Normalizes the last dimension to zero mean / unit variance, then applies
    a learnable per-feature scale and shift."""

    def __init__(self, emb_dim):
        """emb_dim: size of the last dimension of the tensors to normalize."""
        super().__init__()

        self._epsilon = 1e-5  # added to the variance to avoid division by zero
        self._scaling = torch.nn.Parameter(torch.ones(emb_dim))
        self._shift = torch.nn.Parameter(torch.zeros(emb_dim))

    def forward(self, X):
        """Return the normalized, scaled and shifted version of X."""
        centre = X.mean(dim=-1, keepdim=True)
        # Biased (population) variance, i.e. division by N rather than N - 1
        spread = X.var(dim=-1, keepdim=True, unbiased=False)
        Normalized_X = (X - centre) / torch.sqrt(spread + self._epsilon)

        return Normalized_X * self._scaling + self._shift
    
    
class GELU_Activation(torch.nn.Module):
    """GELU activation computed with the tanh approximation."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return 0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3))), element-wise."""
        coefficient = torch.sqrt(torch.tensor(2.0 / torch.pi))
        inner = coefficient * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))
    
class Feed_Forward(torch.nn.Module):
    """Position-wise feed-forward network: expand to 4 * emb_dim, apply GELU,
    then project back to emb_dim."""

    def __init__(self, emb_dim):
        """emb_dim: size of the input and output feature dimension."""
        super().__init__()
        hidden_dim = 4 * emb_dim
        layers = [
            torch.nn.Linear(emb_dim, hidden_dim),   # expansion
            GELU_Activation(),
            torch.nn.Linear(hidden_dim, emb_dim),   # contraction
        ]
        self._Model = torch.nn.Sequential(*layers)

    def forward(self, X):
        """Run X through the expansion / activation / contraction stack."""
        return self._Model(X)

In [37]:
class Multihead_Attention(torch.nn.Module):
    """Causal multi-head self-attention.

    The input is projected to queries, keys and values, split into
    `num_heads` heads, combined with scaled dot-product attention in which
    every position may only attend to itself and earlier positions, and the
    heads are concatenated again.

    NOTE(review): no output projection is applied after the heads are
    concatenated; this matches the original implementation.
    """

    def __init__(self, dim_in, dim_out, num_heads, dropout, qkv_bias = False):
        """
        dim_in:    feature size of the input.
        dim_out:   total feature size of the output (all heads together).
        num_heads: number of attention heads; must divide dim_out.
        dropout:   dropout probability applied to the attention weights.
        qkv_bias:  whether the query/key/value projections carry a bias
                   (default False, the previously hard-coded behaviour).
        """
        super().__init__()
        # dim_out is split evenly across the heads, so the remainder must be zero
        assert (dim_out % num_heads == 0), "Has to be divisible"

        self._dim_out = dim_out
        self._heads = num_heads
        self._head_dimensions = dim_out // num_heads

        # Projection weights for Query, Key and Value
        self._QueryW = torch.nn.Linear(dim_in, dim_out, bias = qkv_bias)
        self._KeyW = torch.nn.Linear(dim_in, dim_out, bias = qkv_bias)
        self._ValueW = torch.nn.Linear(dim_in, dim_out, bias = qkv_bias)

        self._Dropout = torch.nn.Dropout(dropout)

    def forward(self, X):
        """Return the context vectors for X.

        X is unpacked as (batch_size, tot_embedding, dim_in), where
        tot_embedding is the number of positions in the sequence. The result
        has shape (batch_size, tot_embedding, dim_out).
        """
        batch_size, tot_embedding, _ = X.shape

        def split_heads(projected):
            # (batch, positions, dim_out) -> (batch, heads, positions, head_dim)
            return projected.view(batch_size, tot_embedding,
                                  self._heads, self._head_dimensions).transpose(1, 2)

        Query = split_heads(self._QueryW(X))
        Key = split_heads(self._KeyW(X))
        Value = split_heads(self._ValueW(X))

        # Raw scores: (batch, heads, positions, positions)
        attention_score = Query @ Key.transpose(2, 3)

        # True above the diagonal marks the "future" positions. The mask is
        # built on X's device so the module also works when X lives on a GPU.
        future = torch.triu(
            torch.ones(tot_embedding, tot_embedding, dtype=torch.bool, device=X.device),
            diagonal=1)

        # Mask BEFORE the softmax. Normalizing over all positions and zeroing
        # the future ones afterwards is mathematically the same, but it gives
        # 0/0 = NaN whenever a future score dominates and the visible weights
        # underflow to zero.
        attention_score = attention_score.masked_fill(future, float("-inf"))
        attention_weights = torch.softmax(
            attention_score / self._head_dimensions ** 0.5, dim=-1)

        attention_weights = self._Dropout(attention_weights)

        # (batch, heads, positions, head_dim) -> (batch, positions, heads, head_dim)
        context_vector = (attention_weights @ Value).transpose(1, 2)

        # Concatenate the heads back into a single dim_out-sized feature axis
        context_vector = context_vector.contiguous().view(batch_size, tot_embedding, self._dim_out)

        return context_vector
    
class Transformer_Block(torch.nn.Module):
    """One pre-norm transformer block: an attention sub-layer followed by a
    feed-forward sub-layer, each wrapped in a residual (shortcut) connection."""

    def __init__(self, GPT_config):
        """GPT_config: dict read for "embedding_dim", "Number_of_heads" and "Dropout_rate"."""
        super().__init__()
        emb_dim = GPT_config["embedding_dim"]
        drop_rate = GPT_config["Dropout_rate"]

        # Attention keeps the feature size: dim_in == dim_out == emb_dim
        self._Multihead_Attention = Multihead_Attention(emb_dim,
                                                        emb_dim,
                                                        GPT_config["Number_of_heads"],
                                                        drop_rate)

        # One normalization layer in front of each sub-layer
        self._Layer_Normalization1 = Layer_Normalization(emb_dim)
        self._Layer_Normalization2 = Layer_Normalization(emb_dim)

        # Expansion / contraction network
        self._Feed_Forward = Feed_Forward(emb_dim)

        # The same dropout module is applied after both sub-layers
        self._Dropout_Short = torch.nn.Dropout(drop_rate)

    def forward(self, X):
        """Apply both sub-layers to X and return the result."""
        # Sub-layer 1: normalize -> attention -> dropout -> add the shortcut
        attended = self._Multihead_Attention(self._Layer_Normalization1(X))
        X = self._Dropout_Short(attended) + X

        # Sub-layer 2: normalize -> feed-forward -> dropout -> add the shortcut
        transformed = self._Feed_Forward(self._Layer_Normalization2(X))
        X = self._Dropout_Short(transformed) + X

        return X

In [38]:
class GPT_Model(torch.nn.Module):
    """Token + positional embeddings, a stack of transformer blocks, a final
    layer normalization and a linear head that produces vocabulary logits."""

    def __init__(self, GPT_config):
        """GPT_config: dict read for "Vocab_size", "embedding_dim",
        "Context_lenght", "Dropout_rate" and "Number_of_layers"."""
        super().__init__()
        vocab_size = GPT_config['Vocab_size']
        emb_dim = GPT_config['embedding_dim']

        self._Embeddings = torch.nn.Embedding(vocab_size, emb_dim)
        self._Positional_Embedding = torch.nn.Embedding(GPT_config['Context_lenght'], emb_dim)
        self._Dropout = torch.nn.Dropout(GPT_config['Dropout_rate'])

        # Stack of transformer blocks, applied one after another
        blocks = [Transformer_Block(GPT_config)
                  for _ in range(GPT_config['Number_of_layers'])]
        self._Transformer = torch.nn.Sequential(*blocks)

        # Output stage
        self._LayerNormalization = Layer_Normalization(emb_dim)
        self._Out = torch.nn.Linear(emb_dim, vocab_size, bias = False)

    def forward(self, ids):
        """Map token ids, unpacked as (batch size, context length), to logits
        over the vocabulary for every position."""
        _, context_len = ids.shape

        # One position index per token, created on the same device as the ids
        positions = torch.arange(context_len, device = ids.device)

        # The positional embedding is added to the token embedding of every sequence
        X = self._Embeddings(ids) + self._Positional_Embedding(positions)

        X = self._Dropout(X)
        X = self._Transformer(X)
        X = self._LayerNormalization(X)

        return self._Out(X)
    
## Note: the model returns raw logits; apply softmax to them to obtain probabilities

In [64]:
## Loss calculation for one batch
def Cal_loss_batch(Input_batch, Target_batch, model, device):
    """Cross-entropy loss of `model` on a single batch.

    Input_batch:  token ids fed to the model.
    Target_batch: token ids the model should predict, same layout as Input_batch.
    model:        callable that maps Input_batch to logits.
    device:       device both batches are moved to before the forward pass.

    Returns the loss as a scalar tensor.
    """
    # The moved target used to be bound to a differently-cased name
    # (`Target_Batch`), so the loss was computed against the un-moved tensor.
    Input_batch, Target_batch = Input_batch.to(device), Target_batch.to(device)
    logits = model(Input_batch)
    ## Flatten the batch and context dimensions together so that every position is one sample
    loss = torch.nn.functional.cross_entropy(logits.flatten(0,1), Target_batch.flatten())
    return loss


## Loss calculation for loader

def Cal_loss_loader(dataloader, model, device, num_batches =None):
    """Average per-batch loss of `model` over (part of) a data loader.

    dataloader:  sized iterable of (Input, Target) batches.
    model:       model handed to Cal_loss_batch.
    device:      device handed to Cal_loss_batch.
    num_batches: how many batches to evaluate; None means all of them, and
                 larger values are capped at len(dataloader).

    Returns the mean loss as a float, or NaN when there is nothing to
    evaluate (empty loader or num_batches <= 0).
    """
    if len(dataloader) == 0:
        # Stay numeric: the message string that used to be returned here broke
        # any caller that formats or compares the result as a number.
        return float("nan")

    if num_batches is None:
        num_batches = len(dataloader)
    else:
        num_batches = min(len(dataloader), num_batches)

    if num_batches <= 0:
        # Avoids the division by zero below
        return float("nan")

    total_loss = 0.0
    for i, (Input, Target) in enumerate(dataloader):
        if i >= num_batches:
            break
        total_loss += Cal_loss_batch(Input, Target, model, device).item()

    return total_loss/num_batches

In [65]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed before the model is built so that the random weight initialization is
# reproducible across kernel restarts.
torch.manual_seed(123)

# Instance of the GPT_Model class, moved to the chosen device. Batches are
# moved to `device` during loss calculation, so the weights must be there too.
model = GPT_Model(GPT_config).to(device)

In [70]:
torch.manual_seed(123)

# Measure the losses in eval mode so dropout is switched off and the numbers
# are not perturbed by random masking; the previous mode is restored afterwards.
was_training = model.training
model.eval()
try:
    # No gradients are needed for a pure evaluation pass
    with torch.no_grad():
        training_loss = Cal_loss_loader(train_dataloader, model, device)
        validation_loss = Cal_loss_loader(test_dataloader, model, device)
finally:
    model.train(was_training)

The training loss is --10.977535247802734


In [72]:
# Show the training loss first, then the validation loss
for loss_value in (training_loss, validation_loss):
    print(loss_value)

tensor(10.9775)
tensor(11.0123)
