In [None]:
import numpy as np

from tensorflow.keras.models import Sequential

from tensorflow.keras.layers import TimeDistributed, Dense, Dropout, SimpleRNN, RepeatVector
from tensorflow.keras.callbacks import EarlyStopping, LambdaCallback

from termcolor import colored

In [None]:
#Vocabulary: every character that can appear in an example or a label (the digits 0-9 and the '+' sign)
all_chars  = '0123456789+'

In [None]:
#enumerate(all_chars) pairs each character with its position; dict() turns those pairs into an index -> character mapping
dict(enumerate(all_chars))

{0: '0',
 1: '1',
 2: '2',
 3: '3',
 4: '4',
 5: '5',
 6: '6',
 7: '7',
 8: '8',
 9: '9',
 10: '+'}

In [None]:
#Build lookup tables in both directions: character -> index and index -> character
char_to_idx = {char: position for position, char in enumerate(all_chars)}
idx_to_char = {position: char for position, char in enumerate(all_chars)}

In [None]:
#Define a function that returns two random numbers and create an example and label from these 2 numbers

def generate_data():
  """Draw two random integers in [0, 100) and turn them into an addition problem.

  Returns:
    A tuple (example, label) of strings: example is 'a+b' and label is the
    decimal form of a + b.
  """
  #Two separate draws, first operand first
  operands = [np.random.randint(0,100) for _ in range(2)]
  example = '+'.join(str(number) for number in operands)
  label = str(sum(operands))
  return example, label

generate_data()

('68+44', '112')

In [None]:
#The longest possible example is two 2-digit numbers joined by a '+' sign, i.e. 5 characters
max_timesteps=5
#One feature per unique character in the vocabulary
num_features = len(all_chars)


model = Sequential()

#Encoder: return_sequences is left at its default, so only the final state (a single vector) is returned
model.add(SimpleRNN(128,input_shape=(None,num_features)))
#Repeat that vector 'max_timesteps' times, because the next RNN layer requires 3D input
model.add(RepeatVector(max_timesteps))

#Decoder: returns an output for every timestep
model.add(SimpleRNN(128,return_sequences=True))
#TimeDistributed applies the same Dense layer to every timestep, giving a probability for each character at each position
model.add(TimeDistributed(Dense(num_features,activation='softmax')))

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

model.summary()

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
simple_rnn_5 (SimpleRNN)     (None, 128)               17920     
_________________________________________________________________
repeat_vector_3 (RepeatVecto (None, 5, 128)            0         
_________________________________________________________________
simple_rnn_6 (SimpleRNN)     (None, 5, 128)            32896     
_________________________________________________________________
time_distributed_2 (TimeDist (None, 5, 11)             1419      
Total params: 52,235
Trainable params: 52,235
Non-trainable params: 0
_________________________________________________________________


In [None]:
#OHE the example and labels
def _one_hot_left_padded(text):
  """One-hot encode `text` into a (max_timesteps, num_features) matrix, left-padded with '0'.

  Raises:
    ValueError: if `text` has more than max_timesteps characters.
    KeyError: if `text` contains a character that is not in char_to_idx.
  """
  pad = max_timesteps - len(text)
  if pad < 0:
    #Without this guard the negative offset would wrap around (NumPy negative indexing) and silently write characters into the wrong rows
    raise ValueError(f'{text!r} has {len(text)} characters, but at most {max_timesteps} are supported')

  encoded = np.zeros((max_timesteps,num_features))
  #Rows the text does not reach are filled with the character '0', e.g. '7+5' is encoded as '007+5'
  encoded[:pad,char_to_idx['0']] = 1
  #Each character sets a single 1 in its row, in the column given by char_to_idx
  for offset, char in enumerate(text):
    encoded[pad+offset,char_to_idx[char]] = 1
  return encoded


def vectorize(example,label):
  """One-hot encode an example string and its label string.

  Both strings are right-aligned in a (max_timesteps, num_features) matrix and
  the unused leading rows are encoded as the character '0'.

  Args:
    example: the input string, e.g. '95+4'.
    label: the expected output string, e.g. '99'.

  Returns:
    A tuple (x, y) of arrays, each of shape (max_timesteps, num_features).

  Raises:
    ValueError: if either string is longer than max_timesteps.
    KeyError: if either string contains a character outside the vocabulary.
  """
  x = _one_hot_left_padded(example)
  y = _one_hot_left_padded(label)

  return x,y

In [None]:
#Draw one random problem and show it as text
e,l = generate_data()
print(e, l, sep='\n')

#Show the one-hot matrices produced for that problem
x , y = vectorize(e,l)
print(x, y, sep='\n')

95+4
99
[[1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]]
[[1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]]


In [None]:
def devectorize(example):
  """Decode a sequence of one-hot (or probability) rows back into a string.

  For each row, the index of the largest value is looked up in idx_to_char;
  the resulting characters are concatenated in row order.
  """
  return ''.join(idx_to_char[np.argmax(row)] for row in example)

In [None]:
#Decode the one-hot matrix from the previous cell back into text to check the encoding
devectorize(x)

'095+4'

In [None]:
#Create a dataset of x and y. This will be a 3D matrix of the One Hot Encodings of the xs and ys
def create_dataset(num_examples=2000):
  """Build a one-hot encoded dataset of random addition problems.

  Args:
    num_examples: how many (example, label) pairs to generate.

  Returns:
    A tuple (x, y) of arrays, each of shape
    (num_examples, max_timesteps, num_features).
  """
  shape = (num_examples,max_timesteps,num_features)
  x = np.zeros(shape)
  y = np.zeros(shape)
  for row in range(num_examples):
    #Each generated pair fills one (max_timesteps, num_features) slice of x and of y
    x[row], y[row] = vectorize(*generate_data())
  return x,y

In [None]:
x, y = create_dataset()

#Decode the first example and its label back to text as a sanity check
print(devectorize(x[0]), devectorize(y[0]), sep='\n')

29+36
00065


In [None]:
#Define callback functions

#Print the validation accuracy at the end of every epoch
l_cb = LambdaCallback(on_epoch_end=lambda epoch,logs:print(logs['val_accuracy']))

#Stop training once val_loss has gone 10 epochs without improving
es_cb = EarlyStopping(monitor='val_loss',patience=10)

model.fit(
    x,
    y,
    epochs=500,
    batch_size=256,
    validation_split=0.2,
    verbose=0,
    callbacks=[es_cb,l_cb],
)

0.9524999856948853
0.953499972820282
0.949999988079071
0.9505000114440918
0.9505000114440918
0.9480000138282776
0.9490000009536743
0.9470000267028809
0.9505000114440918
0.9505000114440918
0.9509999752044678
0.9484999775886536
0.9514999985694885
0.9505000114440918


<tensorflow.python.keras.callbacks.History at 0x7f1aaa7740b8>

In [None]:
#Create test dataset and predict on it

x_test , y_test = create_dataset(10)
#Predict from the x values. For each example this gives one row of character probabilities per timestep
preds = model.predict(x_test)

for i, pred in enumerate(preds):
  #Decode the true label. It is deliberately not stored in `y`: that name holds the training labels passed to model.fit,
  #and overwriting it with a string would break any re-run of the training cell
  y_true = devectorize(y_test[i])
  #Decode the predicted label
  y_pred = devectorize(pred)
  #Green when the whole predicted string matches the label, red otherwise
  col = 'green' if y_true == y_pred else 'red'
  out = f'Input:{devectorize(x_test[i])} Output:{y_true}  Pred: {y_pred}'
  print(colored(out,col))

[32mInput:89+24 Output:00113  Pred: 00113[0m
[32mInput:14+73 Output:00087  Pred: 00087[0m
[32mInput:064+0 Output:00064  Pred: 00064[0m
[31mInput:05+12 Output:00017  Pred: 00015[0m
[31mInput:49+99 Output:00148  Pred: 00147[0m
[32mInput:84+82 Output:00166  Pred: 00166[0m
[32mInput:80+92 Output:00172  Pred: 00172[0m
[32mInput:65+43 Output:00108  Pred: 00108[0m
[31mInput:047+0 Output:00047  Pred: 00055[0m
[32mInput:41+42 Output:00083  Pred: 00083[0m
