<a href="https://colab.research.google.com/github/AkHiLdEvGoD/DeepLearning-Algorithms/blob/main/Transformer_Encoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [36]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
from tqdm import tqdm

In [20]:
def attention(Q,K,V):
  d_k = Q.size(-1)
  scores = torch.matmul(Q,K.transpose(-2,-1))/math.sqrt(d_k)
  weights = torch.softmax(scores,dim=-1)
  return torch.matmul(weights,V)

In [21]:
class MultiHeadAttention(nn.Module):
  def __init__(self,n_heads,d_model):
    super().__init__()
    assert d_model % n_heads == 0
    self.d_head = d_model // n_heads
    self.h = n_heads
    self.linear_Q = nn.Linear(d_model,d_model)
    self.linear_K = nn.Linear(d_model,d_model)
    self.linear_V = nn.Linear(d_model,d_model)
    self.final_projection = nn.Linear(d_model,d_model)

  def forward(self,x):
    B,T,D = x.size()
    Q_head =  self.linear_Q(x).view(B,T,self.h,self.d_head).transpose(1,2)  #(B,H,T,D)
    K_head =  self.linear_K(x).view(B,T,self.h,self.d_head).transpose(1,2)  #(B,H,T,D)
    V_head =  self.linear_V(x).view(B,T,self.h,self.d_head).transpose(1,2)  #(B,H,T,D)

    x = attention(Q_head,K_head,V_head)  #(B,H,T,D)
    x = x.transpose(1,2).contiguous().view(B,T,D)
    return self.final_projection(x)

In [29]:
class FeedForward(nn.Module):
  def __init__(self,d_model,d_ff,dropout=0.1):
    super().__init__()
    self.ff = nn.Sequential(
        nn.Linear(d_model,d_ff),
        nn.ReLU(),
        nn.Linear(d_ff,d_model),
        nn.Dropout(dropout)
    )

  def forward(self,x):
    return self.ff(x)

In [30]:
class EncoderLayer(nn.Module):
  def __init__(self,d_model,heads,d_ff,dropout=0.1):
    super().__init__()
    self.Attention = MultiHeadAttention(heads,d_model)
    self.FeedForward = FeedForward(d_model,d_ff,dropout)
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout)

  def forward(self,x):
    Z_norm = self.norm1(x + self.dropout(self.Attention(x)))
    Y_norm = self.norm2(Z_norm + self.dropout(self.FeedForward(x)))
    return Y_norm

In [31]:
class PositionalEncoding(nn.Module):
  def __init__(self,embd_dims,max_len=5000):
    super().__init__()
    self.pe = torch.zeros(max_len,embd_dims)
    pos = torch.arange(0,max_len).float().unsqueeze(1)
    den = torch.exp(torch.arange(0,embd_dims,2)* -(math.log(10000.0)/embd_dims))
    self.pe[:,0::2] = torch.sin(pos*den)
    self.pe[:,1::2] = torch.cos(pos*den)
    self.pe = self.pe.unsqueeze(0)

  def forward(self,x):
    return x + self.pe[:,:x.size(1)]

In [32]:
class Encoder(nn.Module):
  def __init__(self,d_model,heads,d_ff,vocab_size,dropouts=0.1,max_len=100,N_layers=4):
    super().__init__()
    self.embed = nn.Embedding(vocab_size,d_model)
    self.p_enc = PositionalEncoding(d_model,max_len)
    self.layers = nn.ModuleList([EncoderLayer(d_model,heads,d_ff,dropouts) for layer in range(N_layers)])
    self.norm = nn.LayerNorm(d_model)

  def forward(self,src):
    embedded = self.embed(src) * math.sqrt(self.embed.embedding_dim)
    x = self.p_enc(embedded)
    for layer in self.layers:
      x = layer(x)
    return self.norm(x)

In [33]:
def generate_data(num_samples, seq_len=10, vocab_size=50):
    X = torch.randint(1, vocab_size, (num_samples, seq_len))
    y = (X.sum(dim=1) % 2).long()  # 0 if sum even, 1 if odd
    return X, y

In [34]:
class EncoderClassifier(nn.Module):
  def __init__(self,d_model=128,heads=4,d_ff=256,vocab_size=50,n_classes=2):
    super().__init__()
    self.encoder = Encoder(d_model,heads,d_ff,vocab_size)
    self.fc_out = nn.Linear(d_model,n_classes)

  def forward(self,src):
    enc_out = self.encoder(src)
    cls_token = enc_out[:,0,:]
    return self.fc_out(cls_token)

In [41]:
model = EncoderClassifier()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(),lr=1e-3)

X,y = generate_data(num_samples=100)
train_data = list(zip(X,y))

epochs=8
for epoch in range(epochs):
  total_loss = 0
  for xb,yb in tqdm(train_data):
    xb,yb = xb.unsqueeze(0),yb.unsqueeze(0)
    optimizer.zero_grad()
    pred = model(xb)
    loss = criterion(pred,yb)
    loss.backward()
    optimizer.step()
    total_loss += loss.item()
  print(f'\nEpoch {epoch+1}, Loss: {total_loss/len(train_data):.2f}')

100%|██████████| 100/100 [00:02<00:00, 48.69it/s]



Epoch 1, Loss: 0.84


100%|██████████| 100/100 [00:01<00:00, 56.98it/s]



Epoch 2, Loss: 0.72


100%|██████████| 100/100 [00:01<00:00, 56.63it/s]



Epoch 3, Loss: 0.64


100%|██████████| 100/100 [00:01<00:00, 57.10it/s]



Epoch 4, Loss: 0.62


100%|██████████| 100/100 [00:01<00:00, 56.71it/s]



Epoch 5, Loss: 0.58


100%|██████████| 100/100 [00:01<00:00, 56.21it/s]



Epoch 6, Loss: 0.45


100%|██████████| 100/100 [00:02<00:00, 36.94it/s]



Epoch 7, Loss: 0.43


100%|██████████| 100/100 [00:01<00:00, 55.09it/s]


Epoch 8, Loss: 0.39



