In [1]:
#import libraries
import os
import cv2
import numpy as np
import keras
import tensorflow as tf
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
from datetime import datetime
from sklearn.metrics import classification_report, accuracy_score
from keras.applications import MobileNet
from keras.layers import Dense, GlobalAveragePooling2D, Dropout, GlobalMaxPooling2D, Activation, Multiply, Conv2D, Concatenate, Flatten, ELU
from keras.models import Model
from keras.optimizers import Adam 
from keras.callbacks import EarlyStopping
from numpy.random import seed
import re
from keras import backend as K

seed(25)
tf.random.set_seed(25)
tf.keras.utils.set_random_seed(25)
tf.config.experimental.enable_op_determinism()

2023-04-15 13:07:18.090428: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-04-15 13:07:18.248747: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-04-15 13:07:18.253077: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/cserv1_a/soc_ug/fy19iars/project/lib/python3.9/site-packages/cv2/../../lib64:

Helper functions and variables

In [2]:
def list_labels(file):
    labels_file = open(file, "r")
    labels = []
    
    for line in labels_file:
        label = line.strip()
        labels.append(label)
    
    labels_file.close()
    
    return labels


#list of all labels
class_names = list_labels("./CamSDD/Labels.txt")
class_name_labels = {class_name:i for i, class_name in enumerate(class_names)}



def load_data(folder):
    Category = ["training", "test", "validation"]
    output = []
    
    for category in Category:
        print("Loading {}".format(category))
        path = os.path.join(folder, category)
        print(path)
        images = []
        labels = []
        
        for sub_folder in os.listdir(path):
            label = class_name_labels[sub_folder]
            
            #Iterating through all images
            for file in os.listdir(os.path.join(path, sub_folder)):
                
                #getting the image path
                img_path = os.path.join(os.path.join(path, sub_folder), file)
                
                #appending image and corresponding label
                images.append(cv2.resize(cv2.imread(img_path), (224, 224)))
                # images.append(cv2.imread(img_path))
                labels.append(label)
            
        #check that data type doesn't affect accuracy
        images = (np.array(images, dtype='float32')/127.5)-1
        labels = np.array(labels, dtype='int8')
        
        output.append((images, labels))
        
    return output



#displays 25 images with labels
#cite
def display_examples(class_names, images, labels):
    figsize = (20, 20)
    fig = plt.figure(figsize=figsize)
    fig.suptitle("Example of images", fontsize=16)
    for i in range(25):
        plt.subplot(5,5,i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(images[i].astype(np.uint8))
        plt.xlabel(class_names[labels[i]])
    plt.show()

#https://stackoverflow.com/questions/49492255/how-to-replace-or-insert-intermediate-layer-in-keras-model
def insert_layer_nonseq(model, layer_regex, insert_layer_factory,
                        insert_layer_name=None, position='after'):

    # Auxiliary dictionary to describe the network graph
    network_dict = {'input_layers_of': {}, 'new_output_tensor_of': {}}

    # Set the input layers of each layer
    for layer in model.layers:
        for node in layer._outbound_nodes:
            layer_name = node.outbound_layer.name
            if layer_name not in network_dict['input_layers_of']:
                network_dict['input_layers_of'].update(
                        {layer_name: [layer.name]})
            else:
                network_dict['input_layers_of'][layer_name].append(layer.name)

    # Set the output tensor of the input layer
    network_dict['new_output_tensor_of'].update(
            {model.layers[0].name: model.input})

    # Iterate over all layers after the input
    model_outputs = []
    for layer in model.layers[1:]:

        # Determine input tensors
        layer_input = [network_dict['new_output_tensor_of'][layer_aux] 
                for layer_aux in network_dict['input_layers_of'][layer.name]]
        if len(layer_input) == 1:
            layer_input = layer_input[0]

        # Insert layer if name matches the regular expression
        if re.match(layer_regex, layer.name):
            if position == 'replace':
                x = layer_input
            elif position == 'after':
                x = layer(layer_input)
            elif position == 'before':
                pass
            else:
                raise ValueError('position must be: before, after or replace')

            new_layer = insert_layer_factory()
            if insert_layer_name:
                new_layer._name = insert_layer_name
            else:
                new_layer._name = '{}_{}'.format(layer.name, 
                                                new_layer.name)
            x = new_layer(x)
            print('New layer: {} Old layer: {} Type: {}'.format(new_layer.name,
                                                            layer.name, position))
            if position == 'before':
                x = layer(x)
        else:
            x = layer(layer_input)

        # Set new output tensor (the original one, or the one of the inserted
        # layer)
        network_dict['new_output_tensor_of'].update({layer.name: x})

        # Save tensor in output list if it is output in initial model
        if layer_name in model.output_names:
            model_outputs.append(x)

    return Model(inputs=model.inputs, outputs=model_outputs)

Loading data

In [3]:
(train_images, train_labels), (test_images, test_labels), (validation_images, validation_labels)= load_data("./CamSDD")

Loading training
./CamSDD/training
Loading test
./CamSDD/test
Loading validation
./CamSDD/validation


In [4]:
#shuffling train data
train_images, train_labels = shuffle(train_images, train_labels, random_state=25)
validation_images, validation_labels = shuffle(validation_images, validation_labels, random_state=25)

In [9]:
model = keras.models.load_model('spatial_configB5')

start = datetime.now()
score = model.evaluate(test_images, test_labels)
end = datetime.now()

print('Test loss:', score[0])
print('Test accuracy:', score[1])
print('time: ', end-start)

Test loss: 0.16657833755016327
Test accuracy: 0.9566666483879089
time:  0:00:05.115132


In [7]:
model.predict_classes(test_images[0])

AttributeError: 'Functional' object has no attribute 'predict_classes'

Visualize data

In [None]:
display_examples(class_names, train_images, train_labels)

Creating model

In [5]:
#channel module
#channel module
def channel_attention_module(x, ratio, bias):
    b, _, _, channel = x.shape
    # MLP shared layer
    l1 = Dense(channel//ratio, activation="relu", use_bias=bias)
    l2 = Dense(channel, use_bias=bias)
    
    # Global Average pooling
    x1 = GlobalAveragePooling2D()(x)
    x1 = l1(x1)
    x1 = l2(x1)
    
    # Global Max pooling
    x2 = GlobalMaxPooling2D()(x)
    x2 = l1(x2)
    x2 = l2(x2)
    
    # Adding both and applying sigmoid
    features = x1 + x2
    features = Activation("sigmoid")(features)
    features = Multiply()([x, features])
    
    return features

# spatial attention module
def spatial_attention_module(x, kernel_size=7, bias=True):
    # Average pooling
    x1 = tf.reduce_mean(x, axis=-1)
    x1 = tf.expand_dims(x1, axis=-1)
    
    # Max pooling
    x2 = tf.reduce_max(x, axis=-1)
    x2 = tf.expand_dims(x2, axis=-1)
    
    # Concatenate
    features = Concatenate()([x1, x2])
    
    # Conv layer
    features = Conv2D(1, kernel_size=kernel_size, use_bias=bias, padding="same", activation="sigmoid")(features)
    features = Multiply()([x, features])
    
    return features
    
# CBAM
def CBAM(x, ratio, bias):
    x = channel_attention_module(x, ratio=ratio, bias=bias)
    x = spatial_attention_module(x)
    return x

def elu2(x):
    return K.switch(K.less_equal(x, 1), K.exp(x) - 1, x)

In [6]:
base_model=MobileNet(weights='imagenet', include_top=False) 

# def elu_layer():
#     return ELU(name='elu')
# base_model = insert_layer_nonseq(base_model, '.*relu.*', elu_layer, position="replace")

x=base_model.output
x=spatial_attention_module(x, 9, True)
x=GlobalAveragePooling2D()(x)
x=Dense(1024, activation='sigmoid')(x)
x=Dropout(0.7)(x)
output = Dense(30, activation="softmax")(x)

model=Model(inputs=base_model.input,outputs=output)



2023-02-25 16:43:42.207006: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/cserv1_a/soc_ug/fy19iars/project/lib/python3.9/site-packages/cv2/../../lib64:/uollinapps/AppsData/src/vscode/1.71.2-1663191299.el7/lib64
2023-02-25 16:43:42.207042: W tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: UNKNOWN ERROR (303)
2023-02-25 16:43:42.207064: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (feng-linux-08.leeds.ac.uk): /proc/driver/nvidia/version does not exist
2023-02-25 16:43:42.207480: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:

In [None]:
#checking model architecture
print(model.summary())
# for i,layer in enumerate(model.layers):
#   print(i,layer.name)
# model.layers[3].get_config()

In [7]:
for layer in base_model.layers:
    layer.trainable = False

In [8]:
monitor = EarlyStopping(monitor='val_loss', min_delta=1e-3, patience=10, restore_best_weights=True)
model.compile(optimizer=Adam(learning_rate=0.0001), loss='sparse_categorical_crossentropy', metrics=["accuracy"])
history = model.fit(train_images, train_labels, batch_size=20, epochs=100, validation_data=(validation_images,validation_labels), callbacks=[monitor], shuffle=False)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100


Evaluating trained model

In [9]:
# # # model.save("spatial_k9_T_2")
# model1 = keras.models.load_model('spatial_k7_T')

score = model.evaluate(test_images, test_labels)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

Test loss: 0.22056683897972107
Test accuracy: 0.9233333468437195


In [None]:
def plot_accuracy_loss(history):
    # #plot accuracy
    # plt.subplot(221)
    # plt.plot(history.history["accuracy"], 'bo--', label="acc")
    # plt.plot(history.history["val_accuracy"], "ro--", label = "val_acc")
    # plt.title("train_acc vs val_acc")
    # plt.ylabel("accuracy")
    # plt.xlabel("epochs")
    # plt.legend()
    
    #plot loss function
    plt.subplot(221)
    plt.plot(history.history["loss"], 'bo--', label="loss")
    plt.plot(history.history["val_loss"], "ro--", label = "val_loss")
    plt.title("train_loss vs val_loss")
    plt.ylabel("loss")
    plt.xlabel("epochs")
    plt.legend()
    plt.show()

plot_accuracy_loss(history)

In [None]:
predictions = model.predict(test_images)
pred_labels = np.argmax(predictions, axis=1)
print(classification_report(test_labels, pred_labels))