In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import re

# Tokenizers
def tokenize_en(text):
    return re.findall(r'\w+|[^\w\s]', text.lower())

def tokenize_ja(text):
    return list(text)

In [2]:
sentence_pairs = [
    ("Hello, how are you?", "こんにちは、お元気ですか？"),
    ("What is your name?", "あなたの名前は何ですか？"),
    ("I like to eat sushi.", "私は寿司を食べるのが好きです。"),
    ("The weather is very nice today.", "今日はとても良い天気です。"),
    ("Thank you for your help.", "助けてくれてありがとうございます。"),
]

In [3]:
english_sentences = [pair[0] for pair in sentence_pairs]
japanese_sentences = [pair[1] for pair in sentence_pairs]

def build_vocab(sentences, tokenizer, min_freq=1):
    word_counts = {}
    for sentence in sentences:
        tokens = tokenizer(sentence)
        for token in tokens:
            word_counts[token] = word_counts.get(token, 0) + 1

    vocab = {'<PAD>': 0, '<SOS>': 1, '<EOS>': 2, '<UNK>': 3}
    next_index = len(vocab)
    for token, count in word_counts.items():
        if count >= min_freq:
            vocab[token] = next_index
            next_index += 1
    return vocab

en_vocab = build_vocab(english_sentences, tokenize_en)
ja_vocab = build_vocab(japanese_sentences, tokenize_ja)

In [4]:
def numericalize(sentence, vocab, tokenizer):
    return [vocab.get(token, vocab['<UNK>']) for token in tokenizer(sentence)]

en_numerical_sentences = [[en_vocab['<SOS>']] + numericalize(en, en_vocab, tokenize_en) + [en_vocab['<EOS>']] for en, _ in sentence_pairs]
ja_numerical_sentences = [[ja_vocab['<SOS>']] + numericalize(ja, ja_vocab, tokenize_ja) + [ja_vocab['<EOS>']] for _, ja in sentence_pairs]

# Padding
def pad_sequences(sequences, pad_idx):
    max_len = max(len(seq) for seq in sequences)
    padded_sequences = [seq + [pad_idx] * (max_len - len(seq)) for seq in sequences]
    return torch.tensor(padded_sequences)

en_padded = pad_sequences(en_numerical_sentences, en_vocab['<PAD>'])
ja_padded = pad_sequences(ja_numerical_sentences, ja_vocab['<PAD>'])

In [5]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hid_dim, batch_first=True)  # batch_first=True
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        embedded = self.dropout(self.embedding(src))
        _, hidden = self.rnn(embedded)
        return hidden.transpose(0,1) # Transpose to (batch_size, num_layers * num_directions, hidden_size)

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hid_dim, batch_first=True)  # batch_first=True
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden):
        input = input.unsqueeze(1) # unsqueeze to (batch_size, 1)
        embedded = self.dropout(self.embedding(input))
        output, hidden = self.rnn(embedded, hidden.transpose(0,1)) # Transpose hidden to (num_layers * num_directions, batch_size, hidden_size)
        prediction = self.fc_out(output.squeeze(1))
        return prediction, hidden.transpose(0,1) # Transpose hidden back to (batch_size, num_layers * num_directions, hidden_size)

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source_sequence, target_sequence, teacher_forcing_ratio=0.5):
        batch_size = target_sequence.shape[0]  # Number of sentences in the batch
        target_sequence_length = target_sequence.shape[1]  # Length of the target sequence
        target_vocabulary_size = self.decoder.output_dim  # Size of the Japanese vocabulary

        # Tensor to store the decoder's output predictions
        decoder_outputs = torch.zeros(batch_size, target_sequence_length, target_vocabulary_size).to(source_sequence.device)

        # Get the final hidden state from the encoder
        encoder_final_hidden = self.encoder(source_sequence)

        # Initialize the decoder's input with the <SOS> token (start of sequence)
        decoder_input = target_sequence[:, 0]  # First token of each sentence in the batch

        # Iterate through the target sequence, generating one token at each time step
        for time_step in range(1, target_sequence_length):
            # Pass the current decoder input and hidden state to the decoder
            decoder_prediction, decoder_hidden = self.decoder(decoder_input, encoder_final_hidden)

            # Store the decoder's prediction (probabilities for the next token)
            decoder_outputs[:, time_step] = decoder_prediction

            # Decide whether to use teacher forcing or not
            use_teacher_forcing = torch.rand(1).item() < teacher_forcing_ratio

            # Get the predicted token ID (the token with the highest probability)
            predicted_token = decoder_prediction.argmax(1)

            # Choose the next input for the decoder
            if use_teacher_forcing:
                # Use the actual target token from the target sequence (teacher forcing)
                decoder_input = target_sequence[:, time_step]
            else:
                # Use the predicted token from the decoder
                decoder_input = predicted_token

            # Update the hidden state for the next time step
            encoder_final_hidden = decoder_hidden

        return decoder_outputs

In [6]:
#without using teacher forcing 
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source_sequence, target_sequence):
        batch_size = target_sequence.shape[0]
        target_sequence_length = target_sequence.shape[1]
        target_vocabulary_size = self.decoder.output_dim

        decoder_outputs = torch.zeros(batch_size, target_sequence_length, target_vocabulary_size).to(source_sequence.device)

        encoder_final_hidden = self.encoder(source_sequence)

        decoder_input = target_sequence[:, 0]

        for time_step in range(1, target_sequence_length):
            decoder_prediction, decoder_hidden = self.decoder(decoder_input, encoder_final_hidden)

            decoder_outputs[:, time_step] = decoder_prediction

            # Always use teacher forcing: use the actual target token
            decoder_input = target_sequence[:, time_step]

            encoder_final_hidden = decoder_hidden

        return decoder_outputs

In [7]:
INPUT_DIM = len(en_vocab)
OUTPUT_DIM = len(ja_vocab)
EMB_DIM = 256
HID_DIM = 512
DROPOUT = 0.5

enc = Encoder(INPUT_DIM, EMB_DIM, HID_DIM, DROPOUT)
dec = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM, DROPOUT)
model = Seq2Seq(enc, dec).to('cpu')

# Optimizer and Loss
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=ja_vocab['<PAD>'])

In [8]:
def train(model, src, trg, optimizer, criterion, clip):
    model.train()
    optimizer.zero_grad()
    output = model(src, trg)
    output_dim = output.shape[-1]
    output = output[:, 1:].reshape(-1, output_dim)
    trg = trg[:, 1:].reshape(-1)
    loss = criterion(output, trg)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
    optimizer.step()
    return loss.item()

N_EPOCHS = 100
CLIP = 1

In [9]:
for epoch in range(N_EPOCHS):
    loss = train(model, en_padded, ja_padded, optimizer, criterion, CLIP)
    if (epoch + 1) % 10 == 0:
      print(f'Epoch: {epoch+1:02} | Train Loss: {loss:.3f}')

Epoch: 10 | Train Loss: 1.863
Epoch: 20 | Train Loss: 0.306
Epoch: 30 | Train Loss: 0.049
Epoch: 40 | Train Loss: 0.016
Epoch: 50 | Train Loss: 0.008
Epoch: 60 | Train Loss: 0.006
Epoch: 70 | Train Loss: 0.005
Epoch: 80 | Train Loss: 0.004
Epoch: 90 | Train Loss: 0.003
Epoch: 100 | Train Loss: 0.003


In [10]:
def translate(model, sentence, en_vocab, ja_vocab, max_len=50):
    model.eval()
    tokens = [en_vocab['<SOS>']] + numericalize(sentence, en_vocab, tokenize_en) + [en_vocab['<EOS>']]
    src = torch.LongTensor(tokens).unsqueeze(0).to('cpu') # unsqueeze to (1, seq_len)
    with torch.no_grad():
        hidden = model.encoder(src)
    trg_index = [ja_vocab['<SOS>']]
    for _ in range(max_len):
        trg_tensor = torch.LongTensor([trg_index[-1]]).to('cpu')
        with torch.no_grad():
            output, hidden = model.decoder(trg_tensor, hidden)
        pred_token = output.argmax(1).item()
        trg_index.append(pred_token)
        if pred_token == ja_vocab['<EOS>']:
            break
    ja_tokens = [k for k, v in ja_vocab.items() if v in trg_index]
    return ''.join(ja_tokens[1:-1])

In [11]:
test_sentence = "What is your name?"
translated_sentence = translate(model, test_sentence, en_vocab, ja_vocab)
print(f"English: {test_sentence}")
print(f"Japanese: {translated_sentence}")

English: What is your name?
Japanese: <EOS>はですか？あなたの名前
