In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import nltk
from nltk.tokenize import word_tokenize
from torch.utils.data import Dataset, DataLoader

nltk.download('punkt')

# Set device
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)


Using device: cpu


[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Rose\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
file_path = r"C:\Users\Rose\Documents\mcmaster\semester 2\NLP\assignments\dataset french\eng-fra.txt"

data = []
with open(file_path, "r", encoding="utf-8") as f:
    for line in f:
        parts = line.strip().split("\t")  # Split by tab
        if len(parts) == 2:  # Ensure it has both English and French parts
            data.append(parts)

# Convert to DataFrame
df = pd.DataFrame(data, columns=["English", "French"])

# Display the first few rows
print(df.head())


  English      French
0     Go.        Va !
1    Run!     Cours !
2    Run!    Courez !
3    Wow!  Ça alors !
4   Fire!    Au feu !


In [None]:
# Tokenization function using NLTK
def tokenize(sentence):
    return word_tokenize(sentence.lower())  # Convert to lowercase for consistency

# Apply tokenization to both columns
df['English'] = df['English'].apply(tokenize)
df['French'] = df['French'].apply(tokenize)

# show tokenized data
print(df.head())


     English          French
0    [go, .]         [va, !]
1   [run, !]      [cours, !]
2   [run, !]     [courez, !]
3   [wow, !]  [ça, alors, !]
4  [fire, !]    [au, feu, !]


In [None]:
class Vocabulary:
    def __init__(self):
        self.word2idx = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}  # Special tokens
        self.idx2word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.word_count = {}

    def add_sentence(self, sentence):
        for word in sentence:
            if word not in self.word2idx:
                idx = len(self.word2idx)
                self.word2idx[word] = idx
                self.idx2word[idx] = word
            self.word_count[word] = self.word_count.get(word, 0) + 1

    def numericalize(self, sentence):
        return [self.word2idx.get(word, self.word2idx["<UNK>"]) for word in sentence]


In [None]:
# Initialize vocabularies
eng_vocab = Vocabulary()
fr_vocab = Vocabulary()

# Populate vocabularies
for _, row in df.iterrows():
    eng_vocab.add_sentence(row['English'])
    fr_vocab.add_sentence(row['French'])

# Print vocabulary size
print(f"English Vocabulary Size: {len(eng_vocab.word2idx)}")
print(f"French Vocabulary Size: {len(fr_vocab.word2idx)}")


English Vocabulary Size: 7004
French Vocabulary Size: 10848


In [None]:
# Convert tokenized sentences into numerical sequences
df['English'] = df['English'].apply(eng_vocab.numericalize)
df['French'] = df['French'].apply(lambda x: [fr_vocab.word2idx['<SOS>']] + fr_vocab.numericalize(x) + [fr_vocab.word2idx['<EOS>']])

# Display converted sequences
print(df.head())


  English             French
0  [4, 5]       [1, 4, 5, 2]
1  [6, 7]       [1, 6, 5, 2]
2  [6, 7]       [1, 7, 5, 2]
3  [8, 7]    [1, 8, 9, 5, 2]
4  [9, 7]  [1, 10, 11, 5, 2]


In [None]:
from torch.utils.data import Dataset, DataLoader

class TranslationDataset(Dataset):
    def __init__(self, df, eng_vocab, fr_vocab, max_len=10):
        self.data = df
        self.eng_vocab = eng_vocab
        self.fr_vocab = fr_vocab
        self.max_len = max_len  # Max sequence length for padding

    def __len__(self):
        return len(self.data)

    def pad_sequence(self, seq, pad_idx):
        """Pad sequences to max_len with <PAD> tokens."""
        return seq + [pad_idx] * (self.max_len - len(seq)) if len(seq) < self.max_len else seq[:self.max_len]

    def __getitem__(self, index):
        eng_seq = self.pad_sequence(self.data.iloc[index]['English'], self.eng_vocab.word2idx['<PAD>'])
        fr_seq = self.pad_sequence(self.data.iloc[index]['French'], self.fr_vocab.word2idx['<PAD>'])
        return torch.tensor(eng_seq, dtype=torch.long), torch.tensor(fr_seq, dtype=torch.long)

# Create dataset instance
dataset = TranslationDataset(df, eng_vocab, fr_vocab)


In [None]:
# Create DataLoader for batching
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# Fetch one batch to check
for eng_batch, fr_batch in dataloader:
    print("English batch shape:", eng_batch.shape)
    print("French batch shape:", fr_batch.shape)
    break  # Print only the first batch


English batch shape: torch.Size([64, 10])
French batch shape: torch.Size([64, 10])


In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers, dropout):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)

    def forward(self, src):
        embedded = self.embedding(src)  # Convert word indices to embeddings
        outputs, (hidden, cell) = self.lstm(embedded)  # LSTM processing
        return outputs, hidden, cell


In [None]:
class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super(Attention, self).__init__()
        self.attn = nn.Linear(hidden_dim * 2, hidden_dim)  # Combine encoder & decoder hidden states
        self.v = nn.Linear(hidden_dim, 1, bias=False)  # Compute attention scores

    def forward(self, hidden, encoder_outputs):
        seq_len = encoder_outputs.shape[1]  # Get sequence length
        hidden = hidden[-1].unsqueeze(1).repeat(1, seq_len, 1)  # Repeat hidden state across sequence

        # Compute energy scores
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)  # Compute final attention scores

        return torch.softmax(attention, dim=1)  # Normalize scores


In [None]:
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers, dropout, attention):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim + hidden_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim * 2, output_dim)  # Use both LSTM output + Attention
        self.attention = attention

    def forward(self, input, hidden, cell, encoder_outputs):
        input = input.unsqueeze(1)  # Add sequence dimension
        embedded = self.embedding(input)

        # Apply attention
        attn_weights = self.attention(hidden, encoder_outputs)  # Compute attention weights
        attn_applied = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)  # Weighted sum of encoder outputs

        lstm_input = torch.cat((embedded, attn_applied), dim=2)  # Combine embedding with attention
        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))  # Pass through LSTM

        prediction = self.fc_out(torch.cat((output.squeeze(1), attn_applied.squeeze(1)), dim=1))  # Combine features

        return prediction, hidden, cell


In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg):
        encoder_outputs, hidden, cell = self.encoder(src)  # Encode input
        outputs = torch.zeros(trg.shape[1], trg.shape[0], self.decoder.fc_out.out_features).to(DEVICE)
        input = trg[:, 0]  # Start decoding with <SOS>

        for t in range(1, trg.shape[1]):  # Decode each timestep
            output, hidden, cell = self.decoder(input, hidden, cell, encoder_outputs)
            outputs[t] = output
            input = output.argmax(1)  # Get predicted word

        return outputs


In [None]:
# Define model parameters
INPUT_DIM = len(eng_vocab.word2idx)
OUTPUT_DIM = len(fr_vocab.word2idx)
EMB_DIM = 256
HIDDEN_DIM = 512
N_LAYERS = 2
DROPOUT = 0.5


encoder = Encoder(INPUT_DIM, EMB_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT).to(DEVICE)
attention = Attention(HIDDEN_DIM).to(DEVICE)
decoder = Decoder(OUTPUT_DIM, EMB_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT, attention).to(DEVICE)
model = Seq2Seq(encoder, decoder).to(DEVICE)

def init_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
    elif isinstance(m, nn.LSTM):
        for name, param in m.named_parameters():
            if "weight" in name:
                nn.init.orthogonal_(param)

print("Model initialized successfully!")


Model initialized successfully!


In [None]:
# Dummy input batch (batch_size=5, seq_len=10)
dummy_src = torch.randint(0, INPUT_DIM, (5, 10)).to(DEVICE)

# Forward pass through the encoder
encoder_outputs, hidden, cell = encoder(dummy_src)

# Print shapes
print("Encoder Outputs Shape:", encoder_outputs.shape)
print("Encoder Hidden Shape:", hidden.shape)
print("Encoder Cell Shape:", cell.shape)


Encoder Outputs Shape: torch.Size([5, 10, 512])
Encoder Hidden Shape: torch.Size([2, 5, 512])
Encoder Cell Shape: torch.Size([2, 5, 512])


In [None]:
# Forward pass through the attention layer
attn_weights = attention(hidden, encoder_outputs)

# Print shapes
print("Attention Weights Shape:", attn_weights.shape)


Attention Weights Shape: torch.Size([5, 10])


In [None]:
# Dummy target input (batch_size=5)
dummy_trg = torch.randint(0, OUTPUT_DIM, (5,)).to(DEVICE)

# Forward pass through the decoder
decoder_output, hidden, cell = decoder(dummy_trg, hidden, cell, encoder_outputs)

# Print shapes
print("Decoder Output Shape:", decoder_output.shape)
print("Decoder Hidden Shape:", hidden.shape)
print("Decoder Cell Shape:", cell.shape)


Decoder Output Shape: torch.Size([5, 10848])
Decoder Hidden Shape: torch.Size([2, 5, 512])
Decoder Cell Shape: torch.Size([2, 5, 512])


In [None]:
# Dummy target sequence (batch_size=5, seq_len=10)
dummy_trg = torch.randint(0, OUTPUT_DIM, (5, 10)).to(DEVICE)

# Forward pass through Seq2Seq model
outputs = model(dummy_src, dummy_trg)

# Print shape
print("Seq2Seq Output Shape:", outputs.shape)


Seq2Seq Output Shape: torch.Size([10, 5, 10848])


In [None]:
# loss function and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=eng_vocab.word2idx["<PAD>"])  # Ignore <PAD> tokens
optimizer = optim.Adam(model.parameters(), lr=0.009)

print("Loss function and optimizer initialized.")


Loss function and optimizer initialized.


In [None]:
import time

# Training function
def train(model, dataloader, optimizer, criterion, clip=1):
    model.train()
    epoch_loss = 0

    for eng_batch, fr_batch in dataloader:
        eng_batch, fr_batch = eng_batch.to(DEVICE), fr_batch.to(DEVICE)

        optimizer.zero_grad()  # Reset gradients
        output = model(eng_batch, fr_batch)  # Forward pass

        # Reshape output and target for loss calculation
        output_dim = output.shape[-1]
        output = output[1:].reshape(-1, output_dim)
        target = fr_batch[:, 1:].reshape(-1)

        loss = criterion(output, target)  # Compute loss
        loss.backward()  # Backpropagation

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)  # Prevent exploding gradients
        optimizer.step()  # Update model weights

        epoch_loss += loss.item()

    return epoch_loss / len(dataloader)

# Training loop
N_EPOCHS = 10 #since it takes a long time

for epoch in range(1, N_EPOCHS + 1):
    start_time = time.time()

    train_loss = train(model, dataloader, optimizer, criterion)

    end_time = time.time()
    epoch_mins, epoch_secs = divmod(int(end_time - start_time), 60)

    print(f"Epoch {epoch}: Train Loss = {train_loss:.4f} | Time: {epoch_mins}m {epoch_secs}s")


Epoch 1: Train Loss = 6.9513 | Time: 5m 23s
Epoch 2: Train Loss = 6.6708 | Time: 5m 46s
Epoch 3: Train Loss = 6.6584 | Time: 6m 0s
Epoch 4: Train Loss = 6.5926 | Time: 5m 26s
Epoch 5: Train Loss = 6.5839 | Time: 5m 41s
Epoch 6: Train Loss = 6.5787 | Time: 5m 38s
Epoch 7: Train Loss = 6.5726 | Time: 5m 47s
Epoch 8: Train Loss = 6.5719 | Time: 8m 45s
Epoch 9: Train Loss = 6.5715 | Time: 9m 40s
Epoch 10: Train Loss = 6.5720 | Time: 9m 22s


In [None]:
import torch.nn.functional as F

def translate_sentence(model, sentence, eng_vocab, fr_vocab, max_len=10):
    model.eval()  # Set to evaluation mode

    tokens = word_tokenize(sentence.lower())  # Tokenize input
    numericalized = [eng_vocab.word2idx.get(token, eng_vocab.word2idx["<UNK>"]) for token in tokens]

    input_tensor = torch.tensor(numericalized, dtype=torch.long).unsqueeze(0).to(DEVICE)  # Add batch dimension
    encoder_outputs, hidden, cell = model.encoder(input_tensor)  # Get encoder outputs

    trg_indexes = [fr_vocab.word2idx["<SOS>"]]

    for _ in range(max_len):
        trg_tensor = torch.tensor([trg_indexes[-1]], dtype=torch.long).to(DEVICE)
        output, hidden, cell = model.decoder(trg_tensor, hidden, cell, encoder_outputs)

        predicted_token = output.argmax(1).item()  # Get best word
        trg_indexes.append(predicted_token)

        if predicted_token == fr_vocab.word2idx["<EOS>"]:
            break

    translated_tokens = [fr_vocab.idx2word[i] for i in trg_indexes[1:-1]]  # Remove <SOS> & <EOS>
    return " ".join(translated_tokens)

# Test translation
test_sentence = "hi how are you!"
translated_sentence = translate_sentence(model, test_sentence, eng_vocab, fr_vocab)
print(f"English: {test_sentence}")
print(f"French: {translated_sentence}")


English: hi how are you!
French: de de de de de de de de de


In [None]:
!pip install sacrebleu




In [None]:
import sacrebleu

def evaluate_bleu(model, df_sample, eng_vocab, fr_vocab):
    model.eval()
    references = []
    hypotheses = []

    num_samples = min(100, len(df_sample))

    for i in range(num_samples):
        # Convert numericalized source sentence back to words
        src_tokens = df_sample.iloc[i]["English"]
        src_sentence = " ".join([eng_vocab.idx2word[token] for token in src_tokens if token not in [0, 1, 2, 3]])

        # Convert numericalized target sentence back to words
        trg_tokens = df_sample.iloc[i]["French"][1:-1]  # Remove <SOS> and <EOS>
        target_sentence = " ".join([fr_vocab.idx2word[token] for token in trg_tokens if token not in [0, 1, 2, 3]])

        # Get model translation
        translated_sentence = translate_sentence(model, src_sentence, eng_vocab, fr_vocab)

        references.append(target_sentence)  # List of reference sentences
        hypotheses.append(translated_sentence)  # List of model translations

    # Compute BLEU score using SacreBLEU
    bleu_score = sacrebleu.corpus_bleu(hypotheses, [references]).score

    return bleu_score

# Evaluate BLEU score
bleu_score = evaluate_bleu(model, df, eng_vocab, fr_vocab)
print(f"BLEU Score: {bleu_score:.4f}")


BLEU Score: 0.0848


In [None]:
# Save model state
torch.save(model.state_dict(), "seq2seq_model.pth")
print("Model saved successfully!")


Model saved successfully!


In [None]:
model.load_state_dict(torch.load("seq2seq_model.pth", map_location=DEVICE))
model.eval()  # Set to evaluation mode
print("Model loaded successfully!")


Model loaded successfully!


In [None]:
test_sentence = "I love learning languages!"
translated_sentence = translate_sentence(model, test_sentence, eng_vocab, fr_vocab)
print(f"English: {test_sentence}")
print(f"French: {translated_sentence}")
