<a href="https://colab.research.google.com/github/HNXJ/PredictiveCodingNetwork/blob/main/PredictiveLSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from tensorflow.keras import backend as K
from matplotlib import pyplot as plt
import tensorflow as tf
import numpy as np


# Require a GPU: stop the notebook unless TensorFlow reports the first GPU device.
device_name = tf.test.gpu_device_name()
if not device_name == '/device:GPU:0':
    raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


# Recurrent neural network based on predictive coding properties

In this simulation, we will use the task of [Abdullahi et al. 2020], increasing the temporal resolution from one image per step to 10 iterations for each input.

In [2]:
# TB
# dir(tf.keras.layers.LSTM)

In [3]:
# Load the MNIST train/test splits through Keras (downloads on first use).
mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()
x_train, y_train = mnist_train
x_test, y_test = mnist_test

def create_serial_dataset(x=None, y=None, n=100, length=10, frames=10, pool_size=1000):
    """Build sequences in which each sampled image is held for several frames.

    For each of the ``n`` sequences, ``length`` images are drawn at random
    (with replacement) from the first ``pool_size`` entries of ``x``. Every
    drawn image is repeated for ``frames`` consecutive time steps and its
    one-hot label is repeated alongside it.

    Args:
        x: Image array indexed as ``x[sample, row, col]``.
        y: Integer class labels in ``0..9``, aligned with ``x`` along the
            first axis.
        n: Number of sequences to generate.
        length: Number of images drawn per sequence.
        frames: Number of consecutive time steps each image is held.
        pool_size: Upper bound (exclusive) on the sample indices that may be
            drawn. It is clamped to the number of available samples so small
            datasets no longer raise ``IndexError``. ``None`` means "use every
            available sample". The default of 1000 matches the previously
            hard-coded value.

    Returns:
        Tuple ``(X, Y)`` where ``X`` has shape
        ``(n, length * frames, rows * cols)`` and ``Y`` has shape
        ``(n, length * frames, 10)`` (one-hot labels). Both are float arrays.

    Raises:
        ValueError: If ``x`` or ``y`` is ``None`` or contains no samples.
    """
    if x is None or y is None:
        raise ValueError("x and y must both be provided")
    x = np.asarray(x)
    y = np.asarray(y)

    available = min(x.shape[0], y.shape[0])
    if available == 0:
        raise ValueError("x and y must contain at least one sample")
    pool = available if pool_size is None else min(pool_size, available)

    steps = length * frames
    X = np.zeros([n, steps, x.shape[1], x.shape[2]])
    Y = np.zeros([n, steps, 10])
    step_index = np.arange(steps)

    for i in range(n):
        # One draw of `length` indices per sequence, exactly as before, so a
        # seeded run with >= 1000 samples reproduces the earlier datasets.
        k = np.random.randint(0, pool, size=(length))

        # Hold every drawn image (and its label) for `frames` consecutive steps.
        X[i] = np.repeat(x[k], frames, axis=0)
        Y[i, step_index, np.repeat(y[k], frames)] = 1

    return X.reshape(n, steps, x.shape[1] * x.shape[2]), Y

# Seed NumPy's global generator so the randomly sampled sequences (and every
# number derived from them further down) are the same on each Restart & Run All.
np.random.seed(0)

# 100 sequences of 10 images each, every image held for 20 frames.
Xn, Yn = create_serial_dataset(x_train, y_train, n=100, length=10, frames=20)

# X / Y are the first 10 sequences; the remaining sequences stay in Xn / Yn only.
X = Xn[:10, :, :]
Y = Yn[:10, :, :]
X.shape, Y.shape

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


((10, 200, 784), (10, 200, 10))

In [7]:
class RNNModel1(tf.keras.Model):
    """Subclassed Keras model: input layer -> LSTM (512 units) -> Dense (10 units).

    The LSTM returns its full output sequence, so the dense layer is applied
    at every time step.
    """

    def __init__(self):
        super(RNNModel1, self).__init__()
        self.Input = tf.keras.layers.InputLayer(input_shape=(None, 784))
        self.LSTM = tf.keras.layers.LSTM(
            input_shape=(None, 784),
            units=512,
            recurrent_dropout=0.2,
            return_sequences=True,
            # return_state=True
        )
        self.FCN = tf.keras.layers.Dense(units=10)

    def call(self, x):
        """Run the forward pass.

        Args:
            x: Input batch; the layers are declared for inputs whose last
                axis has 784 features.

        Returns:
            The dense layer's output for every time step.
        """
        out = self.Input(x)
        # Chain the layers: previously the input layer's result was computed
        # and then discarded because the LSTM was fed `x` directly.
        out = self.LSTM(out)
        out = self.FCN(out)
        return out

    def get_state(self):
        """Placeholder; not implemented yet and always returns None."""
        return None

In [88]:
class PredictiveNet():
    """Sequential LSTM -> Dense network bundled with an energy-style loss.

    Attributes (documented here rather than inline):
        model: ``tf.keras.Sequential`` with an input layer for ``(None, 784)``
            sequences, a 512-unit LSTM that returns the full sequence, and a
            10-unit dense layer.
        lambda1: Weight of the mean-absolute-output term in the loss.
        lambda2: Weight of the mean-absolute-weight term in the loss.
    """

    def __init__(self, lambda1=1, lambda2=1):
        """Build the model.

        Args:
            lambda1: Weight of the ``mean(|y_pred|)`` penalty. Defaults to 1,
                the value that used to be hard-coded in the loss.
            lambda2: Weight of the ``mean(|weights|)`` penalty. Defaults to 1,
                the value that used to be hard-coded in the loss.
        """
        self.lambda1 = lambda1
        self.lambda2 = lambda2

        self.model = tf.keras.Sequential()
        self.model.add(tf.keras.layers.InputLayer(input_shape=(None, 784)))
        self.model.add(tf.keras.layers.LSTM(
          units=512,
          recurrent_dropout=0.2,
          return_sequences=True,
          # return_state=True
        ))
        self.model.add(tf.keras.layers.Dense(units=10))

    def printw(self): # Debug log
        """Print the mean of the first weight tensor of ``model.layers[1]``."""
        print(K.mean(self.model.layers[1].weights[0]))

    def EnergyCostLoss(self, y_true, y_pred):
        """Squared error plus penalties on output magnitude and weight magnitude.

        Computes ``mean(error**2 + lambda1 * mean(|y_pred|))
        + lambda2 * mean(|W|)`` where ``W`` is ``model.layers[1].weights[0]``.

        Args:
            y_true: Target tensor.
            y_pred: Model output tensor.

        Returns:
            Scalar loss tensor.
        """
        error = y_pred - y_true
        # NOTE(review): the recorded model summary lists only the LSTM and
        # Dense layers, so `layers[1]` may resolve to the Dense layer rather
        # than the LSTM -- confirm which weights are meant to be penalised.
        penalised_weights = self.model.layers[1].weights[0]
        return K.mean(K.square(error) + self.lambda1*K.mean(K.abs(y_pred))) + self.lambda2*K.mean(K.abs(penalised_weights))

In [89]:
# Build the network and compile it with its own energy-cost loss.
Net1 = PredictiveNet()
Net1.model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.003),
    loss=Net1.EnergyCostLoss,
)
Net1.model.summary()

# Debug: mean of the tracked weight tensor before any training.
Net1.printw()

# Backend function mapping the input of layers[0] to the output of layers[1].
L1 = K.function(
    [Net1.model.layers[0].input],
    [Net1.model.layers[1].output],
)

Model: "sequential_32"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_38 (LSTM)               (None, None, 512)         2656256   
_________________________________________________________________
dense_37 (Dense)             (None, None, 10)          5130      
Total params: 2,661,386
Trainable params: 2,661,386
Non-trainable params: 0
_________________________________________________________________
tf.Tensor(-0.0023751305, shape=(), dtype=float32)


In [90]:
# Main training run: 50 epochs over X / Y in shuffled mini-batches of 5,
# with no validation split.
history = Net1.model.fit(
    X,
    Y,
    batch_size=5,
    epochs=50,
    shuffle=True,
    validation_split=0.0,
    verbose=1,
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [92]:
# One further epoch on the same X / Y with the same settings.
# Note that this overwrites `history` from the 50-epoch run.
history = Net1.model.fit(
    X,
    Y,
    batch_size=5,
    epochs=1,
    shuffle=True,
    validation_split=0.0,
    verbose=1,
)



In [98]:
# Mean absolute L1 output, one value per sequence, for sequences 0-9 of Xn.
means1 = []

for k in range(10):
    sample = Xn[k:k + 1, :, :]  # keep the batch axis: a batch of one sequence
    l_pred = L1(sample)[0]
    y_pred = Net1.model.predict(sample)

    means1.append(np.mean(np.abs(l_pred)))

In [99]:
# Mean absolute L1 output, one value per sequence, for sequences 20-39 of Xn.
means2 = []

for k in range(20, 40):
    sample = Xn[k:k + 1, :, :]  # keep the batch axis: a batch of one sequence
    l_pred = L1(sample)[0]
    y_pred = Net1.model.predict(sample)

    means2.append(np.mean(np.abs(l_pred)))

In [100]:
# Average of the per-sequence means: sequences 0-9 first, then sequences 20-39.
print(*(np.mean(group) for group in (means1, means2)))

0.036559604 0.08765531
