In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import mmap
import random

In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # Set the device as cuda if cuda is available
print(device)

cuda


Defining hyperparameters

In [16]:
block_size = 64  # Size of the input sequence 
batch_size = 32  # Amount of inputs the model would train on in one single iteration
max_iters = 3000
lr = 3e-4  # Learning rate (defines the rate with which the model would converge)
eval_iters = 10
# eval_interval = 500
n_embd = 384  # Number of embedding tokens generated per input character
n_layer = 4   # Number of decoder layers
n_head = 4    # Number of attention layers running in parallel
dropout = 0.2  # dropout to prevent overfitting

In [4]:
chars = ""
with open('openwebtext/character_vocab.txt', 'r', encoding= 'utf-8') as f:
    text = f.read()
    chars = sorted(set(text))

vocab_size = len(chars)  # Getting all characters present in the dataset

Tokenizer

In [5]:
string_to_int = { ch:i for i,ch in enumerate(chars) }
int_to_string = { i:ch for i,ch in enumerate(chars) }
encode = lambda s : [string_to_int[c] for c in s]
decode = lambda i : ''.join([int_to_string[l] for l in i])

In [6]:
print(encode('Hello'))

[41, 70, 77, 77, 80]


Dataloader function

In [7]:


def get_random_chunk(split):
    filename = "openwebtext/train.txt" if split=='train' else "openwebtext/val.txt"
    with open(filename, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            file_size = len(mm)
            start_pos = random.randint(0, (file_size) - block_size*batch_size)

            mm.seek(start_pos)
            block = mm.read(block_size*batch_size-1)

            decoded_block = block.decode('utf-8', errors='ignore').replace('\r', '')
            data = torch.tensor(encode(decoded_block), dtype=torch.long)
            
    return data

def get_batch(split):
    data = get_random_chunk(split)  # Fetches a random chunk of data of size (batch_size*block_size) from the whole dataset
    ix = torch.randint(len(data)-block_size,(batch_size, ))  # Randomly generates starting positions of the input sequences for a whole batch
    # print(ix)
    x = torch.stack([data[i:i+block_size] for i in ix])  # For every starting position, it generates input sequence
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])   # For every starting position, it generates output sequence by offsetting one character from the input sequence
    x, y = x.to(device), y.to(device)
    return x, y

x,y = get_batch('train')
# print(x)
# print(y)

Head of attention layer

In [8]:
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))  # Lower triangular mask is used because for each timestep, we don't want the model to learn from the future timesteps
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)
        q = self.query(x)
        v = self.value(x)

        sim_score = torch.matmul(q,k.transpose(-2,-1))*k.shape[-1]**0.5
        sim_score = sim_score.masked_fill(self.tril[:T,:T] == 0, float('-inf'))
        sim_score = F.softmax(sim_score, dim=-1)
        sim_score = self.dropout(sim_score)
        attention_score = torch.matmul(sim_score,v)
        return attention_score

Attention layer

In [9]:
class MultiHeadAttention(nn.Module):
    def __init__(self, n_head, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(n_head)])
        self.proj = nn.Linear(head_size*n_head, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

Feed Forward layer

In [10]:
class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4*n_embd),
            nn.ReLU(),
            nn.Linear(4*n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self,x):
        x = self.net(x)
        return x

A decoder block

In [11]:
class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self,x):
        y = self.sa(x)
        x = self.ln1(x+y)
        y = self.ffwd(x)
        x = self.ln2(x+y)
        return x
        

The entire decoder-only transformer model with multi-head attention 

In [12]:
class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        if isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    
    def forward(self, index, targets=None):
        
        emb = self.token_embedding_table(index)
        B, T, C = emb.shape
        pos = self.position_embedding_table(torch.arange(T, device=device))
        x = emb + pos
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        B, T, C = logits.shape
        
        if targets==None:
            loss = None
        else:
            
            logits = logits.view(B*T,C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss
        

    def generate(self,index, max_new_tokens):
        for _ in range(max_new_tokens):
            
            
            logits, loss = self.forward(index)
            logits = logits[:,-1,:]
            probs = F.softmax(logits, dim=-1)
            index_next = torch.multinomial(probs, num_samples=1)
            index = torch.cat((index,index_next), dim=1)
            # print(index.shape)
        return index


Evaluation function

In [13]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train','val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits,loss = model(X,Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

Training function and model initialization

In [14]:
model = GPTLanguageModel(vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
cummulative_loss = 0

for iter in range(max_iters):
    if iter%eval_iters==0:
        # losses = estimate_loss()
        # print(f"iter: {iter}: train_loss : {losses['train']}, eval_loss : {losses['val']}")
        print(f"iter: {iter}: train_loss : {cummulative_loss/(iter+1)}")
    if iter%100==0:
        checkpoint = {
            'epoch': iter,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': cummulative_loss/(iter+1),  # Example: saving the last recorded loss
        }
        torch.save(checkpoint, 'model_checkpoint.pth')
    
    x,y = get_batch('train')
    logits, loss = model.forward(x,y)
    cummulative_loss = cummulative_loss + loss.item()
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
print(loss.item())

iter: 0: train_loss : 0.0
iter: 10: train_loss : 7.7714479619806465
iter: 20: train_loss : 7.076375688825335
iter: 30: train_loss : 6.210507539010817
iter: 40: train_loss : 5.504483473010179
iter: 50: train_loss : 5.010800908593571
iter: 60: train_loss : 4.653175076500315
iter: 70: train_loss : 4.395377468055402
iter: 80: train_loss : 4.202511145744794
iter: 90: train_loss : 4.040774091259464
iter: 100: train_loss : 3.916393775751095
iter: 110: train_loss : 3.8227935610590755
iter: 120: train_loss : 3.7344073382290928
iter: 130: train_loss : 3.6543749452547263
iter: 140: train_loss : 3.584963800213861
iter: 150: train_loss : 3.5298744605866488
iter: 160: train_loss : 3.477072164879082
iter: 170: train_loss : 3.4284398750952114
iter: 180: train_loss : 3.3864025345164768
iter: 190: train_loss : 3.3472317188822163
iter: 200: train_loss : 3.3095176718128263
iter: 210: train_loss : 3.276161826617345
iter: 220: train_loss : 3.2457380747902986
iter: 230: train_loss : 3.219999326771988
iter: 2

Initializing and loading the model from model_checkpoint.pth file. (Can be used to load a previously trained model)

In [14]:
model = GPTLanguageModel(vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
checkpoint = torch.load('model_checkpoint.pth')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['train_loss']
print(loss)

2.4997562080858002


Generating some sample text

In [15]:
context = torch.zeros((1,1), dtype=torch.long, device = device)
# print(context.shape)
string = model.generate(context, max_new_tokens = 60)[0].tolist()
generated_chars = decode(string)
print(generated_chars)


Or7)0%s, o::aul- 7584170966 wal LSAA= gTLnzዢ MEldyre mar let
