In [289]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import pandas as pd

In [290]:
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
sns.set()

# CUSTOM LAYERS

In [291]:
class InputLayer(tf.keras.layers.Layer):
  def __init__(self, name):
    super(InputLayer, self).__init__(name=name)
    
  def call(self, inputs):
    a1a1 = tf.reshape(inputs[0],shape=(1,1)) #1
    a1a2 = tf.reshape(inputs[1],shape=(1,1)) #2 
    a1a2a2 = tf.reshape(inputs[2],shape=(1,1)) #3
    a1a2a1 = tf.reshape(inputs[3],shape=(1,1)) #4
    a2a2 = tf.reshape(inputs[4],shape=(1,1)) #5
    a2a1 = tf.reshape(inputs[5],shape=(1,1)) #6
    a2a1a1 = tf.reshape(inputs[6],shape=(1,1)) #7
    a2a1a2 = tf.reshape(inputs[7],shape=(1,1)) #8

    to_hidden = []
    to_hidden.append((tf.concat([a1a2a2, a2a2], axis = 1))) #["TrRi"] [3,5]
    to_hidden.append((tf.concat([a1a2a1, a2a1], axis = 1))) #["RiTr"] [4,6] 
    to_hidden.append((tf.concat([a1a1, a2a1], axis = 1))) #["Plu"] [1,6] 
    to_hidden.append((tf.concat([a1a2, a1a2a2], axis = 1))) #["TTra"] [2,3] 
    to_hidden.append((tf.concat([a2a1, a2a1a1], axis = 1))) #["TTrb"] [6,7] 
    to_hidden.append((tf.concat([a1a2, a2a1], axis = 1))) #["Tr"] [2,6]
    to_hidden.append((tf.concat([a1a2a1, a2a1a2], axis = 1))) #["MeTr"] [4,8] 
    to_hidden.append((tf.concat([a1a2a1, a1a2], axis = 1))) #["MoTr"] [4,2] 
    to_hidden.append((tf.concat([a1a1, a1a2a1], axis = 1))) #["RiRi"] [1,4] 
    to_hidden.append((tf.concat([a1a1, a1a2], axis = 1))) #["MoFo"] [1,2] 
    to_hidden.append((tf.concat([a1a2a2, a1a2a1], axis = 1))) #["MMof"] [3,4] 
    to_hidden.append((tf.concat([a1a1, a1a2a2], axis = 1))) #["MoRi"] [1,3] 
    to_hidden.append((tf.concat([a1a2a2, a2a1a1], axis = 1))) #["MeRi"] [3,7] 
    to_hidden.append((tf.concat([a1a1, a2a2], axis = 1))) #["Ri"] [1,5] 

    output = tf.concat(to_hidden,axis = 0)
    return output

In [292]:
class Layer(tf.keras.layers.Layer):
  def __init__(self, name, weights_init_val, bias_init_val):
    super(Layer, self).__init__(name = name)
    self.units = 14
    self.weights_init_val = weights_init_val
    self.bias_init_val = bias_init_val
    if self.name in ["HiddenLayer"]:
      self.type = "Hidden"
    elif self.name in ["OutputLayer"]:
      self.type = "Output"

  def build(self, input_shape):
    shape = [int(input_shape[-1]),self.units]
    self.w = self.add_weight(initializer = tf.initializers.Constant(self.weights_init_val) ,
                              shape= shape,
                              name = self.name + "_weights")
    self.b = self.add_weight(initializer = tf.initializers.Constant(self.bias_init_val) ,
                              shape= [1, self.units],
                              name = self.name + "_biases")

    self.deltaA = tf.Variable(initial_value=tf.zeros(shape = [1, self.units]),trainable=False, name= self.name + "_adjustements")
    self.a = tf.Variable(initial_value=tf.zeros(shape = [1, self.units]),trainable=False, name= self.name + "_activation")

  def call(self, inputs, temperature):
    #print("mat mult")
    #print(tf.matmul(inputs, self.w))
    #print("weights")
    #print(self.w)
    #print("bias vector")
    #print(self.b)
    
    if self.type == "Hidden":
      net = tf.matmul(inputs, self.w)[:,0]
    elif self.type == "Output":
      net = tf.matmul(inputs, self.w)
    #print("NET")
    #print(tf.matmul(inputs, self.w)[:,0])
    self.a.assign(1/(1 + tf.exp((net + self.b)/temperature)))
    return self.a.value()


### Testing foward pass

In [293]:
x = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0])
x

<tf.Tensor: shape=(8,), dtype=float32, numpy=array([1., 2., 3., 4., 5., 6., 7., 8.], dtype=float32)>

In [294]:
input_layer = InputLayer(name = "InputLayer")
input_layer

<__main__.InputLayer at 0x7f5c00636c10>

In [295]:
res_input = input_layer(x)
res_input

<tf.Tensor: shape=(14, 2), dtype=float32, numpy=
array([[3., 5.],
       [4., 6.],
       [1., 6.],
       [2., 3.],
       [6., 7.],
       [2., 6.],
       [4., 8.],
       [4., 2.],
       [1., 4.],
       [1., 2.],
       [3., 4.],
       [1., 3.],
       [3., 7.],
       [1., 5.]], dtype=float32)>

In [296]:
res_input[0]

<tf.Tensor: shape=(2,), dtype=float32, numpy=array([3., 5.], dtype=float32)>

In [297]:
tf.reshape(res_input[0], shape = (1,2))

<tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[3., 5.]], dtype=float32)>

In [298]:
hidden_layer = Layer(name = "HiddenLayer",weights_init_val=0.5, bias_init_val=0.0)
hidden_layer

<__main__.Layer at 0x7f5c0053a6a0>

In [299]:
res_hidden = hidden_layer(inputs = res_input, temperature = 1)
res_hidden

<tf.Tensor: shape=(1, 14), dtype=float32, numpy=
array([[0.01798621, 0.00669285, 0.02931223, 0.07585818, 0.00150118,
        0.01798621, 0.00247262, 0.04742587, 0.07585818, 0.18242553,
        0.02931223, 0.11920292, 0.00669285, 0.04742587]], dtype=float32)>

In [300]:
res_hidden

<tf.Tensor: shape=(1, 14), dtype=float32, numpy=
array([[0.01798621, 0.00669285, 0.02931223, 0.07585818, 0.00150118,
        0.01798621, 0.00247262, 0.04742587, 0.07585818, 0.18242553,
        0.02931223, 0.11920292, 0.00669285, 0.04742587]], dtype=float32)>

In [301]:
output_layer = Layer(name = "OutputLayer",weights_init_val=0.59, bias_init_val=0.0)
output_layer

<__main__.Layer at 0x7f5c0053a9d0>

In [302]:
res_output = output_layer(inputs = res_hidden, temperature = 1)
res_output

<tf.Tensor: shape=(1, 14), dtype=float32, numpy=
array([[0.40384   , 0.40384   , 0.40384   , 0.40384   , 0.40384   ,
        0.40384   , 0.40384   , 0.40384   , 0.40384004, 0.40384004,
        0.40384004, 0.40384004, 0.40384004, 0.40384004]], dtype=float32)>

In [303]:
tf.math.square(res_hidden - res_output)

<tf.Tensor: shape=(1, 14), dtype=float32, numpy=
array([[0.14888315, 0.15772586, 0.14027105, 0.10757208, 0.16187653,
        0.14888315, 0.16109578, 0.12703104, 0.1075721 , 0.04902438,
        0.14027108, 0.08101829, 0.15772589, 0.12703106]], dtype=float32)>

In [304]:
tf.math.reduce_sum(tf.math.square(res_hidden - res_output))

<tf.Tensor: shape=(), dtype=float32, numpy=1.8159815>

In [305]:
test = tf.Variable(initial_value=0,trainable=False)
test

<tf.Variable 'Variable:0' shape=() dtype=int32, numpy=0>

In [312]:
x = tf.constant([[1.0, 2.0, 3.0],
                [4.0, 5.0, 6.0]])
x

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [314]:
x @ tf.transpose(x)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

In [315]:
tf.transpose(x) @ x 

<tf.Tensor: shape=(3, 3), dtype=float32, numpy=
array([[17., 22., 27.],
       [22., 29., 36.],
       [27., 36., 45.]], dtype=float32)>

---

# CUSTOM MODEL

In [306]:
class MQSelfReflexiveNetwork(tf.keras.Model):
  def __init__(self, learning_rate):
    super(MQSelfReflexiveNetwork, self).__init__(name='MQSelfReflexiveNetwork')

    self.input_layer = InputLayer(name = "InputLayer")
    self.h = Layer(name = "HiddenLayer",weights_init_val=0.5, bias_init_val=0.0)
    self.o = Layer(name = "OutputLayer",weights_init_val=0.59, bias_init_val=0.0)
    self.temperature = tf.Variable(initial_value=1.0,trainable=False, name = "Temperature")
    self.error = tf.Variable(initial_value=0.0,trainable=False, name = "Error")
    self.alpha = learning_rate

  def call(self, input):
    ### FOWARD PASS
    res_input = self.input_layer(input)
    res_hidden = self.h(res_input, self.temperature)
    res_output = self.o(res_hidden, self.temperature)
    return tf.math.reduce_sum(tf.math.square(res_hidden - res_output))
  
  def train_step(self, input):
    ### CALL FOWARD PASS
    print("Step-1")
    self.error.assign(self(input, training = True))
    
    ### BACK PROPAGATION
    # Compute temperature
    print("Step0_A")
    self.temperature.assign(1 - (1 / (1 + self.error) ))
    print("Step0")
    # Update outputs weights
    delta_Ao = self.o.a * (self.h.a - self.o.a) * (1 - self.o.a) + (self.o.deltaA * ((self.h.a - self.o.a)) )
    print("Step1")
    delta_Wo = self.alpha * tf.matmul(tf.transpose(self.h.a),delta_Ao)
    print("Step2")
    self.o.w.assign_add(delta_Wo)
    print("Step3")
    ## Update hidden weights
    delta_Ah = self.h.a * (1 - self.h.a) * (delta_Ah * self.o.a)
    print("Step4")
    delta_Wh = self.alpha * delta_Ah * input
    print("Step5")
    self.h.w.assign_add(delta_Wh)
    print("Step6")

    return {
        "Error": tf.squeeze(self.error.value()),
        "Temperature": tf.squeeze(self.temperature.value()),
        "Hidden weights": tf.squeeze(self.h.w.value()),
        "Output weights": tf.squeeze(self.o.w.value())
        }

In [307]:
# Construct an instance of CustomModel
input = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0])
model = MQSelfReflexiveNetwork(learning_rate=1)

In [308]:
model.call(input)

<tf.Tensor: shape=(), dtype=float32, numpy=1.8159815>

In [311]:
#model.get_weights()

In [310]:
model.compile()
model_history = model.fit(x = input, epochs=200)

Epoch 1/200
Step-1
Step0_A
Step0
Step1
Step2


ValueError: in user code:

    File "/home/davide/.local/lib/python3.8/site-packages/keras/engine/training.py", line 878, in train_function  *
        return step_function(self, iterator)
    File "/home/davide/.local/lib/python3.8/site-packages/keras/engine/training.py", line 867, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/home/davide/.local/lib/python3.8/site-packages/keras/engine/training.py", line 860, in run_step  **
        outputs = model.train_step(data)
    File "/tmp/ipykernel_9230/3332207265.py", line 34, in train_step
        self.o.w.assign_add(delta_Wo)

    ValueError: Dimension 0 in both shapes must be equal, but are 14 and 1. Shapes are [14,14] and [1,14]. for '{{node AssignAddVariableOp}} = AssignAddVariableOp[dtype=DT_FLOAT](MQSelfReflexiveNetwork/OutputLayer/MatMul/ReadVariableOp/resource, mul_4)' with input shapes: [], [1,14].


In [None]:
df_model_history = pd.DataFrame(model_history.history)
df_model_history

Unnamed: 0,Error,Temperature,Hidden weight,Output weight
0,0.004489,0.004469,0.498272,0.583754
1,0.250000,0.200000,0.498272,0.583754
2,0.135400,0.119253,0.495420,0.576806
3,0.217034,0.178330,0.494568,0.575009
4,0.155215,0.134361,0.492123,0.569271
...,...,...,...,...
195,0.000000,0.000000,-0.071070,-0.707804
196,0.000000,0.000000,-0.071070,-0.707804
197,0.000000,0.000000,-0.071070,-0.707804
198,0.000000,0.000000,-0.071070,-0.707804


In [None]:
fig = px.line(pd.DataFrame(model_history.history))
fig.update_layout(title="Monitored variables",
                   xaxis_title='Epoch',
                   yaxis_title='',
                   hovermode='x unified')
fig.show()

---