In [None]:
# Import Regex library
import re

In [None]:
# Tokenizing: split on punctuation and whitespace, keeping punctuation as tokens
text = "Hello, there. How are you doing?"
# The capturing group makes re.split return the delimiters as well; filtering
# on str.strip then discards the empty strings and whitespace-only pieces.
result = list(filter(str.strip, re.split(r"([,?.]|\s)", text)))
print(result)

['Hello', ',', 'there', '.', 'How', 'are', 'you', 'doing', '?']


In [None]:
# Create the vocabulary from the text file.
# Tokens are case-sensitive: nothing here lowercases the text, so "At" and "at"
# receive different token IDs.
with open("/content/the-verdict.txt", "r", encoding="utf-8") as f:
  raw_text = f.read()

# Split on punctuation, "--" and whitespace (delimiters are kept by the capturing
# group), then drop empty and whitespace-only pieces.
preprocessed = re.split(r"([,.:;?\"'!_]|--|\s)", raw_text)
preprocessed = [item for item in preprocessed if item.strip()]

# Special tokens for unknown words and end of text
preprocessed.append("<UNK>")
preprocessed.append("<EOT>")

# Sort the unique tokens before numbering them: iterating a bare set of strings
# follows hash order, which differs between interpreter runs, so without the
# sort the same token would get a different ID after every kernel restart.
vocab = {token: idx for idx, token in enumerate(sorted(set(preprocessed)))}


In [None]:
#Implementing Text tokenizer
class textTokenizer:
  """Word-level tokenizer backed by a fixed token -> ID vocabulary.

  Text is split on whitespace, "--" and the punctuation characters listed in
  the split pattern. Tokens that are not in the vocabulary are encoded as the
  ID of "<UNK>".
  """

  def __init__(self, vocab):
    """Keep the token -> ID mapping and derive the reverse ID -> token mapping."""
    self.str_to_int = vocab
    self.int_to_str = dict(zip(vocab.values(), vocab.keys()))

  def encode(self, text):
    """Return the list of token IDs for `text`.

    Raises KeyError if an unknown token is met and "<UNK>" is not in the vocabulary.
    """
    token_ids = []
    for piece in re.split(r"([,.:;?\"'!_]|--|\s)", text):
      token = piece.strip()
      if not token:
        # Empty or whitespace-only piece produced by the split
        continue
      if token not in self.str_to_int:
        token = "<UNK>"
      token_ids.append(self.str_to_int[token])
    return token_ids


  def decode(self, encoded):
    """Turn a list of integer token IDs back into a string.

    Tokens are joined with single spaces, then the whitespace in front of the
    listed punctuation characters is removed.
    """
    words = (self.int_to_str[token_id] for token_id in encoded)
    spaced = " ".join(words)
    return re.sub(r"\s+([,.:;?\"'!_])", r"\1", spaced)

In [None]:
# `textTokenizer` names the class until this cell runs and the instance afterwards.
# Resolve the class from whichever of the two is currently bound, so re-running
# the cell does not fail with "'textTokenizer' object is not callable".
_tokenizer_cls = textTokenizer if isinstance(textTokenizer, type) else type(textTokenizer)
textTokenizer = _tokenizer_cls(vocab)

In [None]:
# Encode a sample sentence into vocabulary token IDs.
# The apostrophe is one of the split characters, so "It's" becomes the three
# tokens "It", "'" and "s".
encoded = textTokenizer.encode("It's the last you painted you know.")
print(encoded)

[247, 848, 1022, 708, 745, 654, 452, 654, 868, 544]


In [None]:
# Map the token IDs back to text. decode() joins tokens with spaces and only
# removes the space *before* punctuation, which is why "It's" comes back as "It' s".
decoded = textTokenizer.decode(encoded)
print(decoded)

It' s the last you painted you know.


In [None]:
# Unknown words and the end-of-text marker.
# A word that is not in the vocabulary is encoded as <UNK>, so the original word
# cannot be recovered when decoding: this tokenizer is lossy.
# "<EOT>" is appended right after "?", which is a split character, so it still
# ends up as its own token.
encodedNew = textTokenizer.encode("Hello, there. How are you doing?" + "<EOT>")
decodedNew = textTokenizer.decode(encodedNew)
decodedNew

'<UNK>, there. How are you doing? <EOT>'

In [None]:
#Byte pair encoding - unknown words are broken into known subword tokens, so encoding is lossless
import tiktoken

tikTokenizer = tiktoken.get_encoding("gpt2")
# <|endoftext|> must be explicitly allowed to be encoded as the special token
encode = tikTokenizer.encode("Hey, It's a lovely morning. <|endoftext|> Wish you a very happy 16!",
                             allowed_special={"<|endoftext|>"})
decode = tikTokenizer.decode(encode)

for tokenID in encode:
  # A single BPE token can hold only part of a multi-byte UTF-8 character; a
  # strict decode would then raise UnicodeDecodeError. errors="replace" shows
  # U+FFFD for such fragments and leaves complete characters unchanged.
  subword = tikTokenizer.decode_single_token_bytes(tokenID).decode("utf-8", errors="replace")
  print(f"Token ID: {tokenID}, subword: {subword}")

print(f"Encoded Token IDs: {encode}")
print(f"Decode from Token IDs: {decode}")

Token ID: 10814, subword: Hey
Token ID: 11, subword: ,
Token ID: 632, subword:  It
Token ID: 338, subword: 's
Token ID: 257, subword:  a
Token ID: 14081, subword:  lovely
Token ID: 3329, subword:  morning
Token ID: 13, subword: .
Token ID: 220, subword:  
Token ID: 50256, subword: <|endoftext|>
Token ID: 23447, subword:  Wish
Token ID: 345, subword:  you
Token ID: 257, subword:  a
Token ID: 845, subword:  very
Token ID: 3772, subword:  happy
Token ID: 1467, subword:  16
Token ID: 0, subword: !
Encoded Token IDs: [10814, 11, 632, 338, 257, 14081, 3329, 13, 220, 50256, 23447, 345, 257, 845, 3772, 1467, 0]
Decode from Token IDs: Hey, It's a lovely morning. <|endoftext|> Wish you a very happy 16!


In [None]:
#Data sampling with sliding window
import torch
from torch.utils.data import Dataset, DataLoader

class GPTDataset(Dataset):
  """Sliding-window dataset of (input, target) token-ID tensors.

  The text is encoded once with `tokenizer.encode`. A window of `maxLen` IDs is
  taken every `stride` positions; the target of each window is the same window
  shifted one position to the right.
  """

  def __init__(self, text, tokenizer, maxLen, stride):
    tokenIDs = tokenizer.encode(text)
    # Stop early enough that the shifted target window is still full length
    window_starts = range(0, len(tokenIDs) - maxLen, stride)
    self.input_ids = [torch.tensor(tokenIDs[start:start + maxLen])
                      for start in window_starts]
    self.target_ids = [torch.tensor(tokenIDs[start + 1:start + maxLen + 1])
                       for start in window_starts]

  def __len__(self):
    """Number of windows."""
    return len(self.input_ids)

  def __getitem__(self, idx):
    """Return the (input_ids, target_ids) pair at position `idx`."""
    return self.input_ids[idx], self.target_ids[idx]

In [None]:
# Sample paragraph for trying out the sliding-window dataset
text = "In the heart of a bustling city, surrounded by towering glass buildings and the constant hum of traffic, a small bookstore sat quietly between a café and a flower shop. Inside, the scent of old pages mingled with fresh coffee from next door. People wandered in not just to buy books, but to lose themselves in worlds unknown, to find forgotten authors, or simply to enjoy the stillness that lingered despite the chaos outside. On rainy afternoons, the store felt like a sanctuary — a place where time paused, and stories whispered from dusty shelves."
# Windows of 4 tokens, advancing one token at a time
gptDataset = GPTDataset(text, tikTokenizer, maxLen=4, stride=1)

In [None]:
# Third (input, target) pair; the target is the input shifted by one token
gptDataset[2]

(tensor([ 2612,   286,   257, 46609]), tensor([  286,   257, 46609,  1748]))

In [None]:
def create_dataloader(text, batch_size=4, maxLen=256, stride=128,
                      shuffle=True, drop_last=True, num_workers=0):
  """Build a DataLoader of sliding-window (input, target) batches for `text`.

  The text is tokenized with the tiktoken "gpt2" encoding and wrapped in a
  GPTDataset using `maxLen` and `stride`; the remaining arguments are passed
  straight through to torch's DataLoader.
  """
  dataset = GPTDataset(text, tiktoken.get_encoding("gpt2"), maxLen, stride)
  return DataLoader(
      dataset,
      batch_size=batch_size,
      shuffle=shuffle,
      drop_last=drop_last,
      num_workers=num_workers,
  )

In [None]:
dataloader = create_dataloader(raw_text, batch_size=2, maxLen=8, stride=2, shuffle=False)
dataiter = iter(dataloader)
# Three consecutive batches are drawn: the first two are printed, the third is
# unpacked into `inputs` / `target`.
first_batch, second_batch = next(dataiter), next(dataiter)
inputs, target = next(dataiter)
print(first_batch, second_batch, sep="\n")

[tensor([[   40,   367,  2885,  1464,  1807,  3619,   402,   271],
        [ 2885,  1464,  1807,  3619,   402,   271, 10899,  2138]]), tensor([[  367,  2885,  1464,  1807,  3619,   402,   271, 10899],
        [ 1464,  1807,  3619,   402,   271, 10899,  2138,   257]])]
[tensor([[ 1807,  3619,   402,   271, 10899,  2138,   257,  7026],
        [  402,   271, 10899,  2138,   257,  7026, 15632,   438]]), tensor([[ 3619,   402,   271, 10899,  2138,   257,  7026, 15632],
        [  271, 10899,  2138,   257,  7026, 15632,   438,  2016]])]


In [None]:
#Word Embeddings
# The GPT-2 BPE vocabulary has 50257 entries (token IDs 0..50256, the last one
# being <|endoftext|>), so the embedding table needs exactly that many rows.
vocab_size = 50257
# output_dim = 256
output_dim = 4

token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
token_embeddings = token_embedding_layer(inputs)
print(token_embeddings.shape)

#Positional embedding
# One vector per position in the window. The length is read from the batch so
# it always matches the window size the dataloader produced.
context_length = inputs.shape[1]
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)
pos_embeddings = pos_embedding_layer(torch.arange(context_length))
print(pos_embeddings.shape)

#Final embedding
# pos_embeddings (context_length, output_dim) broadcasts over the batch dimension
final_embeddings = token_embeddings + pos_embeddings
print(final_embeddings.shape)

torch.Size([2, 8, 4])
torch.Size([8, 4])
torch.Size([2, 8, 4])


In [None]:
#Self-attention without trainable weights, for one query token
query = final_embeddings
print(query[0][1]) # second token of the first sequence in the batch
attn_scores_2 = torch.empty(query.shape[1])

# Score = dot product of each token in the first sequence with the query token
for i in range(query.shape[1]):
  attn_scores_2[i] = torch.dot(query[0][i], query[0][1])

print(attn_scores_2)

#Normalise the scores into attention weights
attn_score_2 = attn_scores_2.softmax(dim=0)
print(attn_score_2)
print(attn_score_2.sum()) # Should be 1

#Context vector = attention-weighted sum of the token vectors
context_vec_2 = torch.zeros(query.shape[2])
for weight, x_i in zip(attn_score_2, query[0]):
  context_vec_2 += weight*x_i

print(context_vec_2)


tensor([-0.9279,  1.5110,  1.7080,  0.4483], grad_fn=<SelectBackward0>)
tensor([ 1.3279,  6.2625, -0.7194,  3.2800,  3.6181, -0.6254, -0.1578, -0.0126],
       grad_fn=<CopySlices>)
tensor([6.3412e-03, 8.8155e-01, 8.1860e-04, 4.4664e-02, 6.2633e-02, 8.9925e-04,
        1.4354e-03, 1.6596e-03], grad_fn=<SoftmaxBackward0>)
tensor(1.0000, grad_fn=<SumBackward0>)
tensor([-0.9938,  1.4182,  1.5692,  0.3468], grad_fn=<AddBackward0>)


In [None]:
# Toy 3-dimensional embeddings, one row per word
trial = torch.tensor([
  [0.43, 0.15, 0.89],  # Your     (x^1)
  [0.55, 0.87, 0.66],  # journey  (x^2)
  [0.57, 0.85, 0.64],  # starts   (x^3)
  [0.22, 0.58, 0.33],  # with     (x^4)
  [0.77, 0.25, 0.10],  # one      (x^5)
  [0.05, 0.80, 0.55],  # step     (x^6)
])
trial.shape

torch.Size([6, 3])

In [None]:
import tiktoken
import torch

# 1. Tokenize
text = "Your journey begins with one step"
tokenizer = tiktoken.get_encoding("gpt2")
tokens = tokenizer.encode(text)
input_ids = torch.tensor(tokens)

print("Token IDs:", tokens)

# 2. Word embeddings
# The GPT-2 BPE vocabulary has 50257 entries (token IDs 0..50256), so the
# embedding table needs exactly that many rows.
vocab_size = 50257
output_dim = 3

token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
token_embeddings = token_embedding_layer(input_ids)
print(token_embeddings)
print(token_embeddings.shape)

# 3. Positional embedding
# One vector per position; the length follows the tokenized sentence so the
# addition below still lines up if `text` is changed.
context_length = len(tokens)
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)
pos_embeddings = pos_embedding_layer(torch.arange(context_length))
print(pos_embeddings.shape)

# 4. Final embedding = token embedding + positional embedding
final_embeddings = token_embeddings + pos_embeddings
print(final_embeddings)

Token IDs: [7120, 7002, 6140, 351, 530, 2239]
tensor([[-0.9652,  0.3715,  1.7600],
        [ 1.1019,  1.2620, -0.6656],
        [ 0.4906,  2.1771,  0.2043],
        [-0.3872,  0.8350, -0.9204],
        [-0.7298, -2.5358,  1.4021],
        [ 0.2725, -0.1636, -0.0523]], grad_fn=<EmbeddingBackward0>)
torch.Size([6, 3])
torch.Size([6, 3])
tensor([[-0.9690,  0.0205,  1.4269],
        [-0.8274,  2.2437,  0.4577],
        [ 0.9628,  2.9165,  1.2324],
        [ 1.3070,  0.5541, -1.4381],
        [-2.0322, -1.9938,  1.8070],
        [-1.5318, -1.8755, -0.3128]], grad_fn=<AddBackward0>)


# **Self-Attention with trainable weights**

nn.Linear is just a way of saying “I want to take each input vector and re-express it in a different space of size d_out.”

In [None]:
import torch.nn as nn

class SelfAttention(nn.Module):
  """Scaled dot-product self-attention with trainable query/key/value projections.

  Args:
    d_in: size of each input vector.
    d_out: size of each projected query/key/value (and output) vector.
    qkv_bias: whether the three linear projections use a bias term.
  """

  def __init__(self, d_in, d_out, qkv_bias=False):
    super().__init__()
    self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)

  def forward(self, x):
    """Return the context vectors for `x`.

    `x` has shape (num_tokens, d_in) or (..., num_tokens, d_in); the result has
    the same leading dimensions with d_out as the last one.
    """
    keys = self.W_key(x)
    queries = self.W_query(x)
    values = self.W_value(x)

    # Swap only the last two dimensions: identical to `.T` for 2-D input, and it
    # also works for batched input, where `.T` would reverse every dimension.
    attn_scores = queries @ keys.transpose(-2, -1)
    # Scale by sqrt(d_out) before normalising each query's row of scores
    attn_weights = torch.softmax(
        attn_scores / keys.shape[-1]**0.5,
        dim=-1
    )
    context_vec = attn_weights @ values
    return context_vec



In [None]:
# Seed first so the randomly initialised projection weights are reproducible
torch.manual_seed(789)
sa_v2 = SelfAttention(d_in=3, d_out=2)
# print(sa_v2(final_embeddings))

In [None]:
# The same steps as SelfAttention.forward, written out so the intermediate
# attention scores can be inspected.
queries = sa_v2.W_query(final_embeddings)
keys = sa_v2.W_key(final_embeddings)
values = sa_v2.W_value(final_embeddings)
attn_scores = torch.matmul(queries, keys.T)
print(attn_scores)
# Scale by sqrt(d_out), then normalise each row
attn_weights = (attn_scores / keys.shape[-1]**0.5).softmax(dim=-1)
context_vec = torch.matmul(attn_weights, values)
# print(context_vec)

tensor([[-0.2148, -0.5406,  0.0258, -0.4695,  0.2444, -0.0820],
        [ 2.8970,  1.2005, -0.0835, -1.2237,  1.8947,  2.6521],
        [ 0.5015,  0.2924, -0.0181, -0.1070,  0.2559,  0.4377],
        [ 2.5479,  1.3197, -0.0849, -0.7489,  1.4415,  2.2655],
        [ 1.8434,  0.8863, -0.0585, -0.6267,  1.1012,  1.6565],
        [ 2.4947,  1.0126, -0.0710, -1.0801,  1.6496,  2.2892]],
       grad_fn=<MmBackward0>)


# **Causal attention**

Only the current and previous tokens may influence the prediction of the next token. Hence, for a given query, the attention scores of future tokens must be masked out (set to -inf) before applying softmax normalization, so that they receive zero attention weight.

In [None]:
# Boolean mask that is True strictly above the diagonal, i.e. at the positions
# of future tokens. It is built from positions rather than from `scores == 0`,
# so a legitimate score of exactly 0 on or below the diagonal is never masked.
zero_mask = torch.triu(torch.ones_like(attn_scores), diagonal=1).bool()
print(zero_mask)

# Set the future positions to -inf so softmax gives them zero weight
x = attn_scores.masked_fill(zero_mask, -torch.inf)
attn_scores = x

tensor([[False,  True,  True,  True,  True,  True],
        [False, False,  True,  True,  True,  True],
        [False, False, False,  True,  True,  True],
        [False, False, False, False,  True,  True],
        [False, False, False, False, False,  True],
        [False, False, False, False, False, False]])


In [None]:
# Scale by sqrt(d_out), then normalise each row; the -inf entries become 0
attn_weights = (attn_scores / keys.shape[-1]**0.5).softmax(dim=-1)
print(attn_weights)

tensor([[1.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.7685, 0.2315, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.3914, 0.3376, 0.2710, 0.0000, 0.0000, 0.0000],
        [0.5980, 0.2509, 0.0929, 0.0581, 0.0000, 0.0000],
        [0.3945, 0.2005, 0.1028, 0.0688, 0.2334, 0.0000],
        [0.3324, 0.1166, 0.0542, 0.0265, 0.1829, 0.2875]],
       grad_fn=<SoftmaxBackward0>)


## Causal attention class handling multiple batches

In [None]:
class CausalAttention(nn.Module):
  """Single-head causal self-attention over batched input.

  Args:
    d_in: size of each input vector.
    d_out: size of each projected query/key/value (and output) vector.
    dropout: dropout probability applied to the attention weights.
    context_length: side length of the causal mask that is registered as a buffer.
    qkv_bias: whether the three linear projections use a bias term.
  """

  def __init__(self, d_in, d_out, dropout, context_length, qkv_bias=False):
    super().__init__()
    self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.dropout = nn.Dropout(dropout)
    # Ones strictly above the diagonal mark the future positions of each query
    future_positions = torch.ones(context_length, context_length).triu(diagonal=1)
    self.register_buffer('mask', future_positions)


  def forward(self, x):
    """Return context vectors of shape (batch, num_tokens, d_out) for a 3-D `x`."""
    _, num_tokens, _ = x.shape  # (batch, num_tokens, d_in)
    queries = self.W_query(x)
    keys = self.W_key(x)
    values = self.W_value(x)

    # Crop the mask to the actual sequence length, which may be shorter than
    # the context length the mask was built for.
    causal_mask = self.mask.bool()[:num_tokens, :num_tokens]
    attn_scores = torch.matmul(queries, keys.transpose(-2, -1))
    attn_scores = attn_scores.masked_fill(causal_mask, -torch.inf)

    scale = keys.shape[-1]**0.5
    attn_weights = self.dropout(torch.softmax(attn_scores / scale, dim=-1))
    return attn_weights @ values

In [None]:
# Preview of the causal mask: 1 marks a future position that gets masked out
torch.ones(context_length, context_length).triu(diagonal=1)

tensor([[0., 1., 1., 1., 1., 1.],
        [0., 0., 1., 1., 1., 1.],
        [0., 0., 0., 1., 1., 1.],
        [0., 0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 0., 1.],
        [0., 0., 0., 0., 0., 0.]])

In [None]:
# Toy 3-dimensional embeddings, one row per word.
# Note: this rebinds `inputs`, which earlier held the token-ID batch taken from
# the dataloader.
inputs = torch.tensor([
  [0.43, 0.15, 0.89],  # Your     (x^1)
  [0.55, 0.87, 0.66],  # journey  (x^2)
  [0.57, 0.85, 0.64],  # starts   (x^3)
  [0.22, 0.58, 0.33],  # with     (x^4)
  [0.77, 0.25, 0.10],  # one      (x^5)
  [0.05, 0.80, 0.55],  # step     (x^6)
])

# Two identical sequences stacked along a new leading batch dimension
batch = torch.stack([inputs, inputs])
print(batch.shape)

torch.Size([2, 6, 3])


In [None]:
# Seed first so the layer weights, and therefore the printed values, are reproducible
torch.manual_seed(123)
context_length = batch.shape[1]  # number of tokens per sequence
d_in, d_out = 3, 2
ca = CausalAttention(d_in, d_out, dropout=0.0, context_length=context_length)
context_vecs = ca(batch)
print("context_vecs.shape:", context_vecs.shape)
print(context_vecs)

context_vecs.shape: torch.Size([2, 6, 2])
tensor([[[-0.4519,  0.2216],
         [-0.5874,  0.0058],
         [-0.6300, -0.0632],
         [-0.5675, -0.0843],
         [-0.5526, -0.0981],
         [-0.5299, -0.1081]],

        [[-0.4519,  0.2216],
         [-0.5874,  0.0058],
         [-0.6300, -0.0632],
         [-0.5675, -0.0843],
         [-0.5526, -0.0981],
         [-0.5299, -0.1081]]], grad_fn=<UnsafeViewBackward0>)


In [None]:
class MultiHeadAttention(nn.Module):
  """Multi-head attention built from independent CausalAttention heads.

  Each of the `num_heads` heads maps d_in -> d_out; their outputs are
  concatenated along the last dimension, giving num_heads * d_out features.
  """

  def __init__(self, d_in, d_out, num_heads, dropout, context_length, qkv_bias=False):
    super().__init__()
    self.heads = nn.ModuleList()
    for _ in range(num_heads):
      self.heads.append(CausalAttention(d_in, d_out, dropout, context_length, qkv_bias))

  def forward(self, x):
    """Run every head on `x` and concatenate the results feature-wise."""
    per_head = [head(x) for head in self.heads]
    return torch.cat(per_head, dim=-1)

In [None]:
# Seed first so the head weights, and therefore the printed values, are reproducible
torch.manual_seed(123)
context_length = batch.shape[1]  # number of tokens per sequence
d_in, d_out = 3, 2
num_heads = 2
# Output feature size is num_heads * d_out
mha = MultiHeadAttention(d_in, d_out, num_heads, dropout=0.0, context_length=context_length)
context_vecs = mha(batch)
print("context_vecs.shape:", context_vecs.shape)
print(context_vecs)

context_vecs.shape: torch.Size([2, 6, 4])
tensor([[[-0.4519,  0.2216,  0.4772,  0.1063],
         [-0.5874,  0.0058,  0.5891,  0.3257],
         [-0.6300, -0.0632,  0.6202,  0.3860],
         [-0.5675, -0.0843,  0.5478,  0.3589],
         [-0.5526, -0.0981,  0.5321,  0.3428],
         [-0.5299, -0.1081,  0.5077,  0.3493]],

        [[-0.4519,  0.2216,  0.4772,  0.1063],
         [-0.5874,  0.0058,  0.5891,  0.3257],
         [-0.6300, -0.0632,  0.6202,  0.3860],
         [-0.5675, -0.0843,  0.5478,  0.3589],
         [-0.5526, -0.0981,  0.5321,  0.3428],
         [-0.5299, -0.1081,  0.5077,  0.3493]]], grad_fn=<CatBackward0>)
