In [1]:
with open("verdictbook.txt","r",encoding="utf-8") as f:
    booktext=f.read()

In [3]:
from torch.utils.data import Dataset, DataLoader
import tiktoken
import torch

class DatasetclassV1(Dataset):
    def __init__(self,txt,encoder,strides,maxlen):
        self.inputlist=[]
        self.targetlist=[]

        tokenized = encoder.encode(txt, allowed_special={"<|endoftext|>"})
    
        for i in range(0,len(tokenized)-maxlen,strides):
            input_chunk=tokenized[i:i+maxlen]
            target_chunk=tokenized[i+1:i+maxlen+1]
            self.inputlist.append(torch.tensor(input_chunk))
            self.targetlist.append(torch.tensor(target_chunk))

    def __len__(self):
        return len(self.targetlist)

    def __getitem__(self, index):
        return self.inputlist[index],self.targetlist[index]
    



def create_dataloader_v1 (txt, batch_size=4, max_length=256,stride=128, shuffle=True, drop_last=True,num_workers=0):
    tokenizer = tiktoken.get_encoding("gpt2")
    datasetobj=DatasetclassV1(txt,tokenizer,stride,max_length)
    dataloader = DataLoader(
        datasetobj,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers
    )

    return dataloader



In [4]:
GPT_CONFIG_124M = {
    "vocab_size": 50257,   # Vocabulary size
    "context_length": 256, # Shortened context length (orig: 1024)
    "emb_dim": 768,        # Embedding dimension
    "n_heads": 12,         # Number of attention heads
    "n_layers": 12,        # Number of layers
    "drop_rate": 0.1,      # Dropout rate
    "qkv_bias": False      # Query-key-value bias
}


In [5]:
train_ratio = 0.90
split_idx = int(train_ratio * len(booktext))
train_data = booktext[:split_idx]
val_data = booktext[split_idx:]


torch.manual_seed(123)

train_loader = create_dataloader_v1(
    train_data,
    batch_size=2,
    max_length=GPT_CONFIG_124M["context_length"],
    stride=GPT_CONFIG_124M["context_length"],
    drop_last=True,
    shuffle=True,
    num_workers=0
)

val_loader = create_dataloader_v1(
    val_data,
    batch_size=2,
    max_length=GPT_CONFIG_124M["context_length"],
    stride=GPT_CONFIG_124M["context_length"],
    drop_last=False,
    shuffle=False,
    num_workers=0
)

In [7]:
import torch
import torch.nn as nn

class multiheadv2(nn.Module):
    def __init__(self,d_in,d_out,context_length,dropout,attention_head,boolbias):
        super().__init__()
        self.head_dim=d_out//attention_head
        self.d_out=d_out
        self.attention_head=attention_head

        self.W_query = nn.Linear(d_in, d_out, bias=boolbias)
        self.W_key = nn.Linear(d_in, d_out, bias=boolbias)
        self.W_value = nn.Linear(d_in, d_out, bias=boolbias)

        self.out_proj = nn.Linear(d_out, d_out)  
        self.dropout = nn.Dropout(dropout)
        self.register_buffer('mask', torch.triu(torch.ones(context_length, context_length), diagonal=1))

    def forward(self,x):
        b,num_token,d_out=x.shape

        keys = self.W_key(x) 
        queries = self.W_query(x)
        values = self.W_value(x)

        keys=keys.view(b,num_token,self.attention_head,self.head_dim)
        queries=queries.view(b,num_token,self.attention_head,self.head_dim)
        values=values.view(b,num_token,self.attention_head,self.head_dim)

        keys = keys.transpose(1, 2)
        queries = queries.transpose(1, 2)
        values = values.transpose(1, 2)

        attn_score=queries @ keys.transpose(2,3)

        mask_bool = self.mask.bool()[:num_token, :num_token]
        attn_score.masked_fill_(mask_bool, -torch.inf)
        
        attn_weights = torch.softmax(attn_score / keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)


        context_vec = (attn_weights @ values).transpose(1, 2) 
        
        context_vec = context_vec.contiguous().view(b, num_token, self.d_out)
        context_vec = self.out_proj(context_vec) # optional projection

        return context_vec
            


In [8]:
class LayerNorm(nn.Module):
    def __init__(self,emb_dim):
        super().__init__()
        self.eps=1e-5
        self.scale_params=nn.Parameter(torch.ones(emb_dim))
        self.shift_params=nn.Parameter(torch.zeros(emb_dim))
    
    def forward(self,x):
        mean=x.mean(dim=-1,keepdim=True)
        var=x.var(dim=-1,keepdim=True,unbiased=False)
        norm=(x-mean)/torch.sqrt(var+self.eps)
        return norm*self.scale_params+ self.shift_params
    
        
class GELU(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(torch.sqrt(torch.tensor(2.0 / torch.pi)) *(x + 0.044715 * torch.pow(x, 3))
        ))
    

class feedforward(nn.Module):
    def __init__(self,config):
        super().__init__()
        self.layers=nn.Sequential(
            nn.Linear(config['emb_dim'],config['emb_dim']*4),
            GELU(),
            nn.Linear(config['emb_dim']*4,config['emb_dim']),

        )
    
    def forward(self,x):
        return self.layers(x)
    


In [9]:
class TransformerBlock(nn.Module):
    def __init__ (self,config):
        super().__init__()
        self.attn=multiheadv2(d_in=config['emb_dim'],d_out=config['emb_dim'],context_length=config['context_length'],dropout=config['drop_rate'],attention_head=config['n_heads'],boolbias=config['qkv_bias'])
        self.Layernorm1=LayerNorm(emb_dim=config['emb_dim'])
        self.Layernorm2=LayerNorm(emb_dim=config['emb_dim'])
        self.feedforw=feedforward(config=config)
        self.dropout=nn.Dropout(config['drop_rate'])
    
    def forward(self,x):
        ## attnetion block
        skip=x
        x=self.Layernorm1(x)
        x=self.attn(x)
        x=self.dropout(x)
        x=x+skip

        ## feed forward nn block
        skip=x
        x=self.Layernorm2(x)
        x=self.feedforw(x)
        x=self.dropout(x)
        x=x+skip

        return x



In [10]:
class GPT_2(nn.Module):
    def __init__ (self,cfg):
        super().__init__()
        self.token_emb=nn.Embedding(cfg['vocab_size'],cfg["emb_dim"])
        self.pos_emb=nn.Embedding(cfg['context_length'],cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])])
        
        self.final_norm = LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(
            cfg["emb_dim"], cfg["vocab_size"], bias=False
        )
    
    def forward(self,inputidx):
        batch_size,seq=inputidx.shape
        tokens=self.token_emb(inputidx)
        pos_embeds = self.pos_emb(torch.arange(seq, device=inputidx.device))
        x=tokens+pos_embeds
        x=self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits
    



In [11]:
torch.manual_seed(123)
model = GPT_2(GPT_CONFIG_124M)
model.eval();


In [12]:
def calc_loss_batch(input_batch, target_batch, model, device):
    input_batch, target_batch = input_batch.to(device), target_batch.to(device)
    logits = model(input_batch)
    loss = torch.nn.functional.cross_entropy(logits.flatten(0, 1), target_batch.flatten())
    return loss


def calc_loss_loader(data_loader, model, device, num_batches=None):
    total_loss = 0.
    if len(data_loader) == 0:
        return float("nan")
    elif num_batches is None:
        num_batches = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))
    for i, (input_batch, target_batch) in enumerate(data_loader):
        if i < num_batches:
            loss = calc_loss_batch(input_batch, target_batch, model, device)
            total_loss += loss.item()
        else:
            break
    return total_loss / num_batches


In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Note:
# Uncommenting the following lines will allow the code to run on Apple Silicon chips, if applicable,
# which is approximately 2x faster than on an Apple CPU (as measured on an M3 MacBook Air).
# However, the resulting loss values may be slightly different.

if torch.cuda.is_available():
   device = torch.device("cuda")
elif torch.backends.mps.is_available():
   device = torch.device("mps")
else:
   device = torch.device("cpu")

print(f"Using {device} device.")


model.to(device) 


torch.manual_seed(123) 

with torch.no_grad(): 
    train_loss = calc_loss_loader(train_loader, model, device)
    val_loss = calc_loss_loader(val_loader, model, device)

print("Training loss:", train_loss)
print("Validation loss:", val_loss)

Using mps device.
Training loss: 10.98758316040039
Validation loss: 10.98110580444336
