In [60]:
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Embedding
from keras.layers import LSTM
from keras.layers import RepeatVector, TimeDistributed
from keras.callbacks import ModelCheckpoint
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow import keras
from math import sqrt
from sklearn.metrics import mean_squared_error
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import operator
import matplotlib.pyplot as plt
import json
from pathlib import Path

# Length limits shared by the data helpers below: read_data keeps a pair only when
# the raw question/answer lines are strictly shorter than these values, and
# process_data passes them to pad_data as the padding targets.
# NOTE: the "Alternative method" cell further down rebinds both names, so the
# values seen by the helpers depend on which cells have been executed.
MAX_Q_LEN = 20
MAX_A_LEN = 5

In [52]:
# Helper functions to read data

from pathlib import Path
import re
import numpy as np

def ps(s):
    """Process String: split a string into a list of tokens.

    Every digit, word character and listed symbol becomes its own token. Note
    that ``+-/`` inside the character class is a range, so it covers
    ``+ , - . /``. Any other run of text between two matches is kept as a
    single token after stripping surrounding whitespace; pieces that are
    empty or whitespace-only are dropped. Case is left untouched.
    """
    tokens = []
    for piece in re.split(r'([+-/*()?]|\d|\w)', s):
        cleaned = piece.strip()
        if cleaned:
            tokens.append(cleaned)
    return tokens

def read_data(filepath,perc_data):
    '''
    Load question/answer pairs from a text file and tokenize them.

    The file is read as alternating lines: a question line followed by its
    answer line. A trailing question without an answer line is ignored.

    Parameters:
        filepath:  path of the dataset file.
        perc_data: fraction (0..1) of the question/answer pairs to load,
                   counted from the start of the file. Values outside that
                   range are clamped.

    Returns:
        (X, y): two parallel lists holding the token lists (see ps) of the
        kept questions and answers. A pair is kept only when the stripped
        question line is shorter than MAX_Q_LEN characters and the stripped
        answer line is shorter than MAX_A_LEN characters.
    '''
    # Read the file once; the previous version opened and scanned it twice.
    with open(filepath) as dataset_file:
        lines = [line.strip() for line in dataset_file]

    # Each sample spans two lines, so the fraction applies to pairs. Sizing the
    # loop from the raw line count made any perc_data above 0.5 read past the
    # end of the file and append empty question/answer samples.
    total_pairs = len(lines) // 2
    n_pairs = max(0, min(total_pairs, int(total_pairs * perc_data)))

    X = []
    y = []
    for pair_idx in range(n_pairs):
        line_q = lines[2 * pair_idx]
        line_a = lines[2 * pair_idx + 1]
        if len(line_q) < MAX_Q_LEN and len(line_a) < MAX_A_LEN:
            X.append(ps(line_q))
            y.append(ps(line_a))
    return X, y

def pad_data(X,y, max_question_len, max_answer_len):
    """Wrap each token list in 'BOE'/'EOE' markers and right-pad it with '#'.

    X and y are parallel lists of token lists (questions and answers). A
    sequence with n tokens receives ``max_len - n`` pad tokens after 'EOE';
    a sequence longer than its limit gets the markers but no padding.
    Returns the padded questions and the padded answers.
    """
    def _wrap(tokens, target_len):
        # A negative repeat count yields an empty list, i.e. no padding.
        filler = ['#'] * (target_len - len(tokens))
        return ['BOE'] + tokens + ['EOE'] + filler

    X_padded = [_wrap(question, max_question_len) for question in X]
    y_padded = [_wrap(answer, max_answer_len) for answer in y]
    return X_padded, y_padded

In [53]:
def create_alphabet_index(X):
    """Build token<->integer lookup tables from a list of token sequences.

    The pad token '#' always gets index 0; every other token receives the
    next free index in order of first appearance. Returns the pair
    (char_to_int, int_to_char).
    """
    char_to_int = {'#': 0}
    for sequence in X:
        for token in sequence:
            # setdefault only inserts unseen tokens; the default is the current size.
            char_to_int.setdefault(token, len(char_to_int))

    int_to_char = {index: token for token, index in char_to_int.items()}
    return (char_to_int, int_to_char)

def encode_data(X,y,char_to_int):
    """Replace every token in X and y by its integer index.

    X and y are lists of token sequences; char_to_int maps token -> index.
    Returns the integer-encoded questions and answers. A token missing from
    char_to_int raises KeyError.
    """
    def _to_ids(sequences):
        return [[char_to_int[token] for token in sequence] for sequence in sequences]

    Xenc = _to_ids(X)
    yenc = _to_ids(y)
    return Xenc, yenc

# one hot encode
def one_hot_encode(X, y, vocab_size):
    """One-hot encode two lists of integer sequences.

    Every index becomes a list of ``vocab_size`` zeros with a single 1 at the
    index position. Returns the encoded X and the encoded y as nested lists
    (sequence -> time step -> one-hot vector).
    """
    def _one_hot(index):
        vector = [0] * vocab_size
        vector[index] = 1
        return vector

    Xenc = [[_one_hot(index) for index in seq] for seq in X]
    yenc = [[_one_hot(index) for index in seq] for seq in y]
    return Xenc, yenc

def one_hot_decode(seq, int_to_char):
    """Turn a sequence of score vectors back into a string.

    For each vector in seq the position of its largest entry is looked up in
    int_to_char; the resulting tokens are concatenated without a separator.
    """
    return ''.join(int_to_char[np.argmax(pattern)] for pattern in seq)

def process_data(dataset_filename, perc_data, random_state=None):
    """Load, pad, split and integer-encode the question/answer dataset.

    Parameters:
        dataset_filename: path handed to read_data.
        perc_data:        fraction of the dataset handed to read_data.
        random_state:     optional seed forwarded to train_test_split so the
                          80/20 split can be reproduced; None (the default)
                          keeps the split random on every call.

    Returns:
        (X_train, y_train, X_test, y_test, char_to_int, int_to_char) where the
        four data items are lists of integer sequences padded with
        MAX_Q_LEN / MAX_A_LEN as targets. One-hot encoding is left to the caller.
    """
    X, y = read_data(dataset_filename, perc_data)
    X, y = pad_data(X, y, MAX_Q_LEN, MAX_A_LEN)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=random_state)

    # Index every token that encode_data will have to look up. Building the
    # table from the training questions alone raised KeyError as soon as an
    # answer or a test sample contained a token never seen in a training
    # question. Training questions still come first, so their tokens keep the
    # indices they would have received before.
    char_to_int, int_to_char = create_alphabet_index(X_train + y_train + X_test + y_test)

    X_train, y_train = encode_data(X_train, y_train, char_to_int)
    X_test, y_test = encode_data(X_test, y_test, char_to_int)
    return (X_train, y_train, X_test, y_test, char_to_int, int_to_char)

In [86]:
def _plot_training_curves(history, metric, out_path):
    """Plot the per-epoch train/validation curves of ``metric``, save them to ``out_path`` and show them."""
    fig, ax = plt.subplots()
    ax.plot(history.history[metric])
    ax.plot(history.history['val_' + metric])
    ax.set_title('model ' + metric)
    ax.set_ylabel(metric)
    ax.set_xlabel('epoch')
    ax.legend(['train', 'test'], loc='upper left')
    fig.savefig(str(out_path))
    plt.show()
    # Release the figure so a long series of trials does not pile up open figures.
    plt.close(fig)


def try_experiment(trial,i):
    """Train one encoder-decoder LSTM configuration and store its artifacts.

    Parameters:
        trial: dict with the keys "n_batch", "n_epoch", "data_perc",
               "embed_dim", "encoder_hid", "decoder_hid", "max_q_len" and
               "max_a_len" (see create_trials).
        i:     integer trial index, used in every output file name.

    Side effects:
        Writes the best weights to ../tmp/checkpoint/<i>, the accuracy and
        loss plots to figs/acc/ and figs/loss/, and the parameters plus the
        training history to json/trial_<i>.json. Missing output directories
        are created.

    Raises:
        KeyError:   if ``trial`` lacks one of the keys above.
        ValueError: if the padded data does not have the sequence lengths the
                    trial asks for.

    The dataset path is read from the notebook-level name ``dataset_filename``,
    which must be defined before this function is called.
    NOTE(review): Sequential and the layers are imported from the standalone
    ``keras`` package while the optimizer and callbacks come from ``tf.keras``;
    mixing the two can fail on some installations - confirm.
    """
    n_batch = trial["n_batch"]
    n_epoch = trial["n_epoch"]
    data_perc = trial["data_perc"]
    embed_dim = trial["embed_dim"]
    encoder_hid = trial["encoder_hid"]
    decoder_hid = trial["decoder_hid"]
    max_q_len = trial["max_q_len"]
    max_a_len = trial["max_a_len"]

    # Create every output location before training, so a missing directory
    # cannot discard the results of a long run at the very end.
    checkpoint_filepath = Path("../tmp/checkpoint/{:d}".format(i))
    acc_fig_path = Path("figs/acc/trial_{:d}_acc.png".format(i))
    loss_fig_path = Path("figs/loss/trial_{:d}_loss.png".format(i))
    json_path = Path("json/trial_{:d}.json".format(i))
    for target in (checkpoint_filepath, acc_fig_path, loss_fig_path, json_path):
        target.parent.mkdir(parents=True, exist_ok=True)

    X_train, y_train, X_test, y_test, char_to_int, _int_to_char = process_data(dataset_filename, data_perc)
    vocab_size = len(char_to_int)

    _, y_train = one_hot_encode([], y_train, vocab_size)
    _, y_test = one_hot_encode([], y_test, vocab_size)
    # Keras expects arrays for both training and validation data; the
    # validation pair used to be passed as nested Python lists.
    X_train, y_train = np.array(X_train), np.array(y_train)
    X_test, y_test = np.array(X_test), np.array(y_test)

    # process_data pads with the notebook-level MAX_Q_LEN / MAX_A_LEN, not with
    # the trial's values. Fail early with a readable message instead of an
    # opaque shape error raised from inside model.fit.
    q_steps = X_train.shape[1] if X_train.ndim == 2 else None
    a_steps = y_train.shape[1] if y_train.ndim == 3 else None
    if q_steps != max_q_len + 2 or a_steps != max_a_len + 2:
        raise ValueError(
            "trial {:d}: data has question/answer lengths {}/{} but the trial expects {}/{} "
            "(max_q_len + 2 / max_a_len + 2)".format(i, q_steps, a_steps, max_q_len + 2, max_a_len + 2))

    model = Sequential()
    model.add(Embedding(vocab_size, embed_dim, input_length=max_q_len+2, mask_zero=True))
    # No input_shape here: it is ignored on a non-first layer, and the value
    # given before (vocabulary-sized features) did not describe the embedding output.
    model.add(LSTM(encoder_hid))
    model.add(RepeatVector(max_a_len+2))
    model.add(LSTM(decoder_hid, return_sequences=True))
    model.add(TimeDistributed(Dense(vocab_size, activation='softmax')))

    early_stop_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=7)
    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=str(checkpoint_filepath),
        save_weights_only=True,
        monitor='val_accuracy',
        mode='max',
        save_best_only=True)

    model.compile(loss='categorical_crossentropy', optimizer=keras.optimizers.Adam(), metrics=['accuracy'])
    # summary() prints by itself and returns None, so it must not be wrapped in print().
    model.summary()

    # train LSTM
    history = model.fit(X_train, y_train, validation_data=(X_test, y_test),
                        epochs=n_epoch, batch_size=n_batch,
                        callbacks=[early_stop_callback, model_checkpoint_callback])

    _plot_training_curves(history, 'accuracy', acc_fig_path)
    # summarize history for loss
    _plot_training_curves(history, 'loss', loss_fig_path)

    # Coerce metric values to plain floats: NumPy scalars are not JSON serializable.
    history_log = {name: [float(value) for value in values]
                   for name, values in history.history.items()}
    js = {
        "parameters": trial,
        "history": history_log
    }

    with open(json_path, "w") as outfile:
        json.dump(js, outfile)


In [84]:
def create_trials():
    """Return the hyper-parameter grid as a list of trial dicts.

    Each dict has the keys "n_batch", "n_epoch", "data_perc", "embed_dim",
    "encoder_hid", "decoder_hid", "max_q_len" and "max_a_len". The grid is
    enumerated with the decoder size varying slowest and the data fraction
    varying fastest.
    """
    data_percs = [0.1, 0.5]
    n_batchs = [128]
    n_epochs = [100]
    embed_dims = [30]
    encoder_hids = [128, 256]
    decoder_hids = [128, 256]
    max_lens = [(20, 5), (30, 10)]

    return [
        {
            "n_batch": batch_size,
            "n_epoch": epochs,
            "data_perc": fraction,
            "embed_dim": embedding,
            "encoder_hid": encoder_units,
            "decoder_hid": decoder_units,
            "max_q_len": lengths[0],
            "max_a_len": lengths[1],
        }
        for decoder_units in decoder_hids
        for batch_size in n_batchs
        for encoder_units in encoder_hids
        for embedding in embed_dims
        for epochs in n_epochs
        for lengths in max_lens
        for fraction in data_percs
    ]

In [87]:
# Build the full hyper-parameter grid and run every trial in order.
trials = create_trials()
# Alternatively, append trials manually:
# trials.append({"n_batch" :  n_b, "n_epoch" :  n_e, "data_perc" : p,"embed_dim" : e_d,
#               "encoder_hid" : e_h,"decoder_hid" : d_h, "max_q_len" : m_q_len, "max_a_len" : m_a_len})
# Be careful not to overwrite existing files: output names are derived from i, so
# offset it (e.g. i + 10) if 10 trials have already been saved.
for i,trial in enumerate(trials):
    try_experiment(trial,i)

KeyboardInterrupt: 

In [None]:
# evaluate on some new patterns
# NOTE: this cell reads `model`, `X_test`, `y_test` and `int_to_char` from notebook
# scope. In the visible cells these are only locals of try_experiment, so they
# must be defined at notebook level before this cell is run.
n_samples = 100
result = model.predict(np.asarray(X_test[:n_samples]), verbose=0)
# decode targets and predictions back to strings
expected = [one_hot_decode(x, int_to_char) for x in y_test[:n_samples]]
predicted = [one_hot_decode(x, int_to_char) for x in result]
# show some examples; zip stops at the shorter list, so a test set with fewer
# than n_samples entries no longer raises IndexError
for expected_text, predicted_text in zip(expected, predicted):
    print('Expected=%s, Predicted=%s' % (expected_text, predicted_text))

In [133]:
# Alternative method to generate dataset, in case our original dataset doesn't work.
# NOTE: this cell rebinds MAX_Q_LEN / MAX_A_LEN, which read_data and process_data
# also read; running it changes the limits those helpers use afterwards.

valid_characters = '0123456789.+*-/ '
# pad_data wraps every sequence in 'BOE'/'EOE' and pads with '#', so those tokens
# must be encodable as well; without them encode_data raised KeyError. '#' takes
# index 0, matching create_alphabet_index.
special_tokens = ['#', 'BOE', 'EOE']
vocabulary = special_tokens + list(valid_characters)
char_to_int = {token: index for index, token in enumerate(vocabulary)}
int_to_char = {index: token for index, token in enumerate(vocabulary)}

number_max = 100 #Up to this number
# Longest question is "<operand><operator><operand>" with operands below number_max.
MAX_Q_LEN = len(str(number_max-1)) * 2 + 1
MAX_A_LEN = MAX_Q_LEN
operators = ['+', '*', '-', '/']
# Maps each operator symbol to the function that evaluates it.
operators_dict = { "+":operator.add,
                  '*':operator.mul,
                  "-":operator.sub,
                  '/':operator.truediv}

def oper_generator():
    """Generate one random arithmetic question and its answer.

    Two integers in [1, number_max) and one operator from ``operators`` are
    drawn; the larger integer is used as the left operand. The answer is the
    string form of the result, truncated to MAX_A_LEN characters.

    Returns:
        (question_tokens, answer_tokens), both tokenized with ps.
    """
    first = np.random.randint(1,number_max)
    operator_index = np.random.randint(0,len(operators))
    # Named `symbol` so the local does not shadow the `operator` module.
    symbol = operators[operator_index]
    second = np.random.randint(1,number_max)
    # Order both operands in one step. Assigning max and then min one after the
    # other computed the min from the already-overwritten first operand, so the
    # smaller number was lost whenever the first draw was the smaller one.
    number_1, number_2 = max(first, second), min(first, second)
    operation = str(number_1) + symbol + str(number_2)
    result = str(operators_dict[symbol](number_1,number_2))[:MAX_A_LEN]
    return ps(operation), ps(result)

def data_generator(training_size,test_size):
    """Generate a synthetic train/test set of one-hot encoded arithmetic samples.

    ``training_size`` samples are drawn from oper_generator first, then
    ``test_size`` more. Each split is padded to MAX_Q_LEN / MAX_A_LEN,
    integer-encoded with the notebook-level char_to_int and one-hot encoded.
    Returns (X, y, X_test, y_test).
    """
    def _draw(count):
        questions, answers = [], []
        for _ in range(count):
            question, answer = oper_generator()
            questions.append(question)
            answers.append(answer)
        return questions, answers

    def _prepare(questions, answers):
        padded_q, padded_a = pad_data(questions, answers, MAX_Q_LEN, MAX_A_LEN)
        ids_q, ids_a = encode_data(padded_q, padded_a, char_to_int)
        return one_hot_encode(ids_q, ids_a, len(char_to_int))

    # Draw every sample before encoding, training split first.
    train_q, train_a = _draw(training_size)
    test_q, test_a = _draw(test_size)

    X, y = _prepare(train_q, train_a)
    X_test, y_test = _prepare(test_q, test_a)
    return X, y, X_test, y_test

# Sizes for the synthetic dataset: total number of samples and the share held out.
data_points = 10000
test_size = 0.2
# Number of training samples, rounded to the nearest integer.
training_size = round((1 - test_size) * data_points)
# From here on test_size holds the test sample count rather than the fraction.
test_size = data_points - training_size