In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


import numpy as np
import time
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, random_split
import requests

#For training the models with different layers and heads
from itertools import product

#For Document processing in Problems 3 and 4
#pip install python-docx
from collections import Counter
from docx import Document
from torch.nn.utils.rnn import pad_sequence
import random
import math



device = torch.device("cuda")
print(f"Using device: {device}")

#Check the GPU name and number
'''
devNumber = torch.cuda.current_device()
devName = torch.cuda.get_device_name(devNumber)

print(f"Current device number is: {devNumber}")
print(f"GPU name is: {devName}")
'''

Using device: cuda


'\ndevNumber = torch.cuda.current_device()\ndevName = torch.cuda.get_device_name(devNumber)\n\nprint(f"Current device number is: {devNumber}")\nprint(f"GPU name is: {devName}")\n'

In [None]:
# ========================
# Problem 1: Transformer-based Next Character Prediction
# ========================

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import time
from sklearn.model_selection import train_test_split

# ========== Dataset ==========
text = """Next character prediction is a fundamental task in the field of natural language processing (NLP) that involves predicting the next character in a sequence of text based on the characters that precede it. This task is essential for various applications, including text auto-completion, spell checking, and even in the development of sophisticated AI models capable of generating human-like text.
At its core, next character prediction relies on statistical models or deep learning algorithms to analyze a given sequence of text and predict which character is most likely to follow. These predictions are based on patterns and relationships learned from large datasets of text during the training phase of the model.
One of the most popular approaches to next character prediction involves the use of Recurrent Neural Networks (RNNs), and more specifically, a variant called Long Short-Term Memory (LSTM) networks. RNNs are particularly well-suited for sequential data like text, as they can maintain information in 'memory' about previous characters to inform the prediction of the next character. LSTM networks enhance this capability by being able to remember long-term dependencies, making them even more effective for next character prediction tasks.
Training a model for next character prediction involves feeding it large amounts of text data, allowing it to learn the probability of each character's appearance following a sequence of characters. During this training process, the model adjusts its parameters to minimize the difference between its predictions and the actual outcomes, thus improving its predictive accuracy over time.
Once trained, the model can be used to predict the next character in a given piece of text by considering the sequence of characters that precede it. This can enhance user experience in text editing software, improve efficiency in coding environments with auto-completion features, and enable more natural interactions with AI-based chatbots and virtual assistants.
In summary, next character prediction plays a crucial role in enhancing the capabilities of various NLP applications, making text-based interactions more efficient, accurate, and human-like. Through the use of advanced machine learning models like RNNs and LSTMs, next character prediction continues to evolve, opening new possibilities for the future of text-based technology."""

chars = sorted(list(set(text)))
char_to_ix = {ch: i for i, ch in enumerate(chars)}
ix_to_char = {i: ch for i, ch in enumerate(chars)}

def prepare_data(seq_length):
    X, y = [], []
    for i in range(len(text) - seq_length):
        X.append([char_to_ix[ch] for ch in text[i:i+seq_length]])
        y.append(char_to_ix[text[i+seq_length]])
    return np.array(X), np.array(y)

# ========== Model ==========
class TransformerCharModel(nn.Module):
    def __init__(self, vocab_size, seq_len, d_model=64, nhead=2, num_layers=2, dim_feedforward=128):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_embedding = nn.Parameter(torch.randn(1, seq_len, d_model))
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        x = self.embedding(x) + self.pos_embedding
        x = x.permute(1, 0, 2)  # seq_len first
        out = self.transformer(x)
        return self.fc(out[-1])

# ========== Training ==========
def train_model(seq_len, epochs=40, lr=0.005):
    X, y = prepare_data(seq_len)
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    X_train = torch.tensor(X_train, dtype=torch.long)
    y_train = torch.tensor(y_train, dtype=torch.long)
    X_val = torch.tensor(X_val, dtype=torch.long)
    y_val = torch.tensor(y_val, dtype=torch.long)

    model = TransformerCharModel(vocab_size=len(chars), seq_len=seq_len)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    model_size = sum(p.numel() for p in model.parameters())
    start = time.time()

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        out = model(X_train)
        loss = loss_fn(out, y_train)
        loss.backward()
        optimizer.step()

        model.eval()
        with torch.no_grad():
            val_out = model(X_val)
            val_loss = loss_fn(val_out, y_val)
            acc = (val_out.argmax(dim=1) == y_val).float().mean().item()

        if (epoch + 1) % 10 == 0:
            print(f"[{seq_len}] Epoch {epoch+1}: Train Loss={loss.item():.4f}, Val Loss={val_loss.item():.4f}, Acc={acc:.4f}")

    elapsed = time.time() - start
    return loss.item(), val_loss.item(), acc, elapsed, model_size

# ========== Run Experiments ==========
results = []
for length in [10, 20, 30]:
    print(f"\nTraining for sequence length {length}")
    train_loss, val_loss, val_acc, duration, size = train_model(length)
    results.append({
        "Seq Len": length,
        "Train Loss": train_loss,
        "Val Loss": val_loss,
        "Val Acc": val_acc,
        "Time": duration,
        "Params": size
    })

# ========== Summary ==========
print("\n==== Final Results ====")
for res in results:
    print(f"SeqLen {res['Seq Len']:>2} | Train Loss: {res['Train Loss']:.4f} | Val Acc: {res['Val Acc']:.4f} | "
          f"Time: {res['Time']:.2f}s | Params: {res['Params']}")



Training Transformer with sequence length: 10
Epoch 10, Loss: 2.5806, Val Loss: 2.6396, Val Acc: 0.2689
Epoch 20, Loss: 2.2084, Val Loss: 2.4492, Val Acc: 0.3067
Epoch 30, Loss: 1.9263, Val Loss: 2.3725, Val Acc: 0.3508
Epoch 40, Loss: 1.6495, Val Loss: 2.3561, Val Acc: 0.3739
Epoch 50, Loss: 1.3817, Val Loss: 2.3911, Val Acc: 0.3908

Training Transformer with sequence length: 20
Epoch 10, Loss: 2.5817, Val Loss: 2.6580, Val Acc: 0.2468
Epoch 20, Loss: 2.2576, Val Loss: 2.4979, Val Acc: 0.2954
Epoch 30, Loss: 1.9780, Val Loss: 2.3831, Val Acc: 0.3586
Epoch 40, Loss: 1.6860, Val Loss: 2.3186, Val Acc: 0.4135
Epoch 50, Loss: 1.3948, Val Loss: 2.3322, Val Acc: 0.4072

Training Transformer with sequence length: 30
Epoch 10, Loss: 2.6281, Val Loss: 2.6028, Val Acc: 0.2542
Epoch 20, Loss: 2.3246, Val Loss: 2.4322, Val Acc: 0.2669
Epoch 30, Loss: 2.0576, Val Loss: 2.3558, Val Acc: 0.2945
Epoch 40, Loss: 1.7719, Val Loss: 2.3053, Val Acc: 0.3453
Epoch 50, Loss: 1.4476, Val Loss: 2.4044, Val A

In [None]:
# ========================
# Problem 1: RNN-based approach with cross-attention
# The RNN-based approach without cross-attention is already implemented in HW3
# ========================

import torch
import torch.nn as nn
import torch.optim as optim
import time
import numpy as np
from sklearn.model_selection import train_test_split

# === CONFIGURATION ===
class Config:
    hidden_dim = 128
    learning_rate = 0.005
    num_epochs = 50
    sequence_lengths = [10, 20, 30]
    rnn_modes = ['LSTM', 'GRU']

# === TEXT SETUP ===
raw_text = """Next character prediction is a fundamental task in the field of natural language processing (NLP) that involves predicting the next character in a sequence of text..."""
chars = sorted(list(set(raw_text)))
char_to_idx = {c: i for i, c in enumerate(chars)}
idx_to_char = {i: c for i, c in enumerate(chars)}
vocab_size = len(chars)

def create_dataset(seq_len):
    X, y = [], []
    for i in range(len(raw_text) - seq_len):
        X.append([char_to_idx[c] for c in raw_text[i:i + seq_len]])
        y.append(char_to_idx[raw_text[i + seq_len]])
    return torch.tensor(X), torch.tensor(y)

# === CUSTOM ATTENTION LAYER ===
class SimpleAttention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.attn = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1, bias=False)
        )

    def forward(self, encoder_outputs):
        attn_scores = self.attn(encoder_outputs).squeeze(-1)  # (batch, seq_len)
        attn_weights = torch.softmax(attn_scores, dim=1)      # (batch, seq_len)
        weighted_sum = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)  # (batch, 1, hidden*2)
        return weighted_sum.squeeze(1)

# === MODEL ===
class RecurrentAttentionModel(nn.Module):
    def __init__(self, vocab_size, hidden_dim, rnn_type='LSTM'):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.rnn_type = rnn_type
        rnn_cls = nn.LSTM if rnn_type == 'LSTM' else nn.GRU
        self.rnn = rnn_cls(hidden_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.attn = SimpleAttention(hidden_dim)
        self.out = nn.Linear(hidden_dim * 2, vocab_size)

    def forward(self, x):
        x_embed = self.embedding(x)                   # (batch, seq_len, hidden)
        rnn_out, _ = self.rnn(x_embed)                # (batch, seq_len, hidden*2)
        context = self.attn(rnn_out)                  # (batch, hidden*2)
        return self.out(context)                      # (batch, vocab)

# === TRAINING LOOP ===
def train_model(model, train_x, train_y, val_x, val_y, cfg):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=cfg.learning_rate)
    model_size = sum(p.numel() for p in model.parameters())
    start_time = time.time()

    for epoch in range(cfg.num_epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(train_x)
        loss = criterion(outputs, train_y)
        loss.backward()
        optimizer.step()

        model.eval()
        with torch.no_grad():
            val_outputs = model(val_x)
            val_loss = criterion(val_outputs, val_y).item()
            predictions = val_outputs.argmax(dim=1)
            accuracy = (predictions == val_y).float().mean().item()

        if (epoch + 1) % 10 == 0:
            print(f"[{model.rnn_type}] Epoch {epoch+1}: Train Loss={loss.item():.4f} | Val Loss={val_loss:.4f} | Acc={accuracy:.4f}")

    duration = time.time() - start_time
    return loss.item(), val_loss, accuracy, duration, model_size

# === MAIN EXPERIMENT LOOP ===
results = []
cfg = Config()

for rnn_type in cfg.rnn_modes:
    print(f"\n== Training RNN: {rnn_type} ==")
    for seq_len in cfg.sequence_lengths:
        print(f"-- Sequence Length: {seq_len}")
        X, y = create_dataset(seq_len)
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

        model = RecurrentAttentionModel(vocab_size, cfg.hidden_dim, rnn_type)
        train_loss, val_loss, val_acc, time_taken, param_count = train_model(
            model, X_train, y_train, X_val, y_val, cfg
        )

        results.append({
            'Model': f'{rnn_type}-Attn',
            'Seq Len': seq_len,
            'Train Loss': train_loss,
            'Val Loss': val_loss,
            'Accuracy': val_acc,
            'Time': time_taken,
            'Params': param_count
        })

# === REPORT RESULTS ===
print("\n=== Results Summary ===")
for r in results:
    print(f"{r['Model']} | Seq={r['Seq Len']} | Train Loss={r['Train Loss']:.4f} | "
          f"Val Loss={r['Val Loss']:.4f} | Acc={r['Accuracy']:.4f} | "
          f"Time={r['Time']:.2f}s | Params={r['Params']}")



== Training RNN: LSTM ==
-- Sequence Length: 10
[LSTM] Epoch 10: Train Loss=1.9722 | Val Loss=3.5993 | Acc=0.0625
[LSTM] Epoch 20: Train Loss=0.3456 | Val Loss=4.6127 | Acc=0.0625
[LSTM] Epoch 30: Train Loss=0.0197 | Val Loss=5.6436 | Acc=0.0938
[LSTM] Epoch 40: Train Loss=0.0035 | Val Loss=6.2638 | Acc=0.0938
[LSTM] Epoch 50: Train Loss=0.0015 | Val Loss=6.6444 | Acc=0.0938
-- Sequence Length: 20
[LSTM] Epoch 10: Train Loss=2.4225 | Val Loss=3.7580 | Acc=0.0333
[LSTM] Epoch 20: Train Loss=1.2597 | Val Loss=4.7150 | Acc=0.1000
[LSTM] Epoch 30: Train Loss=0.3285 | Val Loss=6.0369 | Acc=0.1000
[LSTM] Epoch 40: Train Loss=0.0447 | Val Loss=7.2752 | Acc=0.1000
[LSTM] Epoch 50: Train Loss=0.0094 | Val Loss=7.8635 | Acc=0.1000
-- Sequence Length: 30
[LSTM] Epoch 10: Train Loss=2.6214 | Val Loss=3.7956 | Acc=0.0357
[LSTM] Epoch 20: Train Loss=1.7139 | Val Loss=5.1505 | Acc=0.0000
[LSTM] Epoch 30: Train Loss=0.6881 | Val Loss=6.5154 | Acc=0.0000
[LSTM] Epoch 40: Train Loss=0.1556 | Val Loss=7

In [None]:
#=======================
# Problem 2
#=======================
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import requests
import time
import numpy as np
from sklearn.model_selection import train_test_split
from itertools import product

class CharacterMapper:
    def __init__(self, text):
        self.chars = sorted(list(set(text)))
        self.char_to_idx = {ch: i for i, ch in enumerate(self.chars)}
        self.idx_to_char = {i: ch for i, ch in enumerate(self.chars)}
        self.vocab_size = len(self.chars)

    def encode(self, text):
        return [self.char_to_idx[ch] for ch in text]

    def decode(self, indices):
        return ''.join([self.idx_to_char[idx] for idx in indices])

class TextSequenceDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = sequences
        self.targets = targets

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return self.sequences[idx], self.targets[idx]

class PositionalEmbedder(nn.Module):
    def __init__(self, d_model, max_len=500):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class ShakespeareTransformer(nn.Module):
    def __init__(self, vocab_size, d_model=64, nhead=4, num_layers=2):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.position_embedding = PositionalEmbedder(d_model)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=256,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.classifier = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        x = self.token_embedding(x)
        x = self.position_embedding(x)
        x = self.transformer(x)
        x = self.classifier(x[:, -1, :])
        return x

class ModelTrainer:
    def __init__(self, model, train_loader, val_loader, learning_rate=0.005):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model.to(self.device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        self.criterion = nn.CrossEntropyLoss()

    def train_epoch(self):
        try:
            self.model.train()
            total_loss = 0
            for batch_X, batch_y in self.train_loader:
                batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                self.optimizer.zero_grad()
                output = self.model(batch_X)
                loss = self.criterion(output, batch_y)
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()
            return total_loss / len(self.train_loader)
        except KeyboardInterrupt:
            print("\nTraining interrupted by user")
            return None
        except Exception as e:
            print(f"\nError during training: {str(e)}")
            return None

    def evaluate(self):
        try:
            self.model.eval()
            total_loss, correct, total = 0, 0, 0
            with torch.no_grad():
                for batch_X, batch_y in self.val_loader:
                    batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                    output = self.model(batch_X)
                    loss = self.criterion(output, batch_y)
                    total_loss += loss.item()
                    _, predicted = torch.max(output, 1)
                    correct += (predicted == batch_y).sum().item()
                    total += batch_y.size(0)
            return total_loss / len(self.val_loader), correct / total
        except Exception as e:
            print(f"\nError during evaluation: {str(e)}")
            return None, None

def prepare_sequences(text, sequence_length):
    mapper = CharacterMapper(text)
    encoded_text = mapper.encode(text)

    sequences, targets = [], []
    for i in range(len(encoded_text) - sequence_length):
        seq = encoded_text[i:i + sequence_length]
        target = encoded_text[i + sequence_length]
        sequences.append(seq)
        targets.append(target)

    return torch.tensor(sequences, dtype=torch.long), torch.tensor(targets, dtype=torch.long), mapper

def main():
    try:
        # Load Shakespeare dataset
        url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
        text = requests.get(url).text

        # Model configurations
        configs = {
            "sequence_lengths": [20, 30],
            "d_model": 64,
            "num_layers": [1, 2, 4],
            "nhead": [2, 4],
            "epochs": 10,
            "batch_size": 64,
            "learning_rate": 0.005
        }

        results = []
        for num_layers, nhead in product(configs["num_layers"], configs["nhead"]):
            print(f"\nTransformer: Layers={num_layers} Heads={nhead}")

            for seq_length in configs["sequence_lengths"]:
                print(f"\nTraining with sequence length: {seq_length}")

                try:
                    # Prepare data
                    X, y, mapper = prepare_sequences(text, seq_length)
                    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

                    train_dataset = TextSequenceDataset(X_train, y_train)
                    val_dataset = TextSequenceDataset(X_val, y_val)

                    train_loader = DataLoader(train_dataset, batch_size=configs["batch_size"], shuffle=True, num_workers=0)
                    val_loader = DataLoader(val_dataset, batch_size=configs["batch_size"], shuffle=False, num_workers=0)

                    # Initialize model and trainer
                    model = ShakespeareTransformer(mapper.vocab_size, configs["d_model"], nhead, num_layers)
                    trainer = ModelTrainer(model, train_loader, val_loader, learning_rate=configs["learning_rate"])

                    # Training loop
                    start_time = time.time()
                    for epoch in range(1, configs["epochs"] + 1):
                        train_loss = trainer.train_epoch()
                        if train_loss is None:
                            break

                        val_loss, val_acc = trainer.evaluate()
                        if val_loss is None or val_acc is None:
                            break

                        if epoch % 2 == 0:
                            print(f'Epoch {epoch}, Train Loss: {train_loss:.4f}, '
                                  f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

                    exec_time = time.time() - start_time
                    model_size = sum(p.numel() for p in model.parameters())

                    results.append({
                        "Seq Len": seq_length,
                        "Layers": num_layers,
                        "Heads": nhead,
                        "Loss": train_loss,
                        "Val Acc": val_acc,
                        "Time": exec_time,
                        "Model Size": model_size
                    })

                except Exception as e:
                    print(f"Error during training with sequence length {seq_length}: {str(e)}")
                    continue

        # Print results
        print("\nTransformer Results:")
        for r in results:
            print(f"Seq Len: {r['Seq Len']} | Layers: {r['Layers']} | Heads: {r['Heads']} | "
                  f"Loss: {r['Loss']:.4f} | Val Acc: {r['Val Acc']:.4f} | Time: {r['Time']:.2f}s | "
                  f"Model Size: {r['Model Size']}")

    except KeyboardInterrupt:
        print("\nProgram interrupted by user")
    except Exception as e:
        print(f"\nError in main program: {str(e)}")

if __name__ == "__main__":
    main()


Transformer: Layers=1 Heads=2

Training with sequence length: 20
Epoch 2, Train Loss: 2.1809, Val Loss: 2.0653, Val Acc: 0.3887
Epoch 4, Train Loss: 2.1476, Val Loss: 2.0461, Val Acc: 0.3909
Epoch 6, Train Loss: 2.1920, Val Loss: 2.0999, Val Acc: 0.3690
Epoch 8, Train Loss: 2.1590, Val Loss: 2.1057, Val Acc: 0.3670
Epoch 10, Train Loss: 2.1932, Val Loss: 2.1535, Val Acc: 0.3543

Training with sequence length: 30
Epoch 2, Train Loss: 2.2237, Val Loss: 2.2230, Val Acc: 0.3338
Epoch 4, Train Loss: 2.2700, Val Loss: 2.1819, Val Acc: 0.3460
Epoch 6, Train Loss: 2.2300, Val Loss: 2.2253, Val Acc: 0.3305
Epoch 8, Train Loss: 2.2326, Val Loss: 2.2138, Val Acc: 0.3401
Epoch 10, Train Loss: 2.2197, Val Loss: 2.1971, Val Acc: 0.3447

Transformer: Layers=1 Heads=4

Training with sequence length: 20
Epoch 2, Train Loss: 2.1112, Val Loss: 2.0279, Val Acc: 0.3969
Epoch 4, Train Loss: 2.0786, Val Loss: 1.9760, Val Acc: 0.4108
Epoch 6, Train Loss: 2.0755, Val Loss: 2.0391, Val Acc: 0.3918
Epoch 8, Tra

In [None]:
#===============
# Problem 2 sequence length to 50. Perform the training and report the accuracy and model complexity results.
#=================

def main():
    try:
        # Load Shakespeare dataset
        url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
        text = requests.get(url).text

        # Model configurations
        configs = {
            "sequence_lengths": [50],  # Changed to only use sequence length 50
            "d_model": 64,
            "num_layers": [2],  # Fixed to 2 layers
            "nhead": [4],  # Fixed to 4 heads
            "epochs": 10,
            "batch_size": 64,
            "learning_rate": 0.005
        }

        results = []
        for num_layers, nhead in product(configs["num_layers"], configs["nhead"]):
            print(f"\nTransformer: Layers={num_layers} Heads={nhead}")

            for seq_length in configs["sequence_lengths"]:
                print(f"\nTraining with sequence length: {seq_length}")

                try:
                    # Prepare data
                    X, y, mapper = prepare_sequences(text, seq_length)
                    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

                    train_dataset = TextSequenceDataset(X_train, y_train)
                    val_dataset = TextSequenceDataset(X_val, y_val)

                    train_loader = DataLoader(train_dataset, batch_size=configs["batch_size"], shuffle=True, num_workers=0)
                    val_loader = DataLoader(val_dataset, batch_size=configs["batch_size"], shuffle=False, num_workers=0)

                    # Initialize model and trainer
                    model = ShakespeareTransformer(mapper.vocab_size, configs["d_model"], nhead, num_layers)
                    trainer = ModelTrainer(model, train_loader, val_loader, learning_rate=configs["learning_rate"])

                    # Training loop
                    start_time = time.time()
                    for epoch in range(1, configs["epochs"] + 1):
                        train_loss = trainer.train_epoch()
                        if train_loss is None:
                            break

                        val_loss, val_acc = trainer.evaluate()
                        if val_loss is None or val_acc is None:
                            break

                        if epoch % 2 == 0:
                            print(f'Epoch {epoch}, Train Loss: {train_loss:.4f}, '
                                  f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

                    exec_time = time.time() - start_time
                    model_size = sum(p.numel() for p in model.parameters())

                    results.append({
                        "Seq Len": seq_length,
                        "Layers": num_layers,
                        "Heads": nhead,
                        "Loss": train_loss,
                        "Val Acc": val_acc,
                        "Time": exec_time,
                        "Model Size": model_size
                    })

                except Exception as e:
                    print(f"Error during training with sequence length {seq_length}: {str(e)}")
                    continue

        # Print results
        print("\nTransformer Results:")
        for r in results:
            print(f"Seq Len: {r['Seq Len']} | Layers: {r['Layers']} | Heads: {r['Heads']} | "
                  f"Loss: {r['Loss']:.4f} | Val Acc: {r['Val Acc']:.4f} | Time: {r['Time']:.2f}s | "
                  f"Model Size: {r['Model Size']}")

    except KeyboardInterrupt:
        print("\nProgram interrupted by user")
    except Exception as e:
        print(f"\nError in main program: {str(e)}")

if __name__ == "__main__":
    main()



Transformer: Layers=2 Heads=4

Training with sequence length: 50
Error during training with sequence length 50: PositionalEncoder.__init__() missing 1 required positional argument: 'max_seq_length'

Transformer Results:


In [None]:
#===========================
# Problem 3
#============================
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from docx import Document
import time
import numpy as np
from itertools import product

class TextProcessor:
    def __init__(self):
        self.word_to_idx = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.idx_to_word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.vocab_size = 4

    def process_text(self, text):
        words = text.split()
        indices = []
        for word in words:
            if word not in self.word_to_idx:
                self.word_to_idx[word] = self.vocab_size
                self.idx_to_word[self.vocab_size] = word
                self.vocab_size += 1
            indices.append(self.word_to_idx[word])
        indices.append(self.word_to_idx["<EOS>"])
        return indices

    def decode_indices(self, indices):
        return " ".join([self.idx_to_word.get(idx, "<UNK>") for idx in indices])

class TranslationData:
    def __init__(self, docx_path):
        self.pairs = self._load_pairs(docx_path)
        self.english_processor = TextProcessor()
        self.french_processor = TextProcessor()
        self._build_vocabularies()

    def _load_pairs(self, docx_path):
        doc = Document(docx_path)
        text = "\n".join([p.text for p in doc.paragraphs])
        pairs = []
        for line in text.split("\n"):
            if '", "' in line:
                en, fr = line.split('", "')
                en = en.replace('("', '').strip()
                fr = fr.replace('")', '').strip()
                pairs.append((en, fr))
        return pairs

    def _build_vocabularies(self):
        for en, fr in self.pairs:
            self.english_processor.process_text(en)
            self.french_processor.process_text(fr)

class TranslationDataset(Dataset):
    def __init__(self, translation_data):
        self.translation_data = translation_data
        self.pairs = translation_data.pairs

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        en, fr = self.pairs[idx]
        en_indices = self.translation_data.english_processor.process_text(en)
        fr_indices = self.translation_data.french_processor.process_text(fr)
        return torch.tensor(en_indices), torch.tensor(fr_indices)

class PositionalEncoder(nn.Module):
    def __init__(self, d_model, max_seq_length=100):
        super().__init__()
        self.encoding = nn.Parameter(torch.zeros(max_seq_length, d_model))
        nn.init.normal_(self.encoding, mean=0, std=0.02)

    def forward(self, x):
        return x + self.encoding[:x.size(1), :]

class TranslationTransformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, nhead, num_layers):
        super().__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.position_encoder = PositionalEncoder(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers
        )

        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt):
        src_embedded = self.position_encoder(self.src_embedding(src))
        tgt_embedded = self.position_encoder(self.tgt_embedding(tgt))

        src_embedded = src_embedded.transpose(0, 1)
        tgt_embedded = tgt_embedded.transpose(0, 1)

        src_mask = self.transformer.generate_square_subsequent_mask(src.size(1)).to(src.device)
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)

        output = self.transformer(
            src_embedded,
            tgt_embedded,
            src_mask=src_mask,
            tgt_mask=tgt_mask
        )

        return self.fc_out(output)

class ModelTrainer:
    def __init__(self, model, train_loader, learning_rate=0.001):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model.to(self.device)
        self.train_loader = train_loader
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.criterion = nn.CrossEntropyLoss(ignore_index=0)

    def train_epoch(self):
        self.model.train()
        total_loss = 0
        for src, tgt in self.train_loader:
            src, tgt = src.to(self.device), tgt.to(self.device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            self.optimizer.zero_grad()
            output = self.model(src, tgt_input)
            output = output.permute(1, 2, 0)

            loss = self.criterion(output, tgt_output)
            loss.backward()
            self.optimizer.step()
            total_loss += loss.item()

        return total_loss / len(self.train_loader)

    def evaluate(self):
        self.model.eval()
        total_loss, correct, total = 0, 0, 0
        with torch.no_grad():
            for src, tgt in self.train_loader:
                src, tgt = src.to(self.device), tgt.to(self.device)
                tgt_input = tgt[:, :-1]
                tgt_output = tgt[:, 1:]

                output = self.model(src, tgt_input)
                output = output.permute(1, 2, 0)

                loss = self.criterion(output, tgt_output)
                total_loss += loss.item()

                preds = output.argmax(dim=1)
                mask = tgt_output != 0
                correct += (preds[mask] == tgt_output[mask]).sum().item()
                total += mask.sum().item()

        return total_loss / len(self.train_loader), correct / total if total > 0 else 0

class TranslationGenerator:
    def __init__(self, model, english_processor, french_processor):
        self.model = model
        self.english_processor = english_processor
        self.french_processor = french_processor
        self.device = next(model.parameters()).device

    def generate(self, english_text, max_length=100):
        src_indices = self.english_processor.process_text(english_text)
        src_tensor = torch.tensor(src_indices).unsqueeze(0).to(self.device)

        tgt_indices = [self.french_processor.word_to_idx["<SOS>"]]

        self.model.eval()
        with torch.no_grad():
            src_embedded = self.model.position_encoder(self.model.src_embedding(src_tensor))
            src_embedded = src_embedded.transpose(0, 1)

            for _ in range(max_length):
                tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0).to(self.device)
                tgt_embedded = self.model.position_encoder(self.model.tgt_embedding(tgt_tensor))
                tgt_embedded = tgt_embedded.transpose(0, 1)

                output = self.model.transformer(
                    src_embedded,
                    tgt_embedded,
                    src_mask=self.model.transformer.generate_square_subsequent_mask(src_tensor.size(1)).to(self.device),
                    tgt_mask=self.model.transformer.generate_square_subsequent_mask(tgt_tensor.size(1)).to(self.device)
                )

                output = self.model.fc_out(output[-1, :, :])
                next_token = output.argmax(dim=1).item()
                tgt_indices.append(next_token)

                if next_token == self.french_processor.word_to_idx["<EOS>"]:
                    break

        return self.french_processor.decode_indices(tgt_indices[1:-1])

def main():
    # Load and prepare data
    translation_data = TranslationData("Dataset - English to French.docx")
    dataset = TranslationDataset(translation_data)
    train_loader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=lambda x: (
        nn.utils.rnn.pad_sequence([item[0] for item in x], batch_first=True, padding_value=0),
        nn.utils.rnn.pad_sequence([item[1] for item in x], batch_first=True, padding_value=0)
    ))

    # Model configurations
    configs = {
        "d_model": 128,
        "num_layers": [1, 2, 4],
        "nhead": [2, 4],
        "epochs": 50,
        "learning_rate": 0.001
    }

    results = []
    for num_layers, nhead in product(configs["num_layers"], configs["nhead"]):
        print(f"\nTransformer: Layers = {num_layers} Heads = {nhead}")

        model = TranslationTransformer(
            translation_data.english_processor.vocab_size,
            translation_data.french_processor.vocab_size,
            configs["d_model"],
            nhead,
            num_layers
        )

        trainer = ModelTrainer(model, train_loader, configs["learning_rate"])

        start_time = time.time()
        for epoch in range(1, configs["epochs"] + 1):
            train_loss = trainer.train_epoch()
            if epoch % 10 == 0 or epoch == configs["epochs"]:
                print(f"Epoch {epoch}/{configs['epochs']} - Train Loss: {train_loss:.4f}")

        exec_time = time.time() - start_time
        val_loss, val_acc = trainer.evaluate()
        model_size = sum(p.numel() for p in model.parameters())

        # Qualitative validation
        generator = TranslationGenerator(model, translation_data.english_processor, translation_data.french_processor)
        idx = np.random.randint(len(dataset))
        en, fr = dataset.pairs[idx]
        translated = generator.generate(en)

        print("\nQualitative Validation Example:")
        print(f"English: {en}")
        print(f"True French: {fr}")
        print(f"Predicted French: {translated}")
        print("-" * 50)

        results.append({
            "Layers": num_layers,
            "Heads": nhead,
            "Train Loss": train_loss,
            "Val Loss": val_loss,
            "Val Acc": val_acc,
            "Time": exec_time,
            "Model Size": model_size
        })

    print("\nTransformer Results:")
    for r in results:
        print(f"Layers: {r['Layers']} | Heads: {r['Heads']} | "
              f"Train Loss: {r['Train Loss']:.4f} | Val Loss: {r['Val Loss']:.4f} | "
              f"Val Acc: {r['Val Acc']:.4f} | Time: {r['Time']:.2f}s | Model Size: {r['Model Size']}")

if __name__ == "__main__":
    main()


Transformer: Layers = 1 Heads = 2
Epoch 10/50 - Train Loss: 2.0421
Epoch 20/50 - Train Loss: 0.5201
Epoch 30/50 - Train Loss: 0.1592
Epoch 40/50 - Train Loss: 0.0868
Epoch 50/50 - Train Loss: 0.0633

Qualitative Validation Example:
English: He fixes his bicycle
True French: Il répare son vélo,
Predicted French: 
--------------------------------------------------

Transformer: Layers = 1 Heads = 4
Epoch 10/50 - Train Loss: 2.0800
Epoch 20/50 - Train Loss: 0.5069
Epoch 30/50 - Train Loss: 0.1477
Epoch 40/50 - Train Loss: 0.0742
Epoch 50/50 - Train Loss: 0.0449

Qualitative Validation Example:
English: She studies hard for exams
True French: Elle étudie dur pour les examens,
Predicted French: le bus,
--------------------------------------------------

Transformer: Layers = 2 Heads = 2
Epoch 10/50 - Train Loss: 2.3126
Epoch 20/50 - Train Loss: 0.6363
Epoch 30/50 - Train Loss: 0.1661
Epoch 40/50 - Train Loss: 0.0687
Epoch 50/50 - Train Loss: 0.0471

Qualitative Validation Example:
English:

In [None]:
#===========================
#Problem 4
#===========================
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from docx import Document
import time
import numpy as np
from itertools import product

class LanguageProcessor:
    def __init__(self):
        self.token_to_id = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.id_to_token = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.vocab_size = 4

    def process_text(self, text):
        tokens = text.split()
        indices = []
        for token in tokens:
            if token not in self.token_to_id:
                self.token_to_id[token] = self.vocab_size
                self.id_to_token[self.vocab_size] = token
                self.vocab_size += 1
            indices.append(self.token_to_id[token])
        indices.append(self.token_to_id["<EOS>"])
        return indices

    def decode_sequence(self, indices):
        return " ".join([self.id_to_token.get(idx, "<UNK>") for idx in indices])

class TranslationCorpus:
    def __init__(self, docx_path):
        self.pairs = self._load_translation_pairs(docx_path)
        self.source_processor = LanguageProcessor()
        self.target_processor = LanguageProcessor()
        self._build_vocabularies()

    def _load_translation_pairs(self, docx_path):
        doc = Document(docx_path)
        text = "\n".join([p.text for p in doc.paragraphs])
        pairs = []
        for line in text.split("\n"):
            if '", "' in line:
                fr, en = line.split('", "')
                fr = fr.replace('("', '').strip()
                en = en.replace('")', '').strip()
                pairs.append((fr, en))
        return pairs

    def _build_vocabularies(self):
        for fr, en in self.pairs:
            self.source_processor.process_text(fr)
            self.target_processor.process_text(en)

class TranslationDataset(Dataset):
    def __init__(self, corpus):
        self.corpus = corpus
        self.pairs = corpus.pairs

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        fr, en = self.pairs[idx]
        fr_indices = self.corpus.source_processor.process_text(fr)
        en_indices = self.corpus.target_processor.process_text(en)
        return torch.tensor(fr_indices), torch.tensor(en_indices)

class PositionalEmbedding(nn.Module):
    def __init__(self, d_model, max_seq_length=100):
        super().__init__()
        self.embedding = nn.Parameter(torch.zeros(max_seq_length, d_model))
        nn.init.normal_(self.embedding, mean=0, std=0.02)

    def forward(self, x):
        return x + self.embedding[:x.size(1), :]

class NeuralTranslator(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, nhead, num_layers):
        super().__init__()
        self.src_embed = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embed = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_embed = PositionalEmbedding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers
        )

        self.output_layer = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt):
        src_emb = self.pos_embed(self.src_embed(src))
        tgt_emb = self.pos_embed(self.tgt_embed(tgt))

        src_emb = src_emb.transpose(0, 1)
        tgt_emb = tgt_emb.transpose(0, 1)

        src_mask = self.transformer.generate_square_subsequent_mask(src.size(1)).to(src.device)
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)

        output = self.transformer(
            src_emb,
            tgt_emb,
            src_mask=src_mask,
            tgt_mask=tgt_mask
        )

        return self.output_layer(output)

class ModelTrainer:
    def __init__(self, model, train_loader, learning_rate=0.001):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model.to(self.device)
        self.train_loader = train_loader
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.criterion = nn.CrossEntropyLoss(ignore_index=0)

    def train_epoch(self):
        self.model.train()
        total_loss = 0
        for src, tgt in self.train_loader:
            src, tgt = src.to(self.device), tgt.to(self.device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            self.optimizer.zero_grad()
            output = self.model(src, tgt_input)
            output = output.permute(1, 2, 0)

            loss = self.criterion(output, tgt_output)
            loss.backward()
            self.optimizer.step()
            total_loss += loss.item()

        return total_loss / len(self.train_loader)

    def evaluate(self):
        self.model.eval()
        total_loss, correct, total = 0, 0, 0
        with torch.no_grad():
            for src, tgt in self.train_loader:
                src, tgt = src.to(self.device), tgt.to(self.device)
                tgt_input = tgt[:, :-1]
                tgt_output = tgt[:, 1:]

                output = self.model(src, tgt_input)
                output = output.permute(1, 2, 0)

                loss = self.criterion(output, tgt_output)
                total_loss += loss.item()

                preds = output.argmax(dim=1)
                mask = tgt_output != 0
                correct += (preds[mask] == tgt_output[mask]).sum().item()
                total += mask.sum().item()

        return total_loss / len(self.train_loader), correct / total if total > 0 else 0

class TranslationGenerator:
    def __init__(self, model, source_processor, target_processor):
        self.model = model
        self.source_processor = source_processor
        self.target_processor = target_processor
        self.device = next(model.parameters()).device

    def generate(self, source_text, max_length=100):
        src_indices = self.source_processor.process_text(source_text)
        src_tensor = torch.tensor(src_indices).unsqueeze(0).to(self.device)

        tgt_indices = [self.target_processor.token_to_id["<SOS>"]]

        self.model.eval()
        with torch.no_grad():
            src_emb = self.model.pos_embed(self.model.src_embed(src_tensor))
            src_emb = src_emb.transpose(0, 1)

            for _ in range(max_length):
                tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0).to(self.device)
                tgt_emb = self.model.pos_embed(self.model.tgt_embed(tgt_tensor))
                tgt_emb = tgt_emb.transpose(0, 1)

                output = self.model.transformer(
                    src_emb,
                    tgt_emb,
                    src_mask=self.model.transformer.generate_square_subsequent_mask(src_tensor.size(1)).to(self.device),
                    tgt_mask=self.model.transformer.generate_square_subsequent_mask(tgt_tensor.size(1)).to(self.device)
                )

                output = self.model.output_layer(output[-1, :, :])
                next_token = output.argmax(dim=1).item()
                tgt_indices.append(next_token)

                if next_token == self.target_processor.token_to_id["<EOS>"]:
                    break

        return self.target_processor.decode_sequence(tgt_indices[1:-1])

def main():
    # Load and prepare data
    corpus = TranslationCorpus("Dataset - English to French.docx")
    dataset = TranslationDataset(corpus)
    train_loader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=lambda x: (
        nn.utils.rnn.pad_sequence([item[0] for item in x], batch_first=True, padding_value=0),
        nn.utils.rnn.pad_sequence([item[1] for item in x], batch_first=True, padding_value=0)
    ))

    # Model configurations
    configs = {
        "d_model": 128,
        "num_layers": [1, 2, 4],
        "nhead": [2, 4],
        "epochs": 50,
        "learning_rate": 0.001
    }

    results = []
    for num_layers, nhead in product(configs["num_layers"], configs["nhead"]):
        print(f"\nTransformer: Layers = {num_layers} Heads = {nhead}")

        model = NeuralTranslator(
            corpus.source_processor.vocab_size,
            corpus.target_processor.vocab_size,
            configs["d_model"],
            nhead,
            num_layers
        )

        trainer = ModelTrainer(model, train_loader, configs["learning_rate"])

        start_time = time.time()
        for epoch in range(1, configs["epochs"] + 1):
            train_loss = trainer.train_epoch()
            if epoch % 10 == 0 or epoch == configs["epochs"]:
                print(f"Epoch {epoch}/{configs['epochs']} - Train Loss: {train_loss:.4f}")

        exec_time = time.time() - start_time
        val_loss, val_acc = trainer.evaluate()
        model_size = sum(p.numel() for p in model.parameters())

        # Qualitative validation
        generator = TranslationGenerator(model, corpus.source_processor, corpus.target_processor)
        idx = np.random.randint(len(dataset))
        fr, en = dataset.pairs[idx]
        translated = generator.generate(fr)

        print("\nQualitative Validation Example:")
        print(f"French: {fr}")
        print(f"True English: {en}")
        print(f"Predicted English: {translated}")
        print("-" * 50)

        results.append({
            "Layers": num_layers,
            "Heads": nhead,
            "Train Loss": train_loss,
            "Val Loss": val_loss,
            "Val Acc": val_acc,
            "Time": exec_time,
            "Model Size": model_size
        })

    print("\nTransformer Results:")
    for r in results:
        print(f"Layers: {r['Layers']} | Heads: {r['Heads']} | "
              f"Train Loss: {r['Train Loss']:.4f} | Val Loss: {r['Val Loss']:.4f} | "
              f"Val Acc: {r['Val Acc']:.4f} | Time: {r['Time']:.2f}s | Model Size: {r['Model Size']}")

if __name__ == "__main__":
    main()



Transformer: Layers = 1 Heads = 2
Epoch 10/50 - Train Loss: 2.0272
Epoch 20/50 - Train Loss: 0.5794
Epoch 30/50 - Train Loss: 0.1634
Epoch 40/50 - Train Loss: 0.0860
Epoch 50/50 - Train Loss: 0.0548

Qualitative Validation Example:
French: The baby cries
True English: Le bébé pleure,
Predicted English: 
--------------------------------------------------

Transformer: Layers = 1 Heads = 4
Epoch 10/50 - Train Loss: 2.1115
Epoch 20/50 - Train Loss: 0.5283
Epoch 30/50 - Train Loss: 0.1562
Epoch 40/50 - Train Loss: 0.0700
Epoch 50/50 - Train Loss: 0.0509

Qualitative Validation Example:
French: She dances at the party
True English: Elle danse à la fête,
Predicted English: 
--------------------------------------------------

Transformer: Layers = 2 Heads = 2
Epoch 10/50 - Train Loss: 2.0650
Epoch 20/50 - Train Loss: 0.5568
Epoch 30/50 - Train Loss: 0.1519
Epoch 40/50 - Train Loss: 0.0646
Epoch 50/50 - Train Loss: 0.0408

Qualitative Validation Example:
French: We love music
True English: No