In [1]:
import tensorflow as tf
import numpy as np
import keras
from tensorflow.keras import layers
from CFN_impl import CFNCell

In [3]:
class CFNCell(tf.keras.layers.AbstractRNNCell):
    def __init__(self, num_units, **kwargs) -> None:
        super(CFNCell,self).__init__(**kwargs)
        self._num_units = num_units
        # self.state_size = tf.TensorShape([num_units])
        # self.output_size = tf.TensorShape([num_units])
        self._activation = tf.keras.activations.tanh

    @property
    def state_size(self):
      return self._num_units

    def build(self, inputs_shape):
        #super().build(inputs_shape)
        self._weights_U_theta = self.add_weight(
            "gate/U_theta",
            shape=(self._num_units, self._num_units),
            initializer=tf.keras.initializers.RandomUniform(minval=-0.07,maxval=0.07),
            trainable=True   #
            )
    
        self._weights_V_theta = self.add_weight(
            "gate/V_theta",
            shape=(inputs_shape[-1], self._num_units),
            initializer=tf.keras.initializers.RandomUniform(minval=-0.07,maxval=0.07),   ##tf.random_uniform_initializer(dtype = self.dtype, minval=-0.07,maxval=0.07)
            trainable=True
            )
        
        self._bias_theta = self.add_weight(
            "gate/b_theta",
            shape=(self._num_units),
            initializer= tf.keras.initializers.Constant(value=1), ##tf.ones_initializer(dtype=self.dtype))
            trainable=True
            )
        
        self._weights_U_n = self.add_weight(
            "gate/U_n",
            shape=(self._num_units, self._num_units),
            initializer=tf.keras.initializers.RandomUniform(minval=-0.07,maxval=0.07),
            trainable=True
            )
        
        self._weights_V_n = self.add_weight(
            "gate/V_n",
            shape=(inputs_shape[-1], self._num_units),
            initializer=tf.keras.initializers.RandomUniform(minval=-0.07,maxval=0.07),
            trainable=True
            )
        
        self._bias_n = self.add_weight(
            "gate/b_n",
            shape=(self._num_units),
            initializer= tf.keras.initializers.Constant(value=-1), ##MinusOnes(dtype=self.dtype)
            trainable=True
            )
        self._W =  self.add_weight(
            "kernel",
            shape=(inputs_shape[-1], self._num_units),
            initializer=tf.keras.initializers.RandomUniform(minval=-0.07,maxval=0.07),
            trainable=True
            )

        self.built = True

    def call(self, inputs, state):
        states=state[0]
        theta = tf.keras.activations.sigmoid(
            tf.add(tf.add(tf.matmul(states, self._weights_U_theta),
                    tf.matmul(inputs, self._weights_V_theta)),
                    self._bias_theta)
        )
        eta = tf.keras.activations.sigmoid(
            tf.add(tf.add(tf.matmul(states, self._weights_U_n),
                    tf.matmul(inputs, self._weights_V_n)),
                    self._bias_n)
        )
        output = tf.add(
                    tf.math.multiply(theta, self._activation(states)),
                    tf.math.multiply(eta, self._activation(tf.matmul(inputs, self._W)))
        )
        return output ,output

In [2]:
cfncell = CFNCell(64)
lstmcell = tf.keras.layers.LSTMCell
lstmlayer = tf.keras.layers.RNN(lstmcell(32), return_state=True)
cfnlayer = tf.keras.layers.RNN(cfncell)

In [3]:
encoder_vocab = 1000
decoder_vocab = 2000

encoder_input = layers.Input(shape=(None,))
encoder_embedded = layers.Embedding(input_dim=encoder_vocab, output_dim=64)(
    encoder_input
)

# Return states in addition to output
encoderlayer = tf.keras.layers.RNN(CFNCell(64),return_state=True, name="encoder")
output, encoder_state = encoderlayer(
    encoder_embedded
)
#encoder_state = [state_h, state_c]

decoder_input = layers.Input(shape=(None,))
decoder_embedded = layers.Embedding(input_dim=decoder_vocab, output_dim=64)(
    decoder_input
)
decoderlayer = tf.keras.layers.RNN(CFNCell(64), name="decoder")
# Pass the 2 states to a new LSTM layer, as initial state
decoder_output = decoderlayer(
    decoder_embedded, initial_state=encoder_state
)
output = layers.Dense(10)(decoder_output)

model = keras.Model([encoder_input, decoder_input], output)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 embedding (Embedding)          (None, None, 64)     64000       ['input_1[0][0]']                
                                                                                                  
 embedding_1 (Embedding)        (None, None, 64)     128000      ['input_2[0][0]']                
                                                                                              

2023-04-12 10:03:00.991001: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-04-12 10:03:01.026629: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-04-12 10:03:01.026709: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-04-12 10:03:01.027227: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

In [32]:
class NestedCell(keras.layers.Layer):
    def __init__(self, unit_1, unit_2, unit_3, **kwargs):
        self.unit_1 = unit_1
        self.unit_2 = unit_2
        self.unit_3 = unit_3
        self.state_size = [tf.TensorShape([unit_1]), tf.TensorShape([unit_2, unit_3])]
        self.output_size = [tf.TensorShape([unit_1]), tf.TensorShape([unit_2, unit_3])]
        super(NestedCell, self).__init__(**kwargs)

    def build(self, input_shapes):
        # expect input_shape to contain 2 items, [(batch, i1), (batch, i2, i3)]
        i1 = input_shapes[0][1]
        i2 = input_shapes[1][1]
        i3 = input_shapes[1][2]

        self.kernel_1 = self.add_weight(
            shape=(i1, self.unit_1), initializer="uniform", name="kernel_1"
        )
        self.kernel_2_3 = self.add_weight(
            shape=(i2, i3, self.unit_2, self.unit_3),
            initializer="uniform",
            name="kernel_2_3",
        )

    def call(self, inputs, states):
        # inputs should be in [(batch, input_1), (batch, input_2, input_3)]
        # state should be in shape [(batch, unit_1), (batch, unit_2, unit_3)]
        input_1, input_2 = tf.nest.flatten(inputs)
        s1, s2 = states

        output_1 = tf.matmul(input_1, self.kernel_1)
        output_2_3 = tf.einsum("bij,ijkl->bkl", input_2, self.kernel_2_3)
        state_1 = s1 + output_1
        state_2_3 = s2 + output_2_3

        output = (output_1, output_2_3)
        new_states = (state_1, state_2_3)

        return output, new_states

    def get_config(self):
        return {"unit_1": self.unit_1, "unit_2": unit_2, "unit_3": self.unit_3}

In [33]:
unit_1 = 10
unit_2 = 20
unit_3 = 30

i1 = 32
i2 = 64
i3 = 32
batch_size = 64
num_batches = 10
timestep = 50

cell = NestedCell(unit_1, unit_2, unit_3)
rnn = keras.layers.RNN(cell)

input_1 = keras.Input((None, i1))
input_2 = keras.Input((None, i2, i3))

outputs = rnn((input_1, input_2))

model = keras.models.Model([input_1, input_2], outputs)

model.compile(optimizer="adam", loss="mse", metrics=["accuracy"])

In [35]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_8 (InputLayer)           [(None, None, 32)]   0           []                               
                                                                                                  
 input_9 (InputLayer)           [(None, None, 64, 3  0           []                               
                                2)]                                                               
                                                                                                  
 rnn_4 (RNN)                    ((None, 10),         1229120     ['input_8[0][0]',                
                                 (None, 20, 30))                  'input_9[0][0]']                
                                                                                              

In [34]:
input_1_data = np.random.random((batch_size * num_batches, timestep, i1))
input_2_data = np.random.random((batch_size * num_batches, timestep, i2, i3))
target_1_data = np.random.random((batch_size * num_batches, unit_1))
target_2_data = np.random.random((batch_size * num_batches, unit_2, unit_3))
input_data = [input_1_data, input_2_data]
target_data = [target_1_data, target_2_data]

model.fit(input_data, target_data, batch_size=batch_size)



2023-04-09 15:02:30.917232: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


<keras.callbacks.History at 0x7fd53afb4ee0>