# Neural Machine Translation with Bidirectional Encoder-Decoder LSTMs:

In [133]:
from tensorflow import keras
from keras.layers import LSTM, Dense, Input
from keras.utils import to_categorical as one_hot
import numpy as np

Using TensorFlow backend.


In [134]:
# Toy parallel corpus: each entry is [english_phrase, french_phrase].
# Words are separated by single spaces (punctuation such as "?" is its own word),
# which is what the whitespace-based tokenizer in TranslationVocab relies on.
# NOTE(review): the French side contains spelling/grammar slips; they are kept verbatim
# because every distinct word here becomes a vocabulary entry.
dataset = [
    ["I love you", "je t'aime"],
    ["What is your name ?", "Comment tu t'appele ?"],
    ["How old are you", "Quel age as tu"],
    ["why is the world so dumb", "pourquoi le monde est si idiot"],
    ["I found the dog", "j'ai trouve le chien"],
    ["He died", "Il est mort"],
    ["I kissed him", "Je l'ai embrasse"],
    ["New-York is a beautiful city", "New-York est une belle ville"],
    ["what are you doing", "Que ce que tu fait"],
    ["I lost hope", "J'ai perdu l'espoir"],
    ["Donald Trump is the worst president", "Donald Trump est le pire president"],
    ["Do you feel bad", "Tu te sens mauvais"],
    ["I found an apple at my desk", "J'ai trouve une pomme sur mon bureau"],
    ["You broke the mug", "T'as casse le mug"],
    ["Do you hate me ?", "Tu me deteste ?"]
]

# Data Preprocessing:
---
Before we jump into the model, some preprocessing is required.

## 1. Similar Words Tokenization:
There's no gain in having three separate word vectors for New-York, Havana and Tokyo: they are all city names. The same goes for America, Germany and Russia, which are all country names. The dataset would be huge if we included training examples for each city name, and our model will learn better if all it knows about those words is the category they belong to. This way, even if we do not have a training example for "I went to Tokyo", a model that has already been trained on "I went to Paris" will know how to translate the former.
## 2. Sequence Begin & End Tokens:
The sequence-begin token and the encoding vector are fed to the decoder to trigger translation; the sequence-end token tells us when the translation is over.
## 3. Tokenization and Vocab Dictionary
A vocab dictionary mapping each word to its token, and another mapping each token back to its word, are needed to transform our training data into valid inputs and outputs.

In [135]:
# Maps specific (lowercase) words to a shared category token such as '<city>'.
# TranslationVocab never gives these words their own vocabulary entry; at lookup time
# they resolve to the token of their category instead.
# Keys must be lowercase because phrases are lowercased before being split into words.
custom_words = {
    'new-york': '<city>',
    'tokyo': '<city>',
    'paris': '<city>',
    'donald': '<person>',
    'trump': '<person>',
    'hakim': '<person>',
    'andrew': '<person>',
    'france': '<country>',
    'mexico': '<country>',
}

Next, we'll define some helper functions to de/tokenize words and create the vocabs:

In [223]:
class TranslationVocab:
    """Word <-> token vocabularies for a (source, target) translation dataset.

    Two independent vocabularies are kept: ``ov`` for the original (source) language and
    ``tv`` for the target language. Both start from the same reserved tokens
    (see ``generate_empty_vocab``) and grow as ``load_from_dataset`` meets new words.

    Attributes:
        ov, tv: word -> token dictionaries (original / target).
        ov_inv, tv_inv: token -> word dictionaries, filled by ``load_from_dataset``.
        ov_size, tv_size: number of entries in ``ov`` / ``tv``.
        o_max, t_max: longest original / target phrase seen, in words
            (``t_max`` includes the ``<start>`` and ``<end>`` tokens).
        custom_words: word -> category-token map (e.g. ``'paris' -> '<city>'``).
    """

    # Key of the out-of-vocabulary entry. The spelling is kept as it always was so that
    # token ids and previously printed vocabularies stay valid; the important part is
    # that definition and lookup now share this single constant.
    UNKNOWN_TOKEN = '<uknown>'
    PAD_TOKEN = '<pad>'

    def __init__(self, custom_words=None):
        """Create empty vocabularies.

        Args:
            custom_words: optional dict mapping lowercase words to a category token.
                When omitted, the module-level ``custom_words`` dict is used if it
                exists, otherwise no word is treated as a category word.
        """
        if custom_words is None:
            custom_words = globals().get('custom_words', {})
        self.custom_words = custom_words
        self.ov = self.generate_empty_vocab() # original_vocab
        self.tv = self.generate_empty_vocab() # target_vocab
        self.ov_inv = {}
        self.tv_inv = {}
        self.ov_size = len(self.ov)
        self.tv_size = len(self.tv)
        self.o_max = 0 # original phrases data maximum sequence
        self.t_max = 0 # target phrases data maximum sequence

    def load_from_dataset(self, dataset):
        """Populate both vocabularies from ``dataset`` and return ``self``.

        Args:
            dataset: iterable of ``[original, target]`` string pairs.
        """
        for original, target in dataset:
            # split() (no argument) ignores repeated whitespace, so no empty "word"
            # can ever be inserted into the vocabulary.
            o_splitted = original.lower().split()
            t_splitted = target.lower().split()

            for word in o_splitted:
                self.insert_word_to_vocab(self.ov, word, self.ov_size)

            for word in t_splitted:
                self.insert_word_to_vocab(self.tv, word, self.tv_size, is_target = True)

            # update max-lengths
            self.o_max = max(len(o_splitted), self.o_max)
            self.t_max = max(len(t_splitted) + 2, self.t_max) # +2 for <start> & <end> tokens

        # assign inverse map dicos
        self.ov_inv = {token: word for word, token in self.ov.items()}
        self.tv_inv = {token: word for word, token in self.tv.items()}
        return self

    def get_token_of(self, word, is_target = False):
        """Return the token of ``word`` in the original (default) or target vocabulary.

        Lookup order: the word itself, then its category from ``custom_words``,
        then the out-of-vocabulary token.
        """
        vocab = self.tv if is_target else self.ov
        if word in vocab:
            return vocab[word]
        if word in self.custom_words:
            return vocab[self.custom_words[word]]
        return vocab[self.UNKNOWN_TOKEN]

    def tokenize_phrase(self, phrase, is_target = False, transformation = lambda x : '<start> ' + x + ' <end>'):
        """Convert ``phrase`` into a list of tokens, right-padded with ``<pad>``.

        Args:
            phrase: the sentence to tokenize.
            is_target: use the target vocabulary instead of the original one.
            transformation: callable applied to the lowercased phrase before splitting;
                by default it wraps the phrase in ``<start>`` / ``<end>``.
                Pass ``None`` to tokenize the phrase as is.

        Returns:
            A list of ints. Phrases shorter than ``max(o_max, t_max)`` are padded to that
            length; longer phrases are returned in full (neither padded nor truncated).
        """
        phrase = phrase.lower().strip()
        if transformation is not None:
            phrase = transformation(phrase)
        tokenized = [self.get_token_of(word, is_target) for word in phrase.split()]

        # Both languages are padded to the same length on purpose.
        max_len = max(self.o_max, self.t_max)
        vocab = self.tv if is_target else self.ov
        # A negative repeat count yields an empty list, so long phrases are left alone.
        tokenized.extend([vocab[self.PAD_TOKEN]] * (max_len - len(tokenized)))
        return tokenized

    def tokenize_dataset(self, dataset):
        """Tokenize every pair of ``dataset``.

        Returns:
            ``(X, y)`` numpy arrays: ``X`` holds the original phrases without
            ``<start>``/``<end>``, ``y`` the target phrases wrapped in them.
        """
        X = []
        y = []
        for original, target in dataset:
            X.append(np.array(self.tokenize_phrase(original, transformation = None), dtype=np.int32))
            y.append(np.array(self.tokenize_phrase(target, is_target = True), dtype=np.int32))
        return np.array(X), np.array(y)

    def insert_word_to_vocab(self, vocab, word, index, is_target = False):
        """Add ``word`` to ``vocab`` with token ``index`` unless it is already known.

        Words listed in ``custom_words`` are never inserted: they resolve to their
        category token at lookup time. The matching size counter is incremented on insert.
        """
        if word in vocab or word in self.custom_words:
            return
        vocab[word] = index
        if is_target:
            self.tv_size += 1
        else:
            self.ov_size += 1

    @staticmethod
    def generate_empty_vocab(): # later to be loaded from a file
        """Return a fresh vocabulary that contains only the reserved tokens."""
        return {
            TranslationVocab.PAD_TOKEN: 0,
            '<city>': 1,
            '<country>': 2,
            '<person>': 3,
            '<start>': 4,
            '<end>': 5,
            TranslationVocab.UNKNOWN_TOKEN: 6,
        }

# Build the vocabularies once from the toy corpus; every later cell reuses this object.
translation_vocab = TranslationVocab().load_from_dataset(dataset)

# Quick sanity checks, printed one after another with a '---' line between them:
# both vocabularies, the target token of '?', and an unwrapped tokenized source phrase.
sanity_checks = [
    translation_vocab.ov,
    translation_vocab.tv,
    translation_vocab.get_token_of('?', True),
    translation_vocab.tokenize_phrase('I love Paris', is_target = False, transformation = None),
]
print(*sanity_checks, sep='\n---\n')

{'<pad>': 0, '<city>': 1, '<country>': 2, '<person>': 3, '<start>': 4, '<end>': 5, '<uknown>': 6, 'i': 7, 'love': 8, 'you': 9, 'what': 10, 'is': 11, 'your': 12, 'name': 13, '?': 14, 'how': 15, 'old': 16, 'are': 17, 'why': 18, 'the': 19, 'world': 20, 'so': 21, 'dumb': 22, 'found': 23, 'dog': 24, 'he': 25, 'died': 26, 'kissed': 27, 'him': 28, 'a': 29, 'beautiful': 30, 'city': 31, 'doing': 32, 'lost': 33, 'hope': 34, 'worst': 35, 'president': 36, 'do': 37, 'feel': 38, 'bad': 39, 'an': 40, 'apple': 41, 'at': 42, 'my': 43, 'desk': 44, 'broke': 45, 'mug': 46, 'hate': 47, 'me': 48}
---
{'<pad>': 0, '<city>': 1, '<country>': 2, '<person>': 3, '<start>': 4, '<end>': 5, '<uknown>': 6, 'je': 7, "t'aime": 8, 'comment': 9, 'tu': 10, "t'appele": 11, '?': 12, 'quel': 13, 'age': 14, 'as': 15, 'pourquoi': 16, 'le': 17, 'monde': 18, 'est': 19, 'si': 20, 'idiot': 21, "j'ai": 22, 'trouve': 23, 'chien': 24, 'il': 25, 'mort': 26, "l'ai": 27, 'embrasse': 28, 'une': 29, 'belle': 30, 'ville': 31, 'que': 32, 'c

In [224]:
# Token ids, padded to a common length:
#   X              -> encoder input (source phrases, no <start>/<end>)
#   decoder_inputs -> decoder input (target phrases wrapped in <start> ... <end>)
X, decoder_inputs = translation_vocab.tokenize_dataset(dataset)

# Teacher forcing: the expected output at step t is the decoder input at step t + 1.
# Drop the leading <start> column and APPEND one <pad> column to keep the length.
# (np.insert(..., -1, ...) would put the pad *before* the last column, which moves
# <end> after the pad for targets that fill the whole sequence.)
pad_column = np.full(
    (decoder_inputs.shape[0], 1),
    translation_vocab.tv['<pad>'],
    dtype=decoder_inputs.dtype,
)
y = np.concatenate([decoder_inputs[:, 1:], pad_column], axis=1)

# One-hot encode everything: n_samples x sequence_length x vocabulary_size.
X = one_hot(X, num_classes=translation_vocab.ov_size)
y = one_hot(y, num_classes=translation_vocab.tv_size)
decoder_inputs = one_hot(decoder_inputs, num_classes=translation_vocab.tv_size)

print(f'training input shape = {X.shape}')
print(f'training output shape = {y.shape}')
print(f'decoder input shape = {decoder_inputs.shape}')

training input shape = (15, 9, 49)
training output shape = (15, 9, 51)
decoder input shape = (15, 9, 51)


# Encoder-Decoder Model

In [174]:
def seq2seq_model(n_hidden, n_encoder_tokens=None, n_decoder_tokens=None):
    """Build the training-time encoder-decoder LSTM model.

    The encoder LSTM reads the one-hot source sequence and only its final hidden and
    cell states are kept; they initialise the decoder LSTM, which reads the one-hot
    target sequence and predicts a softmax over the target vocabulary at every step.

    Args:
        n_hidden: number of units of both LSTMs.
        n_encoder_tokens: width of the one-hot source vectors.
            Defaults to ``translation_vocab.ov_size``.
        n_decoder_tokens: width of the one-hot target vectors (and of the softmax).
            Defaults to ``translation_vocab.tv_size``.

    Returns:
        An uncompiled Keras model taking ``[encoder_inputs, decoder_inputs]``.
    """
    # Derive the widths from the vocabulary instead of hard-coding them, so the model
    # keeps matching the data when the dataset (and therefore the vocabulary) changes.
    if n_encoder_tokens is None:
        n_encoder_tokens = translation_vocab.ov_size
    if n_decoder_tokens is None:
        n_decoder_tokens = translation_vocab.tv_size

    encoder_inputs = keras.layers.Input(shape=(None, n_encoder_tokens))
    # The per-step encoder output is discarded; only the final states are used.
    _, h, c = keras.layers.LSTM(n_hidden, return_state=True, name='encoder_lstm')(encoder_inputs)

    decoder_inputs = keras.layers.Input(shape=(None, n_decoder_tokens))
    decoder_outputs, _, _ = keras.layers.LSTM(n_hidden, return_state = True, return_sequences=True, name='decoder_lstm')(decoder_inputs, initial_state=[h, c])
    outputs = keras.layers.Dense(n_decoder_tokens, activation='softmax', name='decoder_dense')(decoder_outputs)

    model = keras.models.Model([encoder_inputs, decoder_inputs], outputs)
    return model

In [175]:
# Training model with 32 LSTM units in both the encoder and the decoder.
model = seq2seq_model(n_hidden=32)
model.summary()

Model: "model_16"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_50 (InputLayer)           [(None, None, 49)]   0                                            
__________________________________________________________________________________________________
input_51 (InputLayer)           [(None, None, 51)]   0                                            
__________________________________________________________________________________________________
encoder_lstm (LSTM)             [(None, 32), (None,  10496       input_50[0][0]                   
__________________________________________________________________________________________________
decoder_lstm (LSTM)             [(None, None, 32), ( 10752       input_51[0][0]                   
                                                                 encoder_lstm[0][1]        

In [176]:
# Targets are one-hot vectors, hence categorical cross-entropy.
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['acc'],
)

In [181]:
# Teacher forcing: the decoder is fed the target sequence and must predict it shifted by one step.
model.fit(x=[X, decoder_inputs], y=y, epochs=5, verbose=1)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x7f4b3b3d7df0>

In [244]:
# Stand-alone inference encoder: same architecture as the training encoder,
# returning only the final hidden and cell states.
# Sizes are read from the vocabulary and from the trained layer rather than hard-coded,
# so this cell stays consistent with the trained model.
trained_encoder_lstm = model.get_layer('encoder_lstm')

enc_in = keras.layers.Input(shape=(None, translation_vocab.ov_size))
_, state_h, state_c = keras.layers.LSTM(trained_encoder_lstm.units, return_state=True, name='enc_lstm')(enc_in)
encoder = keras.models.Model(enc_in, [state_h, state_c])

# Copy the trained weights into the inference layer.
encoder.get_layer('enc_lstm').set_weights(trained_encoder_lstm.get_weights())

def encode_phrase(phrase):
    """Encode a source-language phrase into the encoder's final LSTM states.

    Args:
        phrase: the sentence to translate, as plain text.

    Returns:
        ``(hidden, cell)``: the two state arrays produced by ``encoder.predict`` for a
        batch containing only this phrase.
    """
    # The encoder was trained on source phrases WITHOUT <start>/<end> markers
    # (tokenize_dataset uses transformation=None for the source side), so inference
    # must tokenize the same way instead of using the default wrapping.
    tokenized = translation_vocab.tokenize_phrase(phrase, transformation = None)
    tokenized = np.array(tokenized)
    tokenized = one_hot(tokenized, num_classes=translation_vocab.ov_size)
    tokenized = np.expand_dims(tokenized, axis=0)  # add the batch dimension
    hidden, cell = encoder.predict(tokenized)
    return hidden, cell

# Smoke test: print the shape of each state returned for a single phrase.
h, c = encode_phrase('I love you')
for state in (h, c):
    print(state.shape)

(1, 32)
(1, 32)


In [253]:
# Stand-alone inference decoder: takes the previous token and the previous LSTM states,
# returns the next-token distribution together with the updated states.
# Sizes are read from the vocabulary and from the trained layer rather than hard-coded.
trained_decoder_lstm = model.get_layer('decoder_lstm')
n_decoder_units = trained_decoder_lstm.units

dec_in = keras.layers.Input(shape=(None, translation_vocab.tv_size))
input_h, input_c = keras.layers.Input(shape=(n_decoder_units, )), keras.layers.Input(shape=(n_decoder_units, ))

# No return_sequences here: only the output of the last time step is returned.
decoder_outputs, state_h, state_c = keras.layers.LSTM(n_decoder_units, return_state = True, name='dec_lstm')(dec_in, initial_state=[input_h, input_c])
outputs = keras.layers.Dense(translation_vocab.tv_size, activation='softmax', name='dec_dense')(decoder_outputs)
decoder = keras.models.Model([dec_in, input_h, input_c], [outputs, state_h, state_c])

# set weights
decoder.get_layer('dec_lstm').set_weights(trained_decoder_lstm.get_weights())
decoder.get_layer('dec_dense').set_weights(model.get_layer('decoder_dense').get_weights())

def decode_states(h, c, in_token = '<start>', max_len = None):
    """Greedily decode a translation starting from the LSTM states ``h`` and ``c``.

    At each step the previous token is fed to ``decoder`` together with the current
    states; the most probable word becomes the next input. Decoding stops when
    ``<pad>`` or ``<end>`` is predicted, or after ``max_len`` steps.

    Args:
        h, c: hidden and cell state arrays, as returned by ``encode_phrase``.
        in_token: target-vocabulary word fed to the decoder at the first step.
        max_len: maximum number of decoding steps. Defaults to
            ``translation_vocab.t_max``. This bound prevents endless decoding when
            the model never predicts ``<end>``.

    Returns:
        The decoded words, each followed by a single space (so a non-empty result ends
        with a space); an empty string if the first prediction is ``<pad>``/``<end>``.
    """
    if max_len is None:
        max_len = translation_vocab.t_max

    translated = ''
    for _ in range(max_len):
        # One-hot encode the previous word as a batch of one single-step sequence.
        step_input = np.array([translation_vocab.tv[in_token]])
        step_input = one_hot(step_input, num_classes=translation_vocab.tv_size)
        step_input = np.expand_dims(step_input, axis=0)

        probabilities, h, c = decoder.predict([step_input, h, c])
        in_token = translation_vocab.tv_inv[probabilities.argmax()]
        if in_token in ('<pad>', '<end>'):
            break
        translated += in_token + ' '
    return translated

In [255]:
def translate(english_phrase):
    """Translate ``english_phrase``: encode it, then decode starting from '<start>'."""
    encoder_states = encode_phrase(english_phrase)
    return decode_states(*encoder_states, in_token = '<start>')

In [266]:
# Try one phrase from the training set and one unseen combination of known words.
for english_phrase in ('I love you', 'He died I love you'):
    print(translate(english_phrase))

je t'aime 
je t'aime 
