<a href="https://colab.research.google.com/github/aakhterov/ML_projects/blob/master/machine_translation/machine_translation_with_lstm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
from typing import List
import numpy as np
import tensorflow as tf
from pprint import pprint
from string import punctuation
from tensorflow.keras.layers import TextVectorization, Embedding, LSTM, Dense, Input
from tensorflow.keras.models import Model
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.utils import pad_sequences
from tensorflow.keras.callbacks import TensorBoard
from sklearn.model_selection import train_test_split

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [26]:
UNKNOWN_TOKEN = '[UNK]'
START_TOKEN = '[START]'
END_TOKEN = '[END]'

In [27]:
class Vectorization:

  def __init__(self,
               max_tokens,
               max_length=None,
               unknown_token=UNKNOWN_TOKEN,
               start_token=START_TOKEN,
               end_token=END_TOKEN
               ):

    self.max_tokens = max_tokens
    self.max_length = max_length
    self.unknown_token = unknown_token
    self.start_token = start_token
    self.end_token=end_token
    self.vocabulary = ['', self.unknown_token, self.start_token, self.end_token]

  def __preprocessing(self, input: str) -> str:
    output = ''.join(map(lambda ch: ch if ch not in punctuation else ' ', input.lower())).strip()
    return output

  def token_to_text(self, tokens: List) -> str:
    words = [self.vocabulary[token] for token in tokens]
    return " ".join(words)

  def fit(self, X: List):
      lens = []
      for x in X:
        words = self.__preprocessing(x).split()
        lens.append(len(words))
        for word in words:
          token = word.strip()
          if token not in self.vocabulary and self.max_tokens is not None and len(self.vocabulary)<self.max_tokens:
            self.vocabulary.append(token)
      lens = np.array(lens)
      if self.max_length is None:
        self.max_length = int(np.mean(lens) + 2 * np.std(lens))
      return self

  def predict(self,
              X: List,
              is_padding=True,
              is_add_start_token=False,
              is_add_end_token=False
              ) -> List[List]:
    output = []
    for x in X:

      vector = [self.vocabulary.index(self.start_token)] if is_add_start_token else []

      for word in self.__preprocessing(x).split():
        token = word.strip()
        vector.append(self.vocabulary.index(token) if token in self.vocabulary else self.vocabulary.index(self.unknown_token))

      vector = vector[:self.max_length-1]
      if is_add_end_token:
        vector.append(self.vocabulary.index(self.end_token))

      output.append(vector)
    return pad_sequences(output,
                         maxlen=self.max_length,
                         padding='post',
                         truncating='post') if is_padding else output

In [28]:
input_phrases, output_phrases = [], []
with open('/content/drive/MyDrive/Colab Notebooks/Data/rus.txt') as f:
  for line in f.readlines()[:10_000]:
    x, y = line.split('CC-BY')[0].strip().split('\t')
    input_phrases.append(x)
    output_phrases.append(y)

In [29]:
input_vocab = 10_000
output_vocab = 10_000
# max_length = 30

In [30]:
encoder_vec = Vectorization(max_tokens=input_vocab)
encoder_vec.fit(input_phrases)
X_encoder = encoder_vec.predict(input_phrases)

decoder_vec = Vectorization(max_tokens=output_vocab)
decoder_vec.fit(output_phrases)
X_decoder = decoder_vec.predict(output_phrases, is_add_start_token=True, is_add_end_token=True)
Y_decoder = decoder_vec.predict(output_phrases, is_add_end_token=True)

In [31]:
idx = 1
print(f"Index: {idx}")
print("======= Encoder =======")
print(f"Input phrase: {input_phrases[idx]}")
print(f"Vector: {X_encoder[idx]}")
print("======= Decoder =======")
print(f"Input phrase: {output_phrases[idx]}")
print(f"Vector: {X_decoder[idx]}")
print(f"Output phrase: {output_phrases[idx]}")
print(f"Vector: {Y_decoder[idx]}")
print("==============")
print(f"Start phrase token index: {decoder_vec.vocabulary.index(START_TOKEN)}")
print(f"End phrase token index: {decoder_vec.vocabulary.index(END_TOKEN)}")

Index: 1
Input phrase: Go.
Vector: [4 0 0 0]
Input phrase: Иди.
Vector: [2 5 3 0]
Output phrase: Иди.
Vector: [5 3 0 0]
Start phrase token index: 2
End phrase token index: 3


In [65]:
# X_train, X_test, y_train, y_test  = train_test_split(np.array(list(zip(X_encoder, X_decoder))), np.array(Y_decoder), train_size = 0.8)

In [63]:
encoder_input = Input(shape=(None, ),
                      name='encoder_input')
encoder_emedding = Embedding(input_dim=input_vocab,
                             output_dim=64,
                             mask_zero=True,
                             name='encoder_embedding')
encoder_lstm = LSTM(units=128,
                    return_state=True,
                    name='encoder_lstm')

decoder_input = Input(shape=(None, ),
                      name='decoder_input')
decoder_initial_h_state = Input(shape=(None, ),
                                name='decoder_initial_h_state')
decoder_initial_c_state = Input(shape=(None, ),
                                name='decoder_initial_c_state')
decoder_emedding = Embedding(input_dim=output_vocab,
                             output_dim=64,
                             mask_zero=True,
                             name='decoder_embedding')
decoder_lstm= LSTM(units=128,
                   return_sequences=True,
                   return_state=True,
                   name='decoder_lstm')
decoder_dense = Dense(units=output_vocab,
                      activation='softmax',
                      name='decoder_output')

In [64]:
def encoder_net(encoder_input):
  out = encoder_emedding(encoder_input)
  _, h, c = encoder_lstm(out)
  return h, c

def decoder_net(decoder_input, decoder_initial_state):
  out = decoder_emedding(decoder_input)
  out, _, _ = decoder_lstm(out, initial_state=decoder_initial_state)
  out = decoder_dense(out)
  return out

In [65]:
encoder_state = encoder_net(encoder_input=encoder_input)

decoder_connected_to_encoder_output = decoder_net(decoder_input=decoder_input,
                                                  decoder_initial_state=encoder_state)

In [86]:
model_train = Model(inputs=[encoder_input, decoder_input], outputs=decoder_connected_to_encoder_output)
model_train.summary()

Model: "model_8"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_input (InputLayer)  [(None, None)]               0         []                            
                                                                                                  
 decoder_input (InputLayer)  [(None, None)]               0         []                            
                                                                                                  
 encoder_embedding (Embeddi  (None, None, 64)             640000    ['encoder_input[0][0]']       
 ng)                                                                                              
                                                                                                  
 decoder_embedding (Embeddi  (None, None, 64)             640000    ['decoder_input[0][0]'] 

In [87]:
model_train.compile(
    loss=SparseCategoricalCrossentropy(),
    optimizer="adam",
    metrics=["accuracy"],
)

In [88]:
x_data = {
    'encoder_input': X_encoder,
    'decoder_input': X_decoder
}

y_data = {
    'decoder_output': Y_decoder
}

In [89]:
history = model_train.fit(
    x = x_data,
    y = y_data,
    validation_split = 0.2,
    batch_size=64,
    epochs=50
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [90]:
model_train.save_weights('/content/drive/MyDrive/Colab Notebooks/Data/machine_translation_encoder_decoder_weights.h5')

In [91]:
decoder_output = decoder_net(decoder_input=decoder_input,
                             decoder_initial_state=[decoder_initial_h_state, decoder_initial_c_state])

In [92]:
model_encoder_prediction = Model(inputs=encoder_input, outputs=encoder_state)
model_encoder_prediction.summary()

Model: "model_9"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoder_input (InputLayer)  [(None, None)]            0         
                                                                 
 encoder_embedding (Embeddi  (None, None, 64)          640000    
 ng)                                                             
                                                                 
 encoder_lstm (LSTM)         [(None, 128),             98816     
                              (None, 128),                       
                              (None, 128)]                       
                                                                 
Total params: 738816 (2.82 MB)
Trainable params: 738816 (2.82 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [93]:
model_decoder_prediction = Model(inputs=[decoder_input, decoder_initial_h_state, decoder_initial_c_state],
                                 outputs=decoder_output)
model_decoder_prediction.summary()

Model: "model_10"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 decoder_input (InputLayer)  [(None, None)]               0         []                            
                                                                                                  
 decoder_embedding (Embeddi  (None, None, 64)             640000    ['decoder_input[0][0]']       
 ng)                                                                                              
                                                                                                  
 decoder_initial_h_state (I  [(None, None)]               0         []                            
 nputLayer)                                                                                       
                                                                                           

In [106]:
def predict(text: str) -> str:
  tokens = encoder_vec.predict([text])
  max_length = encoder_vec.max_length

  encoder_state = model_encoder_prediction.predict(tokens)

  decoder_input = np.zeros((1, max_length), dtype=np.int16)
  current_token = decoder_vec.vocabulary.index(START_TOKEN)
  count_tokens = 0
  output = []

  while current_token != decoder_vec.vocabulary.index(END_TOKEN) and count_tokens < max_length:
    decoder_input[0, count_tokens] = current_token
    x_data = {
        'decoder_input': decoder_input,
        'decoder_initial_h_state': encoder_state[0],
        'decoder_initial_c_state': encoder_state[1]
    }
    decoder_output = model_decoder_prediction.predict(x_data)
    current_token = np.argmax(decoder_output[0, count_tokens, :])
    count_tokens += 1

    word = decoder_vec.vocabulary[current_token]

    output.append(word)

  return ' '.join(output[:-1])

In [110]:
predict('i ran')



'я бежала'