In [1]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.python.client import device_lib
#check if gpu available
def get_available_devices():
    """Return the names of all devices TensorFlow reports for this machine.

    Returns:
        list[str]: one entry per local device, taken from the ``name`` field
        of each record returned by ``device_lib.list_local_devices()``.
    """
    device_names = []
    for device in device_lib.list_local_devices():
        device_names.append(device.name)
    return device_names
# gpu memory
# Configure on-demand GPU memory allocation BEFORE anything enumerates the
# devices: listing devices initialises the GPU runtime, and if that happens
# first TensorFlow may already have reserved GPU memory with its default
# settings, which is a common trigger for "cuDNN failed to initialize".
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)

# Only now list the devices (check that a GPU is visible).
print(get_available_devices())

['/device:CPU:0', '/device:GPU:0']


# Load dataset

In [2]:
from tensorflow.keras.utils import to_categorical  

# CIFAR-10 is returned as two (images, integer labels) pairs: train and test.
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()

# One-hot encode the labels (10 classes); test labels first, as before.
y_test = to_categorical(y_test, num_classes=10)
y_train = to_categorical(y_train, num_classes=10)

# Show the shape of every array, one per line.
print(
    x_train.shape,
    y_train.shape,
    x_test.shape,
    y_test.shape,
    sep='\n',
)

(50000, 32, 32, 3)
(50000, 10)
(10000, 32, 32, 3)
(10000, 10)


### Validation data

In [3]:
# Draw one random ordering and apply it to images and labels alike so that
# every image stays paired with its label.
# NOTE: x_train / y_train are overwritten here, so re-running this cell
# removes another 2000 samples from the training set.
shuffler = np.random.permutation(len(x_train))
x_train, y_train = x_train[shuffler], y_train[shuffler]

# The first 2000 shuffled samples become the validation set; the remainder
# is kept for training.
x_val, x_train = x_train[:2000], x_train[2000:]
y_val, y_train = y_train[:2000], y_train[2000:]


In [4]:
# Training hyper-parameters shared by the cells below.
batch_size = 64                # samples per gradient step
epochs = 3                     # full passes over the training generator
base_learning_rate = 0.0001    # learning rate handed to the Adam optimizer

### Load ResNet50 without pretrained weights

In [5]:
from tensorflow.keras.applications.resnet50 import ResNet50
# Randomly initialised ResNet50 backbone without the classification head.
resnet50 = ResNet50(include_top=False, weights=None)

# Pixel-value normalisation applied in front of the backbone. It does not
# change the spatial size of the input (it appears as a divide followed by a
# subtract in the model summary).
# NOTE(review): this is the resnet_v2 variant although the backbone is the
# v1 ResNet50 - confirm this is intentional.
preprocess_input = tf.keras.applications.resnet_v2.preprocess_input

In [6]:
from tensorflow.keras.models import Model
# Use the public Keras namespace: layers taken from the private
# `tensorflow.python.keras` package should not be mixed with `tf.keras`
# models and inputs.
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D

#input: 32x32 RGB images (CIFAR-10)
inputs = tf.keras.Input(shape=(32, 32, 3))
# Normalise pixel values (the spatial size is left unchanged)
x = preprocess_input(inputs)
# ResNet50 backbone (no classification head)
x = resnet50(x)
# Add a top again: global average pooling over the spatial dimensions...
x=GlobalAveragePooling2D()(x)
# ...followed by a softmax over the 10 classes of cifar 10
preds=Dense(10,activation='softmax')(x)
resnet50_cifar10=Model(inputs=inputs,outputs=preds)
resnet50_cifar10.summary()


Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_2 (InputLayer)         [(None, 32, 32, 3)]       0         
_________________________________________________________________
tf.math.truediv (TFOpLambda) (None, 32, 32, 3)         0         
_________________________________________________________________
tf.math.subtract (TFOpLambda (None, 32, 32, 3)         0         
_________________________________________________________________
resnet50 (Functional)        (None, None, None, 2048)  23587712  
_________________________________________________________________
global_average_pooling2d (Gl (None, 2048)              0         
_________________________________________________________________
dense (Dense)                (None, 10)                20490     
Total params: 23,608,202
Trainable params: 23,555,082
Non-trainable params: 53,120
____________________________________________

In [7]:
#same with data augmentation
from tensorflow.keras.preprocessing.image import ImageDataGenerator
# Adam at the configured learning rate; the labels are one-hot encoded,
# hence categorical cross-entropy. Accuracy is tracked under the key 'acc'.
resnet50_cifar10.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
    loss='categorical_crossentropy',
    metrics=['acc'],
)

# Augmentation: random horizontal flips and shifts of up to +-4 pixels,
# with the uncovered border filled with zeros.
train_datagen = ImageDataGenerator(
    width_shift_range=4,    # +-4 pixel shift max
    height_shift_range=4,   # +-4 pixel shift max
    fill_mode='constant',   # zero padding
    cval=0,                 # zero padding
    horizontal_flip=True,
)

train_generator = train_datagen.flow(x_train, y_train, batch_size=batch_size)
# Number of whole batches per epoch (integer division).
step_size_train = train_generator.n // train_generator.batch_size

history = resnet50_cifar10.fit(
    train_generator,
    validation_data=(x_val, y_val),
    epochs=epochs,
    steps_per_epoch=step_size_train,
)

Epoch 1/3


UnknownError:  Failed to get convolution algorithm. This is probably because cuDNN failed to initialize, so try looking to see if a warning log message was printed above.
	 [[node model/resnet50/conv1_conv/Conv2D (defined at \AppData\Local\Temp/ipykernel_9148/59398938.py:18) ]] [Op:__inference_train_function_13039]

Function call stack:
train_function


# Adding SE layers

### This is what we had before: normal resnet50

In [None]:
# Textual overview of the plain backbone.
resnet50.summary()
# plot_model depends on pydot; importing it here surfaces a missing install.
import pydot
# Write the layer graph (with tensor shapes) to resnet50.png.
keras.utils.plot_model(
    resnet50,
    to_file='resnet50.png',
    show_shapes=True,
)


### List of layers whose output will be followed by an SE block

(Check resnet50.png for more information)

In [None]:
# Names of all backbone layers, in model order.
l = [layer.name for layer in resnet50.layers]

# A layer is an SE insertion point when the layer listed right after it has
# 'add' in its name.
list_se = []
for name, following in zip(l, l[1:]):
    if 'add' in following:
        print(name)
        list_se.append(name)


In [None]:
###### Not used #######
#custom SE layer (kept for reference; the models below are built with add_SE)
from tensorflow.keras.layers import GlobalAveragePooling2D
class SE_layer(tf.keras.layers.Layer):
    """Squeeze-and-excitation block packaged as a single Keras layer.

    The input is average-pooled over its spatial dimensions (squeeze), passed
    through two dense layers (excitation: ``channels // ratio`` ReLU units,
    then ``channels`` sigmoid units) and the resulting per-channel gates are
    multiplied back onto the input.

    Args:
        ratio: channel reduction factor of the first dense layer.
        initializer: kernel initializer (name, config or instance) used for
            both dense layers.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.
    """
    def __init__(self, ratio=16,initializer="he_normal", **kwargs):
        super(SE_layer, self).__init__(**kwargs)
        self.initializer = keras.initializers.get(initializer)
        self.ratio=ratio

    def build(self, input_shape):
        """Create the sub-layers once, sized from the channel dimension."""
        out_dim = input_shape[-1]
        if out_dim is None:
            raise ValueError('SE_layer needs a statically known channel dimension.')
        out_dim = int(out_dim)
        self.out_dim = out_dim
        # Sub-layers are created here (not in call) so their weights are
        # created once and tracked by this layer.
        self.squeeze = tf.keras.layers.GlobalAveragePooling2D()
        self.reduce = tf.keras.layers.Dense(
            max(out_dim // self.ratio, 1),  # integer unit count, at least 1
            activation='relu',
            kernel_initializer=self.initializer)
        self.expand = tf.keras.layers.Dense(
            out_dim,
            activation='sigmoid',
            kernel_initializer=self.initializer)
        super(SE_layer, self).build(input_shape)

    def call(self, inputs):
        """Return ``inputs`` rescaled channel-wise by the learned gates."""
        F_sq = self.squeeze(inputs) #squeeze
        F_ex = self.expand(self.reduce(F_sq)) #excitation
        # Reshape to (batch, 1, 1, channels) so the gates broadcast over the
        # spatial dimensions.
        F_ex = tf.reshape(F_ex, [-1,1,1,self.out_dim])
        F_scale = inputs * F_ex
        return F_scale

    def get_config(self):
        # Implement get_config to enable serialization. Every constructor
        # argument is included so a reloaded layer is configured identically.
        base_config = super(SE_layer, self).get_config()
        config = {"ratio": self.ratio,
                  "initializer": keras.initializers.serialize(self.initializer)}
        return dict(list(base_config.items()) + list(config.items()))

### Adding SE layers before 'add' layers

In [None]:
####### add SE layers to model ##########
# input: model
#        l: list l of layer names where SE layers are going to go after
#        ratio : SE ratio
#        modelname: name of new model
#
# Output: modified model with SE layers at specified locations
# See https://stackoverflow.com/questions/49492255/how-to-replace-or-insert-intermediate-layer-in-keras-model to understand the function
##########################################

def add_SE(model, l,ratio,modelname=None):
    """Rebuild ``model`` with a squeeze-and-excitation block after given layers.

    The layers of ``model`` are re-applied (not copied) in ``model.layers``
    order to build a new graph, so the returned model uses the same layer
    objects as ``model``. After every layer whose name is in ``l`` an SE block
    is inserted: global average pooling, a ReLU dense layer with
    ``channels // ratio`` units, a sigmoid dense layer with ``channels``
    units, and a channel-wise multiplication with the layer output.

    Args:
        model: Keras functional model; ``model.layers[0]`` is treated as the
            input layer and mapped to ``model.input``.
        l: collection of layer names after which an SE block is inserted.
        ratio: channel reduction factor of the first dense layer.
        modelname: name of the new model; defaults to ``'SE_' + model._name``.

    Returns:
        A new ``Model`` with the inputs of ``model`` and the SE blocks
        inserted at the requested locations.
    """
    # Imported locally from the public Keras namespace so the function does
    # not depend on which notebook import cell was executed last.
    from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Reshape
    from tensorflow.keras.models import Model

    se_num = 1
    se_targets = set(l)
    # Auxiliary dictionary to describe the network graph
    network_dict = {'input_layers_of': {}, 'new_output_tensor_of': {}}

    # Set the input layers of each layer (relies on Keras' private node API)
    for layer in model.layers:
        for node in layer._outbound_nodes:
            layer_name = node.outbound_layer.name
            network_dict['input_layers_of'].setdefault(layer_name, []).append(layer.name)

    # Set the output tensor of the input layer
    network_dict['new_output_tensor_of'][model.layers[0].name] = model.input

    # Iterate over all layers after the input
    model_outputs = []
    for layer in model.layers[1:]:

        # Determine input tensors
        layer_input = [network_dict['new_output_tensor_of'][layer_aux]
                for layer_aux in network_dict['input_layers_of'][layer.name]]
        if len(layer_input) == 1:
            layer_input = layer_input[0]

        x = layer(layer_input)

        # Insert an SE block if the name matches the argument list
        if layer.name in se_targets:
            out_dim = layer.output_shape[-1]
            prefix = 'SE' + str(se_num)
            # Squeeze: one value per channel
            F_sq = GlobalAveragePooling2D(name=prefix + '_global_avg')(x)
            # Excitation with 2 fully connected layers; the unit count must
            # be an integer and at least 1
            F_ex = Dense(max(out_dim // ratio, 1), activation='relu',
                         name=prefix + '_dense_relu')(F_sq)
            F_ex = Dense(out_dim, activation='sigmoid',
                         name=prefix + '_dense_sig')(F_ex)
            # Reshape to (batch, 1, 1, channels) with a regular Keras layer
            # so the gates broadcast over the spatial dimensions
            F_ex = Reshape((1, 1, out_dim), name=prefix + '_reshape')(F_ex)
            # Output: rescaling
            x = keras.layers.multiply([x, F_ex], name=prefix + '_scaling')

            se_num += 1 # for naming
            print('added SE after ',layer.name)

        # Set new output tensor (the original one, or the one of the inserted
        # block)
        network_dict['new_output_tensor_of'][layer.name] = x

        if layer.name in model.output_names:
            model_outputs.append(x)

    # Build the new model once, after every layer has been re-applied
    my_final_model = Model(inputs=model.inputs, outputs=model_outputs)
    if modelname is not None:
        my_final_model._name = modelname
    else:
        my_final_model._name = 'SE_' + model._name
    return my_final_model

def SE_factory(ratio):
    """Return a new ``SE_layer`` configured with the given reduction ratio."""
    se_block = SE_layer(ratio=ratio)
    return se_block

# Fresh, randomly initialised backbone without the classification head...
SE_ResNet50 = ResNet50(include_top=False, weights=None)
# ...rebuilt with an SE block (ratio 16) after every layer named in list_se.
SE_ResNet50 = add_SE(SE_ResNet50, list_se, ratio=16)

# Save / reload round trip, currently disabled:
#SE_ResNet50.save('temp.h5')
#SE_ResNet50 = keras.models.load_model('temp.h5',custom_objects={'SE_layer': SE_layer})

SE_ResNet50.summary()
# Write the layer graph (with tensor shapes) to test.png.
keras.utils.plot_model(
    SE_ResNet50,
    to_file='test.png',
    show_shapes=True,
)

### Adding the top layer and input preprocessing

In [None]:
from tensorflow.keras.models import Model
# Use the public Keras namespace: layers taken from the private
# `tensorflow.python.keras` package should not be mixed with `tf.keras`
# models and inputs.
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D

# 32x32 RGB images (CIFAR-10)
inputs = tf.keras.Input(shape=(32, 32, 3))
# Normalise pixel values (the spatial size is left unchanged)
x = preprocess_input(inputs)
# ResNet50 backbone with SE blocks (no classification head)
x = SE_ResNet50(x)
# Add a top again: global average pooling over the spatial dimensions...
x=GlobalAveragePooling2D()(x)
# ...followed by a softmax over the 10 classes of cifar 10
preds=Dense(10,activation='softmax')(x)
SE_resnet50_cifar10=Model(inputs=inputs,outputs=preds)
SE_resnet50_cifar10.summary()


# Todo : train model with custom loop, adaptive learning rate, saving model every k epochs etc

# Top n accuracy

In [None]:
def top_n_acc(pred, targets,n=1):
    """Print and return the top-``n`` categorical accuracy.

    Args:
        pred: predicted class scores, passed as ``y_pred`` to
            ``tf.keras.metrics.top_k_categorical_accuracy``.
        targets: reference labels, passed as ``y_true`` to the same function.
        n: value of ``k`` for the top-k metric.

    Returns:
        The mean of the per-sample top-``n`` results.
    """
    per_sample_hits = tf.keras.metrics.top_k_categorical_accuracy(targets, pred, k=n)
    acc = np.mean(per_sample_hits.numpy())
    print("top", n, 'accuracy:', acc, sep=' ')
    return acc

In [None]:
# Softmax outputs of the baseline model for the test images, then the top-1
# accuracy against the one-hot test labels.
preds = resnet50_cifar10.predict(x_test)
top_n_acc(preds, y_test, n=1)