# Esquema de Aprendizaje Basado en la Ecuación HJB para redes Neuronales Artificiales

In [27]:
# Librerías
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
import time as time
import numpy as np

In [22]:
# Parámetros
batch_size          = 64
shuffle_buffer_size = 100
training_epochs     = 10

In [74]:
num_classes                   = 10
img_rows, img_cols, channels  = 28, 28, 1

# Cargo Datos

In [10]:
# Utilizo datos MNIST para prueba 
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# Expando dimensiones
x_train = np.expand_dims(x_train, 3)
x_test  = np.expand_dims(x_test, 3)

# Normalizo
x_train = x_train.astype(np.float32) /255.
x_test  = x_test.astype(np.float32) / 255.

print('x_train shape: ', x_train.shape)
print('x_test shape: ', x_train.shape)

x_train shape:  (60000, 28, 28, 1)
x_test shape:  (60000, 28, 28, 1)


In [16]:
# Vectorizo resultados
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)
# Transformo a tensores
train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Elijo de manera aleatoria
train_ds = train_ds.shuffle(shuffle_buffer_size).batch(batch_size)
test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_ds = test_ds.batch(batch_size)

# Defino modelo

In [17]:
def get_model():
  model = Sequential()
  model.add(layers.Conv2D(32, kernel_size=(3, 3),
                     activation='relu',
                     input_shape = input_shape))
  model.add(layers.Conv2D(64, (3, 3), activation = 'relu'))
  model.add(layers.MaxPooling2D(pool_size = (2, 2)))
  model.add(layers.Flatten())
  model.add(layers.Dense(128, activation = 'relu'))
  model.add(layers.Dense(num_classes, activation = 'softmax'))
  
  return model

In [68]:
def hjb_optimize(model, train_ds, test_ds, r=100., epochs=6, metric=tf.keras.metrics.Accuracy):
    train_loss_results = []
    train_accuracy_results = []

    star = time.time() 

    for epoch in range(epochs):
        epoch_loss_avg = tf.keras.metrics.Mean()
        epoch_accuracy = metric()
        # Obtengo gradientes
        for x, y in train_ds:
            grads  = mse_grad(model, x, y)
            grad_t = None
            for g in grads:
                if grad_t is None:
                    grad_t = tf.reshape(g, [-1])
                else:
                    grad_t = tf.concat([grad_t, tf.reshape(g, [-1])], 0)
            # Obtengo actualización
            grad_norm_value = tf.norm(grad_t)
            loss_value = model.loss(y_true = y, y_pred = model(x, training=True))
            grad_t *= tf.sqrt(2*loss_value)
            grad_t /= grad_norm_value
            grad_t /= tf.sqrt(r)
            # Actualizo gradientes
            it = 0
            for g in grads:
                len_g = np.prod(g.shape)
                g = tf.reshape(grad_t[it:it+len_g], g.shape)
                it += len_g
            (model.optimizer).apply_gradients(zip(grads, model.trainable_variables))

            # Guardo Resultados
            epoch_loss_avg.update_state(loss_value)
            epoch_accuracy.update_state(y, model(x, training=True))

        # End epoch
        train_loss_results.append(epoch_loss_avg.result())
        train_accuracy_results.append(epoch_accuracy.result())

        print("Epoch {:01d}/{:02d}: Loss: {:.3f}, Accuracy: {:.3%}".format(epoch+1, epochs,
                                                                    epoch_loss_avg.result(),
                                                                    epoch_accuracy.result()))
    



    print('Training time: ', time.time()- star, 'seconds.')
    
    test_accuracy = metric()   
    star = time.time()
    for x, y in test_ds:
        test_accuracy.update_state(y, model(x, training=False))
    print("Test set accuracy: {:.3%}".format(test_accuracy.result()))

In [69]:
# Defino función de error
def mse_grad(model, inputs, targets):
  with tf.GradientTape() as tape:
    loss_value = tf.keras.losses.mean_squared_error(y_true=targets, y_pred=model(inputs, training=True))
  return tape.gradient(loss_value, model.trainable_variables)

# Entrenamiento HJB

In [70]:
model = get_model()
model.compile(loss = tf.keras.losses.CategoricalCrossentropy(), optimizer = tf.keras.optimizers.Adagrad())
hjb_optimize(model, train_ds, test_ds, r = 100., epochs = training_epochs, metric = tf.keras.metrics.CategoricalAccuracy)

Epoch 01/10: Loss: 0.709, Accuracy: 83.040%
Epoch 02/10: Loss: 0.294, Accuracy: 93.765%
Epoch 03/10: Loss: 0.251, Accuracy: 94.835%
Epoch 04/10: Loss: 0.222, Accuracy: 95.425%
Epoch 05/10: Loss: 0.201, Accuracy: 95.892%
Epoch 06/10: Loss: 0.184, Accuracy: 96.267%
Epoch 07/10: Loss: 0.171, Accuracy: 96.572%
Epoch 08/10: Loss: 0.159, Accuracy: 96.847%
Epoch 09/10: Loss: 0.149, Accuracy: 97.068%
Epoch 10/10: Loss: 0.140, Accuracy: 97.325%
Training time:  165.99798226356506 seconds.
Test set accuracy: 96.580%


# Adagrad Estándar

In [71]:
model = get_model()
model.compile(loss      = tf.keras.losses.CategoricalCrossentropy(),
              optimizer = tf.keras.optimizers.Adagrad(),
              metrics   = ['accuracy'])

star = time.time()
model.fit(train_ds, 
          epochs = training_epochs)
print ('Training time :', time.time() - star, 'seconds')

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Training time : 46.8939471244812 seconds


In [72]:
# Validación
star  = time.time()
score = model.evaluate(test_ds)
print('Test accuracy:', score[1])
print ('Testing time :', time.time() - star, 'seconds.')

Test accuracy: 0.9542999863624573
Testing time : 0.6389431953430176 seconds.


# SGD

In [73]:
model = get_model()
model.compile(loss      = tf.keras.losses.CategoricalCrossentropy(),
              optimizer = tf.keras.optimizers.SGD(lr=0.01, decay=1e-6, momentum = 0.9, nesterov = True),
              metrics   = ['accuracy'])
star = time.time()
model.fit(train_ds, 
          epochs = training_epochs)
print ('Training time :', time.time() - star, 'seconds.')

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Training time : 46.94995069503784 seconds.


In [49]:
# Validación
star  = time.time()
score = model.evaluate(test_ds)
print('Test accuracy:', score[1])
print ('Testing time :', time.time() - star, 'seconds.')

Test accuracy: 0.9886999726295471
Testing time : 0.6290290355682373 seconds.


Revisar
1. Usar el test set en cada iteración para no sobrentrenar.
2. Graficar pérdida y accuracy.
3. Revisar eficiencia del HJB comparado con los otros métodos.
