<a href="https://colab.research.google.com/github/ZERO-70/Transformer/blob/main/Transformers_exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Embedder for tokens

In [2]:
import torch
import math
import torch.nn as nn
import torch.nn.functional as F


class Embedder(nn.Module):
  def __init__(self, vocab_size:int, d_model:int):
    super().__init__()
    self.d_model = d_model
    self.vocab_size = vocab_size
    self.embedding = nn.Embedding(vocab_size, d_model)
  def forward(self, x):
    return self.embedding(x)* math.sqrt(self.d_model)


#Positional Encoder for Embeddings

In [3]:
class PositionalEncoder(nn.Module):
  def __init__(self,d_model,max_seq_len):
    super().__init__()
    self.d_model = d_model
    self.max_seq_len = max_seq_len
    pe = torch.zeros(max_seq_len,d_model)
    positions = torch.arange(max_seq_len).float().unsqueeze(1)
    denominator = torch.exp(
        torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
    )
    pe[:, 0::2] = torch.sin(positions * denominator)
    pe[:, 1::2] = torch.cos(positions * denominator)
    # just to understand the dimensions
    print(pe.shape)
    pe = pe.unsqueeze(0)
    # just to understand the dimensions
    print(pe.shape)
    self.register_buffer("pe", pe)
  def forward(self, x):
    return x + self.pe[:, :x.size(1), :]
#printing the dimensions for understanding
pos = PositionalEncoder(512,200)

torch.Size([200, 512])
torch.Size([1, 200, 512])


In [4]:
position = torch.arange(0, 12, dtype=torch.float).unsqueeze(1)

print(position)

tensor([[ 0.],
        [ 1.],
        [ 2.],
        [ 3.],
        [ 4.],
        [ 5.],
        [ 6.],
        [ 7.],
        [ 8.],
        [ 9.],
        [10.],
        [11.]])


#MultiHeadAttention

In [5]:
class MultiHeadAttention(nn.Module):
  def __init__(self,d_model,num_heads):
    super().__init__()
    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
    self.d_model = d_model
    self.num_heads = num_heads
    self.head_dim = d_model // num_heads
    self.QueryLinear = nn.Linear(d_model, d_model, bias = False)
    self.KeyLinear = nn.Linear(d_model, d_model, bias = False)
    self.ValueLinear = nn.Linear(d_model, d_model, bias = False)
    self.FinalLinear = nn.Linear(d_model, d_model, bias = False)

  def split_input_into_heads(self,x,batch_size):
    seq_length = x.size(1)
    x = x.reshape(batch_size, seq_length, self.num_heads, self.head_dim)
    return x.permute(0, 2, 1, 3)

  def compute_attention(self,query,key,value,mask=None):
    scores = torch.matmul(query,key.transpose(-2,-1))/(self.head_dim ** 0.5)
    if mask is not None:
      scores = scores.masked_fill(mask == 0, float('-inf'))
    attention_weights = torch.softmax(scores,dim=-1)
    return torch.matmul(attention_weights,value)

  def combine_attention(self,x,batch_size):
    x = x.permute(0, 2, 1, 3).contiguous()
    # -1 parameter means to figure out the dimesion
    x = x.reshape(batch_size, -1, self.d_model)
    return x

  def forward(self,query,key,value,mask = None):
    batch_size = query.size(0)
    query = self.split_input_into_heads(self.QueryLinear(query),batch_size)
    key = self.split_input_into_heads(self.KeyLinear(key),batch_size)
    value = self.split_input_into_heads(self.ValueLinear(value),batch_size)
    attention = self.compute_attention(query,key,value,mask)
    reordered_attention = self.combine_attention(attention,batch_size)
    return self.FinalLinear(reordered_attention)


#FeedForward

In [6]:
class FeedForward(nn.Module):
  def __init__(self,d_model,d_ff):
    super().__init__()
    self.linear1 = nn.Linear(d_model,d_ff)
    self.linear2 = nn.Linear(d_ff,d_model)
    self.relu = nn.ReLU()
  def forward(self,x):
    return self.linear2(self.relu(self.linear1(x)))

#Encoder

In [7]:
class Encoder(nn.Module):
  def __init__(self,d_model,num_heads,d_ff,drop_out):
    super().__init__()
    self.attention_block = MultiHeadAttention(d_model,num_heads)
    self.feed_block = FeedForward(d_model,d_ff)
    self.layer_norm1 = nn.LayerNorm(d_model)
    self.layer_norm2 = nn.LayerNorm(d_model)
    self.drop_out = nn.Dropout(drop_out)
  def forward(self,x,mask):
    attention_output = self.attention_block(x,x,x,mask)
    x = self.layer_norm1(x + self.drop_out(attention_output))
    feed_output = self.feed_block(x)
    return self.layer_norm2(x + self.drop_out(feed_output))

#Wraper for Multiple Encoders

In [8]:
class TransformerEncoder(nn.Module):
  def __init__(self,vocab_size,d_model,num_layers,num_heads,d_ff,drop_out,max_seq_len):
    super().__init__()
    self.embedder = Embedder(vocab_size,d_model)
    self.positional_encoder = PositionalEncoder(d_model,max_seq_len)
    self.encoder_layers = nn.ModuleList([Encoder(d_model,num_heads,d_ff,drop_out) for _ in range(num_layers)])
    self.drop_out = nn.Dropout(drop_out)

  def forward(self,x,mask):
    x = self.embedder(x)
    x = self.positional_encoder(x)
    x = self.drop_out(x)
    for layer in self.encoder_layers:
      x = layer(x,mask)
    return x

#Classifier

In [9]:
import torch.nn.functional as F
class ClassifierHead(nn.Module):
  def __init__(self, d_model,num_classes):
    super().__init__()
    self.linear = nn.Linear(d_model,num_classes)

  def forward(self,x):
    logits = self.linear(x)
    return F.log_softmax(logits,dim = -1)


#Training Encoder on Yelp-full-review dataset

In [9]:
!pip install datasets



##imports and locking in seed for same results each time trained.

In [10]:
!pip install transformers
import torch
import torch.nn as nn
import random
import numpy as np
from torch.utils.data import DataLoader


# 1.1 Set seeds for reproducibility (so you can compare runs)
SEED = 42
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)



In [11]:
!pip install --upgrade datasets huggingface_hub transformers



In [12]:
from datasets import load_dataset
import re
from collections import Counter
from transformers import BertTokenizerFast

raw_ds = load_dataset("yelp_review_full", streaming=True)

tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")

VOCAB_SIZE = tokenizer.vocab_size

print("vocab size of pretrained tokenizer : ",VOCAB_SIZE)

MAX_LEN = 128

class YelpDataset(torch.utils.data.Dataset):
  def __init__(self,split):
    self.texts = raw_ds[split]["text"]
    self.labels = raw_ds[split]["label"]
  def __len__(self):
    return len(self.texts)
  def __getitem__(self,idx):
    enc = tokenizer(
        self.texts[idx],
        truncation=True,
        padding="max_length",
        max_length=MAX_LEN,
        return_tensors="pt",
    )
    return {
        "input_ids" : enc["input_ids"].squeeze(),
        "attention_mask" : enc["attention_mask"].squeeze(),
        "label" : torch.tensor(self.labels[idx])
    }

BATCH_SIZE = 32
NUM_WORKERS = 8
train_ds = YelpDataset("train")
test_ds = YelpDataset("test")

train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
)
test_loader = DataLoader(
    test_ds, batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS, pin_memory=True
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab size of pretrained tokenizer :  30522


NameError: name 'DataLoader' is not defined

In [None]:
D_MODEL = 128
NUM_LAYERS = 2
NUM_HEADS = 8
D_FF = 512
DROPOUT = 0.1
NUM_CLASSES = 5

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

trans_model = TransformerEncoder(
    vocab_size = VOCAB_SIZE,
    d_model = D_MODEL,
    num_layers = NUM_LAYERS,
    num_heads=NUM_HEADS,
    d_ff = D_FF,
    drop_out = DROPOUT,
    max_seq_len = MAX_LEN
).to(device)

class_model = ClassifierHead(D_MODEL,NUM_CLASSES).to(device)


optm = torch.optim.Adam(
    list(trans_model.parameters()) + list(class_model.parameters()), lr=0.001
)

criterion = nn.NLLLoss()

In [None]:
EPOCHS = 3
for epoch in range(1,EPOCHS+1):
  trans_model.train()
  class_model.train()
  total_loss = 0

  for batch in train_loader:
    input_ids = batch["input_ids"].to(device)
    attn_mask = batch["attention_mask"].to(device)
    labels = batch["label"].to(device)
    mask = attn_mask.unsqueeze(1).unsqueeze(2)
    optm.zero_grad()
    enc_output = trans_model(input_ids,mask)
    cls_vec = enc_output[:, 0, :]
    logits = class_model(cls_vec)
    loss = criterion(logits,labels)
    loss.backward()
    optm.step()
    total_loss += loss.item()
  avg = total_loss / len(train_loader)
  print(f"Epoch {epoch} — Avg Train Loss: {avg:.4f}")



#Decoder

In [10]:
class Decoder(nn.Module):
  def __init__(self,d_model,num_heads,d_ff,drop_out):
    super().__init__()
    self.attention_block = MultiHeadAttention(d_model,num_heads)
    self.feed_block = FeedForward(d_model,d_ff)
    self.layer_norm1 = nn.LayerNorm(d_model)
    self.layer_norm2 = nn.LayerNorm(d_model)
    self.drop_out = nn.Dropout(drop_out)
  def forward(self,x,tgk_mask):
    attention_output = self.attention_block(x,x,x,tgk_mask)
    x = self.layer_norm1(x + self.drop_out(attention_output))
    feed_output = self.feed_block(x)
    return self.layer_norm2(x + self.drop_out(feed_output))

# this is what the mask looks like
seq_length = 128
tgt_mask = (1 - torch.triu(
  torch.ones(1, seq_length, seq_length), diagonal=1)
).bool()

#Wrapper for Multiple Decoders

In [11]:
class TransformerDecoder(nn.Module):
  def __init__(self,vocab_size,d_model,num_layers,num_heads,d_ff,drop_out,max_seq_len):
    super().__init__()
    self.embedder = Embedder(vocab_size,d_model)
    self.positional_encoder = PositionalEncoder(d_model,max_seq_len)
    self.encoder_layers = nn.ModuleList([Decoder(d_model,num_heads,d_ff,drop_out) for _ in range(num_layers)])
    self.linear = nn.Linear(d_model,vocab_size)
  def forward(self,x,mask):
    x = self.embedder(x)
    x = self.positional_encoder(x)
    for layer in self.encoder_layers:
      x = layer(x,mask)
    x = self.linear(x)
    return F.log_softmax(x,dim = -1)

#Decoder and Encoder Integration

##Decoder

In [12]:
class ExtendedDecoder(nn.Module):
  def __init__(self,d_model,num_heads,d_ff,drop_out):
    super().__init__()
    self.attention_block = MultiHeadAttention(d_model,num_heads)
    self.cross_attention_block = MultiHeadAttention(d_model,num_heads)
    self.feed_block = FeedForward(d_model,d_ff)
    self.layer_norm1 = nn.LayerNorm(d_model)
    self.layer_norm2 = nn.LayerNorm(d_model)
    self.layer_norm3 = nn.LayerNorm(d_model)
    self.drop_out = nn.Dropout(drop_out)
  def forward(self,x,y,tgk_mask,cross_mask):
    attention_output = self.attention_block(x,x,x,tgk_mask)
    x = self.layer_norm1(x + self.drop_out(attention_output))
    cross_attention_output = self.cross_attention_block(x,y,y,cross_mask)
    x = self.layer_norm2(x + self.drop_out(cross_attention_output))
    feed_output = self.feed_block(x)
    return self.layer_norm3(x + self.drop_out(feed_output))

# this is what the mask looks like
seq_length = 128
tgt_mask = (1 - torch.triu(
  torch.ones(1, seq_length, seq_length), diagonal=1)
).bool()

##Wrapper for Multiple Decoders

In [13]:
class ExtendedTransformerDecoder(nn.Module):
  def __init__(self,vocab_size,d_model,num_layers,num_heads,d_ff,drop_out,max_seq_len):
    super().__init__()
    self.embedder = Embedder(vocab_size,d_model)
    self.positional_encoder = PositionalEncoder(d_model,max_seq_len)
    self.encoder_layers = nn.ModuleList([ExtendedDecoder(d_model,num_heads,d_ff,drop_out) for _ in range(num_layers)])
    self.linear = nn.Linear(d_model,vocab_size)
  def forward(self,x,y,tgk_mask,cross_mask):
    x = self.embedder(x)
    x = self.positional_encoder(x)
    for layer in self.encoder_layers:
      x = layer(x,y,tgk_mask,cross_mask)
    x = self.linear(x)
    return F.log_softmax(x,dim = -1)

##Transformer

In [14]:
class Transformer(nn.Module):
  def __init__(self,vocab_size,d_model,num_heads,
               num_layers,d_ff, max_seq_len, dropout):
    super().__init__()
    self.encoder = TransformerEncoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_len)
    self.decoder = ExtendedTransformerDecoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_len)

  def forward(self,x,src_mask,tgt_mask,cross_mask):
    encoder_output = self.encoder(x,src_mask)
    decoder_output = self.decoder(x,encoder_output,tgt_mask,cross_mask)
    return decoder_output