In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset,DataLoader
import re

In [None]:
# Interactive prompt: the path typed here is the text file opened by the next cell.
file=input("Enter the file name: ")

Enter the file name: the-verdict.txt


In [None]:
# Read the whole corpus into memory. The encoding is pinned to UTF-8 so the
# decoded text does not depend on the platform's default locale encoding.
with open(file, 'r', encoding='utf-8') as f:
  raw_text=f.read()

# Preview only the first 99 characters instead of dumping the whole file.
print(raw_text[:99])

I HAD always thought Jack Gisburn rather a cheap genius--though a good fellow enough--so it was no 


In [None]:
class Tokenizer_v1(nn.Module):
  """Adapter between raw text and batched token-id tensors.

  The wrapped ``tokenizer`` is expected to expose
  ``encode(text, allowed_special=...)`` returning a sequence of ints and
  ``decode(list_of_ints)`` returning a string.
  """

  def __init__(self,tokenizer):
    super().__init__()
    self.tokenizer=tokenizer

  def encode(self,text):
    """Tokenize ``text`` and return the ids as a tensor with a leading axis of size 1."""
    ids=self.tokenizer.encode(text,allowed_special={'<|endoftext|>'})
    return torch.tensor(ids).unsqueeze(0)

  def decode(self,ids):
    """Squeeze axis 0 of ``ids`` (a no-op unless its size is 1) and decode the ids to text."""
    return self.tokenizer.decode(ids.squeeze(0).tolist())


In [None]:
!pip install tiktoken

Collecting tiktoken
  Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m49.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tiktoken
Successfully installed tiktoken-0.9.0


In [None]:
import tiktoken

# Load tiktoken's 'gpt2' encoding once at module level for reuse in later cells.
tokenizer=tiktoken.get_encoding('gpt2')

In [None]:
class Dataset_v1(Dataset):
  """Sliding-window dataset of (input, target) token-id pairs.

  The text is tokenized once, then cut into windows of ``max_length`` ids that
  start every ``stride`` ids. Each target window is the input window shifted
  one position to the right.

  Args:
    txt: Text to tokenize.
    tokenizer: Object exposing ``encode(text, allowed_special=...)`` that
      returns a sequence of ints.
    max_length: Number of token ids in every window.
    stride: Distance (in tokens) between the starts of consecutive windows.
  """

  def __init__(self,txt,tokenizer,max_length,stride):
    super().__init__()
    self.input=[]
    self.target=[]

    token_ids=tokenizer.encode(txt,allowed_special={'<|endoftext|>'})

    # The range stops max_length ids before the end so the target window,
    # which reaches one id further than the input window, is always full.
    for start in range(0,len(token_ids)-max_length,stride):
      # Per-window chunks stay local: keeping them on `self` would leave the
      # last window behind as misleading instance state.
      input_chunk=token_ids[start:start+max_length]
      target_chunk=token_ids[start+1:start+max_length+1]

      self.input.append(torch.tensor(input_chunk))
      self.target.append(torch.tensor(target_chunk))

  def __len__(self):
    """Number of (input, target) windows."""
    return len(self.input)

  def __getitem__(self,idx):
    """Return the ``idx``-th (input, target) pair of id tensors."""
    return self.input[idx],self.target[idx]

In [None]:
class DataLoader_v1:
  """Iterable of (input, target) batches built from raw text.

  Tokenizes ``txt`` with tiktoken's 'gpt2' encoding, wraps it in a
  ``Dataset_v1`` and exposes the resulting ``torch.utils.data.DataLoader``
  through ``iter()`` and ``len()``.

  Args:
    txt: Text to tokenize and batch.
    max_length: Window length passed to ``Dataset_v1``.
    stride: Window stride passed to ``Dataset_v1``.
    batch_size: Number of windows per batch.
    drop_last: Forwarded to ``DataLoader``.
    shuffle: Forwarded to ``DataLoader``.
    num_workers: Forwarded to ``DataLoader``.
  """

  def __init__(self,txt,max_length=256,stride=128,batch_size=4,
               drop_last=True,shuffle=True,num_workers=0):

    tokenizer=tiktoken.get_encoding('gpt2')
    self.dataset=Dataset_v1(txt,tokenizer,max_length,stride)

    # __init__ must return None (returning the loader raises TypeError at
    # construction time), so the loader is kept on the instance instead.
    self.data_loader=DataLoader(
        dataset=self.dataset,
        batch_size=batch_size,
        drop_last=drop_last,
        shuffle=shuffle,
        num_workers=num_workers
    )

  def __iter__(self):
    """Iterate over batches of the underlying DataLoader."""
    return iter(self.data_loader)

  def __len__(self):
    """Number of batches of the underlying DataLoader."""
    return len(self.data_loader)


In [None]:
class Multi_head_attention_v1(nn.Module):
  """Causal multi-head self-attention.

  Args:
    d_in: Size of the last axis of the input.
    d_out: Size of the last axis of the output; split evenly across heads.
    num_heads: Number of attention heads (must divide ``d_out``).
    drop_out: Dropout probability applied to the attention weights.
    context_length: Side length of the causal mask, i.e. the largest number
      of tokens the mask covers.
    qvk_bias: Whether the query/key/value projections have a bias term.
  """

  def __init__(self,d_in,d_out,num_heads,drop_out,context_length,qvk_bias=False):
    super().__init__()
    assert(d_out%num_heads==0), \
          "d_out must be divisible by num_heads"

    self.d_out=d_out
    self.num_heads=num_heads
    self.head_dim=d_out//num_heads
    # nn.Linear's keyword is `bias`; passing `qvk_bias=` raises TypeError.
    self.w_query=nn.Linear(d_in,d_out,bias=qvk_bias)
    self.w_key=nn.Linear(d_in,d_out,bias=qvk_bias)
    self.w_value=nn.Linear(d_in,d_out,bias=qvk_bias)
    self.out_proj=nn.Linear(d_out,d_out)
    self.dropout=nn.Dropout(drop_out)
    # Ones strictly above the diagonal mark the future positions to hide.
    self.register_buffer('mask', torch.triu(torch.ones(context_length,context_length), diagonal=1))


  def forward(self,x):
    """Apply causal self-attention to ``x`` of shape (b, num_tokens, d_in).

    Returns a tensor of shape (b, num_tokens, d_out).
    """
    b,num_tokens,d_in=x.shape
    query=self.w_query(x)
    key=self.w_key(x)
    value=self.w_value(x)

    # Split the last axis into heads: (b, num_tokens, num_heads, head_dim).
    query=query.view(b,num_tokens,self.num_heads,self.head_dim)
    key=key.view(b,num_tokens,self.num_heads,self.head_dim)
    value=value.view(b,num_tokens,self.num_heads,self.head_dim)

    # Move the head axis forward: (b, num_heads, num_tokens, head_dim).
    query=query.transpose(1,2)
    key=key.transpose(1,2)
    value=value.transpose(1,2)

    attn_scores=query @ key.transpose(2,3)
    # Future positions get -inf so softmax assigns them zero weight.
    attn_scores.masked_fill_(self.mask.bool()[:num_tokens,:num_tokens] , -torch.inf)
    attn_weights= torch.softmax(attn_scores/key.shape[-1]**0.5, dim=-1)
    attn_weights=self.dropout(attn_weights)

    # Back to (b, num_tokens, num_heads, head_dim), then merge the heads.
    context_vector= (attn_weights @ value).transpose(1,2)
    context_vector=context_vector.contiguous().view(b,num_tokens,self.d_out)
    context_vector= self.out_proj(context_vector)

    return context_vector


In [None]:
# Hyper-parameter dictionary handed to the model-building classes as `cfg`.
GPT_CONFIG_124M = {
    'context_length': 1024,  # size of the causal mask / number of positions
    'vocab_size': 50257,     # input size of the token embedding, output size of the head
    'emb_dim': 768,          # width of every token representation
    'n_heads': 12, # attention heads
    'n_layers': 12, # num of transformer layers
    'drop_rate': 0.1,        # probability passed to the nn.Dropout layers
    'qvk_bias': False        # whether query/key/value projections get a bias
}

In [None]:
class GELU_v1(nn.Module):
  """GELU activation using the tanh approximation.

  Computes ``0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x**3)))``
  element-wise.
  """

  def __init__(self):
    super().__init__()

  def forward(self,x):
    """Return the activation of ``x`` (same shape as ``x``)."""
    # sqrt(2/pi) scales the whole polynomial, hence the parentheses; the
    # argument of tanh must be a tensor, not a list wrapping one.
    inner=(2.0/torch.pi)**0.5 * (x + 0.044715 * x**3)
    return 0.5 * x * (1 + torch.tanh(inner))

class Feed_forward_v1(nn.Module):
  """Position-wise feed-forward network: expand 4x, apply GELU, project back.

  Args:
    cfg: Mapping that provides ``'emb_dim'``, the input and output width.
  """

  def __init__(self,cfg):
    super().__init__()
    # nn.Sequential takes modules as positional arguments (or an OrderedDict);
    # passing a plain list raises TypeError.
    self.layers=nn.Sequential(
        nn.Linear(cfg['emb_dim'],cfg['emb_dim']*4),
        GELU_v1(),
        nn.Linear(cfg['emb_dim']*4,cfg['emb_dim'])
    )


  def forward(self,x):
    """Run ``x`` through the three layers; the last axis keeps size ``emb_dim``."""
    return self.layers(x)




In [None]:
class Layer_Norm(nn.Module):
  """Layer normalization over the last axis with a learnable scale and shift.

  Args:
    emb_dim: Size of the last axis of the inputs.
  """

  def __init__(self,emb_dim):
    super().__init__()
    # Small constant added to the variance to avoid division by zero.
    self.eps=1e-5
    self.scale=nn.Parameter(torch.ones(emb_dim))
    self.shift=nn.Parameter(torch.zeros(emb_dim))

  def forward(self,x):
    """Normalize ``x`` to zero mean / unit variance along the last axis."""
    # The keyword is `keepdim` (no underscore); `keep_dim` raises TypeError.
    mean=x.mean(dim=-1,keepdim=True)
    var=x.var(dim=-1,keepdim=True,unbiased=False)
    # Parentheses matter: centre first, then divide by the standard deviation.
    norm_x=(x-mean)/torch.sqrt(var +self.eps)

    return self.scale * norm_x +self.shift

In [None]:
class Transformer_Block(nn.Module):
  """Pre-norm transformer block: attention and feed-forward, each with a residual.

  Args:
    cfg: Mapping providing ``'emb_dim'``, ``'n_heads'`` (the legacy key
      ``'num_heads'`` is accepted as a fallback), ``'drop_rate'``,
      ``'context_length'`` and ``'qvk_bias'``.
  """

  def __init__(self,cfg):
    super().__init__()
    # GPT_CONFIG_124M names the key 'n_heads'; reading only 'num_heads'
    # raised KeyError with that config. The old key is still accepted.
    num_heads=cfg['n_heads'] if 'n_heads' in cfg else cfg['num_heads']
    self.att=Multi_head_attention_v1(cfg['emb_dim'],cfg['emb_dim'],
                                     num_heads,cfg['drop_rate'],
                                     cfg['context_length'],qvk_bias=cfg['qvk_bias'])
    self.norm1_layer=Layer_Norm(cfg['emb_dim'])
    self.norm2_layer=Layer_Norm(cfg['emb_dim'])
    self.ff=Feed_forward_v1(cfg)
    self.dropout=nn.Dropout(cfg['drop_rate'])



  def forward(self,x):
    """Apply the block to ``x``; the output has the same shape as the input."""
    # Attention sub-layer with residual connection.
    shortcut=x
    x=self.norm1_layer(x)
    x=self.att(x)
    x=self.dropout(x)
    x=x+shortcut

    # Feed-forward sub-layer with residual connection. Dropout is applied to
    # this branch as well, mirroring the attention branch.
    shortcut=x
    x=self.norm2_layer(x)
    x=self.ff(x)
    x=self.dropout(x)
    x=x+shortcut

    return x




In [None]:

class GPT_model_v1(nn.Module):
  """GPT-style language model: embeddings, transformer blocks, norm, output head.

  Args:
    cfg: Mapping providing ``'vocab_size'``, ``'emb_dim'``,
      ``'context_length'``, ``'n_layers'`` and ``'drop_rate'`` (plus whatever
      ``Transformer_Block`` reads from it).
  """

  def __init__(self,cfg):
    super().__init__()
    # Embeddings are lookup tables indexed by integer ids, so they must be
    # nn.Embedding; nn.Linear would try to matrix-multiply the id tensor.
    self.token_embedding=nn.Embedding(cfg['vocab_size'],cfg['emb_dim'])
    self.pos_embedding=nn.Embedding(cfg['context_length'],cfg['emb_dim'])
    self.trf_blocks=nn.Sequential(
        *[Transformer_Block(cfg) for _ in range(cfg['n_layers'])]
    )
    self.drop_emb = nn.Dropout(cfg["drop_rate"])
    self.final_norm=Layer_Norm(cfg['emb_dim'])
    self.out_head=nn.Linear(cfg['emb_dim'],cfg['vocab_size'])

  def forward(self,token_ids):
    """Map integer ``token_ids`` of shape (batch, seq_len) to logits.

    Returns a tensor of shape (batch, seq_len, vocab_size).
    """
    batch_size,seq_len=token_ids.shape
    tok_emb=self.token_embedding(token_ids)
    # One embedding per position 0..seq_len-1, broadcast over the batch axis.
    pos_emb=self.pos_embedding(torch.arange(seq_len , device=token_ids.device))
    input_emb=tok_emb+pos_emb
    input_emb=self.drop_emb(input_emb)

    x=self.trf_blocks(input_emb)
    x=self.final_norm(x)
    logits=self.out_head(x)

    return logits


In [None]:
def generate_text_simple(model, idx,context_size,max_new_tokens):
  """Greedily extend ``idx`` by ``max_new_tokens`` tokens.

  Args:
    model: Callable mapping ids of shape (batches, num_tokens) to logits of
      shape (batches, num_tokens, vocab_size).
    idx: Integer tensor of shape (batches, num_tokens) holding the prompt.
    context_size: Maximum number of trailing tokens fed to the model per step.
    max_new_tokens: Number of tokens to append.

  Returns:
    Tensor of shape (batches, num_tokens + max_new_tokens).
  """
  for _ in range(max_new_tokens):

    # Only the last `context_size` tokens are passed to the model.
    idx_cond= idx[:,-context_size:]
    # no_grad lives on the torch module, not on nn.Module instances.
    with torch.no_grad():
      logits= model(idx_cond)

    # Keep the prediction for the last position only: (batches, vocab_size).
    logits=logits[:, -1,:]
    probas= torch.softmax(logits, dim=-1)
    idx_new=torch.argmax(probas,dim=-1, keepdim=True)

    idx=torch.cat((idx , idx_new),dim=1)

  return idx

In [None]:
def cross_entropy_loss_Scratch(probas,targets):
  """Mean negative log-likelihood of the target tokens.

  Args:
    probas: Probabilities of shape (batches, num_tokens, vocab_size).
    targets: Integer (int64) target ids of shape (batches, num_tokens).

  Returns:
    Scalar tensor: the average of ``-log(probas[b, t, targets[b, t]])``.
  """
  # Pick the probability assigned to each target id in one vectorised step.
  # The previous per-batch loop concatenated onto a Python list, which
  # torch.cat rejects.
  target_probs=probas.gather(dim=-1,index=targets.unsqueeze(-1)).squeeze(-1)

  log_probs=torch.log(target_probs)
  neg_log_likelihood=-log_probs.mean()

  return neg_log_likelihood



In [None]:
def perplexity(loss):
  """Return ``exp(loss)`` for a loss given as a tensor."""
  ppl=torch.exp(loss)
  return ppl

In [None]:
def calc_loss_batch(input_batch,target_batch,model,device):
  """Cross-entropy loss of ``model`` on a single batch.

  Both batches are moved to ``device`` and ``model`` is called on the inputs.
  The first two axes of the logits are merged and the targets are fully
  flattened before ``torch.nn.functional.cross_entropy`` is applied.
  """
  inputs=input_batch.to(device)
  targets=target_batch.to(device)

  flat_logits=model(inputs).flatten(0,1)
  flat_targets=targets.flatten()

  return torch.nn.functional.cross_entropy(flat_logits,flat_targets)


def calc_loss_loader(data_loader, model, device, num_batches=None):
  """Average per-batch loss over the first ``num_batches`` batches of a loader.

  Args:
    data_loader: Sized iterable yielding (input_batch, target_batch) pairs.
    model: Model handed to ``calc_loss_batch``.
    device: Device handed to ``calc_loss_batch``.
    num_batches: Number of batches to evaluate; ``None`` means all of them.
      Values larger than ``len(data_loader)`` are clamped.

  Returns:
    The mean loss as a Python float, or ``nan`` when there is nothing to
    evaluate (empty loader or non-positive ``num_batches``).
  """
  total_loss=0.0

  if len(data_loader)==0:
    return float('nan')
  elif num_batches is None:
    num_batches= len(data_loader)
  else:
    num_batches=min(num_batches, len(data_loader))

  # Avoid dividing by zero when the caller asks for no batches at all.
  if num_batches<=0:
    return float('nan')

  for i,(x_batch, y_batch) in enumerate(data_loader):
    if i>=num_batches:
      break
    loss=calc_loss_batch(x_batch,y_batch,model,device)
    # Accumulate the loss value itself (the old code added the constant 1).
    total_loss+=loss.item()

  return total_loss/num_batches





Training loop for LLM:

In [None]:
def evaluate_model(model, train_loader, val_loader, device, eval_iter):
  """Compute the training and validation loss with gradients disabled.

  The model is switched to eval mode for the measurement and back to train
  mode afterwards. Each loss is computed by ``calc_loss_loader`` over at most
  ``eval_iter`` batches.

  Returns:
    Tuple ``(train_loss, val_loss)``.
  """
  model.eval()

  with torch.no_grad():
    losses=tuple(
        calc_loss_loader(loader, model, device, num_batches=eval_iter)
        for loader in (train_loader, val_loader)
    )

  model.train()

  return losses


In [None]:
def model_train(model, train_loader, val_loader, optimizer, device, num_epochs,
                       eval_freq, eval_iter, start_context, tokenizer):
  """Train ``model`` for ``num_epochs`` epochs with periodic evaluation.

  Args:
    model: Model to optimise.
    train_loader: Iterable of (input_batch, target_batch) training pairs.
    val_loader: Loader used for the validation loss.
    optimizer: Optimizer that updates the parameters of ``model``.
    device: Device handed to the loss helpers.
    num_epochs: Number of passes over ``train_loader``.
    eval_freq: Evaluate every ``eval_freq`` optimisation steps.
    eval_iter: Maximum number of batches used per evaluation.
    start_context: Prompt handed to ``generate_and_print_sample``.
    tokenizer: Tokenizer handed to ``generate_and_print_sample``.

  Returns:
    Tuple ``(train_losses, val_losses, track_tokens_seen)`` with one entry
    per evaluation.
  """
  train_losses, val_losses, track_tokens_seen = [], [], []
  # global_step starts at -1 so the first optimisation step is step 0.
  tokens_seen, global_step = 0, -1

  for epoch in range(num_epochs):
    model.train()

    for input_batch,target_batch in train_loader:
      optimizer.zero_grad()
      loss=calc_loss_batch(input_batch, target_batch, model, device)
      loss.backward()
      optimizer.step()
      tokens_seen+=input_batch.numel()
      global_step += 1

      # Optional evaluation step
      if global_step % eval_freq == 0:
        train_loss, val_loss = evaluate_model(
            model, train_loader, val_loader, device, eval_iter)
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        track_tokens_seen.append(tokens_seen)
        print(f"Ep {epoch+1} (Step {global_step:06d}): "
              f"Train loss {train_loss:.3f}, Val loss {val_loss:.3f}")

    # Print a sample text after each epoch (once per epoch, not once per batch).
    generate_and_print_sample(
        model, tokenizer, device, start_context
    )

  # Return only after all epochs; returning inside the loop stopped training
  # after the first epoch.
  return train_losses, val_losses, track_tokens_seen

In [None]:
def generate_and_print_sample(model, tokenizer, device, start_context):
  """Placeholder for printing a sample continuation of ``start_context``.

  Not implemented yet: the body is a no-op and the function returns ``None``.
  """
  pass
  # TODO: encode `start_context`, generate new tokens, decode and print them.



DECODING STRATEGY 1: TEMPERATURE SCALING

In [None]:
def softmax_with_temperature(logits, temperature):
  """Softmax over the last axis of ``logits`` after dividing by ``temperature``.

  Temperatures above 1 flatten the distribution and temperatures below 1
  sharpen it.

  Args:
    logits: Tensor whose last axis indexes the candidates; 1-D or batched.
    temperature: Positive scaling factor.

  Returns:
    Tensor of the same shape as ``logits`` whose last axis sums to 1.

  Raises:
    ValueError: If ``temperature`` is not strictly positive.
  """
  if temperature<=0:
    raise ValueError("temperature must be strictly positive")

  temp_scaled=logits/temperature
  # dim=-1 equals dim=0 for 1-D logits but also normalises batched logits per
  # row instead of across the batch axis.
  probas= torch.softmax(temp_scaled, dim=-1)
  return probas



DECODING STRATEGY 2: Top-k sampling

In [None]:
# Toy logits for a 9-token vocabulary, used to illustrate top-k filtering.
next_token_logits = torch.tensor(
[4.51, 0.89, -1.90, 6.75, 1.63, -1.62, -1.89, 6.28, 1.79]
)
topk=3
# torch.topk returns the k largest logits (largest first) and their positions.
top_logits,top_pos=torch.topk(next_token_logits, topk)
print(top_logits)
print(top_pos)

# top_logits[-1] is the smallest kept logit; everything below it becomes -inf.
cond_tokens= torch.where(
    condition=next_token_logits<top_logits[-1],
    input=torch.tensor(float("-inf")),
    other=next_token_logits)

print(cond_tokens)

# softmax maps the -inf entries to probability 0, so only the top-k survive.
topk_probas=torch.softmax(cond_tokens,dim=-1)
print(topk_probas)



tensor([6.7500, 6.2800, 4.5100])
tensor([3, 7, 0])
tensor([4.5100,   -inf,   -inf, 6.7500,   -inf,   -inf,   -inf, 6.2800,   -inf])
tensor([0.0615, 0.0000, 0.0000, 0.5775, 0.0000, 0.0000, 0.0000, 0.3610, 0.0000])
