In [None]:
from keras.models import Sequential
from keras import layers
import numpy as np

# Simple Math

## Overview

This is a simple extension to the addition RNN example provided in the Keras documentation. Instead of supporting only addition, it also supports subtraction, multiplication, and division.

## The Data

To train the RNN, we need a training/validation data set. Each data point needs to be encoded as a vector so that it can be understood by the network.

Here are the constants for our experiments. The maximum length is determined by the number of digits in each operand, plus one for a carry and one for a possible negative sign on the result (negative inputs are not allowed at the moment).

In [None]:
# Number of digits allowed per operand, and number of samples to generate.
DIGITS = 3
TRAINING_SIZE = 50000

# When true, each encoded expression string is reversed before being stored.
REVERSE = True

# Fixed width, in characters, that expressions and results are padded to.
MAXLEN = DIGITS * 2 + 2

This will output a random non-negative integer with at most the specified number of digits.

In [None]:
def operand():
    """Return a random operand: an integer drawn from [0, 10 ** DIGITS)."""
    # Written as a named function rather than an assigned lambda (PEP 8, E731):
    # the call interface is unchanged, but it now has a docstring and a real
    # name in tracebacks.
    return np.random.randint(0, 10 ** DIGITS)

We also need to randomly select an operation.

In [None]:
def operator():
    """Return one of the supported operator symbols, chosen at random.

    The symbol is one of '+', '-', '*' or '/', as returned by
    ``np.random.choice``.
    """
    # Written as a named function rather than an assigned lambda (PEP 8, E731);
    # callers still invoke it as ``operator()``.
    return np.random.choice(['+', '-', '*', '/'])

Lastly, we need to evaluate a randomly generated expression.

In [None]:
def evaluate(a, operator, b):
    """Compute ``a <operator> b`` for the operators '+', '-', '*' and '/'.

    Returns the arithmetic result. NaN is returned when the operator is not
    one of the four supported symbols, or when a division by zero is requested.
    """
    # Operators that are defined for every pair of operands. The callables are
    # lazy so only the matching operation is ever evaluated.
    always_defined = (
        ('+', lambda: a + b),
        ('-', lambda: a - b),
        ('*', lambda: a * b),
    )
    for symbol, compute in always_defined:
        if operator == symbol:
            return compute()

    # Division is only defined for a non-zero divisor.
    if operator == '/' and b != 0:
        return a / b

    return float('nan')

Now we can put this into a loop to generate a data set.

In [None]:
# Parallel lists: expressions[k] is the padded question, results[k] its padded answer.
expressions = []
results = []
# Questions already emitted, so the data set contains no duplicates.
seen = set()

while len(expressions) < TRAINING_SIZE:

    a, b = operand(), operand()
    op = operator()

    # Division by zero has no answer to learn from; draw a new question.
    if op == '/' and b == 0:
        continue

    # De-duplicate on the whole question, operator included. Operand order is
    # only normalised for the commutative operators: a-b and b-a (or a/b and
    # b/a) are different questions, and a pair of operands may legitimately
    # appear once with each operator.
    if op in ('+', '*'):
        key = (min(a, b), op, max(a, b))
    else:
        key = (a, op, b)
    if key in seen:
        continue
    seen.add(key)

    # Right-pad the question with spaces up to MAXLEN characters.
    expression = '{}{}{}'.format(a, op, b).ljust(MAXLEN)

    # Cut results longer than MAXLEN characters (long decimal expansions from
    # division) and right-pad shorter ones, so every result has the same width.
    result = str(evaluate(a, op, b))[:MAXLEN].ljust(MAXLEN)

    if REVERSE:
        expression = expression[::-1]

    expressions.append(expression)
    results.append(result)

Now that we have the data set, we need to engineer it to work with the network by converting the data points into vectors.

In [None]:
# Vocabulary: every character that can occur in a padded expression or result.
characters = list(' .0123456789+-*/')
# Character -> one-hot position, so each lookup is a dict access rather than a
# linear list.index() scan.
char_index = {ch: position for position, ch in enumerate(characters)}

# One-hot tensors of shape (samples, MAXLEN, len(characters)).
# The builtin `bool` is used as the dtype: the `np.bool` alias was deprecated
# in NumPy 1.20 and removed in 1.24, where it raises AttributeError.
X = np.zeros((len(expressions), MAXLEN, len(characters)), dtype=bool)
Y = np.zeros((len(results), MAXLEN, len(characters)), dtype=bool)

for i, expression in enumerate(expressions):
    for j, ch in enumerate(expression):
        X[i, j, char_index[ch]] = True

for i, result in enumerate(results):
    for j, ch in enumerate(result):
        Y[i, j, char_index[ch]] = True

# Shuffle X and Y with the same permutation so the pairs stay aligned.
indices = np.arange(len(X))
np.random.shuffle(indices)
X = X[indices]
Y = Y[indices]

We set aside 10% of the data for validation.

In [None]:
# Index of the first validation sample: the last tenth of the rows is held out.
split = len(X) - len(X) // 10

# Slice inputs and targets at the same boundary so the pairs stay aligned.
X_train, Y_train = X[:split], Y[:split]
X_val, Y_val = X[split:], Y[split:]

## The Model

Here are the constants for our experiment model.

In [None]:
# Recurrent layer class used for every recurrent layer in the model.
RNN = layers.LSTM
# Number of recurrent layers stacked after the RepeatVector layer.
LAYERS = 1
# Units in each recurrent layer.
HIDDEN_SIZE = 128
# Samples per batch passed to model.fit.
BATCH_SIZE = 128

The model specification in Keras.

In [None]:
# The layer stack is declared up front, in the same order it was previously
# added one call at a time.
model = Sequential([
    # Reads the one-hot input sequence and emits a single HIDDEN_SIZE vector.
    RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(characters))),
    # Repeats that vector once per output time step.
    layers.RepeatVector(MAXLEN),
    # LAYERS recurrent layers, each returning the full sequence of outputs.
    *[RNN(HIDDEN_SIZE, return_sequences=True) for _ in range(LAYERS)],
    # Per-time-step softmax over the character vocabulary.
    layers.TimeDistributed(layers.Dense(len(characters), activation='softmax')),
])
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)
model.summary()

## Training

In [None]:
def _indices_to_text(class_indices):
    """Map a sequence of vocabulary indices to the string they spell."""
    return ''.join(characters[k] for k in class_indices)


# One epoch per iteration, followed by a spot check on validation samples.
for iteration in range(1, 200):

    print()
    print('-' * 50)
    print('iteration', iteration)

    model.fit(X_train, Y_train, batch_size=BATCH_SIZE, epochs=1, validation_data=(X_val, Y_val))

    # Show ten random validation samples. `_` is used so the loop counter is
    # not shadowed by anything in the body.
    for _ in range(10):

        index = np.random.randint(0, len(X_val))
        # Indexing with a list keeps the leading batch axis (length 1).
        x, y = X_val[[index]], Y_val[[index]]

        # Sequential.predict_classes no longer exists in current Keras; taking
        # the argmax over the softmax axis of predict() gives the class index
        # for each time step.
        prediction = np.argmax(model.predict(x, verbose=0), axis=-1)

        # For a one-hot row, argmax is the position of its single True entry
        # (this replaces np.asscalar, which was removed from NumPy).
        decoded_expression = _indices_to_text(np.argmax(x[0], axis=-1))
        # Undo the input reversal only if it was applied when encoding.
        if REVERSE:
            decoded_expression = decoded_expression[::-1]

        decoded_actual = _indices_to_text(np.argmax(y[0], axis=-1))
        decoded_prediction = _indices_to_text(prediction[0])

        print(f'Expression: {decoded_expression} Prediction: {decoded_prediction} Actual: {decoded_actual}')