In [19]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Vocabulary class to handle mapping between words and numerical indices
class Vocabulary:
    def __init__(self):
        # Initialize dictionaries for word to index and index to word mappings
        self.word2index = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.index2word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK"}
        self.word_count = {}  # Keep track of word frequencies
        self.n_words = 4  # Start counting from 3 to account for special tokens

    def add_sentence(self, sentence):
        # Add all words in a sentence to the vocabulary
        for word in sentence.split(' '):
            self.add_word(word)

    def add_word(self, word):
        # Add a word to the vocabulary
        if word not in self.word2index:
            # Assign a new index to the word and update mappings
            self.word2index[word] = self.n_words
            self.index2word[self.n_words] = word
            self.word_count[word] = 1
            self.n_words += 1
        else:
            # Increment word count if the word already exists in the vocabulary
            self.word_count[word] += 1

def tokenize_and_pad(sentences, vocab):
    # Calculate the maximum sentence length for padding
    max_length = max(len(sentence.split(' ')) for sentence in sentences) + 2  # +2 for SOS and EOS tokens
    tokenized_sentences = []
    for sentence in sentences:
        # Convert each sentence to a list of indices, adding SOS and EOS tokens
        tokens = [vocab.word2index["<SOS>"]] + [vocab.word2index[word] for word in sentence.split(' ')] + [vocab.word2index["<EOS>"]]
        # Pad sentences to the maximum length
        padded_tokens = tokens + [vocab.word2index["<PAD>"]] * (max_length - len(tokens))
        tokenized_sentences.append(padded_tokens)
    return torch.tensor(tokenized_sentences, dtype=torch.long)

# Custom Dataset class for English to French sentences
class EngFrDataset(Dataset):
    def __init__(self, pairs):
        self.eng_vocab = Vocabulary()
        self.fr_vocab = Vocabulary()
        self.pairs = []

        # Process each English-French pair
        for eng, fr in pairs:
            self.eng_vocab.add_sentence(eng)
            self.fr_vocab.add_sentence(fr)
            self.pairs.append((eng, fr))

        # Separate English and French sentences
        self.eng_sentences = [pair[0] for pair in self.pairs]
        self.fr_sentences = [pair[1] for pair in self.pairs]
        
        # Tokenize and pad sentences
        self.eng_tokens = tokenize_and_pad(self.eng_sentences, self.eng_vocab)
        self.fr_tokens = tokenize_and_pad(self.fr_sentences, self.fr_vocab)

        # Define the embedding layers for English and French
        self.eng_embedding = torch.nn.Embedding(self.eng_vocab.n_words, 100)  # Embedding size = 100
        self.fr_embedding = torch.nn.Embedding(self.fr_vocab.n_words, 100)    # Embedding size = 100

    def __len__(self):
        # Return the number of sentence pairs
        return len(self.pairs)

    def __getitem__(self, idx):
        # Get the tokenized and padded sentences by index
        eng_tokens = self.eng_tokens[idx]
        fr_tokens = self.fr_tokens[idx]
        # Lookup embeddings for the tokenized sentences
        eng_emb = self.eng_embedding(eng_tokens)
        fr_emb = self.fr_embedding(fr_tokens)
        return eng_tokens, fr_tokens, eng_emb, fr_emb

# Sample dataset of English-French sentence pairs
english_to_french = [

    ("I am cold", "J'ai froid"),

    ("You are tired", "Tu es fatigué"),

    ("He is hungry", "Il a faim"),

    ("She is happy", "Elle est heureuse"),

    ("We are friends", "Nous sommes amis"),

    ("They are students", "Ils sont étudiants"),

    ("The cat is sleeping", "Le chat dort"),

    ("The sun is shining", "Le soleil brille"),

    ("We love music", "Nous aimons la musique"),

    ("She speaks French fluently", "Elle parle français couramment"),

    ("He enjoys reading books", "Il aime lire des livres"),

    ("They play soccer every weekend", "Ils jouent au football chaque week-end"),

    ("The movie starts at 7 PM", "Le film commence à 19 heures"),

    ("She wears a red dress", "Elle porte une robe rouge"),

    ("We cook dinner together", "Nous cuisinons le dîner ensemble"),

    ("He drives a blue car", "Il conduit une voiture bleue"),

    ("They visit museums often", "Ils visitent souvent des musées"),

    ("The restaurant serves delicious food", "Le restaurant sert une délicieuse cuisine"),

    ("She studies mathematics at university", "Elle étudie les mathématiques à l'université"),

    ("We watch movies on Fridays", "Nous regardons des films le vendredi"),

    ("He listens to music while jogging", "Il écoute de la musique en faisant du jogging"),

    ("They travel around the world", "Ils voyagent autour du monde"),

    ("The book is on the table", "Le livre est sur la table"),

    ("She dances gracefully", "Elle danse avec grâce"),

    ("We celebrate birthdays with cake", "Nous célébrons les anniversaires avec un gâteau"),

    ("He works hard every day", "Il travaille dur tous les jours"),

    ("They speak different languages", "Ils parlent différentes langues"),

    ("The flowers bloom in spring", "Les fleurs fleurissent au printemps"),

    ("She writes poetry in her free time", "Elle écrit de la poésie pendant son temps libre"),

    ("We learn something new every day", "Nous apprenons quelque chose de nouveau chaque jour"),

    ("The dog barks loudly", "Le chien aboie bruyamment"),

    ("He sings beautifully", "Il chante magnifiquement"),

    ("They swim in the pool", "Ils nagent dans la piscine"),

    ("The birds chirp in the morning", "Les oiseaux gazouillent le matin"),

    ("She teaches English at school", "Elle enseigne l'anglais à l'école"),

    ("We eat breakfast together", "Nous prenons le petit déjeuner ensemble"),

    ("He paints landscapes", "Il peint des paysages"),

    ("They laugh at the joke", "Ils rient de la blague"),

    ("The clock ticks loudly", "L'horloge tic-tac bruyamment"),

    ("She runs in the park", "Elle court dans le parc"),

    ("We travel by train", "Nous voyageons en train"),

    ("He writes a letter", "Il écrit une lettre"),

    ("They read books at the library", "Ils lisent des livres à la bibliothèque"),

    ("The baby cries", "Le bébé pleure"),

    ("She studies hard for exams", "Elle étudie dur pour les examens"),

    ("We plant flowers in the garden", "Nous plantons des fleurs dans le jardin"),

    ("He fixes the car", "Il répare la voiture"),

    ("They drink coffee in the morning", "Ils boivent du café le matin"),

    ("The sun sets in the evening", "Le soleil se couche le soir"),

    ("She dances at the party", "Elle danse à la fête"),

    ("We play music at the concert", "Nous jouons de la musique au concert"),

    ("He cooks dinner for his family", "Il cuisine le dîner pour sa famille"),

    ("They study French grammar", "Ils étudient la grammaire française"),

    ("The rain falls gently", "La pluie tombe doucement"),

    ("She sings a song", "Elle chante une chanson"),

    ("We watch a movie together", "Nous regardons un film ensemble"),

    ("He sleeps deeply", "Il dort profondément"),

    ("They travel to Paris", "Ils voyagent à Paris"),

    ("The children play in the park", "Les enfants jouent dans le parc"),

    ("She walks along the beach", "Elle se promène le long de la plage"),

    ("We talk on the phone", "Nous parlons au téléphone"),

    ("He waits for the bus", "Il attend le bus"),

    ("They visit the Eiffel Tower", "Ils visitent la tour Eiffel"),

    ("The stars twinkle at night", "Les étoiles scintillent la nuit"),

    ("She dreams of flying", "Elle rêve de voler"),

    ("We work in the office", "Nous travaillons au bureau"),

    ("He studies history", "Il étudie l'histoire"),

    ("They listen to the radio", "Ils écoutent la radio"),

    ("The wind blows gently", "Le vent souffle doucement"),

    ("She swims in the ocean", "Elle nage dans l'océan"),

    ("We dance at the wedding", "Nous dansons au mariage"),

    ("He climbs the mountain", "Il gravit la montagne"),

    ("They hike in the forest", "Ils font de la randonnée dans la forêt"),

    ("The cat meows loudly", "Le chat miaule bruyamment"),

    ("She paints a picture", "Elle peint un tableau"),

    ("We build a sandcastle", "Nous construisons un château de sable"),

    ("He sings in the choir", "Il chante dans le chœur")

]

from sklearn.model_selection import train_test_split

# Splitting dataset into train and test sets
train_data, test_data = train_test_split(english_to_french, test_size=0.2, random_state=42)

# Initialize train and test datasets
train_dataset = EngFrDataset(train_data)
test_dataset = EngFrDataset(test_data)


# Initialize train and test data loaders
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)


class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)  # Embedding layer
        self.lstm = nn.LSTM(hidden_size, hidden_size)  # LSTM layer

    def forward(self, input, hidden):
        # Move input tensor to the correct device
        input = input.to(hidden[0].device)
        
        # Forward pass for the encoder
        embedded = self.embedding(input).view(1, 1, -1)
        output, hidden = self.lstm(embedded, hidden)
        return output, hidden

    def initHidden(self):
        # Initializes hidden state
        return (torch.zeros(1, 1, self.hidden_size, device=device),  # Ensure initialization on the correct device
                torch.zeros(1, 1, self.hidden_size, device=device))  # Ensure initialization on the correct device


class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)
                             
    def forward(self, input, hidden):
        embedded = self.embedding(input).view(1, 1, -1)
        output, hidden = self.lstm(embedded, hidden)
        output = self.softmax(self.out(output[0]))
        return output, hidden

    def initHidden(self):
        return (torch.zeros(1, 1, self.hidden_size),  # LSTM layer requires a tuple of hidden state and cell state
                torch.zeros(1, 1, self.hidden_size))
    



# Assuming all words in the dataset + 'SOS' and 'EOS' tokens are included in eng_vocab and fr_vocab
input_size = train_dataset.eng_vocab.n_words
hidden_size = 256  # Example hidden size
output_size = train_dataset.fr_vocab.n_words

# Create the encoder and decoder instances
encoder = Encoder(input_size, hidden_size)
decoder = Decoder(hidden_size, output_size)

# Set the device to GPU if available, else CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
encoder = encoder.to(device)
decoder = decoder.to(device)

# Set the learning rate for optimization
learning_rate = 0.01

# Initializing optimizers for both encoder and decoder with Stochastic Gradient Descent (SGD)
encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)

# Negative Log Likelihood Loss function for calculating loss
criterion = nn.NLLLoss()

# Set number of epochs for training
n_epochs = 100

def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=12):
    # Initialize encoder hidden state
    encoder_hidden = encoder.initHidden()

    # Clear gradients for optimizers
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    # Calculate the length of input and target tensors
    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    # Initialize loss
    loss = 0

    # Encoding each token in the input
    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_tensor[ei].unsqueeze(0), encoder_hidden)

    # Decoder's first input is the SOS token
    decoder_input = torch.tensor([[train_dataset.fr_vocab.word2index['<SOS>']]], device=device)

    # Decoder starts with the encoder's last hidden state
    decoder_hidden = encoder_hidden

    # Decoding loop
    for di in range(target_length):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
        # Choose top1 word from decoder's output
        topv, topi = decoder_output.topk(1)
        decoder_input = topi.squeeze().detach()  # Detach from history as input

        # Calculate loss
        loss += criterion(decoder_output, target_tensor[di].unsqueeze(0))
        if decoder_input.item() == train_dataset.fr_vocab.word2index['<EOS>']:  # Stop if EOS token is generated
            break

    # Backpropagation
    loss.backward()

    # Update encoder and decoder parameters
    encoder_optimizer.step()
    decoder_optimizer.step()

    # Return average loss
    return loss.item() / target_length

# Training loop
for epoch in range(n_epochs):
    total_loss = 0
    for eng_tokens, fr_tokens, _, _ in train_dataloader:  # Fixed variable name
        # Move tensors to the correct device
        input_tensor = eng_tokens.squeeze().to(device)
        target_tensor = fr_tokens.squeeze().to(device)
        
        # Perform a single training step and update total loss
        loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion)
        total_loss += loss
    
    # Print loss every 10 epochs
    if epoch % 10 == 0:
        print(f'Epoch {epoch}, Loss: {total_loss / len(train_dataloader)}')

def translate_sentence(encoder, decoder, sentence, eng_vocab, fr_vocab, device, max_length=12):
    # Preprocess the input sentence
    tokens = tokenize_and_pad([sentence], eng_vocab)
    input_tensor = tokens.squeeze().to(device)
    
    # Initialize encoder hidden state
    encoder_hidden = encoder.initHidden()

    # Encode the input sentence
    for ei in range(input_tensor.size(0)):
        encoder_output, encoder_hidden = encoder(input_tensor[ei].unsqueeze(0), encoder_hidden)

    # Initialize decoder input with SOS token
    decoder_input = torch.tensor([[fr_vocab.word2index['<SOS>']]], device=device)
    decoder_hidden = encoder_hidden
    
    # Initialize list to store decoded tokens
    decoded_tokens = []

    # Decode tokens until EOS token is generated or maximum length is reached
    for di in range(max_length):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
        topv, topi = decoder_output.topk(1)
        token_index = topi.squeeze().detach().item()
        if token_index == fr_vocab.word2index['<EOS>']:
            break
        else:
            # Replace out-of-vocabulary token indices with '<UNK>' token index
            if token_index not in fr_vocab.index2word:
                token_index = fr_vocab.word2index['<UNK>']
            decoded_tokens.append(token_index)
        decoder_input = topi.squeeze().detach()

    # Convert decoded tokens to words
    translated_sentence = ' '.join([fr_vocab.index2word[token] for token in decoded_tokens])
    
    return translated_sentence


def evaluate_and_show_examples(encoder, decoder, dataloader, criterion, n_examples=5):
    # Switch model to evaluation mode
    encoder.eval()
    decoder.eval()

    total_loss = 0
    correct_predictions = 0
    predicted_examples = []

    # No gradient calculation
    with torch.no_grad():
        for i, (eng_tokens, fr_tokens, _, _) in enumerate(dataloader):
            # Move tensors to the correct device
            input_tensor = eng_tokens.squeeze().to(device)
            target_tensor = fr_tokens.squeeze().to(device)

            encoder_hidden = encoder.initHidden()

            input_length = input_tensor.size(0)
            target_length = target_tensor.size(0)

            loss = 0

            # Encoding step
            for ei in range(input_length):
                encoder_output, encoder_hidden = encoder(input_tensor[ei].unsqueeze(0), encoder_hidden)

            # Decoding step
            decoder_input = torch.tensor([[train_dataset.fr_vocab.word2index['<SOS>']]], device=device)
            decoder_hidden = encoder_hidden

            predicted_indices = []

            for di in range(target_length):
                decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                topv, topi = decoder_output.topk(1)
                predicted_indices.append(topi.item())
                decoder_input = topi.squeeze().detach()

                loss += criterion(decoder_output, target_tensor[di].unsqueeze(0))
                if decoder_input.item() == train_dataset.fr_vocab.word2index['<EOS>']:
                    break

            # Calculate and print loss and accuracy for the evaluation
            total_loss += loss.item() / target_length
            if predicted_indices == target_tensor.tolist():
                correct_predictions += 1

            # Optionally, print some examples
            if i < n_examples:
                input_string = ' '.join([train_dataset.eng_vocab.index2word[index.item()] for index in input_tensor if
                                         index.item() not in (train_dataset.eng_vocab.word2index['<SOS>'],
                                                              train_dataset.eng_vocab.word2index['<EOS>'])])
                target_string = ' '.join([train_dataset.fr_vocab.index2word[index.item()] for index in target_tensor if
                                          index.item() not in (train_dataset.fr_vocab.word2index['<SOS>'],
                                                               train_dataset.fr_vocab.word2index['<EOS>'])])
                predicted_string = ' '.join([train_dataset.fr_vocab.index2word[index] for index in predicted_indices if
                                             index not in (train_dataset.fr_vocab.word2index['<SOS>'],
                                                           train_dataset.fr_vocab.word2index['<EOS>'])])
                predicted_examples.append((input_string, target_string, predicted_string))

        # Print overall evaluation results
        average_loss = total_loss / len(dataloader)
        accuracy = correct_predictions / len(dataloader)
        print(f'Evaluation Loss: {average_loss}, Accuracy: {accuracy}')
        return predicted_examples


# Training loop
for epoch in range(n_epochs):
    total_loss = 0
    for eng_tokens, fr_tokens, _, _ in train_dataloader:
        # Move tensors to the correct device
        input_tensor = eng_tokens.squeeze().to(device)
        target_tensor = fr_tokens.squeeze().to(device)

        # Perform a single training step and update total loss
        loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion)
        total_loss += loss

    # Print loss every 10 epochs
    if epoch % 10 == 0:
        print(f'Epoch {epoch}, Loss: {total_loss / len(train_dataloader)}')

# Perform evaluation with examples
predicted_examples = evaluate_and_show_examples(encoder, decoder, test_dataloader, criterion)
         

    


Epoch 0, Loss: 2.949303688128908
Epoch 10, Loss: 1.3817307533343757
Epoch 20, Loss: 1.3341595909812236
Epoch 30, Loss: 1.199144161582526
Epoch 40, Loss: 0.9334354954931314
Epoch 50, Loss: 0.6038366156434516
Epoch 60, Loss: 0.29056670297986525
Epoch 70, Loss: 0.10118635875990427
Epoch 80, Loss: 0.049868348924840065
Epoch 90, Loss: 0.029075934017290837
Epoch 0, Loss: 0.020866690733749003
Epoch 10, Loss: 0.016244131788589203
Epoch 20, Loss: 0.013243991631775781
Epoch 30, Loss: 0.011153820108194821
Epoch 40, Loss: 0.009613582528516892
Epoch 50, Loss: 0.008433938459472401
Epoch 60, Loss: 0.00750299757998853
Epoch 70, Loss: 0.006750675481806571
Epoch 80, Loss: 0.006130899537227012
Epoch 90, Loss: 0.005610646022103405
Evaluation Loss: 4.922064770351756, Accuracy: 0.0


In [21]:
for eng_sentence, _ in test_data:
    translated_sentence = translate_sentence(encoder, decoder, eng_sentence, test_dataset.eng_vocab, test_dataset.fr_vocab, device)
    print(f"Input: {eng_sentence}, Translated: {translated_sentence}")


Input: We are friends, Translated: <SOS> Nous vent souffle peint un tableau


KeyError: '<UNK>'