# Redes de Neuronas con conexiones residuales y entrenados según _transfer learning_
## Práctica 3

#### Hugo Fole Avellás y José Romero Conde

 ### $\huge\text{Ejercicio } 1$  
 ### (_2 puntos_) Define la capa ResidualBlock (ver Figura 2), usando como base la plantilla proporcionada en la Figura 3.
 - Ten en cuenta que el número de convoluciones depende de los valores de input_channels y out-put_channels.
 - Esta red no tiene capas de Pooling. La reducción del tamaño se realiza con el parámetro strides
de las convoluciones, pero se modifica únicamente en 1 (o 2) de las convoluciones del modelo.

!['arquitectura'](ResidualBlock.png)

In [1]:
from tensorflow.keras import Model
from tensorflow.keras import layers
from tensorflow.keras import activations

class ResidualBlock(Model):
    def __init__(self, input_channels, output_channels, strides=(1, 1)):
        
        super().__init__()
        
        self.BN1 = layers.BatchNormalization()
        self.Conv1 = layers.Conv2D(filters = output_channels, 
                                   kernel_size = (3, 3),
                                   strides = strides,
                                   padding="same",
                                   use_bias=False)
                                   
        self.BN2 = layers.BatchNormalization()
        self.Conv2 = layers.Conv2D(filters = output_channels, 
                                   kernel_size = (3, 3),
                                   strides = (1, 1),
                                   padding="same",
                                   use_bias=False)
        
        if input_channels != output_channels:
            self.salidaDistinta = True
            self.ConvFuera = layers.Conv2D(filters = output_channels, 
                                   kernel_size = (1, 1),
                                   strides = strides,
                                   use_bias=False)
        else: self.salidaDistinta = False
            
    def call(self, x):
        x = self.BN1(x)
        y = activations.silu(x)
        x = self.Conv1(y)
        x = self.BN2(x)
        x = activations.silu(x)
        x = self.Conv2(x)
        if self.salidaDistinta:
            y = self.ConvFuera(y)
        x = x + y
        return x

 ### $\huge\text{Ejercicio } 2$  
### (_2 puntos_) Define la red ResidualNetwork (ver Figura 1). Para comprobar su correcto funcionamiento haz lo siguiente:
 - Descarga los pesos del modelo preentrenado (los podrás encontrar en el canal de Teams de la asignatura).
 - Carga los pesos en tu modelo, haciendo uso de la función proporcionada en la Figura 4.
 - Comprueba que la precisión del modelo en CIFAR-100 es superior al 69 %.

!['arquitectura'](ResidualNetwork.png)

In [2]:
from tensorflow.keras import Sequential
from tensorflow.keras import Input

ResidualNetwork = Sequential([
    Input(shape=(32,32,3)),
    layers.Conv2D(filters=16, kernel_size=(3,3),strides=(1,1),padding="same", use_bias=False), 
    # la configuración de la capa convolucional es para asegurarse que no se reduce tamaño
    ResidualBlock(16,64),
    ResidualBlock(64,64),
    ResidualBlock(64,64),
    ResidualBlock(64,128,strides=(2,2)),
    ResidualBlock(128,128),
    ResidualBlock(128,128),
    ResidualBlock(128,256,strides=(2,2)),
    ResidualBlock(256,256),
    ResidualBlock(256,256),
    layers.BatchNormalization(),
    layers.Activation(activations.silu),
    layers.GlobalAveragePooling2D(),
    layers.Dense(100,activation='softmax')
    
])

In [3]:
import pickle

def load_weights(model, weight_file):
    with open(weight_file, 'rb') as f:
        weights = pickle.load(f)

    all_vars = model.trainable_weights + model.non_trainable_weights
    weight_list = [(x, weights[x]) for x in sorted(weights.keys())]
    weights = {}
    for i, var in enumerate(all_vars):
        aux = var.path.split('/')[-2:]
        classname = '_'.join(aux[0].split('_')[:-1])
        name = aux[1]
        assigned = False
        for j, (key, value) in enumerate(weight_list):
            if classname in key and name in key:
                try:
                    all_vars[i].assign(value)
                    print(':) ',end='')
                except:
                    continue
                print('assinging', key, 'to', var.path)
                del weight_list[j]
                assigned = True
                break
        if not assigned:
            raise Exception(var.path + ' cannot be loaded')

In [4]:
load_weights(ResidualNetwork, "p2_model_weights.pkl")

:) assinging conv2d_52/kernel to sequential/conv2d/kernel
:) assinging batch_normalization_38/gamma to sequential/residual_block/batch_normalization/gamma
:) assinging batch_normalization_38/beta to sequential/residual_block/batch_normalization/beta
:) assinging conv2d_54/kernel to sequential/residual_block/conv2d_1/kernel
:) assinging batch_normalization_39/gamma to sequential/residual_block/batch_normalization_1/gamma
:) assinging batch_normalization_39/beta to sequential/residual_block/batch_normalization_1/beta
:) assinging conv2d_55/kernel to sequential/residual_block/conv2d_2/kernel
:) assinging conv2d_53/kernel to sequential/residual_block/conv2d_3/kernel
:) assinging batch_normalization_40/gamma to sequential/residual_block_1/batch_normalization_2/gamma
:) assinging batch_normalization_40/beta to sequential/residual_block_1/batch_normalization_2/beta
:) assinging conv2d_56/kernel to sequential/residual_block_1/conv2d_4/kernel
:) assinging batch_normalization_41/gamma to sequent

In [5]:
## carga y procesado de datos

from tensorflow.keras.datasets import cifar100
import numpy as np

(x_train, Y_train), (x_test, Y_test) = cifar100.load_data()

x_train = (2*x_train.astype(float)-255)/(255)
x_test = (2*x_test.astype(float)-255)/(255)

# como la salida de la red son 100 nodos se sobreentiende 
# que tengo que one-hot-ear Y

y_train = np.zeros(shape=(Y_train.shape[0],max(Y_train)[0]+1))
y_train[np.arange(Y_train.size),Y_train.T] = 1

y_test = np.zeros(shape=(Y_test.shape[0],max(Y_test)[0]+1))
y_test[np.arange(Y_test.size),Y_test.T] = 1

(np.max(x_train),np.min(x_train),np.max(x_test),np.min(x_test))

(1.0, -1.0, 1.0, -1.0)

In [None]:
'''from tensorflow.keras.losses import CategoricalCrossentropy as CCE

ResidualNetwork.compile(optimizer='adam',
                        loss=CCE(),
                        metrics=['accuracy'])

ResidualNetwork.fit(x_train, y_train, 
                    epochs=5, 
                    validation_data=(x_test, y_test), 
                    batch_size=8)

ResidualNetwork.evaluate(x_test)''';

In [7]:
y_pred = ResidualNetwork(x_test)

In [11]:
n = 10_000
tasaAcierto = sum([np.argmax(y_pred[i]) == np.argmax(y_test[i]) for i in range(n)])/n
print(f'La tasa de acierto es {tasaAcierto}')

La tasa de acierto es 0.7029


 ### $\huge\text{Ejercicio } 3$ 
### (_4 puntos_) Entrena, mediante la técnica de fine-tuning, sobre el dataset CIFAR-10, manteniendo fijos los pesos de la red preentrenada proporcionada en el canal de Teams de la asignatura. Analiza el resultado en el conjunto de test. Se penalizarán los siguientes puntos:
 - (_-4 puntos_) No se ha entrenado la red correctamente, y no se proporciona el resultado obtenido en
el conjunto de test.
 - (_-1 punto_) No se ha utilizado la red original completa, a excepción de la última capa.
 - (_-1 punto_) No se optimizado el entrenamiento, reduciendo al máximo el consumo de memoria.
 - (_-1 punto_) No se ha hecho uso de técnicas de DataAugmentation sobre las imágenes de entrenamiento.
 - (_-2 puntos_) No se ha entrenado la red sin hacer uso de la función fit, entrenando el modelo con un
bucle de entrenamiento desde cero.

In [34]:
# definimos las funciones de entrenamiento
# inspirándonos en el libro de François Chollet
from math import ceil as techo
from tensorflow import GradientTape 
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow import reduce_mean


class GeneradorBatches:
    
    def __init__(self, imagenes, etiquetas, tamanoBatch = 128):
        assert len(imagenes) == len(etiquetas)
        self.indice = 0
        self.imagenes = imagenes
        self.etiquetas = etiquetas
        self.tamanoBatch = tamanoBatch
        self.numBatches = techo(len(imagenes)/tamanoBatch)

    def siguiente(self):
        imagenes = self.imagenes[self.indice : self.indice + self.tamanoBatch]
        etiquetas = self.etiquetas[self.indice : self.indice + self.tamanoBatch]
        self.indice += self.tamanoBatch
        return imagenes, etiquetas

def pasada(modelo, batchImagenes, batchEtiquetas):
    
    with GradientTape() as memoria:
        predicciones = modelo(batchImagenes)
        perdidasInstancia = losses.categorical_crossentropy(batchEtiquetas, predicciones)
        perdidaMedia = reduce_mean(perdidasInstancia)
    gradientes = memoria.gradient(perdidaMedia, modelo.trainable_weights)
    optimizador = optimizers.Adam(learning_rate = 1e-3)
    optimizador.apply_gradients(zip(gradientes, modelo.trainable_weights))
    
    return perdidaMedia
        
def entrena(modelo, imagenes, etiquetas, epochs, tamanoBatch):
    for epoch in range(epochs):
        print(f'Epoch: {epoch}')
        generadorBatches = GeneradorBatches(imagenes, etiquetas, tamanoBatch=tamanoBatch)
        for batch in range(generadorBatches.numBatches):
            batchImagenes, batchEtiquetas = generadorBatches.siguiente()
            perdida = pasada(modelo, batchImagenes, batchEtiquetas)
            if batch % 50 == 0:
                n = 1000
                predicciones = modelo(x_test[:n])
                tasaAcierto = sum([np.argmax(predicciones[i]) == np.argmax(y_test[i]) for i in range(n)])/n
                print(f'La tasa de acierto es {tasaAcierto}')
        
                print(f'    Perdida en el batch {batch} = {perdida}')

In [35]:
entrena(ResidualNetwork, x_train, y_train, epochs=5, tamanoBatch=128)

Epoch: 0
La tasa de acierto es 0.031
    Perdida en el batch 0 = 4.066461086273193


KeyboardInterrupt: 