In [None]:
# Best-effort selection of TensorFlow 2.x through a notebook line magic.
# NOTE(review): `%tensorflow_version` is presumably the Google Colab magic and
# is not available in a plain Jupyter kernel - confirm the target environment.
try:
    %tensorflow_version 2.x 
except Exception: 
    # Any failure is deliberately ignored so the notebook keeps running.
    pass

In [None]:
# Orthographies to be tested
# Each entry is used as the prefix of a '<lang>_wikt_samples.csv' dataset file
# (see get_data()).
LANGUAGES = ['eno', 'ent',
             'ar', 'br', 'de', 'en', 'es',
             'fi', 'fr', 'fro', 'it', 'ko',
             'nl', 'pt', 'ru', 'sh', 'tr']
# Tasks run for every orthography: 'write' maps Pronunciation -> Word,
# 'read' maps Word -> Pronunciation (see test_orthography()).
TASKS = ['write', 'read']

# Artificial Neural Network (ANN) hyperparameters
EARLY_STOPPING_PATIENCE = 15  # `patience` given to the EarlyStopping callback
BATCH_SIZE = 32  # `batch_size` given to model.fit()
NB_EPOCHS = 100  # maximum number of epochs given to model.fit()
NB_LSTM_UNITS = 512  # units of both the encoder and the decoder LSTM
NB_MAX_SYMBOLS = 70  # `num_words` given to the character-level Tokenizers

# Requested dataset sizes; get_nb_samples() lowers the training size when the
# dataset is too small.
NB_TRAINING_SAMPLES = 12500
NB_TEST_SAMPLES = 1000

In [None]:
# check if GPU support
# The list of devices visible to TensorFlow is the cell's last expression, so
# the notebook displays it; look for a GPU entry in that output.
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

In [None]:
import datetime
import pprint
import difflib
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from numpy import argmax
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model, load_model, Sequential
from tensorflow.keras.layers import Input, LSTM, GRU, Dense, Embedding
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical

In [None]:
!pip -q install watermark
import watermark
%load_ext watermark
%watermark --iversions

In [None]:
def display_distribution(error_positions, lang):
  """Plot, save and show a bar chart of the errors per output position.

  Args:
    error_positions: 1-D numpy array; element ``i`` is the (possibly
      normalised) number of errors located at position ``i``.
    lang: language code, used as the prefix of the saved PNG file name.

  Side effects:
    Prints the number of positions and their labels, writes
    ``<lang>_errors_position.png`` in the working directory and shows the
    figure.
  """
  nb_positions = error_positions.shape[0]
  print('nb_positions:', nb_positions)
  labels = list(range(nb_positions))
  print(labels)
  df_error_distribution = pd.DataFrame({'error_position': labels, 'nb_errors': error_positions})

  ax = df_error_distribution.plot.bar(x='error_position', y='nb_errors', rot=0, color='#607c8e')
  plt.title('Location of the error')
  plt.xlabel('Position')
  plt.ylabel('Number of errors')
  plt.grid(axis='y', alpha=0.75)
  # Save BEFORE showing: once plt.show() has displayed the figure, the current
  # figure may be released (it is with the inline notebook backend), and a
  # later plt.savefig() would then write an empty canvas.
  ax.get_figure().savefig(lang+'_errors_position.png')
  plt.show()

def test_distribution():
  """Smoke-test display_distribution() with a synthetic ramp distribution.

  Builds the weights 0..9 normalised by their sum (45) and plots them. The
  figure is saved under the 'test' prefix because display_distribution()
  requires a language/prefix argument (the previous call omitted it and
  raised a TypeError).
  """
  nb = 10
  error_positions = np.arange(nb)
  # normalise so that the bars sum to 1
  error_positions = error_positions / error_positions.sum()
  display_distribution(error_positions, 'test')
#test_distribution()

In [None]:
def get_diff_symbols(wanted, predicted, task, debug=False):
    """Summarise the character-level differences between two strings.

    Args:
        wanted: the expected string.
        predicted: the string that was actually produced.
        task: task name; only its first character is used, as label prefix.
        debug: when True, print both strings and every difference found.

    Returns:
        A tuple ``(label, error_position)``. ``label`` looks like
        ``'w:-.a.+.b.'``: the task initial, then the symbols present only in
        ``wanted`` after ``-``, then the symbols present only in ``predicted``
        after ``+``, each followed by a dot. ``error_position`` is the 1-based
        index, within the ``difflib.ndiff`` output, of the first difference,
        or 0 when the strings are identical.
    """
    if debug:
        print('{} => {}'.format(wanted, predicted))

    removed = ['-']   # symbols only found in `wanted`
    added = ['+']     # symbols only found in `predicted`
    first_error = 0   # stays 0 as long as no difference has been seen

    for pos, entry in enumerate(difflib.ndiff(wanted, predicted)):
        marker, symbol = entry[0], entry[-1]
        if marker == '-':
            if debug:
                print(u'- \'{}\' @ {}, '.format(symbol, pos))
            removed.append(symbol)
        elif marker == '+':
            if debug:
                print(u'+ \'{}\' @ {}, '.format(symbol, pos))
            added.append(symbol)
        else:
            # unchanged symbol (or any other ndiff line): nothing to record
            continue
        if first_error == 0:
            first_error = pos + 1

    label = task[0] + ':' + '.'.join(removed) + '.' + '.'.join(added) + '.'
    return label, first_error

#get_diff_symbols('blincáremos', 'blincaremos', 'writing', debug=True)

In [None]:
def one_hot_encode(integer_list_list, num_classes, verbose=False):
    """One-hot encode a list of samples given as lists of integer tokens.

    Each sample (e.g. the sounds or the letters of a word) is a sequence of
    integer tokens. ``num_classes`` must NOT count the value 0: for the tokens
    1, 2 and 3 pass ``num_classes=3``. One extra column is added automatically
    for token 0, which is needed to cope with the padding value of the Keras
    tokenizer/padding utilities.

    Args:
        integer_list_list: sequence of equally long integer sequences (a list
            of lists or a 2-D array). The length of the first sample defines
            the sequence length.
        num_classes: highest valid token value (class 0 excluded).
        verbose: when True, print the dimensions of the encoding.

    Returns:
        A float numpy array of shape
        ``(nb_samples, sequence_length, num_classes + 1)``. A token outside
        ``[0, num_classes]`` leaves its row all zeros. An empty input yields
        an array of shape ``(0, 0, num_classes + 1)``.
    """
    nb_i = len(integer_list_list)
    # an empty input has no first sample to measure
    nb_j = len(integer_list_list[0]) if nb_i else 0
    if verbose:
        print('nb of samples:', nb_i)
        print('sequence length:', nb_j)
        print('number of tokens:', num_classes)
    results = np.zeros((nb_i, nb_j, num_classes+1))  # +1 is for 0 (no-value)

    for i, integer_list in enumerate(integer_list_list):
        for j, integer in enumerate(integer_list):
            # Index the class directly instead of scanning every class; only
            # whole numbers inside [0, num_classes] select a column.
            if 0 <= integer <= num_classes and integer == int(integer):
                results[i, j, int(integer)] = 1

    return results

#one_hot_encode([[1, 2, 3], [2, 2, 2]], 3, True)

In [None]:
def get_data(lang):
    """Retrieve the dataset for a given language.

    Args:
        lang: language code; the file read is ``<lang>_wikt_samples.csv``.

    Returns:
        The content of the CSV file as a pandas DataFrame. The default NaN
        markers are disabled (``keep_default_na=False``), so cells such as
        'NA' or 'null' stay plain strings.
    """
    return pd.read_csv(lang + '_wikt_samples.csv', keep_default_na=False)

In [None]:
def get_nb_samples(df, nb_training_samples, nb_test_samples):
    """Fit the requested training size to the number of available samples.

    Checks whether ``df`` holds enough rows for ``nb_training_samples``
    training rows plus ``nb_test_samples`` test rows. If not, the training
    size is lowered to whatever remains once the test rows are set aside.

    Args:
        df: the dataset (one sample per row).
        nb_training_samples: requested number of training samples.
        nb_test_samples: requested number of test samples (never changed).

    Returns:
        The tuple ``(nb_training_samples, nb_test_samples)`` to actually use.

    Raises:
        ValueError: when the dataset is too small to leave at least one
            training sample after reserving the test samples.
    """
    nb_samples = df.shape[0]
    print('nb_samples:', nb_samples)
    nb_wanted = nb_training_samples + nb_test_samples
    if nb_samples < nb_wanted:
        nb_training_samples = nb_samples - nb_test_samples
        if nb_training_samples <= 0:
            # Fail here with a clear message rather than later, deep inside
            # the sampling or encoding code.
            raise ValueError(
                'get_nb_samples(): only %d samples available, cannot keep '
                '%d test samples and still train' % (nb_samples, nb_test_samples))
        print('get_nb_samples(): wanted nb_samples=%d but only %d available: '
              'not enough samples => nb_training_samples:%d !!!'
              % (nb_wanted, nb_samples, nb_training_samples))

    return nb_training_samples, nb_test_samples

In [None]:
def copy_and_remove_symbols(symbol_dict, symbols):
    """Return a shallow copy of a dictionary without the given keys.

    Args:
        symbol_dict: the source dictionary; it is left untouched.
        symbols: iterable of keys to drop; keys that are absent are ignored.

    Returns:
        The filtered copy.
    """
    trimmed = symbol_dict.copy()
    for unwanted in symbols:
        # pop() with a default silently skips keys that are not present
        trimmed.pop(unwanted, None)
    return trimmed

#copy_and_remove_symbols({ 'a':2, 'b':3, 'c':4, 'd':5}, ['b'])

In [None]:
def get_model_parameters(hist_params, df, input_name, target_name):
    """Derive the working columns, the tokenizers and the size parameters.

    Adds four columns to ``df`` (in place): ``Input`` (copy of
    ``input_name``), ``Target_0`` (copy of ``target_name``), ``Target``
    (target followed by the end marker '>') and ``Target_input`` (target
    preceded by the start marker '<'). Two character-level tokenizers are
    fitted, one on the inputs and one on the targets, and every derived value
    is stored in ``hist_params``.

    Args:
        hist_params: dictionary collecting the parameters/results of the run.
        df: the dataset; must contain the two named columns as strings.
        input_name: name of the column fed to the encoder.
        target_name: name of the column the decoder has to produce.

    Returns:
        The tuple ``(df, hist_params)``, both updated.
    """
    def fit_char_tokenizer(texts):
        # character-level tokenizer without any filtering of symbols
        tokenizer = Tokenizer(num_words=NB_MAX_SYMBOLS, filters='', char_level=True)
        tokenizer.fit_on_texts(texts)
        return tokenizer

    source_column = df[input_name]
    target_column = df[target_name]
    df['Input'] = source_column
    df['Target_0'] = target_column
    df['Target'] = target_column + '>'
    df['Target_input'] = '<' + target_column

    nb_samples = df.shape[0]

    max_len_input = max(len(text) for text in df['Input'].values)
    max_len_target = max(len(text) for text in df['Target'].values)
    if max_len_target >= max_len_input:
        print('max_len_target >= max_len_input ... but do nothing')

    # tokenize the inputs
    tokenizer_inputs = fit_char_tokenizer(df['Input'].values)
    input_symbol2idx = tokenizer_inputs.word_index
    nb_input_symbols = len(input_symbol2idx)

    # tokenize the outputs
    # NOTE(review): `values + ['<']` presumably broadcasts and appends '<' to
    # every target text (rather than adding one extra text); either way the
    # start marker ends up in the vocabulary - confirm this is intended.
    tokenizer_outputs = fit_char_tokenizer(df['Target'].values + ['<'])
    output_symbol2idx = tokenizer_outputs.word_index
    print('output_symbol2idx:', output_symbol2idx.items())
    nb_output_symbols = len(output_symbol2idx)

    # output alphabet without the technical markers
    output_symbols = copy_and_remove_symbols(output_symbol2idx, ["<", ">", "["])

    # save main variables in the history to consult them after the test
    # (the insertion order below is kept as it defines the history layout)
    hist_params.update({
        'nb_samples': nb_samples,
        'max_len_input': max_len_input,
        'max_len_target': max_len_target,
        'tokenizer_inputs': tokenizer_inputs,
        'input_symbol2idx': input_symbol2idx,
        'nb_input_symbols': nb_input_symbols,
        'tokenizer_outputs': tokenizer_outputs,
        'output_symbol2idx': output_symbol2idx,
        'nb_output_symbols': nb_output_symbols,
        'x_symbols': ', '.join(sorted(input_symbol2idx.keys())),
        'y_symbols': ', '.join(sorted(output_symbols.keys())),
        'x_nb_symbols': len(input_symbol2idx),
        'y_nb_symbols': len(output_symbols),
    })

    for caption, key in (('input symbols:', 'x_symbols'),
                         ('input symbols nb:', 'x_nb_symbols'),
                         ('output symbols:', 'y_symbols'),
                         ('output symbols nb:', 'y_nb_symbols')):
        print(caption, hist_params[key])

    return df, hist_params

In [None]:
def get_train_and_test_data(df, nb_training_samples, nb_test_samples, random_state=None):
    """Split a dataframe into disjoint training and testing dataframes.

    ``nb_training_samples + nb_test_samples`` rows are drawn at random
    (without replacement); the first ``nb_training_samples`` of them form the
    training set and the last ``nb_test_samples`` the test set.

    Args:
        df: the full dataset.
        nb_training_samples: number of rows of the training set.
        nb_test_samples: number of rows of the test set.
        random_state: optional seed (or random state) forwarded to
            ``DataFrame.sample`` to make the split reproducible. The default,
            ``None``, keeps the original non-deterministic shuffling.

    Returns:
        The tuple ``(df_train, df_test)``.
    """
    df_train_test = df.sample(nb_training_samples + nb_test_samples,
                              random_state=random_state)
    df_train = df_train_test.head(nb_training_samples)
    df_test = df_train_test.tail(nb_test_samples)

    return df_train, df_test

In [None]:
def prepare_input_and_target_format(hist_params, df_train):
    """Prepare the ANN's input and output data.

    The ``Input``, ``Target_input`` and ``Target`` columns of ``df_train``
    are tokenized (one token per character), padded to a common length and
    one-hot encoded.

    Args:
        hist_params: dictionary providing the two fitted tokenizers, the
            maximum input/target lengths and the numbers of input/output
            symbols (as stored by get_model_parameters()).
        df_train: the training samples.

    Returns:
        The tuple ``(encoder_inputs, decoder_inputs, decoder_targets)`` of
        one-hot encoded arrays.
    """
    inputs = df_train['Input'].values
    targets = df_train['Target'].values
    targets_inputs = df_train['Target_input'].values

    tokenizer_inputs = hist_params['tokenizer_inputs']
    tokenizer_outputs = hist_params['tokenizer_outputs']
    max_len_input = hist_params['max_len_input']
    max_len_target = hist_params['max_len_target']
    nb_input_symbols = hist_params['nb_input_symbols']
    nb_output_symbols = hist_params['nb_output_symbols']

    # Encoder side: characters -> token ids -> fixed length (default padding).
    encoder_ids = pad_sequences(tokenizer_inputs.texts_to_sequences(inputs),
                                maxlen=max_len_input)

    # Decoder side.
    # NOTE(review): the decoder inputs use the default padding while the
    # targets are padded with padding='post'; for words shorter than the
    # maximum the two sequences are therefore presumably not aligned step by
    # step - confirm this is intended before changing it.
    decoder_input_ids = pad_sequences(
        tokenizer_outputs.texts_to_sequences(targets_inputs),
        maxlen=max_len_target)
    decoder_target_ids = pad_sequences(
        tokenizer_outputs.texts_to_sequences(targets),
        maxlen=max_len_target, padding='post')

    # one-hot encode the three padded sequences
    return (
        one_hot_encode(encoder_ids, num_classes=nb_input_symbols),
        one_hot_encode(decoder_input_ids, num_classes=nb_output_symbols),
        one_hot_encode(decoder_target_ids, num_classes=nb_output_symbols),
    )

## Build model

In [None]:
def create_model(hist_params, verbose=True):
    """Build and compile the LSTM encoder-decoder used for training.

    The encoder LSTM reads the one-hot encoded input and only its final
    states (h, c) are kept; they initialise the decoder LSTM, whose output
    sequence goes through a softmax Dense layer with one unit per output
    symbol (+1 for token 0).

    Args:
        hist_params: dictionary providing 'max_len_input', 'nb_input_symbols',
            'max_len_target' and 'nb_output_symbols'; the parameter count of
            the model is stored under 'trainable_params'.
        verbose: accepted for interface compatibility; not used.

    Returns:
        The tuple ``(model, model_modules)`` where ``model_modules`` holds
        the tensors/layers needed later to build the inference models.
    """
    encoder_steps = hist_params['max_len_input']
    encoder_width = hist_params['nb_input_symbols'] + 1
    decoder_steps = hist_params['max_len_target']
    decoder_width = hist_params['nb_output_symbols'] + 1

    # --- encoder ---
    encoder_inputs_sounds = Input(shape=(encoder_steps, encoder_width),
                                  name="encoder_inputs_sounds")
    encoder_lstm = LSTM(NB_LSTM_UNITS, return_state=True, name='encoder_lstm')
    _, state_h, state_c = encoder_lstm(encoder_inputs_sounds)
    # keep the states to pass them later into the decoder
    encoder_states_1_2 = [state_h, state_c]

    # --- decoder ---
    decoder_inputs = Input(shape=(decoder_steps, decoder_width),
                           name="output_letters_help")
    decoder_lstm = LSTM(NB_LSTM_UNITS, return_sequences=True,
                        return_state=True, name='decoder_lstm')
    decoder_sequence, _, _ = decoder_lstm(decoder_inputs,
                                          initial_state=encoder_states_1_2)
    decoder_dense = Dense(decoder_width, activation='softmax',
                          name='decoder_sf_output_letters')

    model = Model([encoder_inputs_sounds, decoder_inputs],
                  decoder_dense(decoder_sequence))
    model.compile(optimizer='rmsprop',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    nb_params = model.count_params()
    hist_params['trainable_params'] = nb_params
    print('trainable_params:', nb_params)

    print(model.summary())

    return model, {
        'encoder_inputs_sounds': encoder_inputs_sounds,
        'encoder_states_1_2': encoder_states_1_2,
        'decoder_lstm': decoder_lstm,
        'decoder_dense': decoder_dense,
    }

In [None]:
def train_model(hist_params, model, encoder_inputs, decoder_inputs, decoder_targets, verbose=True):
    """Fit the encoder-decoder model and record duration and accuracy.

    20% of the samples are held out for validation. Training stops early
    when 'val_accuracy' has not improved for EARLY_STOPPING_PATIENCE epochs.

    Args:
        hist_params: dictionary receiving 'training_duration' (as a string)
            and 'val_accuracy' (truncated to three decimals).
        model: the compiled Keras model.
        encoder_inputs: one-hot encoded encoder inputs.
        decoder_inputs: one-hot encoded decoder inputs.
        decoder_targets: one-hot encoded decoder targets.
        verbose: forwarded to ``model.fit``.

    Returns:
        The Keras History object returned by ``model.fit``.
    """
    stop_early = EarlyStopping(
        monitor='val_accuracy',
        mode='max',
        patience=EARLY_STOPPING_PATIENCE,
        restore_best_weights=True,
    )
    # NOTE(review): min_lr=0.001 looks identical to the default learning rate
    # of the 'rmsprop' optimizer, in which case this callback presumably
    # never lowers the rate - confirm the intended floor.
    shrink_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=5,
        min_lr=0.001,
    )

    started_at = datetime.datetime.now()
    r = model.fit(
        [encoder_inputs, decoder_inputs], decoder_targets,
        batch_size=BATCH_SIZE,
        epochs=NB_EPOCHS,
        callbacks=[stop_early, shrink_lr],
        validation_split=0.2,
        verbose=verbose
    )
    training_duration = datetime.datetime.now() - started_at
    print('training_duration:', training_duration)
    hist_params['training_duration'] = str(training_duration)

    # Validation accuracy of the LAST epoch, truncated to three decimals.
    # NOTE(review): with restore_best_weights=True the kept weights may come
    # from an earlier (better) epoch than the one reported here - confirm.
    last_val_accuracy = r.history.get('val_accuracy')[-1]
    val_accuracy = float(int(last_val_accuracy * 1000)/1000)
    print('val_accuracy:%.2f' % val_accuracy)
    hist_params['val_accuracy'] = val_accuracy

    return r

## Build the test model

In [None]:
def create_test_model(hist_params, model_modules, verbose=False):
    """Build the inference models that reuse the trained layers.

    Args:
        hist_params: dictionary providing 'nb_output_symbols'.
        model_modules: the tensors/layers returned by create_model()
            ('encoder_inputs_sounds', 'encoder_states_1_2', 'decoder_lstm'
            and 'decoder_dense').
        verbose: when True, print the summary of both models.

    Returns:
        The tuple ``(encoder_model, decoder_model)``. The encoder maps an
        encoded input to the LSTM states [h, c]; the decoder maps one encoded
        symbol plus the states [h, c] to the next-symbol distribution and the
        updated states.
    """
    encoder_inputs_sounds = model_modules['encoder_inputs_sounds']
    encoder_states_1_2 = model_modules['encoder_states_1_2']
    decoder_lstm = model_modules['decoder_lstm']
    decoder_dense = model_modules['decoder_dense']

    symbol_width = hist_params['nb_output_symbols'] + 1

    encoder_model = Model(encoder_inputs_sounds, encoder_states_1_2)

    # The decoder states become explicit inputs so that decoding can be run
    # one step at a time.
    decoder_states_inputs = [
        Input(shape=(NB_LSTM_UNITS,), name="lstm_cell_h"),
        Input(shape=(NB_LSTM_UNITS,), name="lstm_cell_c"),
    ]
    decoder_inputs_single = Input(shape=(1, symbol_width), name="reinput_letter")

    lstm_output, next_h, next_c = decoder_lstm(
        decoder_inputs_single,
        initial_state=decoder_states_inputs
    )
    symbol_probabilities = decoder_dense(lstm_output)

    decoder_model = Model(
        [decoder_inputs_single] + decoder_states_inputs,
        [symbol_probabilities] + [next_h, next_c]
    )

    if verbose:
        print(encoder_model.summary())
        print(decoder_model.summary())

    return encoder_model, decoder_model

In [None]:
def decode_sequence(hist_params, encoder_model, decoder_model, input_seq, verbose=False):
    """Greedily decode one encoded input into an output string.

    The encoder states initialise the decoder, which is first fed the start
    marker '<' and then, step by step, its own most probable symbol, until
    the end marker '>' is predicted or ``max_len_target`` steps were run.

    Args:
        hist_params: dictionary providing 'nb_output_symbols',
            'max_len_target', 'tokenizer_outputs' and 'output_symbol2idx'.
        encoder_model: inference encoder returned by create_test_model().
        decoder_model: inference decoder returned by create_test_model().
        input_seq: the encoded input, as produced by tpo().
        verbose: when True, print the shape of ``input_seq``.

    Returns:
        The decoded string (without start/end markers).
    """
    symbol_width = hist_params['nb_output_symbols'] + 1
    max_len_target = hist_params['max_len_target']
    tokenizer_outputs = hist_params['tokenizer_outputs']
    # not used below; the lookup is kept so that a missing key still fails
    output_symbol2idx = hist_params['output_symbol2idx']

    if verbose:
        print('input_seq.shape :', input_seq.shape)

    states_value = encoder_model.predict(input_seq)

    idx_start_tk = tokenizer_outputs.word_index['<']
    idx_end_tk = tokenizer_outputs.word_index['>']
    index_word = tokenizer_outputs.index_word

    # first decoder input: the one-hot encoded start marker
    target_seq = np.zeros((1, 1, symbol_width))
    target_seq[0, 0, idx_start_tk] = 1

    # generate the step-by-step output prediction
    decoded = []
    for _ in range(max_len_target):
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

        # most probable next symbol
        idx = np.argmax(output_tokens[0, 0, :])
        if idx == idx_end_tk:
            break

        # the next decoder input is the one-hot encoding of that symbol
        target_seq = np.zeros((1, 1, symbol_width))
        letter = index_word.get(idx)
        if letter is not None:
            decoded.append(letter)
            target_seq[0, 0, idx] = 1
        elif idx == 0:
            # token 0 (no-value) is fed back but produces no character
            target_seq[0, 0, idx] = 1

        states_value = [h, c]

    return ''.join(decoded)

In [None]:
def decode_sequence_old(hist_params, encoder_model, decoder_model, input_seq, verbose=False):
    """Previous implementation of decode_sequence(), kept for reference.

    Greedily decodes ``input_seq``: the encoder states initialise the
    decoder, which is fed the start marker '<' and then its own predictions
    for at most ``max_len_target`` steps. Each fed-back symbol is re-encoded
    through the tokenizer, pad_sequences() and one_hot_encode().

    Args:
        hist_params: dictionary providing 'nb_output_symbols',
            'max_len_target', 'tokenizer_outputs' and 'output_symbol2idx'.
        encoder_model: inference encoder.
        decoder_model: inference decoder.
        input_seq: the encoded input.
        verbose: when True, print the input shape and each fed-back vector.

    Returns:
        The decoded string.
    """

    nb_output_symbols = hist_params['nb_output_symbols']
    max_len_target = hist_params['max_len_target']
    tokenizer_outputs = hist_params['tokenizer_outputs']
    # not used in this function
    output_symbol2idx = hist_params['output_symbol2idx']

    if verbose:
        print('input_seq.shape :', input_seq.shape)

    states_value = encoder_model.predict(input_seq)

    # Generate empty target sequence of length 1.
    target_seq = np.zeros((1, 1, nb_output_symbols+1))

    # Populate the first character of target sequence with the start character.
    # NOTE: tokenizer lower-cases all words
    test_letter_tk = tokenizer_outputs.texts_to_sequences('<')
    test_letter_tk_pd = pad_sequences(test_letter_tk, maxlen=max_len_target)
    test_letter_tk_pd_ht = one_hot_encode(test_letter_tk_pd, num_classes=nb_output_symbols)
    # only the last position of the padded sequence is used
    target_seq[0, 0] = test_letter_tk_pd_ht[0][-1]

    # be careful: eos type is np.int64
    # NOTE(review): texts_to_sequences() presumably returns a nested list
    # here, so `eos == idx` below would rely on numpy broadcasting - confirm.
    eos = tokenizer_outputs.texts_to_sequences('>')

    # Create the translation
    output_sentence = []
    for i in range(max_len_target):
        output_tokens, h, c = decoder_model.predict(
            [target_seq] + states_value
        )

        # Get next word
        idx = np.argmax(output_tokens[0, 0, :])

        # End sentence of EOS
        if eos == idx:
            break

        # not used afterwards
        word = ''
        if idx > 0:
            letter = tokenizer_outputs.index_word[idx]
            output_sentence.append(letter)

        # `letter` is only assigned when idx > 0: if the first prediction is
        # token 0 it is still unbound and the resulting error is swallowed by
        # the bare except below; on later steps the previous letter is fed
        # back again.
        try:
            test_letter_tk = tokenizer_outputs.texts_to_sequences(letter)
            test_letter_tk_pd = pad_sequences(test_letter_tk, maxlen=max_len_target)
            test_letter_tk_pd_ht = one_hot_encode(test_letter_tk_pd, num_classes=nb_output_symbols)
            target_seq[0, 0] = test_letter_tk_pd_ht[0][-1]
            if verbose:
                print('target_seq[0, 0]:', target_seq[0, 0])
        except:
            # should not happen
            print('!?')

        # Update states
        states_value = [h, c]
        # states_value = [h] # gru

    return ''.join(output_sentence)

In [None]:
def tpo(hist_params, sounds):
    """Tokenize, Pad and One-hot encode words for the ANN encoder.

    Args:
        hist_params: dictionary providing 'max_len_input',
            'nb_input_symbols' and 'tokenizer_inputs'.
        sounds: list of input strings to encode.

    Returns:
        The one-hot encoded array returned by one_hot_encode().
    """
    max_len_input = hist_params['max_len_input']
    nb_input_symbols = hist_params['nb_input_symbols']
    tokenizer_inputs = hist_params['tokenizer_inputs']

    padded_tokens = pad_sequences(tokenizer_inputs.texts_to_sequences(sounds),
                                  maxlen=max_len_input)
    return one_hot_encode(padded_tokens, num_classes=nb_input_symbols)

In [None]:
def test(hist_params, encoder_model, decoder_model, df_test, verbose=False):
    """Decode every test sample, score exact matches and record statistics.

    For each row of ``df_test`` the 'Input' string is decoded and compared
    with 'Target_0'. The accuracy, the distribution of the position of the
    first error and the three most frequent kinds of mistakes (with one
    example each) are stored in ``hist_params``; the error distribution is
    also plotted through display_distribution().

    Args:
        hist_params: dictionary providing 'lang', 'task' and
            'max_len_target'; it receives the 'nb_predictions',
            'nb_good_predictions', 'test_err_index', 'test_accuracy',
            'test_duration' and 'test_err_<rank>_*' entries.
        encoder_model: inference encoder.
        decoder_model: inference decoder.
        df_test: test samples; the columns 'Input', 'Target_0', 'Wiki_page',
            'Word' and 'Pronunciation' are read.
        verbose: when True, print every sample and its prediction.

    Returns:
        A copy of ``df_test`` with the added columns 'Prediction' and
        'Prediction_diff' ('=' when the prediction is correct).
    """
    lang = hist_params['lang']
    task = hist_params['task']
    max_len_target = hist_params['max_len_target']

    # Work on a private copy: df_test is usually a slice of a bigger frame
    # and writing new columns into a slice is unreliable (pandas
    # SettingWithCopyWarning). Callers use the returned frame.
    df_test = df_test.copy()
    df_test['Prediction'] = ''
    df_test['Prediction_diff'] = '='

    # array indicating at which output position the first error happened
    # (get_diff_symbols() reports 1-based positions, so slot 0 stays empty)
    test_err_index = np.zeros((max_len_target+1,), dtype=int)

    test_t1 = datetime.datetime.now()

    nb_predictions = 0
    nb_good_predictions = 0
    nb_bad_predictions = 0

    for i, sample in df_test.iterrows():
        x = sample['Input']

        wanted_y = sample['Target_0']
        if verbose:
            print('x:', x, end='')
            print(' wanted_y:', wanted_y, end='')
        wiki_page = sample['Wiki_page']
        predicted_y = decode_sequence(hist_params, encoder_model, decoder_model,
                                      tpo(hist_params, [x]), verbose)
        df_test.at[i, 'Prediction'] = predicted_y
        if verbose:
            print(' predicted_y:', predicted_y)

        nb_predictions += 1
        if predicted_y == wanted_y:
            nb_good_predictions += 1
        else:
            nb_bad_predictions += 1
            df_test.at[i, 'Prediction_diff'], error_position = get_diff_symbols(wanted_y, predicted_y, task)
            test_err_index[error_position] += 1

        # show the very first prediction as a sanity check
        if nb_predictions == 1:
            print("[[%s]]: input:%s -> predicted:%s, wanted:%s, is_ok=%s" %
                  (wiki_page, x, predicted_y, wanted_y, predicted_y == wanted_y))

    # accuracy rounded to a whole percentage, expressed in [0, 1];
    # an empty test set yields 0.0 instead of a ZeroDivisionError
    if nb_predictions:
        test_accuracy = round(float(nb_good_predictions*100/nb_predictions))/100
    else:
        test_accuracy = 0.0

    print('lang:%s, task:%s, nb_good_predictions=%d out of %d trials, test_accuracy=%.2f' %
          (lang, task, nb_good_predictions, nb_predictions, test_accuracy))

    test_t2 = datetime.datetime.now()
    test_duration = test_t2 - test_t1
    print("test_duration:", test_duration)

    # share of the errors found at each position; without any error the
    # distribution is all zeros (dividing by 0 would fill it with NaN)
    if nb_bad_predictions:
        err_distribution = test_err_index/nb_bad_predictions
    else:
        err_distribution = test_err_index.astype(float)

    # keep trace of the results
    hist_params['nb_predictions'] = nb_predictions
    hist_params['nb_good_predictions'] = nb_good_predictions
    hist_params['test_err_index'] = err_distribution
    hist_params['test_accuracy'] = test_accuracy
    hist_params['test_duration'] = str(test_duration)

    # keep trace of the top-three most common mistakes
    # as well as one example of word for which the mistake has happened
    rank = 1
    for key, val in df_test.Prediction_diff[df_test.Prediction_diff != '='].value_counts().head(3).items():
        print(rank, key, val)
        hist_params['test_err_'+str(rank)+'_label'] = str(key)
        hist_params['test_err_'+str(rank)+'_nb'] = str(val)
        for key2, val2 in df_test[df_test.Prediction_diff == key].sample(1).iterrows():
            hist_params['test_err_'+str(rank)+'_word'] = val2['Word']
            hist_params['test_err_'+str(rank)+'_pronunciation'] = val2['Pronunciation']
            hist_params['test_err_'+str(rank)+'_prediction'] = val2['Prediction']
        rank += 1

    print('test_err_index:', hist_params['test_err_index'])
    display_distribution(hist_params['test_err_index'], hist_params['lang'])

    return df_test

In [None]:
# Manual, single-run version of the pipeline (German, 'write' task).
# Set UNIT_TEST to True to run it; it performs the same five steps as
# make_and_test_model() but does not save the trained model.
UNIT_TEST = False
if UNIT_TEST:

    hist_params = {
        'lang': 'de'
    }
    input_name = 'Pronunciation'
    target_name = 'Word'
    task = 'write'
    verbose = True

    lang = hist_params['lang']
    hist_params['task'] = task
    hist_params['lstm'] = NB_LSTM_UNITS

    print("1/ GET DATA (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    df = get_data(lang)
    nb_test_samples = NB_TEST_SAMPLES    
    nb_training_samples, nb_test_samples = get_nb_samples(df, NB_TRAINING_SAMPLES, nb_test_samples)

    # mean lengths, truncated to one decimal
    hist_params['word_len_mean'] = int(df['Word'].str.len().mean()*10)/10
    hist_params['pronunciation_len_mean'] = int(df['Pronunciation'].str.len().mean()*10)/10

    print("2/ PREPARE DATA (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    df, hist_params = get_model_parameters(hist_params, df, input_name=input_name, target_name=target_name)
    df_train, df_test = get_train_and_test_data(df, nb_training_samples, nb_test_samples)
    # not the last expression of the cell, so nothing is displayed here
    df_train.head(5)
    encoder_inputs, decoder_inputs, decoder_targets = prepare_input_and_target_format(hist_params, df_train)
    print('(nb_encoder_input.shape (nb_inputs, nb_letters, nb_symbols) :', encoder_inputs.shape)
    print('(nb_decoder_input.shape (nb_outputs, nb_letters, nb_symbols) :', decoder_inputs.shape)
    print('(nb_decoder_target.shape (nb_outputs, nb_letters, nb_symbols) :', decoder_targets.shape)

    print("3/ BUILD MODEL (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    model, model_modules = create_model(hist_params, verbose)

    print("4/ TRAIN MODEL (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    r = train_model(hist_params, model, encoder_inputs, decoder_inputs, decoder_targets, verbose=True)

    # plot some data
    plt.plot(r.history['loss'], label='loss')
    plt.plot(r.history['val_loss'], label='val_loss')
    plt.legend()
    plt.show()

    # accuracies
    plt.plot(r.history['accuracy'], label='accuracy')
    plt.plot(r.history['val_accuracy'], label='val_accuracy')
    plt.legend()
    plt.show()

    # inference models sharing the trained layers
    encoder_model, decoder_model = create_test_model(hist_params, model_modules, verbose=False)

    print("5/ TEST MODEL (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    # test NB_TEST_SAMPLES samples
    df_test_results = test(hist_params, encoder_model, decoder_model, df_test)

In [None]:
def make_and_test_model(hist_params, input_name, target_name, task, verbose):
    """Run the full pipeline for one language and one task.

    Steps: load the dataset, prepare the data, build the model, train it
    (the trained model is saved as ``<task>_model_<lang>.h5``), plot the
    training curves and evaluate the model on the test samples.

    Args:
        hist_params: dictionary providing 'lang'; it is enriched with the
            parameters and the results of the run.
        input_name: name of the dataset column used as input.
        target_name: name of the dataset column used as target.
        task: name of the task (used in logs, file names and error labels).
        verbose: verbosity flag forwarded to the model/training helpers.

    Returns:
        The test dataframe with the predictions, without the working columns
        'Input', 'Target_0', 'Target' and 'Target_input'.
    """
    lang = hist_params['lang']
    # save parameters for history
    hist_params['task'] = task
    hist_params['lstm'] = NB_LSTM_UNITS

    print("1/ GET DATA (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    df = get_data(lang)
    nb_training_samples, nb_test_samples = get_nb_samples(df, NB_TRAINING_SAMPLES, NB_TEST_SAMPLES)

    # mean lengths, truncated to one decimal
    hist_params['word_len_mean'] = int(df['Word'].str.len().mean()*10)/10
    hist_params['pronunciation_len_mean'] = int(df['Pronunciation'].str.len().mean()*10)/10

    print("2/ PREPARE DATA (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    df, hist_params = get_model_parameters(hist_params, df, input_name=input_name, target_name=target_name)
    df_train, df_test = get_train_and_test_data(df, nb_training_samples, nb_test_samples)
    encoder_inputs, decoder_inputs, decoder_targets = prepare_input_and_target_format(hist_params, df_train)
    print('(nb_encoder_input.shape (nb_inputs, nb_letters, nb_symbols) :', encoder_inputs.shape)
    print('(nb_decoder_input.shape (nb_outputs, nb_letters, nb_symbols) :', decoder_inputs.shape)
    print('(nb_decoder_target.shape (nb_outputs, nb_letters, nb_symbols) :', decoder_targets.shape)

    print("3/ BUILD MODEL (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    model, model_modules = create_model(hist_params, verbose)

    print("4/ TRAIN MODEL (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    r = train_model(hist_params, model, encoder_inputs, decoder_inputs, decoder_targets, verbose)
    # Save the model
    model.save(task + '_model_' + hist_params['lang'] + '.h5')

    # plot some data
    plt.plot(r.history['loss'], label='loss')
    plt.plot(r.history['val_loss'], label='val_loss')
    plt.legend()
    plt.show()

    # accuracies
    plt.plot(r.history['accuracy'], label='accuracy')
    plt.plot(r.history['val_accuracy'], label='val_accuracy')
    plt.legend()
    plt.show()

    encoder_model, decoder_model = create_test_model(hist_params, model_modules, verbose)

    print("5/ TEST MODEL (%s, %s, %d)" % (lang, task, NB_LSTM_UNITS))
    # test NB_TEST_SAMPLES samples
    df_test = test(hist_params, encoder_model, decoder_model, df_test)
    df_test = df_test.drop(columns=['Input', 'Target_0', 'Target', 'Target_input'])

    # Separator between two runs; it used to sit after the `return` statement
    # and was therefore never printed.
    print('####################################################################')
    return df_test

In [None]:
def test_orthography(lang, task, verbose=False):
    date = datetime.datetime.today().strftime('%Y-%m-%d')
    time = datetime.datetime.today().strftime('%H:%M:%S')

    hist_params = {'date': date, 'time': time, 'lang': lang}

    if task == 'write':
        df_test = make_and_test_model(hist_params, input_name='Pronunciation',
                                      target_name='Word', task=task,
                                      verbose=False)
    elif task == 'read':
        df_test = make_and_test_model(hist_params, input_name='Word',
                                      target_name='Pronunciation', task=task,
                                      verbose=False)
    else:
        print('test_orthography: task=%s does not exist' % task)

    dict_res = copy_and_remove_symbols(hist_params, ['input_symbol2idx',
                                                     'output_symbol2idx',
                                                     'tokenizer_inputs',
                                                     'tokenizer_outputs'])
    if verbose:
        pprint.pprint(dict_res)
    return dict_res, df_test

In [None]:
date = datetime.datetime.today().strftime('%Y-%m-%d')
output_prefix = 'oteann_results_' + date
results_file = output_prefix + '.csv'

# open the file for being able to append the results of this test
# otherwise create a new one
# Only "no usable history yet" is treated as a fresh start; any other error
# (or an interruption) is no longer silently swallowed by a bare except.
try:
    df_results = pd.read_csv(results_file)
except (FileNotFoundError, pd.errors.EmptyDataError):
    df_results = pd.DataFrame()
df_last_test = pd.DataFrame()

nb_episodes = 1
for episode in range(nb_episodes):
    print('episode:', episode)

    for lang in LANGUAGES:
        for task in TASKS:
            dict_res, df_last_test = test_orthography(lang, task, verbose=False)
            # put the results as a new line in the CSV history file
            df_res = pd.DataFrame(data = [dict_res.values()], columns = dict_res.keys())
            df_results = pd.concat([df_results, df_res], axis=0, ignore_index=True, sort=False)
            # written after every run so that an interrupted session keeps
            # the results collected so far
            df_results.to_csv(results_file, index = None, header=True)


In [None]:
def display_results(df):
    """Plot, for each task, the mean test accuracy (in %) per language.

    Args:
        df: history of the runs; the columns 'task', 'lang' and
            'test_accuracy' (a value in [0, 1]) are read.
    """
    sns.set(style="whitegrid")
    current_palette = sns.color_palette()
    for task in TASKS:
        df_o = df[df.task==task].sort_values(by='lang')
        df_o = df_o[['lang', 'test_accuracy']].copy()
        df_o['test_accuracy'] = df_o['test_accuracy']*100
        # Average the accuracy column only: averaging the whole frame also
        # tried to average the textual 'task' column, which raises a
        # TypeError with pandas >= 2.0.
        df_o = df_o.groupby('lang', as_index=False)['test_accuracy'].mean()
        # NOTE(review): palplot() opens its own figure and the bar plot below
        # is presumably drawn on that same axes - confirm this is wanted.
        sns.palplot(current_palette)
        ax = sns.barplot(x="lang", y="test_accuracy",
                         data=df_o, palette=current_palette)
        # add the accuracy number on the top of each bar
        for i, accuracy in enumerate(df_o['test_accuracy']):
            ax.text(i, accuracy+1, str(round(accuracy)), color='black', ha="center")
        plt.title(task.capitalize())
        plt.ylim(0, 100)
        plt.show()

display_results(df_results)

In [None]:
def display_distribution(test_err_index, lang=None):
    """Plot, save and show the distribution of the first-error positions.

    This definition replaces the earlier ``display_distribution(
    error_positions, lang)`` of this notebook once its cell has run; the
    optional ``lang`` argument keeps the calls made by test() working after
    that point.

    Args:
        test_err_index: error count (or share) per output position, either
            as a 1-D array-like or as the text form of a numpy array (e.g.
            '[0.  0.5 0.5]'), which is what a history reloaded from the CSV
            file contains.
        lang: optional language code. When given, the figure is saved as
            ``<lang>_errors_position.png``; otherwise as ``figx``.
    """
    if isinstance(test_err_index, str):
        # text form of an array: drop the brackets, split on whitespace
        test_err_index = np.array(test_err_index.strip().strip('[]').split(),
                                  dtype=float)
    else:
        test_err_index = np.atleast_1d(test_err_index)

    nb_indexes = test_err_index.shape[0]
    labels = list(range(nb_indexes))
    df_error_distribution = pd.DataFrame({'error_position': labels,
                                          'nb_errors': test_err_index})

    ax = df_error_distribution.plot.bar(x='error_position', y='nb_errors',
                                        rot=0, color='#607c8e')
    plt.title('Index of the badly predicted output character')
    plt.xlabel('Position')
    plt.ylabel('Number of errors')
    plt.grid(axis='y', alpha=0.75)
    # Save BEFORE showing: after plt.show() the current figure may already be
    # released (inline notebook backend) and the saved file would be blank.
    filename = "figx" if lang is None else lang + '_errors_position.png'
    ax.get_figure().savefig(filename)
    plt.show()

In [None]:
def display_history(df):
    """Print every run of the history, one 'column:value' line per column.

    For the 'test_err_index' column the error distribution is additionally
    plotted with display_distribution().

    Args:
        df: history of the runs; must contain a 'lang' column and use integer
            index labels.
    """
    column_names = df.columns
    for test_id, record in df.iterrows():
        print('test:%d, LANG=%s' % (test_id, record.lang))
        for name in column_names:
            value = record[name]
            print('%s:%s' % (name, str(value)))
            if name == 'test_err_index':
                display_distribution(value)
        print('=================================================')
display_history(df_results)