In [1]:
import tensorflow as tf
import numpy as np

In [21]:
class StateTime(tf.keras.layers.Layer):
    def __init__(self, opts, t0=5, tf=15):
        super().__init__()
        self.opts = opts # numero de casos de semaforos considerados
        self.t0 = t0 # rango inferior de la ventana de tiempo
        self.tf = tf # rango superior de la ventana de tiempo
    
    def get_config(self):
        return {'opts': self.opts}

    def build(self, input_shape): # (batch, units)
        self.Wopt = self.add_weight(shape=(input_shape[-1], self.opts)) # matriz de pesos para opciones de control
        self.bopt = self.add_weight(shape=(self.opts,)) # bias para opciones de control

        self.Wt = self.add_weight(shape=(input_shape[-1],1)) # matriz de pesos para opciones de control
        self.bt = self.add_weight(shape=(1,)) # bias para opciones de control

    
    def call(self, Xs):
        yopt = tf.keras.activations.softmax(tf.matmul(Xs, self.Wopt) + self.bopt)
        yt = (self.tf-self.t0)*tf.keras.activations.sigmoid(tf.matmul(Xs, self.Wt) + self.bt) + self.t0

        ys = tf.concat([yopt, yt], axis=-1)
        
        return ys


class ControlStateTime(tf.keras.Model):
    def __init__(self, cs, u1=16, u2=32, opts=4):
        super().__init__()

        self.cs = cs
        self.u1 = u1
        self.u2 = u2
        self.opts = opts

        self.d1 = tf.keras.layers.Dense(units=self.u1, activation='relu', input_dim=self.cs)
        self.d2 = tf.keras.layers.Dense(units=self.u2, activation='relu')
        self.trafic = StateTime(opts=self.opts)

    def call(self, Xs):
        ys = self.d1(Xs)
        ys = self.d2(ys)
        ys = self.trafic(ys)

        return ys
    
    def get_gen(self):
        gen = []
        for layer in [self.d1, self.d2, self.trafic]:
            Ws = layer.get_weights()
            for wi in Ws:
                wi = np.array(wi)
                wi = wi.flatten(order='C')
                gen += list(wi)
        return np.array(gen)
    
    def set_phen(self, gen):
        aux = 0 # limite inferior para saber que variables tomar
        for layer in [self.d1, self.d2, self.trafic]:
            Ws = layer.get_weights()
            new_ws = []
            for wi in Ws:
                inc = np.prod(wi.shape)
                auxW = gen[aux:aux+inc]
                auxW = np.reshape(auxW, newshape=wi.shape)
                new_ws.append(auxW)
                aux += inc
            layer.set_weights(new_ws)


class ControlState(tf.keras.Model):
    def __init__(self, cs=4, u1=16, u2=32, opts=4):
        super().__init__()

        self.cs = cs # tamano del vector de entrada de la red
        self.u1 = u1
        self.u2 = u2
        self.opts = opts
        
        self.d1 = tf.keras.layers.Dense(units=u1, activation='relu')
        self.d2 = tf.keras.layers.Dense(units=u2, activation='relu')
        self.trafic = tf.keras.layers.Dense(units=opts, activation='softmax')
    
    def call(self, Xs):
        ys = self.d1(Xs)
        ys = self.d2(ys)
        ys = self.trafic(ys)

        return ys
    
    def get_gen(self):
        gen = []
        for layer in [self.d1, self.d2, self.trafic]:
            Ws = layer.get_weights()
            for wi in Ws:
                wi = np.array(wi)
                wi = wi.flatten(order='C')
                gen += list(wi)
        return np.array(gen)
    
    def set_phen(self, gen):
        aux = 0 # limite inferior para saber que variables tomar
        for layer in [self.d1, self.d2, self.trafic]:
            Ws = layer.get_weights()
            new_ws = []
            for wi in Ws:
                inc = np.prod(wi.shape)
                auxW = gen[aux:aux+inc]
                auxW = np.reshape(auxW, newshape=wi.shape)
                new_ws.append(auxW)
                aux += inc
            layer.set_weights(new_ws)

# Prueba ControlState

In [22]:
cs = 4 # tamano del vector de entrada evaluado por la red
es = 4 # numero de configuraciones del semaforo

NN = ControlState(cs=4, opts=es)
NN.predict([[0. for i in range(cs)]]) ## llamada auxiliar para construir el nodo de la red
NN.summary()

Model: "control_state"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_10 (Dense)            multiple                  80        
                                                                 
 dense_11 (Dense)            multiple                  544       
                                                                 
 dense_12 (Dense)            multiple                  132       
                                                                 
Total params: 756
Trainable params: 756
Non-trainable params: 0
_________________________________________________________________


# Prueba ControlStateTime

In [20]:
cs = 4 # tamano del vector de entrada evaluado por la red
es = 4 # numero de configuraciones del semaforo

NN = ControlStateTime(cs=4, opts=es)
NN.predict([[0. for i in range(cs)]]) ## llamada auxiliar para construir el nodo de la red
NN.summary()

Model: "control_state_time_9"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_8 (Dense)             multiple                  80        
                                                                 
 dense_9 (Dense)             multiple                  544       
                                                                 
 state_time_4 (StateTime)    multiple                  165       
                                                                 
Total params: 789
Trainable params: 789
Non-trainable params: 0
_________________________________________________________________
