<a href="https://colab.research.google.com/github/developerabhi14/ML-Notebooks/blob/main/GPT_using_Rotatory_Positional_Encoding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import torch
import torch.nn as nn
from torch.nn import functional as F


# hyperparameters
batch_size=64
block_size=256
max_iters=5000
eval_interval=500
learning_rate=3e-4
device='cuda' if torch.cuda.is_available else 'cpu'
# device='cpu'
eval_iters=200
n_embed=384
n_head=6
n_layer=6
dropout=0.2
# =====================

torch.manual_seed(1337)

! wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
with open("input.txt", "r", encoding= "utf-8") as f:
  text=f.read()

# here are all the inique characters that occur inthe text
chars=sorted(list(set(text)))
vocab_size=len(chars)

# Create a mappping from characters to integers
stoi={ ch:i for i, ch in enumerate(chars) }
itos={ i:ch for i, ch in enumerate(chars) }
encode=lambda s: [stoi[c] for c in s] #encoder takes a string, outputs a list of integers
decode=lambda l: ''.join([itos[i] for i in l]) # decoder: takes a list of integers, outputs a string

# Train and test set splits
data=torch.tensor(encode(text), dtype=torch.long)
n=int(0.9*len(data)) # first 90% will be train, rest will be val
train_data=data[:n]
val_data=data[n:]

# data loading
def get_batch(split):
  # generate a small batch of ata of inputs x and targets y
  data=train_data if split=="train" else val_data
  ix=torch.randint(len(data)-block_size, (batch_size,))
  x=torch.stack([data[i:i+block_size] for i in ix])
  y=torch.stack([data[i+1:i+block_size+1] for i in ix])
  x,y=x.to(device), y.to(device)
  return x,y

@torch.no_grad()
def estimate_loss():
  out={}
  model.eval()
  for split in ['train', 'val']:
    losses=torch.zeros(eval_iters)
    for k in range(eval_iters):
      # sample a batch of data
      X,Y=get_batch(split)
      logits, loss=model(X,Y)
      losses[k]=loss.item()
    out[split]=losses.mean()
  model.train()
  return out

class RotaryEmbedding(nn.Module):
    def __init__(self, dim, max_seq_len=2048):
        super().__init__()
        self.dim = dim
        self.max_seq_len = max_seq_len
        inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim)).to(device)
        self.register_buffer("inv_freq", inv_freq)

    def forward(self, positions):
        # positions: [seq_len] or [batch_size, seq_len]
        positions=positions.to(device)
        if len(positions.shape) == 1:
            positions = positions.unsqueeze(0)  # [1, seq_len]
        freqs = torch.einsum("b n, d -> b n d", positions.float(), self.inv_freq)
        emb = torch.cat((freqs, freqs), dim=-1)
        return emb

def rotate_half(x):
    x1 = x[..., : x.shape[-1] // 2]
    x2 = x[..., x.shape[-1] // 2 :]
    return torch.cat((-x2, x1), dim=-1)

def apply_rotary_pos_emb(q, k, positions):
    # q, k: [batch_size, seq_len, head_dim]
    emb = RotaryEmbedding(q.shape[-1])(positions.to(q.device))
    cos, sin = emb.cos(), emb.sin()
    q_rotated = (q * cos) + (rotate_half(q) * sin)
    k_rotated = (k * cos) + (rotate_half(k) * sin)
    return q_rotated, k_rotated


class Head(nn.Module):
    """ one head of self-attention """
    def __init__(self,head_size):
      super().__init__()
      self.key=nn.Linear(n_embed, head_size, bias=False)
      self.query=nn.Linear(n_embed, head_size, bias=False)
      self.value=nn.Linear(n_embed, head_size, bias=False)
      self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

      self.dropout=nn.Dropout(dropout)

    def forward(self, x):
      B,T,C=x.shape
      k=self.key(x)
      q=self.query(x)

      positions = torch.arange(T)
      q, k = apply_rotary_pos_emb(q, k, positions)

      # compute attention scores ("affinities")
      wei=q @ k.transpose(-2,-1) * C**-0.5  # (B,T,C) @ (B,C,T) ----> (B,T,T)
      wei=wei.masked_fill(self.tril[:T, :T]==0, float('-inf')) # (B,T,T)
      wei=F.softmax(wei, dim=-1) #(B,T,T)
      wei=self.dropout(wei)
      # perform the weighted aggregation of the values
      v=self.value(x) #(B,T,C)
      out=wei@v #(B,T,T) @ (B,C,T) ---> (B,T,C)
      return out

class MultiHeadAttention(nn.Module):
  """ multiple heads of self attention in parallel """
  def __init__(self, num_heads, head_size):
    super().__init__()
    self.heads=nn.ModuleList([Head(head_size) for _ in range(num_heads)])
    self.proj=nn.Linear(head_size*num_heads, n_embed)
    self.dropout=nn.Dropout(dropout)

  def forward(self, x):
    out=torch.cat([h(x) for h in self.heads], dim=-1)
    out=self.dropout(self.proj(out))
    return out

class FeedForward(nn.Module):
  """ a simple linear layer followed by a non linearity """

  def __init__(self, n_embed):
    super().__init__()
    self.net=nn.Sequential(
      nn.Linear(n_embed, 4*n_embed),
      nn.ReLU(),
      nn.Linear(4*n_embed, n_embed),
      nn.Dropout(dropout)
    )

  def forward(self, x):
    return self.net(x)

class Block(nn.Module):
  """ Transformer block: communication followed by computation """
  def __init__(self, n_embed, n_head):
    super().__init__()
    head_size=n_embed//n_head
    self.sa=MultiHeadAttention(n_head, n_head)
    self.ffwd=FeedForward(n_embed)
    self.ln1=nn.LayerNorm(n_embed)
    self.ln2=nn.LayerNorm(n_embed)

  def forward(self,x):
    x=x+self.sa(self.ln1(x))
    x=x+self.ffwd(self.ln2(x))
    return x

class BigramLanguageModel(nn.Module):
  def __init__(self):
    super().__init__()
    self.token_embedding_table=nn.Embedding(vocab_size, n_embed)
    self.position_embedding_table=nn.Embedding(block_size, n_embed)

    # self.sa_head=MultiHeadAttention(4,n_embed//4)# i.e. 4 heads of 8 dimensional self-attention
    # self.ffwd=FeedForward(n_embed)

    # replace with blocks
    # self.blocks=nn.Sequential(
    #   Block(n_embed, n_head=4),
    #   Block(n_embed, n_head=4),
    #   Block(n_embed, n_head=4),
    #   nn.LayerNorm(n_embed)
    # )
    self.blocks=nn.Sequential(*[Block(n_embed, n_head=n_head) for _ in range(n_layer)])
    self.lm_head=nn.Linear(n_embed, vocab_size)

  def forward(self, idx, targets=None):
    B,T=idx.shape
    token_emb=self.token_embedding_table(idx) # (B,T,C)
    # positional encoding is now handled by RoPE
    # pos_emb=self.position_embedding_table(torch.arange(T ,device=device)) #(T,C)
    # x=token_emb+pos_emb # (B,T,C)
    x=token_emb
    # x=self.sa_head(x) # apply one head of self attention (B,T,C)
    # x=self.ffwd(x)

    # replace with blocks
    x=self.blocks(x) # (B,T,C)
    logits=self.lm_head(x)# (B,T, vocab_size)
    if targets is None:
      loss=None
    else:
      # pytorch expects()
      B,T,C=logits.shape
      logits=logits.view(B*T, C)
      targets=targets.view(B*T)
      loss=F.cross_entropy(logits, targets)
    return logits, loss

  def generate(self, idx, max_new_tokens):
    for _ in range(max_new_tokens):
      # crop idz to the last block size tokens
      idx_cond=idx[:, -block_size:]
      # get the predictions
      logits,loss=self(idx_cond)
      # focus only on last time step
      logits=logits[:,-1,:] #becomes (B,C)
      # apply softmax to get probabilities
      probs=F.softmax(logits, dim=-1) #(B,C)
      # sample the distribution
      idx_next=torch.multinomial(probs, num_samples=1) #(B,1)
      # append sampled index to the running sequence
      idx=torch.cat((idx, idx_next), dim=1) # (B,T+1)
    return idx


model=BigramLanguageModel()
m=model.to(device)

optimizer=torch.optim.AdamW(m.parameters(), lr=learning_rate)

for iter in range(max_iters):
  if iter%eval_interval ==0:
    losses=estimate_loss()
    print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
  xb,yb=get_batch('train')
  logits,loss=model(xb,yb)
  optimizer.zero_grad(set_to_none=True)
  loss.backward()
  optimizer.step()




--2025-03-25 12:55:30--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt.3’


2025-03-25 12:55:31 (130 MB/s) - ‘input.txt.3’ saved [1115394/1115394]

step 0: train loss 4.4865, val loss 4.4716
step 500: train loss 1.7649, val loss 1.8883
step 1000: train loss 1.4919, val loss 1.6887
step 1500: train loss 1.3797, val loss 1.5933
step 2000: train loss 1.3123, val loss 1.5482
step 2500: train loss 1.2624, val loss 1.5224
step 3000: train loss 1.2208, val loss 1.4986
step 3500: train loss 1.1887, val loss 1.4850
step 4000: train loss 1.1598, val loss 1.4941
step 4500: train loss 1.1383, val loss 1.4890


RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu! (when checking argument for argument index in method wrapper_CUDA__index_select)

In [5]:
context=torch.zeros((1,1), dtype=torch.long).to(device)
print(decode(m.generate(context, max_new_tokens=300)[0].tolist()))


But with prison: I will see a punity: forewell.

COMINIUS:
Sir, I knave not, sir there has too love.

COMINIUS:
And, no more honest I might perfect my gracious come.
But, then, marry, I shall have been a clothed men's
Trief: come to-day, to venge come to hence the office.

CORIOLANUS:
Your good tong
