<a href="https://colab.research.google.com/github/dev-chaitanya-dewangan/CHATGPT_TRANSFORM_FROM_SCRATCH/blob/main/LLM_FROM_SCRATCH.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DATA PREP

In [None]:
import requests
import re
url = "https://www.gutenberg.org/cache/epub/76137/pg76137.txt"
# A timeout keeps the cell from hanging forever on a stalled connection.
response = requests.get(url, timeout=30)
# Fail loudly on an HTTP error instead of tokenizing an error page.
response.raise_for_status()

# Get the text content
full_text = response.text
# The book body is taken to start at the second "PREFACE" occurrence
# (presumably the first one is a table-of-contents entry -- TODO confirm).
first_index = full_text.find("PREFACE")
second_index = full_text.find("PREFACE", first_index + 1)
if second_index == -1:
    # str.find returns -1 when the marker is absent; slicing from -1 would
    # silently keep only the last character of the download.
    raise ValueError('expected two "PREFACE" markers in the downloaded text')
# Keep everything from the second marker onwards.
book = full_text[second_index:]

In [None]:
# Split the corpus on punctuation, double dashes and whitespace. The capture
# group makes re.split keep the separators, so punctuation marks become
# tokens of their own.
preproccesed_cleaned_text = re.split(r'([,.:;?_!"()\']|--|\s)', book)
# Strip every piece and drop the ones that are empty or whitespace-only.
preporcessed_result = [
    stripped
    for stripped in (piece.strip() for piece in preproccesed_cleaned_text)
    if stripped
]

# **EXTENDED WITH UNK AND END-OF-TEXT TOKENS**

In [None]:
# Unique tokens in sorted order, followed by the two special tokens so that
# they receive the two highest ids.
preporcessed_sorted_result = [
    *sorted(set(preporcessed_result)),
    "<|endoftext|>",
    "<|unk|>",
]

In [None]:

# Map every token string to its position in the sorted token list.
vocab = {
    token: token_id
    for token_id, token in enumerate(preporcessed_sorted_result)
}


# TOKENIZER


In [None]:
class Tokenizer:
    """Word-level tokenizer backed by a fixed token -> id vocabulary.

    Text is split on punctuation, double dashes and whitespace; every
    resulting token must exist in the vocabulary.
    """

    def __init__(self, vocab):
        """Store the vocabulary and build the reverse (id -> token) mapping.

        Args:
            vocab: dict mapping token strings to integer ids.
        """
        self.encoded_vocab = vocab
        self.decoded_vocab = {token_id: token for token, token_id in vocab.items()}

    def encode(self, text):
        """Convert ``text`` into a list of token ids.

        Raises:
            KeyError: if the text contains a token missing from the vocabulary.
        """
        # The capture group keeps the separators so punctuation becomes tokens.
        pieces = re.split(r'([,.:;?_!"()\']|--|\s)', text)
        tokens = [piece.strip() for piece in pieces if piece.strip()]
        return [self.encoded_vocab[token] for token in tokens]

    def decode(self, tokens):
        """Convert a sequence of token ids back into a string.

        Tokens are joined with single spaces, then the space in front of
        the punctuation marks ``, . ? ! " ( ) '`` is removed.
        """
        text = " ".join(self.decoded_vocab[token_id] for token_id in tokens)
        # Raw string: a plain '\s' literal is an invalid escape sequence.
        return re.sub(r'\s+([,.?!"()\'])', r'\1', text)

# NEW TOKENIZER EXTENDED WITH THE UNKNOWN AND END-OF-TEXT TOKENS

In [None]:

class TokenizerV2:
    """Word-level tokenizer that maps out-of-vocabulary tokens to ``<|unk|>``.

    The vocabulary is expected to contain the ``<|unk|>`` token; it is
    looked up whenever the text contains a token that is not in the
    vocabulary.
    """

    def __init__(self, vocab):
        """Store the vocabulary and build the reverse (id -> token) mapping.

        Args:
            vocab: dict mapping token strings to integer ids.
        """
        self.encoded_vocab = vocab
        self.decoded_vocab = {token_id: token for token, token_id in vocab.items()}

    def encode(self, text):
        """Convert ``text`` into a list of token ids.

        Tokens that are not in the vocabulary are replaced by ``<|unk|>``
        before the id lookup, so the result always contains ids (never raw
        strings) and can be passed straight to ``decode``.

        Raises:
            KeyError: if an unknown token is met and the vocabulary has no
                ``<|unk|>`` entry.
        """
        # The capture group keeps the separators so punctuation becomes tokens.
        pieces = re.split(r'([,.:;?_!"()\']|--|\s)', text)
        tokens = [piece.strip() for piece in pieces if piece.strip()]
        known_tokens = [
            token if token in self.encoded_vocab else "<|unk|>"
            for token in tokens
        ]
        return [self.encoded_vocab[token] for token in known_tokens]

    def decode(self, tokens):
        """Convert a sequence of token ids back into a string.

        Tokens are joined with single spaces, then the space in front of
        the punctuation marks ``, . ? ! " ( ) '`` is removed.
        """
        text = " ".join(self.decoded_vocab[token_id] for token_id in tokens)
        # Raw string: a plain '\s' literal is an invalid escape sequence.
        return re.sub(r'\s+([,.?!"()\'])', r'\1', text)


In [None]:
# Instantiate the <|unk|>-aware tokenizer over the vocabulary built above.
tokenizer=TokenizerV2(vocab)


In [None]:
import tiktoken
import torch
from torch.utils.data import Dataset,DataLoader
class GPTEncoder(Dataset):
    """Sliding-window dataset of (input, target) token-id tensors.

    The text is tokenized once; windows of ``max_length`` ids are taken
    every ``stride`` ids. Each target window is its input window shifted
    one position to the right.
    """

    def __init__(self, txt, tokenizer, max_length, stride):
        """Tokenize ``txt`` and precompute every input/target window.

        Args:
            txt: raw text to tokenize.
            tokenizer: object whose ``encode(text, allowed_special=...)``
                returns a list of token ids.
            max_length: number of ids per window.
            stride: distance between the starts of consecutive windows.
        """
        self.input = []
        self.target = []
        token_ids = tokenizer.encode(txt, allowed_special={"<|endoftext|>"})
        # Stop early enough that the shifted target window is full length.
        window_limit = len(token_ids) - max_length
        for start in range(0, window_limit, stride):
            stop = start + max_length
            self.input.append(torch.tensor(token_ids[start:stop]))
            self.target.append(torch.tensor(token_ids[start + 1:stop + 1]))

    def __len__(self):
        """Return the number of precomputed windows."""
        return len(self.input)

    def __getitem__(self, pos):
        """Return the ``(input, target)`` tensor pair stored at ``pos``."""
        return self.input[pos], self.target[pos]

def create_dataloader(txt, batch_size=4, max_length=256,
                      stride=128, shuffle=True,
                      drop_last=True, num_workers=0):
    """Build a DataLoader of sliding-window (input, target) pairs for ``txt``.

    The text is tokenized with tiktoken's "gpt2" encoding and wrapped in a
    ``GPTEncoder`` dataset; the remaining arguments are forwarded to the
    dataset (``max_length``, ``stride``) or to ``DataLoader``.

    Returns:
        A ``torch.utils.data.DataLoader`` over the windowed dataset.
    """
    gpt2_encoding = tiktoken.get_encoding("gpt2")
    windows = GPTEncoder(txt, gpt2_encoding, max_length, stride)
    loader_options = dict(
        dataset=windows,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers,
    )
    return DataLoader(**loader_options)
# Re-slice the corpus from the second "PREFACE" marker (same slice as in the
# data-prep cell) so this cell can be re-run on its own.
book= full_text[second_index:]
# One 12-token window per batch; stride=120 is larger than max_length, so
# consecutive windows do not overlap and tokens between them are skipped.
batch_dataloader=create_dataloader(book,batch_size=1,max_length=12,stride=120,shuffle=False)
data_iter=iter(batch_dataloader)
# NOTE: next() has to be called on the iterator, not on the DataLoader itself.
# Print the first (input, target) batch.
print(next(data_iter))

[tensor([[   47, 31688, 11598,    13,   201,   198,   201,   198,   201,   198,
           464,  1204]]), tensor([[31688, 11598,    13,   201,   198,   201,   198,   201,   198,   464,
          1204,   286]])]


# **SIMPLIFIED SELF ATTENTION**

In [None]:
# Toy embeddings: one 3-dimensional row per token of the example sentence.
inputs = torch.tensor(
  [[0.43, 0.15, 0.89], # Your     (x^1)
   [0.55, 0.87, 0.66], # journey  (x^2)
   [0.57, 0.85, 0.64], # starts   (x^3)
   [0.22, 0.58, 0.33], # with     (x^4)
   [0.77, 0.25, 0.10], # one      (x^5)
   [0.05, 0.80, 0.55]] # step     (x^6)
)

# The second token ("journey") is used as the query.
query = inputs[1]
# Unnormalised attention score of every token: its dot product with the query.
attenS = torch.stack([torch.dot(token_vec, query) for token_vec in inputs])

print(attenS)



tensor([0.9544, 1.4950, 1.4754, 0.8434, 0.7070, 1.0865])


In [None]:
# Turn the raw scores into attention weights with a softmax over the only
# dimension, then print their sum.
attenS = attenS.softmax(dim=0)
print(attenS.sum())

tensor(1.0000)


In [None]:
# Toy embeddings: one 3-dimensional row per token of the example sentence.
inputs = torch.tensor(
  [[3,5,9], # Your     (x^1)
   [5,7,6], # journey  (x^2)
   [7,5,4], # starts   (x^3)
   [2,8,3], # with     (x^4)
   [7,5,0], # one      (x^5)
   [5,0,5]] # step     (x^6)
,dtype=torch.float32)

# Pairwise dot products: entry (i, j) scores token j against query token i.
atten_scores = inputs @ inputs.T
# Normalise each row so the weights for one query sum to 1.
attention_weights = torch.softmax(atten_scores, dim=-1)
# Each context vector is the attention-weighted sum of the input rows, so the
# weights multiply `inputs` (not the raw scores); the result has one
# 3-dimensional row per token.
context = attention_weights @ inputs




torch.Size([6, 6])

# **FULL CHATGPT MODEL IMPLEMENTED**

In [None]:
# CONFIG FOR GPT2 144M
# NOTE(review): these values look like the GPT-2 "small" hyperparameters,
# which are usually quoted as 124M parameters -- confirm the "144M" name.
GPT2_CONFIG_144M = {
    "vocab_size": 50257,     # rows of the token-embedding table
    "n_heads": 12,           # attention heads per transformer block
    "n_layers": 12,          # number of stacked transformer blocks
    "emb_dim": 768,          # embedding / hidden width
    "context_length": 1024,  # rows of the position-embedding table and mask size
    "drop_rate": 0.1,        # dropout probability
    "ff_dim": 3072,          # hidden width of the feed-forward network
}

import torch
import torch.nn as nn
import torch.nn.functional as F
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with learnable scale and shift."""

    def __init__(self, dim, eps=1e-5):
        """Create the learnable parameters.

        Args:
            dim: size of the last (normalised) dimension.
            eps: constant added to the variance before the square root.
        """
        super().__init__()
        # The attribute name must match the one read in forward().
        self.weights = nn.Parameter(torch.ones(dim))
        self.bias = nn.Parameter(torch.zeros(dim))
        self.eps = eps

    def forward(self, x):
        """Normalise ``x`` along its last dimension, then scale and shift it."""
        mean = x.mean(-1, keepdim=True)
        # Biased variance (divide by N).
        var = x.var(-1, keepdim=True, unbiased=False)
        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.weights * x_norm + self.bias
class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> GELU -> Linear -> Dropout."""

    def __init__(self, cfg):
        """Build the network from ``cfg``.

        Args:
            cfg: mapping providing 'emb_dim' (input/output width), 'ff_dim'
                (hidden width) and 'drop_rate' (dropout probability).
        """
        super().__init__()
        width, hidden = cfg['emb_dim'], cfg['ff_dim']
        stages = [
            nn.Linear(width, hidden),   # expand to the hidden width
            nn.GELU(),
            nn.Linear(hidden, width),   # project back to the model width
            nn.Dropout(cfg['drop_rate']),
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Apply the network to ``x``."""
        return self.net(x)
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention with a fused query/key/value projection."""

    def __init__(self, cfg):
        """Build the projections, dropout and causal mask from ``cfg``.

        Args:
            cfg: mapping providing 'n_heads', 'emb_dim', 'drop_rate' and
                'context_length' (side length of the causal mask).

        Raises:
            ValueError: if 'emb_dim' is not divisible by 'n_heads'.
        """
        # Must run first so submodules and buffers can be registered.
        super().__init__()
        self.n_heads = cfg['n_heads']
        self.emb_dim = cfg['emb_dim']
        if self.emb_dim % self.n_heads != 0:
            raise ValueError("emb_dim must be divisible by n_heads")
        # Integer division: reshape() needs an int, not a float.
        self.head_dim = self.emb_dim // self.n_heads

        # One linear layer produces queries, keys and values together.
        self.context_input = nn.Linear(self.emb_dim, 3 * self.emb_dim)
        self.output = nn.Linear(self.emb_dim, self.emb_dim)
        self.dropout = nn.Dropout(cfg['drop_rate'])

        # Lower-triangular mask, shaped (1, 1, context_length, context_length)
        # so it broadcasts over batch and heads.
        causal = torch.tril(torch.ones(cfg["context_length"], cfg["context_length"]))
        self.register_buffer("mask", causal.unsqueeze(0).unsqueeze(0))

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: tensor of shape (batch, tokens, emb_dim).

        Returns:
            Tensor with the same shape as ``x``.
        """
        B, T, C = x.size()

        # (B, T, 3*C) -> (3, B, n_heads, T, head_dim)
        qkv = self.context_input(x).reshape(B, T, 3, self.n_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        attn_scores = (q @ k.transpose(-2, -1)) / (self.head_dim ** 0.5)
        # Hide future positions: each token may attend only to itself and
        # to earlier tokens.
        attn_scores = attn_scores.masked_fill(
            self.mask[:, :, :T, :T] == 0, float("-inf")
        )
        attn_probs = F.softmax(attn_scores, dim=-1)
        attn_probs = self.dropout(attn_probs)

        # (B, n_heads, T, head_dim) -> (B, T, C): merge the heads again.
        attn_output = attn_probs @ v
        attn_output = attn_output.transpose(1, 2).reshape(B, T, C)

        output = self.output(attn_output)
        output = self.dropout(output)
        return output
class TransformerBlock(nn.Module):
    """Transformer block: attention and feed-forward sublayers, each applied
    to a layer-normalised input and added back through a residual connection.
    """

    def __init__(self, cfg):
        """Build both sublayers and their layer norms from ``cfg``."""
        super().__init__()
        width = cfg['emb_dim']
        self.ln1 = LayerNorm(width)
        self.attn = MultiHeadAttention(cfg)
        self.ln2 = LayerNorm(width)
        self.ff = FeedForward(cfg)

    def forward(self, x):
        """Run ``x`` through the attention and feed-forward sublayers."""
        # Normalise first, then add the sublayer output to the residual stream.
        attended = self.attn(self.ln1(x))
        x = x + attended
        transformed = self.ff(self.ln2(x))
        return x + transformed
class GPT2Model(nn.Module):
    """GPT-2 style language model: embeddings, transformer blocks, output head."""

    def __init__(self, cfg):
        """Build the model from ``cfg``.

        Args:
            cfg: mapping providing 'vocab_size', 'emb_dim', 'context_length',
                'drop_rate' and 'n_layers', plus whatever the transformer
                blocks read from it.
        """
        super().__init__()
        self.tok_emb = nn.Embedding(cfg['vocab_size'], cfg['emb_dim'])
        self.pos_emb = nn.Embedding(cfg['context_length'], cfg['emb_dim'])
        self.drop = nn.Dropout(cfg['drop_rate'])

        self.blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg['n_layers'])]
        )
        self.ln_f = LayerNorm(cfg['emb_dim'])
        # Project hidden states to one logit per vocabulary entry.
        self.head = nn.Linear(cfg['emb_dim'], cfg['vocab_size'])

    def forward(self, idx):
        """Compute next-token logits.

        Args:
            idx: integer tensor of token ids with shape (batch, tokens).

        Returns:
            Tensor of logits with shape (batch, tokens, vocab_size).
        """
        _, seq_len = idx.size()

        tok_emb = self.tok_emb(idx)
        # Positions 0..seq_len-1, created on the same device as the input.
        pos = torch.arange(seq_len, device=idx.device)
        pos_emb = self.pos_emb(pos)

        # pos_emb has shape (tokens, emb_dim) and broadcasts over the batch.
        x = tok_emb + pos_emb
        x = self.drop(x)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.head(x)
        return logits


























