<a href="https://colab.research.google.com/github/JingchenYan1/Real-Time-ML/blob/main/Homework5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import time
import numpy as np


class CharDataset(Dataset):
    """Sliding-window dataset for next-character prediction.

    Each sample pairs ``seq_length`` consecutive character indices with the
    index of the character that immediately follows them in ``text``.
    """

    def __init__(self, text, seq_length):
        """Build the character vocabulary and enumerate every window.

        Args:
            text: Raw training string.
            seq_length: Number of characters in each input window.
        """
        self.text = text
        self.seq_length = seq_length

        # Sorting the unique characters makes the index assignment deterministic.
        self.chars = sorted(set(text))
        self.vocab_size = len(self.chars)
        self.char_to_idx = {}
        self.idx_to_char = {}
        for position, symbol in enumerate(self.chars):
            self.char_to_idx[symbol] = position
            self.idx_to_char[position] = symbol

        self.data = [self.char_to_idx[symbol] for symbol in text]

        # One window per start offset that still leaves a target character.
        window_starts = range(len(self.data) - seq_length)
        self.input_seqs = [self.data[s:s + seq_length] for s in window_starts]
        self.targets = [self.data[s + seq_length] for s in window_starts]

    def __len__(self):
        """Return the number of (window, target) pairs."""
        return len(self.input_seqs)

    def __getitem__(self, idx):
        """Return the ``idx``-th window and its target as ``torch.long`` tensors."""
        window = torch.tensor(self.input_seqs[idx], dtype=torch.long)
        label = torch.tensor(self.targets[idx], dtype=torch.long)
        return window, label

class TransformerLM(nn.Module):
    """Transformer-encoder language model that predicts the next character.

    Token embeddings plus a learned positional embedding are passed through a
    stack of ``nn.TransformerEncoderLayer`` blocks; the encoder output at the
    last sequence position is projected to vocabulary logits.
    """

    def __init__(self, vocab_size, d_model=128, nhead=4, num_layers=2, dim_feedforward=256, max_seq_len=30):
        """Create the embedding tables, encoder stack and output projection.

        Args:
            vocab_size: Number of distinct tokens.
            d_model: Embedding / encoder width.
            nhead: Attention heads per encoder layer.
            num_layers: Number of stacked encoder layers.
            dim_feedforward: Hidden width of each layer's feed-forward block.
            max_seq_len: Number of positions covered by the positional embedding.
        """
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        # Learned positional embedding, zero-initialised, one row per position.
        self.pos_embedding = nn.Parameter(torch.zeros(1, max_seq_len, d_model))
        layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward
        )
        self.transformer = nn.TransformerEncoder(layer, num_layers)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Map token indices ``x`` of shape [B, L] to logits of shape [B, vocab_size]."""
        seq_len = x.size(1)
        hidden = self.token_embedding(x) + self.pos_embedding[:, :seq_len, :]  # [B, L, d_model]
        # The encoder layers are built without batch_first, so feed [L, B, d_model].
        encoded = self.transformer(hidden.permute(1, 0, 2))
        last_step = encoded[-1]  # [B, d_model]
        return self.fc_out(last_step)

class RNNLM(nn.Module):
    """LSTM language model: embed the tokens, run the LSTM, classify from the last step."""

    def __init__(self, vocab_size, embed_size=128, hidden_size=128, num_layers=1):
        """Create the embedding, the batch-first LSTM and the output projection."""
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Use an LSTM layer (batch-first, so it consumes [B, L, embed_size]).
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        """Map token indices ``x`` of shape [B, L] to logits of shape [B, vocab_size]."""
        embedded = self.embedding(x)  # [B, L, embed_size]
        sequence_out, _state = self.lstm(embedded)  # [B, L, hidden_size]
        final_step = sequence_out[:, -1, :]  # [B, hidden_size]
        return self.fc(final_step)

class RNNAttentionLM(nn.Module):
    """LSTM language model that pools the LSTM outputs with learned attention.

    A linear layer scores every time step, the scores are soft-maxed over the
    sequence dimension, and the weighted sum of the LSTM outputs is classified.
    """

    def __init__(self, vocab_size, embed_size=128, hidden_size=128, num_layers=1):
        """Create the embedding, the batch-first LSTM, the scorer and the classifier."""
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        # One scalar attention score per time step.
        self.attn = nn.Linear(hidden_size, 1)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        """Map token indices ``x`` of shape [B, L] to logits of shape [B, vocab_size]."""
        states, _ = self.lstm(self.embedding(x))  # [B, L, hidden_size]
        # Normalise the per-step scores across the sequence (dim=1).
        weights = self.attn(states).softmax(dim=1)  # [B, L, 1]
        pooled = (weights * states).sum(dim=1)  # [B, hidden_size]
        return self.fc(pooled)

def train_model(model, train_loader, val_loader, num_epochs=10, lr=0.001, device='cpu'):
    """Train ``model`` with Adam and cross-entropy, validating after every epoch.

    Args:
        model: Module mapping an input batch to logits of shape [B, vocab_size].
        train_loader: Iterable of (input, target) batches used for optimisation.
        val_loader: Iterable of (input, target) batches used for accuracy.
        num_epochs: Number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: Device the model and every batch are moved to.

    Returns:
        dict with 'train_loss' (mean batch loss per epoch) and 'val_acc'
        (fraction of correctly predicted validation targets per epoch).
    """
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    history = {'train_loss': [], 'val_acc': []}

    for epoch in range(num_epochs):
        # ---- optimisation pass ----
        model.train()
        batch_losses = []
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            logits = model(inputs)  # [B, vocab_size]
            batch_loss = criterion(logits, labels)
            batch_loss.backward()
            optimizer.step()
            batch_losses.append(batch_loss.item())
        avg_loss = sum(batch_losses) / len(train_loader)

        # ---- validation pass (no gradients, eval mode) ----
        model.eval()
        n_correct, n_seen = 0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                predictions = model(inputs).argmax(dim=-1)
                n_correct += (predictions == labels).sum().item()
                n_seen += labels.size(0)
        val_acc = n_correct / n_seen

        history['train_loss'].append(avg_loss)
        history['val_acc'].append(val_acc)
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {avg_loss:.4f}, Val Acc: {val_acc:.4f}")

    return history


if __name__ == "__main__":
    # Small in-memory corpus shared by every experiment in this cell.
    text = (
        "Next character prediction is a fundamental task in the field of natural language processing (NLP) that involves "
        "predicting the next character in a sequence of text based on the characters that precede it. This task is essential "
        "for various applications, including text auto-completion, spell checking, and even in the development of sophisticated "
        "AI models capable of generating human-like text. "
        "At its core, next character prediction relies on statistical models or deep learning algorithms to analyze a given "
        "sequence of text and predict which character is most likely to follow. These predictions are based on patterns and "
        "relationships learned from large datasets of text during the training phase of the model. "
        "One of the most popular approaches to next character prediction involves the use of Recurrent Neural Networks (RNNs), "
        "and more specifically, a variant called Long Short-Term Memory (LSTM) networks. RNNs are particularly well-suited for "
        "sequential data like text, as they can maintain information in 'memory' about previous characters to inform the prediction "
        "of the next character. LSTM networks enhance this capability by being able to remember long-term dependencies, making them "
        "even more effective for next character prediction tasks. "
        "Training a model for next character prediction involves feeding it large amounts of text data, allowing it to learn the "
        "probability of each character's appearance following a sequence of characters. During this training process, the model "
        "adjusts its parameters to minimize the difference between its predictions and the actual outcomes, thus improving its "
        "predictive accuracy over time. "
        "Once trained, the model can be used to predict the next character in a given piece of text by considering the sequence "
        "of characters that precede it. This can enhance user experience in text editing software, improve efficiency in coding "
        "environments with auto-completion features, and enable more natural interactions with AI-based chatbots and virtual assistants. "
        "In summary, next character prediction plays a crucial role in enhancing the capabilities of various NLP applications, making "
        "text-based interactions more efficient, accurate, and human-like. Through the use of advanced machine learning models like RNNs "
        "and LSTMs, next character prediction continues to evolve, opening new possibilities for the future of text-based technology."
    )

    # Transformer sweep: one freshly initialised model per context length.
    seq_lengths = [10, 20, 30]
    transformer_results = {}

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Current Device：", device)

    for seq_length in seq_lengths:
        print("\n==============================")
        print(f"Train the Transformer model (sequence length = {seq_length})")
        dataset = CharDataset(text, seq_length)
        # 90/10 random train/validation split; no generator is passed, so the
        # split is drawn from the global RNG and differs between runs.
        val_size = int(0.1 * len(dataset))
        train_size = len(dataset) - val_size
        train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32)

        # max_seq_len matches the window length, so the positional table is exactly as long as the input.
        model = TransformerLM(vocab_size=dataset.vocab_size, max_seq_len=seq_length)
        print("Number of model parameters：", sum(p.numel() for p in model.parameters()))

        # Wall-clock time for the whole 20-epoch run, validation included.
        start_time = time.time()
        history = train_model(model, train_loader, val_loader, num_epochs=20, lr=0.001, device=device)
        elapsed = time.time() - start_time

        transformer_results[seq_length] = {
            "history": history,
            "time": elapsed,
            "model_params": sum(p.numel() for p in model.parameters())
        }
        print(f"sequence length {seq_length} total training time: {elapsed:.2f} 秒")

    # LSTM baseline at sequence length 20, with its own random split.
    print("\n==============================")
    print("Training RNN model (LSTM without attention) (sequence length = 20)")
    dataset_rnn = CharDataset(text, seq_length=20)
    val_size = int(0.1 * len(dataset_rnn))
    train_size = len(dataset_rnn) - val_size
    train_dataset_rnn, val_dataset_rnn = random_split(dataset_rnn, [train_size, val_size])
    train_loader_rnn = DataLoader(train_dataset_rnn, batch_size=32, shuffle=True)
    val_loader_rnn = DataLoader(val_dataset_rnn, batch_size=32)

    rnn_model = RNNLM(vocab_size=dataset_rnn.vocab_size)
    print("Number of RNN model parameters：", sum(p.numel() for p in rnn_model.parameters()))
    start_time = time.time()
    history_rnn = train_model(rnn_model, train_loader_rnn, val_loader_rnn, num_epochs=20, lr=0.001, device=device)
    elapsed_rnn = time.time() - start_time

    # The attention variant reuses the LSTM baseline's loaders, so both models
    # are trained and validated on the same split.
    print("\n==============================")
    print("Training RNN model with attention mechanism (sequence length = 20)")
    rnn_att_model = RNNAttentionLM(vocab_size=dataset_rnn.vocab_size)
    print("RNN + Attention Number of model parameters：", sum(p.numel() for p in rnn_att_model.parameters()))
    start_time = time.time()
    history_rnn_att = train_model(rnn_att_model, train_loader_rnn, val_loader_rnn, num_epochs=20, lr=0.001, device=device)
    elapsed_rnn_att = time.time() - start_time

    # Final-epoch loss/accuracy, wall-clock time and parameter count for every model.
    print("\n==============================")
    print("Summary of training results：")
    for seq_length, res in transformer_results.items():
        print(f"Transformer (sequence length {seq_length}) - final training loss: {res['history']['train_loss'][-1]:.4f}, Final verification accuracy: {res['history']['val_acc'][-1]:.4f}, training time: {res['time']:.2f}s, Parameter quantity: {res['model_params']}")
    print(f"RNN (without attention) - final training loss: {history_rnn['train_loss'][-1]:.4f}, Final verification accuracy: {history_rnn['val_acc'][-1]:.4f}, training time: {elapsed_rnn:.2f}s, Parameter quantity: {sum(p.numel() for p in rnn_model.parameters())}")
    print(f"RNN (with attention) - final training loss: {history_rnn_att['train_loss'][-1]:.4f}, Final verification accuracy: {history_rnn_att['val_acc'][-1]:.4f}, training time: {elapsed_rnn_att:.2f}s, Parameter quantity: {sum(p.numel() for p in rnn_att_model.parameters())}")


Current Device： cuda

Train the Transformer model (sequence length = 10)
Number of model parameters： 277548




Epoch 1/20 - Loss: 2.8174, Val Acc: 0.2152
Epoch 2/20 - Loss: 2.3826, Val Acc: 0.3249
Epoch 3/20 - Loss: 2.2072, Val Acc: 0.3207
Epoch 4/20 - Loss: 2.0727, Val Acc: 0.3333
Epoch 5/20 - Loss: 1.9281, Val Acc: 0.3586
Epoch 6/20 - Loss: 1.8277, Val Acc: 0.3502
Epoch 7/20 - Loss: 1.7012, Val Acc: 0.3924
Epoch 8/20 - Loss: 1.5934, Val Acc: 0.3671
Epoch 9/20 - Loss: 1.4615, Val Acc: 0.3713
Epoch 10/20 - Loss: 1.3586, Val Acc: 0.4008
Epoch 11/20 - Loss: 1.2562, Val Acc: 0.3544
Epoch 12/20 - Loss: 1.1598, Val Acc: 0.3924
Epoch 13/20 - Loss: 1.0823, Val Acc: 0.3502
Epoch 14/20 - Loss: 0.9955, Val Acc: 0.3840
Epoch 15/20 - Loss: 0.9451, Val Acc: 0.3671
Epoch 16/20 - Loss: 0.8780, Val Acc: 0.3502
Epoch 17/20 - Loss: 0.8578, Val Acc: 0.4008
Epoch 18/20 - Loss: 0.7882, Val Acc: 0.3671
Epoch 19/20 - Loss: 0.7312, Val Acc: 0.4135
Epoch 20/20 - Loss: 0.6668, Val Acc: 0.4093
sequence length 10 total training time: 19.00 秒

Train the Transformer model (sequence length = 20)
Number of model parameters： 2



Epoch 1/20 - Loss: 2.7914, Val Acc: 0.1822
Epoch 2/20 - Loss: 2.4470, Val Acc: 0.2373
Epoch 3/20 - Loss: 2.3253, Val Acc: 0.2415
Epoch 4/20 - Loss: 2.2442, Val Acc: 0.2331
Epoch 5/20 - Loss: 2.1696, Val Acc: 0.2458
Epoch 6/20 - Loss: 2.1037, Val Acc: 0.2797
Epoch 7/20 - Loss: 1.9844, Val Acc: 0.2712
Epoch 8/20 - Loss: 1.8719, Val Acc: 0.2585
Epoch 9/20 - Loss: 1.7603, Val Acc: 0.3305
Epoch 10/20 - Loss: 1.6339, Val Acc: 0.3305
Epoch 11/20 - Loss: 1.5283, Val Acc: 0.3220
Epoch 12/20 - Loss: 1.3803, Val Acc: 0.3432
Epoch 13/20 - Loss: 1.2796, Val Acc: 0.3475
Epoch 14/20 - Loss: 1.1808, Val Acc: 0.3390
Epoch 15/20 - Loss: 1.0637, Val Acc: 0.3475
Epoch 16/20 - Loss: 1.0025, Val Acc: 0.3856
Epoch 17/20 - Loss: 0.9638, Val Acc: 0.3856
Epoch 18/20 - Loss: 0.8226, Val Acc: 0.3644
Epoch 19/20 - Loss: 0.7354, Val Acc: 0.3898
Epoch 20/20 - Loss: 0.7342, Val Acc: 0.3347
sequence length 20 total training time: 8.73 秒

Train the Transformer model (sequence length = 30)
Number of model parameters： 28

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import time
import math
import requests

def get_data_loaders(seq_length, batch_size=128):
    """Download tiny-shakespeare and build next-character train/test loaders.

    Args:
        seq_length: Number of characters in each input window.
        batch_size: Batch size used for both loaders.

    Returns:
        (train_loader, test_loader, vocab_size): loaders over an 80/20 random
        split of all (window, next-character) pairs, plus the number of
        distinct characters in the corpus.

    Raises:
        requests.HTTPError: If the download answers with an error status.
        ValueError: If the corpus is not longer than ``seq_length``.
    """
    url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
    response = requests.get(url, timeout=30)
    # Fail loudly instead of silently training on an HTTP error page.
    response.raise_for_status()
    text = response.text

    chars = sorted(set(text))
    char_to_int = {ch: i for i, ch in enumerate(chars)}

    encoded_text = torch.tensor([char_to_int[ch] for ch in text], dtype=torch.long)
    num_samples = encoded_text.numel() - seq_length
    if num_samples <= 0:
        raise ValueError(
            f"seq_length={seq_length} leaves no (window, target) pairs for a "
            f"text of {encoded_text.numel()} characters"
        )

    # unfold exposes every length-seq_length window as a view on the encoded
    # text, so no per-window Python lists are materialised. The last window has
    # no following character and is dropped.
    sequences = encoded_text.unfold(0, seq_length, 1)[:num_samples]
    targets = encoded_text[seq_length:]

    class CharDataset(Dataset):
        """Pairs each window of character indices with the index that follows it."""

        def __init__(self, sequences, targets):
            self.sequences = sequences
            self.targets = targets

        def __len__(self):
            return len(self.sequences)

        def __getitem__(self, index):
            return self.sequences[index], self.targets[index]

    dataset = CharDataset(sequences, targets)

    train_size = int(len(dataset) * 0.8)
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

    train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

    return train_loader, test_loader, len(chars)

class TransformerLM(nn.Module):
    """Next-character Transformer: embeddings, encoder stack, last-position classifier."""

    def __init__(self, vocab_size, d_model=128, nhead=2, num_layers=2,
                 dim_feedforward=256, max_seq_len=50):
        """Create the model.

        Args:
            vocab_size: Number of distinct tokens.
            d_model: Embedding / encoder width.
            nhead: Attention heads per encoder layer.
            num_layers: Number of stacked encoder layers.
            dim_feedforward: Hidden width of each layer's feed-forward block.
            max_seq_len: Number of positions covered by the positional embedding.
        """
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        # Learned positional table, starting at zero.
        self.pos_embedding = nn.Parameter(torch.zeros(1, max_seq_len, d_model))
        block = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
        )
        self.transformer = nn.TransformerEncoder(block, num_layers)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Return logits [B, vocab_size] for token indices ``x`` of shape [B, L]."""
        tokens = self.token_embedding(x)  # [B, L, d_model]
        positions = self.pos_embedding[:, :tokens.size(1), :]
        # The encoder is not batch-first, hence the swap to [L, B, d_model].
        sequence_first = (tokens + positions).transpose(0, 1)
        encoded = self.transformer(sequence_first)
        # Only the representation of the final position is classified.
        return self.fc_out(encoded[-1])

class RNNLM(nn.Module):
    """LSTM baseline that classifies the next character from the last LSTM output."""

    def __init__(self, vocab_size, embed_size=128, hidden_size=128, num_layers=1):
        """Create the embedding, the batch-first LSTM and the output projection."""
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        """Return logits [B, vocab_size] for token indices ``x`` of shape [B, L]."""
        per_step, _ = self.lstm(self.embedding(x))  # [B, L, hidden_size]
        last = per_step[:, -1, :]  # [B, hidden_size]
        return self.fc(last)

def train_model(model, train_loader, test_loader, num_epochs=10, lr=0.001, device='cpu'):
    """Train ``model`` with Adam and cross-entropy, timing and testing every epoch.

    Args:
        model: Module mapping an input batch to logits of shape [B, vocab_size].
        train_loader: Iterable of (input, target) batches used for optimisation.
        test_loader: Iterable of (input, target) batches used for accuracy.
        num_epochs: Number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: Device the model and every batch are moved to.

    Returns:
        dict with per-epoch lists 'train_loss' (mean batch loss), 'test_acc'
        (fraction of correct predictions) and 'epoch_time' (seconds spent on
        training plus evaluation).
    """
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    history = {'train_loss': [], 'test_acc': [], 'epoch_time': []}
    for epoch in range(num_epochs):
        epoch_started = time.time()

        # ---- optimisation pass ----
        model.train()
        loss_sum = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), labels)
            batch_loss.backward()
            optimizer.step()
            loss_sum += batch_loss.item()
        avg_loss = loss_sum / len(train_loader)

        # ---- evaluation pass (no gradients, eval mode) ----
        model.eval()
        hits = 0
        seen = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                predicted = torch.max(model(inputs), dim=1).indices
                hits += (predicted == labels).sum().item()
                seen += labels.size(0)
        test_acc = hits / seen
        epoch_time = time.time() - epoch_started

        for key, value in (('train_loss', avg_loss), ('test_acc', test_acc), ('epoch_time', epoch_time)):
            history[key].append(value)
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {avg_loss:.4f}, Test Acc: {test_acc:.4f}, Time: {epoch_time:.2f}s")
    return history

def get_model_size(model):
    """Return the total number of scalar parameters held by ``model``."""
    total = 0
    for param in model.parameters():
        total += param.numel()
    return total

def compute_perplexity(loss):
    """Return the perplexity ``exp(loss)`` of a mean cross-entropy loss.

    Args:
        loss: Mean cross-entropy as a Python float.

    Returns:
        ``math.exp(loss)``, or ``math.inf`` when the loss is so large that the
        exponential does not fit in a float.
    """
    try:
        return math.exp(loss)
    except OverflowError:
        # A diverged run should report an infinite perplexity, not abort the sweep.
        return math.inf

def run_transformer_experiments(seq_length, num_epochs=10, batch_size=128, device='cpu'):
    """Train a grid of Transformer models on one sequence length.

    The grid covers 1, 2 and 4 encoder layers crossed with 2 and 4 attention
    heads; every configuration shares the same train/test loaders.

    Args:
        seq_length: Input window length (also the positional-embedding length).
        num_epochs: Epochs per configuration.
        batch_size: Batch size for the data loaders.
        device: Device to train on.

    Returns:
        dict mapping ``(num_layers, nhead)`` to a metrics dict with keys
        'final_loss', 'final_test_acc', 'perplexity', 'avg_epoch_time',
        'total_time' and 'model_size'.
    """
    print(f"\n=== Transformer Experiments | Sequence Length: {seq_length} ===")
    train_loader, test_loader, vocab_size = get_data_loaders(seq_length, batch_size)

    layer_options = [1, 2, 4]
    head_options = [2, 4]
    results = {}
    for num_layers in layer_options:
        for nhead in head_options:
            print(f"\nTransformer Model - Layers: {num_layers}, Heads: {nhead}")
            model = TransformerLM(
                vocab_size=vocab_size,
                d_model=128,
                nhead=nhead,
                num_layers=num_layers,
                dim_feedforward=256,
                max_seq_len=seq_length,
            )
            param_count = get_model_size(model)
            print(f"Model Size: {param_count} parameters")

            started = time.time()
            history = train_model(model, train_loader, test_loader, num_epochs=num_epochs, lr=0.001, device=device)
            total_time = time.time() - started

            # Summary metrics come from the last epoch; perplexity is derived
            # from the final training loss.
            epoch_times = history['epoch_time']
            final_loss = history['train_loss'][-1]
            results[(num_layers, nhead)] = {
                'final_loss': final_loss,
                'final_test_acc': history['test_acc'][-1],
                'perplexity': compute_perplexity(final_loss),
                'avg_epoch_time': sum(epoch_times) / len(epoch_times),
                'total_time': total_time,
                'model_size': param_count,
            }
    return results

def run_rnn_experiment(seq_length, num_epochs=10, batch_size=128, device='cpu'):
    """Train the single-layer LSTM baseline on one sequence length.

    Args:
        seq_length: Input window length.
        num_epochs: Number of training epochs.
        batch_size: Batch size for the data loaders.
        device: Device to train on.

    Returns:
        dict with keys 'final_loss', 'final_test_acc', 'perplexity',
        'avg_epoch_time', 'total_time' and 'model_size'.
    """
    print(f"\n=== RNN Experiment | Sequence Length: {seq_length} ===")
    train_loader, test_loader, vocab_size = get_data_loaders(seq_length, batch_size)
    model = RNNLM(vocab_size=vocab_size, embed_size=128, hidden_size=128, num_layers=1)
    param_count = get_model_size(model)
    print(f"RNN Model Size: {param_count} parameters")

    started = time.time()
    history = train_model(model, train_loader, test_loader, num_epochs=num_epochs, lr=0.001, device=device)
    total_time = time.time() - started

    # Summary metrics come from the last epoch; perplexity is derived from the
    # final training loss.
    epoch_times = history['epoch_time']
    final_loss = history['train_loss'][-1]
    return {
        'final_loss': final_loss,
        'final_test_acc': history['test_acc'][-1],
        'perplexity': compute_perplexity(final_loss),
        'avg_epoch_time': sum(epoch_times) / len(epoch_times),
        'total_time': total_time,
        'model_size': param_count,
    }

if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)
    num_epochs = 10
    batch_size = 128

    # Transformer sweeps run first (20, 30, 50), then the LSTM baselines (20, 30).
    transformer_results_20 = run_transformer_experiments(seq_length=20, num_epochs=num_epochs, batch_size=batch_size, device=device)
    transformer_results_30 = run_transformer_experiments(seq_length=30, num_epochs=num_epochs, batch_size=batch_size, device=device)
    transformer_results_50 = run_transformer_experiments(seq_length=50, num_epochs=num_epochs, batch_size=batch_size, device=device)

    rnn_results_20 = run_rnn_experiment(seq_length=20, num_epochs=num_epochs, batch_size=batch_size, device=device)
    rnn_results_30 = run_rnn_experiment(seq_length=30, num_epochs=num_epochs, batch_size=batch_size, device=device)

    # One summary table per Transformer sequence length, in run order.
    for seq_length, sweep in ((20, transformer_results_20), (30, transformer_results_30), (50, transformer_results_50)):
        print(f"\n=== Summary of Transformer Experiments (Sequence Length {seq_length}) ===")
        for (num_layers, nhead), res in sweep.items():
            print(f"Layers: {num_layers}, Heads: {nhead} -> Loss: {res['final_loss']:.4f}, Test Acc: {res['final_test_acc']:.4f}, "
                  f"Perplexity: {res['perplexity']:.4f}, Avg Epoch Time: {res['avg_epoch_time']:.2f}s, Total Time: {res['total_time']:.2f}s, "
                  f"Model Size: {res['model_size']}")

    # One summary line per LSTM baseline.
    print("\n=== Summary of RNN Experiments ===")
    for seq_length, rnn_res in ((20, rnn_results_20), (30, rnn_results_30)):
        print(f"RNN (Sequence Length {seq_length}) -> Loss: {rnn_res['final_loss']:.4f}, Test Acc: {rnn_res['final_test_acc']:.4f}, "
              f"Perplexity: {rnn_res['perplexity']:.4f}, Avg Epoch Time: {rnn_res['avg_epoch_time']:.2f}s, "
              f"Total Time: {rnn_res['total_time']:.2f}s, Model Size: {rnn_res['model_size']}")


Using device: cuda

=== Transformer Experiments | Sequence Length: 20 ===

Transformer Model - Layers: 1, Heads: 2
Model Size: 151745 parameters




Epoch 1/10 - Loss: 2.0600, Test Acc: 0.4521, Time: 34.68s
Epoch 2/10 - Loss: 1.8860, Test Acc: 0.4647, Time: 34.76s
Epoch 3/10 - Loss: 1.8518, Test Acc: 0.4684, Time: 34.02s
Epoch 4/10 - Loss: 1.8336, Test Acc: 0.4738, Time: 34.85s
Epoch 5/10 - Loss: 1.8221, Test Acc: 0.4709, Time: 34.44s
Epoch 6/10 - Loss: 1.8131, Test Acc: 0.4769, Time: 33.78s
Epoch 7/10 - Loss: 1.8068, Test Acc: 0.4801, Time: 34.16s
Epoch 8/10 - Loss: 1.8021, Test Acc: 0.4780, Time: 34.22s
Epoch 9/10 - Loss: 1.7997, Test Acc: 0.4785, Time: 34.04s
Epoch 10/10 - Loss: 1.7938, Test Acc: 0.4818, Time: 34.13s

Transformer Model - Layers: 1, Heads: 4
Model Size: 151745 parameters
Epoch 1/10 - Loss: 2.0161, Test Acc: 0.4680, Time: 34.95s
Epoch 2/10 - Loss: 1.8195, Test Acc: 0.4850, Time: 33.81s
Epoch 3/10 - Loss: 1.7793, Test Acc: 0.4907, Time: 34.01s
Epoch 4/10 - Loss: 1.7592, Test Acc: 0.4965, Time: 34.40s
Epoch 5/10 - Loss: 1.7475, Test Acc: 0.4970, Time: 34.21s
Epoch 6/10 - Loss: 1.7392, Test Acc: 0.4995, Time: 34.28s


In [None]:
!pip install python-docx

from google.colab import drive
drive.mount('/content/drive')
from docx import Document
# Read the dataset document from Google Drive and join its paragraphs into one string.
doc_path = "/content/drive/My Drive/Dataset - English to French.docx"
doc = Document(doc_path)
raw_text = "\n".join([para.text for para in doc.paragraphs])
# SECURITY NOTE(review): exec() runs whatever Python source the document
# contains, so anyone who can edit the .docx can execute arbitrary code here.
# The text is expected to define `english_to_french`; if it is a plain literal,
# consider extracting it and parsing with ast.literal_eval instead — TODO confirm.
namespace = {}
exec(raw_text, namespace)
english_to_french = namespace['english_to_french']
import pandas as pd, re, math, torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import Dataset, DataLoader
def preprocess(sentence):
    """Lower-case ``sentence``, drop disallowed characters and split it into tokens.

    Only ASCII letters, characters in the À–ÿ range and whitespace survive the
    filter; everything else (digits, punctuation, apostrophes) is removed
    without inserting a space.

    Returns:
        The list of whitespace-separated tokens (empty for a blank sentence).
    """
    normalized = sentence.lower().strip()
    letters_only = re.sub(r"[^a-zA-ZÀ-ÿ\s]", "", normalized)
    return letters_only.split()
# One row per sentence pair; the *_tokens columns hold the token lists produced by preprocess().
df = pd.DataFrame(english_to_french, columns=["English", "French"])
df["English_tokens"] = df["English"].apply(preprocess)
df["French_tokens"] = df["French"].apply(preprocess)
class Vocab:
    """Word-to-index vocabulary with four reserved special tokens.

    Indices 0-3 are ``<pad>``, ``<sos>``, ``<eos>`` and ``<unk>``; remaining
    words receive consecutive indices in order of first appearance.
    """

    def __init__(self, tokens_list, min_freq=1):
        """Count the words in ``tokens_list`` and index those seen at least ``min_freq`` times.

        Args:
            tokens_list: Iterable of token lists.
            min_freq: Minimum number of occurrences for a word to be indexed.
        """
        specials = ["<pad>", "<sos>", "<eos>", "<unk>"]
        self.word2index = {token: i for i, token in enumerate(specials)}
        self.index2word = dict(enumerate(specials))
        self.word_freq = {}
        self.min_freq = min_freq
        self.build_vocab(tokens_list)

    def build_vocab(self, tokens_list):
        """Update the word counts, then append every frequent-enough word to the maps."""
        for tokens in tokens_list:
            for word in tokens:
                if word in self.word_freq:
                    self.word_freq[word] += 1
                else:
                    self.word_freq[word] = 1

        next_index = len(self.word2index)
        for word, count in self.word_freq.items():
            if count < self.min_freq:
                continue
            self.word2index[word] = next_index
            self.index2word[next_index] = word
            next_index += 1

    def numericalize(self, tokens):
        """Map ``tokens`` to indices, using the ``<unk>`` index for unknown words."""
        lookup = self.word2index
        return [lookup.get(word, lookup["<unk>"]) for word in tokens]
# Separate vocabularies per language; the default min_freq=1 keeps every word that occurs.
english_vocab = Vocab(df["English_tokens"])
french_vocab = Vocab(df["French_tokens"])
class TranslationDataset(Dataset):
    """Dataset of (source indices, target indices) pairs read from a DataFrame.

    Target sequences are wrapped in the target vocabulary's ``<sos>`` and
    ``<eos>`` indices; source sequences are left as they are.
    """

    def __init__(self, df, src_col, tgt_col, src_vocab, tgt_vocab):
        """Store the frame, the two token-list column names and both vocabularies."""
        self.df = df
        self.src_col = src_col
        self.tgt_col = tgt_col
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab

    def __len__(self):
        """Return the number of rows in the frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return the ``idx``-th row as a pair of index tensors."""
        row = self.df.iloc[idx]
        source_ids = self.src_vocab.numericalize(row[self.src_col])
        sos = self.tgt_vocab.word2index["<sos>"]
        eos = self.tgt_vocab.word2index["<eos>"]
        target_ids = [sos] + self.tgt_vocab.numericalize(row[self.tgt_col]) + [eos]
        return torch.tensor(source_ids), torch.tensor(target_ids)
def collate_fn(batch):
    """Pad a list of (source, target) tensor pairs into two batch-first tensors.

    Returns:
        (src_padded, tgt_padded, src_lens, tgt_lens): the padded tensors,
        filled with each language's ``<pad>`` index, followed by the original
        (unpadded) lengths of every source and target sequence.
    """
    src_batch, tgt_batch = zip(*batch)
    # Record the true lengths before padding hides them.
    src_lens = list(map(len, src_batch))
    tgt_lens = list(map(len, tgt_batch))
    pad = nn.utils.rnn.pad_sequence
    src_padded = pad(src_batch, batch_first=True, padding_value=english_vocab.word2index["<pad>"])
    tgt_padded = pad(tgt_batch, batch_first=True, padding_value=french_vocab.word2index["<pad>"])
    return src_padded, tgt_padded, src_lens, tgt_lens
# English -> French pairs; batches of 4 are reshuffled every epoch and padded by collate_fn.
dataset_en2fr = TranslationDataset(df, "English_tokens", "French_tokens", english_vocab, french_vocab)
dataloader_en2fr = DataLoader(dataset_en2fr, batch_size=4, shuffle=True, collate_fn=collate_fn)
def compute_accuracy(output, tgt, pad_idx):
    """Return the fraction of non-padding target tokens that were predicted correctly.

    Args:
        output: Scores whose last dimension indexes the vocabulary; the
            prediction is the argmax over that dimension.
        tgt: Target indices, compared element-wise with the predictions.
        pad_idx: Index marking padding positions, which are excluded.

    Returns:
        Accuracy as a float in [0, 1]; 0.0 when ``tgt`` holds only padding.
    """
    pred_tokens = output.argmax(dim=-1)
    mask = tgt != pad_idx
    num_tokens = mask.sum().item()
    if num_tokens == 0:
        # Nothing to score: avoid dividing by zero on an all-padding target.
        return 0.0
    correct = ((pred_tokens == tgt) & mask).sum().item()
    return correct / num_tokens
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Encoder(nn.Module):
    """GRU encoder that summarises a padded source batch into its final hidden state."""

    def __init__(self, input_dim, emb_dim, hid_dim, num_layers=1):
        """Create the source embedding and a batch-first GRU."""
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.gru = nn.GRU(
            input_size=emb_dim,
            hidden_size=hid_dim,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, src, src_lens):
        """Encode ``src`` (padded, batch-first indices) given the true lengths ``src_lens``.

        Returns:
            The GRU's final hidden state.
        """
        # Packing lets the GRU stop at each sequence's true length; the batch
        # does not have to be sorted by length.
        packed = nn.utils.rnn.pack_padded_sequence(
            self.embedding(src), src_lens, batch_first=True, enforce_sorted=False
        )
        _, final_hidden = self.gru(packed)
        return final_hidden
class Decoder(nn.Module):
    """Single-step GRU decoder: one input token per sequence in, vocabulary scores out."""

    def __init__(self, output_dim, emb_dim, hid_dim, num_layers=1):
        """Create the target embedding, a batch-first GRU and the output projection."""
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.gru = nn.GRU(
            input_size=emb_dim,
            hidden_size=hid_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc_out = nn.Linear(hid_dim, output_dim)

    def forward(self, input, hidden):
        """Advance the decoder by one step.

        Args:
            input: One token index per batch element.
            hidden: GRU hidden state carried over from the previous step.

        Returns:
            (prediction, hidden): the output-layer scores for this step and
            the updated hidden state.
        """
        # Add a length-1 sequence dimension so the GRU sees one step per item.
        step_embedding = self.embedding(input.unsqueeze(1))
        step_output, hidden = self.gru(step_embedding, hidden)
        return self.fc_out(step_output.squeeze(1)), hidden
class Seq2Seq(nn.Module):
    """Encoder/decoder wrapper that decodes step by step with optional teacher forcing."""

    def __init__(self, encoder, decoder, device):
        """Store the encoder, the decoder and the device for the output buffer."""
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, src_lens, tgt, teacher_forcing_ratio=0.5):
        """Decode ``tgt.size(1) - 1`` steps starting from the first target token.

        Args:
            src: Padded source batch, passed to the encoder.
            src_lens: True source lengths, passed to the encoder.
            tgt: Padded target batch; column 0 is the first decoder input.
            teacher_forcing_ratio: Probability, drawn once per step for the
                whole batch, of feeding the ground-truth token instead of the
                decoder's own argmax.

        Returns:
            Tensor of shape [batch, tgt_len, tgt_vocab]; position 0 is never
            written and stays zero.
        """
        batch_size, tgt_len = src.size(0), tgt.size(1)
        tgt_vocab_size = self.decoder.fc_out.out_features
        outputs = torch.zeros(batch_size, tgt_len, tgt_vocab_size, device=self.device)

        hidden = self.encoder(src, src_lens)
        input_token = tgt[:, 0]
        for t in range(1, tgt_len):
            step_scores, hidden = self.decoder(input_token, hidden)
            outputs[:, t] = step_scores
            use_teacher = torch.rand(1).item() < teacher_forcing_ratio
            if use_teacher:
                input_token = tgt[:, t]
            else:
                input_token = step_scores.argmax(1)
        return outputs
# Vocabulary sizes (special tokens included) set the embedding and output dimensions.
INPUT_DIM = len(english_vocab.word2index)
OUTPUT_DIM = len(french_vocab.word2index)
EMB_DIM = 256; HID_DIM = 512; N_LAYERS = 1; NUM_EPOCHS = 10
# Baseline model: GRU encoder/decoder without attention.
encoder_p1 = Encoder(INPUT_DIM, EMB_DIM, HID_DIM, N_LAYERS).to(device)
decoder_p1 = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM, N_LAYERS).to(device)
model_p1 = Seq2Seq(encoder_p1, decoder_p1, device).to(device)
optimizer_p1 = optim.Adam(model_p1.parameters())
# Padding positions in the target are excluded from the loss.
criterion_p1 = nn.CrossEntropyLoss(ignore_index=french_vocab.word2index["<pad>"])
def train_model(model, dataloader, optimizer, criterion, clip=1):
    """Run one training epoch over ``dataloader``.

    Args:
        model: Sequence-to-sequence model called as ``model(src, src_lens, tgt)``.
        dataloader: Yields ``(src, tgt, src_lens, tgt_lens)`` batches.
        optimizer: Optimizer stepping ``model``'s parameters.
        criterion: Loss applied to the flattened scores and targets.
        clip: Maximum gradient norm enforced before every optimizer step.

    Returns:
        (mean batch loss, mean batch accuracy) for the epoch; each batch counts
        equally regardless of how many tokens it holds.
    """
    model.train()
    total_loss = 0
    total_acc = 0
    for src, tgt, src_lens, _tgt_lens in dataloader:
        src = src.to(device)
        tgt = tgt.to(device)
        optimizer.zero_grad()
        output = model(src, src_lens, tgt)
        # Position 0 is the start token and is never predicted, so it is
        # dropped from both the scores and the targets.
        scores = output[:, 1:]
        gold = tgt[:, 1:]
        loss = criterion(scores.reshape(-1, output.shape[-1]), gold.reshape(-1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        total_acc += compute_accuracy(scores, gold, french_vocab.word2index["<pad>"])
        total_loss += loss.item()
    num_batches = len(dataloader)
    return total_loss / num_batches, total_acc / num_batches
def evaluate_model(model, dataloader, criterion):
    """Evaluate ``model`` over ``dataloader`` without gradients or teacher forcing.

    Args:
        model: Sequence-to-sequence model called as
            ``model(src, src_lens, tgt, teacher_forcing_ratio=0)``.
        dataloader: Yields ``(src, tgt, src_lens, tgt_lens)`` batches.
        criterion: Loss applied to the flattened scores and targets.

    Returns:
        (mean batch loss, mean batch accuracy); each batch counts equally.
    """
    model.eval()
    total_loss = 0
    total_acc = 0
    with torch.no_grad():
        for src, tgt, src_lens, _tgt_lens in dataloader:
            src = src.to(device)
            tgt = tgt.to(device)
            # Ratio 0 makes the decoder always consume its own predictions.
            output = model(src, src_lens, tgt, teacher_forcing_ratio=0)
            # Position 0 is the start token and is never predicted.
            scores = output[:, 1:]
            gold = tgt[:, 1:]
            loss = criterion(scores.reshape(-1, output.shape[-1]), gold.reshape(-1))
            total_acc += compute_accuracy(scores, gold, french_vocab.word2index["<pad>"])
            total_loss += loss.item()
    num_batches = len(dataloader)
    return total_loss / num_batches, total_acc / num_batches
print("=== RNN Seq2Seq without Attention ===")
for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train_model(model_p1, dataloader_en2fr, optimizer_p1, criterion_p1)
    # NOTE(review): evaluation reuses the training dataloader, so the "Val"
    # numbers are measured on training data (without teacher forcing), not on
    # a held-out split.
    val_loss, val_acc = evaluate_model(model_p1, dataloader_en2fr, criterion_p1)
    print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Train Acc = {train_acc:.4f}, Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")
def translate_sentence(model, sentence, src_vocab, tgt_vocab, max_len=20):
    """Greedily decode a translation of ``sentence`` with an encoder/decoder model.

    Args:
        model: Object exposing ``encoder(src, src_len)`` and ``decoder(token, hidden)``.
        sentence: Raw source sentence; it is tokenized with ``preprocess``.
        src_vocab: Source vocabulary providing ``numericalize``.
        tgt_vocab: Target vocabulary providing ``word2index`` and ``index2word``.
        max_len: Maximum number of decoding steps.

    Returns:
        The predicted target words joined by single spaces.
    """
    model.eval()
    src_ids = src_vocab.numericalize(preprocess(sentence))
    src_batch = torch.tensor(src_ids).unsqueeze(0).to(device)
    eos_id = tgt_vocab.word2index["<eos>"]
    words = []
    with torch.no_grad():
        state = model.encoder(src_batch, [len(src_ids)])
        prev_id = tgt_vocab.word2index["<sos>"]
        for _ in range(max_len):
            logits, state = model.decoder(torch.tensor([prev_id]).to(device), state)
            prev_id = logits.argmax(1).item()
            # Stop at end-of-sequence; the marker itself is not emitted.
            if prev_id == eos_id:
                break
            words.append(tgt_vocab.index2word[prev_id])
    return " ".join(words)
# English sentences used to spot-check each English-to-French model.
test_sentences = ["She wears a red dress and dances at the party",
"After they visit the museum, they play video games",
"Although he is tired, he works hard every day",
"She sings a song while cooking dinner",
"We eat breakfast together before we go to the gym",
"He said that the coffee is hot",
"She thinks that the teacher explains the lesson well",
"They do not enjoy the sunset when it rains",
"She is not happy because the cat is sleeping on her dress",
"He plays the guitar and also sings in the choir"]
# Print the attention-free model's translation of every test sentence.
print("=== RNN Seq2Seq without Attention Translation Test ===")
for sentence in test_sentences:
    print(f"\nInput: {sentence}")
    output = translate_sentence(model_p1, sentence, english_vocab, french_vocab)
    print(f"Output: {output}")
class EncoderAttn(nn.Module):
    """Bidirectional GRU encoder that can also return its per-step outputs for attention."""

    def __init__(self, input_dim, emb_dim, hid_dim, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.gru = nn.GRU(emb_dim, hid_dim, num_layers, batch_first=True, bidirectional=True)
        # Maps the concatenated forward/backward final states down to hid_dim.
        self.fc = nn.Linear(hid_dim*2, hid_dim)

    def forward(self, src, src_lens, return_all=False):
        """Encode a padded batch.

        Args:
            src: Batch-first tensor of source token ids.
            src_lens: Length of each sequence in ``src``.
            return_all: When true, also return the padded per-step GRU outputs.

        Returns:
            ``hidden`` alone, or ``(outputs, hidden)`` when ``return_all`` is true.
            ``hidden`` carries a leading dimension of size 1.
        """
        packed_emb = nn.utils.rnn.pack_padded_sequence(
            self.embedding(src), src_lens, batch_first=True, enforce_sorted=False
        )
        packed_out, final_states = self.gru(packed_emb)
        padded_out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True)
        # Join the last two final-state slices feature-wise, then project to hid_dim.
        merged = torch.cat((final_states[-2], final_states[-1]), dim=1)
        init_hidden = self.fc(merged).unsqueeze(0)
        return (padded_out, init_hidden) if return_all else init_hidden
class AttnDecoder(nn.Module):
    """Single-step GRU decoder with dot-product attention over projected encoder outputs."""

    def __init__(self, output_dim, emb_dim, hid_dim, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.query_linear = nn.Linear(emb_dim, hid_dim)
        self.attn_linear = nn.Linear(hid_dim*2, hid_dim)
        self.gru = nn.GRU(emb_dim+hid_dim, hid_dim, num_layers, batch_first=True)
        self.fc_out = nn.Linear(emb_dim+hid_dim*2, output_dim)

    def forward(self, input, hidden, encoder_outputs):
        """Decode one step.

        Args:
            input: Token ids for the current step, one per batch row.
            hidden: GRU hidden state from the previous step.
            encoder_outputs: Per-step encoder outputs with ``hid_dim*2`` features.

        Returns:
            ``(prediction, hidden)``: vocabulary logits and the updated GRU state.
        """
        token_emb = self.embedding(input).unsqueeze(1)
        # The query is built from the current token embedding, not from the hidden state.
        keys = self.attn_linear(encoder_outputs)
        scores = torch.bmm(self.query_linear(token_emb), keys.transpose(1, 2))
        weights = torch.softmax(scores, dim=-1)
        context = torch.bmm(weights, keys)
        gru_out, hidden = self.gru(torch.cat((token_emb, context), dim=2), hidden)
        features = torch.cat(
            (gru_out.squeeze(1), context.squeeze(1), token_emb.squeeze(1)), dim=1
        )
        return self.fc_out(features), hidden
class Seq2SeqAttn(nn.Module):
    """Wraps an attention encoder/decoder pair and unrolls the decoder over the target length."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, src_lens, tgt, teacher_forcing_ratio=0.5):
        """Return per-step vocabulary logits, batch-first; step 0 is left as zeros.

        At every step a fresh random draw decides whether the next decoder input is the
        ground-truth token (probability ``teacher_forcing_ratio``) or the model's argmax.
        """
        n_rows = src.size(0)
        n_steps = tgt.size(1)
        vocab_dim = self.decoder.fc_out.out_features
        outputs = torch.zeros(n_rows, n_steps, vocab_dim).to(self.device)
        encoder_outputs, state = self.encoder(src, src_lens, return_all=True)
        prev_token = tgt[:, 0]
        for step in range(1, n_steps):
            logits, state = self.decoder(prev_token, state, encoder_outputs)
            outputs[:, step] = logits
            use_truth = torch.rand(1).item() < teacher_forcing_ratio
            prev_token = tgt[:, step] if use_truth else logits.argmax(1)
        return outputs
# Build the attention encoder/decoder (one GRU layer each), wrap them, and move everything to `device`.
encoder_attn = EncoderAttn(INPUT_DIM, EMB_DIM, HID_DIM, 1).to(device)
decoder_attn = AttnDecoder(OUTPUT_DIM, EMB_DIM, HID_DIM, 1).to(device)
model_attn = Seq2SeqAttn(encoder_attn, decoder_attn, device).to(device)
# Adam with its default hyperparameters.
optimizer_attn = optim.Adam(model_attn.parameters())
# Targets equal to the French "<pad>" index do not contribute to the loss.
criterion_attn = nn.CrossEntropyLoss(ignore_index=french_vocab.word2index["<pad>"])
def train_model_attn(model, dataloader, optimizer, criterion, clip=1):
    """Train the attention model for one epoch; return mean per-batch loss and accuracy.

    Args:
        model: Called as ``model(src, src_lens, tgt)``.
        dataloader: Yields ``(src, tgt, src_lens, tgt_lens)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss applied to the flattened logits and targets.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.
    """
    model.train()
    running_loss = 0
    running_acc = 0
    for src, tgt, src_lens, _ in dataloader:
        src = src.to(device)
        tgt = tgt.to(device)
        optimizer.zero_grad()
        logits = model(src, src_lens, tgt)
        # Score every step except the first one.
        kept_logits = logits[:, 1:]
        kept_targets = tgt[:, 1:]
        batch_loss = criterion(
            kept_logits.reshape(-1, logits.shape[-1]),
            kept_targets.reshape(-1),
        )
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        running_acc += compute_accuracy(kept_logits, kept_targets, french_vocab.word2index["<pad>"])
        running_loss += batch_loss.item()
    n_batches = len(dataloader)
    return running_loss / n_batches, running_acc / n_batches
def evaluate_model_attn(model, dataloader, criterion):
    """Evaluate the attention model with teacher forcing off and gradients disabled.

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over the number of batches.
    """
    model.eval()
    running_loss = 0
    running_acc = 0
    with torch.no_grad():
        for src, tgt, src_lens, _ in dataloader:
            src = src.to(device)
            tgt = tgt.to(device)
            logits = model(src, src_lens, tgt, teacher_forcing_ratio=0)
            # Score every step except the first one.
            kept_logits = logits[:, 1:]
            kept_targets = tgt[:, 1:]
            batch_loss = criterion(
                kept_logits.reshape(-1, logits.shape[-1]),
                kept_targets.reshape(-1),
            )
            running_acc += compute_accuracy(kept_logits, kept_targets, french_vocab.word2index["<pad>"])
            running_loss += batch_loss.item()
    n_batches = len(dataloader)
    return running_loss / n_batches, running_acc / n_batches
# Train the attention model for NUM_EPOCHS epochs and print metrics after each epoch.
# NOTE(review): "Val" metrics use dataloader_en2fr, the same loader as training.
print("=== RNN Seq2Seq with Attention ===")
for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train_model_attn(model_attn, dataloader_en2fr, optimizer_attn, criterion_attn)
    val_loss, val_acc = evaluate_model_attn(model_attn, dataloader_en2fr, criterion_attn)
    print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Train Acc = {train_acc:.4f}, Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")
def translate_sentence_attn(model, sentence, src_vocab, tgt_vocab, max_len=20):
    """Greedily translate one sentence with the attention seq2seq model.

    Args:
        model: Object exposing ``encoder(src, src_len, return_all=True)`` and
            ``decoder(token, hidden, encoder_outputs)``.
        sentence: Raw source sentence; it is tokenized with ``preprocess``.
        src_vocab: Source vocabulary providing ``numericalize``.
        tgt_vocab: Target vocabulary providing ``word2index`` and ``index2word``.
        max_len: Maximum number of decoding steps.

    Returns:
        The predicted target words joined by spaces, or ``""`` when no token
        survives preprocessing.
    """
    model.eval()
    indices = src_vocab.numericalize(preprocess(sentence))
    if not indices:
        # The encoder packs the sequence, and a zero-length sequence cannot be packed.
        return ""
    src_tensor = torch.tensor(indices).unsqueeze(0).to(device)
    eos_idx = tgt_vocab.word2index["<eos>"]
    input_token = torch.tensor([tgt_vocab.word2index["<sos>"]]).to(device)
    translated_sentence = []
    # Inference only: keep the encoder pass and every decoder step out of autograd.
    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(src_tensor, [len(indices)], return_all=True)
        for _ in range(max_len):
            output, hidden = model.decoder(input_token, hidden, encoder_outputs)
            top1 = output.argmax(1)
            top1_idx = top1.item()
            if top1_idx == eos_idx:
                break
            translated_sentence.append(tgt_vocab.index2word[top1_idx])
            input_token = top1
    return " ".join(translated_sentence)
# Print the attention model's translation of every test sentence.
print("=== RNN Seq2Seq with Attention Translation Test ===")
for sentence in test_sentences:
    print(f"\nInput: {sentence}")
    output = translate_sentence_attn(model_attn, sentence, english_vocab, french_vocab)
    print(f"Output: {output}")
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position signals to batch-first embeddings, then apply dropout.

    Args:
        d_model: Embedding width; both even and odd values are supported.
        dropout: Dropout probability applied after the positions are added.
        max_len: Number of positions precomputed in the ``pe`` buffer.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so the cosine term uses only the first d_model // 2 frequencies.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Stored as a buffer (not a parameter) with a leading broadcast dimension.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``dropout(x + pe[:, :x.size(1)])``; dimension 1 of ``x`` indexes positions."""
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)
class TransformerModel(nn.Module):
    """Encoder-decoder transformer mapping batch-first token ids to target-vocabulary logits."""

    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout=0.1):
        super().__init__()
        self.src_embedding = nn.Embedding(input_dim, d_model)
        self.tgt_embedding = nn.Embedding(output_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.d_model = d_model

    def forward(self, src, tgt):
        """Return logits for every target position.

        NOTE(review): only the causal target mask is passed to the transformer; no
        key-padding masks are supplied, so padded positions presumably take part in
        attention. Confirm the pad index used by the callers before adding them.
        """
        scale = math.sqrt(self.d_model)
        # Embeddings are scaled and position-encoded batch-first, then transposed to
        # sequence-first before being handed to the transformer.
        src_seq = self.pos_encoder(self.src_embedding(src) * scale).transpose(0, 1)
        tgt_seq = self.pos_encoder(self.tgt_embedding(tgt) * scale).transpose(0, 1)
        causal_mask = self.transformer.generate_square_subsequent_mask(tgt_seq.size(0)).to(src.device)
        decoded = self.transformer(src_seq, tgt_seq, tgt_mask=causal_mask)
        return self.fc_out(decoded.transpose(0, 1))
def train_transformer(model, dataloader, optimizer, criterion, clip=1):
    """Train the transformer for one epoch; return mean per-batch loss and accuracy.

    The decoder input is the target without its last token and the labels are the
    target without its first token.

    Args:
        model: Called as ``model(src, tgt_input)``.
        dataloader: Yields ``(src, tgt, src_lens, tgt_lens)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss applied to the flattened logits and labels.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.
    """
    model.train()
    running_loss = 0
    running_acc = 0
    for src, tgt, _, _ in dataloader:
        src = src.to(device)
        tgt = tgt.to(device)
        optimizer.zero_grad()
        decoder_in = tgt[:, :-1]
        labels = tgt[:, 1:]
        logits = model(src, decoder_in)
        vocab_dim = logits.shape[-1]
        flat_logits = logits.reshape(-1, vocab_dim)
        batch_loss = criterion(flat_logits, labels.reshape(-1))
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        # Accuracy is computed on the logits restored to one row per batch element.
        per_row_logits = flat_logits.view(tgt.size(0), -1, vocab_dim)
        running_acc += compute_accuracy(per_row_logits, labels, french_vocab.word2index["<pad>"])
        running_loss += batch_loss.item()
    n_batches = len(dataloader)
    return running_loss / n_batches, running_acc / n_batches
def evaluate_transformer(model, dataloader, criterion):
    """Evaluate the transformer with gradients disabled.

    The model is fed the shifted ground-truth target (target without its last token),
    exactly as in training.

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over the number of batches.
    """
    model.eval()
    running_loss = 0
    running_acc = 0
    with torch.no_grad():
        for src, tgt, _, _ in dataloader:
            src = src.to(device)
            tgt = tgt.to(device)
            labels = tgt[:, 1:]
            logits = model(src, tgt[:, :-1])
            vocab_dim = logits.shape[-1]
            flat_logits = logits.reshape(-1, vocab_dim)
            batch_loss = criterion(flat_logits, labels.reshape(-1))
            per_row_logits = flat_logits.view(tgt.size(0), -1, vocab_dim)
            running_acc += compute_accuracy(per_row_logits, labels, french_vocab.word2index["<pad>"])
            running_loss += batch_loss.item()
    n_batches = len(dataloader)
    return running_loss / n_batches, running_acc / n_batches
def translate_sentence_transformer(model, sentence, src_vocab, tgt_vocab, max_len=20):
    """Greedily translate one sentence with the transformer model.

    The source is encoded once; the decoder is then re-run on the growing target
    prefix, and the most likely next token is appended until ``<eos>`` or ``max_len``.

    Args:
        model: ``TransformerModel``-like object exposing ``src_embedding``,
            ``tgt_embedding``, ``pos_encoder``, ``transformer``, ``fc_out`` and ``d_model``.
        sentence: Raw source sentence; it is tokenized with ``preprocess``.
        src_vocab: Source vocabulary providing ``numericalize``.
        tgt_vocab: Target vocabulary providing ``word2index`` and ``index2word``.
        max_len: Maximum number of generated tokens.

    Returns:
        The predicted target words joined by spaces, or ``""`` when no token
        survives preprocessing.
    """
    model.eval()
    indices = src_vocab.numericalize(preprocess(sentence))
    if not indices:
        # An empty list would become a float tensor that the embedding layer rejects.
        return ""
    scale = math.sqrt(model.d_model)
    eos_idx = tgt_vocab.word2index["<eos>"]
    tgt_indices = [tgt_vocab.word2index["<sos>"]]
    # Inference only: without no_grad every step would record an autograd graph.
    with torch.no_grad():
        src_tensor = torch.tensor(indices).unsqueeze(0).to(device)
        src_emb = model.pos_encoder(model.src_embedding(src_tensor) * scale).transpose(0, 1)
        memory = model.transformer.encoder(src_emb)
        for _ in range(max_len):
            tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0).to(device)
            tgt_emb = model.pos_encoder(model.tgt_embedding(tgt_tensor) * scale).transpose(0, 1)
            tgt_mask = model.transformer.generate_square_subsequent_mask(tgt_emb.size(0)).to(device)
            output = model.transformer.decoder(tgt_emb, memory, tgt_mask=tgt_mask)
            # Only the last position is needed to choose the next token.
            next_token = model.fc_out(output[-1]).argmax(dim=-1).item()
            if next_token == eos_idx:
                break
            tgt_indices.append(next_token)
    return " ".join([tgt_vocab.index2word[idx] for idx in tgt_indices[1:]])
# Grid-search the transformer over layer count and head count, keeping the model whose
# final-epoch accuracy is highest, then translate the test sentences with it.
best_trans_acc = -1
best_trans_model = None
# Maps "layers{n}_heads{h}" to the (loss, accuracy) pair from the last epoch of that run.
trans_results = {}
print("=== Transformer Seq2Seq Experiments ===")
for num_layers in [1,2,4]:
    for nhead in [2,4]:
        # The same layer count is used for encoder and decoder; EMB_DIM is d_model and
        # HID_DIM is the feed-forward width.
        model_trans = TransformerModel(INPUT_DIM, OUTPUT_DIM, EMB_DIM, nhead, num_layers, num_layers, HID_DIM).to(device)
        optimizer_trans = optim.Adam(model_trans.parameters())
        criterion_trans = nn.CrossEntropyLoss(ignore_index=french_vocab.word2index["<pad>"])
        # NOTE(review): "val" metrics are computed on dataloader_en2fr, the same loader used for training.
        for epoch in range(NUM_EPOCHS):
            train_loss, train_acc = train_transformer(model_trans, dataloader_en2fr, optimizer_trans, criterion_trans)
            val_loss, val_acc = evaluate_transformer(model_trans, dataloader_en2fr, criterion_trans)
        # Only the last epoch's metrics are recorded and compared.
        trans_results[f"layers{num_layers}_heads{nhead}"] = (val_loss, val_acc)
        if val_acc > best_trans_acc:
            best_trans_acc = val_acc
            best_trans_model = model_trans
        print(f"Config layers={num_layers}, heads={nhead}: Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")
print("=== Transformer Translation Test ===")
for sentence in test_sentences:
    print(f"\nInput: {sentence}")
    output = translate_sentence_transformer(best_trans_model, sentence, english_vocab, french_vocab)
    print(f"Output: {output}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
=== RNN Seq2Seq without Attention ===
Epoch 1: Train Loss = 4.7463, Train Acc = 0.2239, Val Loss = 3.8285, Val Acc = 0.3225
Epoch 2: Train Loss = 3.5566, Train Acc = 0.3675, Val Loss = 3.1059, Val Acc = 0.3967
Epoch 3: Train Loss = 2.9498, Train Acc = 0.4168, Val Loss = 2.4265, Val Acc = 0.4944
Epoch 4: Train Loss = 2.2823, Train Acc = 0.4844, Val Loss = 1.6858, Val Acc = 0.6723
Epoch 5: Train Loss = 1.5539, Train Acc = 0.6613, Val Loss = 0.9628, Val Acc = 0.8365
Epoch 6: Train Loss = 0.9748, Train Acc = 0.8117, Val Loss = 0.4993, Val Acc = 0.9447
Epoch 7: Train Loss = 0.5056, Train Acc = 0.9135, Val Loss = 0.2317, Val Acc = 0.9739
Epoch 8: Train Loss = 0.2431, Train Acc = 0.9624, Val Loss = 0.0965, Val Acc = 0.9987
Epoch 9: Train Loss = 0.0944, Train Acc = 0.9931, Val Loss = 0.0416, Val Acc = 0.9984
Epoch 10: Train Loss = 0.0458, Train Acc = 0.9974, Val Loss



Config layers=1, heads=2: Val Loss = 0.6196, Val Acc = 0.8843
Config layers=1, heads=4: Val Loss = 0.3687, Val Acc = 0.9328
Config layers=2, heads=2: Val Loss = 1.8247, Val Acc = 0.5534
Config layers=2, heads=4: Val Loss = 1.6366, Val Acc = 0.5741
Config layers=4, heads=2: Val Loss = 4.5403, Val Acc = 0.1771
Config layers=4, heads=4: Val Loss = 4.5466, Val Acc = 0.1754
=== Transformer Translation Test ===

Input: She wears a red dress and dances at the party
Output: elle danse à la porte une robe rouge

Input: After they visit the museum, they play video games
Output: ils jouent aux jeux vidéo

Input: Although he is tired, he works hard every day
Output: il travaille dur tous les jours

Input: She sings a song while cooking dinner
Output: elle danse avec grâce

Input: We eat breakfast together before we go to the gym
Output: nous prenons le dîner ensemble

Input: He said that the coffee is hot
Output: il a faim dans la leçon

Input: She thinks that the teacher explains the lesson well


In [None]:
# Mount Google Drive so the dataset document can be read from Colab.
from google.colab import drive
drive.mount('/content/drive')
from docx import Document
doc_path = "/content/drive/My Drive/Dataset - English to French.docx"
doc = Document(doc_path)
# Join the text of every paragraph in the document into one newline-separated string.
raw_text = "\n".join([para.text for para in doc.paragraphs])
# NOTE(review): exec() runs the document's text as Python code, so anything written in
# that file is executed. This is only acceptable for a trusted document.
namespace = {}
exec(raw_text, namespace)
# The executed text must have defined `english_to_french`; a KeyError is raised otherwise.
english_to_french = namespace['english_to_french']
import pandas as pd, re, math, torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import Dataset, DataLoader
def preprocess(sentence):
    """Lower-case a sentence, strip non-letter characters, and split it on whitespace.

    Characters outside ``a-z``, ``A-Z``, the ``À-ÿ`` range and whitespace are deleted
    (not replaced by a space), so ``"qu'il"`` becomes the single token ``"quil"``.

    Returns:
        A list of word tokens.
    """
    normalized = sentence.lower().strip()
    letters_only = re.sub(r"[^a-zA-ZÀ-ÿ\s]", "", normalized)
    return letters_only.split()
# Put the sentence pairs in a two-column frame and add a tokenized copy of each column.
df = pd.DataFrame(english_to_french, columns=["English", "French"])
df["English_tokens"] = df["English"].apply(preprocess)
df["French_tokens"] = df["French"].apply(preprocess)
class Vocab:
    """Word/index mapping with four reserved tokens: <pad>=0, <sos>=1, <eos>=2, <unk>=3."""

    def __init__(self, tokens_list, min_freq=1):
        specials = ["<pad>", "<sos>", "<eos>", "<unk>"]
        self.word2index = {token: position for position, token in enumerate(specials)}
        self.index2word = {position: token for position, token in enumerate(specials)}
        self.word_freq = {}
        self.min_freq = min_freq
        self.build_vocab(tokens_list)

    def build_vocab(self, tokens_list):
        """Count every word in ``tokens_list`` and index those seen at least ``min_freq`` times.

        New indices start after the entries already in ``word2index`` and follow the
        order in which words were first counted.
        """
        next_index = len(self.word2index)
        for tokens in tokens_list:
            for word in tokens:
                self.word_freq[word] = self.word_freq.get(word, 0) + 1
        for word, count in self.word_freq.items():
            if count < self.min_freq:
                continue
            self.word2index[word] = next_index
            self.index2word[next_index] = word
            next_index += 1

    def numericalize(self, tokens):
        """Map tokens to indices, using the ``<unk>`` index for unknown words."""
        unknown = self.word2index["<unk>"]
        return [self.word2index.get(word, unknown) for word in tokens]
# One vocabulary per language, built from the tokenized columns with the default min_freq of 1.
english_vocab = Vocab(df["English_tokens"])
french_vocab = Vocab(df["French_tokens"])
class TranslationDataset(Dataset):
    """Dataset of ``(source ids, target ids)`` tensor pairs read from tokenized DataFrame columns.

    Args:
        df: DataFrame whose ``src_col`` / ``tgt_col`` cells hold token lists.
        src_col: Name of the source-token column.
        tgt_col: Name of the target-token column.
        src_vocab: Vocabulary used to numericalize source tokens.
        tgt_vocab: Vocabulary used to numericalize target tokens; it also supplies the
            ``<sos>`` / ``<eos>`` indices wrapped around every target.
    """

    def __init__(self, df, src_col, tgt_col, src_vocab, tgt_vocab):
        self.df = df
        self.src_col = src_col
        self.tgt_col = tgt_col
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(src_tensor, tgt_tensor)`` for row ``idx``, both of dtype ``torch.long``."""
        row = self.df.iloc[idx]
        src_indices = self.src_vocab.numericalize(row[self.src_col])
        tgt_indices = (
            [self.tgt_vocab.word2index["<sos>"]]
            + self.tgt_vocab.numericalize(row[self.tgt_col])
            + [self.tgt_vocab.word2index["<eos>"]]
        )
        # dtype is explicit because torch.tensor([]) would otherwise infer float32
        # for a sentence with no tokens.
        return (
            torch.tensor(src_indices, dtype=torch.long),
            torch.tensor(tgt_indices, dtype=torch.long),
        )
def collate_fn(batch):
    """Pad a list of ``(src, tgt)`` tensor pairs into batch-first tensors.

    Sources are padded with the French ``<pad>`` index and targets with the English
    ``<pad>`` index.

    Returns:
        ``(src_padded, tgt_padded, src_lens, tgt_lens)``, where the lengths are the
        unpadded lengths as Python lists.
    """
    src_batch, tgt_batch = zip(*batch)
    src_pad = french_vocab.word2index["<pad>"]
    tgt_pad = english_vocab.word2index["<pad>"]
    return (
        nn.utils.rnn.pad_sequence(src_batch, batch_first=True, padding_value=src_pad),
        nn.utils.rnn.pad_sequence(tgt_batch, batch_first=True, padding_value=tgt_pad),
        list(map(len, src_batch)),
        list(map(len, tgt_batch)),
    )
# French-to-English data: French tokens are the source, English tokens the target.
dataset_f2e = TranslationDataset(df, "French_tokens", "English_tokens", french_vocab, english_vocab)
# Shuffled batches of 4 pairs, padded by collate_fn.
dataloader_f2e = DataLoader(dataset_f2e, batch_size=4, shuffle=True, collate_fn=collate_fn)
def compute_accuracy(output, tgt, pad_idx):
    """Return the fraction of non-padding target tokens predicted correctly.

    Args:
        output: Logits whose last dimension is the vocabulary; the leading dimensions
            must match ``tgt``.
        tgt: Tensor of target token ids.
        pad_idx: Token id to exclude from the count.

    Returns:
        ``correct / counted`` as a float, or ``0.0`` when every target token is padding
        (instead of raising ``ZeroDivisionError``).
    """
    pred_tokens = output.argmax(dim=-1)
    mask = tgt != pad_idx
    counted = mask.sum().item()
    if counted == 0:
        return 0.0
    correct = ((pred_tokens == tgt) & mask).sum().item()
    return correct / counted
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Encoder(nn.Module):
    """GRU encoder that returns only its final hidden state."""

    def __init__(self, input_dim, emb_dim, hid_dim, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.gru = nn.GRU(emb_dim, hid_dim, num_layers, batch_first=True)

    def forward(self, src, src_lens):
        """Encode a padded batch-first ``src`` whose true lengths are ``src_lens``.

        The embedded batch is packed so the GRU is run over each sequence's real
        length; the per-step outputs are discarded.
        """
        packed_emb = nn.utils.rnn.pack_padded_sequence(
            self.embedding(src), src_lens, batch_first=True, enforce_sorted=False
        )
        _, final_state = self.gru(packed_emb)
        return final_state
class Decoder(nn.Module):
    """Single-step GRU decoder that maps the previous token and state to vocabulary logits."""

    def __init__(self, output_dim, emb_dim, hid_dim, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.gru = nn.GRU(emb_dim, hid_dim, num_layers, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, output_dim)

    def forward(self, input, hidden):
        """Decode one step.

        Args:
            input: Token ids for the current step, one per batch row.
            hidden: GRU hidden state from the previous step.

        Returns:
            ``(prediction, hidden)``: vocabulary logits and the updated GRU state.
        """
        # A length-1 sequence dimension is added for the batch-first GRU and removed again afterwards.
        step_emb = self.embedding(input.unsqueeze(1))
        gru_out, hidden = self.gru(step_emb, hidden)
        return self.fc_out(gru_out.squeeze(1)), hidden
class Seq2Seq(nn.Module):
    """Wraps an encoder/decoder pair and unrolls the decoder over the target length."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, src_lens, tgt, teacher_forcing_ratio=0.5):
        """Return per-step vocabulary logits, batch-first; step 0 is left as zeros.

        At every step a fresh random draw decides whether the next decoder input is the
        ground-truth token (probability ``teacher_forcing_ratio``) or the model's argmax.
        """
        n_rows = src.size(0)
        n_steps = tgt.size(1)
        vocab_dim = self.decoder.fc_out.out_features
        outputs = torch.zeros(n_rows, n_steps, vocab_dim).to(self.device)
        state = self.encoder(src, src_lens)
        prev_token = tgt[:, 0]
        for step in range(1, n_steps):
            logits, state = self.decoder(prev_token, state)
            outputs[:, step] = logits
            use_truth = torch.rand(1).item() < teacher_forcing_ratio
            prev_token = tgt[:, step] if use_truth else logits.argmax(1)
        return outputs
# Hyperparameters shared by the French-to-English models in this cell.
EMB_DIM = 256; HID_DIM = 512; N_LAYERS = 1; NUM_EPOCHS = 10
# The encoder reads French ids and the decoder emits English ids; vocabulary sizes come from the Vocab objects.
encoder_f2e = Encoder(len(french_vocab.word2index), EMB_DIM, HID_DIM, N_LAYERS).to(device)
decoder_f2e = Decoder(len(english_vocab.word2index), EMB_DIM, HID_DIM, N_LAYERS).to(device)
model_f2e = Seq2Seq(encoder_f2e, decoder_f2e, device).to(device)
optimizer_f2e = optim.Adam(model_f2e.parameters())
# Targets equal to the English "<pad>" index do not contribute to the loss.
criterion_f2e = nn.CrossEntropyLoss(ignore_index=english_vocab.word2index["<pad>"])
def train_model(model, dataloader, optimizer, criterion, clip=1):
    """Run one training epoch and return the mean per-batch loss and accuracy.

    Accuracy ignores positions whose target is the English ``<pad>`` index.

    Args:
        model: Called as ``model(src, src_lens, tgt)``.
        dataloader: Yields ``(src, tgt, src_lens, tgt_lens)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss applied to the flattened logits and targets.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.
    """
    model.train()
    total_loss = 0
    total_acc = 0
    for batch in dataloader:
        src, tgt, src_lens, _ = batch
        src = src.to(device)
        tgt = tgt.to(device)

        optimizer.zero_grad()
        logits = model(src, src_lens, tgt)

        # Step 0 of the model output is never written (it stays zero), so it is not scored.
        scored_logits = logits[:, 1:]
        scored_targets = tgt[:, 1:]
        vocab_dim = logits.shape[-1]
        batch_loss = criterion(scored_logits.reshape(-1, vocab_dim), scored_targets.reshape(-1))

        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        total_acc += compute_accuracy(scored_logits, scored_targets, english_vocab.word2index["<pad>"])
        total_loss += batch_loss.item()
    n_batches = len(dataloader)
    return total_loss / n_batches, total_acc / n_batches
def evaluate_model(model, dataloader, criterion):
    """Score the model on a dataloader with teacher forcing off and gradients disabled.

    Accuracy ignores positions whose target is the English ``<pad>`` index.

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over the number of batches.
    """
    model.eval()
    loss_sum = 0
    acc_sum = 0
    with torch.no_grad():
        for batch in dataloader:
            src, tgt, src_lens, _ = batch
            src = src.to(device)
            tgt = tgt.to(device)
            logits = model(src, src_lens, tgt, teacher_forcing_ratio=0)
            # Step 0 is excluded from both the loss and the accuracy.
            scored_logits = logits[:, 1:]
            scored_targets = tgt[:, 1:]
            batch_loss = criterion(
                scored_logits.reshape(-1, logits.shape[-1]),
                scored_targets.reshape(-1),
            )
            acc_sum += compute_accuracy(scored_logits, scored_targets, english_vocab.word2index["<pad>"])
            loss_sum += batch_loss.item()
    n_batches = len(dataloader)
    return loss_sum / n_batches, acc_sum / n_batches
# Train the French-to-English GRU model for NUM_EPOCHS epochs and print metrics after each epoch.
# NOTE(review): "Val" metrics use dataloader_f2e, the same loader as training.
print("=== RNN Seq2Seq French-to-English ===")
for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train_model(model_f2e, dataloader_f2e, optimizer_f2e, criterion_f2e)
    val_loss, val_acc = evaluate_model(model_f2e, dataloader_f2e, criterion_f2e)
    print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Train Acc = {train_acc:.4f}, Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")
def translate_sentence_f2e(model, sentence, src_vocab, tgt_vocab, max_len=20):
    """Greedily decode a translation of ``sentence`` with the GRU seq2seq model.

    Args:
        model: Object exposing ``encoder(src, src_len)`` and ``decoder(token, hidden)``.
        sentence: Raw source sentence; it is tokenized with ``preprocess``.
        src_vocab: Source vocabulary providing ``numericalize``.
        tgt_vocab: Target vocabulary providing ``word2index`` and ``index2word``.
        max_len: Maximum number of decoding steps.

    Returns:
        The predicted target words joined by single spaces.
    """
    model.eval()
    src_ids = src_vocab.numericalize(preprocess(sentence))
    src_batch = torch.tensor(src_ids).unsqueeze(0).to(device)
    eos_id = tgt_vocab.word2index["<eos>"]
    words = []
    with torch.no_grad():
        state = model.encoder(src_batch, [len(src_ids)])
        prev_id = tgt_vocab.word2index["<sos>"]
        for _ in range(max_len):
            logits, state = model.decoder(torch.tensor([prev_id]).to(device), state)
            prev_id = logits.argmax(1).item()
            # Stop at end-of-sequence; the marker itself is not emitted.
            if prev_id == eos_id:
                break
            words.append(tgt_vocab.index2word[prev_id])
    return " ".join(words)
# French sentences used to spot-check the French-to-English GRU model.
# preprocess() deletes apostrophes and commas, so "qu'il" is looked up as "quil".
test_french_sentences = ["elle porte une robe rouge et danse à la fête",
"après avoir visité le musée, ils jouent aux jeux vidéo",
"bien qu'il soit fatigué, il travaille dur tous les jours",
"elle chante une chanson en cuisinant le dîner",
"nous prenons le petit déjeuner ensemble avant d'aller à la salle de sport"]
print("=== RNN Seq2Seq French-to-English Translation Test ===")
for sentence in test_french_sentences:
    print(f"\nInput: {sentence}")
    output = translate_sentence_f2e(model_f2e, sentence, french_vocab, english_vocab)
    print(f"Output: {output}")
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position signals to batch-first embeddings, then apply dropout.

    Args:
        d_model: Embedding width; both even and odd values are supported.
        dropout: Dropout probability applied after the positions are added.
        max_len: Number of positions precomputed in the ``pe`` buffer.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so the cosine term uses only the first d_model // 2 frequencies.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Stored as a buffer (not a parameter) with a leading broadcast dimension.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``dropout(x + pe[:, :x.size(1)])``; dimension 1 of ``x`` indexes positions."""
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)
class TransformerModel(nn.Module):
    """Encoder-decoder transformer mapping batch-first token ids to target-vocabulary logits.

    Args:
        input_dim: Source vocabulary size.
        output_dim: Target vocabulary size.
        d_model: Embedding / model width.
        nhead: Number of attention heads.
        num_encoder_layers: Number of encoder layers.
        num_decoder_layers: Number of decoder layers.
        dim_feedforward: Width of the feed-forward sublayers.
        dropout: Dropout probability for the positional encoding and the transformer.
        pad_idx: Token id treated as padding in both ``src`` and ``tgt``. The default 0
            is the ``<pad>`` index assigned by ``Vocab``.
    """

    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout=0.1, pad_idx=0):
        super(TransformerModel, self).__init__()
        self.src_embedding = nn.Embedding(input_dim, d_model)
        self.tgt_embedding = nn.Embedding(output_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.d_model = d_model
        self.pad_idx = pad_idx

    def forward(self, src, tgt):
        """Return logits for every target position.

        Padded positions in ``src`` and ``tgt`` are excluded as attention keys, so a
        sentence's logits no longer depend on how much padding its batch added.
        """
        # True marks a padding position that attention must not look at.
        src_pad_mask = src == self.pad_idx
        tgt_pad_mask = tgt == self.pad_idx
        scale = math.sqrt(self.d_model)
        # Embeddings are scaled and position-encoded batch-first, then transposed to
        # sequence-first before being handed to the transformer.
        src_emb = self.pos_encoder(self.src_embedding(src) * scale).transpose(0, 1)
        tgt_emb = self.pos_encoder(self.tgt_embedding(tgt) * scale).transpose(0, 1)
        tgt_len = tgt_emb.size(0)
        # Boolean causal mask (True = blocked) so it has the same dtype as the padding masks.
        tgt_mask = torch.triu(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=src.device), diagonal=1
        )
        output = self.transformer(
            src_emb,
            tgt_emb,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_pad_mask,
            tgt_key_padding_mask=tgt_pad_mask,
            memory_key_padding_mask=src_pad_mask,
        )
        output = self.fc_out(output.transpose(0, 1))
        return output
def train_transformer(model, dataloader, optimizer, criterion, clip=1):
    """Train the transformer for one epoch; return mean per-batch loss and accuracy.

    The decoder input is the target without its last token and the labels are the
    target without its first token. Accuracy ignores the English ``<pad>`` index.

    Args:
        model: Called as ``model(src, tgt_input)``.
        dataloader: Yields ``(src, tgt, src_lens, tgt_lens)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss applied to the flattened logits and labels.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.
    """
    model.train()
    running_loss = 0
    running_acc = 0
    for src, tgt, _, _ in dataloader:
        src = src.to(device)
        tgt = tgt.to(device)
        optimizer.zero_grad()
        decoder_in = tgt[:, :-1]
        labels = tgt[:, 1:]
        logits = model(src, decoder_in)
        vocab_dim = logits.shape[-1]
        flat_logits = logits.reshape(-1, vocab_dim)
        batch_loss = criterion(flat_logits, labels.reshape(-1))
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        # Accuracy is computed on the logits restored to one row per batch element.
        per_row_logits = flat_logits.view(tgt.size(0), -1, vocab_dim)
        running_acc += compute_accuracy(per_row_logits, labels, english_vocab.word2index["<pad>"])
        running_loss += batch_loss.item()
    n_batches = len(dataloader)
    return running_loss / n_batches, running_acc / n_batches
def evaluate_transformer(model, dataloader, criterion):
    """Evaluate the transformer with gradients disabled.

    The model is fed the shifted ground-truth target (target without its last token),
    exactly as in training. Accuracy ignores the English ``<pad>`` index.

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over the number of batches.
    """
    model.eval()
    running_loss = 0
    running_acc = 0
    with torch.no_grad():
        for src, tgt, _, _ in dataloader:
            src = src.to(device)
            tgt = tgt.to(device)
            labels = tgt[:, 1:]
            logits = model(src, tgt[:, :-1])
            vocab_dim = logits.shape[-1]
            flat_logits = logits.reshape(-1, vocab_dim)
            batch_loss = criterion(flat_logits, labels.reshape(-1))
            per_row_logits = flat_logits.view(tgt.size(0), -1, vocab_dim)
            running_acc += compute_accuracy(per_row_logits, labels, english_vocab.word2index["<pad>"])
            running_loss += batch_loss.item()
    n_batches = len(dataloader)
    return running_loss / n_batches, running_acc / n_batches
def translate_sentence_transformer(model, sentence, src_vocab, tgt_vocab, max_len=20):
    """Greedily decode a translation of ``sentence``.

    The sentence is tokenized with ``preprocess`` and mapped to indices with
    ``src_vocab.numericalize``.  The source is encoded once; the decoder is
    then re-run on the tokens generated so far, and the arg-max token at the
    last position is appended, until ``<eos>`` is predicted or ``max_len``
    tokens have been produced.

    Args:
        model: Object exposing ``src_embedding``, ``tgt_embedding``,
            ``pos_encoder``, ``transformer`` (with ``encoder``/``decoder``),
            ``fc_out`` and ``d_model``.  It is switched to eval mode and left
            that way.
        sentence: Raw source sentence.
        src_vocab: Source vocabulary providing ``numericalize``.
        tgt_vocab: Target vocabulary providing ``word2index`` and
            ``index2word``.
        max_len: Maximum number of decoding steps.

    Returns:
        The generated target words joined by single spaces.  The leading
        ``<sos>`` is dropped and ``<eos>`` is never included.
    """
    model.eval()
    tokens = preprocess(sentence)
    indices = src_vocab.numericalize(tokens)
    eos_idx = tgt_vocab.word2index["<eos>"]
    emb_scale = math.sqrt(model.d_model)
    tgt_indices = [tgt_vocab.word2index["<sos>"]]

    # Inference only: without no_grad every decoding step would build and
    # retain an autograd graph that is never used.
    with torch.no_grad():
        src_tensor = torch.tensor(indices).unsqueeze(0).to(device)
        src_emb = model.pos_encoder(model.src_embedding(src_tensor) * emb_scale).transpose(0, 1)
        # The encoder output does not depend on the decoded prefix, so it is
        # computed once and reused at every step.
        memory = model.transformer.encoder(src_emb)

        for _ in range(max_len):
            tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0).to(device)
            tgt_emb = model.pos_encoder(model.tgt_embedding(tgt_tensor) * emb_scale).transpose(0, 1)
            tgt_mask = model.transformer.generate_square_subsequent_mask(tgt_emb.size(0)).to(device)
            output = model.transformer.decoder(tgt_emb, memory, tgt_mask=tgt_mask)
            output = model.fc_out(output.transpose(0, 1))
            # Only the prediction at the last position extends the sequence.
            next_token = output[0, -1].argmax().item()
            if next_token == eos_idx:
                break
            tgt_indices.append(next_token)

    return " ".join(tgt_vocab.index2word[idx] for idx in tgt_indices[1:])
# Grid search over layer count and attention-head count for the
# French-to-English transformer.  The configuration with the highest
# final-epoch "validation" accuracy is kept and then used to translate the
# test sentences.
best_trans_acc = -1  # below any accuracy the first configuration can score — assumes compute_accuracy is non-negative
best_trans_model = None
trans_results = {}  # "layers{L}_heads{H}" -> (val_loss, val_acc) from the last epoch
print("=== Transformer Seq2Seq French-to-English Experiments ===")
for num_layers in [1,2,4]:
    for nhead in [2,4]:
        # A fresh model, optimizer and loss are built for every configuration.
        # num_layers is passed twice — presumably encoder and decoder depth;
        # TransformerModel's signature is not visible here, TODO confirm.
        model_trans = TransformerModel(len(french_vocab.word2index), len(english_vocab.word2index), EMB_DIM, nhead, num_layers, num_layers, HID_DIM).to(device)
        # Adam is used with its default hyperparameters.
        optimizer_trans = optim.Adam(model_trans.parameters())
        # Targets equal to the <pad> index are ignored by the loss.
        criterion_trans = nn.CrossEntropyLoss(ignore_index=english_vocab.word2index["<pad>"])
        for epoch in range(NUM_EPOCHS):
            train_loss, train_acc = train_transformer(model_trans, dataloader_f2e, optimizer_trans, criterion_trans)
            # NOTE(review): evaluation uses the same dataloader_f2e as training,
            # so the "Val" numbers are measured on the training data — confirm
            # whether a held-out loader was intended.
            val_loss, val_acc = evaluate_transformer(model_trans, dataloader_f2e, criterion_trans)
        # Only the metrics from the final epoch are recorded and compared.
        trans_results[f"layers{num_layers}_heads{nhead}"] = (val_loss, val_acc)
        if val_acc > best_trans_acc:
            best_trans_acc = val_acc
            # Stores a reference, not a copy; each configuration builds a new
            # model object, so later iterations do not overwrite this one.
            best_trans_model = model_trans
        print(f"Config layers={num_layers}, heads={nhead}: Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")
print("=== Transformer French-to-English Translation Test ===")
# Translate each test sentence with the best-scoring configuration.
for sentence in test_french_sentences:
    print(f"\nInput: {sentence}")
    output = translate_sentence_transformer(best_trans_model, sentence, french_vocab, english_vocab)
    print(f"Output: {output}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
=== RNN Seq2Seq French-to-English ===
Epoch 1: Train Loss = 4.5612, Train Acc = 0.2584, Val Loss = 3.4862, Val Acc = 0.3796
Epoch 2: Train Loss = 3.2736, Train Acc = 0.4103, Val Loss = 2.8057, Val Acc = 0.4403
Epoch 3: Train Loss = 2.6080, Train Acc = 0.4771, Val Loss = 2.0664, Val Acc = 0.5617
Epoch 4: Train Loss = 2.0560, Train Acc = 0.5283, Val Loss = 1.3633, Val Acc = 0.7605
Epoch 5: Train Loss = 1.2675, Train Acc = 0.7682, Val Loss = 0.7492, Val Acc = 0.9156
Epoch 6: Train Loss = 0.6652, Train Acc = 0.9123, Val Loss = 0.3172, Val Acc = 0.9645
Epoch 7: Train Loss = 0.3352, Train Acc = 0.9524, Val Loss = 0.1847, Val Acc = 0.9793
Epoch 8: Train Loss = 0.1391, Train Acc = 0.9861, Val Loss = 0.0668, Val Acc = 0.9939
Epoch 9: Train Loss = 0.0654, Train Acc = 0.9906, Val Loss = 0.0348, Val Acc = 0.9983
Epoch 10: Train Loss = 0.0393, Train Acc = 0.9954, Val Loss



Config layers=1, heads=2: Val Loss = 0.4986, Val Acc = 0.8872
Config layers=1, heads=4: Val Loss = 0.5234, Val Acc = 0.9096
Config layers=2, heads=2: Val Loss = 2.2139, Val Acc = 0.4544
Config layers=2, heads=4: Val Loss = 1.8115, Val Acc = 0.5283
Config layers=4, heads=2: Val Loss = 4.3842, Val Acc = 0.1881
Config layers=4, heads=4: Val Loss = 4.1699, Val Acc = 0.1879
=== Transformer French-to-English Translation Test ===

Input: elle porte une robe rouge et danse à la fête
Output: she catches at the party

Input: après avoir visité le musée, ils jouent aux jeux vidéo
Output: they play soccer every weekend

Input: bien qu'il soit fatigué, il travaille dur tous les jours
Output: he works hard every day

Input: elle chante une chanson en cuisinant le dîner
Output: she sings in the bus in the bus in the bus in the bus in the bus in the bus

Input: nous prenons le petit déjeuner ensemble avant d'aller à la salle de sport
Output: we practice yoga
