In [1]:
import torch
import torch.nn as nn

In [31]:
class LayerNormalization(nn.Module):
    def __init__(self,emb_dim):
        super().__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))
        
    def forward(self,idx):
        mean = idx.mean(dim=-1,keepdim = True)
        var = idx.var(dim = -1,keepdim =True,unbiased = False)
        norm_x = (idx - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift

In [32]:
class MultiHeadattation(nn.Module):
    
    def __init__(self,d_in,d_out,context_length,dropout,num_heads,qkv_bias = False):
        super().__init__()
        
        assert (d_out % num_heads == 0), \
        'number of heads must be lower then dimension out'
        
        self.register_buffer('mask',torch.triu(torch.ones(context_length,context_length),diagonal = 1))
        self.d_out  = d_out
        self.num_heads = num_heads
        self.head_dim = self.d_out // self.num_heads
        self.q_weight = nn.Linear(d_in,d_out,bias = qkv_bias)
        self.k_weight = nn.Linear(d_in,d_out,bias = qkv_bias)
        self.v_weight = nn.Linear(d_in,d_out,bias = qkv_bias)
        self.dropout  = nn.Dropout(dropout)
        
    def forward(self,idx):
        batch,num_tokens,d_in = idx.shape
        q_matrix = self.q_weight(idx)
        k_matrix = self.k_weight(idx)
        v_matrix = self.v_weight(idx)
        
        query = q_matrix.view(batch,num_tokens,self.num_heads,self.head_dim)
        key = k_matrix.view(batch,num_tokens,self.num_heads,self.head_dim)
        value = v_matrix.view(batch,num_tokens,self.num_heads,self.head_dim)
        
        query = query.transpose(1,2)
        key = key.transpose(1,2)
        value = value.transpose(1,2)
        
        attention_score = query @ key.transpose(2,3)
        masked_matrix = attention_score.masked_fill(self.mask.bool()[:num_tokens,:num_tokens],-torch.inf)
        attention_weight = torch.softmax(masked_matrix / idx.shape[-1] ** 0.5,dim=-1)
        
        attention_weight = self.dropout(attention_weight)
        context_vec = (attention_weight @ value).transpose(1,2)
        context_vec = context_vec.contiguous().view(batch,num_tokens,self.d_out)
        
        return context_vec

In [33]:
class GELU(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(
            torch.sqrt(torch.tensor(2.0 / torch.pi)) * 
            (x + 0.044715 * torch.pow(x, 3))
        ))

In [34]:
class FeedForward(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
            GELU(),
            nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]),
        )

    def forward(self, x):
        return self.layers(x)

In [40]:
class Transformerblock(nn.Module):
    def __init__(self,cfg):
        super().__init__()
        self.MultiHeadattation_instance = MultiHeadattation(cfg['emb_dim'],cfg['emb_dim'],cfg['context_length'],cfg['drop_rate'],cfg['num_heads'])
        self.att = MultiHeadattation(
            d_in=cfg["emb_dim"],
            d_out=cfg["emb_dim"],
            context_length=cfg["context_length"],
            num_heads=cfg["num_heads"], 
            dropout=cfg["drop_rate"],
            qkv_bias=cfg["qkv_bias"])
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNormalization(cfg["emb_dim"])
        self.norm2 = LayerNormalization(cfg["emb_dim"])
        self.drop_shortcut = nn.Dropout(cfg["drop_rate"])
    
    def forward(self,x):
        
        shortcut = x
        x = self.norm1(x)
        x = self.att(x) 
        x = self.drop_shortcut(x)
        x = x + shortcut  
        
        shortcut = x
        x = self.norm2(x)
        x = self.ff(x)
        x = self.drop_shortcut(x)
        x = x + shortcut 

        return x

In [41]:
class GPT2(nn.Module):
    def __init__(self,cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        
        self.trf_blocks = nn.Sequential(
            *[Transformerblock(cfg) for _ in range(cfg["num_layers"])])
        
        self.final_norm = LayerNormalization(cfg["emb_dim"])
        self.out_head = nn.Linear(
            cfg["emb_dim"], cfg["vocab_size"], bias=False
        )

    def forward(self, idx):
        batch_size, seq_len = idx.shape
        tok_embeds = self.tok_emb(idx)
        pos_embeds = self.pos_emb(torch.arange(seq_len, device=idx.device))
        idx = tok_embeds + pos_embeds 
        idx = self.drop_emb(idx)
        idx = self.trf_blocks(idx)
        idx = self.final_norm(idx)
        logits = self.out_head(idx)
        return logits
        

In [42]:
GPT_Cfg = {
    'num_heads' : 12,
    'num_layers' : 12,
    'drop_rate' : 0.1,
    'qkv_bias' : False,
    'emb_dim' : 768,
    'context_length' : 1024,
    'vocab_size'  : 50257,
}

In [43]:
import tiktoken
tokenizer = tiktoken.get_encoding("gpt2")
batch = []
txt1 = "Every effort moves you"
txt2 = "Every day holds a"
batch.append(torch.tensor(tokenizer.encode(txt1)))
batch.append(torch.tensor(tokenizer.encode(txt2)))
batch = torch.stack(batch, dim=0)
print(batch)

tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])


In [44]:
dummygpt = GPT2(GPT_Cfg)
result = dummygpt(batch)
result

tensor([[[ 0.2347, -0.1016,  0.1631,  ...,  0.8065,  0.3054,  0.5566],
         [-0.3107, -0.4797,  0.4004,  ...,  0.6313, -0.7497, -0.5196],
         [ 0.4809, -0.5083,  0.6023,  ..., -0.0438, -0.9024,  0.6370],
         [-0.3182, -0.1915,  0.6027,  ...,  0.7526, -0.4295,  0.3570]],

        [[ 0.3467, -0.1348,  0.1617,  ...,  0.5546,  0.0543,  0.0845],
         [-0.1944, -0.2589, -0.0604,  ...,  0.4980, -0.5669,  0.8765],
         [-0.7341, -1.0489,  0.1133,  ...,  1.3567, -0.4445,  0.3827],
         [-0.3083, -0.2613,  0.2062,  ...,  1.4051, -0.5469,  0.2193]]],
       grad_fn=<UnsafeViewBackward0>)

In [49]:
total_parameters = sum(p.numel() for p in dummygpt.parameters())
print(f"Total Number Of Parameters are :{total_parameters:,}")

Total Number Of Parameters are :177,156,096


In [59]:
def generate_new_text(idx,max_iteration,model,context_size):
    
    for _ in range(max_iteration):
        idx_new = idx[:,-context_size:]
        
        with torch.no_grad():
            logits = model(idx_new)
            
        new_logits = logits[:,-1,:]
        probas = torch.softmax(new_logits,dim=-1)
        max_probas = torch.argmax(probas,dim=-1,keepdim=True)
        
        idx = torch.concat((idx,max_probas),dim=-1)
        
    return idx

In [66]:
text = "my name"
toknized_text = tokenizer.encode(text)
print("Tokenized text is : ",toknized_text)
new_tokenized_text = torch.tensor(toknized_text).unsqueeze(0)
print("Tokenized text shape is : ",new_tokenized_text.shape)

Tokenized text is :  [1820, 1438]
Tokenized text shape is :  torch.Size([1, 2])


In [67]:
predict = generate_new_text(
    idx = new_tokenized_text,
    max_iteration = 6,
    model = dummygpt,
    context_size = GPT_Cfg['context_length']
)

print("output tensor is :",predict)

output tensor is : tensor([[ 1820,  1438,  3440, 22545, 25706, 41705, 33957,  9951]])


In [68]:
uncoded_text = tokenizer.decode(predict.squeeze(0).tolist())
print(uncoded_text)

my name loadomical tan Franch Picks suspended
