In [1]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense
from numpy import array

## Check return sequences output in LSTM

In [2]:
#define the model
inputs1 = Input(shape=(3,1))
lstm1   = LSTM(1)(inputs1)
model   = Model(inputs = inputs1, outputs = lstm1)
#define input data
data = array([0.1, 0.2, 0.3]).reshape((1,3,1))
#make and show prediction
print(model.predict(data))

[[-0.11731906]]


The above model gives single hidden state for three time steps. We can obtain the hidden state for each time step.

In [3]:
#define the model
inputs1 = Input(shape=(3,1))
lstm1   = LSTM(1, return_sequences=True)(inputs1)
model   = Model(inputs = inputs1, outputs = lstm1)
#define input data
data = array([0.1, 0.2, 0.3]).reshape((1,3,1))
#make and show prediction
print(model.predict(data))

[[[-0.00474961]
  [-0.01266974]
  [-0.02270861]]]


## Return States

In [4]:
#define the model
inputs1 = Input(shape=(3,1))
lstm1, state_h, state_c   = LSTM(1, return_state=True)(inputs1)
model   = Model(inputs = inputs1, outputs = [lstm1, state_h, state_c])
#define input data
data = array([0.1, 0.2, 0.3]).reshape((1,3,1))
#make and show prediction
print(model.predict(data))

[array([[-0.01870343]], dtype=float32), array([[-0.01870343]], dtype=float32), array([[-0.03878213]], dtype=float32)]


## Return States and sequences

In [5]:
#define the model
inputs1 = Input(shape=(3,1))
lstm1, state_h, state_c   = LSTM(1, return_state=True, return_sequences=True)(inputs1)
model   = Model(inputs = inputs1, outputs = [lstm1, state_h, state_c])
#define input data
data = array([0.1, 0.2, 0.3]).reshape((1,3,1))
#make and show prediction
print(model.predict(data))

[array([[[-0.00544518],
        [-0.01447116],
        [-0.0257654 ]]], dtype=float32), array([[-0.0257654]], dtype=float32), array([[-0.04588732]], dtype=float32)]


# Encoder Decoder Example

In [6]:
# returns train, inference_encoder and inference_decoder models
def define_models(n_input, n_output, n_units):
	# define training encoder
	encoder_inputs = Input(shape=(None, n_input))
	encoder = LSTM(n_units, return_state=True)
	encoder_outputs, state_h, state_c = encoder(encoder_inputs)
	encoder_states = [state_h, state_c]
	# define training decoder
	decoder_inputs = Input(shape=(None, n_output))
	decoder_lstm = LSTM(n_units, return_sequences=True, return_state=True)
	decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)
	decoder_dense = Dense(n_output, activation='softmax')
	decoder_outputs = decoder_dense(decoder_outputs)
	model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
	# define inference encoder
	encoder_model = Model(encoder_inputs, encoder_states)
	# define inference decoder
	decoder_state_input_h = Input(shape=(n_units,))
	decoder_state_input_c = Input(shape=(n_units,))
	decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
	decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
	decoder_states = [state_h, state_c]
	decoder_outputs = decoder_dense(decoder_outputs)
	decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states)
	# return all models
	return model, encoder_model, decoder_model

In [7]:
def predict_sequence(infenc, infdec, source, n_steps, cardinality):
	# encode
	state = infenc.predict(source)
	# start of sequence input
	target_seq = array([0.0 for _ in range(cardinality)]).reshape(1, 1, cardinality)
	# collect predictions
	output = list()
	for t in range(n_steps):
		# predict next char
		yhat, h, c = infdec.predict([target_seq] + state)
		# store prediction
		output.append(yhat[0,0,:])
		# update state
		state = [h, c]
		# update target sequence
		target_seq = yhat
	return array(output)

# Scalable Sequence to Sequence problem

In [8]:
from random import randint
from numpy import array
import numpy as np
from numpy import argmax
from tensorflow.keras.utils import to_categorical

#generate a sequence of random integers
def generate_sequence(length, n_unique):
    return [randint(1, n_unique -1 ) for _ in range(length)]

#prepare the data for lstm
def get_dataset(n_in, n_out, cardinality, n_samples):
    X1, X2, y = list(), list(), list()

    for _ in range(n_samples):
        #generate source sequence
        source = generate_sequence(n_in, cardinality)
        #define target sequence
        target = source[:n_out]
        target.reverse()
        #create padded target input
        target_in = [0] + target[:-1]

        #encode
        src_encoded = to_categorical([source], num_classes = cardinality)
        tar_encoded = to_categorical([target], num_classes = cardinality)
        tar2_encoded = to_categorical([target_in], num_classes = cardinality)

        #store
        X1.append(src_encoded)
        X2.append(tar2_encoded)
        y.append(tar_encoded)

    return array(X1), array(X2), array(y)

#decode a one-hot-encoded string
def one_hot_decode(encoded_seq):
    return [argmax(vector) for vector in encoded_seq]



# X1, X2, y = get_dataset(n_steps_in, n_steps_out, n_features, n_samples)
# X1 = X1.reshape((n_samples,n_steps_in,n_features))
# X2 = X2.reshape((n_samples,n_steps_out,n_features))
# y = y.reshape((n_samples,n_steps_out,n_features))
# print(X1.shape, X2.shape, y.shape)
# print('X1=%s, X2=%s, y=%s' % (one_hot_decode(X1[0]), one_hot_decode(X2[0]), one_hot_decode(y[0])))


In [11]:
#configure problem
import sys

n_features = 50 + 1
n_steps_in = 6
n_steps_out = 3
samples = 100000

#define model
train, infenc, infdec = define_models(n_features, n_features, 128)
train.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics=['accuracy'])

#generate training dataset
X1, X2, y = get_dataset(n_steps_in, n_steps_out, n_features, samples)
X1 = X1.reshape((samples,n_steps_in,n_features))
X2 = X2.reshape((samples,n_steps_out,n_features))
y = y.reshape((samples,n_steps_out,n_features))
print("Shape of data")
print(X1.shape,X2.shape,y.shape)
print(X1[0])
sys.exit()
#train model
train.fit([X1, X2], y, epochs = 1)

#evaluate LSTM
total, correct = 100, 0
for _ in range(total):
    X1, X2, y = get_dataset(n_steps_in, n_steps_out, n_features, 1)
    X1 = X1.reshape((1,n_steps_in,n_features))
    X2 = X2.reshape((1,n_steps_out,n_features))
    y  = y.reshape((1,n_steps_out,n_features))
    target = predict_sequence(infenc, infdec, X1, n_steps_out, n_features)
    if np.array_equal(one_hot_decode(y[0]), one_hot_decode(target)):
        correct += 1
print('Accuracy: %.2f%%' % (float(correct)/float(total)*100.0))
# spot check some examples
for _ in range(10):
    X1, X2, y = get_dataset(n_steps_in, n_steps_out, n_features, 1)
    X1 = X1.reshape((1,n_steps_in,n_features))
    X2 = X2.reshape((1,n_steps_out,n_features))
    y  = y.reshape((1,n_steps_out,n_features))
    target = predict_sequence(infenc, infdec, X1, n_steps_out, n_features)
    print('X=%s y=%s, yhat=%s' % (one_hot_decode(X1[0]), one_hot_decode(y[0]), one_hot_decode(target)))

Shape of data
(100000, 6, 51) (100000, 3, 51) (100000, 3, 51)
[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  1. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0.
  0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.

SystemExit: 