In [None]:
import torch

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2025-07-08 17:34:11--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2025-07-08 17:34:11 (27.9 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [None]:
with open('input.txt', 'r', encoding='utf-8') as f:
  text = f.read()

In [None]:
print("length of dataset in characters: ", len(text))

length of dataset in characters:  1115394


In [None]:
print(text[:1000])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hunger for bread, not in thirst for revenge.



In [None]:
chars = sorted(list(set(text)))

In [None]:
# Character-level model: every distinct character in the corpus is one token,
# so the vocabulary size is the number of unique characters. The model
# predicts one character at a time.
vocab_size = len(chars)
print(vocab_size)
print("".join(chars))

65

 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz


In [None]:
### Tokenizing our Vocabulary

#To get Token Ids to encode vocab, we need string to integer mapping (stoi)
#To decode token ids back to tokens, we need integer to string mapping (itos)

# Lookup tables between characters and their integer ids (position in `chars`)
stoi = dict((ch, idx) for idx, ch in enumerate(chars))
itos = dict(enumerate(chars))


def encoded_seq(s):
  """Return the list of token ids for the characters of string `s`."""
  return [stoi[ch] for ch in s]


def decoded_seq(tokens):
  """Return the list of characters for the token ids in `tokens`."""
  return [itos[token_id] for token_id in tokens]


# Round-trip a sample word through both directions
tokens = encoded_seq("Hello")
print(tokens)
decoded_s = decoded_seq(tokens)
print("".join(decoded_s))

[20, 43, 50, 50, 53]
Hello


In [None]:
#Tokenizing the dataset now
text_tokens = torch.tensor(encoded_seq(text), dtype = torch.long)
text_tokens[:500]

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43, 44,
        53, 56, 43,  1, 61, 43,  1, 54, 56, 53, 41, 43, 43, 42,  1, 39, 52, 63,
         1, 44, 59, 56, 58, 46, 43, 56,  6,  1, 46, 43, 39, 56,  1, 51, 43,  1,
        57, 54, 43, 39, 49,  8,  0,  0, 13, 50, 50, 10,  0, 31, 54, 43, 39, 49,
         6,  1, 57, 54, 43, 39, 49,  8,  0,  0, 18, 47, 56, 57, 58,  1, 15, 47,
        58, 47, 64, 43, 52, 10,  0, 37, 53, 59,  1, 39, 56, 43,  1, 39, 50, 50,
         1, 56, 43, 57, 53, 50, 60, 43, 42,  1, 56, 39, 58, 46, 43, 56,  1, 58,
        53,  1, 42, 47, 43,  1, 58, 46, 39, 52,  1, 58, 53,  1, 44, 39, 51, 47,
        57, 46, 12,  0,  0, 13, 50, 50, 10,  0, 30, 43, 57, 53, 50, 60, 43, 42,
         8,  1, 56, 43, 57, 53, 50, 60, 43, 42,  8,  0,  0, 18, 47, 56, 57, 58,
         1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 18, 47, 56, 57, 58,  6,  1, 63,
        53, 59,  1, 49, 52, 53, 61,  1, 15, 39, 47, 59, 57,  1, 25, 39, 56, 41,
        47, 59, 57,  1, 47, 57,  1, 41, 

In [None]:
data = text_tokens
n = int(0.9*len(text_tokens))
train = data[:n]
val = data[n:]

In [None]:
#So now instead of parsing and passing one sentence as an input at a time, we sample random chunks of a fixed length from the entire corpus
#This fixed length will be our max_seq_length

max_seq_length = 8   #also called as context length
train[:max_seq_length+1]  #+1 because we want 8 different examples of next token prediction

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58])

In [None]:
x = train[:max_seq_length] #[18, 47, 56, 57, 58,  1, 15, 47]
y = train[1:max_seq_length+1] #[47, 56, 57, 58,  1, 15, 47, 58]

for t in range(max_seq_length):
  context = x[:t+1]
  target = y[t]
  print(f"when input is: {context}, target is: {target}")

## This is how models like ChatGPT's have fixed context length. They can never predict next word if an input is provided beyond context length
## creating multiple examples like this from a single sequence instance also helps model in predicting next word based on different input sequence length it gets
## This can also be referred to as Time Dimension

when input is: tensor([18]), target is: 47
when input is: tensor([18, 47]), target is: 56
when input is: tensor([18, 47, 56]), target is: 57
when input is: tensor([18, 47, 56, 57]), target is: 58
when input is: tensor([18, 47, 56, 57, 58]), target is: 1
when input is: tensor([18, 47, 56, 57, 58,  1]), target is: 15
when input is: tensor([18, 47, 56, 57, 58,  1, 15]), target is: 47
when input is: tensor([18, 47, 56, 57, 58,  1, 15, 47]), target is: 58


In [None]:
# Multiple such example chunks are stacked together to keep GPUs busy

batch_size = 4
chunk_start_idx = torch.randint(high = len(train)-max_seq_length, size = (batch_size,))
chunk_start_idx


tensor([628925, 863441, 141299, 353390])

In [None]:
torch.manual_seed(42)
def get_batch(is_train=True):
  """Sample a random batch of input/target sequences.

  Args:
    is_train: draw from `train` when True, otherwise from `val`.

  Returns:
    Tuple `(b_x, b_y)`, each of shape (batch_size, max_seq_length) and moved
    to `device`. `b_y` is `b_x` shifted by one position (next-token targets).
  """
  data = train if is_train else val
  # Bound the start offsets by the split being sampled; using len(train) here
  # would produce out-of-range offsets for the (shorter) validation split.
  batches_start_index = torch.randint(high = len(data)-max_seq_length, size = (batch_size,))

  b_x = torch.stack([data[curr_idx: curr_idx + max_seq_length] for curr_idx in batches_start_index])
  b_y = torch.stack([data[curr_idx+1:curr_idx + max_seq_length+1] for curr_idx in batches_start_index])
  b_x, b_y = b_x.to(device), b_y.to(device)
  return b_x, b_y



b_x, b_y = get_batch()

In [None]:
from torch import nn as nn
import torch.nn.functional as F

In [None]:
class BiagramLanguageModel(nn.Module):
  """Bigram model: each token id looks up the logits of the next token directly.

  Args:
    vocab_size: number of distinct tokens; also the width of every logits row.
  """

  def __init__(self, vocab_size):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

  def forward(self, idx, targets=None):
    """Compute logits and, when `targets` is given, the cross-entropy loss.

    Args:
      idx: (B, T) tensor of token ids.
      targets: optional (B, T) tensor of next-token ids.

    Returns:
      `(logits, loss)`. Without targets, logits is (B, T, vocab_size) and
      loss is None. With targets, logits is flattened to (B*T, vocab_size).
    """
    logits = self.token_embedding_table(idx)

    if targets is None:
      return logits, None

    B, T, C = logits.shape
    logits = logits.view(B*T, C)
    loss = F.cross_entropy(logits, targets.view(B*T))
    return logits, loss

  @torch.no_grad()  # sampling never needs gradients
  def generate(self, idx, max_new_tokens=100):
    """Append `max_new_tokens` sampled tokens to the (B, T) prompt `idx`."""
    # Accept prompts created on any device (e.g. a CPU torch.zeros prompt)
    idx = idx.to(self.token_embedding_table.weight.device)
    for _ in range(max_new_tokens):
      logits, _ = self(idx)
      logits = logits[:, -1, :]  # only the last position predicts the next token
      probs = F.softmax(logits, dim=-1)
      idx_next = torch.multinomial(probs, num_samples=1)
      idx = torch.cat((idx, idx_next), dim=1)
    return idx



In [None]:
# get_batch() returns tensors on `device`, so the model must live there too
model = BiagramLanguageModel(vocab_size).to(device)

In [None]:
out, loss = model(b_x, b_y)
out.shape
print(loss)

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument index in method wrapper_CUDA__index_select)

In [None]:
print("".join(decoded_seq(model.generate(torch.zeros((1,1), dtype=torch.long))[0].tolist())))

In [None]:
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

In [None]:
# Train the current `model` with AdamW for a fixed number of steps.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
batch_size = 32  # module-level name read by get_batch()
for steps in range(6000):
  xb, yb = get_batch()
  logits, loss = model(xb, yb)

  # Clear old gradients, backpropagate this batch's loss, update the weights
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()
print(loss.item())  # loss of the last batch only

In [None]:
print("".join(decoded_seq(model.generate(torch.zeros((1,1), dtype=torch.long), max_new_tokens=500)[0].tolist())))

## Self Attention

Let's say we somehow want to bring the information of previous tokens into the current token. The easiest way to do this is to take the average of the current token and all the tokens before it. Although this is lossy, it still gives us an easy way to bring information from previous tokens into the current token.

In [None]:
#Using simple brute force
bx, by = get_batch()

In [None]:
b_x

In [None]:
# Causal running mean of each sequence, written into a separate float tensor.
# Averaging in place on the integer tensor b_x would truncate every mean and
# feed already-averaged entries into the means of later positions.
b_x_running_mean = torch.zeros(b_x.shape, dtype=torch.float, device=b_x.device)
for curr_batch in range(len(b_x)):
  for i in range(len(b_x[0])):
    b_x_running_mean[curr_batch, i] = torch.mean(b_x[curr_batch][:i+1].float())
b_x_running_mean

In [None]:
#Let's take another example also with channel dimension
B, T, C = 4, 8, 2
x = torch.randn(B,T,C)
x.shape
x

In [None]:
# Bag-of-words baseline: x_bow[b, t] is the mean of x[b, 0..t] (inclusive)
x_bow = torch.zeros((B,T,C))
for b in range(B):
  for t in range(T):
    xprev = x[b, :t+1]  # (t+1, C): the current token and everything before it
    x_bow[b, t] = torch.mean(xprev, 0)  # average over the time axis, leaving (C,)

In [None]:
x_bow.shape #Same shape as original x

There's a simple trick to do this: multiply the matrix by a lower-triangular matrix

In [None]:
weights = torch.tril(torch.ones(T,T)) #T X T
weights = weights / weights.sum(1, keepdim=True)
weights

In [None]:
# Apply the averaging weights to every batch element: (T x T) @ (T x C) -> (T x C)
# weights is (T x T) and x is (B x T x C); weights has no batch dimension, so
# PyTorch broadcasts it across B and the result is (B x T x C).
xbow2 = weights @ x

In [None]:
# Comparing if this xbow2 was same as before
torch.allclose(x_bow, xbow2)

A 3rd way to do this is using softmax. This is going to be used for self-attention as well

In [None]:
tril = torch.tril(torch.ones(T, T))
weights = torch.zeros((T,T))
weights

In [None]:
weights = weights.masked_fill(tril==0, float('-inf')) #This is just telling that the future tokens should be masked
weights

In [None]:
#Apply softmax to find weights of each non masked / past tokens
weights = F.softmax(weights, dim=1)
weights # All negative infinity got 0 weights. Currently all past tokens have equal weights in affecting current token. But we can enhance this to get weighted average

In [None]:
xbow3 = weights @ x

In [None]:
torch.allclose(x_bow, xbow3)

In [None]:
## Enhance Neural Network
## In the previous bigram model the embedding width equalled the vocab size; here it becomes a separate hyperparameter
n_embed = 32
block_size = 8 # earlier called max_seq_length; also referred to as context length

class BiGramLanguageModel(nn.Module):
  """Token + position embeddings followed by a linear language-model head."""

  def __init__(self):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embed) #word embedding; n_embed is our C
    self.positional_encoding = nn.Embedding(block_size, n_embed)
    self.lm_head = nn.Linear(n_embed, vocab_size)

  def forward(self, idx, target=None):
    """Return `(logits, loss)` for a (B, T) batch of token ids.

    Without `target`, logits is (B, T, vocab_size) and loss is None. With
    `target`, logits is flattened to (B*T, vocab_size) for the cross-entropy.
    """
    # Read T from the input itself; it was previously used before assignment
    B, T = idx.shape
    tok_embedding = self.token_embedding_table(idx) #(B, T, C)
    # Positions are created on the input's device so CPU and GPU both work
    pos_emb = self.positional_encoding(torch.arange(T, device=idx.device)) # (T,C)
    x = tok_embedding + pos_emb
    logits = self.lm_head(x) #(B, T, vocab_size)

    if target is None:
      return logits, None

    logits = logits.view(B*T, -1)
    loss = F.cross_entropy(logits, target.view(B*T))
    # Returned on both paths; previously the no-target path returned None
    return logits, loss

  @torch.no_grad()  # sampling never needs gradients
  def generate(self, idx, max_new_tokens=100):
    """Append `max_new_tokens` sampled tokens to the (B, T) prompt `idx`."""
    idx = idx.to(self.lm_head.weight.device)
    for _ in range(max_new_tokens):
      # The position table has only block_size rows, so feed at most that many tokens
      idx_cond = idx[:, -block_size:]
      logits, _ = self(idx_cond)
      logits = logits[:, -1, :]
      probs = F.softmax(logits, dim=-1)
      idx_next = torch.multinomial(probs, num_samples=1)
      idx = torch.cat((idx, idx_next), dim=1)
    return idx

In [None]:
#self - attention
torch.manual_seed(43)
B, T, C = 4, 8, 32
x = torch.randn((B,T,C))

head_size = 16
key = nn.Linear(C, head_size, bias=False)
query = nn.Linear(C, head_size, bias=False)

k = key(x) # (B, T, head_size)
q = query(x) # (B,T, head_size)

k.shape, q.shape

In [None]:
# To multiply q with k, swap k's last two dimensions so that each batch
# element multiplies as (8 x 16) @ (16 x 8).
# The transpose gets its own name: overwriting k would flip it back when the
# cell is re-run and break the matmul below.
k_t = k.transpose(-2, -1)
print(k_t.shape)

#Affinities between keys and queries
weights = q @ k_t # (B, T, 16) @ (B, 16, T) --> (B, T, T)

In [None]:
tril = torch.tril(torch.ones(T,T))
weights = weights.masked_fill(tril==0, float('-inf')) #masking future tokens
weights = F.softmax(weights, dim=2) # applying softmax on dot product of key and query

In [None]:
weights[0] #This represents how much focus current token should give to all its previous tokens

In [None]:
#Now we get final self attention values by multiplying these softmax weights with v vector

#value weights
value = nn.Linear(C, head_size, bias=False)
v = value(x) #(B,T,head_size=16)

#now we multiply weights (B,T,T) with (B,T,16) -> (B,T,16)
out = weights @ v
out.shape


In [None]:
out[0]

In [None]:
n_embd = 32
block_size = 8

class SelfAttentionHead(nn.Module):
  """Single head of causal (masked) self-attention.

  Args:
    head_size: output width of the key, query and value projections.
  """

  def __init__(self, head_size):
    super().__init__()
    self.key = nn.Linear(n_embd, head_size, bias=False)
    self.query = nn.Linear(n_embd, head_size, bias=False)
    self.value = nn.Linear(n_embd, head_size, bias=False)
    # Lower-triangular mask; a buffer so it follows the module across devices
    self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

  def forward(self, x):
    """Map x of shape (B, T, n_embd) to attended values of shape (B, T, head_size)."""
    B,T,C = x.shape
    k = self.key(x) # (B, T, head_size)
    q = self.query(x) # (B, T, head_size)

    # Scale by 1/sqrt(head_size), the width of the vectors being dotted,
    # not by the input width C
    weights = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, T)
    # Positions above the diagonal are future tokens: give them zero weight
    weights = weights.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
    weights = F.softmax(weights, dim=-1)
    v = self.value(x)
    output = weights @ v
    return output


In [None]:
## Enhance Neural Network
## In the previous bigram model the embedding width equalled the vocab size; here it becomes a separate hyperparameter
n_embed = 32
block_size = 8 # earlier called max_seq_length; also referred to as context length

class BiGramSALanguageModel(nn.Module):
  """Token + position embeddings, one self-attention head, then a linear LM head.

  Args:
    vocab_size: number of distinct tokens (embedding rows and logits width).
  """

  def __init__(self, vocab_size):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embed) #word embedding; n_embed is our C
    self.positional_encoding = nn.Embedding(block_size, n_embed)
    # Use n_embed from this cell rather than the differently spelled n_embd
    self.sa_head = SelfAttentionHead(n_embed)
    self.lm_head = nn.Linear(n_embed, vocab_size)

  def forward(self, idx, target=None):
    """Return `(logits, loss)`; loss is None when `target` is not given.

    With `target`, logits is flattened to (B*T, vocab_size).
    """
    B, T = idx.shape
    tok_embedding = self.token_embedding_table(idx) #(B, T, C)
    # Positions are created on the input's device so CPU and GPU both work
    pos_emb = self.positional_encoding(torch.arange(T, device=idx.device)) # (T,C)

    x = tok_embedding + pos_emb
    x = self.sa_head(x)
    logits = self.lm_head(x) #(B, T, vocab_size)

    if target is None:
      return logits, None

    logits = logits.view(B*T, -1)
    loss = F.cross_entropy(logits, target.view(B*T))
    return logits, loss

  @torch.no_grad()  # sampling never needs gradients
  def generate(self, idx, max_new_tokens=100):
    """Append `max_new_tokens` sampled tokens to the (B, T) prompt `idx`."""
    # Accept prompts created on any device (e.g. a CPU torch.zeros prompt)
    idx = idx.to(self.lm_head.weight.device)
    for _ in range(max_new_tokens):
      idx_cond = idx[:, -block_size:]  # the position table has block_size rows
      logits, _ = self(idx_cond)
      logits = logits[:, -1, :]
      probs = F.softmax(logits, dim=-1)
      idx_next = torch.multinomial(probs, num_samples=1)
      idx = torch.cat((idx, idx_next), dim=1)
    return idx

In [None]:
# get_batch() returns tensors on `device`, so the model must live there too
model = BiGramSALanguageModel(vocab_size).to(device)

In [None]:
# Train the current `model` with AdamW for a fixed number of steps.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
batch_size = 32  # module-level name read by get_batch()

for steps in range(6000):
  xb, yb = get_batch()
  logits, loss = model(xb, yb)

  # Clear old gradients, backpropagate this batch's loss, update the weights
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()
print(loss.item())  # loss of the last batch only

In [None]:
print("".join(decoded_seq(model.generate(torch.zeros((1,1), dtype=torch.long), max_new_tokens=500)[0].tolist())))

# Multi-Head Attention

In [None]:
class MultiHeadAttention(nn.Module):
  """Runs several SelfAttentionHead modules side by side and joins their outputs."""

  def __init__(self, num_heads, head_size):
    super().__init__()
    head_list = []
    for _ in range(num_heads):
      head_list.append(SelfAttentionHead(head_size))
    self.heads = nn.ModuleList(head_list)

  def forward(self, x):
    # Joining on the last (channel) axis yields num_heads * head_size channels
    per_head = [head(x) for head in self.heads]
    return torch.cat(per_head, dim=-1)

In [None]:
## Enhance Neural Network
## In the previous bigram model the embedding width equalled the vocab size; here it becomes a separate hyperparameter
n_embed = 32
block_size = 8 # earlier called max_seq_length; also referred to as context length
num_heads = 4

class MultiHeadLanguageModel(nn.Module):
  """Token + position embeddings, multi-head self-attention, then a linear LM head.

  Args:
    vocab_size: number of distinct tokens (embedding rows and logits width).
  """

  def __init__(self, vocab_size):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embed) #word embedding; n_embed is our C
    self.positional_encoding = nn.Embedding(block_size, n_embed)
    # num_heads heads of width n_embed//num_heads concatenate back to n_embed channels
    self.sa_head = MultiHeadAttention(num_heads, n_embed//num_heads)
    self.lm_head = nn.Linear(n_embed, vocab_size)

  def forward(self, idx, target=None):
    """Return `(logits, loss)`; loss is None when `target` is not given.

    With `target`, logits is flattened to (B*T, vocab_size).
    """
    B, T = idx.shape
    tok_embedding = self.token_embedding_table(idx) #(B, T, C)
    # Positions are created on the input's device so CPU and GPU both work
    pos_emb = self.positional_encoding(torch.arange(T, device=idx.device)) # (T,C)

    x = tok_embedding + pos_emb
    x = self.sa_head(x) #(B, T, num_heads * head_size)

    logits = self.lm_head(x) #(B, T, vocab_size)

    if target is None:
      return logits, None

    logits = logits.view(B*T, -1)
    loss = F.cross_entropy(logits, target.view(B*T))
    return logits, loss

  @torch.no_grad()  # sampling never needs gradients
  def generate(self, idx, max_new_tokens=100):
    """Append `max_new_tokens` sampled tokens to the (B, T) prompt `idx`."""
    # Accept prompts created on any device (e.g. a CPU torch.zeros prompt)
    idx = idx.to(self.lm_head.weight.device)
    for _ in range(max_new_tokens):
      idx_cond = idx[:, -block_size:]  # the position table has block_size rows
      logits, _ = self(idx_cond)
      logits = logits[:, -1, :]
      probs = F.softmax(logits, dim=-1)
      idx_next = torch.multinomial(probs, num_samples=1)
      idx = torch.cat((idx, idx_next), dim=1)
    return idx

# get_batch() returns tensors on `device`, so the model must live there too
model = MultiHeadLanguageModel(vocab_size).to(device)

In [None]:
# Train the current `model` with AdamW for a fixed number of steps.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
batch_size = 32  # module-level name read by get_batch()

for steps in range(6000):
  xb, yb = get_batch()
  logits, loss = model(xb, yb)

  # Clear old gradients, backpropagate this batch's loss, update the weights
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()
print(loss.item())  # loss of the last batch only

In [None]:
print("".join(decoded_seq(model.generate(torch.zeros((1,1), dtype=torch.long), max_new_tokens=500)[0].tolist())))

In [None]:
class FFN(nn.Module):
  """Position-wise feed-forward layer: one linear map followed by a ReLU."""

  def __init__(self, n_embd):
    super().__init__()
    projection = nn.Linear(n_embd, n_embd)
    activation = nn.ReLU()
    self.net = nn.Sequential(projection, activation)

  def forward(self, x):
    # Applied independently at every position of x
    hidden = self.net(x)
    return hidden


In [None]:
## Enhance Neural Network
## In this version we add FFN
n_embed = 32
block_size = 8 # earlier called max_seq_length; also referred to as context length
num_heads = 4

class MultiHeadFFNLanguageModel(nn.Module):
  """Embeddings, multi-head self-attention, a feed-forward layer, then a linear LM head.

  Args:
    vocab_size: number of distinct tokens (embedding rows and logits width).
  """

  def __init__(self, vocab_size):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embed) #word embedding; n_embed is our C
    self.positional_encoding = nn.Embedding(block_size, n_embed)
    # num_heads heads of width n_embed//num_heads concatenate back to n_embed channels
    self.sa_head = MultiHeadAttention(num_heads, n_embed//num_heads)
    self.ffwd  = FFN(n_embed)
    self.lm_head = nn.Linear(n_embed, vocab_size)


  def forward(self, idx, target=None):
    """Return `(logits, loss)`; loss is None when `target` is not given.

    With `target`, logits is flattened to (B*T, vocab_size).
    """
    B, T = idx.shape
    tok_embedding = self.token_embedding_table(idx) #(B, T, C)
    # Positions are created on the input's device so CPU and GPU both work
    pos_emb = self.positional_encoding(torch.arange(T, device=idx.device)) # (T,C)

    x = tok_embedding + pos_emb
    x = self.sa_head(x) #(B, T, num_heads * head_size)
    x = self.ffwd(x)

    logits = self.lm_head(x) #(B, T, vocab_size)

    if target is None:
      return logits, None

    logits = logits.view(B*T, -1)
    loss = F.cross_entropy(logits, target.view(B*T))
    return logits, loss

  @torch.no_grad()  # sampling never needs gradients
  def generate(self, idx, max_new_tokens=100):
    """Append `max_new_tokens` sampled tokens to the (B, T) prompt `idx`."""
    # Accept prompts created on any device (e.g. a CPU torch.zeros prompt)
    idx = idx.to(self.lm_head.weight.device)
    for _ in range(max_new_tokens):
      idx_cond = idx[:, -block_size:]  # the position table has block_size rows
      logits, _ = self(idx_cond)
      logits = logits[:, -1, :]
      probs = F.softmax(logits, dim=-1)
      idx_next = torch.multinomial(probs, num_samples=1)
      idx = torch.cat((idx, idx_next), dim=1)
    return idx

# get_batch() returns tensors on `device`, so the model must live there too
model = MultiHeadFFNLanguageModel(vocab_size).to(device)

In [None]:
# Train the current `model` with AdamW for a fixed number of steps.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
batch_size = 32  # module-level name read by get_batch()

for steps in range(6000):
  xb, yb = get_batch()
  logits, loss = model(xb, yb)

  # Clear old gradients, backpropagate this batch's loss, update the weights
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()
print(loss.item())  # loss of the last batch only

In [None]:
print("".join(decoded_seq(model.generate(torch.zeros((1,1), dtype=torch.long), max_new_tokens=500)[0].tolist())))

Now, since Transformers have multiple layers of SA followed by FFN, we implement that here. The Block class here is a single layer of SA Heads + FFN layer

In [None]:
dropout = 0.2
n_embed = 384
block_size = 256
num_heads = 6
learning_rate = 3e-4
max_iters = 5000
batch_size = 64


class SelfAttentionHead(nn.Module):
    """Single head of causal (masked) self-attention with attention dropout.

    Args:
        head_size: output width of the key, query and value projections.
    """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)

        # Registered as a buffer, so it moves with the module on .to(...);
        # no explicit device is needed at construction time.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map x of shape (B, T, n_embed) to attended values of shape (B, T, head_size)."""
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)

        # Scale by 1/sqrt(head_size), the width of the vectors being dotted,
        # not by the input width C
        weights = q @ k.transpose(-2, -1) * k.shape[-1] ** -0.5
        # Positions above the diagonal are future tokens: give them zero weight
        weights = weights.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        weights = F.softmax(weights, dim=-1)
        weights = self.dropout(weights)

        v = self.value(x)
        output = weights @ v
        return output


class MultiHeadAttention(nn.Module):
    """Parallel self-attention heads whose concatenated output is projected to n_embed.

    Args:
        num_heads: number of SelfAttentionHead modules to run.
        head_size: output width of each head.
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([SelfAttentionHead(head_size) for _ in range(num_heads)])
        # The concatenation has num_heads * head_size channels, which equals
        # n_embed only when n_embed is divisible by num_heads
        self.proj = nn.Linear(num_heads * head_size, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)  # join heads on the channel axis
        out = self.proj(out)
        out = self.dropout(out)
        return out


class FFN(nn.Module):
    """Position-wise MLP: expand to 4x width, ReLU, project back, then dropout."""

    def __init__(self, n_embd):
        super().__init__()
        hidden_width = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # Applied independently at every position of x
        transformed = self.net(x)
        return transformed


class Block(nn.Module):
    """One transformer layer: LayerNorm -> attention and LayerNorm -> FFN, each added back residually."""

    def __init__(self, num_heads, n_embed):
        super().__init__()
        self.sa = MultiHeadAttention(num_heads, n_embed // num_heads)
        self.ffn = FFN(n_embed)
        self.ln1 = nn.LayerNorm(n_embed)
        self.ln2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        # Normalisation is applied before each sub-layer (pre-norm)
        attended = self.sa(self.ln1(x))
        x = x + attended
        fed_forward = self.ffn(self.ln2(x))
        return x + fed_forward

class MultiHeadFFNLanguageModel(nn.Module):
    """Decoder-only transformer language model over token ids.

    Args:
        vocab_size: number of distinct tokens (embedding rows and logits width).
        n_layer: number of transformer blocks stacked before the final
            LayerNorm; defaults to 6.
    """

    def __init__(self, vocab_size, n_layer=6):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.positional_encoding = nn.Embedding(block_size, n_embed)

        self.blocks = nn.Sequential(
            *[Block(num_heads, n_embed) for _ in range(n_layer)],
            nn.LayerNorm(n_embed)
        )

        self.lm_head = nn.Linear(n_embed, vocab_size)

        # One move for the whole module replaces the per-submodule .to(device) calls
        self.to(device)

    def forward(self, idx, target=None):
        """Return `(logits, loss)`; loss is None when `target` is not given.

        With `target`, logits is flattened to (B*T, vocab_size).
        """
        B, T = idx.shape
        # Follow the model's own parameters so inputs match wherever the model lives
        model_device = self.lm_head.weight.device
        idx = idx.to(model_device)
        if target is not None:
            target = target.to(model_device)

        tok_embedding = self.token_embedding_table(idx)  # (B, T, C)
        pos_emb = self.positional_encoding(torch.arange(T, device=model_device))  # (T, C)

        x = tok_embedding + pos_emb  # broadcasting (B, T, C) + (T, C) works
        x = self.blocks(x)
        logits = self.lm_head(x)

        loss = None
        if target is not None:
            logits = logits.view(B * T, -1)
            target = target.view(B * T)
            loss = F.cross_entropy(logits, target)

        return logits, loss

    @torch.no_grad()  # sampling never needs gradients
    def generate(self, idx, max_new_tokens=100):
        """Append `max_new_tokens` sampled tokens to the (B, T) prompt `idx`."""
        idx = idx.to(self.lm_head.weight.device)
        was_training = self.training
        self.eval()  # switch dropout off while sampling
        try:
            for _ in range(max_new_tokens):
                idx_cond = idx[:, -block_size:]  # the position table has block_size rows
                logits, _ = self(idx_cond)
                logits = logits[:, -1, :]  # take logits for the last time step
                probs = F.softmax(logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)  # restore the caller's train/eval mode
        return idx

torch.manual_seed(42)  # for reproducibility

def get_batch(is_train=True):
    """Sample a random batch of input/target sequences.

    Args:
        is_train: draw from `train` when True, otherwise from `val`.

    Returns:
        Tuple `(b_x, b_y)`, each of shape (batch_size, block_size) and moved
        to `device`. `b_y` is `b_x` shifted by one position.
    """
    data = train if is_train else val

    # The offsets only slice `data`, so draw them as plain Python ints;
    # creating them on the GPU adds a device round-trip per slice.
    batches_start_index = torch.randint(
        high=len(data) - block_size,
        size=(batch_size,)
    ).tolist()

    # Stack input and target sequences
    b_x = torch.stack([data[i: i + block_size] for i in batches_start_index])
    b_y = torch.stack([data[i + 1: i + block_size + 1] for i in batches_start_index])

    # Send to device
    return b_x.to(device), b_y.to(device)

# 🧪 Example usage
b_x, b_y = get_batch()
print(b_x.shape)  # Should print: torch.Size([64, 256]) (batch_size x block_size)


# Build a fresh model from the classes defined in this cell. Without this,
# `model` still refers to the instance created by an earlier cell, which lives
# on the CPU and uses the old architecture.
model = MultiHeadFFNLanguageModel(vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for steps in range(max_iters):
    print(f"Step: {steps} of {max_iters}", end="\r")

    xb, yb = get_batch(is_train=True)  # already returns tensors on device
    logits, loss = model(xb, yb)       # model is on device

    # Clear old gradients, backpropagate this batch's loss, update the weights
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Print final loss
print(f"\nFinal loss: {loss.item():.4f}")





torch.Size([64, 256])
Step: 0 of 5000

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument index in method wrapper_CUDA__index_select)

In [None]:
class MultiHeadFFNLanguageModel(nn.Module):
    """Decoder-only transformer language model over token ids.

    Args:
        vocab_size: number of distinct tokens (embedding rows and logits width).
        n_layer: number of transformer blocks stacked before the final
            LayerNorm; defaults to 6.
    """

    def __init__(self, vocab_size, n_layer=6):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.positional_encoding = nn.Embedding(block_size, n_embed)

        self.blocks = nn.Sequential(
            *[Block(num_heads, n_embed) for _ in range(n_layer)],
            nn.LayerNorm(n_embed)
        )

        self.lm_head = nn.Linear(n_embed, vocab_size)

        # One move for the whole module replaces the per-submodule .to(device) calls
        self.to(device)

    def forward(self, idx, target=None):
        """Return `(logits, loss)`; loss is None when `target` is not given.

        With `target`, logits is flattened to (B*T, vocab_size).
        """
        B, T = idx.shape
        # Follow the model's own parameters so inputs match wherever the model lives
        model_device = self.lm_head.weight.device
        idx = idx.to(model_device)
        if target is not None:
            target = target.to(model_device)

        tok_embedding = self.token_embedding_table(idx)  # (B, T, C)
        pos_emb = self.positional_encoding(torch.arange(T, device=model_device))  # (T, C)

        x = tok_embedding + pos_emb  # broadcasting (B, T, C) + (T, C) works
        x = self.blocks(x)
        logits = self.lm_head(x)

        loss = None
        if target is not None:
            logits = logits.view(B * T, -1)
            target = target.view(B * T)
            loss = F.cross_entropy(logits, target)

        return logits, loss

    @torch.no_grad()  # sampling never needs gradients
    def generate(self, idx, max_new_tokens=100):
        """Append `max_new_tokens` sampled tokens to the (B, T) prompt `idx`."""
        idx = idx.to(self.lm_head.weight.device)
        was_training = self.training
        self.eval()  # switch dropout off while sampling
        try:
            for _ in range(max_new_tokens):
                idx_cond = idx[:, -block_size:]  # the position table has block_size rows
                logits, _ = self(idx_cond)
                logits = logits[:, -1, :]  # take logits for the last time step
                probs = F.softmax(logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)  # restore the caller's train/eval mode
        return idx


In [None]:
torch.manual_seed(42)  # for reproducibility

def get_batch(is_train=True):
    """Sample a random batch of input/target sequences.

    Args:
        is_train: draw from `train` when True, otherwise from `val`.

    Returns:
        Tuple `(b_x, b_y)`, each of shape (batch_size, block_size) and moved
        to `device`. `b_y` is `b_x` shifted by one position.
    """
    data = train if is_train else val

    # The offsets only slice `data`, so draw them as plain Python ints;
    # creating them on the GPU adds a device round-trip per slice.
    batches_start_index = torch.randint(
        high=len(data) - block_size,
        size=(batch_size,)
    ).tolist()

    # Stack input and target sequences
    b_x = torch.stack([data[i: i + block_size] for i in batches_start_index])
    b_y = torch.stack([data[i + 1: i + block_size + 1] for i in batches_start_index])

    # Send to device
    return b_x.to(device), b_y.to(device)

# 🧪 Example usage
b_x, b_y = get_batch()
print(b_x.shape)  # Should print: torch.Size([64, 256]) (batch_size x block_size)


torch.Size([64, 256])


In [None]:
# Build a fresh model from the most recently defined classes. Without this,
# `model` still refers to the instance created by an earlier cell, which lives
# on the CPU and uses the old architecture.
model = MultiHeadFFNLanguageModel(vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for steps in range(max_iters):
    print(f"Step: {steps} of {max_iters}", end="\r")

    xb, yb = get_batch(is_train=True)  # already returns tensors on device
    logits, loss = model(xb, yb)       # model is on device

    # Clear old gradients, backpropagate this batch's loss, update the weights
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Print final loss
print(f"\nFinal loss: {loss.item():.4f}")


Step: 0 of 5000

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument index in method wrapper_CUDA__index_select)

In [None]:
print("".join(decoded_seq(model.generate(torch.ones((1,1), dtype=torch.long), max_new_tokens=500)[0].tolist())))

In [None]:
class LayerNorm:
  """Minimal layer normalisation over the last dimension.

  Args:
    dim: size of the last (feature) dimension.
    eps: constant added to the variance before taking the square root.
    momentum: unused; kept only so existing calls keep working.
  """

  def __init__(self, dim, eps=1e-5, momentum=0.1):
    self.eps = eps
    self.gamma = torch.ones(dim)   # per-feature scale
    self.beta = torch.zeros(dim)   # per-feature shift

  def __call__(self, x):
    """Normalise each row of `x` over its last dimension, then scale and shift."""
    xmean = x.mean(-1, keepdim=True)
    # Layer norm uses the population (biased) variance; the default
    # Bessel-corrected estimate is larger and would slightly shrink the output.
    xvar = x.var(-1, keepdim=True, unbiased=False)
    xhat = (x-xmean)/ torch.sqrt(xvar + self.eps)
    self.out = self.gamma *  xhat + self.beta  # kept on the instance for inspection
    return self.out

  def parameters(self):
    """Return the scale and shift tensors as a list."""
    return [self.gamma, self.beta]

In [None]:
torch.manual_seed(133)
module = LayerNorm(100)
x = torch.randn(32, 100)
print("Input batch x: ", x)
x = module(x)
print("Normalized x: ", x)

In [None]:
x[0, :].mean(), x[0, :].std() #Normally distributed rows now

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Set device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
dropout = 0.2
n_embed = 384
block_size = 256
num_heads = 6
learning_rate = 3e-4
max_iters = 5000
batch_size = 64
torch.manual_seed(42)


# ---------------- Self-Attention Head ----------------
class SelfAttentionHead(nn.Module):
    """Single head of causal (masked) self-attention with attention dropout.

    Args:
        head_size: output width of the key, query and value projections.
    """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)
        # Lower-triangular mask; a buffer so it follows the module across devices
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map x of shape (B, T, n_embed) to attended values of shape (B, T, head_size)."""
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)

        # Scale by 1/sqrt(head_size), the width of the vectors being dotted,
        # not by the input width C
        weights = q @ k.transpose(-2, -1) * k.shape[-1]**-0.5
        # Positions above the diagonal are future tokens: give them zero weight
        weights = weights.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        weights = F.softmax(weights, dim=-1)
        weights = self.dropout(weights)

        v = self.value(x)
        out = weights @ v
        return out


# ---------------- Multi-Head Attention ----------------
class MultiHeadAttention(nn.Module):
    """Parallel self-attention heads whose concatenated output is projected to n_embed.

    Args:
        num_heads: number of SelfAttentionHead modules to run.
        head_size: output width of each head.
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([SelfAttentionHead(head_size) for _ in range(num_heads)])
        # The concatenation has num_heads * head_size channels, which equals
        # n_embed only when n_embed is divisible by num_heads
        self.proj = nn.Linear(num_heads * head_size, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)  # join heads on the channel axis
        out = self.proj(out)
        out = self.dropout(out)
        return out


# ---------------- Feedforward Network ----------------
class FFN(nn.Module):
    """Feed-forward sub-layer: widen 4x, ReLU, narrow back, then dropout.

    Reads the module-level ``dropout`` probability.

    Args:
        n_embd: Input and output feature width.
    """

    def __init__(self, n_embd):
        super().__init__()
        hidden_width = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the feed-forward stack to ``x`` and return the result."""
        return self.net(x)


# ---------------- Transformer Block ----------------
class Block(nn.Module):
    """Transformer block with two pre-normalized residual sub-layers.

    The input is layer-normalized before each sub-layer (multi-head attention,
    then the feed-forward network) and the sub-layer output is added back to
    the un-normalized input.

    Args:
        num_heads: Number of attention heads.
        n_embed: Feature width of the block; each head gets
            ``n_embed // num_heads`` features.
    """

    def __init__(self, num_heads, n_embed):
        super().__init__()
        per_head = n_embed // num_heads
        self.sa = MultiHeadAttention(num_heads, per_head)
        self.ffn = FFN(n_embed)
        self.ln1 = nn.LayerNorm(n_embed)
        self.ln2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        attended = self.sa(self.ln1(x))
        x = x + attended
        transformed = self.ffn(self.ln2(x))
        return x + transformed


# ---------------- Language Model ----------------
class MultiHeadFFNLanguageModel(nn.Module):
    """Decoder-only transformer language model over integer token ids.

    Token and learned positional embeddings are summed, passed through a stack
    of ``Block`` layers followed by a final ``LayerNorm``, and projected to
    vocabulary logits.

    Reads the module-level ``n_embed``, ``block_size`` and ``num_heads``.

    Args:
        vocab_size: Number of distinct token ids.
        n_layer: Number of ``Block`` layers to stack (default 6).
    """

    def __init__(self, vocab_size, n_layer=6):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.positional_encoding = nn.Embedding(block_size, n_embed)

        # n_layer blocks followed by one final LayerNorm, all in one Sequential
        # so parameter names stay ``blocks.<i>...``.
        self.blocks = nn.Sequential(
            *[Block(num_heads, n_embed) for _ in range(n_layer)],
            nn.LayerNorm(n_embed),
        )

        self.lm_head = nn.Linear(n_embed, vocab_size)

    def forward(self, idx, target=None):
        """Compute logits and, when ``target`` is given, the cross-entropy loss.

        Args:
            idx: Integer tensor of token ids, shape (B, T), with T no larger
                than ``block_size`` (the positional table has only that many
                rows).
            target: Optional integer tensor of the same shape as ``idx``.

        Returns:
            ``(logits, loss)``. Without ``target``, ``logits`` has shape
            (B, T, vocab_size) and ``loss`` is ``None``. With ``target``,
            ``logits`` is flattened to (B*T, vocab_size) and ``loss`` is the
            scalar cross-entropy.
        """
        B, T = idx.shape
        # Follow the device the parameters actually live on instead of a
        # global, so the model also works after being moved elsewhere.
        model_device = self.token_embedding_table.weight.device
        idx = idx.to(model_device)
        if target is not None:
            target = target.to(model_device)

        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.positional_encoding(torch.arange(T, device=model_device))
        x = tok_emb + pos_emb  # pos_emb (T, n_embed) broadcasts over the batch

        x = self.blocks(x)
        logits = self.lm_head(x)

        loss = None
        if target is not None:
            # cross_entropy expects (N, classes) logits and (N,) targets.
            logits = logits.view(B * T, -1)
            target = target.view(B * T)
            loss = F.cross_entropy(logits, target)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens=100):
        """Autoregressively sample ``max_new_tokens`` tokens after ``idx``.

        Sampling runs in eval mode (dropout disabled) and without autograd
        bookkeeping. The module's previous train/eval mode is restored before
        returning.

        Args:
            idx: Integer tensor of token ids, shape (B, T).
            max_new_tokens: Number of tokens to append.

        Returns:
            Integer tensor of shape (B, T + max_new_tokens).
        """
        was_training = self.training
        self.eval()
        try:
            idx = idx.to(self.token_embedding_table.weight.device)
            for _ in range(max_new_tokens):
                # The positional table only covers block_size positions.
                idx_cond = idx[:, -block_size:]
                logits, _ = self(idx_cond)
                logits = logits[:, -1, :]  # keep only the last time step
                probs = F.softmax(logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)
        return idx

# Build the language model for the notebook's vocabulary and move its
# parameters and buffers to the selected device. Module.to() returns the
# module, so as the cell's last expression it also displays the architecture.
model = MultiHeadFFNLanguageModel(vocab_size)
model.to(device)


MultiHeadFFNLanguageModel(
  (token_embedding_table): Embedding(65, 384)
  (positional_encoding): Embedding(256, 384)
  (blocks): Sequential(
    (0): Block(
      (sa): MultiHeadAttention(
        (heads): ModuleList(
          (0-5): 6 x SelfAttentionHead(
            (key): Linear(in_features=384, out_features=64, bias=False)
            (query): Linear(in_features=384, out_features=64, bias=False)
            (value): Linear(in_features=384, out_features=64, bias=False)
            (dropout): Dropout(p=0.2, inplace=False)
          )
        )
        (proj): Linear(in_features=384, out_features=384, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
      )
      (ffn): FFN(
        (net): Sequential(
          (0): Linear(in_features=384, out_features=1536, bias=True)
          (1): ReLU()
          (2): Linear(in_features=1536, out_features=384, bias=True)
          (3): Dropout(p=0.2, inplace=False)
        )
      )
      (ln1): LayerNorm((384,), eps=1e-05, elementwis

In [None]:
torch.manual_seed(42)  # for reproducibility

def get_batch(is_train=True):
    """Sample a random batch of input/target sequences.

    Reads the module-level ``train``/``val`` data, ``block_size``,
    ``batch_size`` and ``device``.

    Args:
        is_train: Sample from ``train`` when True, from ``val`` otherwise.

    Returns:
        ``(b_x, b_y)``, both of shape (batch_size, block_size) and on
        ``device``. ``b_y`` is ``b_x`` shifted one position to the right in
        the underlying data, i.e. the next-token target for each position.
    """
    data = train if is_train else val

    # Random start offsets. ``high`` is exclusive, so the largest start still
    # leaves block_size + 1 elements for the shifted target slice.
    # Convert to plain ints once: iterating the tensor directly would turn
    # every slice bound into a 0-dim tensor that has to be converted to an int
    # on its own (a device-to-host copy each time when ``device`` is CUDA).
    batches_start_index = torch.randint(
        high=len(data) - block_size,
        size=(batch_size,),
        device=device,
    ).tolist()

    # Stack input and target sequences
    b_x = torch.stack([data[i: i + block_size] for i in batches_start_index])
    b_y = torch.stack([data[i + 1: i + block_size + 1] for i in batches_start_index])

    # Send to device
    b_x, b_y = b_x.to(device), b_y.to(device)
    return b_x, b_y


In [None]:
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for steps in range(max_iters):
    # Overwrite the same console line each iteration instead of scrolling.
    print(f"Step: {steps} of {max_iters}", end="\r")

    # Draw a fresh training batch; clear stale gradients before the new pass.
    xb, yb = get_batch(is_train=True)
    optimizer.zero_grad()

    # Forward, backward, parameter update.
    logits, loss = model(xb, yb)
    loss.backward()
    optimizer.step()

# Loss of the last training batch only (not an average over the run).
print(f"\nFinal loss: {loss.item():.4f}")



Final loss: 0.9183


In [None]:
# Bundle everything needed to pick training back up later.
checkpoint = dict(
    model_state_dict=model.state_dict(),
    optimizer_state_dict=optimizer.state_dict(),
    step=steps,  # last value of the training loop counter
)

# Written to the Colab filesystem.
torch.save(checkpoint, '/content/gpt_checkpoint.pt')
print("Checkpoint saved.")

Checkpoint saved.


In [None]:
# Build a fresh model/optimizer pair with the same constructor arguments as the
# originals so the saved state dicts line up with them.
model = MultiHeadFFNLanguageModel(vocab_size)
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Read the checkpoint from disk, mapping stored tensors onto the current device,
# then restore the model weights and the optimizer state from it.
checkpoint = torch.load('/content/gpt_checkpoint.pt', map_location=device)
for component, state_key in ((model, 'model_state_dict'), (optimizer, 'optimizer_state_dict')):
    component.load_state_dict(checkpoint[state_key])

# The stored value is the last step that ran, so resume one past it.
start_step = 1 + checkpoint['step']

print(f"Resumed from step {start_step}")


Resumed from step 5000


In [None]:
# Start from a single token with id 1, sample 500 more, then decode the first
# (only) sequence in the batch back to text.
context = torch.ones((1, 1), dtype=torch.long)
generated_ids = model.generate(context, max_new_tokens=500)[0].tolist()
print("".join(decoded_seq(generated_ids)))

 to strutch side,
He will see an limb time it and continue I
Against shall put the witter.

MENENIUS:
I'll be the fault?

MENENIUS:
Where is this cur head who cannol.
Hath good all thee against laid these schopes you.

VIRGILIA:
It is not, you arrate, ready;
Nor this putch as your penileng, nor as you
worthippiecy. I'll appear the law by conclured. Was I bolied
the Warwick's exampation? There for we till him he see
the noted the womenty. Awake
Sund will I scorn, and thee from throne gall pluck
Th
