# **Sequence to Sequence Model for Language Translation (English to French)**

## Importing Libraries and Loading Data

In [1]:
import torch
from torch.utils.data import Dataset, DataLoader
import spacy
from collections import Counter
import pickle
import os
import tarfile
import requests

In [3]:
!python -m spacy download de_core_news_sm
!python -m spacy download en_core_web_sm

Collecting de-core-news-sm==3.7.0
  Downloading https://github.com/explosion/spacy-models/releases/download/de_core_news_sm-3.7.0/de_core_news_sm-3.7.0-py3-none-any.whl (14.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.6/14.6 MB[0m [31m24.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: de-core-news-sm
Successfully installed de-core-news-sm-3.7.0
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('de_core_news_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/

In [4]:
# Download and Load SpaCy Tokenizers
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')

In [5]:
#URLs for dataset
TRAIN_URL = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-AI0205EN-SkillsNetwork/training.tar.gz"
VALID_URL = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-AI0205EN-SkillsNetwork/validation.tar.gz"

In [6]:
# Special tokens
PAD_IDX,BOS_IDX,EOS_IDX,UNK_IDX=0,1,2,3
special_tokens=['<pad>', '<bos>', '<eos>', '<unk>']

In [7]:
#Tokenize Functions
def tokenize_de(text):
  return [tok.text for tok in spacy_de.tokenizer(text)]

def tokenize_en(text):
  return [tok.text for tok in spacy_de.tokenizer(text)]

In [8]:
#Function to download and extract data
def download_and_extract(url,extract_path):
  filename=url.split('/')[-1]
  file_path=os.path.join(extract_path, filename)

  if not os.path.exists(extract_path):
    os.makedirs(extract_path)

  if not os.path.exists(file_path):
    print(f"Downloading {filename}...")
    response=requests.get(url,stream=True)
    with open(file_path,'wb') as f:
      for chunk in response.iter_content(chunk_size=1024):
        if chunk:
          f.write(chunk)

    print(f"Downloaded {filename}")


  #Ectract the tar.gz file
  with tarfile.open(file_path, 'r:gz') as tar:
    tar.extractall(path=extract_path)
    print(f"Extracted {filename}")


In [9]:
#Function to crate Vocabulary
def build_vocab(sentences,tokenizer):
  counter=Counter()
  for sentence in sentences:
    counter.update(tokenizer(sentence))
  vocab={word: i+4 for i ,(word,_) in enumerate(counter.most_common())} # Offset for stepcial tokens
  for i, token in enumerate(special_tokens):
    vocab[token]=i # Assign special tokens
  return vocab

In [10]:
#Custom Dataset class
class TranslationDataset(Dataset):
  def __init__(self,src_sentences,tgt_sentences,src_vocab,tgt_vocab):
    self.src_sentences=src_sentences
    self.tgt_sentences=tgt_sentences
    self.src_vocab=src_vocab
    self.tgt_vocab=tgt_vocab

  def __len__(self):
    return len(self.src_sentences)

  def __getitem__(self,idx):
    src=[self.src_vocab.get(token,UNK_IDX) for token in tokenize_de(self.src_sentences[idx])]
    tgt=[self.tgt_vocab.get(token,UNK_IDX) for token in tokenize_en(self.tgt_sentences[idx])]
    return torch.tensor([BOS_IDX] + src + [EOS_IDX]),torch.tensor([BOS_IDX]+ tgt + [EOS_IDX])


In [25]:
# Function to load dataset
def load_data(data_dir):
  src_sentences,tgt_sentences=[],[]

  train_src_path=os.path.join(data_dir,"train.de")
  train_tgt_path=os.path.join(data_dir,"train.en")

  with open(train_src_path, 'r', encoding='utf-8') as src_file, open(train_tgt_path, 'r', encoding='utf-8') as tgt_file:
    for src_line,tgt_line in zip(src_file, tgt_file):
      src_sentences.append(src_line.strip())
      tgt_sentences.append(tgt_line.strip())


  src_vocab=build_vocab(src_sentences,tokenize_de)
  tgt_vocab=build_vocab(tgt_sentences, tokenize_en)
  dataset=TranslationDataset(src_sentences,tgt_sentences,src_vocab, tgt_vocab)
  return dataset,src_vocab,tgt_vocab

In [12]:
#Collate function for padding
def collate_fn(batch):
  src_batch,tgt_batch=zip(*batch)
  src_batch=torch.nn.utils.rnn.pad_sequence(src_batch,padding_value=PAD_IDX,batch_first=True)
  tgt_batch=torch.nn.utils.rnn.pad_sequence(tgt_batch,padding_value=PAD_IDX,batch_first=True)

  return src_batch,tgt_batch

In [13]:
#Function to get DataLoaders
def get_dataloaders(batch_size=16,data_dir="data"):
  download_and_extract(TRAIN_URL,data_dir)
  dataset,src_vocab,tgt_vocab=load_data(data_dir)
  dataloader=DataLoader(dataset,batch_size=batch_size,collate_fn=collate_fn,shuffle=True)
  return dataloader,src_vocab,tgt_vocab

## Building Seq2Seq Model

In [50]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader

# Encoder
class Encoder(nn.Module):
  def __init__(self,input_dim,emb_dim,hidden_dim,num_layers,dropout):
    super(Encoder,self).__init__()
    self.embedding=nn.Embedding(input_dim,emb_dim)
    self.rnn=nn.LSTM(emb_dim,hidden_dim,num_layers,dropout=dropout,batch_first=True)
    self.dropout=nn.Dropout(dropout)

  def forward(self,src):
    embedded=self.dropout(self.embedding(src))
    output,(hidden,cell)=self.rnn(embedded)
    return hidden,cell


In [51]:
# Decoder
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, num_layers, dropout):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hidden_dim, num_layers, dropout=dropout, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, hidden, cell):
        tgt = tgt.unsqueeze(1)  # Add sequence dimension
        embedded = self.dropout(self.embedding(tgt))
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        prediction = self.fc_out(output.squeeze(1))
        return prediction, hidden, cell

In [52]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        batch_size, tgt_len = tgt.shape
        tgt_vocab_size = self.decoder.fc_out.out_features  # Get vocab size

        # Store outputs
        outputs = torch.zeros(batch_size, tgt_len, tgt_vocab_size).to(self.device)

        # Encode source sequence
        hidden, cell = self.encoder(src)

        # First decoder input is <bos>
        input = tgt[:, 0].to(self.device)  # ✅ Move to device

        for t in range(1, tgt_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t, :] = output  # ✅ Store correctly

            top1 = output.argmax(1).to(self.device)  # ✅ Move to device

            # Teacher forcing
            input = tgt[:, t].to(self.device) if torch.rand(1).item() < teacher_forcing_ratio else top1

        return outputs


In [53]:
# Training
def train(model, dataloader, optimizer, criterion, device):
    model.train()
    epoch_loss = 0

    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)  # Move tensors to device
        optimizer.zero_grad()

        output = model(src, tgt)
        output_dim = output.shape[-1]

        output = output[:, 1:].reshape(-1, output_dim)  # Flatten for loss function
        tgt = tgt[:, 1:].reshape(-1)  # Flatten targets

        loss = criterion(output, tgt)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(dataloader)


In [54]:
# Evaluation
def evaluate(model, dataloader, criterion, device):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)  # Move to device

            output = model(src, tgt, 0)  # No teacher forcing during evaluation
            output_dim = output.shape[-1]

            output = output[:, 1:].reshape(-1, output_dim)
            tgt = tgt[:, 1:].reshape(-1)

            loss = criterion(output, tgt)
            epoch_loss += loss.item()

    return epoch_loss / len(dataloader)


In [60]:
# Hyperparameters and Training setup
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_size=16
embedding_dim=256
hidden_dim=512
num_layers=2
dropout=0.5
num_epochs=10

In [61]:
# Load Dataloader, Vocabulary
dataloader, src_vocab,tgt_vocab=get_dataloaders(batch_size)

Extracted training.tar.gz


In [62]:
# Model Setup
input_dim=len(src_vocab)
output_dim=len(tgt_vocab)

encoder=Encoder(input_dim,embedding_dim,hidden_dim,num_layers,dropout).to(device)
decoder=Decoder(output_dim,embedding_dim,hidden_dim,num_layers,dropout).to(device)
model=Seq2Seq(encoder,decoder,device).to(device)

optimizer=optim.Adam(model.parameters(), lr=0.001)
criterion=nn.CrossEntropyLoss(ignore_index=PAD_IDX)


In [63]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Seq2Seq(encoder, decoder, device).to(device)  # ✅ Move model to device

In [64]:
# Training Loop
for epoch in range(num_epochs):
  train_loss=train(model,dataloader,optimizer,criterion,device)
  val_loss=evaluate(model,dataloader,criterion,device)

  print(f"Epoch {epoch+1}: Train Loss= {train_loss :.4f} , Val Loss= {val_loss:.4f}")

Epoch 1: Train Loss= 4.7183 , Val Loss= 4.6844
Epoch 2: Train Loss= 4.0945 , Val Loss= 4.3995
Epoch 3: Train Loss= 3.8295 , Val Loss= 4.2137
Epoch 4: Train Loss= 3.6247 , Val Loss= 4.0378
Epoch 5: Train Loss= 3.4623 , Val Loss= 3.9139
Epoch 6: Train Loss= 3.3218 , Val Loss= 3.8320
Epoch 7: Train Loss= 3.1888 , Val Loss= 3.6684
Epoch 8: Train Loss= 3.0677 , Val Loss= 3.5929
Epoch 9: Train Loss= 2.9642 , Val Loss= 3.5035
Epoch 10: Train Loss= 2.8688 , Val Loss= 3.3899


In [68]:
#mount drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [69]:
drive_path="/content/drive/MyDrive/NLP Learning Labs/seq2seq_model.pth"

# Save model function
def save_model(model, optimizer, epoch, loss, path=drive_path):
    checkpoint = {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "loss": loss
    }
    torch.save(checkpoint, path)
    print(f"✅ Model saved at {path}")

In [70]:
save_model(model, optimizer, epoch, val_loss)

✅ Model saved at /content/drive/MyDrive/NLP Learning Labs/seq2seq_model.pth


## Load the Model from Drive

In [71]:
def load_model(model,optimizer,path,drive_path):
  if os.path.exists(path):
    checkpoint=torch.load(path,map_location=device)
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(["optimizer_state_dict"])
    epoch=checkpoint["epoch"]
    loss=checkpoint["loss"]
    print(f"✅ Model loaded from {path}, Last Epoch: {epoch}, Loss: {loss:.4f}")

  else:
    print("❌ No saved model found!")


In [77]:
def translate_sentence(sentence, src_vocab, tgt_vocab, model, device, max_length=50):
    model.eval()

    # Tokenize input sentence
    tokens = ["<bos>"] + tokenize_de(sentence) + ["<eos>"]
    src_indices = [src_vocab.get(token, UNK_IDX) for token in tokens]
    src_tensor = torch.tensor(src_indices, dtype=torch.long).unsqueeze(0).to(device)

    # Encode
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)

    # Decode
    tgt_indexes = [BOS_IDX]
    for _ in range(max_length):
        tgt_tensor = torch.tensor([tgt_indexes[-1]], dtype=torch.long).to(device)
        with torch.no_grad():
            output, hidden, cell = model.decoder(tgt_tensor, hidden, cell)
            pred_token = output.argmax(1).item()

        tgt_indexes.append(pred_token)
        if pred_token == EOS_IDX:
            break

    # Convert indexes to words
    tgt_tokens = [list(tgt_vocab.keys())[list(tgt_vocab.values()).index(idx)] for idx in tgt_indexes]
    return " ".join(tgt_tokens[1:-1])  # Remove <bos> and <eos>

# Example Usage:
sentence = "Das kleine Kind klettert an roten Seilen auf einem Spielplatz."
translation = translate_sentence(sentence, src_vocab, tgt_vocab, model, device)
print(f"📝 Translation: {translation}")


📝 Translation: The little child is climbing a a slide on a playground .
