<a href="https://colab.research.google.com/github/AbhinavaReddy-hub/learning-DL-/blob/main/Dl_03.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

# ----- Data and Tokenization Functions -----

# Toy parallel corpus: a single English -> Hindi sentence pair.
# NOTE: the `_fr` suffix is historical. An earlier version of this example
# used the French sentence "J' adore l'IA ." with a matching French vocabulary;
# the target data is now Hindi but the variable names were kept.
# The target sentence starts with "<pad>", which serves as the start token.
sentence_en = "I love AI ."
sentence_fr = "<pad> मुझे एआई से प्यार है |"

# Source vocabulary: word -> integer id.
word_map_en = {
    "<pad>": 0,
    "I": 1,
    "love": 2,
    "AI": 3,
    ".": 4,
}
# Target vocabulary: word -> integer id ("|" marks the end of the sentence).
word_map_fr = {
    "<pad>": 0,
    "मुझे": 1,
    "एआई": 2,
    "से": 3,
    "प्यार": 4,
    "है": 5,
    "|": 6,
}


def tokenize(sentence, word_map):
    """Convert a whitespace-separated sentence into a 1-D tensor of word ids.

    Args:
        sentence: Text whose words are separated by whitespace.
        word_map: Mapping from word to integer id.

    Returns:
        A ``torch.long`` tensor holding one id per word, in sentence order.

    Raises:
        KeyError: If a word of ``sentence`` is not a key of ``word_map``.
    """
    ids = []
    for word in sentence.split():
        ids.append(word_map[word])
    return torch.tensor(ids, dtype=torch.long)

# Build batch-of-one id tensors for the example pair; a leading batch
# dimension is added in front of the token dimension.
input_tensor = torch.unsqueeze(tokenize(sentence_en, word_map_en), 0)
target_tensor = torch.unsqueeze(tokenize(sentence_fr, word_map_fr), 0)
# Show shape and contents so the tokenization can be checked by eye.
print("Input tensor:", input_tensor.shape, input_tensor)
print("Target tensor:", target_tensor.shape, target_tensor)

# ----- Positional Encoding -----

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    The table is registered as a non-persistent buffer so that it follows the
    module through ``.to(device)`` / dtype casts, while staying out of
    ``state_dict()`` (keeping checkpoints identical to a version that stored
    it as a plain attribute).

    Args:
        d_model: Size of the embedding (last) dimension. Odd values are
            supported.
        max_len: Number of positions pre-computed in the table.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        encoding = torch.zeros(max_len, d_model)
        # Even columns carry the sine, odd columns the cosine.
        encoding[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even column, so
        # trim the frequency vector to the number of odd columns.
        encoding[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Shape (1, max_len, d_model): the leading axis broadcasts over the batch.
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, sequence, d_model); the sequence length
        must not exceed ``max_len``.
        """
        return x + self.encoding[:, :x.size(1)]

# ----- Multi-Head Attention -----

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    With only ``x`` given this is self-attention. When ``context`` is given,
    queries come from ``x`` and keys/values come from ``context``
    (cross-attention), which is what a decoder needs in order to read the
    encoder output.

    Args:
        d_model: Width of the input and output features.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_model // num_heads
        self.d_v = d_model // num_heads

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.fc = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None, context=None):
        """Attend from ``x`` to ``context`` (or to ``x`` itself).

        Args:
            x: Query source, indexed as (batch, query_len, d_model).
            mask: Optional tensor broadcastable to
                (batch, num_heads, query_len, key_len). Entries equal to 0
                are excluded from attention.
            context: Optional key/value source, indexed as
                (batch, key_len, d_model). Defaults to ``x``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        if context is None:
            context = x

        batch_size = x.size(0)

        # Project, then split the feature axis into heads:
        # (batch, len, d_model) -> (batch, num_heads, len, d_k).
        q = self.query(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.key(context).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.value(context).view(batch_size, -1, self.num_heads, self.d_v).transpose(1, 2)

        # Scaled dot-product scores: (batch, num_heads, query_len, key_len).
        attn_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # -inf becomes a weight of exactly 0 after the softmax.
            attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))

        attn_weights = F.softmax(attn_scores, dim=-1)
        attention_output = torch.matmul(attn_weights, v)
        # Merge the heads back into a single feature axis.
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.fc(attention_output)
        return output

# ----- Feed Forward Network -----
class FeedForward(nn.Module):
    """Two linear layers with a ReLU in between, applied to the last axis.

    Args:
        d_model: Input and output feature width.
        d_ff: Hidden width between the two linear layers.
    """

    def __init__(self, d_model, d_ff=2048):
        super(FeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Return ``fc2(relu(fc1(x)))``."""
        return self.fc2(F.relu(self.fc1(x)))

# ----- Encoder Layer -----

class EncoderLayer(nn.Module):
    """Self-attention followed by a feed-forward network.

    Each sub-layer is wrapped in a residual connection and followed by
    layer normalization.
    """

    def __init__(self, d_model, num_heads, d_ff=2048):
        super(EncoderLayer, self).__init__()
        self.multihead_attn = MultiHeadAttention(d_model, num_heads)
        self.feedforward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Transform ``x``; ``mask`` is forwarded to the self-attention."""
        attn_output = self.multihead_attn(x, mask)
        x = self.norm1(x + attn_output)
        ff_output = self.feedforward(x)
        x = self.norm2(x + ff_output)
        return x

# ----- Decoder Layer -----

class DecoderLayer(nn.Module):
    """Masked self-attention, cross-attention over the encoder output, then
    a feed-forward network.

    Each sub-layer is wrapped in a residual connection and followed by
    layer normalization.
    """

    def __init__(self, d_model, num_heads, d_ff=2048):
        super(DecoderLayer, self).__init__()
        self.multihead_attn1 = MultiHeadAttention(d_model, num_heads)
        self.multihead_attn2 = MultiHeadAttention(d_model, num_heads)
        self.feedforward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, x, encoder_output, tgt_mask=None, src_mask=None):
        """Run one decoder layer.

        Args:
            x: Decoder states, indexed as (batch, tgt_len, d_model).
            encoder_output: Encoder states, indexed as
                (batch, src_len, d_model).
            tgt_mask: Optional mask for the self-attention, broadcastable to
                (batch, num_heads, tgt_len, tgt_len).
            src_mask: Optional mask for the cross-attention, broadcastable to
                (batch, num_heads, tgt_len, src_len), e.g. a padding mask of
                shape (batch, 1, 1, src_len).

        Returns:
            Tensor with the same shape as ``x``.
        """
        attn_output1 = self.multihead_attn1(x, tgt_mask)
        x = self.norm1(x + attn_output1)
        # Keys and values must come from the encoder; without `context` the
        # decoder would never look at the source sentence.
        attn_output2 = self.multihead_attn2(x, src_mask, context=encoder_output)
        x = self.norm2(x + attn_output2)
        ff_output = self.feedforward(x)
        x = self.norm3(x + ff_output)
        return x

# ----- Transformer Model -----

class Transformer(nn.Module):
    """Encoder-decoder model built from the layers defined in this file.

    Source and target ids are embedded separately, passed through the same
    positional-encoding module, run through the encoder and decoder stacks,
    and finally projected to ``tgt_vocab_size`` scores per target position.

    Args:
        src_vocab_size: Number of rows of the source embedding table.
        tgt_vocab_size: Number of rows of the target embedding table and
            width of the output projection.
        d_model: Feature width used throughout the model.
        num_heads: Attention heads per layer.
        num_encoder_layers: Number of stacked ``EncoderLayer`` modules.
        num_decoder_layers: Number of stacked ``DecoderLayer`` modules.
        max_len: Maximum length handed to ``PositionalEncoding``.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_encoder_layers, num_decoder_layers, max_len=5000):
        super().__init__()
        self.encoder_embed = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embed = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len)

        # Encoder layers are created before decoder layers.
        self.encoder_layers = nn.ModuleList()
        for _ in range(num_encoder_layers):
            self.encoder_layers.append(EncoderLayer(d_model, num_heads))

        self.decoder_layers = nn.ModuleList()
        for _ in range(num_decoder_layers):
            self.decoder_layers.append(DecoderLayer(d_model, num_heads))

        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, tgt_mask=None, src_mask=None):
        """Return per-position target-vocabulary scores for ``tgt``.

        ``src_mask`` is handed to every encoder and decoder layer;
        ``tgt_mask`` only to the decoder layers.
        """
        # Encoder: embed, add positions, run the stack.
        memory = self.encoder_embed(src)
        memory = self.pos_encoder(memory)
        for encoder_layer in self.encoder_layers:
            memory = encoder_layer(memory, src_mask)

        # Decoder: same recipe, with access to the encoder result.
        hidden = self.decoder_embed(tgt)
        hidden = self.pos_encoder(hidden)
        for decoder_layer in self.decoder_layers:
            hidden = decoder_layer(hidden, memory, tgt_mask, src_mask)

        logits = self.fc_out(hidden)
        return logits

# ----- Translation Function with Autoregressive Decoding -----
def translate(input_sentence, word_map_en, word_map_fr, transformer, max_len=10, eos_token="|"):
    """Greedily decode a translation of ``input_sentence``.

    Decoding starts from a target sequence filled with the ``"<pad>"`` id,
    whose first position acts as the start token. At each step the model is
    run on the tokens produced so far under a causal mask, and the
    highest-scoring token at the last position is appended.

    Args:
        input_sentence: Whitespace-separated source sentence.
        word_map_en: Source vocabulary (word -> id), used by ``tokenize``.
        word_map_fr: Target vocabulary (word -> id); must contain ``"<pad>"``.
        transformer: Model called as ``transformer(src, tgt, tgt_mask=...)``.
            It is switched to evaluation mode and left in that mode.
        max_len: Maximum target length, including the start position.
        eos_token: Target word that ends decoding. If it is not in
            ``word_map_fr``, decoding simply runs until ``max_len``.

    Returns:
        The decoded words joined by single spaces, with ``"<pad>"`` tokens
        dropped and nothing after the first end token.

    Raises:
        KeyError: If a source word is missing from ``word_map_en`` or
            ``"<pad>"`` is missing from ``word_map_fr``.
    """
    transformer.eval()
    device = next(transformer.parameters()).device
    input_tensor = tokenize(input_sentence, word_map_en).unsqueeze(0).to(device)

    pad_id = word_map_fr["<pad>"]
    eos_id = word_map_fr.get(eos_token)  # None -> never matches a token id
    reverse_word_map_fr = {v: k for k, v in word_map_fr.items()}

    # Target buffer pre-filled with <pad>; position 0 is the start token.
    target = torch.full((1, max_len), pad_id, dtype=torch.long, device=device)

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        for i in range(max_len - 1):
            current_len = i + 1
            # Causal mask: position j may attend to positions <= j.
            tgt_mask = (
                torch.tril(torch.ones(current_len, current_len, device=device))
                .bool()
                .unsqueeze(0)
                .unsqueeze(0)
            )

            output = transformer(input_tensor, target[:, :current_len], tgt_mask=tgt_mask)
            next_token = output.argmax(-1)[:, -1]
            target[:, i + 1] = next_token

            # Stop as soon as the end token is generated.
            if next_token.item() == eos_id:
                break

    # Convert token ids back to words.
    translated_sentence = []
    for token in target[0].cpu().tolist():
        if token == pad_id:
            continue
        translated_sentence.append(reverse_word_map_fr[token])
        if token == eos_id:
            break
    return " ".join(translated_sentence)

# ----- Model Instantiation -----

# Vocabulary sizes come straight from the two word maps.
vocab_size_en, vocab_size_fr = len(word_map_en), len(word_map_fr)

# Deliberately tiny hyper-parameters for a quick smoke test.
d_model, num_heads = 8, 2  # d_model must be divisible by num_heads
num_encoder_layers = num_decoder_layers = 2

transformer = Transformer(
    src_vocab_size=vocab_size_en,
    tgt_vocab_size=vocab_size_fr,
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers,
    d_model=d_model,
    num_heads=num_heads,
)

# Test translation with the freshly initialised (untrained) model.
translated = translate("I love AI .", word_map_en, word_map_fr, transformer)
print("Translated sentence:", translated)

Input tensor: torch.Size([1, 4]) tensor([[1, 2, 3, 4]])
Target tensor: torch.Size([1, 7]) tensor([[0, 1, 2, 3, 4, 5, 6]])
Translated sentence: मुझे मुझे मुझे मुझे |


In [None]:
import torch.optim as optim
class Transformer(nn.Module):
    """Encoder-decoder model; redefinition of the class from the previous cell.

    The constructor arguments and the forward computation are the same as in
    the earlier definition: embed, add positional encodings, run the encoder
    and decoder stacks, project to target-vocabulary scores.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_encoder_layers, num_decoder_layers, max_len=5000):
        super().__init__()
        # Token-id lookup tables for each side.
        self.encoder_embed = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embed = nn.Embedding(tgt_vocab_size, d_model)
        # One positional-encoding module shared by source and target.
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        # Layer stacks (encoder layers are built first).
        encoder_stack = [EncoderLayer(d_model, num_heads) for _ in range(num_encoder_layers)]
        self.encoder_layers = nn.ModuleList(encoder_stack)
        decoder_stack = [DecoderLayer(d_model, num_heads) for _ in range(num_decoder_layers)]
        self.decoder_layers = nn.ModuleList(decoder_stack)
        # Final projection onto the target vocabulary.
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, tgt_mask=None, src_mask=None):
        """Return target-vocabulary scores for every position of ``tgt``."""
        encoder_output = self.pos_encoder(self.encoder_embed(src))
        for enc in self.encoder_layers:
            encoder_output = enc(encoder_output, src_mask)

        decoder_output = self.pos_encoder(self.decoder_embed(tgt))
        for dec in self.decoder_layers:
            decoder_output = dec(decoder_output, encoder_output, tgt_mask, src_mask)

        return self.fc_out(decoder_output)

# ----- Training Loop -----
def train_model(model, input_tensor, target_tensor, epochs=1000, lr=0.0001, pad_idx=0):
    """Train ``model`` with teacher forcing on one batch of sentence pairs.

    The decoder is fed ``target_tensor[:, :-1]`` and trained to predict
    ``target_tensor[:, 1:]``. A causal mask is passed as ``tgt_mask`` so that
    a position cannot attend to later positions of the decoder input (which
    contain the very tokens it is asked to predict), matching the masking
    used at decoding time.

    Args:
        model: Module called as ``model(src, tgt, tgt_mask=...)`` and
            returning scores indexed as (batch, tgt_len, vocab).
        input_tensor: Source ids, indexed as (batch, src_len).
        target_tensor: Target ids, indexed as (batch, tgt_len + 1).
        epochs: Number of optimisation steps.
        lr: Learning rate for Adam.
        pad_idx: Target id ignored by the loss.

    Returns:
        None. The loss is printed every 100 epochs.
    """
    criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)  # Ignore padding
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # These do not change between steps, so build them once.
    decoder_input = target_tensor[:, :-1]  # Exclude last token
    labels = target_tensor[:, 1:].contiguous().view(-1)
    steps = decoder_input.size(1)
    # Lower-triangular mask, broadcastable over batch and heads.
    tgt_mask = (
        torch.tril(torch.ones(steps, steps, device=target_tensor.device))
        .bool()
        .unsqueeze(0)
        .unsqueeze(0)
    )

    # Make sure training-mode behaviour is active even if the model was
    # previously switched to eval mode.
    model.train()

    for epoch in range(epochs):
        optimizer.zero_grad()
        output = model(input_tensor, decoder_input, tgt_mask=tgt_mask)
        loss = criterion(output.reshape(-1, output.size(-1)), labels)
        loss.backward()
        optimizer.step()

        if (epoch + 1) % 100 == 0:
            print(f"Epoch {epoch+1}, Loss: {loss.item()}")

# ----- Model Instantiation & Training -----
# Vocabulary sizes come straight from the two word maps.
vocab_size_en, vocab_size_fr = len(word_map_en), len(word_map_fr)

d_model = 32  # Increased model capacity
num_heads = 4
num_encoder_layers = num_decoder_layers = 2

transformer = Transformer(
    src_vocab_size=vocab_size_en,
    tgt_vocab_size=vocab_size_fr,
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers,
    d_model=d_model,
    num_heads=num_heads,
)

# Fit the model to the single example pair.
train_model(transformer, input_tensor, target_tensor, epochs=1000)

# ----- Test Translation -----
# Decode the training sentence with the trained model.
translated = translate("I love AI .", word_map_en, word_map_fr, transformer)
print("Translated sentence:", translated)

Epoch 100, Loss: 0.16641046106815338
Epoch 200, Loss: 0.13167111575603485
Epoch 300, Loss: 0.10554251074790955
Epoch 400, Loss: 0.08544322103261948
Epoch 500, Loss: 0.06999007612466812
Epoch 600, Loss: 0.058014173060655594
Epoch 700, Loss: 0.048628564924001694
Epoch 800, Loss: 0.0411829836666584
Epoch 900, Loss: 0.03520451486110687
Epoch 1000, Loss: 0.030348151922225952
Translated sentence: मुझे एआई से प्यार है |
