Q4) Create a Transformer from scratch using the PyTorch library

In [None]:
!pip install torchinfo

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

# Position-wise Feed Forward Network
class FeedForward(nn.Module):
    """Position-wise feed-forward sub-layer: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Size of the input and output feature dimension.
        d_ff: Size of the hidden (expanded) dimension.
        dropout: Dropout probability applied to the hidden activations.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        # Sub-modules are created in the order linear1, relu, linear2, dropout
        # so seeded initialisation and the printed module tree are unaffected.
        self.linear1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(in_features=d_ff, out_features=d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """Apply the two-layer network to the last dimension of ``x``.

        Returns a tensor whose last dimension is ``d_model`` again.
        """
        hidden = self.linear1(x)
        hidden = self.relu(hidden)
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

# Scaled Dot-Product Attention
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``.

    Args:
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, dropout=0.1):
        super(ScaledDotProductAttention, self).__init__()
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        """Compute attention-weighted values.

        Args:
            query: Tensor whose last dimension is the per-head feature size.
            key: Tensor matching ``query`` in its last dimension.
            value: Tensor whose second-to-last dimension matches ``key``'s.
            mask: Optional tensor broadcastable to the score tensor; positions
                where ``mask == 0`` are excluded from attention.

        Returns:
            ``softmax(scores) @ value`` with dropout applied to the weights.
        """
        score = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(query.size(-1))
        if mask is not None:
            # Fill with the most negative finite value of the score dtype.
            # A hard-coded -1e9 is not representable in float16 and makes
            # masked_fill raise an overflow error under half precision; in
            # float32 the softmax result is the same either way.
            score = score.masked_fill(mask == 0, torch.finfo(score.dtype).min)
        attention = torch.softmax(score, dim=-1)
        attention = self.dropout(attention)
        return torch.matmul(attention, value)

# Multi-Head Attention
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``ScaledDotProductAttention``.

    The inputs are linearly projected, split into ``num_heads`` heads of size
    ``d_model // num_heads``, attended per head, merged back and projected
    once more.

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.
        dropout: Dropout probability handed to the inner attention module.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.attn = ScaledDotProductAttention(dropout)

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)
        # Registered but not applied anywhere in this class's forward().
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, projected, batch_size):
        """Reshape (batch, seq, d_model) into (batch, heads, seq, d_k)."""
        per_head = projected.view(batch_size, -1, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend ``query`` over ``key``/``value`` and return the merged result.

        ``mask`` is forwarded unchanged to the inner attention module.
        """
        batch_size = query.size(0)
        q_heads = self._split_heads(self.query(query), batch_size)
        k_heads = self._split_heads(self.key(key), batch_size)
        v_heads = self._split_heads(self.value(value), batch_size)

        context = self.attn(q_heads, k_heads, v_heads, mask)
        # Undo the head split: (batch, heads, seq, d_k) -> (batch, seq, heads * d_k).
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(batch_size, -1, self.num_heads * self.d_k)
        return self.out(merged)

# Positional Encoding
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to the input.

    Even feature columns hold ``sin(position * freq)`` and odd columns hold
    ``cos(position * freq)``. The table is stored as the non-trainable buffer
    ``pe`` of shape ``(1, max_len, d_model)``.

    Args:
        d_model: Feature size of the encoded inputs (even or odd).
        max_len: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even column,
        # so trim div_term to fit; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the first ``x.size(1)`` rows of the table to ``x``.

        ``x`` is indexed as (batch, seq, d_model); only ``max_len`` positions
        are available in the table.
        """
        return x + self.pe[:, :x.size(1)]

# Encoder Layer
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward, each followed by
    dropout, a residual connection and layer normalisation (post-norm).

    Args:
        d_model: Feature size of the layer's input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability used by the sub-layers and the residuals.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run self-attention then the feed-forward network over ``x``.

        ``mask`` is passed unchanged to the attention sub-layer.
        """
        # Self-attention: x serves as query, key and value.
        attended = self.attn(x, x, x, mask)
        normed = self.layernorm1(x + self.dropout(attended))

        transformed = self.ff(normed)
        return self.layernorm2(normed + self.dropout(transformed))

# Transformer Model
class Transformer(nn.Module):
    """Encoder-only Transformer producing per-position vocabulary logits.

    Pipeline: token embedding -> positional encoding -> a stack of
    ``EncoderLayer`` modules -> linear projection to ``vocab_size``.

    Args:
        d_model: Embedding / hidden feature size.
        num_heads: Number of attention heads in each encoder layer.
        num_encoder_layers: Number of stacked encoder layers.
        d_ff: Hidden size of each layer's feed-forward network.
        vocab_size: Number of distinct token ids (embedding rows and logits).
        max_len: Number of positions precomputed by the positional encoding.
        dropout: Dropout probability used inside the encoder layers.
    """

    def __init__(self, d_model, num_heads, num_encoder_layers, d_ff, vocab_size, max_len=5000, dropout=0.1):
        super(Transformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)

        self.encoder_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_encoder_layers)
        ])

        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, src, src_mask=None):
        """Encode token ids and return logits over the vocabulary.

        Args:
            src: Integer tensor of token ids, (batch, seq_len) as used in this
                notebook.
            src_mask: Optional mask handed unchanged to every encoder layer.

        Returns:
            Tensor of logits whose last dimension is ``vocab_size``.
        """
        # Scale by sqrt(d_model). `src` holds token ids here, so src.size(-1)
        # would be the sequence length, not the embedding width.
        src = self.embedding(src) * math.sqrt(self.embedding.embedding_dim)
        src = self.positional_encoding(src)

        # Encoder pass
        memory = src
        for layer in self.encoder_layers:
            memory = layer(memory, src_mask)

        output = self.fc_out(memory)
        return output

# Model Initialization
# Seed the global RNG so weight initialisation, the random batches and dropout
# give the same numbers on every Restart Kernel -> Run All.
torch.manual_seed(42)

d_model = 128            # embedding / hidden feature size
num_heads = 4            # attention heads per encoder layer
num_encoder_layers = 3   # stacked encoder layers
d_ff = 512               # hidden size of the feed-forward sub-layer
vocab_size = 100         # number of distinct token ids
seq_length = 20          # tokens per generated sequence
batch_size = 32          # sequences per training step
epochs = 5               # number of optimisation steps in the loop below

model = Transformer(d_model, num_heads, num_encoder_layers, d_ff, vocab_size)

# Optimizer and Loss Function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Generate Random Training Data
def generate_data(batch_size, seq_length, vocab_size):
    """Draw a random batch of token ids.

    Returns an integer tensor of shape ``(batch_size, seq_length)`` with
    values sampled from ``[0, vocab_size)``.
    """
    shape = (batch_size, seq_length)
    return torch.randint(low=0, high=vocab_size, size=shape)

# Training Loop
for epoch in range(epochs):
    # Each "epoch" here is one optimisation step on a freshly drawn random batch.
    model.train()
    optimizer.zero_grad()

    # Inputs and targets are sampled independently of each other.
    src = generate_data(batch_size, seq_length, vocab_size)
    tgt = generate_data(batch_size, seq_length, vocab_size)

    output = model(src)

    # CrossEntropyLoss wants (N, classes) logits and (N,) targets, so merge
    # the batch and sequence dimensions.
    loss = criterion(
        output.reshape(-1, vocab_size),
        tgt.reshape(-1),
    )
    loss.backward()
    optimizer.step()

    print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# Inference
# Switch dropout off for evaluation.
model.eval()
test_input = generate_data(1, seq_length, vocab_size)  # Single test sequence
# Inference only: disable gradient tracking so no autograd graph is built.
with torch.no_grad():
    output = model(test_input)
# Pick the highest-scoring token id at every position.
predicted = torch.argmax(output, dim=-1)
print("\nPredicted Sequence:\n", predicted)


Epoch [1/5], Loss: 4.7972
Epoch [2/5], Loss: 4.8101
Epoch [3/5], Loss: 4.7753
Epoch [4/5], Loss: 4.7176
Epoch [5/5], Loss: 4.7441

Predicted Sequence:
 tensor([[68,  3, 56, 71, 67, 68, 67, 71, 84, 47, 68, 68, 81, 76, 68, 27, 76, 84,
         55,  3]])


In [None]:
# Print the module tree of `model`: each sub-module with its configured sizes.
print(model)

Transformer(
  (embedding): Embedding(100, 128)
  (positional_encoding): PositionalEncoding()
  (encoder_layers): ModuleList(
    (0-2): 3 x EncoderLayer(
      (attn): MultiHeadAttention(
        (attn): ScaledDotProductAttention(
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (query): Linear(in_features=128, out_features=128, bias=True)
        (key): Linear(in_features=128, out_features=128, bias=True)
        (value): Linear(in_features=128, out_features=128, bias=True)
        (out): Linear(in_features=128, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ff): FeedForward(
        (linear1): Linear(in_features=128, out_features=512, bias=True)
        (relu): ReLU()
        (linear2): Linear(in_features=512, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (layernorm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (layernorm2): LayerNorm((128,), eps=1e-05, el