In [118]:
import torch
import torch.nn as nn

In [119]:
class MaskedMultiheadAttention(nn.Module):
    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias = False):
        super().__init__()
        assert(d_out%num_heads==0), "d_out must devide into d_in"
        self.head_dim = d_out//num_heads
        self.W_q      = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.W_k      = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.W_v      = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.dropout  = nn.Dropout(p=dropout)
        self.projection = nn.Linear(d_out, d_out)
        self.num_heads= num_heads
        self.d_out    = d_out
        self.register_buffer("mask", torch.triu(torch.ones(context_length, context_length), diagonal=1))
    def forward(self, x):
        batch_size, num_tokens, d_in = x.shape
        queries = self.W_q(x)
        keys    = self.W_k(x)
        values  = self.W_v(x)
        queries = queries.view(batch_size, num_tokens, self.num_heads,self.head_dim)
        keys    = keys   .view(batch_size, num_tokens, self.num_heads,self.head_dim)
        values  = values .view(batch_size, num_tokens, self.num_heads,self.head_dim)
        queries = queries.transpose(1,2)
        keys    = keys.transpose(1,2)
        values  = values.transpose(1,2)
        attention_scores = queries@keys.transpose(2,3)
        mask_bool        = self.mask.bool()[:num_tokens,:num_tokens]
        attention_scores.masked_fill_(mask_bool, -torch.inf)
        attenton_weights = torch.softmax(attention_scores/keys.shape[-1]**0.5, dim=-1)
        attenton_weights = self.dropout(attenton_weights)
        context_vec      = (attenton_weights@values).transpose(1,2)
        context_vec      = context_vec.contiguous().view(
            batch_size, num_tokens, self.d_out
        )
        context_vec      = self.projection(context_vec)
        return context_vec


In [120]:
class LayerNorm(nn.Module):
    def __init__(self, emb_dim) -> None:
        super().__init__()
        self.esp = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))
    def forward(self,x:torch.Tensor):
        mean = x.mean(dim=-1, keepdim=True)
        var  = x.var(dim=-1,keepdim=True)
        normalization = (x-mean)/torch.sqrt(var+self.esp)
        return self.scale*normalization+self.shift

In [121]:
class FeedForward(nn.Module):
    def __init__(self, cfg) -> None:
        super().__init__()
        self.Layer = nn.Sequential(
         nn.Linear(cfg["emb_dim"],4*cfg["emb_dim"]),
         nn.GELU(),
         nn.Linear(4*cfg["emb_dim"],cfg["emb_dim"])
    
        )
    def forward(self,x):
        return self.Layer(x)

In [122]:
class TransFormerBlock(nn.Module):
    def __init__(self,cfg):
        super().__init__()
        self.LayerNorm   = LayerNorm(cfg["emb_dim"])
        self.Muitiheads  = MaskedMultiheadAttention(d_in=cfg["emb_dim"], d_out = cfg["emb_dim"], context_length=cfg["context_length"], 
                                                    dropout=cfg["drop_rate"], num_heads=cfg["n_heads"])
        self.FeedForward = FeedForward(cfg)
        self.Dropout     = nn.Dropout(cfg["drop_rate"])
    def forward(self,x):
        shortcut = x
        x        = self.LayerNorm(x)
        x        = self.Muitiheads(x)
        x        = self.Dropout(x)
        x        = x + shortcut

        shortcut = x
        x        = self.LayerNorm(x)
        x        = self.FeedForward(x)
        x        = self.Dropout(x)
        x        = x+ shortcut
        return x

In [123]:
GPT_CONFIG_124M = {
 "vocab_size": 50257, # Vocabulary size
 "context_length": 1024, # Context length
 "emb_dim": 768, # Embedding dimension
 "n_heads": 12, # Number of attention heads
 "n_layers": 12, # Number of layers
 "drop_rate": 0.1, # Dropout rate
 "qkv_bias": False # Query-Key-Value bias
}

In [124]:
class GPTModel(nn.Module):
    def __init__(self, cfg) -> None:
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"]    , cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout (cfg["drop_rate"])
        self.trf_blocks = nn.Sequential(
            *[TransFormerBlock(cfg) for _ in range(cfg["n_layers"])]
        )
        self.LayerNorm = LayerNorm(cfg["emb_dim"])
        self.out_head  = nn.Linear(cfg["emb_dim"], cfg["vocab_size"])
        #self.dropout   = nn.Dropout(cfg["drop_rate"])
    def forward(self,x):
        batch_size, sequen = x.shape
        tok_emb            = self.tok_emb(x)
        pos_emb            = self.pos_emb(torch.arange(sequen))
        input              = tok_emb + pos_emb
        input              = self.drop_emb(input)
        input              = self.trf_blocks(input)
        output             = self.LayerNorm(input)
        output             = self.out_head(output)
        return output

In [125]:
import tiktoken

tokenizer = tiktoken.get_encoding("gpt2")
batch = []
txt1 = "Every effort moves you"
txt2 = "Every day hold a"
batch.append(torch.tensor(tokenizer.encode(txt1)))
batch.append(torch.tensor(tokenizer.encode(txt2)))
batch = torch.stack(batch, dim=0)
print(batch)

tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 1745,  257]])


In [126]:
torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
model.eval()

GPTModel(
  (tok_emb): Embedding(50257, 768)
  (pos_emb): Embedding(1024, 768)
  (drop_emb): Dropout(p=0.1, inplace=False)
  (trf_blocks): Sequential(
    (0): TransFormerBlock(
      (LayerNorm): LayerNorm()
      (Muitiheads): MaskedMultiheadAttention(
        (W_q): Linear(in_features=768, out_features=768, bias=False)
        (W_k): Linear(in_features=768, out_features=768, bias=False)
        (W_v): Linear(in_features=768, out_features=768, bias=False)
        (dropout): Dropout(p=0.1, inplace=False)
        (projection): Linear(in_features=768, out_features=768, bias=True)
      )
      (FeedForward): FeedForward(
        (Layer): Sequential(
          (0): Linear(in_features=768, out_features=3072, bias=True)
          (1): GELU(approximate='none')
          (2): Linear(in_features=3072, out_features=768, bias=True)
        )
      )
      (Dropout): Dropout(p=0.1, inplace=False)
    )
    (1): TransFormerBlock(
      (LayerNorm): LayerNorm()
      (Muitiheads): MaskedMultiheadA

In [127]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
            logits = logits[:,-1,:]
            probas = torch.softmax (logits , dim=-1)
            idx_next = torch.argmax(probas , dim=-1, keepdim=True)
            idx = torch.cat((idx, idx_next), dim=1)
    return idx

In [128]:
import tiktoken
def text_to_token_ids(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)
    return encoded_tensor
def tokenid_to_text(token_ids, tokenizer):
    flat = token_ids.squeeze(0)
    return tokenizer.decode(flat.tolist())
start_context = "Every effort moves you"
tokenizer = tiktoken.get_encoding("gpt2")
token_ids = generate_text_simple(
 model=model,
 idx=text_to_token_ids(start_context, tokenizer),
 max_new_tokens=10,
 context_size=GPT_CONFIG_124M["context_length"]
)
print("Output text:\n", tokenid_to_text(token_ids, tokenizer))

Output text:
 Every effort moves you Aeiman ByeswickattributeometerSin 19elve Chal


#### Calculating the text generation loss