## The purpose of this notebook is to verify the seq2seq model code (for both LSTM and bidirectional LSTM) using a dummy dataset with a fixed sequence length

- where both input and output are one-hot encoded

In [None]:
import numpy as np
from numpy import array
from numpy import argmax
from random import randint
from numpy import array_equal
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from keras.models import Model
from keras.layers import Input, LSTM, Dense

#Dummy dataset creation 
- with fixed sequence length of 10

In [None]:
def generate_sequence(length, n_unique):
    """Return a list of `length` random integers, each in the range 0..n_unique-1 inclusive."""
    values = []
    upper = n_unique - 1  # randint's upper bound is inclusive
    for _ in range(length):
        values.append(randint(0, upper))
    return values

In [None]:
#n_unique -> number of distinct integer values (0, 1, ..., n_unique-1; e.g. 0 to 499 for n_unique=500) that each coordinate of an input sequence is drawn from
#n_samples -> total number of samples in the dataset

def get_dataset(n_unique, n_samples):
    """Build one-hot encoded encoder/decoder arrays for the running-sum parity task.

    Each sample is a sequence of 10 random (x, y) coordinate pairs, with both
    coordinates drawn from 0..n_unique-1. The target at step t is class 0 when
    the sum of all x and y values up to and including step t is even, and
    class 1 when it is odd. The decoder input is the target sequence shifted
    right by one step, with class 2 acting as the start-of-sequence token.

    The encoder one-hot width follows ``n_unique``, so values of ``n_unique``
    other than 10 are supported.

    Args:
        n_unique: number of distinct coordinate values; also the one-hot width
            of the encoder input.
        n_samples: number of samples to generate.

    Returns:
        A tuple of three float32 arrays:
        - encoder input, shape (n_samples, 20, n_unique): the flattened
          sequence x0, y0, x1, y1, ... with every value one-hot encoded;
        - decoder input, shape (n_samples, 10, 3);
        - decoder output, shape (n_samples, 10, 3).
    """
    seq_len = 10        # fixed number of coordinate pairs per sample
    n_classes_out = 3   # 0 = even, 1 = odd, 2 = start-of-sequence token
    start_token = 2

    # Draw all x values and then all y values for each sample, so the order in
    # which random numbers are consumed is x-sequence first, y-sequence second.
    coords = np.empty((n_samples, seq_len, 2), dtype=np.int64)
    for sample in coords:
        sample[:, 0] = [randint(0, n_unique - 1) for _ in range(seq_len)]
        sample[:, 1] = [randint(0, n_unique - 1) for _ in range(seq_len)]

    # Parity of (running sum of x + running sum of y) at every time step.
    parity = coords.sum(axis=2).cumsum(axis=1) % 2

    # Teacher-forcing input: start token followed by the targets of the
    # previous steps (i.e. the target sequence delayed by one step).
    start_col = np.full((n_samples, 1), start_token, dtype=parity.dtype)
    decoder_tokens = np.concatenate([start_col, parity[:, :-1]], axis=1)

    # C-order reshape interleaves the pairs as x0, y0, x1, y1, ...
    encoder_tokens = coords.reshape(n_samples, 2 * seq_len)

    # Indexing an identity matrix with integer labels yields their one-hot rows.
    encoder_input_onehot = np.eye(n_unique, dtype=np.float32)[encoder_tokens]
    decoder_eye = np.eye(n_classes_out, dtype=np.float32)
    decoder_input_onehot = decoder_eye[decoder_tokens]
    decoder_output_onehot = decoder_eye[parity]

    return encoder_input_onehot, decoder_input_onehot, decoder_output_onehot
   

In [None]:
# Training set: 100,000 samples with coordinate values drawn from 0..9.
n_unique = 10
n_samples = 100000
X1, X2, Y = get_dataset(n_unique, n_samples)

In [None]:
# Sanity check: encoder input is (n_samples, 20, 10); both decoder arrays are (n_samples, 10, 3).
print(X1.shape,X2.shape,Y.shape)

(100000, 20, 10) (100000, 10, 3) (100000, 10, 3)


In [None]:
# Validation set: 10,000 independently generated samples from the same generator.
n_unique = 10
n_samples = 10000
X1_valid, X2_valid, Y_valid = get_dataset(n_unique, n_samples)

In [None]:
# Sanity check: same per-sample shapes as the training arrays.
print(X1_valid.shape,X2_valid.shape,Y_valid.shape)

(10000, 20, 10) (10000, 10, 3) (10000, 10, 3)


#Model creation and training

In [None]:
#Unidirectional

def define_models(n_features_input, n_features_output, n_units):
    """Build the training model and the two models used for step-wise inference.

    The decoder LSTM and the softmax Dense layer are created once and reused
    by both the training model and the inference decoder, so the inference
    models share the weights learned during training.

    Args:
        n_features_input: size of each encoder input vector.
        n_features_output: size of each decoder input/output vector.
        n_units: number of LSTM units in the encoder and in the decoder.

    Returns:
        (model, encoder_model, decoder_model):
        - model maps [encoder input, decoder input] to per-step softmax outputs;
        - encoder_model maps the encoder input to the final [h, c] states;
        - decoder_model maps [decoder input, h, c] to [softmax output, h, c].
    """
    # Training encoder: only the final hidden and cell states are kept.
    enc_in = Input(shape=(None, n_features_input))
    enc_lstm = LSTM(n_units, return_state=True)
    _, enc_h, enc_c = enc_lstm(enc_in)
    enc_states = [enc_h, enc_c]

    # Training decoder: initialised from the encoder states, emits one
    # softmax distribution per time step.
    dec_in = Input(shape=(None, n_features_output))
    dec_lstm = LSTM(n_units, return_sequences=True, return_state=True)
    dec_seq, _, _ = dec_lstm(dec_in, initial_state=enc_states)
    dec_softmax = Dense(n_features_output, activation='softmax')
    model = Model([enc_in, dec_in], dec_softmax(dec_seq))

    # Inference encoder: source sequence -> initial decoder states.
    encoder_model = Model(enc_in, enc_states)

    # Inference decoder: the states are explicit inputs and outputs so the
    # caller can feed them back in on the next decoding step.
    h_in = Input(shape=(n_units,))
    c_in = Input(shape=(n_units,))
    state_inputs = [h_in, c_in]
    step_seq, step_h, step_c = dec_lstm(dec_in, initial_state=state_inputs)
    step_probs = dec_softmax(step_seq)
    decoder_model = Model([dec_in] + state_inputs, [step_probs, step_h, step_c])

    return model, encoder_model, decoder_model

In [None]:
# #Bidirectional variant (uncomment to use; Bidirectional and Concatenate must also be imported from keras.layers)

# # returns train, inference_encoder and inference_decoder models
# def define_models(n_features_input, n_features_output, n_units):

# 	# define training encoder
# 	encoder_inputs = Input(shape=(None, n_features_input))
# 	encoder = Bidirectional(LSTM(n_units, return_state=True))                     
# 	encoder_outputs, forward_h, forward_c, backward_h, backward_c = encoder(encoder_inputs)
# 	state_h = Concatenate()([forward_h, backward_h])
# 	state_c = Concatenate()([forward_c, backward_c])
# 	encoder_states = [state_h, state_c]

# 	# define training decoder
# 	decoder_inputs = Input(shape=(None, n_features_output))
# 	decoder_lstm = LSTM(n_units*2, return_sequences=True, return_state=True)      #Multiply by 2 because the bidirectional encoder concatenates forward and backward states
# 	decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)
# 	decoder_dense = Dense(n_features_output, activation='softmax')
# 	decoder_outputs = decoder_dense(decoder_outputs)
# 	model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
 
# 	# define inference encoder
# 	encoder_model = Model(encoder_inputs, encoder_states)
 
# 	# define inference decoder
# 	decoder_state_input_h = Input(shape=(n_units*2,))                             #Multiply by 2 because of the bidirectional encoder
# 	decoder_state_input_c = Input(shape=(n_units*2,))                             #Multiply by 2 because of the bidirectional encoder
# 	decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
# 	decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
# 	decoder_states = [state_h, state_c]
# 	decoder_outputs = decoder_dense(decoder_outputs)
# 	decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states)
 
# 	# return all models
# 	return model, encoder_model, decoder_model

In [None]:
# Configure the problem: feature sizes must match the last axis of the
# one-hot arrays (10 for the encoder input, 3 for the decoder input/output).
n_features_input = 10
n_features_output = 3
n_units = 128

# Build the training model and the two inference models, then compile the
# training model for multi-class classification at every time step.
train, infenc, infdec = define_models(n_features_input, n_features_output, n_units)
train.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Show the training array shapes before fitting.
print(X1.shape,X2.shape,Y.shape)

# Train with teacher forcing. The held-out validation set is passed in so
# that validation loss/accuracy are reported after every epoch.
history = train.fit(
    [X1, X2],
    Y,
    batch_size=32,
    epochs=50,
    validation_data=([X1_valid, X2_valid], Y_valid),
)


(100000, 20, 10) (100000, 10, 3) (100000, 10, 3)
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [None]:
def predict_sequence(infenc, infdec, input, n_steps_out, n_features, start_token=None):
    """Decode an output sequence step by step for one encoded source sequence.

    The encoder is run once to obtain the initial decoder states. The decoder
    is then called ``n_steps_out`` times; after each call its output vector is
    fed back as the next decoder input and its states are carried forward.

    Args:
        infenc: inference encoder; ``infenc.predict(input)`` must return the
            sequence of initial decoder states.
        infdec: inference decoder; ``infdec.predict([target_seq, *states])``
            must return ``(yhat, h, c)``.
        input: encoder input, passed unchanged to ``infenc.predict``. Only
            ``yhat[0, 0, :]`` is kept per step, so a batch of one sample is
            expected.
        n_steps_out: number of time steps to generate.
        n_features: number of features in each decoder input/output vector.
        start_token: optional class index of the start-of-sequence token. When
            given, the first decoder input is the one-hot vector of this
            class. When ``None`` (default) the first decoder input is an
            all-zeros vector.
            NOTE(review): the dataset in this notebook trains the decoder with
            class 2 as the first input, so ``start_token=2`` reproduces the
            training-time start condition.

    Returns:
        Array of shape (n_steps_out, n_features) holding the decoder output
        vector of every step.

    Raises:
        ValueError: if ``start_token`` is not in ``range(n_features)``.
    """
    # Encode the source sequence into the initial decoder states.
    state = infenc.predict(input)

    # Build the first decoder input (start-of-sequence vector).
    target_seq = array([0.0] * n_features).reshape(1, 1, n_features)
    if start_token is not None:
        if not 0 <= start_token < n_features:
            raise ValueError(
                "start_token must be in range(n_features), got %r" % (start_token,)
            )
        target_seq[0, 0, start_token] = 1.0

    output = []
    for _ in range(n_steps_out):
        # Predict the next step from the previous output and current states.
        yhat, h, c = infdec.predict([target_seq, *state])
        output.append(yhat[0, 0, :])
        # Carry the states forward and feed the raw prediction back in.
        state = [h, c]
        target_seq = yhat
    return array(output)

def one_hot_decode(encoded_seq):
    """Return, for each vector in `encoded_seq`, the index of its largest entry."""
    labels = []
    for step_vector in encoded_seq:
        labels.append(argmax(step_vector))
    return labels

#Evaluate with test data

In [None]:
# Exact-match accuracy over 100 freshly generated single-sample test sets:
# a prediction counts as correct only if every output step matches.
n_unique, n_samples = 10, 1
n_steps_out, n_features = 10, 3

total = 100
correct = 0
for _ in range(total):
    X1_test, X2_test, y_test = get_dataset(n_unique, n_samples)
    target = predict_sequence(infenc, infdec, X1_test, n_steps_out, n_features)
    expected_labels = one_hot_decode(y_test[0])
    predicted_labels = one_hot_decode(target)
    correct += int(array_equal(expected_labels, predicted_labels))
print('Accuracy: %.2f%%' % (float(correct)/float(total)*100.0))

Accuracy: 100.00%


###Evaluate with one test sample

In [None]:
# Generate one fresh test sample (batch of size 1) for manual inspection.
n_unique = 10
n_samples = 1
n_steps_out = 10
n_features = 3
X1_test, X2_test, y_test = get_dataset(n_unique, n_samples)

In [None]:
# Decode the test sample; the result holds one decoder output vector per step.
prediction = predict_sequence(infenc, infdec, X1_test, n_steps_out, n_features)

In [None]:
# Collapse each per-step output vector to its argmax class index.
prediction = one_hot_decode(prediction)

In [None]:
# Display the predicted class index for every output step.
prediction

[0, 1, 0, 0, 0, 0, 1, 0, 0, 1]

In [None]:
# Display the expected one-hot targets; the position of the 1 in each row is the expected class.
y_test

array([[[1., 0., 0.],
        [0., 1., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [0., 1., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [0., 1., 0.]]], dtype=float32)

###Evaluate with train data (optional)

In [None]:
# Shape of the encoder input for a single training sample (no batch axis).
X1[0].shape

(20, 10)

In [None]:
# Add a leading batch axis of size 1 so the sample can be passed to the encoder.
X1_train = X1[0].reshape(1,20,10)

In [None]:
# Decode the first training sample with the inference models.
prediction = predict_sequence(infenc, infdec, X1_train, n_steps_out, n_features)


In [None]:
# Collapse each per-step output vector to its argmax class index.
prediction = one_hot_decode(prediction)

In [None]:
# Display the predicted class index for every output step.
prediction

[1, 0, 0, 0, 1, 1, 1, 1, 1, 1]

In [None]:
# Display the expected one-hot targets of the first training sample for comparison.
Y[0]

array([[0., 1., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [0., 1., 0.],
       [0., 1., 0.],
       [0., 1., 0.],
       [0., 1., 0.],
       [0., 1., 0.],
       [0., 1., 0.]], dtype=float32)