In [0]:
import tensorflow as tf
import numpy as np

tf.enable_eager_execution()

In [0]:
class Constants(object):
    # RNN cells and layer types.
    LSTM = 'lstm'
    GRU = 'gru'
    DENSE = "dense"  # Fully connected layer.
    TCN = "tcn"  # Temporal convolutional layer, i.e., causal 1D convolution.
    
    # Activation functions.
    RELU = 'relu'
    ELU = 'elu'
    SIGMOID = 'sigmoid'
    SOFTPLUS = 'softplus'
    TANH = 'tanh'
    SOFTMAX = 'softmax'
    LRELU = 'lrelu'
    CLRELU = 'clrelu'  # Clamped leaky relu.

In [0]:
C = Constants()
tf_input_step = tf.Variable(np.random.normal(0,1, (10, 2)), dtype=tf.float32)
tf_input_seq = tf.Variable(np.random.normal(0,1, (10, 20, 2)), dtype=tf.float32)

In [0]:
class Activations(object):
    @classmethod
    def get(cls, type_str):
        # Check if the activation is already callable.
        if callable(type_str):
            return type_str

        # Check if the activation is a built-in or custom function.
        if type_str == C.RELU:
            return tf.nn.relu
        elif type_str == C.ELU:
            return tf.nn.elu
        elif type_str == C.TANH:
            return tf.nn.tanh
        elif type_str == C.SIGMOID:
            return tf.nn.sigmoid
        elif type_str == C.SOFTPLUS:
            return tf.nn.softplus
        elif type_str == C.SOFTMAX:
            return tf.nn.softmax
        elif type_str == C.LRELU:
            return lambda x: tf.nn.leaky_relu(x, alpha=1. / 3.)
        elif type_str == C.CLRELU:
            with tf.name_scope('ClampedLeakyRelu'):
                return lambda x: tf.clip_by_value(tf.nn.leaky_relu(x, alpha=1. / 3.), -3.0, 3.0)
        elif type_str is None:
            return None
        else:
            err_unknown_type(type_str)
            

class RNNCells(object):
    @classmethod
    def get(cls, type_str, units, layers=1, **kwargs):
        cells = []

        for i in range(layers):
            if type_str == C.LSTM:
                cells.append(tf.keras.layers.LSTMCell(units))
            elif type_str == C.GRU:
                cells.append(tf.keras.layers.GRUCell(units))
            else:
                err_unknown_type(type_str)

        if layers > 1:
            return tf.keras.layers.StackedRNNCells(cells)
        else:
            return cells[0]
          
          
class DenseLayer(tf.keras.models.Sequential):
    """
    Stacks a number of dense layers by allowing applying dropout on the inputs and activation function on the outputs
    of every dense layer.
    """
    def __init__(self, units, layers, activation, dropout_rate=0, **kwargs):
        super(DenseLayer, self).__init__(**kwargs)

        self.num_units = units if isinstance(units, list) else [units] * layers
        self.num_layers = layers
        self.dropout_rate = dropout_rate if isinstance(dropout_rate, list) else [dropout_rate] * self.num_layers
        self.activation_fn = Activations.get(activation)

        for idx in range(self.num_layers):
            if self.dropout_rate[idx] > 0:
                self.add(tf.keras.layers.Dropout(self.dropout_rate[idx], name=self.name + "_dropout" + str(idx)))
            self.add(tf.keras.layers.Dense(self.num_units[idx], self.activation_fn, name=self.name + "_" + str(idx)))

    def call(self, inputs, training=None, mask=None):
        out = super(DenseLayer, self).call(inputs, training=training, mask=mask)
        return out

    def get_config(self):
        base_config = super(DenseLayer, self).get_config()
        return base_config
      
      
class RNNSeq2Seq(tf.keras.models.Sequential):
    """
    A sequence to sequence model for temporal data. The encoder and decoder networks are RNN instances.
    """
    def __init__(self, latent_units, cell_units, cell_layers, cell_type, activation, bidirectional_encoder=False,
                 return_logits=True, **kwargs):
        """
        Args:
            latent_units: latent representation dimensionality.
            cell_units: encoder/decoder rnn cell/output size.
            cell_layers: number of encoder/decoder rnn cells.
            cell_type (str): 'lstm' or 'gru'.
            activation: activation function to be applied after dense layers.
            bidirectional_encoder (bool):
            return_logits (bool): if True, the output is the same size with encoder inputs. Otherwise, it is decoder
                cell's output. It is useful for stacking layers or making arbitrary predictions.
            **kwargs:
        """
        super(RNNSeq2Seq, self).__init__(**kwargs)

        self.num_latent_units = latent_units
        self.num_cell_units = cell_units
        self.num_cell_layers = cell_layers
        self.cell_type = cell_type
        self.activation_fn = Activations.get(activation)
        self.is_bidirectional_encoder = bidirectional_encoder
        self.return_logits = return_logits

        # Encoder and decoder cells.
        self.encoder_cell = RNNCells.get(self.cell_type, self.num_cell_units, self.num_cell_layers)
        self.decoder_cell = RNNCells.get(self.cell_type, self.num_cell_units, self.num_cell_layers)

        # Encoder network.
        self.encoder = tf.keras.Sequential()
        self.encoder.add(tf.keras.layers.Dense(self.num_cell_units, activation=self.activation_fn))
        if self.is_bidirectional_encoder:
            self.encoder.add(tf.keras.layers.Bidirectional(tf.keras.layers.RNN(self.encoder_cell)))
        else:
            self.encoder.add(tf.keras.layers.RNN(self.encoder_cell))
        self.encoder.add(tf.keras.layers.Dense(latent_units, activation=None))

        # Decoder network.
        self.decoder = tf.keras.Sequential()
        self.decoder.add(tf.keras.layers.Dense(self.num_cell_units, activation=self.activation_fn))
        self.decoder.add(tf.keras.layers.RNN(self.decoder_cell, return_sequences=True))

    def build(self, input_shape=None):
        # Decoder output is in the same size with the encoder inputs.
        if not self.return_logits:
            output_units = input_shape[-1]
            self.decoder.add(tf.keras.layers.Dense(output_units, activation=None))

    def call(self, inputs, decoder_inputs=None, output_len=None, training=None, mask=None, **kwargs):
        """
        Given an input sequence, calculates the embedding and predicts a sequence. If decoder_inputs is passed, then
        the decoder is fed with the same input embedding and corresponding decoder_inputs step. If decoder_inputs is
        None and output_len passed, then the decoder is fed with its own predictions at the next step.
        The length of the output sequence is determined by either the length of the decoder_inputs or or output_len.

        Args:
            inputs: [batch_size, seq_len, feature_size]
            decoder_inputs: [batch_size, seq_len, feature_size]
            output_len (int): length of output sequence.
            **kwargs:
        Returns:
            [batch_size, seq_len, num_cell_units] if return_logits is True else [batch_size, seq_len, feature_size]
        """
        assert decoder_inputs is not None or output_len is not None, "One of the decoder_inputs or output_len must be provided."

        embedding = self.encoder(inputs)  # [batch_size, latent_size]
        if decoder_inputs is not None:
            output_len = decoder_inputs.shape[1]
            embedding_seq = tf.tile(tf.expand_dims(embedding, axis=1), (1, output_len, 1))
            emb_dec_input = tf.concat([embedding_seq, decoder_inputs], axis=-1)
            return self.decoder(emb_dec_input)
        else:
            err_not_implemented("autoregressive decoder.")

    def autoregressive_call(self, embedding, num_steps, initial_step=None):
        pass

    def get_config(self):
        base_config = super(RNNSeq2Seq, self).get_config()
        return base_config

In [0]:
cell = RNNCells.get("lstm", 64, 1)
rnn_model = tf.keras.layers.RNN(cell)

dense_model = DenseLayer(128, 2, None, name=None)

seq2seq_model = RNNSeq2Seq(13, 64, 1, C.LSTM, C.RELU)

In [0]:
seq_out = rnn_model(tf_input_seq)
dense_out = dense_model(tf_input_step)
seq2seq_out = seq2seq_model(tf_input_seq, decoder_inputs=tf_input_seq)

In [18]:
print(seq2seq_out.shape)
seq2seq_model.summary()

(10, 20, 64)
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_cell_10 (LSTMCell)      multiple                  33024     
_________________________________________________________________
lstm_cell_11 (LSTMCell)      multiple                  33024     
_________________________________________________________________
sequential_6 (Sequential)    multiple                  34061     
_________________________________________________________________
sequential_7 (Sequential)    multiple                  34048     
Total params: 68,109
Trainable params: 68,109
Non-trainable params: 0
_________________________________________________________________


In [8]:
dense_model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_layer_0 (Dense)        multiple                  384       
_________________________________________________________________
dense_layer_1 (Dense)        multiple                  16512     
Total params: 16,896
Trainable params: 16,896
Non-trainable params: 0
_________________________________________________________________


In [20]:
dense_model.outputs

[]

In [15]:
tf.shape(dense_out)[1].numpy()

128