In [1]:
# -----------------------
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from collections import Counter
import re

# For text preprocessing
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')

# For loading datasets
from datasets import load_dataset

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

print("Libraries imported successfully!")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


Libraries imported successfully!


In [4]:
!pip install datasets --upgrade

Collecting datasets
  Downloading datasets-3.6.0-py3-none-any.whl.metadata (19 kB)
Collecting fsspec<=2025.3.0,>=2023.1.0 (from fsspec[http]<=2025.3.0,>=2023.1.0->datasets)
  Downloading fsspec-2025.3.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.6.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.5/491.5 kB[0m [31m13.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2025.3.0-py3-none-any.whl (193 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m193.6/193.6 kB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fsspec, datasets
  Attempting uninstall: fsspec
    Found existing installation: fsspec 2025.3.2
    Uninstalling fsspec-2025.3.2:
      Successfully uninstalled fsspec-2025.3.2
  Attempting uninstall: datasets
    Found existing installation: datasets 2.14.4
    Uninstalling datasets-2.14.4:
      Successfully uninstalled datasets-2.14.4
[31mERROR: pip's dependency r

In [3]:
dataset_name = "opus100"
language_pair = "en-fr"

print("\n2. Download and check the shape of the dataset")
dataset = load_dataset("opus100", language_pair )
train_data = dataset["train"]
test_data = dataset["test"]

print(f"\nDataset loaded! Total training examples: {len(train_data)}")
print(f"Sample English: {train_data[0]['translation']['en'][:60]}...")
print(f"Sample French: {train_data[0]['translation']['fr'][:60]}...")


2. Download and check the shape of the dataset


test-00000-of-00001.parquet:   0%|          | 0.00/327k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/142M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/334k [00:00<?, ?B/s]

Generating test split:   0%|          | 0/2000 [00:00<?, ? examples/s]

Generating train split:   0%|          | 0/1000000 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/2000 [00:00<?, ? examples/s]


Dataset loaded! Total training examples: 1000000
Sample English: The time now is 05:08 ....
Sample French: The time now is 05:05 ....


In [4]:
def clean_text(text):
    """Clean and normalize text"""
    text = text.lower()
    text = re.sub(r"['\",\.\?\!\-]", "", text)  # Remove basic punctuation
    text = re.sub(r"[^a-zA-Zéèêëàâäôöûüç\s]", "", text)  # Keep French letters
    text = re.sub(r"\s+", " ", text).strip()  # Remove extra spaces
    return text

In [5]:
train_src = []
train_tgt = []

for sample in tqdm(train_data):
    src_text = clean_text(sample['translation']['en'])
    tgt_text = clean_text(sample['translation']['fr'])
    train_src.append(src_text)
    train_tgt.append(tgt_text)

# Process test data
test_src = []
test_tgt = []

for sample in tqdm(test_data):
    src_text = clean_text(sample['translation']['en'])
    tgt_text = clean_text(sample['translation']['fr'])
    test_src.append(src_text)
    test_tgt.append(tgt_text)

print("\nData cleaning complete!")
print(f"Example cleaned English: {train_src[0][:60]}...")
print(f"Example cleaned French: {train_tgt[0][:60]}...")

100%|██████████| 1000000/1000000 [00:58<00:00, 16957.19it/s]
100%|██████████| 2000/2000 [00:00<00:00, 17268.36it/s]


Data cleaning complete!
Example cleaned English: the time now is...
Example cleaned French: the time now is...





In [6]:
nltk.download('punkt')
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [7]:
def tokenize(text):
    """Simple word tokenization"""
    return word_tokenize(text)

train_src_tokens = [tokenize(text) for text in train_src]
train_tgt_tokens = [tokenize(text) for text in train_tgt]
test_src_tokens = [tokenize(text) for text in test_src]
test_tgt_tokens = [tokenize(text) for text in test_tgt]

print("\nTokenization complete!")
print(f"Example tokenized English: {train_src_tokens[0][:10]}...")
print(f"Example tokenized French: {train_tgt_tokens[0][:10]}...")

print("\n5. Build vocabulary for source and target languages")


Tokenization complete!
Example tokenized English: ['the', 'time', 'now', 'is']...
Example tokenized French: ['the', 'time', 'now', 'is']...

5. Build vocabulary for source and target languages


In [8]:
def build_vocab(token_lists, max_vocab_size=10000):
    """Build vocabulary from token lists"""
    word_counts = Counter()
    for tokens in token_lists:
        word_counts.update(tokens)

    most_common = word_counts.most_common(max_vocab_size - 4)

    vocab = {
        '<PAD>': 0,
        '<SOS>': 1,
        '<EOS>': 2,
        '<UNK>': 1
    }

    for idx, (word, _) in enumerate(most_common):
        vocab[word] = idx + 4

    return vocab

In [9]:
src_vocab = build_vocab(train_src_tokens)
tgt_vocab = build_vocab(train_tgt_tokens)

print(f"\nSource vocab size: {len(src_vocab)}")
print(f"Target vocab size: {len(tgt_vocab)}")
print(f"Sample source vocab items: {list(src_vocab.items())[:10]}")
print(f"Sample target vocab items: {list(tgt_vocab.items())[:10]}")

idx_to_src = {idx: word for word, idx in src_vocab.items()}
idx_to_tgt = {idx: word for word, idx in tgt_vocab.items()}


Source vocab size: 10000
Target vocab size: 10000
Sample source vocab items: [('<PAD>', 0), ('<SOS>', 1), ('<EOS>', 2), ('<UNK>', 1), ('the', 4), ('of', 5), ('and', 6), ('to', 7), ('in', 8), ('a', 9)]
Sample target vocab items: [('<PAD>', 0), ('<SOS>', 1), ('<EOS>', 2), ('<UNK>', 1), ('de', 4), ('la', 5), ('et', 6), ('les', 7), ('le', 8), ('des', 9)]


In [10]:
class TranslationDataset(Dataset):
    def __init__(self, src_texts, tgt_texts, src_vocab, tgt_vocab, max_len=50):
        self.src_texts = src_texts
        self.tgt_texts = tgt_texts
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.src_texts)

    def __getitem__(self, idx):
        src_tokens = self.src_texts[idx]
        tgt_tokens = self.tgt_texts[idx]

        src_indices = [self.src_vocab.get(token, self.src_vocab['<UNK>']) for token in src_tokens]
        tgt_indices = [self.tgt_vocab.get(token, self.tgt_vocab['<UNK>']) for token in tgt_tokens]

        tgt_indices = [self.tgt_vocab['<SOS>']] + tgt_indices + [self.tgt_vocab['<EOS>']]

        src_indices = self._pad_or_truncate(src_indices, self.src_vocab['<PAD>'])
        tgt_indices = self._pad_or_truncate(tgt_indices, self.tgt_vocab['<PAD>'])

        return torch.tensor(src_indices, dtype=torch.long), torch.tensor(tgt_indices, dtype=torch.long)

    def _pad_or_truncate(self, sequence, pad_idx):
        """Pad or truncate sequence to max_len"""
        if len(sequence) > self.max_len:
            return sequence[:self.max_len]
        else:
            return sequence + [pad_idx] * (self.max_len - len(sequence))

In [11]:
max_seq_len = 30
train_dataset = TranslationDataset(train_src_tokens, train_tgt_tokens, src_vocab, tgt_vocab, max_seq_len)
test_dataset = TranslationDataset(test_src_tokens, test_tgt_tokens, src_vocab, tgt_vocab, max_seq_len)

In [12]:
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [13]:
sample_batch = next(iter(train_loader))
src_batch, tgt_batch = sample_batch
print(f"\nSample batch shapes - Source: {src_batch.shape}, Target: {tgt_batch.shape}")


Sample batch shapes - Source: torch.Size([64, 30]), Target: torch.Size([64, 30])


In [26]:
class Seq2SeqLSTM(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, embedding_dim, hidden_dim, n_layers=1, dropout=0.2):
        super().__init__()

        self.encoder_embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=0)
        self.encoder_lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout if n_layers > 1 else 0, batch_first=True)

        self.decoder_embedding = nn.Embedding(tgt_vocab_size, embedding_dim, padding_idx=0)
        self.decoder_lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout if n_layers > 1 else 0, batch_first=True)
        self.decoder_fc = nn.Linear(hidden_dim, tgt_vocab_size)

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, tgt):
        enc_embedded = self.dropout(self.encoder_embedding(src))
        _, (hidden, cell) = self.encoder_lstm(enc_embedded)

        dec_embedded = self.dropout(self.decoder_embedding(tgt[:, :-1]))  # Remove EOS token
        dec_output, _ = self.decoder_lstm(dec_embedded, (hidden, cell))

        output = self.decoder_fc(dec_output)
        return output

    def predict(self, src, tgt_vocab, max_len=30, device='cpu'):
        """Generate translation for a single source sequence"""
        self.eval()

        src = src.unsqueeze(0).to(device)
        enc_embedded = self.encoder_embedding(src)
        _, (hidden, cell) = self.encoder_lstm(enc_embedded)

        tgt = torch.tensor([[tgt_vocab['<SOS>']]], device=device)
        output_seq = []

        for _ in range(max_len):
            dec_embedded = self.decoder_embedding(tgt)
            dec_output, (hidden, cell) = self.decoder_lstm(dec_embedded, (hidden, cell))
            output = self.decoder_fc(dec_output.squeeze(1))

            next_token = output.argmax(1)
            output_seq.append(next_token.item())

            if next_token.item() == tgt_vocab['<EOS>']:
                break

            tgt = next_token.unsqueeze(0)

        return output_seq

In [15]:
embedding_dim = 256
hidden_dim = 512
n_layers = 2
dropout = 0.3

model = Seq2SeqLSTM(
    src_vocab_size=len(src_vocab),
    tgt_vocab_size=len(tgt_vocab),
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    n_layers=n_layers,
    dropout=dropout
)

In [16]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
print(f"\nModel created and moved to {device}!")
print(model)


Model created and moved to cuda!
Seq2SeqLSTM(
  (encoder_embedding): Embedding(10000, 256, padding_idx=0)
  (encoder_lstm): LSTM(256, 512, num_layers=2, batch_first=True, dropout=0.3)
  (decoder_embedding): Embedding(10000, 256, padding_idx=0)
  (decoder_lstm): LSTM(256, 512, num_layers=2, batch_first=True, dropout=0.3)
  (decoder_fc): Linear(in_features=512, out_features=10000, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)


In [17]:
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [18]:
def train_epoch(model, loader, optimizer, criterion, clip=1.0):
    model.train()
    epoch_loss = 0

    for src, tgt in tqdm(loader, desc="Training"):
        src, tgt = src.to(device), tgt.to(device)

        optimizer.zero_grad()

        # Forward pass
        output = model(src, tgt)

        # Reshape for loss calculation
        output_dim = output.shape[-1]
        output = output.reshape(-1, output_dim)
        tgt = tgt[:, 1:].reshape(-1)  # Remove SOS token

        loss = criterion(output, tgt)

        # Backward pass
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(loader)

In [19]:
def evaluate(model, loader, criterion):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for src, tgt in tqdm(loader, desc="Evaluating"):
            src, tgt = src.to(device), tgt.to(device)

            output = model(src, tgt)

            output_dim = output.shape[-1]
            output = output.reshape(-1, output_dim)
            tgt = tgt[:, 1:].reshape(-1)

            loss = criterion(output, tgt)
            epoch_loss += loss.item()

    return epoch_loss / len(loader)

In [20]:
n_epochs = 5
best_valid_loss = float('inf')

for epoch in range(n_epochs):
    print(f"\nEpoch {epoch+1}/{n_epochs}")

    train_loss = train_epoch(model, train_loader, optimizer, criterion)
    valid_loss = evaluate(model, test_loader, criterion)

    print(f"Train Loss: {train_loss:.3f} | Val Loss: {valid_loss:.3f}")

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model.pt')
        print("Model saved!")


Epoch 1/5


Training: 100%|██████████| 15625/15625 [13:30<00:00, 19.29it/s]
Evaluating: 100%|██████████| 32/32 [00:00<00:00, 55.57it/s]


Train Loss: 4.149 | Val Loss: 3.635
Model saved!

Epoch 2/5


Training: 100%|██████████| 15625/15625 [13:32<00:00, 19.23it/s]
Evaluating: 100%|██████████| 32/32 [00:00<00:00, 55.70it/s]


Train Loss: 3.475 | Val Loss: 3.326
Model saved!

Epoch 3/5


Training: 100%|██████████| 15625/15625 [13:32<00:00, 19.23it/s]
Evaluating: 100%|██████████| 32/32 [00:00<00:00, 53.67it/s]


Train Loss: 3.244 | Val Loss: 3.176
Model saved!

Epoch 4/5


Training: 100%|██████████| 15625/15625 [13:32<00:00, 19.23it/s]
Evaluating: 100%|██████████| 32/32 [00:00<00:00, 55.92it/s]


Train Loss: 3.110 | Val Loss: 3.080
Model saved!

Epoch 5/5


Training: 100%|██████████| 15625/15625 [13:34<00:00, 19.18it/s]
Evaluating: 100%|██████████| 32/32 [00:00<00:00, 53.23it/s]


Train Loss: 3.017 | Val Loss: 3.015
Model saved!


In [31]:
def translate_sentence(model, sentence, src_vocab, tgt_vocab, idx_to_tgt, device, max_len=30):
    """Translate a single sentence with proper dimension handling"""
    sentence = clean_text(sentence)
    tokens = tokenize(sentence)

    indices = [src_vocab.get(token, src_vocab['<UNK>']) for token in tokens]
    if len(indices) < max_len:
        indices += [src_vocab['<PAD>']] * (max_len - len(indices))
    else:
        indices = indices[:max_len]

    src_tensor = torch.LongTensor(indices).unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        encoder_emb = model.encoder_embedding(src_tensor)
        encoder_out, (hidden, cell) = model.encoder_lstm(encoder_emb)

        decoder_input = torch.tensor([[tgt_vocab['<SOS>']]], device=device)
        output_seq = []

        for _ in range(max_len):
            decoder_emb = model.decoder_embedding(decoder_input)
            decoder_out, (hidden, cell) = model.decoder_lstm(decoder_emb, (hidden, cell))
            output = model.decoder_fc(decoder_out.squeeze(1))

            next_token = output.argmax(1).item()
            if next_token == tgt_vocab['<EOS>']:
                break

            output_seq.append(next_token)
            decoder_input = torch.tensor([[next_token]], device=device)

    translated_tokens = [idx_to_tgt[idx] for idx in output_seq if idx not in (tgt_vocab['<SOS>'], tgt_vocab['<EOS>'])]
    return ' '.join(translated_tokens)

In [32]:
test_sentences = [
    "hello",
    "how are you",
    "what is your name",
    "this is a good example",
    "the weather is nice today"
]

print("Successful Translations:")
for sent in test_sentences:
    try:
        translation = translate_sentence(model, sent, src_vocab, tgt_vocab, idx_to_tgt, device)
        print(f"EN: {sent.ljust(30)} → FR: {translation}")
    except Exception as e:
        print(f"Error with '{sent}': {str(e)}")
        print("Debug Info:")
        print(f"- Input shape: {src_tensor.shape if 'src_tensor' in locals() else 'N/A'}")
        print(f"- Vocab size: {len(tgt_vocab)}")

Successful Translations:
EN: hello                          → FR: bonjour
EN: how are you                    → FR: comment ça va
EN: what is your name              → FR: comment sappelle votre nom
EN: this is a good example         → FR: cest un exemple
EN: the weather is nice today      → FR: la journée est bonne aujourdhui


In [None]:
import nbformat

# Path to your notebook (Colab environment)
notebook_path = '/content/Copy_of_LSTM.ipynb'

# Load the notebook
with open(notebook_path, 'r', encoding='utf-8') as f:
    notebook = nbformat.read(f, as_version=4)

# Remove the 'widgets' metadata (this will strip all widgets)
if 'widgets' in notebook.metadata:
    del notebook.metadata['widgets']

# Save the cleaned notebook back
with open(notebook_path, 'w', encoding='utf-8') as f:
    nbformat.write(notebook, f)

print("All widget metadata removed from the notebook.")