In [2]:
# Probe for the Keras API that ships with TensorFlow and report the outcome.
try:
    from tensorflow import keras
except ImportError:
    # TensorFlow (or its bundled Keras) could not be imported in this kernel.
    print("tf.keras is not available.")
else:
    print("tf.keras is available.")


tf.keras is available.


In [3]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.python import keras 
import tensorflow as tf
from keras.utils import to_categorical
from keras import layers
from keras.layers import Layer
from keras.models import Model


In [10]:
# Root folder of the dataset; load_and_preprocess_data joins one sub-folder name per class onto it.
# NOTE(review): absolute, machine-specific path — adjust before running on another machine.
image_dir = '/home/sujan/Conv-Caps/Conv-Capsnet/Dataset/'
# Target size handed to cv2.resize for every image.
image_size = (150, 150)
# Number of target classes; matches the three class folders listed in load_and_preprocess_data.
num_classes = 3

# Function to load and preprocess images
def load_and_preprocess_data(image_dir, image_size, classes=('covid', 'pneumonia', 'normal')):
    """Load every image under ``image_dir/<class_name>/`` as a normalized grayscale array.

    Args:
        image_dir: Directory containing one sub-directory per class.
        image_size: Size tuple passed to ``cv2.resize`` (OpenCV interprets it as
            ``(width, height)``).
        classes: Ordered class folder names; the position of a name is its
            integer label. Defaults to the three folders used by this notebook.

    Returns:
        ``(data, labels)`` where ``data`` stacks the float32 images (values in
        [0, 1], trailing channel axis of size 1) and ``labels`` is the one-hot
        encoding of the class indices produced by ``to_categorical``.

    Raises:
        OSError: If a class directory is missing or unreadable (from ``os.listdir``).
    """
    data = []
    labels = []

    for class_label, class_name in enumerate(classes):
        class_dir = os.path.join(image_dir, class_name)
        # os.listdir returns entries in arbitrary order; sorting keeps the sample
        # order, and therefore any seeded split done afterwards, reproducible.
        for img_filename in sorted(os.listdir(class_dir)):
            img_path = os.path.join(class_dir, img_filename)
            try:
                img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)  # Read image as grayscale

                # cv2.imread signals an unreadable file by returning None, not by raising
                if img is None:
                    print(f"Error loading image: {img_path}")
                    continue

                img = cv2.resize(img, image_size)  # Resize image to the requested size
                img = img.astype('float32') / 255.0  # Normalize pixel values to [0, 1]

                # Expand dimensions for grayscale images (add channel dimension)
                img_expanded = np.expand_dims(img, axis=-1)

                data.append(img_expanded)
                labels.append(class_label)
            except Exception as e:
                # Best effort: report the failing file and keep loading the rest
                print(f"Error processing image: {img_path} - {str(e)}")

    # Convert lists to numpy arrays
    data = np.array(data)
    labels = np.array(labels)

    # Convert labels to one-hot encoded vectors
    labels = to_categorical(labels, num_classes=len(classes))

    return data, labels

    
# Build the full image / label arrays from the dataset folder
data, labels = load_and_preprocess_data(image_dir, image_size)

# Hold out 25% of the samples for testing; the fixed seed makes the split repeatable
X_train, X_test, y_train, y_test = train_test_split(
    data,
    labels,
    test_size=0.25,
    random_state=42,
)

# Report the shape of every split
for _caption, _split in (
    ("X_train shape:", X_train),
    ("y_train shape:", y_train),
    ("X_test shape:", X_test),
    ("y_test shape:", y_test),
):
    print(_caption, _split.shape)

X_train shape: (11364, 150, 150, 1)
y_train shape: (11364, 3)
X_test shape: (3789, 150, 150, 1)
y_test shape: (3789, 3)


In [4]:
def safe_norm(s, ax=-1, epsilon=1e-7, keepdims_v=False, name=None):
    """Return the Euclidean norm of ``s`` along ``ax``, stabilised with ``epsilon``.

    ``epsilon`` is added under the square root so that the result (and its
    gradient) stays finite when the vector is all zeros.

    Args:
        s: Input tensor.
        ax: Axis (or axes) to reduce over.
        epsilon: Small constant added to the squared norm before the square root.
        keepdims_v: Whether the reduced axis is kept with size 1.
        name: Optional name-scope for the ops; defaults to ``"safe_norm"``.

    Returns:
        Tensor holding ``sqrt(sum(s ** 2, axis=ax) + epsilon)``.
    """
    # TF2's tf.name_scope only accepts strings, so the None default must be
    # replaced before entering the scope.
    with tf.name_scope(name if name is not None else "safe_norm"):
        squared_norm = tf.reduce_sum(tf.square(s), axis=ax, keepdims=keepdims_v)
        return tf.sqrt(squared_norm + epsilon)

In [12]:
def squash(s, axis=-1, epsilon=1e-7, name=None):
    """Apply the capsule "squash" non-linearity to ``s`` along ``axis``.

    Each vector keeps its direction while its length is mapped to
    ``||s||^2 / (1 + ||s||^2)``, i.e. into the range [0, 1).

    This is a module-level function, so it takes no ``self`` argument; callers
    in this notebook invoke it as ``squash(tensor, axis=..., name=...)``.

    Args:
        s: Input tensor of capsule vectors.
        axis: Axis that holds the vector components.
        epsilon: Small constant that keeps the norm away from zero.
        name: Optional name-scope for the ops; defaults to ``"squash"``.

    Returns:
        Tensor with the same shape as ``s``.
    """
    # TF2's tf.name_scope only accepts strings, so replace the None default.
    with tf.name_scope(name if name is not None else "squash"):
        squared_norm = tf.reduce_sum(tf.square(s), axis=axis, keepdims=True)
        # epsilon avoids a division by zero for all-zero vectors
        norm = tf.sqrt(squared_norm + epsilon)
        squash_factor = squared_norm / (1. + squared_norm)
        unit_vector = s / norm
        return squash_factor * unit_vector
        
class PrimaryCapsuleLayer(layers.Layer):
    """Convolutional feature extractor whose output is regrouped into primary capsules.

    Four ReLU convolutions are applied, then every sample's feature map is
    split into vectors of length ``capsule_dimension`` and squashed.

    The number of capsules is derived from the actual conv output instead of
    being fixed: for a 150x150x1 input the stride-2 convolution yields a
    75x75x128 map, i.e. 90000 capsules of dimension 8. A fixed capsule count
    would only be valid for one specific input size.

    Args:
        capsule_dimension: Length of each primary capsule vector (default 8).
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, capsule_dimension=8, **kwargs):
        super(PrimaryCapsuleLayer, self).__init__(**kwargs)
        self.capsule_dimension = capsule_dimension
        self.conv1 = layers.Conv2D(16, kernel_size=(5, 5), strides=(1, 1), padding='same', activation=tf.nn.relu)
        self.conv2 = layers.Conv2D(32, kernel_size=(5, 5), strides=(2, 2), padding='same', activation=tf.nn.relu)
        self.conv3 = layers.Conv2D(64, kernel_size=(5, 5), strides=(1, 1), padding='same', activation=tf.nn.relu)
        self.conv4 = layers.Conv2D(128, kernel_size=(9, 9), strides=(1, 1), padding='same', activation=tf.nn.relu)

    def squash(self, s, axis=-1, epsilon=1e-7, name=None):
        """Scale each vector along ``axis`` to length ``||s||^2 / (1 + ||s||^2)``."""
        # tf.name_scope requires a string in TF2, hence the fallback name
        with tf.name_scope(name if name is not None else "primary_capsule_squash"):
            squared_norm = tf.reduce_sum(tf.square(s), axis=axis, keepdims=True)
            norm = tf.sqrt(squared_norm + epsilon)  # epsilon guards against division by zero
            return (squared_norm / (1. + squared_norm)) * (s / norm)

    def call(self, inputs):
        """Return squashed primary capsules of shape ``(batch, num_capsules, capsule_dimension)``.

        Raises:
            ValueError: If the conv output has unknown spatial dimensions or its
                per-sample size is not a multiple of ``capsule_dimension``.
        """
        conv1_output = self.conv1(inputs)
        conv2_output = self.conv2(conv1_output)
        conv3_output = self.conv3(conv2_output)
        conv4_output = self.conv4(conv3_output)

        height = conv4_output.shape[1]
        width = conv4_output.shape[2]
        channels = conv4_output.shape[3]
        if height is None or width is None or channels is None:
            raise ValueError(
                "PrimaryCapsuleLayer needs inputs with a fully defined height, width and channel count."
            )

        features_per_sample = int(height) * int(width) * int(channels)
        if features_per_sample % self.capsule_dimension != 0:
            raise ValueError(
                f"Cannot split {features_per_sample} features per sample into "
                f"capsules of dimension {self.capsule_dimension}."
            )
        num_capsules = features_per_sample // self.capsule_dimension

        # Only the batch axis is left to -1, so features of different samples
        # can never be mixed into the same row.
        primary_capsule_input = tf.reshape(
            conv4_output, [-1, num_capsules, self.capsule_dimension]
        )
        primary_capsule_output = self.squash(primary_capsule_input)
        return primary_capsule_output

In [16]:
class SecondaryCapsuleLayer(layers.Layer):
    """Output capsule layer using routing-by-agreement.

    Input:  ``(batch, num_input_capsules, input_dimension)``.
    Output: ``(batch, 1, num_capsules, capsule_dimension, 1)``.

    All sizes are taken from the layer arguments and the input shape; the
    batch size is read from the incoming tensor rather than from a global
    dataset array, so the layer works for any batch.

    Args:
        num_capsules: Number of output capsules (one per class).
        capsule_dimension: Length of each output capsule vector.
        num_routing: Number of routing iterations (at least 1).
        **kwargs: Forwarded to ``keras.layers.Layer``.

    Raises:
        ValueError: If ``num_routing`` is smaller than 1.
    """

    def __init__(self, num_capsules, capsule_dimension, num_routing=3, **kwargs):
        super(SecondaryCapsuleLayer, self).__init__(**kwargs)
        if num_routing < 1:
            raise ValueError("num_routing must be at least 1.")
        self.num_capsules = num_capsules
        self.capsule_dimension = capsule_dimension
        self.num_routing = num_routing

    def build(self, input_shape):
        """Create the transformation matrices, one per (input capsule, output capsule) pair."""
        num_input_capsules = input_shape[1]
        input_dimension = input_shape[2]
        if num_input_capsules is None or input_dimension is None:
            raise ValueError(
                "SecondaryCapsuleLayer needs a fully defined input shape "
                "(batch, num_input_capsules, input_dimension)."
            )
        self.num_input_capsules = int(num_input_capsules)
        self.input_dimension = int(input_dimension)
        self.W = self.add_weight(
            shape=[1, self.num_input_capsules, self.num_capsules,
                   self.capsule_dimension, self.input_dimension],
            initializer='random_normal',
            trainable=True,
        )

    @staticmethod
    def _squash(s, axis=-1, epsilon=1e-7):
        """Scale each vector along ``axis`` to length ``||s||^2 / (1 + ||s||^2)``."""
        squared_norm = tf.reduce_sum(tf.square(s), axis=axis, keepdims=True)
        norm = tf.sqrt(squared_norm + epsilon)  # epsilon guards against division by zero
        return (squared_norm / (1. + squared_norm)) * (s / norm)

    def call(self, inputs):
        """Run ``num_routing`` rounds of dynamic routing and return the output capsules."""
        # (batch, n_in, d_in) -> (batch, n_in, 1, d_in, 1): a column vector per input capsule
        primary_capsule_column = tf.expand_dims(tf.expand_dims(inputs, -1), 2)

        # Prediction vectors u_hat(j|i) = W_ij . u_i. tf.matmul broadcasts the
        # leading (batch) dimensions, so W (batch size 1) and the inputs
        # (output-capsule size 1) do not need to be tiled explicitly.
        # Result: (batch, n_in, num_capsules, capsule_dimension, 1)
        secondary_capsule_predicted = tf.matmul(self.W, primary_capsule_column)

        # Routing logits b_ij start at zero, sized from the current batch
        raw_weights = tf.zeros(
            [tf.shape(inputs)[0], self.num_input_capsules, self.num_capsules, 1, 1],
            dtype=secondary_capsule_predicted.dtype,
        )

        secondary_capsule_output = None
        for routing_round in range(self.num_routing):
            # Coupling coefficients c_ij = softmax(b_ij) over the output capsules
            routing_weights = tf.nn.softmax(raw_weights, axis=2)

            # Weighted sum s_j = sum_i c_ij * u_hat(j|i)
            weighted_predictions = tf.multiply(routing_weights, secondary_capsule_predicted)
            weighted_sum = tf.reduce_sum(weighted_predictions, axis=1, keepdims=True)

            # v_j = squash(s_j), squashing along the capsule_dimension axis
            secondary_capsule_output = self._squash(weighted_sum, axis=-2)

            # The logit update after the final round would never be read, so skip it
            if routing_round < self.num_routing - 1:
                # Agreement u_hat(j|i) . v_j -> (batch, n_in, num_capsules, 1, 1)
                agreement = tf.matmul(
                    secondary_capsule_predicted, secondary_capsule_output, transpose_a=True
                )
                # b_ij <- b_ij + agreement
                raw_weights = tf.add(raw_weights, agreement)

        return secondary_capsule_output

In [17]:
class MarginLossLayer(Layer):
    """Capsule-network margin loss.

    For every class k:
        L_k = T_k * max(0, m_plus - ||v_k||)^2
              + lambda_val * (1 - T_k) * max(0, ||v_k|| - m_minus)^2

    Args:
        num_classes: Number of output capsules / classes.
        m_plus: Target lower bound on the capsule length of the present class.
        m_minus: Target upper bound on the capsule length of absent classes.
        lambda_val: Down-weighting factor for the absent-class term.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, num_classes, m_plus=0.9, m_minus=0.1, lambda_val=0.5, **kwargs):
        super(MarginLossLayer, self).__init__(**kwargs)
        self.num_classes = num_classes
        self.m_plus = m_plus
        self.m_minus = m_minus
        self.lambda_val = lambda_val

    def call(self, inputs):
        """Compute the mean margin loss over the batch.

        Args:
            inputs: Pair ``(T, caps2_output)`` where ``T`` holds one-hot labels of
                shape ``(batch, num_classes)`` and ``caps2_output`` holds the output
                capsules with the capsule components on axis -2, e.g.
                ``(batch, 1, num_classes, capsule_dimension, 1)``.

        Returns:
            Scalar tensor: per-sample loss summed over classes, averaged over the batch.
        """
        # Everything the loss needs comes in through `inputs`; nothing is read
        # from notebook globals.
        T, caps2_output = inputs

        caps2_output_norm = safe_norm(caps2_output, ax=-2, keepdims_v=True,
                                      name="caps2_output_norm")
        T = tf.cast(T, caps2_output_norm.dtype)

        present_error_raw = tf.square(tf.maximum(0., self.m_plus - caps2_output_norm),
                                      name="present_error_raw")
        present_error = tf.reshape(present_error_raw, shape=(-1, self.num_classes),
                                   name="present_error")

        absent_error_raw = tf.square(tf.maximum(0., caps2_output_norm - self.m_minus),
                                     name="absent_error_raw")
        absent_error = tf.reshape(absent_error_raw, shape=(-1, self.num_classes),
                                  name="absent_error")

        L = tf.add(T * present_error, self.lambda_val * (1.0 - T) * absent_error,
                   name="L")
        margin_loss = tf.reduce_mean(tf.reduce_sum(L, axis=1), name="margin_loss")

        return margin_loss


In [18]:
class CapsuleNetwork(Model):
    """Capsule network classifier: primary capsules -> class capsules -> capsule lengths.

    ``call`` returns the length of every class capsule, shape
    ``(batch, num_classes)``. These scores are differentiable and line up with
    one-hot targets, which is what ``fit`` needs; an argmax output could be
    neither trained nor compared against one-hot labels. Use
    ``predict_labels`` to obtain class indices.

    Args:
        num_classes: Number of target classes (and of secondary capsules).
        **kwargs: Forwarded to ``keras.Model``.
    """

    def __init__(self, num_classes, **kwargs):
        super(CapsuleNetwork, self).__init__(**kwargs)
        # Primary Capsule Layer
        self.primary_capsule_layer = PrimaryCapsuleLayer(input_shape=(150, 150, 1))
        # Secondary Capsule Layer: one output capsule per class
        self.secondary_capsule_layer = SecondaryCapsuleLayer(num_capsules=num_classes, capsule_dimension=16)
        # Margin Loss Layer (holds the margin-loss hyperparameters)
        self.margin_loss_layer = MarginLossLayer(num_classes=num_classes)
        self.num_classes = num_classes

    def call(self, inputs):
        """Return per-class capsule lengths of shape ``(batch, num_classes)``."""
        # Primary Capsule Layer forward pass
        primary_capsule_output = self.primary_capsule_layer(inputs)
        # Secondary Capsule Layer forward pass: (batch, 1, num_classes, capsule_dim, 1)
        secondary_capsule_output = self.secondary_capsule_layer(primary_capsule_output)
        # Capsule length acts as the class score: (batch, 1, num_classes, 1)
        y_prob = safe_norm(secondary_capsule_output, ax=-2, name="y_prob")
        # Drop the two singleton axes -> (batch, num_classes)
        return tf.squeeze(y_prob, axis=[1, 3], name="y_prob_per_class")

    def predict_labels(self, inputs):
        """Return the index of the longest class capsule for every sample, shape ``(batch,)``."""
        return tf.argmax(self(inputs), axis=-1, name="y_predicted")

    def compute_loss(self, x=None, y=None, y_pred=None, sample_weight=None, training=True):
        """Margin loss, exposed through the signature Keras uses for its training hook.

        Keras' ``train_step`` / ``test_step`` call ``compute_loss(x, y, y_pred,
        sample_weight)``, so the override must accept exactly these arguments.
        Because this method is overridden, the margin loss below is what drives
        training, regardless of the ``loss`` passed to ``compile``.

        Args:
            x: Input batch; only used to compute ``y_pred`` when it is not given.
            y: Targets, one-hot ``(batch, num_classes)`` or integer labels ``(batch,)``.
            y_pred: Output of ``call`` (capsule lengths), ``(batch, num_classes)``.
            sample_weight: Optional per-sample weights of shape ``(batch,)``.
            training: Accepted for compatibility with newer Keras versions; unused.

        Returns:
            Scalar tensor with the (optionally weighted) mean margin loss.

        Raises:
            ValueError: If ``y`` is None.
        """
        if y is None:
            raise ValueError("compute_loss requires target labels `y`.")
        if y_pred is None:
            y_pred = self(x)

        y_true = tf.convert_to_tensor(y)
        if y_true.shape.rank == 1:
            # Integer class labels: expand to one-hot so both label formats work
            y_true = tf.one_hot(tf.cast(y_true, tf.int32), depth=self.num_classes)
        y_true = tf.cast(y_true, y_pred.dtype)

        # Hyperparameters are the ones stored on the margin-loss layer
        m_plus = self.margin_loss_layer.m_plus
        m_minus = self.margin_loss_layer.m_minus
        lambda_val = self.margin_loss_layer.lambda_val

        present_error = tf.square(tf.maximum(0., m_plus - y_pred))
        absent_error = tf.square(tf.maximum(0., y_pred - m_minus))
        per_sample_loss = tf.reduce_sum(
            y_true * present_error + lambda_val * (1.0 - y_true) * absent_error,
            axis=1,
        )
        if sample_weight is not None:
            per_sample_loss = per_sample_loss * tf.cast(sample_weight, per_sample_loss.dtype)

        return tf.reduce_mean(per_sample_loss, name="margin_loss")

In [21]:
def main(epochs=10, batch_size=32):
    """Train a CapsuleNetwork on the notebook's train split and report test accuracy.

    Reads ``X_train``, ``y_train``, ``X_test`` and ``y_test`` from the notebook
    namespace; the labels are expected to be one-hot encoded.

    Args:
        epochs: Number of training epochs.
        batch_size: Mini-batch size. Using the whole training set as a single
            batch would mean one gradient step per epoch and would have to hold
            every image's activations in memory at once.
    """
    # Define the model and training parameters
    num_classes = 3

    # Create an instance of the CapsuleNetwork model
    capsule_net = CapsuleNetwork(num_classes=num_classes)

    # Compile the model
    # NOTE(review): CapsuleNetwork also defines compute_loss — confirm which
    # loss is meant to drive training.
    capsule_net.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    # Train the model on one-hot labels (don't use np.argmax(y_train, axis=1))
    capsule_net.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_split=0.1)

    # Evaluate with the same one-hot label format used for training, so the loss
    # and the accuracy metric see consistent targets. return_dict=True lets the
    # accuracy be looked up by name instead of by position in the result list.
    results = capsule_net.evaluate(X_test, y_test, batch_size=batch_size, verbose=1, return_dict=True)
    test_accuracy = results['accuracy']
    print(f'Test Accuracy: {test_accuracy * 100:.2f}%')