In [1]:
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
_backend_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_backend_name)
print(f"Using device: {device}")

In [3]:
# Hyperparameters: the tunable settings shared by the cells below.

# Length every token sequence is padded/truncated to; also the default
# decoding limit of predict().
MAX_LENGTH = 25
# Size of the token embedding vectors in both Encoder and Decoder.
EMBEDDING_DIM = 128
# Size of the LSTM hidden/cell state in both Encoder and Decoder.
HIDDEN_DIM = 256
# Number of stacked LSTM layers in each of Encoder and Decoder.
NUM_LAYERS = 2
# Dropout probability handed to the Encoder and Decoder constructors.
DROPOUT = 0.3
# Number of sentence pairs per DataLoader batch.
BATCH_SIZE = 64
# Default value of train()'s `epochs` argument.
NUM_EPOCHS = 100
# Learning rate given to the Adam optimizer.
LEARNING_RATE = 0.001
# Maximum gradient norm passed to clip_grad_norm_ inside train().
CLIP = 1

### Data Preprocessing

In [None]:
# Load the parallel corpus; pandas is imported in the notebook's top import
# cell so that all dependencies are declared in one place.
# NOTE(review): the file name says "spn" while the columns read from it are
# English/French -- confirm this is the intended dataset.
df = pd.read_csv("eng_spn.csv")
df.head()

In [5]:
def _add_boundary_markers(sentence):
    """Wrap one target sentence in the start/end-of-sequence markers."""
    return "<sos> " + sentence + " <eos>"


# Source sentences are used as-is; every target sentence gets explicit
# start/end markers so the decoder can learn where a translation begins and ends.
input_data = df["English words/sentences"]
target_data = df["French words/sentences"].apply(_add_boundary_markers)

In [6]:
# Tokenization: fit one vocabulary per language, then map every sentence to
# its list of word indices.
def _fit_and_encode(texts):
    """Fit a fresh Tokenizer on `texts`; return it together with the encoded texts."""
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(texts)
    return tokenizer, tokenizer.texts_to_sequences(texts)


input_tokenizer, input_sequences = _fit_and_encode(input_data)
target_tokenizer, target_sequences = _fit_and_encode(target_data)

In [7]:
# Padding: bring every sequence to exactly MAX_LENGTH ids, filling on the right.
# NOTE(review): `truncating` is left at its default -- check which end of an
# over-long sentence gets dropped and whether that can remove a boundary marker.
_pad_options = {"maxlen": MAX_LENGTH, "padding": "post"}

padded_input_sequences = pad_sequences(input_sequences, **_pad_options)
padded_target_sequences = pad_sequences(target_sequences, **_pad_options)

In [8]:
# Vocabulary sizes: one more than the number of known words, which leaves
# room for id 0 (the id the loss later ignores as padding).
input_vocab_size, target_vocab_size = (
    len(tokenizer.word_index) + 1
    for tokenizer in (input_tokenizer, target_tokenizer)
)

In [9]:
# Convert the padded id arrays into int64 PyTorch tensors (the dtype that
# nn.Embedding and CrossEntropyLoss take for indices).
input_tensor, target_tensor = (
    torch.tensor(padded, dtype=torch.long)
    for padded in (padded_input_sequences, padded_target_sequences)
)

In [10]:
# DataLoader: pair each source row with its target row and serve them in
# shuffled mini-batches.
translation_dataset = TensorDataset(input_tensor, target_tensor)
dataloader = DataLoader(translation_dataset, batch_size=BATCH_SIZE, shuffle=True)

### Encoder

In [11]:
class Encoder(nn.Module):
    """Encode a batch of source token ids into the final LSTM states.

    Args:
        vocab_size: Number of rows in the embedding table.
        embd_dim: Size of each embedding vector.
        hidden_dim: Size of the LSTM hidden and cell states.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, applied to the embeddings and between
            stacked LSTM layers.
    """

    def __init__(self, vocab_size, embd_dim, hidden_dim, num_layers, dropout):
        super().__init__()

        # Embedding layer to convert tokens into dense vectors
        self.embedding = nn.Embedding(vocab_size, embd_dim)

        # LSTM layer
        self.lstm = nn.LSTM(
            embd_dim,
            hidden_dim,
            num_layers=num_layers,
            # nn.LSTM only applies dropout *between* stacked layers and warns
            # when asked for dropout with a single layer, so disable it there.
            dropout=dropout if num_layers > 1 else 0.0,
            batch_first=True,
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return the final `(hidden, cell)` states for a batch of token ids.

        Args:
            x: Integer tensor of token ids, batch dimension first.

        Returns:
            The `(hidden, cell)` pair produced by the LSTM after the last step.
        """
        # The dropout layer was constructed but never used; apply it to the
        # embeddings so it actually regularizes training (no-op in eval mode).
        embedded = self.dropout(self.embedding(x))
        # Only the final states are needed; the per-step outputs are discarded.
        _, (hidden, cell) = self.lstm(embedded)
        return hidden, cell

### Decoder

In [12]:
class Decoder(nn.Module):
    """Predict the next target token from the previous token and LSTM states.

    Args:
        vocab_size: Size of the target vocabulary (embedding rows and output logits).
        embd_dim: Size of each embedding vector.
        hidden_dim: Size of the LSTM hidden and cell states.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, applied to the embeddings and between
            stacked LSTM layers.
    """

    def __init__(self, vocab_size, embd_dim, hidden_dim, num_layers, dropout):
        super().__init__()

        # Embedding layer to convert tokens into dense vectors
        self.embedding = nn.Embedding(vocab_size, embd_dim)

        # LSTM layer
        self.lstm = nn.LSTM(
            embd_dim,
            hidden_dim,
            num_layers=num_layers,
            # nn.LSTM only applies dropout *between* stacked layers and warns
            # when asked for dropout with a single layer, so disable it there.
            dropout=dropout if num_layers > 1 else 0.0,
            batch_first=True,
        )
        # Projects each LSTM output onto one logit per vocabulary entry.
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, hidden, cell):
        """Run a single decoding step.

        Args:
            x: 1-D integer tensor holding one token id per batch element.
            hidden: LSTM hidden state carried over from the previous step.
            cell: LSTM cell state carried over from the previous step.

        Returns:
            `(prediction, hidden, cell)`: the vocabulary logits for this step
            and the updated LSTM states.
        """
        # Add a sequence dimension of length 1: (batch,) -> (batch, 1).
        x = x.unsqueeze(1)

        # The dropout layer was constructed but never used; apply it to the
        # embeddings so it actually regularizes training (no-op in eval mode).
        embedded = self.dropout(self.embedding(x))
        lstm_output, (hidden, cell) = self.lstm(embedded, (hidden, cell))

        # Drop the length-1 sequence dimension again before the projection.
        lstm_output = lstm_output.squeeze(1)

        prediction = self.fc(lstm_output)

        return prediction, hidden, cell

### Seq2Seq

In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes with teacher forcing.

    Args:
        encoder: Module whose forward returns the initial `(hidden, cell)` states.
        decoder: Module exposing `fc.out_features` and a single-step
            `forward(token, hidden, cell)` returning `(logits, hidden, cell)`.
    """

    def __init__(self, encoder, decoder):
        super().__init__()

        self.encoder = encoder  # Encoder for input processing
        self.decoder = decoder  # Decoder for output generation

    def forward(self, input, target):
        """Return per-step vocabulary logits for `target`.

        Args:
            input: Batch of source token ids passed to the encoder.
            target: 2-D batch of target token ids, `(batch, length)`.

        Returns:
            Tensor of shape `(batch, length, target_vocab_size)`. Position 0 is
            left at zero; position `t` holds the logits produced after feeding
            `target[:, t - 1]`.
        """
        batch_size, max_length = target.size()
        target_vocab_size = self.decoder.fc.out_features

        # Allocate on the batch's device: a plain torch.zeros(...) lives on the
        # CPU and would clash with decoder outputs once the model runs on a GPU.
        outputs = torch.zeros(
            batch_size, max_length, target_vocab_size, device=target.device
        )

        # Get initial hidden and cell states from the encoder
        hidden, cell = self.encoder(input)

        # Teacher forcing: step t is always fed the ground-truth token t - 1,
        # starting with the first target token.
        for t in range(1, max_length):
            output, hidden, cell = self.decoder(target[:, t - 1], hidden, cell)
            outputs[:, t, :] = output

        return outputs

### Training

In [14]:
# Build the two halves of the translator from the shared hyperparameters and
# wrap them in the Seq2Seq container.
# NOTE(review): nothing here moves the model to `device`, so it stays on the CPU.
encoder = Encoder(
    vocab_size=input_vocab_size,
    embd_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT,
)
decoder = Decoder(
    vocab_size=target_vocab_size,
    embd_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT,
)

model = Seq2Seq(encoder=encoder, decoder=decoder)

In [None]:
### Checkpoint


# Saving Model
def save_checkpoint(epoch, model, filename="checkpoint.pth"):
    """Write the model weights and the next epoch number to `filename`.

    Args:
        epoch: Epoch that has just finished; `epoch + 1` is what gets stored.
        model: Module whose `state_dict()` is saved.
        filename: Destination path of the checkpoint file.
    """
    next_epoch = epoch + 1
    weights = model.state_dict()
    payload = {"epoch": next_epoch, "model_state_dict": weights}
    torch.save(payload, filename)


# Loading Model
def load_checkpoint(filename, model):
    """Restore weights written by `save_checkpoint` and return the resume epoch.

    Args:
        filename: Path of the checkpoint file.
        model: Module whose parameters are overwritten in place.

    Returns:
        The value stored under "epoch" in the checkpoint.

    Raises:
        FileNotFoundError: If `filename` does not exist.
    """
    # map_location="cpu" lets a checkpoint saved from a GPU run be opened on a
    # CPU-only machine; load_state_dict then copies the values onto whatever
    # device the model's parameters already live on.
    # NOTE: torch.load unpickles the file -- only load checkpoints you trust.
    checkpoint = torch.load(filename, map_location="cpu")

    model.load_state_dict(checkpoint["model_state_dict"])
    return checkpoint["epoch"]


# Initializing Model: resume from "checkpoint.pth" when it exists, otherwise
# begin at epoch 1.
try:
    start_epoch = load_checkpoint("checkpoint.pth", model)
except FileNotFoundError:
    start_epoch = 1
    print(f"No checkpoint found, starting training from scratch...")
else:
    print(f"Resuming training from epoch {start_epoch}")

In [16]:
# Loss and optimizer. Target id 0 is excluded from the loss so that padded
# positions do not contribute to it.
_PAD_TOKEN_ID = 0
criterion = nn.CrossEntropyLoss(ignore_index=_PAD_TOKEN_ID)
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [17]:
def train(model, optimizer, criterion, dataloader, epochs=NUM_EPOCHS):
    """Train `model` with teacher forcing, saving a checkpoint after every epoch.

    Epochs are numbered from 1: training runs from the module-level
    `start_epoch` (set by the checkpoint cell) through `epochs` inclusive.
    Gradients are clipped to the module-level `CLIP` norm, and
    `save_checkpoint` is called at the end of each epoch.

    Args:
        model: Seq2Seq-style module called as `model(source, target)`.
        optimizer: Optimizer updating `model`'s parameters.
        criterion: Loss taking `(logits, labels)`.
        dataloader: Iterable yielding `(source, target)` batches.
        epochs: Number of the last epoch to run.
    """
    model.train()

    # Batches are moved to wherever the model's weights live, so the loop keeps
    # working when the model has been placed on a GPU.
    first_param = next(model.parameters(), None)
    model_device = (
        first_param.device if first_param is not None else torch.device("cpu")
    )

    total_loss = 0

    # Epoch numbers are 1-based, so the upper bound must be `epochs + 1`;
    # `range(start_epoch, epochs)` silently skipped the final epoch.
    for epoch in range(start_epoch, epochs + 1):
        epoch_loss = 0

        for source_batch, target_batch in dataloader:
            source_batch = source_batch.to(model_device)
            target_batch = target_batch.to(model_device)

            # Reset Gradients
            optimizer.zero_grad()

            # Forward Pass
            output = model(source_batch, target_batch)

            # Position 0 never receives a prediction, so drop it from both the
            # logits and the labels before flattening them for the loss.
            logits = output[:, 1:].reshape(-1, output.shape[2])
            labels = target_batch[:, 1:].reshape(-1)

            loss = criterion(logits, labels)  # Compute loss
            loss.backward()  # Backpropagation

            # Clip gradients to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP)

            optimizer.step()  # Update model parameters
            epoch_loss += loss.item()

        save_checkpoint(epoch, model)

        print(f"Epoch {epoch}/{epochs}, Loss: {epoch_loss/len(dataloader)}")
        total_loss += epoch_loss

    print(f"Total Loss: {total_loss/len(dataloader)}")

In [None]:
# Training: run the loop with the default number of epochs.
train(model=model, optimizer=optimizer, criterion=criterion, dataloader=dataloader)

### Prediction

In [None]:
def predict(
    model, input_sequence, input_tokenizer, target_tokenizer, max_length=MAX_LENGTH
):
    """Greedily translate one sentence with a trained Seq2Seq model.

    Args:
        model: Module exposing `encoder` and `decoder` sub-modules.
        input_sequence: Source sentence as a plain string.
        input_tokenizer: Tokenizer used to encode the source sentence.
        target_tokenizer: Tokenizer whose `word_index` holds the "sos" and
            "eos" entries and which decodes the predicted ids back to text.
        max_length: Padded length of the source and maximum number of
            decoding steps.

    Returns:
        The decoded translation as a string (without the end marker).
    """
    model.eval()  # Set model to evaluation mode

    # Create every tensor on the device that holds the model's weights so
    # inference also works after the model has been moved to a GPU.
    first_param = next(model.parameters(), None)
    model_device = (
        first_param.device if first_param is not None else torch.device("cpu")
    )

    # Preprocess input sequence
    token_ids = input_tokenizer.texts_to_sequences([input_sequence])
    padded_ids = pad_sequences(token_ids, maxlen=max_length, padding="post")
    input_tensor = torch.tensor(padded_ids, dtype=torch.long, device=model_device)

    # The markers are looked up without angle brackets, matching how the
    # tokenizer stored them; resolve both once instead of on every step.
    sos_token = target_tokenizer.word_index["sos"]
    eos_token = target_tokenizer.word_index["eos"]

    translated_ids = []

    with torch.no_grad():
        hidden, cell = model.encoder(input_tensor)
        x_input = torch.tensor([sos_token], dtype=torch.long, device=model_device)

        for _ in range(max_length):
            prediction, hidden, cell = model.decoder(x_input, hidden, cell)

            # Greedy decoding: keep only the most likely next token.
            predicted_token = prediction.argmax(1).item()

            if predicted_token == eos_token:
                break

            translated_ids.append(predicted_token)

            # Update input for next step
            x_input = torch.tensor(
                [predicted_token], dtype=torch.long, device=model_device
            )

    # Convert token IDs back to text
    return target_tokenizer.sequences_to_texts([translated_ids])[0]

In [None]:
# Prediction: translate one sample sentence and show it next to its source.
input_sentence = "ill teach tom"
translated_sentence = predict(
    model=model,
    input_sequence=input_sentence,
    input_tokenizer=input_tokenizer,
    target_tokenizer=target_tokenizer,
)
print(f"Input Sentence: {input_sentence}")
print(f"Translated Sentence: {translated_sentence}")