<a href="https://colab.research.google.com/github/amitgupta226571/DEEP-LEARNING-/blob/main/Experiment_7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import math
import random
import time
from collections import Counter

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.bleu_score import SmoothingFunction
from torch.utils.data import Dataset, DataLoader

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
def load_data(path, max_samples=10000):
    """Read tab-separated sentence pairs from a UTF-8 text file.

    Each usable line must contain at least two tab-separated fields; the
    first two are taken as the pair and lowercased. Any further fields
    (for example a trailing attribution column) are ignored, and lines
    with fewer than two fields (including blank lines) are skipped
    instead of aborting the load.

    Args:
        path: Path of the text file to read.
        max_samples: Maximum number of pairs to return. ``None`` reads
            the whole file; ``0`` returns an empty list.

    Returns:
        A list of ``(first_field, second_field)`` tuples of lowercased
        strings, in file order.
    """
    pairs = []
    with open(path, encoding='utf-8') as f:
        for line in f:
            # Stop before parsing once the requested number of pairs is reached.
            if max_samples is not None and len(pairs) >= max_samples:
                break

            fields = line.strip().split('\t')
            if len(fields) < 2:
                # Blank or malformed line: nothing to pair up.
                continue

            eng, spa = fields[0], fields[1]
            pairs.append((eng.lower(), spa.lower()))
    return pairs

# Load the sentence pairs from spa.txt (path is relative to the working directory);
# load_data's default max_samples (10000) caps how many pairs are read.
data = load_data("spa.txt")

In [None]:
# Shuffle a *copy* with a dedicated, seeded RNG so that the split is the same
# on every run and re-executing this cell does not reshuffle `data` in place.
SPLIT_SEED = 42
shuffled_data = list(data)
random.Random(SPLIT_SEED).shuffle(shuffled_data)

# 80% train / 10% validation; the test split takes whatever remains.
train_size = int(0.8 * len(shuffled_data))
val_size = int(0.1 * len(shuffled_data))

train_data = shuffled_data[:train_size]
val_data = shuffled_data[train_size:train_size + val_size]
test_data = shuffled_data[train_size + val_size:]

# Every pair must land in exactly one split.
assert len(train_data) + len(val_data) + len(test_data) == len(data)

In [None]:
# Reserved vocabulary entries: padding, start-of-sequence, end-of-sequence
# and the stand-in for out-of-vocabulary words.
PAD, SOS, EOS, UNK = "<pad>", "<sos>", "<eos>", "<unk>"

In [None]:
def build_vocab(sentences, min_freq=1):
    """Build word<->index lookup tables from whitespace-tokenised sentences.

    The four reserved tokens (PAD, SOS, EOS, UNK) always occupy indices
    0-3, followed by corpus words in order of first appearance.

    A corpus word that is spelled exactly like a reserved token is not
    added a second time. Without that guard the word would overwrite the
    reserved index in ``word2idx`` and ``len(word2idx)`` would be smaller
    than the largest index + 1, which breaks any table sized from it.

    Args:
        sentences: Iterable of strings; each is split on whitespace.
        min_freq: Minimum number of occurrences a word needs to be kept.
            The default of 1 keeps every word.

    Returns:
        ``(word2idx, idx2word)``: two dicts that are inverse mappings of
        each other.
    """
    counter = Counter()
    for sentence in sentences:
        counter.update(sentence.split())

    specials = [PAD, SOS, EOS, UNK]
    reserved = set(specials)
    words = [
        word for word, count in counter.items()
        if count >= min_freq and word not in reserved
    ]

    vocab = specials + words
    word2idx = {word: idx for idx, word in enumerate(vocab)}
    idx2word = {idx: word for word, idx in word2idx.items()}
    return word2idx, idx2word

In [None]:
# Fit both vocabularies on the training split only: element 0 of each pair
# feeds the source vocabulary, element 1 the target vocabulary.
src_sentences = [src_text for src_text, _ in train_data]
tgt_sentences = [tgt_text for _, tgt_text in train_data]

src_word2idx, src_idx2word = build_vocab(src_sentences)
tgt_word2idx, tgt_idx2word = build_vocab(tgt_sentences)

# These sizes later dimension the embedding tables and the output layer.
src_vocab_size, tgt_vocab_size = len(src_word2idx), len(tgt_word2idx)

print("Source vocab:", src_vocab_size)
print("Target vocab:", tgt_vocab_size)

Source vocab: 2994
Target vocab: 5525


In [None]:
def encode(sentence, word2idx):
    """Turn a sentence into a list of token ids framed by SOS and EOS.

    The sentence is split on whitespace; any token missing from
    ``word2idx`` is replaced by the id of UNK.
    """
    body = []
    for token in sentence.split():
        body.append(word2idx.get(token, word2idx[UNK]))
    return [word2idx[SOS], *body, word2idx[EOS]]

In [None]:
class TranslationDataset(Dataset):
    """Map-style dataset over (source, target) sentence pairs.

    Items are encoded lazily on access using the module-level
    ``src_word2idx`` / ``tgt_word2idx`` vocabularies.
    """

    def __init__(self, data):
        # Sequence of (source_sentence, target_sentence) pairs.
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return the idx-th pair as two 1-D tensors of token ids."""
        source_text, target_text = self.data[idx]
        source_ids = encode(source_text, src_word2idx)
        target_ids = encode(target_text, tgt_word2idx)
        return torch.tensor(source_ids), torch.tensor(target_ids)

In [None]:
def collate_fn(batch):
    """Right-pad a list of (src, tgt) id tensors into two rectangular batches.

    Source and target sequences are padded independently, each up to the
    longest sequence on its own side, using the PAD id of the matching
    vocabulary.

    Returns:
        ``(src, tgt)``: two stacked tensors with one row per batch item.
    """
    sources, targets = zip(*batch)

    def _right_pad(seqs, pad_id):
        # Append just enough pad ids to every sequence to reach the longest one.
        width = max(len(seq) for seq in seqs)
        rows = [
            torch.cat([seq, torch.full((width - len(seq),), pad_id)])
            for seq in seqs
        ]
        return torch.stack(rows)

    padded_sources = _right_pad(sources, src_word2idx[PAD])
    padded_targets = _right_pad(targets, tgt_word2idx[PAD])
    return padded_sources, padded_targets

In [None]:
# Training batches are reshuffled every epoch; validation keeps dataset order.
# Both loaders pad each batch through collate_fn.
train_loader = DataLoader(
    dataset=TranslationDataset(train_data),
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn,
)

val_loader = DataLoader(
    dataset=TranslationDataset(val_data),
    batch_size=32,
    collate_fn=collate_fn,
)

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position signals to a batch of embeddings.

    Even feature columns hold ``sin(pos * w_i)`` and odd columns hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        d_model: Size of the feature dimension. Odd values are supported:
            the last column then carries a sine term without a cosine
            partner.
        max_len: Number of positions to precompute. Inputs passed to
            ``forward`` must not be longer than this along dimension 1.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) *
                             -(math.log(10000.0) / d_model))
        # (max_len, ceil(d_model / 2)): one angle per position and frequency.
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns,
        # so drop the surplus frequency instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])

        # Stored as a buffer with a leading batch axis: it follows the module
        # across devices and is saved in state_dict, but is not a parameter.
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions."""
        return x + self.pe[:, :x.size(1)]

In [None]:
def scaled_dot_product(Q, K, V, mask=None):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``.

    Args:
        Q, K, V: Tensors whose last dimension of Q and K is ``d_k``; the
            product is taken over the last two dimensions.
        mask: Optional tensor broadcastable to the score matrix. Positions
            where ``mask == 0`` are excluded from attention.

    Returns:
        The attention-weighted combination of ``V``.
    """
    d_k = Q.size(-1)
    scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        # Fill with the most negative finite value of the score dtype rather
        # than a hard-coded -1e9, which is not representable in float16.
        # A row that is masked everywhere still yields uniform weights, not NaN.
        scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

    attn = torch.softmax(scores, dim=-1)
    return torch.matmul(attn, V)

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention with learned Q/K/V and output projections.

    Args:
        d_model: Feature size of the inputs and of the output.
        heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``heads`` is not positive or does not divide
            ``d_model``. Without this check the reshape in ``forward``
            can succeed with a wrong layout whenever the element count
            happens to be divisible.
    """

    def __init__(self, d_model, heads):
        super().__init__()
        if heads <= 0 or d_model % heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by heads ({heads})"
            )

        self.d_model = d_model
        self.heads = heads
        self.d_k = d_model // heads

        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V, mask=None):
        """Attend from ``Q`` over ``K``/``V``.

        Inputs are indexed as (batch, sequence, d_model); the result has
        the batch and sequence sizes of ``Q`` and feature size ``d_model``.
        ``mask`` is forwarded unchanged to ``scaled_dot_product``.
        """
        batch_size = Q.size(0)

        # Project, then split the feature axis into (heads, d_k).
        Q = self.q_linear(Q).view(batch_size, -1, self.heads, self.d_k)
        K = self.k_linear(K).view(batch_size, -1, self.heads, self.d_k)
        V = self.v_linear(V).view(batch_size, -1, self.heads, self.d_k)

        # Move the head axis ahead of the sequence axis so that attention
        # runs independently per head: (batch, heads, seq, d_k).
        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        attended = scaled_dot_product(Q, K, V, mask)

        # Merge the heads back into one feature axis of size d_model.
        concat = attended.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.out(concat)

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention, then a position-wise feed-forward net.

    Each sub-layer is wrapped as ``LayerNorm(x + sublayer(x))``.
    """

    def __init__(self, d_model, heads, d_ff):
        super().__init__()
        # Sub-layer 1: multi-head self-attention.
        self.attn = MultiHeadAttention(d_model, heads)
        self.norm1 = nn.LayerNorm(d_model)

        # Sub-layer 2: expand to d_ff, apply ReLU, project back to d_model.
        expand = nn.Linear(d_model, d_ff)
        project = nn.Linear(d_ff, d_model)
        self.ff = nn.Sequential(expand, nn.ReLU(), project)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask):
        """Apply both sub-layers to ``x``; ``mask`` is handed to the attention."""
        attended = self.attn(x, x, x, mask)
        x = self.norm1(x + attended)

        transformed = self.ff(x)
        return self.norm2(x + transformed)

In [None]:
class DecoderLayer(nn.Module):
    """One decoder block with three sub-layers.

    In order: masked self-attention over the target, cross-attention from
    the target onto the encoder output, and a position-wise feed-forward
    net. Each sub-layer is wrapped as ``LayerNorm(x + sublayer(x))``.
    """

    def __init__(self, d_model, heads, d_ff):
        super().__init__()
        # Sub-layer 1: self-attention over the target sequence.
        self.self_attn = MultiHeadAttention(d_model, heads)
        self.norm1 = nn.LayerNorm(d_model)

        # Sub-layer 2: attention from the target onto the encoder output.
        self.cross_attn = MultiHeadAttention(d_model, heads)
        self.norm2 = nn.LayerNorm(d_model)

        # Sub-layer 3: expand to d_ff, apply ReLU, project back to d_model.
        expand = nn.Linear(d_model, d_ff)
        project = nn.Linear(d_ff, d_model)
        self.ff = nn.Sequential(expand, nn.ReLU(), project)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, x, enc_out, src_mask, tgt_mask):
        """Run the three sub-layers on ``x``.

        ``tgt_mask`` restricts the self-attention; ``src_mask`` restricts
        the cross-attention over ``enc_out``.
        """
        self_attended = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self_attended)

        cross_attended = self.cross_attn(x, enc_out, enc_out, src_mask)
        x = self.norm2(x + cross_attended)

        transformed = self.ff(x)
        return self.norm3(x + transformed)

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer mapping source token ids to target logits.

    Args:
        src_vocab: Number of rows in the source embedding table.
        tgt_vocab: Number of rows in the target embedding table and size
            of the output layer.
        d_model: Embedding / hidden feature size.
        heads: Number of attention heads in every layer.
        d_ff: Hidden size of the feed-forward sub-layers.
        num_layers: Number of encoder layers and of decoder layers.

    The padding ids used for masking are read from the module-level
    ``src_word2idx`` / ``tgt_word2idx`` vocabularies.
    """

    def __init__(self,
                 src_vocab,
                 tgt_vocab,
                 d_model=256,
                 heads=8,
                 d_ff=512,
                 num_layers=3):

        super().__init__()

        self.src_embed = nn.Embedding(src_vocab, d_model)
        self.tgt_embed = nn.Embedding(tgt_vocab, d_model)

        # One positional-encoding module is shared by both embeddings.
        self.pos = PositionalEncoding(d_model)

        self.encoder = nn.ModuleList(
            [EncoderLayer(d_model, heads, d_ff) for _ in range(num_layers)]
        )

        self.decoder = nn.ModuleList(
            [DecoderLayer(d_model, heads, d_ff) for _ in range(num_layers)]
        )

        self.fc_out = nn.Linear(d_model, tgt_vocab)

    def make_src_mask(self, src):
        """Return a boolean mask that is False at source padding positions.

        Two singleton axes are inserted after the batch axis so the mask
        broadcasts over attention heads and query positions.
        """
        return (src != src_word2idx[PAD]).unsqueeze(1).unsqueeze(2)

    def make_tgt_mask(self, tgt):
        """Return the combined padding + causal mask for a 2-D ``tgt``.

        Entry ``[b, 0, i, j]`` is True when target position ``i`` may
        attend to position ``j``: ``j <= i`` and token ``j`` is not padding.
        """
        _, tgt_len = tgt.shape
        pad_mask = (tgt != tgt_word2idx[PAD]).unsqueeze(1).unsqueeze(2)

        # Build the lower-triangular mask on the same device as the input
        # rather than on a module-level global, so the model keeps working
        # wherever its inputs live.
        no_peak_mask = torch.tril(
            torch.ones((tgt_len, tgt_len), device=tgt.device)
        ).bool()

        return pad_mask & no_peak_mask

    def forward(self, src, tgt):
        """Compute target-vocabulary logits for every position of ``tgt``.

        Args:
            src: Source token ids, indexed (batch, source position).
            tgt: Target token ids fed to the decoder, indexed
                (batch, target position).

        Returns:
            Logits with a trailing axis of size ``tgt_vocab``.
        """
        # Masks must be derived from the raw ids, before embedding.
        src_mask = self.make_src_mask(src)
        tgt_mask = self.make_tgt_mask(tgt)

        src = self.pos(self.src_embed(src))
        tgt = self.pos(self.tgt_embed(tgt))

        for layer in self.encoder:
            src = layer(src, src_mask)

        enc_out = src

        # Every decoder layer attends to the output of the final encoder layer.
        for layer in self.decoder:
            tgt = layer(tgt, enc_out, src_mask, tgt_mask)

        return self.fc_out(tgt)

In [None]:
# Build the network with its default hyper-parameters, then move it to `device`.
model = Transformer(src_vocab_size, tgt_vocab_size)
model = model.to(device)

# Target padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tgt_word2idx[PAD])
optimizer = optim.Adam(model.parameters(), lr=3e-4)

In [None]:
def train_epoch(model, loader):
    """Run one optimisation pass over ``loader`` and return the mean batch loss.

    Uses the module-level ``optimizer``, ``criterion`` and ``device``.
    The decoder is fed the target without its last token and is scored
    against the target shifted one step to the left.
    """
    model.train()
    batch_losses = []

    for src, tgt in loader:
        src = src.to(device)
        tgt = tgt.to(device)

        # Decoder input drops the final token; labels drop the first one.
        decoder_input = tgt[:, :-1]
        labels = tgt[:, 1:]

        optimizer.zero_grad()

        logits = model(src, decoder_input)
        # Flatten batch and time so the criterion sees one row per token.
        loss = criterion(
            logits.reshape(-1, logits.shape[-1]),
            labels.reshape(-1),
        )

        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(loader)

In [None]:
# Number of full passes over the training set.
NUM_EPOCHS = 10

for epoch in range(NUM_EPOCHS):
    # perf_counter is a monotonic clock, so the measured duration cannot be
    # distorted by system clock adjustments.
    start = time.perf_counter()
    loss = train_epoch(model, train_loader)
    elapsed = time.perf_counter() - start
    print(f"Epoch {epoch+1}, Loss: {loss:.4f}, Time: {elapsed:.2f}s")

Epoch 1, Loss: 5.0427, Time: 46.96s
Epoch 2, Loss: 3.5355, Time: 40.97s
Epoch 3, Loss: 2.7072, Time: 39.57s
Epoch 4, Loss: 2.0389, Time: 46.10s
Epoch 5, Loss: 1.4954, Time: 41.09s
Epoch 6, Loss: 1.0641, Time: 41.38s
Epoch 7, Loss: 0.7384, Time: 42.97s
Epoch 8, Loss: 0.5271, Time: 41.02s
Epoch 9, Loss: 0.4079, Time: 38.83s
Epoch 10, Loss: 0.3600, Time: 44.51s


In [None]:
def translate(model, sentence, max_len=20):
    """Greedily decode a translation for one source sentence.

    Starting from SOS, the model is re-run on the tokens generated so far
    and the highest-scoring next token is appended, until EOS is produced
    or ``max_len`` tokens have been generated.

    Uses the module-level vocabularies and ``device``. The model is left
    in evaluation mode.

    Args:
        model: Callable as ``model(src, tgt)`` returning logits indexed
            (batch, target position, vocabulary).
        sentence: Source sentence; tokenised by ``encode``.
        max_len: Maximum number of tokens to generate.

    Returns:
        The decoded target tokens as strings. The list starts with the
        SOS token and ends with EOS when decoding stopped on it.
    """
    model.eval()

    src = torch.tensor([encode(sentence, src_word2idx)], device=device)
    tgt = torch.tensor([[tgt_word2idx[SOS]]], device=device)
    eos_id = tgt_word2idx[EOS]

    # Inference only: skip autograd bookkeeping for every decoding step.
    with torch.no_grad():
        for _ in range(max_len):
            out = model(src, tgt)
            # Only the logits of the last position predict the next token.
            next_word = out[:, -1, :].argmax(dim=-1).item()
            tgt = torch.cat(
                [tgt, torch.tensor([[next_word]], device=device)], dim=1
            )

            if next_word == eos_id:
                break

    # Index the single batch row explicitly; unlike squeeze(), this still
    # yields a list when only one token is present.
    return [tgt_idx2word[idx] for idx in tgt[0].tolist()]

In [None]:
# Score the first held-out pair with sentence-level BLEU.
source_sentence, reference_sentence = test_data[0]
reference = reference_sentence.split()

# translate() returns the decoded sequence together with its start/end
# markers. The reference has no such markers, so drop them before scoring;
# otherwise they count as wrong tokens.
prediction = [
    token for token in translate(model, source_sentence)
    if token not in (PAD, SOS, EOS)
]

# Short sentences frequently share no 3-gram or 4-gram with the reference.
# Unsmoothed BLEU then collapses to ~0 regardless of lower-order matches,
# so apply smoothing to keep the score informative.
score = sentence_bleu(
    [reference],
    prediction,
    smoothing_function=SmoothingFunction().method1,
)
print("BLEU:", score)

BLEU: 8.38826642100846e-155


The hypothesis contains 0 counts of 3-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 4-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
