 <!-- SVM, Small VGG Network, ArcFace, FCapNetwork -->

In [15]:
import os
import numpy as np
import cv2
import random
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import initializers


import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, losses
import numpy as np

import tensorflow as tf
from tensorflow.keras.layers import Layer, Lambda



image_size = (160, 120, 3)
input_shape = (160, 120, 3)
num_classes = 2
num_capsules = 32  
dim_capsules = 16  
routing_iterations = 3  
kernel_size = 5  
learning_rate = 1e-5  
dataset_path = "data"


print('Our dataset = ')
print(os.listdir(dataset_path))

Our dataset = 
['Grupo_0', 'Grupo_1', '.DS_Store', 'Grupo_4', 'Grupo_3', 'Grupo_2', 'Grupo_5']


Step 1: Loading DataSet, LG Folder - 29 Subjects, with one corrupt sample

In [None]:
def load_images_from_folder(folder_path, max_depth=5):
    images = []
    labels = []
    names = []
    corruptedFiles = []
    image_extensions = ["bmp"]

    for root, dirs, files in os.walk(folder_path):
        current_depth = root[len(folder_path):].count(os.sep)

        if ('IriTech' in root) or ('Iritech' in root) or ('iriTech' in root) or ('iritech' in root):
            continue

        if current_depth <= max_depth:
            for file in files:
                if any(file.lower().endswith(ext) for ext in image_extensions):
                    img_path = os.path.join(root, file)
                    try:
                        img = cv2.imread(img_path)
                        if img is not None:
                            img = cv2.resize(img, (160, 120))
                            images.append(img)
                            # Binary label, '0' means Fit for Duty, others alcoholic
                            labels.append(file[6] != '0')
                            names.append(file)
                    except:
                        corruptedFiles.append(img_path)
        else:
            del dirs[:]

    print(f"Following {len(corruptedFiles)} files are corrupt or encountered error: \n {corruptedFiles}")
    return np.array(images), np.array(labels), np.array(names)

images, labels, names = load_images_from_folder(dataset_path)
print(f"Read {len(images)} images from the data folder with shape {images.shape}")


In [None]:
ri = random.randint(0, len(images) - 1)
print("Randome index: " + str(ri))
plt.imshow(images[ri])

Step 2: Split images and labels into 80-20 random train test split and train cnn

In [None]:
# Split the dataset into 70% train and 30% test
images_train, images_test, labels_train, labels_test = train_test_split(images, labels, test_size=0.3, random_state=42)

# Normalize the image data
images_train = images_train / 255.0
images_test = images_test / 255.0


labels_train_cnn = to_categorical(labels_train, num_classes=2)
labels_test_cnn = to_categorical(labels_test, num_classes=2)

In [54]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, losses
import numpy as np

class CapsuleLayer(layers.Layer):
    def __init__(self, num_capsules, dim_capsules, routing_iterations=3, **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsules = num_capsules
        self.dim_capsules = dim_capsules
        self.routing_iterations = routing_iterations

    def build(self, input_shape):
        self.W = self.add_weight(shape=[self.num_capsules, input_shape[1], self.dim_capsules, input_shape[2]],
                                 initializer='glorot_uniform',
                                 trainable=True)

    def call(self, inputs):
        inputs = tf.expand_dims(inputs, 1)
        inputs_hat = tf.scan(lambda ac, x: tf.linalg.matvec(self.W, x), elems=inputs)
        logits = tf.zeros(shape=[tf.shape(inputs_hat)[0], self.num_capsules, inputs.shape[1]])
        for i in range(self.routing_iterations):
            routing_weights = tf.nn.softmax(logits, axis=1)
            weighted_prediction = tf.reduce_sum(routing_weights * inputs_hat, axis=2)
            outputs = self.squash(weighted_prediction)
            if i < self.routing_iterations - 1:
                logits += tf.reduce_sum(inputs_hat * outputs[:, :, None, :], axis=-1)
        return outputs

    def squash(self, s):
        s_squared_norm = tf.reduce_sum(tf.square(s), axis=-1, keepdims=True)
        scale = s_squared_norm / (1 + s_squared_norm) / tf.sqrt(s_squared_norm + 1e-9)
        return scale * s

# Reconstruction network
def build_decoder(caps_output, image_size=(120, 160)):
    decoded = layers.Dense(512, activation='relu')(caps_output)
    decoded = layers.Dense(1024, activation='relu')(decoded)
    decoded = layers.Dense(np.prod(image_size), activation='sigmoid')(decoded)
    decoded = layers.Reshape(target_shape=image_size)(decoded)
    return decoded

# Capsule Network model
def build_capsule_network(input_shape, num_classes, num_capsules, dim_capsules, routing_iterations, kernel_size, learning_rate):
    inputs = layers.Input(shape=input_shape)
    
    conv1 = layers.Conv2D(64, kernel_size, activation='relu')(inputs)
    conv2 = layers.Conv2D(128, kernel_size, activation='relu')(conv1)

    primary_caps = layers.Conv2D(256, kernel_size, strides=2, padding='valid')(conv2)
    primary_caps = layers.Reshape((-1, dim_capsules))(primary_caps)
    
    capsules = CapsuleLayer(num_capsules=num_capsules, dim_capsules=dim_capsules, routing_iterations=routing_iterations)(primary_caps)
    
    capsules_output = layers.Lambda(lambda z: tf.norm(z, axis=-1))(capsules)
    output = layers.Dense(num_classes, activation='sigmoid')(capsules_output)
    
    decoder_output = build_decoder(capsules_output)
    
    model = models.Model(inputs=inputs, outputs=[output, decoder_output])
    
    loss = [losses.BinaryCrossentropy(), 'mse']
    loss_weights = [1.0, 0.0005]
    
    model.compile(optimizer=optimizers.Adam(learning_rate=learning_rate),
                  loss=loss, loss_weights=loss_weights,
                  metrics=['accuracy'])
    
    model.summary()
    
    return model



In [12]:
class SquashLayer(layers.Layer):
    def __init__(self, **kwargs):
        super(SquashLayer, self).__init__(**kwargs)

    def call(self, inputs):
        s = inputs
        squared_norm = tf.reduce_sum(tf.square(s), axis=-1, keepdims=True)
        safe_norm = tf.sqrt(squared_norm + 1e-7)
        squash_factor = squared_norm / (1. + squared_norm)
        return squash_factor * s / safe_norm


In [83]:
import tensorflow as tf
from tensorflow.keras.layers import Layer

class WTilingLayer(Layer):
    def __init__(self, caps1_n_caps, caps2_n_caps, caps2_n_dims, caps1_n_dims, init_sigma=0.1, **kwargs):
        super(WTilingLayer, self).__init__(**kwargs)
        self.caps1_n_caps = caps1_n_caps
        self.caps2_n_caps = caps2_n_caps
        self.caps2_n_dims = caps2_n_dims
        self.caps1_n_dims = caps1_n_dims
        self.init_sigma = init_sigma

    def build(self, input_shape):
        W_init = tf.random.normal(
            shape=(1, self.caps1_n_caps, self.caps2_n_caps, self.caps2_n_dims, self.caps1_n_dims),
            stddev=self.init_sigma, dtype=tf.float32, name="W_init"
        )
        self.W = tf.Variable(W_init, name="W", trainable=True)
    
    def call(self, inputs):
        caps1_output = inputs[0]
        X = inputs
        batch_size = tf.shape(X)[0]
        W_tiled = tf.tile(self.W, [batch_size, 1, 1, 1, 1], name="W_tiled")
        return W_tiled

# Example usage:
# layer = CustomCapsuleLayer(caps1_n_caps=32, caps2_n_caps=10, caps2_n_dims=16, caps1_n_dims=8)
# W_tiled, caps1_output_tiled = layer([caps1_output, X])


In [101]:
class TilingLayer(Layer):
    def __init__(self, caps2_n_caps, **kwargs):
        super(TilingLayer, self).__init__(**kwargs)
        self.caps2_n_caps = caps2_n_caps

    def call(self, inputs):
        inputs = tf.reshape(inputs, [-1, 119808, 8])
        caps1_output_expanded = tf.expand_dims(inputs, -1, name="caps1_output_expanded")
        caps1_output_tile = tf.expand_dims(caps1_output_expanded, 2, name="caps1_output_tile")
        caps1_output_tiled = tf.tile(caps1_output_tile, [1, 1, self.caps2_n_caps, 1, 1], name="caps1_output_tiled")
        return caps1_output_tiled

In [134]:
class MatMulitplier(Layer):
    def __init__(self, w_tiled, **kwargs):
        super(MatMulitplier, self).__init__(**kwargs)
        self.w_tiled = w_tiled
    
    def call(self, inputs):
        print(inputs[0].shape)
        print(inputs[1].shape)
        caps2_predicted = tf.matmul(inputs[0], inputs[1])
        return caps2_predicted


In [13]:
input_shape = (160, 120, 1)
inputs = layers.Input(shape=input_shape)  

conv1 = layers.Conv2D(256, 9, strides=1, activation='relu', name="conv1")(inputs)
conv2 = layers.Conv2D(256, 9, strides=2, activation='relu', name="conv2")(conv1)

caps1_raw = layers.Reshape((-1, 32, 8), name="caps1_raw")(conv2)
squashed_output = SquashLayer(name="caps1_output")(caps1_raw)



In [16]:
class DigitCapsuleLayer(layers.Layer):
    # creating a layer class in keras
    def __init__(self, **kwargs):
        super(DigitCapsuleLayer, self).__init__(**kwargs)
        self.kernel_initializer = initializers.get('glorot_uniform')
    
    def build(self, input_shape): 
        # initialize weight matrix for each capsule in lower layer
        self.W = self.add_weight(shape = [2, 32*72*52, 16, 8], initializer = self.kernel_initializer, name = 'weights')
        self.built = True
    
    def call(self, inputs):
        inputs = K.expand_dims(inputs, 1)
        inputs = K.tile(inputs, [1, 2, 1, 1])
        # matrix multiplication b/w previous layer output and weight matrix
        inputs = K.map_fn(lambda x: K.batch_dot(x, self.W, [2, 3]), elems=inputs)
        b = tf.zeros(shape = [K.shape(inputs)[0], 10, 32*72*52])
        
# routing algorithm with updating coupling coefficient c, using scalar product b/w input capsule and output capsule
        for i in range(3-1):
            c = tf.nn.softmax(b, dim=1)
            s = K.batch_dot(c, inputs, [2, 2])
            v = squash(s)
            b = b + K.batch_dot(v, inputs, [2,3])
            
        return v 
    def compute_output_shape(self, input_shape):
        return tuple([None, 10, 16])
    
    
    
def output_layer(inputs):
    return K.sqrt(K.sum(K.square(inputs), -1) + K.epsilon())
 
digit_caps = DigitCapsuleLayer()(squashed_output)
outputs = Lambda(output_layer)(digit_caps)


NotImplementedError: Exception encountered when calling Lambda.call().

[1mWe could not automatically infer the shape of the Lambda's output. Please specify the `output_shape` argument for this Lambda layer.[0m

Arguments received by Lambda.call():
  • args=('<KerasTensor shape=(None, 10, 16), dtype=float32, sparse=False, name=keras_tensor_10>',)
  • kwargs={'mask': 'None'}

In [None]:
caps2_n_caps = 2
caps2_n_dims = 16

init_sigma = 0.1

W_tiled = WTilingLayer(32*72*52, caps2_n_caps, caps2_n_dims, 8, init_sigma, name="w_tiled")(caps1_output)
caps1_output_tiled = TilingLayer(caps2_n_caps)(caps1_output)
inputs = [W_tiled, caps1_output_tiled]
caps2_predicted = MatMulitplier(w_tiled=W_tiled, name="caps2_predicted")(inputs)
# output = layers.Dense(num_classes, activation='sigmoid')(caps2_predicted)

model = models.Model(inputs=inputs, outputs=[caps2_predicted])

model.summary()

In [None]:
model = build_capsule_network(input_shape, num_classes, num_capsules, dim_capsules, routing_iterations, kernel_size, learning_rate)

In [None]:
print(f"Train images count: {len(images_train)}")
print(f"Image Shape {images_train[0].shape}")
print(f"Test images count: {len(images_test)}")

In [None]:
cnn_model.fit(
    images_train,
    labels_train_cnn,
    epochs=10,
    validation_data=(images_test, labels_test_cnn)
)

In [None]:
#Testing model
cnn_loss, cnn_accuracy = cnn_model.evaluate(images_test, labels_test_cnn)
print(f"CNN Test Accuracy: {cnn_accuracy * 100:.2f}%")

In [None]:
predictions = cnn_model.predict(images_test)
predicted_classes = np.argmax(predictions, axis=1)
true_classes = np.argmax(labels_test_cnn, axis=1)

report = classification_report(true_classes, predicted_classes, target_names=['Fit for Duty', 'Alcoholic'])
print(report)

accuracy = accuracy_score(true_classes, predicted_classes)
print(f'Overall Accuracy: {accuracy * 100:.2f}%')

Demonstration

In [None]:
totalTest = 0
correctTest = 0

In [None]:
# testImagePath = 'images/test2.png'
# testImg = cv2.imread(testImagePath, cv2.IMREAD_GRAYSCALE)

r_in = random.randint(0, len(images) - 1)

testImg = images[r_in]
testLabel = labels[r_in]
# testImg = cv2.resize(testImg, image_size)

plt.imshow(testImg, cmap='gray')

testImg = np.expand_dims(testImg, axis=0)

print(testImg.shape)

prediction = cnn_model.predict(testImg)
prediction = np.argmax(prediction[0])

totalTest +=1
correctTest += prediction==testLabel

prediction = 'Alcoholic' if prediction else 'Fit for duty'
testLabel = 'Alcoholic' if testLabel else 'Fit for duty'
plt.title(f'Name: {names[r_in]}\n\nPrediction: {prediction}\nActual: {testLabel}\n\nAccuracy: {correctTest/totalTest}')

In [None]:
cnn_model.save("./Models/ResNet50Classification/NonAugmented.keras")