In [1]:
import tensorflow as tf
tf. __version__

'2.3.4'

In [2]:
from tensorflow.keras import backend as K
import tensorflow as tf
from tensorflow.keras import initializers, layers

In [3]:
class Length(layers.Layer):
    """
    Compute the length of vectors. This is used to compute a Tensor that has the same shape with y_true in margin_loss.
    Using this layer as model's output can directly predict labels by using `y_pred = np.argmax(model.predict(x), 1)`
    inputs: shape=[None, num_vectors, dim_vector]
    output: shape=[None, num_vectors]
    """
    def call(self, inputs, **kwargs):
        return K.sqrt(K.sum(K.square(inputs), -1) + K.epsilon())

    def compute_output_shape(self, input_shape):
        return input_shape[:-1]

    def get_config(self):
        config = super(Length, self).get_config()
        return config


class Mask(layers.Layer):
    """
    Mask a Tensor with shape=[None, num_capsule, dim_vector] either by the capsule with max length or by an additional 
    input mask. Except the max-length capsule (or specified capsule), all vectors are masked to zeros. Then flatten the
    masked Tensor.
    For example:
        ```
        x = keras.layers.Input(shape=[8, 3, 2])  # batch_size=8, each sample contains 3 capsules with dim_vector=2
        y = keras.layers.Input(shape=[8, 3])  # True labels. 8 samples, 3 classes, one-hot coding.
        out = Mask()(x)  # out.shape=[8, 6]
        # or
        out2 = Mask()([x, y])  # out2.shape=[8,6]. Masked with true labels y. Of course y can also be manipulated.
        ```
    """
    def call(self, inputs, **kwargs):
        if type(inputs) is list:  # true label is provided with shape = [None, n_classes], i.e. one-hot code.
            assert len(inputs) == 2
            inputs, mask = inputs
        else:  # if no true label, mask by the max length of capsules. Mainly used for prediction
            # compute lengths of capsules
            x = K.sqrt(K.sum(K.square(inputs), -1))
            # generate the mask which is a one-hot code.
            # mask.shape=[None, n_classes]=[None, num_capsule]
            mask = K.one_hot(indices=K.argmax(x, 1), num_classes=x.get_shape().as_list()[1])

        # inputs.shape=[None, num_capsule, dim_capsule]
        # mask.shape=[None, num_capsule]
        # masked.shape=[None, num_capsule * dim_capsule]
        masked = K.batch_flatten(inputs * K.expand_dims(mask, -1))
        return masked

    def compute_output_shape(self, input_shape):
        if type(input_shape[0]) is tuple:  # true label provided
            return tuple([None, input_shape[0][1] * input_shape[0][2]])
        else:  # no true label provided
            return tuple([None, input_shape[1] * input_shape[2]])

    def get_config(self):
        config = super(Mask, self).get_config()
        return config


def squash(vectors, axis=-1):
    """
    The non-linear activation used in Capsule. It drives the length of a large vector to near 1 and small vector to 0
    :param vectors: some vectors to be squashed, N-dim tensor
    :param axis: the axis to squash
    :return: a Tensor with same shape as input vectors
    """
    s_squared_norm = K.sum(K.square(vectors), axis, keepdims=True)
    scale = s_squared_norm / (1 + s_squared_norm) / K.sqrt(s_squared_norm + K.epsilon())
    return scale * vectors


class CapsuleLayer(layers.Layer):
    """
    The capsule layer. It is similar to Dense layer. Dense layer has `in_num` inputs, each is a scalar, the output of the 
    neuron from the former layer, and it has `out_num` output neurons. CapsuleLayer just expand the output of the neuron
    from scalar to vector. So its input shape = [None, input_num_capsule, input_dim_capsule] and output shape = \
    [None, num_capsule, dim_capsule]. For Dense Layer, input_dim_capsule = dim_capsule = 1.
    
    :param num_capsule: number of capsules in this layer
    :param dim_capsule: dimension of the output vectors of the capsules in this layer
    :param routings: number of iterations for the routing algorithm
    """
    def __init__(self, num_capsule, dim_capsule, routings=3,
                 kernel_initializer='glorot_uniform',
                 **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.kernel_initializer = initializers.get(kernel_initializer)

    def build(self, input_shape):
        assert len(input_shape) >= 3, "The input Tensor should have shape=[None, input_num_capsule, input_dim_capsule]"
        self.input_num_capsule = input_shape[1]
        self.input_dim_capsule = input_shape[2]

        # Transform matrix
#         self.W = self.add_weight(shape=[self.num_capsule, self.input_num_capsule,
#                                         self.dim_capsule, self.input_dim_capsule],
#                                  initializer=self.kernel_initializer,
#                                  name='W')
        self.W = self.add_weight(
            shape=[1, self.input_num_capsule, self.num_capsule, 
                   self.dim_capsule, self.input_dim_capsule],
            initializer=self.kernel_initializer,
            name='W')
        self.built = True

    def call(self, inputs, training=None):
        # inputs.shape=[None, input_num_capsule, input_dim_capsule]
        # inputs_expand.shape=[None, 1, input_num_capsule, input_dim_capsule]
        
        #inputs_expand = K.expand_dims(inputs, 1)
        #print("inputs_expand.shape:", inputs_expand.shape)
        
        inputs_expand = K.expand_dims(K.expand_dims(inputs, 2), 2)
        W_tiled = K.tile(self.W, [K.shape(inputs)[0], 1, 1, 1, 1])
        # Replicate num_capsule dimension to prepare being multiplied by W
        #inputs_tiled.shape=[None, num_capsule, input_num_capsule, input_dim_capsule]
        
        #inputs_tiled = K.tile(inputs_expand, [1, self.num_capsule, 1, 1])
        #print("inputs_tiled.shape:", inputs_tiled.shape)

        # Compute `inputs * W` by scanning inputs_tiled on dimension 0.
        # x.shape=[num_capsule, input_num_capsule, input_dim_capsule]
        # W.shape=[num_capsule, input_num_capsule, dim_capsule, input_dim_capsule]
        # Regard the first two dimensions as `batch` dimension,
        # then matmul: [input_dim_capsule] x [dim_capsule, input_dim_capsule]^T -> [dim_capsule].
        # inputs_hat.shape = [None, num_capsule, input_num_capsule, dim_capsule]
        #inputs_hat = K.map_fn(lambda x: K.batch_dot(x, self.W, [2, 3]), elems=inputs_tiled)
        inputs_hat = tf.squeeze(
            tf.matmul(W_tiled, inputs_expand, transpose_b=True), axis=-1)
        
        print("Self.w:",self.W)
        print("inputs_hat.shape:", inputs_hat.shape)
        
        # Begin: Routing algorithm ---------------------------------------------------------------------#
        # The prior for coupling coefficient, initialized as zeros.
        # b.shape = [None, self.num_capsule, self.input_num_capsule].
        #b = tf.zeros(shape=[K.shape(inputs_hat)[0], self.num_capsule, self.input_num_capsule])
        b = tf.zeros(shape=[K.shape(inputs)[0], self.input_num_capsule, self.num_capsule])
        print("b.shape:", b.shape)
        
        assert self.routings > 0, 'The routings should be > 0.'
        for i in range(self.routings):
            # c.shape=[batch_size, num_capsule, input_num_capsule]
            c = tf.nn.softmax(b, axis=2)
            print(f"Routing {i}, c.shape:", c.shape)
            c_expand = K.expand_dims(c, -1)
            
            # c.shape =  [batch_size, num_capsule, input_num_capsule]
            # inputs_hat.shape=[None, num_capsule, input_num_capsule, dim_capsule]
            # The first two dimensions as `batch` dimension,
            # then matmal: [input_num_capsule] x [input_num_capsule, dim_capsule] -> [dim_capsule].
            # outputs.shape=[None, num_capsule, dim_capsule]
            #outputs = squash(K.batch_dot(c, inputs_hat, [2, 2]))  # [None, 10, 16]
            outputs = squash(tf.reduce_sum(inputs_hat * c_expand, axis=1))
            print(f"Routing {i}, outputs.shape:", outputs.shape)
            
            if i < self.routings - 1:
                # outputs.shape =  [None, num_capsule, dim_capsule]
                # inputs_hat.shape=[None, num_capsule, input_num_capsule, dim_capsule]
                # The first two dimensions as `batch` dimension,
                # then matmal: [dim_capsule] x [input_num_capsule, dim_capsule]^T -> [input_num_capsule].
                # b.shape=[batch_size, num_capsule, input_num_capsule]
                #b += K.batch_dot(outputs, inputs_hat, [2, 3])
                outputs_expand = K.expand_dims(outputs, 1)
                b += tf.reduce_sum(inputs_hat * c_expand, axis=-1)
        # End: Routing algorithm -----------------------------------------------------------------------#

        return outputs

    def compute_output_shape(self, input_shape):
        return tuple([None, self.num_capsule, self.dim_capsule])

    def get_config(self):
        config = {
            'num_capsule': self.num_capsule,
            'dim_capsule': self.dim_capsule,
            'routings': self.routings
        }
        base_config = super(CapsuleLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


def PrimaryCap(inputs, dim_capsule, n_channels, kernel_size, strides, padding):
    """
    Apply Conv2D `n_channels` times and concatenate all capsules
    :param inputs: 4D tensor, shape=[None, width, height, channels]
    :param dim_capsule: the dim of the output vector of capsule
    :param n_channels: the number of types of capsules
    :return: output tensor, shape=[None, num_capsule, dim_capsule]
    """
    output = layers.Conv2D(filters=dim_capsule*n_channels, kernel_size=kernel_size, strides=strides, padding=padding,
                           name='primarycap_conv2d')(inputs)
    outputs = layers.Reshape(target_shape=[-1, dim_capsule], name='primarycap_reshape')(output)
    return layers.Lambda(squash, name='primarycap_squash')(outputs)

In [4]:
from tensorflow.keras.applications import MobileNet
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Reshape, Dense, Flatten, BatchNormalization


def MobileNet_CapsuleNet(input_shape, n_classes, routings):
    """
    Combines MobileNet for lightweight feature extraction with CapsuleNet for relational modeling.

    Parameters:
    - input_shape: Tuple, shape of the input images (width, height, channels).
    - n_classes: Integer, number of output classes.
    - routings: Integer, number of routing iterations in CapsuleNet.

    Returns:
    - model: Keras Model, combined MobileNet + CapsuleNet.
    """
    # Define input layer
    input_layer = Input(shape=input_shape)

    # Load pre-trained MobileNet model without the top classification layer
    mobilenet_base = MobileNet(weights='imagenet', include_top=False, input_tensor=input_layer, input_shape=input_shape)

    # Extract feature maps from MobileNet
    mobilenet_output = mobilenet_base.output  # Feature maps

    # Add a Conv2D layer to adapt MobileNet output for CapsuleNet
    conv_caps = Conv2D(filters=128, kernel_size=3, strides=1, padding='same', activation='relu')(mobilenet_output)

    # Verify the output shape of conv_caps
    print(f"Shape after conv_caps: {conv_caps.shape}")

    # Adjust kernel size and padding for the Primary Capsule Layer
    primary_caps = PrimaryCap(conv_caps, dim_capsule=8, n_channels=32, kernel_size=3, strides=2, padding='valid')
    print(primary_caps.shape)

    # Capsule Layer: DigitCaps for classification
    digit_caps = CapsuleLayer(num_capsule=n_classes, dim_capsule=16, routings=routings)(primary_caps)

    # Length layer: Outputs the length of the capsule vectors for classification
    output_caps = Length(name='capsnet')(digit_caps)

    # Build the model
    model = Model(inputs=input_layer, outputs=output_caps)

    return model


In [5]:
# Example usage
if __name__ == "__main__":
    input_shape = (128, 128, 3)  # Example input shape
    n_classes = 10  # Example number of classes
    routings = 3  # Number of routing iterations

    model = MobileNet_CapsuleNet(input_shape=input_shape, n_classes=n_classes, routings=routings)
    model.summary()

Shape after conv_caps: (None, 4, 4, 128)
(None, 32, 8)
Self.w: <tf.Variable 'capsule_layer/W:0' shape=(1, 32, 10, 16, 8) dtype=float32>
inputs_hat.shape: (None, 32, 10, 16)
b.shape: (None, 32, 10)
Routing 0, c.shape: (None, 32, 10)
Routing 0, outputs.shape: (None, 10, 16)
Routing 1, c.shape: (None, 32, 10)
Routing 1, outputs.shape: (None, 10, 16)
Routing 2, c.shape: (None, 32, 10)
Routing 2, outputs.shape: (None, 10, 16)
Model: "functional_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 128, 128, 3)]     0         
_________________________________________________________________
conv1_pad (ZeroPadding2D)    (None, 129, 129, 3)       0         
_________________________________________________________________
conv1 (Conv2D)               (None, 64, 64, 32)        864       
_________________________________________________________________
conv1_bn (BatchNormalizat

In [6]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, LearningRateScheduler
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.applications import MobileNet
import tensorflow as tf
from tensorflow.keras.regularizers import l2
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix


In [7]:
'''we Try to split the Entire dataset into Trian and Test so after we split them there wont be any data present in the 
initial dataset directory ,so when we run for the second time it show error!! '''

'we Try to split the Entire dataset into Trian and Test so after we split them there wont be any data present in the \ninitial dataset directory ,so when we run for the second time it show error!! '

In [8]:
import os
import shutil
from sklearn.model_selection import train_test_split

initial_dataset_dir = 'D:/Major Project/Data'
train_dir = 'D:/Major Project/Train'
test_dir = 'D:/Major Project/Test'

# # Create the new directories for train and test sets
# if not os.path.exists(train_dir):
#     os.makedirs(train_dir)

# if not os.path.exists(test_dir):
#     os.makedirs(test_dir)

# # Iterate through each class directory in the initial dataset
# for class_name in os.listdir(initial_dataset_dir):
#     class_dir = os.path.join(initial_dataset_dir, class_name)
#     if os.path.isdir(class_dir):
#         # Create the directories for train and test data for each class
#         train_class_dir = os.path.join(train_dir, class_name)
#         test_class_dir = os.path.join(test_dir, class_name)

#         # Create class directories if they do not exist
#         os.makedirs(train_class_dir, exist_ok=True)
#         os.makedirs(test_class_dir, exist_ok=True)

#         # Get all the images for the current class
#         image_files = [f for f in os.listdir(class_dir) if os.path.isfile(os.path.join(class_dir, f))]
        
#         # Split the images into train and test sets (80% train, 20% test)
#         train_files, test_files = train_test_split(image_files, test_size=0.2, random_state=42)
        
#         # Move the images into the corresponding directories
#         for train_file in train_files:
#             shutil.move(os.path.join(class_dir, train_file), os.path.join(train_class_dir, train_file))

#         for test_file in test_files:
#             shutil.move(os.path.join(class_dir, test_file), os.path.join(test_class_dir, test_file))

# print("Dataset split into train and test directories successfully!")


In [9]:
# Data Augmentation for training and validation
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=30,  
    width_shift_range=0.2,  
    height_shift_range=0.2,
    shear_range=0.2,  
    zoom_range=0.2,
    horizontal_flip=True,
    brightness_range=[0.8, 1.2],  
    channel_shift_range=60.0,
    validation_split=0.3
)

valid_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.3)

train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=(128, 128),
    batch_size=16,  
    class_mode='categorical',
    subset='training'
)

valid_generator = valid_datagen.flow_from_directory(
    train_dir,
    target_size=(128, 128),
    batch_size=16,  
    class_mode='categorical',
    subset='validation'
)

test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(
    test_dir,
    target_size=(128, 128),
    batch_size=16,
    class_mode='categorical',
    shuffle=False
)

input_shape = (128, 128, 3)
num_classes = train_generator.num_classes
routings = 3  # Number of routing iterations in CapsuleNet

# Build the MobileNet-CapsuleNet model
model = MobileNet_CapsuleNet(input_shape, num_classes, routings)

# Learning Rate Warm-Up with extended warm-up period and smoother decay
def lr_warmup(epoch):
    if epoch < 10:  
        return 1e-4
    elif epoch < 20:
        return 5e-5  
    else:
        return 1e-5

lr_scheduler = LearningRateScheduler(lr_warmup)

# Compile the model with Adam optimizer and categorical crossentropy loss
optimizer = Adam()
model.compile(optimizer=optimizer,
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Callbacks
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-7)
checkpoint = ModelCheckpoint('best_model.keras', monitor='val_loss', save_best_only=True, mode='min')

epochs = 40

# Training the model for 40 epochs
history = model.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    validation_data=valid_generator,
    validation_steps=valid_generator.samples // valid_generator.batch_size,
    epochs=epochs,
    callbacks=[reduce_lr, lr_scheduler, checkpoint]
)

# Save the final model
model.save('mobilenet_capsulenet_final_model.keras')

Found 2809 images belonging to 7 classes.
Found 1199 images belonging to 7 classes.
Found 1006 images belonging to 7 classes.
Shape after conv_caps: (None, 4, 4, 128)
(None, 32, 8)
Self.w: <tf.Variable 'capsule_layer_1/W:0' shape=(1, 32, 7, 16, 8) dtype=float32>
inputs_hat.shape: (None, 32, 7, 16)
b.shape: (None, 32, 7)
Routing 0, c.shape: (None, 32, 7)
Routing 0, outputs.shape: (None, 7, 16)
Routing 1, c.shape: (None, 32, 7)
Routing 1, outputs.shape: (None, 7, 16)
Routing 2, c.shape: (None, 32, 7)
Routing 2, outputs.shape: (None, 7, 16)
Epoch 1/40
Self.w: <tf.Variable 'capsule_layer_1/W:0' shape=(1, 32, 7, 16, 8) dtype=float32>
inputs_hat.shape: (None, 32, 7, 16)
b.shape: (None, 32, 7)
Routing 0, c.shape: (None, 32, 7)
Routing 0, outputs.shape: (None, 7, 16)
Routing 1, c.shape: (None, 32, 7)
Routing 1, outputs.shape: (None, 7, 16)
Routing 2, c.shape: (None, 32, 7)
Routing 2, outputs.shape: (None, 7, 16)
Self.w: <tf.Variable 'capsule_layer_1/W:0' shape=(1, 32, 7, 16, 8) dtype=float32>


KeyboardInterrupt: 

In [None]:
test_loss, test_acc = model.evaluate(test_generator, steps=test_generator.samples // test_generator.batch_size)
print(f'Test Accuracy: {test_acc * 100:.2f}%')

# Evaluate the model on test data
test_loss, test_acc = model.evaluate(test_generator)
print(f"Test Accuracy: {test_acc:.4f}")

# Confusion Matrix and Classification Report
Y_pred = model.predict(test_generator)
y_pred = np.argmax(Y_pred, axis=1)
print('Confusion Matrix')
cm = confusion_matrix(test_generator.classes, y_pred)
print(cm)

# Classification Report
target_names = list(test_generator.class_indices.keys())
print('Classification Report')
print(classification_report(test_generator.classes, y_pred, target_names=target_names))

In [None]:
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=target_names, yticklabels=target_names)

In [None]:
import matplotlib.pyplot as plt

# Assuming 'history' is the History object returned from model.fit()
# Example: history = model.fit(...)

# Plot training & validation accuracy values
plt.figure(figsize=(8, 6))
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(loc='upper left')
plt.grid(True)
plt.show()