In [6]:
import string
import math
import re
from pickle import dump,load
from unicodedata import normalize
from numpy import array,argmax
from numpy.random import rand,shuffle
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils.vis_utils import plot_model
from keras.models import Sequential,load_model
from keras.layers import LSTM,Dense,Embedding,RepeatVector,TimeDistributed
from keras.callbacks import ModelCheckpoint,EarlyStopping
from nltk.translate.bleu_score import corpus_bleu,SmoothingFunction
from keras import backend as K
# Ask the Keras TensorFlow backend which GPUs it can see (underscore-prefixed,
# i.e. non-public, helper). The recorded cell output is an empty list.
K.tensorflow_backend._get_available_gpus()

[]

In [2]:
from google.colab import drive
# Mount Google Drive under /content/drive in the Colab runtime.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
class Preprocessing:
    """Load a tab-separated parallel corpus, normalise it and pickle the result.

    The input file is read as UTF-8 text. Every line is split on tab
    characters into the fields of one sentence pair (see ``to_pairs``), each
    field is cleaned (see ``clean_pairs``) and the cleaned array is pickled
    (see ``save_clean_data``).
    """

    def __init__(self, filename='translation_english.txt'):
        """Read ``filename`` into memory; cleaning is deferred to ``clean_pairs``."""
        self.document = self.load_doc(filename)
        # Populated by clean_pairs(); stays an empty string until then.
        self.clean_document = ''

    def load_doc(self, filename):
        """Return the full content of ``filename`` decoded as UTF-8 text."""
        # The context manager closes the handle even when read() raises.
        with open(filename, mode='rt', encoding='utf-8') as file:
            return file.read()

    def clean_pairs(self, lines):
        """Normalise every sentence of every pair and return them as an array.

        For each sentence: strip accents / non-ASCII characters, lowercase,
        split on whitespace, remove punctuation and non-printable characters
        from each token, and drop tokens that are not purely alphabetic.

        Args:
            lines: iterable of pairs (iterables of strings), e.g. the output
                of ``to_pairs``.

        Returns:
            A NumPy array built from the cleaned pairs. The same array is
            stored on ``self.clean_document``.
        """
        # Both helpers are loop-invariant, so build them once up front.
        re_print = re.compile('[^%s]' % re.escape(string.printable))
        table = str.maketrans('', '', string.punctuation)
        cleaned = []
        for pair in lines:
            clean_pair = []
            for line in pair:
                # NFD splits accented letters into base letter + combining
                # mark; the ASCII round-trip then drops the marks.
                line = normalize('NFD', line).encode('ascii', 'ignore')
                line = line.decode('UTF-8')
                words = line.lower().split()
                words = [word.translate(table) for word in words]
                words = [re_print.sub('', word) for word in words]
                # Discards numbers and anything left empty by the steps above.
                words = [word for word in words if word.isalpha()]
                clean_pair.append(' '.join(words))
            cleaned.append(clean_pair)
        # Build the array once and hand out that same object.
        self.clean_document = array(cleaned)
        return self.clean_document

    def get_clean_pairs(self):
        """Return the result of the last ``clean_pairs`` call ('' if none yet)."""
        return self.clean_document

    def to_pairs(self):
        """Split the loaded document into lines and each line on tabs."""
        lines = self.document.strip().split('\n')
        return [line.split('\t') for line in lines]

    def save_clean_data(self, document, filename='english-spanish.pkl'):
        """Pickle ``document`` to ``filename`` and print a confirmation."""
        # Close the file deterministically so the pickle is fully flushed
        # before any later cell tries to read it.
        with open(filename, 'wb') as file:
            dump(document, file)
        print('Saved: %s' % filename)

    def preprocess(self):
        """Run the whole pipeline: split into pairs, clean, pickle."""
        self.save_clean_data(
            self.clean_pairs(
                self.to_pairs()
            )
        )

In [8]:
# Build and pickle the cleaned corpus, then preview the pairs at rows 900-999.
preprocess = Preprocessing()
preprocess.preprocess()
clean_pairs = preprocess.get_clean_pairs()
for i in range(900,1000):
    english, spanish = clean_pairs[i,0], clean_pairs[i,1]
    print('[%s] => [%s]' % (english, spanish))

Saved: english-spanish.pkl
[be careful] => [ten cuidado]
[be careful] => [se cuidadoso]
[be content] => [estate contento]
[be on time] => [llega a tiempo]
[be on time] => [llegue a tiempo]
[be patient] => [sea paciente]
[be serious] => [se serio]
[birds sing] => [los pajaros cantan]
[birds sing] => [los pajaros estan cantando]
[bring food] => [traed comida]
[bring help] => [traed ayuda]
[bring wine] => [trae vino]
[can i come] => [puedo ir]
[can i come] => [puedo venir]
[can i come] => [puedo acercarme]
[can i help] => [puedo ayudar]
[can i stay] => [me puedo quedar]
[carry this] => [lleva esto]
[check that] => [comprobad eso]
[check this] => [comprueba esto]
[choose one] => [escoge uno]
[come again] => [vuelve otra vez]
[come alone] => [ven solo]
[come along] => [vente]
[come along] => [venganse]
[come early] => [veni temprano]
[come early] => [ven temprano]
[come early] => [vengan temprano]
[come early] => [venga temprano]
[come on in] => [pasale]
[come on in] => [pasele]
[come on in

In [9]:
class TrainMachineTranslation:
    """Train an encoder-decoder LSTM on pickled sentence pairs.

    Column 1 of the dataset is used as model input and column 0 as the
    prediction target (see ``prepare_data``). Instantiating the class runs
    the complete pipeline via ``main_train``: filter and split the data,
    pickle the splits, encode them and fit the model.
    """

    def __init__(self, file_dataset = 'english-spanish.pkl'):
        """Load the pickled pair array and immediately run ``main_train``."""
        self.dataset = self.load_data(file_dataset)
        self.eng_vocab_size = 0
        self.eng_length = 0
        self.spain_vocab_size = 0
        self.spain_length = 0
        # Filled in by split_dataset() / prepare_data() during main_train().
        self.train = None
        self.test = None
        self.trainX = None
        self.trainY = None
        self.testX = None
        self.testY = None
        self.main_train()

    def split_dataset(self, max_words=4, test_fraction=0.2):
        """Keep short pairs only, shuffle them and split into train/test.

        Args:
            max_words: a pair is kept only when both sentences contain at
                most this many space-separated tokens.
            test_fraction: share of the kept pairs assigned to ``self.test``.

        Side effects:
            Replaces ``self.dataset`` with the filtered, shuffled array and
            sets ``self.train`` / ``self.test`` to slices of it.
        """
        short_pairs = [
            [pair[0], pair[1]]
            for pair in self.dataset
            if len(pair[1].split(" ")) <= max_words and len(pair[0].split(" ")) <= max_words
        ]
        self.dataset = array(short_pairs)
        # One in-place shuffle of the rows is enough to randomise the split.
        shuffle(self.dataset)
        split = math.floor(len(self.dataset) - (len(self.dataset)*test_fraction))
        self.train, self.test = self.dataset[:split], self.dataset[split:]

    def load_data(self, filename):
        """Unpickle and return the object stored in ``filename``.

        NOTE(review): pickle can execute arbitrary code on load; only use
        files produced by this notebook.
        """
        with open(filename, 'rb') as file:
            return load(file)

    def save_clean_data(self, filename_dataset = 'english-spanish-both.pkl', filename_test = 'english-spanish-test.pkl', filename_train = 'english-spanish-train.pkl'):
        """Pickle the filtered dataset and its train/test splits to disk."""
        targets = (
            (self.dataset, filename_dataset),
            (self.train, filename_train),
            (self.test, filename_test),
        )
        for payload, filename in targets:
            # Close each file deterministically so the pickles are complete
            # before another cell loads them.
            with open(filename, 'wb') as file:
                dump(payload, file)
        print('Saved: ', filename_dataset, filename_train, filename_test)

    def create_tokenizer(self, lines):
        """Return a Keras ``Tokenizer`` fitted on ``lines``."""
        tokenizer = Tokenizer()
        tokenizer.fit_on_texts(lines)
        return tokenizer

    def max_length(self, lines):
        """Return the largest whitespace-token count among ``lines``."""
        return max(len(line.split()) for line in lines)

    def encode_sequences(self, tokenizer, length, lines):
        """Map ``lines`` to integer sequences post-padded to ``length``."""
        X = tokenizer.texts_to_sequences(lines)
        X = pad_sequences(X, maxlen=length, padding='post')
        return X

    def encode_output(self, sequences, vocab_size):
        """One-hot encode integer sequences.

        Returns an array of shape
        ``(sequences.shape[0], sequences.shape[1], vocab_size)``.
        """
        ylist = [to_categorical(sequence, num_classes=vocab_size) for sequence in sequences]
        y = array(ylist).reshape(sequences.shape[0], sequences.shape[1], vocab_size)
        return y

    def define_model(self, src_vocab, tar_vocab, src_timesteps, tar_timesteps, n_units):
        """Build the (uncompiled) encoder-decoder network.

        Embedding -> LSTM encoder -> RepeatVector -> LSTM decoder ->
        per-timestep softmax over ``tar_vocab`` classes.
        """
        model = Sequential()
        model.add(Embedding(src_vocab, n_units, input_length=src_timesteps, mask_zero=True))
        model.add(LSTM(n_units))
        model.add(RepeatVector(tar_timesteps))
        model.add(LSTM(n_units, return_sequences=True))
        model.add(TimeDistributed(Dense(tar_vocab, activation='softmax')))
        return model

    def prepare_data(self):
        """Fit tokenizers on the full dataset and encode the train/test splits.

        Column 1 becomes the integer-encoded input (``trainX`` / ``testX``);
        column 0 becomes the one-hot target (``trainY`` / ``testY``).
        """
        eng_tokenizer = self.create_tokenizer(self.dataset[:, 0])
        # +1 because tokenizer indices start at 1; 0 is the padding value.
        self.eng_vocab_size = len(eng_tokenizer.word_index) + 1
        self.eng_length = self.max_length(self.dataset[:, 0])
        print('English Vocabulary Size: %d' % self.eng_vocab_size)
        print('English Max Length: %d' % (self.eng_length))
        spain_tokenizer = self.create_tokenizer(self.dataset[:, 1])
        self.spain_vocab_size = len(spain_tokenizer.word_index) + 1
        self.spain_length = self.max_length(self.dataset[:, 1])
        print('Spain Vocabulary Size: %d' % self.spain_vocab_size)
        print('Spain Max Length: %d' % (self.spain_length))
        self.trainX = self.encode_sequences(spain_tokenizer, self.spain_length, self.train[:, 1])
        trainY = self.encode_sequences(eng_tokenizer, self.eng_length, self.train[:, 0])
        self.trainY = self.encode_output(trainY, self.eng_vocab_size)
        self.testX = self.encode_sequences(spain_tokenizer, self.spain_length, self.test[:, 1])
        testY = self.encode_sequences(eng_tokenizer, self.eng_length, self.test[:, 0])
        self.testY = self.encode_output(testY, self.eng_vocab_size)

    def train_model(self, filename_model='model_translation.h5'):
        """Compile and fit the model, checkpointing the best epoch.

        Args:
            filename_model: path the ``ModelCheckpoint`` callback writes to
                whenever ``val_loss`` improves.

        Returns:
            The fitted Keras model (weights from the last epoch run; the
            best epoch is the one saved at ``filename_model``).
        """
        model = self.define_model(self.spain_vocab_size, self.eng_vocab_size, self.spain_length, self.eng_length,128)
        model.compile(optimizer='adam', loss='categorical_crossentropy')
        # summary() prints by itself and returns None, so it is not wrapped
        # in print().
        model.summary()
        checkpoint = ModelCheckpoint(filename_model, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
        monitor = EarlyStopping(monitor='val_loss', min_delta=1e-3, patience=5, verbose=1, mode='auto')
        # NOTE(review): the test split doubles as the validation set, so
        # early stopping and checkpoint selection both see test data.
        model.fit(self.trainX, self.trainY, epochs=50, batch_size=64, validation_data=(self.testX, self.testY), callbacks=[checkpoint,monitor], verbose=2)
        return model

    def main_train(self):
        """Run split -> save -> encode -> train, in that order."""
        self.split_dataset()
        self.save_clean_data()
        self.prepare_data()
        self.train_model()

# Instantiating the class runs the full training pipeline: __init__ calls main_train().
train = TrainMachineTranslation()

Saved:  english-spanish-both.pkl english-spanish-train.pkl english-spanish-test.pkl
English Vocabulary Size: 5240
English Max Length: 4
Spain Vocabulary Size: 9116
Spain Max Length: 4
Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_2 (Embedding)      (None, 4, 128)            1166848   
_________________________________________________________________
lstm_3 (LSTM)                (None, 128)               131584    
_________________________________________________________________
repeat_vector_2 (RepeatVecto (None, 4, 128)            0         
_________________________________________________________________
lstm_4 (LSTM)                (None, 4, 128)            131584    
_________________________________________________________________
time_distributed_2 (TimeDist (None, 4, 5240)           675960    
Total params: 2,105,976
Trainable params: 2,105,976
Non-trainable pa

In [18]:
class EvaluateMachineTranslation:
    """Score a saved translation model with BLEU on the train and test splits.

    Instantiating the class runs the whole evaluation via ``main_evaluate``.
    Column 1 of each split is the model input and column 0 the reference
    translation (see ``prepare_data`` and ``evaluate_model``).
    """

    def __init__(self, file_dataset='english-spanish-both.pkl', file_train='english-spanish-train.pkl', file_test='english-spanish-test.pkl'):
        """Load the three pickled arrays and immediately run ``main_evaluate``."""
        self.dataset = self.load_data(file_dataset)
        self.train = self.load_data(file_train)
        self.test = self.load_data(file_test)
        # Filled in by prepare_data() during main_evaluate().
        self.eng_tokenizer = None
        self.trainX = None
        self.testX = None
        self.main_evaluate()

    def load_data(self, filename):
        """Unpickle and return the object stored in ``filename``.

        NOTE(review): pickle can execute arbitrary code on load; only use
        files produced by this notebook.
        """
        with open(filename, 'rb') as file:
            return load(file)

    def create_tokenizer(self, lines):
        """Return a Keras ``Tokenizer`` fitted on ``lines``."""
        tokenizer = Tokenizer()
        tokenizer.fit_on_texts(lines)
        return tokenizer

    def max_length(self, lines):
        """Return the largest whitespace-token count among ``lines``."""
        return max(len(line.split()) for line in lines)

    def encode_sequences(self, tokenizer, length, lines):
        """Map ``lines`` to integer sequences post-padded to ``length``."""
        X = tokenizer.texts_to_sequences(lines)
        X = pad_sequences(X, maxlen=length, padding='post')
        return X

    def word_for_id(self, integer, tokenizer):
        """Return the word whose tokenizer index is ``integer``, else ``None``.

        Uses the tokenizer's ``index_word`` reverse mapping when it exists
        (constant-time lookup) and falls back to scanning ``word_index`` for
        tokenizers that do not provide one.
        """
        index_word = getattr(tokenizer, 'index_word', None)
        if index_word is not None:
            return index_word.get(integer)
        for word, index in tokenizer.word_index.items():
            if index == integer:
                return word
        return None

    def predict_sequence(self, model, tokenizer, source):
        """Translate one encoded source sequence into a space-joined string.

        Takes the arg-max class at every output timestep and stops at the
        first index that has no word (index 0 is the padding value).
        """
        prediction = model.predict(source, verbose=0)[0]
        integers = [argmax(vector) for vector in prediction]
        target = list()
        for i in integers:
            word = self.word_for_id(i, tokenizer)
            if word is None:
                break
            target.append(word)
        return ' '.join(target)

    def _collect_translations(self, model, tokenizer, sources, raw_dataset):
        """Translate every source row and collect BLEU references/hypotheses.

        Prints the rows with index 30-49 as a sample.

        Returns:
            ``(actual, predicted)`` where ``actual[i]`` is a one-element list
            holding the tokenised reference for row ``i`` and
            ``predicted[i]`` is the tokenised model output.
        """
        actual, predicted = list(), list()
        for i, source in enumerate(sources):
            # The model expects a batch dimension: (1, timesteps).
            source = source.reshape((1, source.shape[0]))
            translation = self.predict_sequence(model, tokenizer, source)
            raw_target, raw_src = raw_dataset[i]
            if 30 <= i < 50:
                print('src=[%s], target=[%s], predicted=[%s]' % (raw_src, raw_target, translation))
            # corpus_bleu wants a *list of references* per hypothesis. Without
            # the extra list every word would be treated as a separate
            # reference and compared character by character.
            actual.append([raw_target.split()])
            predicted.append(translation.split())
        return actual, predicted

    def evaluate_model(self, model, tokenizer, sources, raw_dataset):
        """Print sample translations and cumulative BLEU-1..4 for a split.

        Args:
            model: object with a Keras-style ``predict`` method.
            tokenizer: tokenizer of the target language (decodes outputs).
            sources: encoded input sequences, one row per sentence.
            raw_dataset: rows of ``(target_text, source_text)`` aligned with
                ``sources``.
        """
        actual, predicted = self._collect_translations(model, tokenizer, sources, raw_dataset)
        smoothing = SmoothingFunction().method7
        # Cumulative n-gram weights; each tuple sums to 1.
        bleu_weights = (
            ('BLEU-1', (1.0, 0, 0, 0)),
            ('BLEU-2', (0.5, 0.5, 0, 0)),
            ('BLEU-3', (1.0 / 3, 1.0 / 3, 1.0 / 3, 0)),
            ('BLEU-4', (0.25, 0.25, 0.25, 0.25)),
        )
        for label, weights in bleu_weights:
            score = corpus_bleu(actual, predicted, weights=weights, smoothing_function=smoothing)
            print('%s: %f' % (label, score))

    def prepare_data(self):
        """Rebuild the tokenizers from the full dataset and encode both splits.

        NOTE(review): decoding is only meaningful if these tokenizers assign
        the same indices as the ones used in training, i.e. the pickled
        dataset must be the one the model was trained on — confirm.
        """
        self.eng_tokenizer = self.create_tokenizer(self.dataset[:, 0])
        spain_tokenizer = self.create_tokenizer(self.dataset[:, 1])
        spain_length = self.max_length(self.dataset[:, 1])
        self.trainX = self.encode_sequences(spain_tokenizer, spain_length, self.train[:, 1])
        self.testX = self.encode_sequences(spain_tokenizer, spain_length, self.test[:, 1])

    def main_evaluate(self, filename_model='model_translation.h5'):
        """Encode the data, load the saved model and score both splits.

        Args:
            filename_model: path of the saved Keras model to evaluate.
        """
        self.prepare_data()
        model = load_model(filename_model)
        print('train')
        self.evaluate_model(model, self.eng_tokenizer, self.trainX, self.train)
        print('test')
        self.evaluate_model(model, self.eng_tokenizer, self.testX, self.test)

# Instantiating the class runs the full evaluation: __init__ calls main_evaluate().
evaluate = EvaluateMachineTranslation()

train
src=[comeis carne], target=[do you eat meat], predicted=[do you eat meat]
src=[esquiar es mi pasion], target=[skiing is my passion], predicted=[here is my passion]
src=[ya termino], target=[its finished now], predicted=[is already already]
src=[ella dio dinero], target=[she gave money], predicted=[she gave money money]
src=[grite], target=[i screamed], predicted=[i screamed]
src=[es demasiado temprano], target=[its too early], predicted=[its too early]
src=[son lindos], target=[they are pretty], predicted=[they are pretty]
src=[no me hagas quedar], target=[dont make me stay], predicted=[dont make me]
src=[tom es un democrata], target=[tom is a democrat], predicted=[tom is a democrat]
src=[ellos invadieron el pais], target=[they invaded the country], predicted=[they invaded the country]
src=[estoy exhausto], target=[i am exhausted], predicted=[im exhausted]
src=[tom conoce a mary], target=[tom has met mary], predicted=[tom wants mary mary]
src=[banderas japonesas ondeaban], target