<a href="https://colab.research.google.com/github/Bluesparx/english-to-spanish-transformer/blob/main/English_to_spanish.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [50]:
import numpy as np
import torch
import math
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torch.optim import Adam
from difflib import SequenceMatcher
from torch.optim.lr_scheduler import StepLR

def similar(a, b):
    return SequenceMatcher(None, a, b).ratio()

def get_device():
    return torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

def scaled_dot_product(q, k, v, mask=None):
    d_k = q.size()[-1]
    scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k)
    if mask is not None:
        scaled = scaled.permute(1, 0, 2, 3) + mask
        scaled = scaled.permute(1, 0, 2, 3)
    attention = F.softmax(scaled, dim=-1)
    values = torch.matmul(attention, v)
    return values, attention

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self, x):
        even_i = torch.arange(0, self.d_model, 2).float()
        denominator = torch.pow(10000, even_i/self.d_model)
        position = (torch.arange(self.max_sequence_length)
                          .reshape(self.max_sequence_length, 1))
        even_PE = torch.sin(position / denominator)
        odd_PE = torch.cos(position / denominator)
        stacked = torch.stack([even_PE, odd_PE], dim=2)
        PE = torch.flatten(stacked, start_dim=1, end_dim=2)
        return PE



class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(d_model , 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask):
        batch_size, sequence_length, d_model = x.size()
        qkv = self.qkv_layer(x)
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3)
        q, k, v = qkv.chunk(3, dim=-1)
        values, attention = scaled_dot_product(q, k, v, mask)
        values = values.permute(0, 2, 1, 3).reshape(batch_size, sequence_length, self.num_heads * self.head_dim)
        out = self.linear_layer(values)
        return out


class LayerNormalization(nn.Module):
    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape=parameters_shape
        self.eps=eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta =  nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        dims = [-(i + 1) for i in range(len(self.parameters_shape))]
        mean = inputs.mean(dim=dims, keepdim=True)
        var = ((inputs - mean) ** 2).mean(dim=dims, keepdim=True)
        std = (var + self.eps).sqrt()
        y = (inputs - mean) / std
        out = self.gamma * y + self.beta
        return out


class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, hidden, drop_prob=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)
        return x


class EncoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, self_attention_mask):
        residual_x = x.clone()
        x = self.attention(x, mask=self_attention_mask)
        x = self.dropout1(x)
        x = self.norm1(x + residual_x)
        residual_x = x.clone()
        x = self.ffn(x)
        x = self.dropout2(x)
        x = self.norm2(x + residual_x)
        return x

class SequentialEncoder(nn.Sequential):
    def forward(self, *inputs):
        x, self_attention_mask  = inputs
        for module in self._modules.values():
            x = module(x, self_attention_mask)
        return x

class Encoder(nn.Module):
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.layers = SequentialEncoder(*[EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
                                      for _ in range(num_layers)])

    def forward(self, x, self_attention_mask, start_token, end_token):
        x = self.sentence_embedding(x, start_token, end_token)
        x = self.layers(x, self_attention_mask)
        return x


class MultiHeadCrossAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_layer = nn.Linear(d_model , 2 * d_model)
        self.q_layer = nn.Linear(d_model , d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, y, mask):
        batch_size, sequence_length, d_model = x.size() # in practice, this is the same for both languages...so we can technically combine with normal attention
        kv = self.kv_layer(x)
        q = self.q_layer(y)
        kv = kv.reshape(batch_size, sequence_length, self.num_heads, 2 * self.head_dim)
        q = q.reshape(batch_size, sequence_length, self.num_heads, self.head_dim)
        kv = kv.permute(0, 2, 1, 3)
        q = q.permute(0, 2, 1, 3)
        k, v = kv.chunk(2, dim=-1)
        values, attention = scaled_dot_product(q, k, v, mask) # We don't need the mask for cross attention, removing in outer function!
        values = values.permute(0, 2, 1, 3).reshape(batch_size, sequence_length, d_model)
        out = self.linear_layer(values)
        return out


class DecoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.layer_norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, self_attention_mask, cross_attention_mask):
        _y = y.clone()
        y = self.self_attention(y, mask=self_attention_mask)
        y = self.dropout1(y)
        y = self.layer_norm1(y + _y)

        _y = y.clone()
        y = self.encoder_decoder_attention(x, y, mask=cross_attention_mask)
        y = self.dropout2(y)
        y = self.layer_norm2(y + _y)

        _y = y.clone()
        y = self.ffn(y)
        y = self.dropout3(y)
        y = self.layer_norm3(y + _y)
        return y


class SequentialDecoder(nn.Sequential):
    def forward(self, *inputs):
        x, y, self_attention_mask, cross_attention_mask = inputs
        for module in self._modules.values():
            y = module(x, y, self_attention_mask, cross_attention_mask)
        return y

class Decoder(nn.Module):
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)])

    def forward(self, x, y, self_attention_mask, cross_attention_mask, start_token, end_token):
        y = self.sentence_embedding(y, start_token, end_token)
        y = self.layers(x, y, self_attention_mask, cross_attention_mask)
        return y


class Transformer(nn.Module):
    def __init__(self,
                d_model,
                ffn_hidden,
                num_heads,
                drop_prob,
                num_layers,
                max_sequence_length,
                spn_vocab_size,
                english_to_index,
                spanish_to_index,
                START_TOKEN,
                END_TOKEN,
                PADDING_TOKEN
                ):
        super().__init__()
        self.encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length, english_to_ind, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob,num_layers, max_sequence_length, spanish_to_ind, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.linear = nn.Linear(d_model, spn_vocab_size)
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    def forward(self,
                x,
                y,
                encoder_self_attention_mask=None,
                decoder_self_attention_mask=None,
                decoder_cross_attention_mask=None,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=False, # We should make this true
                dec_end_token=False): # x, y are batch of sentences
        x = self.encoder(x, encoder_self_attention_mask, start_token=enc_start_token, end_token=enc_end_token)
        out = self.decoder(x, y, decoder_self_attention_mask, decoder_cross_attention_mask, start_token=dec_start_token, end_token=dec_end_token)
        out = self.linear(out)
        return out

In [51]:
class SentenceEmbedding(nn.Module):
    def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
        super().__init__()
        self.vocab_size = len(language_to_index)
        self.max_sequence_length = max_sequence_length
        self.embedding = nn.Embedding(self.vocab_size, d_model)
        self.language_to_index = language_to_index
        self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
        self.dropout = nn.Dropout(p=0.1)
        self.START_TOKEN = START_TOKEN
        self.END_TOKEN = END_TOKEN
        self.PADDING_TOKEN = PADDING_TOKEN

    def batch_tokenize(self, batch, start_token, end_token):
        def tokenize(sentence, start_token, end_token):
            sentence_word_indices = [self.language_to_index.get(token, self.language_to_index[self.PADDING_TOKEN]) for token in sentence]
            if start_token:
                sentence_word_indices.insert(0, self.language_to_index[self.START_TOKEN])
            if end_token:
                sentence_word_indices.append(self.language_to_index[self.END_TOKEN])

            padding_needed = self.max_sequence_length - len(sentence_word_indices)
            if padding_needed > 0:
                sentence_word_indices.extend([self.language_to_index[self.PADDING_TOKEN]] * padding_needed)
            return sentence_word_indices[:self.max_sequence_length]

        tokenized = [tokenize(sentence, start_token, end_token) for sentence in batch]
        tokenized = torch.tensor(tokenized, dtype=torch.long)
        return tokenized

    def forward(self, x, start_token, end_token):
        x = self.batch_tokenize(x, start_token, end_token)
        x = self.embedding(x)
        pos = self.position_encoder(x)
        x = self.dropout(x + pos)
        return x


In [61]:
import os
os.environ['KAGGLE_CONFIG_DIR'] = '/content'
!echo '{"username":"naziahassan00042","key":"8eb8a0d58fcb6b7db2b45ae245435c81"}' > /content/kaggle.json
!chmod 600 /content/kaggle.json

import zipfile
import pandas as pd
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader

!kaggle datasets download -d lonnieqin/englishspanish-translation-dataset

with zipfile.ZipFile('/content/englishspanish-translation-dataset.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/dataset')

df = pd.read_csv('/content/dataset/data.csv')

english_set = df.iloc[:, 0]  # First column for English
spanish_set = df.iloc[:, 1]  # Second column for Spanish

START_TOKEN = '<start>'
PADDING_TOKEN = '<pad>'
END_TOKEN = '<end>'

english_vocabulary = [START_TOKEN, ' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/',
                        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                        ':', '<', '=', '>', '?', '@',
                        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L',
                        'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
                        'Y', 'Z',
                        '[', '\\', ']', '^', '_', '`',
                        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
                        'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
                        'y', 'z',
                        '{', '|', '}', '~', PADDING_TOKEN, END_TOKEN]

spanish_vocabulary = [START_TOKEN, ' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/',
                      '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', '<', '=', '>', '?',
                      '¡', '¿',
                      'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L',
                      'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
                      'Y', 'Z',
                      'á', 'é', 'í', 'ó', 'ú', 'ü', 'Á', 'É', 'Í', 'Ó', 'Ú', 'Ü',
                      'ñ', 'Ñ',
                      '[', '\\', ']', '^', '_', '`',
                      'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
                      'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
                      'y', 'z',
                      '{', '|', '}', '~', PADDING_TOKEN, END_TOKEN]

ind_to_spanish = {k: v for k, v in enumerate(spanish_vocabulary)}
spanish_to_ind = {v: k for k, v in enumerate(spanish_vocabulary)}
ind_to_english = {k: v for k, v in enumerate(english_vocabulary)}
english_to_ind = {v: k for k, v in enumerate(english_vocabulary)}

english_sentences = english_set.tolist()
spanish_sentences = spanish_set.tolist()

max_sequence_length = 40


Dataset URL: https://www.kaggle.com/datasets/lonnieqin/englishspanish-translation-dataset
License(s): unknown
englishspanish-translation-dataset.zip: Skipping, found more recently modified local copy (use --force to force download)


In [53]:
# check for valid sentences
def isvalid(sentence, vocab):
    for token in list(set(sentence)):
        if token not in vocab:
            return False
    return True

def isvalidlength(sentence, max_sequence_length):
    return len(list(sentence)) < (max_sequence_length - 2)  # need to readd eos and start token

valid_indices = []
for index in range(len(english_sentences)):
    spanish_sentence, english_sentence = spanish_sentences[index], english_sentences[index]
    if isvalidlength(spanish_sentence, max_sequence_length) \
            and isvalidlength(english_sentence, max_sequence_length) \
            and isvalid(spanish_sentence, spanish_vocabulary) \
            and isvalid(english_sentence, english_vocabulary):
        valid_indices.append(index)

# Filter sentences based on valid indices
english_sentences = [english_sentences[i] for i in valid_indices]
spanish_sentences = [spanish_sentences[i] for i in valid_indices]

In [54]:
print("Sample English sentences:", english_sentences[:5])
print("Sample Spanish sentences:", spanish_sentences[:5])

max_sentences = 1500
english_sentences = english_sentences[:max_sentences]
spanish_sentences = spanish_sentences[:max_sentences]

assert len(english_sentences) == len(spanish_sentences), "mismatch"


Sample English sentences: ['Go.', 'Go.', 'Go.', 'Go.', 'Hi.']
Sample Spanish sentences: ['Ve.', 'Vete.', 'Vaya.', 'Váyase.', 'Hola.']


In [55]:
d_model = 512
batch_size = 15
ffn_hidden = 1024
num_heads = 8
drop_prob = 0.1
num_layers = 1
spn_vocab_size = len(spanish_vocabulary)

transformer = Transformer(d_model,
                          ffn_hidden,
                          num_heads,
                          drop_prob,
                          num_layers,
                          max_sequence_length,
                          spn_vocab_size,
                          english_to_ind,
                          spanish_to_ind,
                          START_TOKEN, END_TOKEN, PADDING_TOKEN)

sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, english_to_ind , START_TOKEN, END_TOKEN, PADDING_TOKEN)


class TextDataset(Dataset):

    def __init__(self, english_sentences, spanish_sentences):
        self.english_sentences = english_sentences
        self.spanish_sentences = spanish_sentences

    def __len__(self):
        return len(self.english_sentences)

    def __getitem__(self, idx):
        return self.english_sentences[idx], self.spanish_sentences[idx]

criterion = nn.CrossEntropyLoss(ignore_index=spanish_to_ind[PADDING_TOKEN], reduction='none')

optim = torch.optim.Adam(transformer.parameters(), lr=0.001)
scheduler = StepLR(optim, step_size=1, gamma=0.9)  # Decay LR by 10% every epoch

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

dataset = TextDataset(english_sentences, spanish_sentences)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# Move model to device
transformer.to(device)

clip_value = 1.0

# Initialize parameters
for params in transformer.parameters():
    if params.dim() > 1:
        nn.init.xavier_uniform_(params)


In [58]:
# Initialize lists for reference and candidate translations
references = []
candidates = []

iterator = iter(train_loader)


print(f"batch size: {batch_size}")

transformer.train()
transformer.to(device)
total_loss = 0
num_epochs = 20
train_accuracy = 0
final_accuracy = 0

for epoch in range(num_epochs):
    print(f"Epoch {epoch}")
    iterator = iter(train_loader)
    # Clear references and candidates for each epoch
    references.clear()
    candidates.clear()

    total_accuracy = 0  # Initialize total accuracy for the epoch
    total_batches = 0  # Initialize batch counter for the epoch

    for batch_num, batch in enumerate(iterator):
        similarity = 0
        transformer.train()
        eng_batch, spn_batch = batch

        mask = torch.full([max_sequence_length, max_sequence_length], float('-inf'))
        mask = torch.triu(mask, diagonal=1)
        encoder_self_attention_mask = mask
        decoder_self_attention_mask = mask
        decoder_cross_attention_mask = mask

        optim.zero_grad()

        # predictions
        spn_predictions = transformer(eng_batch,
                                      spn_batch,
                                      encoder_self_attention_mask.to(device),
                                      decoder_self_attention_mask.to(device),
                                      decoder_cross_attention_mask.to(device),
                                      enc_start_token=True,
                                      enc_end_token=True,
                                      dec_start_token=True,
                                      dec_end_token=True)

        # Calculate loss
        labels = transformer.decoder.sentence_embedding.batch_tokenize(spn_batch, start_token=False, end_token=True)
        loss = criterion(spn_predictions.view(-1, spn_vocab_size).to(device),
                         labels.view(-1).to(device))

        # Ignore padding
        valid_indices = torch.where(labels.view(-1) != spanish_to_ind[PADDING_TOKEN], True, False)
        loss = loss * valid_indices.float()

        # Compute the sum of valid losses
        valid_loss = loss.sum()

        valid_loss.backward()

        torch.nn.utils.clip_grad_norm_(transformer.parameters(), clip_value)

        optim.step()

        spn_sentence_predicted = torch.argmax(spn_predictions, axis=2)

        for i in range(len(spn_batch)):
            reference = spn_batch[i]  # Keep as string
            candidate = spn_sentence_predicted[i].tolist()  # Convert tensor to list of indices
            # Remove padding and end tokens
            candidate = [ind_to_spanish[idx] for idx in candidate if idx not in [spanish_to_ind[PADDING_TOKEN], spanish_to_ind[END_TOKEN]]]
            references.append(reference)
            candidates.append(candidate)

        for cand, ref in zip(candidates, references):
            similarity+= similar(ref, cand)

        similarity /= len(candidates)
        total_loss += valid_loss.item()
        batch_accuracy = similarity  # Similarity represents batch accuracy here
        total_accuracy += batch_accuracy  # Accumulate batch accuracy
        total_batches += 1  # Increment batch counter

        if batch_num % 50 == 0:
            print(f"Iteration {batch_num} : {valid_loss.item()}")
            print(f"English: {eng_batch[len(eng_batch)-1]}")
            print(f"Spanish Translation: {spn_batch[len(spn_batch)-1]}")
            predicted_sentence = ""
            for idx in spn_sentence_predicted[0]:
                if idx == spanish_to_ind[END_TOKEN]:
                    break
                predicted_sentence += ind_to_spanish[idx.item()]
            print(f"Spanish Prediction: {predicted_sentence}")
            print(f"Similarity: {similarity}")

    scheduler.step()

    # Compute average training accuracy for the epoch
    train_accuracy = total_accuracy / total_batches
    print(f"Train Accuracy for Epoch {epoch}: {train_accuracy}")

    final_accuracy += train_accuracy

print(f"Train Accuracy: {final_accuracy / num_epochs}")

batch size: 15
Epoch 0
Iteration 0 : 270.95709228515625
English: She walks.
Spanish Translation: Anda.
Spanish Prediction: Eom s esteró.
Similarity: 0.47458331579021235
Iteration 50 : 278.3660888671875
English: Let Tom go.
Spanish Translation: Deja ir a Tom.
Spanish Prediction: ¡íguelo.
Similarity: 0.4678786805359512
Train Accuracy for Epoch 0: 0.4650376446805105
Epoch 1
Iteration 0 : 262.16607666015625
English: Help!
Spanish Translation: ¡Auxilio!
Spanish Prediction: Eire a  a.
Similarity: 0.5731641343406049
Iteration 50 : 288.65997314453125
English: Go for it.
Spanish Translation: Ve a por ello.
Spanish Prediction: Eo pago.óeosa.
Similarity: 0.4986527031112982
Train Accuracy for Epoch 1: 0.5080774063625481
Epoch 2
Iteration 0 : 255.5123748779297
English: So long.
Spanish Translation: Hasta la vista.
Spanish Prediction: Eesuntianes.
Similarity: 0.5946393517143281
Iteration 50 : 336.3207702636719
English: Tom paused.
Spanish Translation: Tom se detuvo.
Spanish Prediction: Es eíeaoe
Sim

In [59]:
# Save the trained model
torch.save(transformer.state_dict(), 'transformer_model.pth')