<a href="https://colab.research.google.com/github/dustejuned/nlp/blob/master/seq_2seq_lstm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
from random import randint
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Input
from keras.models import Model
from keras.utils import to_categorical

#Get data sets
def get_datasets(input_length, output_length, n_features, n_samples):
    """Generate random one-hot encoded (source, target, decoder-input) arrays.

    For each of the ``n_samples`` samples, ``input_length`` integers are drawn
    with ``randint(1, n_features - 1)``, so the value 0 never appears in a
    source sequence.  The target is the first ``output_length`` source values
    in reverse order, and the decoder input is that target shifted right by
    one step with a leading 0.

    Returns:
        Three arrays built with ``np.array`` from the per-sample
        ``to_categorical(..., num_classes=n_features)`` encodings, in the
        order: source sequences, target sequences, decoder inputs.
    """
    sources, targets, shifted_targets = [], [], []
    for _ in range(n_samples):
        source = [randint(1, n_features - 1) for _ in range(input_length)]
        # Target: the leading output_length source values, reversed.
        target = source[:output_length][::-1]
        # Decoder input lags the target by one step and starts with 0.
        shifted = [0] + target[:-1]
        sources.append(to_categorical(source, num_classes=n_features))
        targets.append(to_categorical(target, num_classes=n_features))
        shifted_targets.append(to_categorical(shifted, num_classes=n_features))
    return np.array(sources), np.array(targets), np.array(shifted_targets)

def define_model(input_length, output_length, n_units):
    """Build the training model and the two models used for inference.

    ``input_length`` and ``output_length`` are used as the size of the last
    axis of the encoder and decoder inputs respectively, and
    ``output_length`` is also the width of the softmax layer; the script in
    this file passes ``n_features`` for both.  ``n_units`` is the number of
    units of both LSTM layers.

    Returns:
        ``(model, inf_encoder, inf_decoder)``.  The inference models are
        built from the same LSTM and Dense layer objects as ``model``.
    """
    # Encoder: only its final hidden and cell states are kept.
    enc_in = Input(shape=(None, input_length))
    enc_lstm = LSTM(n_units, return_state=True)
    _, enc_h, enc_c = enc_lstm(enc_in)
    enc_states = [enc_h, enc_c]

    # Training decoder: starts from the encoder states; only the per-step
    # outputs are used, the returned states are discarded.
    dec_in = Input(shape=(None, output_length))
    dec_lstm = LSTM(n_units, return_sequences=True, return_state=True)
    dec_seq, _, _ = dec_lstm(dec_in, initial_state=enc_states)
    softmax = Dense(output_length, activation='softmax')
    model = Model([enc_in, dec_in], softmax(dec_seq))

    # Inference encoder: maps a source sequence to the encoder states.
    inf_encoder = Model(enc_in, enc_states)

    # Inference decoder: takes the states as explicit inputs and returns the
    # updated states next to the softmax output.
    state_h_in = Input(shape=(n_units,))
    state_c_in = Input(shape=(n_units,))
    state_inputs = [state_h_in, state_c_in]
    step_seq, step_h, step_c = dec_lstm(dec_in, initial_state=state_inputs)
    step_probs = softmax(step_seq)
    inf_decoder = Model([dec_in] + state_inputs, [step_probs, step_h, step_c])

    return model, inf_encoder, inf_decoder

def one_hot_decode(encoded_seq):
    """Return a list with the index of the largest value of each vector."""
    return list(map(np.argmax, encoded_seq))

def _one_hot_step(index, n_features):
    """Return a (1, 1, n_features) array that is 1 at *index* and 0 elsewhere."""
    step = np.zeros((1, 1, n_features))
    step[0, 0, index] = 1.0
    return step


def predict_seq(inf_encoder, inf_decoder, source, n_steps, n_features):
    """Greedily decode ``n_steps`` output vectors for one source sequence.

    The decoder inputs are built the same way as the training decoder inputs
    produced by ``get_datasets``: the first step receives the one-hot encoding
    of class 0 (the leading 0 of the shifted target), and every later step
    receives the one-hot encoding of the previously predicted class rather
    than the raw softmax vector.

    Args:
        inf_encoder: object whose ``predict(source)`` returns the list of
            initial decoder states.
        inf_decoder: object whose ``predict([target] + states)`` returns
            ``(output, h, c)``.
        source: encoder input for a single sample (assumed to have a leading
            batch axis of size 1 -- only ``output[0, 0, :]`` is read).
        n_steps: number of decoding steps to run.
        n_features: width of the one-hot decoder input.

    Returns:
        ``np.array`` with one row per step holding ``output[0, 0, :]`` of
        that step (the decoder's raw output vector).
    """
    states = inf_encoder.predict(source)
    # Start token: one-hot class 0, matching the training decoder input.
    target = _one_hot_step(0, n_features)
    output_seq = []

    for _ in range(n_steps):
        output, h, c = inf_decoder.predict([target] + states)
        step_output = output[0, 0, :]
        output_seq.append(step_output)
        states = [h, c]
        # Feed back the chosen class as a one-hot vector, as in training.
        target = _one_hot_step(int(np.argmax(step_output)), n_features)

    return np.array(output_seq)



#Train model and predict sequences
# NOTE: `input_lenght` is misspelled; the name is kept unchanged in case
# other cells refer to it.
input_lenght = 6
output_length = 4
n_features = 60
train_input, train_target, train_target_input = get_datasets(input_lenght, output_length, n_features, 10000)

# Encoder and decoder inputs are both one-hot encoded over n_features
# classes, so n_features is passed as the width of each.
model, inf_encoder, inf_decoder = define_model(n_features, n_features, 128)

# summary() prints the table itself and returns None; wrapping it in print()
# would emit an extra "None" line.
model.summary()

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])

# Teacher forcing: the decoder is fed the target shifted by one step.
model.fit([train_input, train_target_input], train_target, epochs=10)

total = 100
correct = 0
#calculate accuracy of model: a sample counts as correct only when the whole
#predicted sequence matches the expected one
for _ in range(total):
    test_input, test_target, test_target_input = get_datasets(input_lenght, output_length, n_features, 1)
    target = predict_seq(inf_encoder, inf_decoder, test_input, output_length, n_features)
    # Decode once and reuse for both the report line and the comparison.
    predicted = one_hot_decode(target)
    actual = one_hot_decode(test_target[0])
    print('Input Sequence: %s, Predicted: %s, Actual: %s' %(one_hot_decode(test_input[0]), predicted, actual))
    if np.array_equal(predicted, actual):
        correct += 1
print('accuracy is %s' % (float(correct)/float(total)*100))







__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_21 (InputLayer)           (None, None, 60)     0                                            
__________________________________________________________________________________________________
input_22 (InputLayer)           (None, None, 60)     0                                            
__________________________________________________________________________________________________
lstm_11 (LSTM)                  [(None, 128), (None, 96768       input_21[0][0]                   
__________________________________________________________________________________________________
lstm_12 (LSTM)                  [(None, None, 128),  96768       input_22[0][0]                   
                                                                 lstm_11[0][1]                    
          