In [429]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import pandas as pd

In [430]:
import matplotlib.pyplot as plt
import seaborn as sns
import plotly
import plotly.express as px
import plotly.graph_objects as go
sns.set()

# CUSTOM LAYERS

In [431]:
class InputLayer(tf.keras.layers.Layer):
  def __init__(self, name):
    super(InputLayer, self).__init__(name=name)
    
  def build(self, input_shape):
    self.a = tf.Variable(initial_value=tf.zeros(shape = [1, int(input_shape[1])]),trainable=False, name= self.name + "_activation")
    
  def call(self, input):
    output = tf.reshape(input, [1, 16])
    self.a.assign(output)
    return self.a.value()

In [432]:
class Classic_layer_Semeion(tf.keras.layers.Layer):
  def __init__(self, name, weights_init_val, bias_init_val):
    super(Classic_layer_Semeion, self).__init__(name = name)
    self.units = 15
    self.weights_init_val = weights_init_val
    self.bias_init_val = bias_init_val

  def build(self, input_shape):
    shape = [int(input_shape[1]), self.units]
    self.w = self.add_weight(initializer = tf.keras.initializers.RandomUniform(minval=0.0, maxval=0.51),#tf.initializers.Constant(self.weights_init_val) ,
                              shape= shape,
                              name = self.name + "_weights")
    self.b = self.add_weight(initializer = tf.initializers.Constant(self.bias_init_val) ,
                              shape= [1, self.units],
                              name = self.name + "_biases")

    self.deltaA = tf.Variable(initial_value=tf.zeros(shape = [1, self.units]),trainable=False, name= self.name + "_adjustements")
    self.a = tf.Variable(initial_value=tf.zeros(shape = [1, self.units]),trainable=False, name= self.name + "_activation")

  def call(self, inputs, temperature):
    # i pesi dei neuroni sono considerati in colonna
    net = tf.matmul(inputs,self.w)
    # if one element of the vector is < -99 the the same element of the output vector is set to -99
    net = tf.where(net < -99, -99*tf.ones_like(net), net)
    net = tf.where(net > 99, 99*tf.ones_like(net), net)
    self.a.assign( 1/( 1 + tf.exp( -( (net + self.b)/temperature)) ) )
    return self.a.value()


---

# CUSTOM MODEL

In [433]:
class MQ_Fuzzy_inverter(tf.keras.Model):
  def __init__(self, learning_rate, bias_value, initial_temperature, initial_weights):
    super(MQ_Fuzzy_inverter, self).__init__(name='MQ_Fuzzy_inverter')

    self.input_layer = InputLayer(name = "InputLayer")
    self.h = Classic_layer_Semeion(name = "HiddenLayer",weights_init_val=initial_weights, bias_init_val=bias_value)
    self.o = Classic_layer_Semeion(name = "OutputLayer",weights_init_val=initial_weights, bias_init_val=bias_value)
    self.error = tf.Variable(initial_value=0.0,trainable=False, name = "Error")
    self.temperature = tf.Variable(initial_value = initial_temperature,trainable=False, name = "Temperature")
    self.alpha = learning_rate

  @tf.function
  def call(self, data):
    input, target = data
    ### FOWARD PASS
    res_input = self.input_layer(input)
    res_hidden = self.h(res_input, self.temperature)
    res_output = self.o(res_hidden, self.temperature)
    error = tf.math.reduce_sum(tf.math.square(target - res_output))
    return error
  
  @tf.function
  def train_step(self, data):
    input, target = data
    sum_error = 0.0
    for i in tf.range(0, tf.shape(data[0])[0]):
      single_input = tf.reshape(input[i], [1, 16])
      single_target = tf.reshape(target[i], [1, 15])

      ### CALL FOWARD PASS
      error = self((single_input, single_target), training = True)
      sum_error = sum_error + error
      #self.temperature.assign(1 - (1 / (1 + error) ))

      ### BACK PROPAGATION
        # Update outputs weights
      delta_Ao = self.o.a * (single_target - self.o.a) * (1 - self.o.a) + self.o.deltaA *(single_target - self.o.a)# self momentum: self.o.deltaA *(single_target - self.o.a)
      delta_Wo = self.alpha * tf.matmul(tf.transpose(self.h.a),delta_Ao)
      # update bias of output layer
      self.o.b.assign_add(self.alpha * delta_Ao)
      # delta output layer
      self.o.deltaA.assign(delta_Ao)

        ## Update hidden weights
      delta_Ah = self.h.a * (1 - self.h.a) * tf.matmul(self.o.deltaA, self.o.w)  #tf.matmul(self.o.deltaA, self.o.w) #
      delta_Wh = self.alpha * tf.matmul(tf.transpose(self.input_layer.a),delta_Ah)
      # update bias of hidden layer
      self.h.b.assign_add(self.alpha * delta_Ah)
      # delta hidden layer
      self.h.deltaA.assign(delta_Ah)

        # TO BE UPDATED AFTER TOGETHER
      self.o.w.assign_add(delta_Wo)
      self.h.w.assign_add(delta_Wh)

    #n_data_points = float(len(input))
    self.error.assign(sum_error/10)#/n_data_points)
    
    # Compute temperature
    #self.temperature.assign(1 - (1 / (1 + self.error) ))
    
    return {
        "Error": tf.squeeze(self.error.value()),
        #"Temperature": tf.squeeze(self.temperature.value()),
        #"Hidden weights": tf.squeeze(self.h.w.value()),
        #"Output weights": tf.squeeze(self.o.w.value()),
        #"Hidden activations": tf.squeeze(self.h.a.value()),
        #"Output activations": tf.squeeze(self.o.a.value())
        }

# DEFINE PARAMETERS AND INPUT

In [434]:
soggetto = "soggetto_8"

In [435]:
# Model parameters
learning_rate = 1.0
global_bias_value = 0.0 # this value is used to initialize the bias of all the layers
epochs = 10000
initial_temperature = 1.0
initial_weights = 0.0000001

In [436]:
# Model input
df_input = pd.read_csv(soggetto + "_input.csv")
df_input = df_input.drop(columns=['SIGMA','bit_type'])
# Model target
df_target = pd.read_csv(soggetto + "_targets.csv")

input = df_input.to_numpy()
targets = df_target.to_numpy()

In [437]:
# Construct an instance of CustomModel
model = MQ_Fuzzy_inverter(learning_rate = learning_rate, bias_value = global_bias_value, initial_temperature = initial_temperature, initial_weights=initial_weights)

In [438]:
model.compile(run_eagerly = False)
model_history = model.fit(x = input, y = targets, epochs=epochs, shuffle=False, verbose = 0, batch_size = None)

# Analysis of the results

In [439]:
df_model_history = pd.DataFrame(model_history.history)
df_model_history

Unnamed: 0,Error
0,7.536801
1,5.635702
2,5.591081
3,5.424076
4,5.227405
...,...
9995,3.521588
9996,3.521668
9997,3.521751
9998,3.521838


In [440]:
fig = px.line(df_model_history[["Error"]])
fig.update_layout(title="Error_"+soggetto,
                   xaxis_title='Epoch',
                   yaxis_title='',
                   hovermode='x unified')
#fig.write_html("Error and temperature.html")
#fig.write_image("error_temperature.png")
fig.show()

In [441]:
# # create a dictionary with the weights, each key is w in the form w_h_i_j
# weights_hidden = {}
# for i in range(0, len(df_model_history["Hidden weights"][0])):
#   for j in range(0, len(df_model_history["Hidden weights"][0][i])):
#     weights_hidden["w_h_" + str(i+1) + "_" + str(j+1)] = df_model_history["Hidden weights"].apply(lambda x: x[i][j])
# 
# # create a dictionary with the weights, each key is w in the form w_o_i_j
# weights_output = {}
# for i in range(0, len(df_model_history["Output weights"][0])):
#     for j in range(0, len(df_model_history["Output weights"][0][i])):
#         weights_output["w_o_" + str(i+1) + "_" + str(j+1)] = df_model_history["Output weights"].apply(lambda x: x[i][j])

In [442]:
# # create a dataframe with the weights_hidden and weights_output
# df_weights = pd.DataFrame(weights_hidden)
# df_weights = df_weights.join(pd.DataFrame(weights_output))
# df_weights

In [443]:
# df_weights.to_csv("weights.csv")

In [444]:
# # create a dictionary with the activations, each key is a in the form a_h_i
# activations_hidden = {}
# for i in range(0, len(df_model_history["Hidden activations"][0])):
#     activations_hidden["a_h_" + str(i+1)] = df_model_history["Hidden activations"].apply(lambda x: x[i])
# 
# # create a dictionary with the activations, each key is a in the form a_o_i
# activations_output = {}
# for i in range(0, len(df_model_history["Output activations"][0])):
#     activations_output["a_o_" + str(i+1)] = df_model_history["Output activations"].apply(lambda x: x[i])

In [445]:
# # create a dataframe with the activations_hidden and activations_output
# df_activations = pd.DataFrame(activations_hidden)
# df_activations = df_activations.join(pd.DataFrame(activations_output))
# df_activations

In [446]:
# df_activations.to_csv("activations.csv")

In [447]:
# # create a plot displaying the neural network hidden weights over time (epochs) using plotly
# fig1 = go.Figure()
# for i in range(0, len(df_model_history["Hidden weights"][0])):
#     for j in range(0, len(df_model_history["Hidden weights"][0][i])):
#         fig1.add_trace(go.Scatter(x=df_model_history.index, y=df_model_history["Hidden weights"].apply(lambda x: x[i][j]), name="w_h_" + str(i+1) + "_" + str(j+1)))
# fig1.update_layout(height=500, width=800, title_text="Hidden Weights over time")
# fig1.update_yaxes(range=[-1, 1])
# fig1.update_layout(legend_title_text="Click to hide/show", legend_title_font=dict(size=16, family="sans-serif", color="black"))
# fig1.update_traces(line_color='blue')
# fig1.update_traces(hovertemplate=None)
# fig1.update_traces(hoverlabel=dict(font_size=18, font_family="Rockwell"))
# fig1.update_traces(hovertemplate="<b>Epoch</b>: %{x}<br><b>Weight value</b>: %{y}")
# fig1.show()
# # create a plot displaying the neural network output weights over time (epochs) using plotly
# fig2 = go.Figure()
# for i in range(0, len(df_model_history["Output weights"][0])):
#     for j in range(0, len(df_model_history["Output weights"][0][i])):
#         fig2.add_trace(go.Scatter (x=df_model_history.index, y=df_model_history["Output weights"].apply(lambda x: x[i][j]), name="w_o_" + str(i+1) + "_" + str(j+1)))
# fig2.update_layout(height=500, width=800, title_text="Output Weights over time")
# fig2.update_yaxes(range=[-1, 1])
# fig2.update_layout(legend_title_text="Click to hide/show", legend_title_font=dict(size=16, family="sans-serif", color="black"))
# fig2.update_traces(line_color='lightblue')
# fig2.update_traces(hovertemplate=None)
# fig2.update_traces(hoverlabel=dict(font_size=18, font_family="Rockwell"))
# fig2.update_traces(hovertemplate="<b>Epoch</b>: %{x}<br><b>Weight value</b>: %{y}")
# fig2.show()
# # create a plot displaying the neural network hidden activations over time (epochs) using plotly
# fig3 = go.Figure()
# for i in range(0, len(df_model_history["Hidden activations"][0])):
#     fig3.add_trace(go.Scatter(x=df_model_history.index, y=df_model_history["Hidden activations"].apply(lambda x: x[i]), name="a_h_" + str(i+1)))
# fig3.update_layout(height=500, width=800, title_text="Hidden Activations over time")
# fig3.update_yaxes(range=[0, 1])
# fig3.update_layout(legend_title_text="Click to hide/show", legend_title_font=dict(size=16, family="sans-serif", color="black"))
# fig3.update_traces(line_color='red')
# fig3.update_traces(hovertemplate=None)
# fig3.update_traces(hoverlabel=dict(font_size=18, font_family="Rockwell"))
# fig3.update_traces(hovertemplate="<b>Epoch</b>: %{x}<br><b>Weight value</b>: %{y}")
# fig3.show()
# # create a plot displaying the neural network output activations over time (epochs) using plotly
# fig4 = go.Figure()
# for i in range(0, len(df_model_history["Output activations"][0])):
#     fig4.add_trace(go.Scatter(x=df_model_history.index, y=df_model_history["Output activations"].apply(lambda x: x[i]), name="a_o_" + str(i+1)))
# fig4.update_layout(height=500, width=800, title_text="Output Activations over time")
# fig4.update_yaxes(range=[0, 1])
# fig4.update_layout(legend_title_text="Click to hide/show", legend_title_font=dict(size=16, family="sans-serif", color="black"))
# fig4.update_traces(line_color='orange')
# fig4.update_traces(hovertemplate=None)
# fig4.update_traces(hoverlabel=dict(font_size=18, font_family="Rockwell"))
# fig4.update_traces(hovertemplate="<b>Epoch</b>: %{x}<br><b>Weight value</b>: %{y}")
# fig4.show()


In [448]:
# df_model_history.to_csv("model_history.csv")

---