In [1]:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TimeDistributed, Dense, Dropout, SimpleRNN, RepeatVector
from tensorflow.keras.callbacks import EarlyStopping, LambdaCallback
import tensorflow as tf
from termcolor import colored

In [2]:
# Checking if GPU available
# list_physical_devices returns an empty list on CPU-only machines, so loop
# over it rather than indexing [0] (which would raise IndexError there).
# Memory growth is enabled on every GPU because TensorFlow requires the
# setting to be the same across all GPU devices.
physical_devices = tf.config.experimental.list_physical_devices("GPU")
for gpu_device in physical_devices:
    tf.config.experimental.set_memory_growth(gpu_device, True)

In [3]:
# Vocabulary for addition expressions: the ten digits plus the '+' sign.
chars_for_add = '0123456789+'
num_features = len(chars_for_add)

# Lookup tables between a character and its one-hot position.
index_to_char = dict(enumerate(chars_for_add))
char_to_index = {char: idx for idx, char in index_to_char.items()}

In [4]:
def generate_samples_for_add():
    """Create one random addition problem.

    Two integers are drawn independently from [0, 10000) with
    np.random.randint.

    Returns:
        (sample, label): `sample` is the expression text "<first>+<second>",
        `label` is the decimal text of the sum.
    """
    # Draw the operands one after the other (first, then second).
    operands = [np.random.randint(0, 10000) for _ in range(2)]
    sample = '+'.join(str(value) for value in operands)
    label = str(sum(operands))
    return sample, label
generate_samples_for_add()

('5899+239', '6138')

In [5]:
hidden_units = 128
max_time_steps = 11

# Layer notes are written as comments: a bare string literal placed directly
# in front of a layer inside the list is not valid list syntax.
model = Sequential([
    # Simple RNN
    # Fully-connected RNN where output is to be fed back to input.
    # units: Positive integer, dimensionality of the output space.
    # activation: Activation function to use. Default: hyperbolic
    # tangent (tanh). If you pass None, no activation is applied (ie.
    # "linear" activation: a(x) = x).
    # return_sequences is left at its default here, so this layer emits a
    # single vector of size hidden_units per sample.
    SimpleRNN(hidden_units, input_shape=(None, num_features)),

    # Repeat Vector
    # Repeats the input max_time_steps times, giving the second RNN one
    # copy of the encoded vector per output time step.
    RepeatVector(max_time_steps),

    # return_sequences: Boolean. Whether to return the last output in
    # the output sequence, or the full sequence. Default: False.
    # It is True here so every time step produces an output.
    SimpleRNN(hidden_units, return_sequences=True),

    # Time distributed layer
    # This wrapper allows to apply a layer to every temporal slice of
    # an input.
    # The input should be at least 3D, and the dimension of index one
    # will be considered to be the temporal dimension.
    # The wrapped Dense layer outputs a softmax over the num_features
    # characters at each time step.
    TimeDistributed(Dense(num_features, activation='softmax'))
])

model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy']
)

model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
simple_rnn (SimpleRNN)       (None, 128)               17920     
_________________________________________________________________
repeat_vector (RepeatVector) (None, 11, 128)           0         
_________________________________________________________________
simple_rnn_1 (SimpleRNN)     (None, 11, 128)           32896     
_________________________________________________________________
time_distributed (TimeDistri (None, 11, 11)            1419      
Total params: 52,235
Trainable params: 52,235
Non-trainable params: 0
_________________________________________________________________


In [6]:
def vectorize_sample(sample, label):
    """One-hot encode an expression and its answer into fixed-size matrices.

    Both strings are right-aligned in a (max_time_steps, num_features) matrix
    and left-padded with the one-hot encoding of '0', so every row of both
    matrices is a valid one-hot vector.

    Args:
        sample: Expression text such as "12+345"; every character must be a
            key of char_to_index.
        label: Answer text such as "357"; same character constraint.

    Returns:
        (x, y): the encoded sample and the encoded label.

    Raises:
        ValueError: if either string is longer than max_time_steps.
        KeyError: if a character is not in char_to_index.
    """
    if len(sample) > max_time_steps or len(label) > max_time_steps:
        # A negative pad width would silently wrap around via negative indices.
        raise ValueError(
            'sample and label must be at most {} characters long'.format(max_time_steps)
        )

    x = np.zeros((max_time_steps, num_features))
    y = np.zeros((max_time_steps, num_features))
    zero_index = char_to_index['0']

    for onehot, text in ((x, sample), (y, label)):
        # Each matrix is padded by its OWN text length; padding the label
        # with the sample's pad width left some label rows all-zero.
        pad = max_time_steps - len(text)
        onehot[:pad, zero_index] = 1
        for offset, char in enumerate(text):
            onehot[pad + offset, char_to_index[char]] = 1
    return x, y
        
# Quick check: encode one random problem and confirm both matrix shapes.
sample_text, label_text = generate_samples_for_add()
print(sample_text, label_text)
x, y = vectorize_sample(sample_text, label_text)
print(x.shape, y.shape)

8812+4109 12921
(11, 11) (11, 11)


In [7]:
def devectorize_sample(sample):
    """Turn a sequence of one-hot (or probability) rows back into text.

    Each row is mapped to the character whose index holds the row's largest
    value, and the characters are concatenated in row order.
    """
    decoded = []
    for vec in sample:
        decoded.append(index_to_char[np.argmax(vec)])
    return ''.join(decoded)

# Round-trip check: decode the encoded expression back to text; the leading
# '0' characters are the left padding added by vectorize_sample.
devectorize_sample(x)

'008812+4109'

In [8]:
# Round-trip check for the encoded label matrix.
devectorize_sample(y)

'00000012921'

In [9]:
# Dataset
def create_dataset(num_samples=5000):
    """Build `num_samples` random addition problems as one-hot tensors.

    Args:
        num_samples: Number of (expression, answer) pairs to generate.

    Returns:
        (x, y): two arrays of shape
        (num_samples, max_time_steps, num_features) holding the encoded
        expressions and the encoded answers respectively.
    """
    shape = (num_samples, max_time_steps, num_features)
    inputs = np.zeros(shape)
    targets = np.zeros(shape)
    for row in range(num_samples):
        sample_text, label_text = generate_samples_for_add()
        inputs[row], targets[row] = vectorize_sample(sample_text, label_text)
    return inputs, targets

# Build the training set with the default sample count; `x` and `y` are the
# arrays later passed to model.fit.
x, y = create_dataset()
print(x.shape, y.shape)
# Spot check: decode one arbitrary training expression (index 78).
devectorize_sample(x[78])


(5000, 11, 11) (5000, 11, 11)


'001298+1595'

In [11]:
# Decode the target of training sample 78 to compare with its expression.
devectorize_sample(y[78])


'00000002893'

In [None]:
# Model Training
def _print_val_accuracy(epoch, logs):
    """Print the epoch's validation accuracy on one running line."""
    print('{:.2f}'.format(logs['val_accuracy']), end=' _ ')


# Compact progress output instead of Keras' own per-epoch log.
l_cb = LambdaCallback(on_epoch_end=_print_val_accuracy)
# Stop once val_loss has not improved for 10 consecutive epochs.
es_cb = EarlyStopping(monitor='val_loss', patience=10)
model.fit(
    x,
    y,
    epochs=500,
    batch_size=256,
    validation_split=0.2,
    verbose=False,
    callbacks=[es_cb, l_cb],
)



In [None]:
# Evaluate on a few freshly generated problems and colour each line:
# green when the decoded prediction matches the decoded target, red otherwise.
x_test, y_test = create_dataset(10)
preds = model.predict(x_test)

for x_vec, y_vec, pred in zip(x_test, y_test, preds):
    # Use a dedicated name for the decoded target: assigning it to `y` would
    # overwrite the training labels and break a re-run of the training cell.
    y_true = devectorize_sample(y_vec)
    y_pred = devectorize_sample(pred)
    col = 'green' if y_true == y_pred else 'red'
    out = 'Input: ' + devectorize_sample(x_vec) + ' Out:' + y_true + ' Pred: ' + y_pred
    print(colored(out, col))