Download DataSet: Ensure Completion of Uploading kaggle.json in colab before execution to get dataset from Kaggle.

In [1]:
! mkdir -p ~/.kaggle
! cp kaggle.json ~/.kaggle/
! kaggle datasets download -d kmader/lungnodemalignancy

Downloading lungnodemalignancy.zip to /content
100% 94.9M/94.9M [00:06<00:00, 19.4MB/s]
100% 94.9M/94.9M [00:06<00:00, 15.6MB/s]


Unzip Dataset:

In [2]:
import zipfile
zip_ref = zipfile.ZipFile('/content/lungnodemalignancy.zip')
zip_ref.extractall('/content')
zip_ref.close()

Delete Dataset Zip and Unwanted Files: It is not mandatory, Not good in case You Not Sure that the Dataset is fully Extracted

In [3]:
!rm -r '/content/sample_data' '/content/lungnodemalignancy.zip' '/content/kaggle.json'

### **Capsule Network (CapsNet) for image classification i.e, Detection of Lung Nodule Malignancy**

1. Data Augmentation Module (data_augmentation.py):

In [4]:
from keras.preprocessing.image import ImageDataGenerator

def create_data_generator(width_shift_range=0., height_shift_range=0.):
  """
  Creates an ImageDataGenerator for data augmentation.

  Args:
    width_shift_range: Fraction of total width for random horizontal shifts.
    height_shift_range: Fraction of total height for random vertical shifts.

  Returns:
    An ImageDataGenerator object.
  """
  train_datagen = ImageDataGenerator(width_shift_range=width_shift_range,
                                    height_shift_range=height_shift_range)
  return train_datagen

def generate_data(x, y, batch_size):
  """
  Generates batches of data with optional augmentation.

  Args:
    x: Input data.
    y: Target labels.
    batch_size: Batch size.

  Yields:
    A tuple of (x_batch, y_batch) for training.
  """
  generator = train_datagen.flow(x, y, batch_size=batch_size)
  while 1:
    x_batch, y_batch = generator.next()
    yield ([x_batch, y_batch], [y_batch, x_batch])


2. Data Loading Module (data_loading.py):

In [5]:
import h5py
from sklearn.model_selection import train_test_split
import numpy as np
from keras.utils import to_categorical

def load_data(data_path):
  """
  Loads data from HDF5 files.

  Args:
    data_path: Path to the HDF5 file containing data.

  Returns:
    A tuple of (X_full, y_full):
      - X_full: Input data (numpy array).
      - y_full: Target labels (numpy array).
  """
  with h5py.File(data_path, 'r') as f:
    X_full = f['ct_slices'][:]
    y_full = f['slice_class'][:]

  X_full = np.expand_dims(X_full[:, ::2, ::2], -1)  # downsample and add depth
  X_full = np.clip((X_full + 600)/900, 0, 1).astype(np.float32)  # normalize
  y_full = to_categorical(y_full.astype(np.float32))

  return X_full, y_full

def split_data(X_full, y_full, test_size=0.3):
  """
  Splits data into training and testing sets.

  Args:
    X_full: Full input data.
    y_full: Full target labels.
    test_size: Fraction of data for the testing set.

  Returns:
    A tuple of ((x_train, y_train), (x_test, y_test)):
      - x_train: Training input data.
      - y_train: Training target labels.
      - x_test: Testing input data.
      - y_test: Testing target labels.
  """
  return train_test_split(X_full, y_full, test_size=test_size)


3. Model Creation Module (model_creation.py):

In [20]:
from keras.layers import Input, Conv2D, Reshape, Dense
#from .custom_layers import Length, Mask, CapsuleLayer, PrimaryCap
from keras import models
from keras import backend as K
from keras.utils import to_categorical
from keras.optimizers import Adam

def create_capsnet(input_shape, n_class, num_routing):
  """
  Creates a CapsNet model.

  Args:
    input_shape: Input data shape (tuple).
    n_class: Number of classes.
    num_routing: Number of routing iterations.

  Returns:
    A compiled Keras model.
  """
  x = Input(shape=input_shape)

  # Layer 1: Just a conventional Conv2D layer
  conv1 = Conv2D(filters=256, kernel_size=9, strides=1, padding='valid', activation='relu', name='conv1')(x)

  # Layer 2: Conv2D layer with `squash` activation, then reshape to [None, num_capsule, dim_vector]
  primarycaps = PrimaryCap(conv1, dim_vector=8, n_channels=32, kernel_size=9, strides=2, padding='valid')

  # Layer 3: Capsule layer. Routing algorithm works here.
  digitcaps = CapsuleLayer(num_capsule=n_class, dim_vector=16, num_routing=num_routing, name='digitcaps')(primarycaps)

  # Layer 4: This is an auxiliary layer to replace each capsule with its length. Just to match the true label's shape.
  # If using tensorflow, this will not be necessary. :)
  out_caps = Length(name='out_caps')(digitcaps)

  # Decoder network.
  y = Input(shape=(n_class,))
  masked = Mask()([digitcaps, y])  # The true label is used to mask the output of capsule layer.
  x_recon = Dense(512, activation='relu')(masked)
  x_recon = Dense(1024, activation='relu')(x_recon)
  x_recon = Dense(np.prod(input_shape).astype(int), activation='sigmoid')(x_recon)
  x_recon = Reshape(target_shape=input_shape, name='out_recon')(x_recon)

  # Two-input-two-output Keras Model
  model = models.Model([x, y], [out_caps, x_recon])

  return model

  # ... rest of the function with the loss function (margin_loss) and compilation
def margin_loss(y_true, y_pred):
    """
    Margin loss for Eq.(4). When y_true[i, :] contains not just one `1`, this loss should work too. Not test it.
    :param y_true: [None, n_classes]
    :param y_pred: [None, num_capsule]
    :return: a scalar loss value.
    """
    L = y_true * K.square(K.maximum(0., 0.9 - y_pred)) + \
        0.5 * (1 - y_true) * K.square(K.maximum(0., y_pred - 0.1))

    return K.mean(K.sum(L, 1))


4. Model Training Module (model_training.py):

In [17]:
from keras.callbacks import CSVLogger, ModelCheckpoint, LearningRateScheduler

def train_model(model, data, epoch_size_frac, epochs):
  """
  Trains a CapsNet model.

  Args:
    model: The CapsNet model to train.
    data: A tuple of ((x_train, y_train), (x_test, y_test)).
    epoch_size_frac: Fraction of the training data to use per epoch.
    epochs: Number of training epochs.

  Returns:
    The trained model.
  """
  (x_train, y_train), (x_test, y_test) = data

  # Define callbacks
  log = CSVLogger('log.csv')
  checkpoint = ModelCheckpoint('weights-{epoch:02d}.h5',
                               save_best_only=True, save_weights_only=True, verbose=1)
  lr_decay = LearningRateScheduler(schedule=lambda epoch: 0.001 * np.exp(-epoch / 10.))

  # Train the model
  model.fit_generator(generator=generate_data(x_train, y_train, 64, 0.1),
                      steps_per_epoch=int(epoch_size_frac*y_train.shape[0] / 64),
                      epochs=epochs,
                      validation_data=[[x_test, y_test], [y_test, x_test]],
                      callbacks=[log, checkpoint, lr_decay])

  model.save_weights('trained_model.h5')
  print('Trained model saved to \'trained_model.h5\'')

  return model


5. Testing Module (testing.py):

In [8]:
#from .data_augmentation import generate_data
import matplotlib.pyplot as plt
from PIL import Image

def test_model(model, data):
  """
  Evaluates a CapsNet model on the testing data.

  Args:
    model: The trained CapsNet model.
    data: A tuple of (x_test, y_test).

  Prints the test accuracy and saves reconstructed images.
  """
  x_test, y_test = data
  y_pred, x_recon = model.predict([x_test, y_test], batch_size=100)

  print('-'*50)
  print('Test acc:', np.sum(np.argmax(y_pred, 1) == np.argmax(y_test, 1))/y_test.shape[0])

  img = combine_images(np.concatenate([x_test[:50],x_recon[:50]]))
  image = img * 255
  Image.fromarray(image.astype(np.uint8)).save("real_and_recon.png")
  print()
  print('Reconstructed images are saved to ./real_and_recon.png')
  print('-'*50)

  plt.imshow(plt.imread("real_and_recon.png", ))
  plt.show()


6. Custom Layers Module (custom_layers.py):

In [9]:
import keras.backend as K
import tensorflow as tf
from keras import initializers, layers

class Length(layers.Layer):
    """
    Compute the length of vectors. This is used to compute a Tensor that has the same shape with y_true in margin_loss
    inputs: shape=[dim_1, ..., dim_{n-1}, dim_n]
    output: shape=[dim_1, ..., dim_{n-1}]
    """
    def call(self, inputs, **kwargs):
        return K.sqrt(K.sum(K.square(inputs), -1))

    def compute_output_shape(self, input_shape):
        return input_shape[:-1]

class Mask(layers.Layer):
    """
    Mask a Tensor with shape=[None, d1, d2] by the max value in axis=1.
    Output shape: [None, d2]
    """
    def call(self, inputs, **kwargs):
        # use true label to select target capsule, shape=[batch_size, num_capsule]
        if type(inputs) is list:  # true label is provided with shape = [batch_size, n_classes], i.e. one-hot code.
            assert len(inputs) == 2
            inputs, mask = inputs
        else:  # if no true label, mask by the max length of vectors of capsules
            x = inputs
            # Enlarge the range of values in x to make max(new_x)=1 and others < 0
            x = (x - K.max(x, 1, True)) / K.epsilon() + 1
            mask = K.clip(x, 0, 1)  # the max value in x clipped to 1 and other to 0

        # masked inputs, shape = [batch_size, dim_vector]
        inputs_masked = K.batch_dot(inputs, mask, [1, 1])
        return inputs_masked

    def compute_output_shape(self, input_shape):
        if type(input_shape[0]) is tuple:  # true label provided
            return tuple([None, input_shape[0][-1]])
        else:
            return tuple([None, input_shape[-1]])


def squash(vectors, axis=-1):
    """
    The non-linear activation used in Capsule. It drives the length of a large vector to near 1 and small vector to 0
    :param vectors: some vectors to be squashed, N-dim tensor
    :param axis: the axis to squash
    :return: a Tensor with same shape as input vectors
    """
    s_squared_norm = K.sum(K.square(vectors), axis, keepdims=True)
    scale = s_squared_norm / (1 + s_squared_norm) / K.sqrt(s_squared_norm)
    return scale * vectors


class CapsuleLayer(layers.Layer):
    """
    The capsule layer. It is similar to Dense layer. Dense layer has `in_num` inputs, each is a scalar, the output of the
    neuron from the former layer, and it has `out_num` output neurons. CapsuleLayer just expand the output of the neuron
    from scalar to vector. So its input shape = [None, input_num_capsule, input_dim_vector] and output shape = \
    [None, num_capsule, dim_vector]. For Dense Layer, input_dim_vector = dim_vector = 1.

    :param num_capsule: number of capsules in this layer
    :param dim_vector: dimension of the output vectors of the capsules in this layer
    :param num_routings: number of iterations for the routing algorithm
    """
    def __init__(self, num_capsule, dim_vector, num_routing=3,
                 kernel_initializer='glorot_uniform',
                 bias_initializer='zeros',
                 **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_vector = dim_vector
        self.num_routing = num_routing
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.bias_initializer = initializers.get(bias_initializer)

    def build(self, input_shape):
        assert len(input_shape) >= 3, "The input Tensor should have shape=[None, input_num_capsule, input_dim_vector]"
        self.input_num_capsule = input_shape[1]
        self.input_dim_vector = input_shape[2]

        # Transform matrix
        self.W = self.add_weight(shape=[self.input_num_capsule, self.num_capsule, self.input_dim_vector, self.dim_vector],
                                 initializer=self.kernel_initializer,
                                 name='W')

        # Coupling coefficient. The redundant dimensions are just to facilitate subsequent matrix calculation.
        self.bias = self.add_weight(shape=[1, self.input_num_capsule, self.num_capsule, 1, 1],
                                    initializer=self.bias_initializer,
                                    name='bias',
                                    trainable=False)
        self.built = True

    def call(self, inputs, training=None):
        # inputs.shape=[None, input_num_capsule, input_dim_vector]
        # Expand dims to [None, input_num_capsule, 1, 1, input_dim_vector]
        inputs_expand = K.expand_dims(K.expand_dims(inputs, 2), 2)

        # Replicate num_capsule dimension to prepare being multiplied by W
        # Now it has shape = [None, input_num_capsule, num_capsule, 1, input_dim_vector]
        inputs_tiled = K.tile(inputs_expand, [1, 1, self.num_capsule, 1, 1])

        """
        # Compute `inputs * W` by expanding the first dim of W. More time-consuming and need batch_size.
        # Now W has shape  = [batch_size, input_num_capsule, num_capsule, input_dim_vector, dim_vector]
        w_tiled = K.tile(K.expand_dims(self.W, 0), [self.batch_size, 1, 1, 1, 1])

        # Transformed vectors, inputs_hat.shape = [None, input_num_capsule, num_capsule, 1, dim_vector]
        inputs_hat = K.batch_dot(inputs_tiled, w_tiled, [4, 3])
        """
        # Compute `inputs * W` by scanning inputs_tiled on dimension 0. This is faster but requires Tensorflow.
        # inputs_hat.shape = [None, input_num_capsule, num_capsule, 1, dim_vector]
        inputs_hat = tf.scan(lambda ac, x: K.batch_dot(x, self.W, [3, 2]),
                             elems=inputs_tiled,
                             initializer=K.zeros([self.input_num_capsule, self.num_capsule, 1, self.dim_vector]))
        """
        # Routing algorithm V1. Use tf.while_loop in a dynamic way.
        def body(i, b, outputs):
            c = tf.nn.softmax(self.bias, dim=2)  # dim=2 is the num_capsule dimension
            outputs = squash(K.sum(c * inputs_hat, 1, keepdims=True))
            b = b + K.sum(inputs_hat * outputs, -1, keepdims=True)
            return [i-1, b, outputs]

        cond = lambda i, b, inputs_hat: i > 0
        loop_vars = [K.constant(self.num_routing), self.bias, K.sum(inputs_hat, 1, keepdims=True)]
        _, _, outputs = tf.while_loop(cond, body, loop_vars)
        """
        # Routing algorithm V2. Use iteration. V2 and V1 both work without much difference on performance
        assert self.num_routing > 0, 'The num_routing should be > 0.'
        for i in range(self.num_routing):
            c = tf.nn.softmax(self.bias, dim=2)  # dim=2 is the num_capsule dimension
            # outputs.shape=[None, 1, num_capsule, 1, dim_vector]
            outputs = squash(K.sum(c * inputs_hat, 1, keepdims=True))

            # last iteration needs not compute bias which will not be passed to the graph any more anyway.
            if i != self.num_routing - 1:
                # self.bias = K.update_add(self.bias, K.sum(inputs_hat * outputs, [0, -1], keepdims=True))
                self.bias += K.sum(inputs_hat * outputs, -1, keepdims=True)
            # tf.summary.histogram('BigBee', self.bias)  # for debugging
        return K.reshape(outputs, [-1, self.num_capsule, self.dim_vector])

    def compute_output_shape(self, input_shape):
        return tuple([None, self.num_capsule, self.dim_vector])


def PrimaryCap(inputs, dim_vector, n_channels, kernel_size, strides, padding):
    """
    Apply Conv2D `n_channels` times and concatenate all capsules
    :param inputs: 4D tensor, shape=[None, width, height, channels]
    :param dim_vector: the dim of the output vector of capsule
    :param n_channels: the number of types of capsules
    :return: output tensor, shape=[None, num_capsule, dim_vector]
    """
    output = layers.Conv2D(filters=dim_vector*n_channels, kernel_size=kernel_size, strides=strides, padding=padding)(inputs)
    outputs = layers.Reshape(target_shape=[-1, dim_vector])(output)
    return layers.Lambda(squash)(outputs)


Main Script (main.py):

In [19]:
# Load data
data_path = '/content/all_patches.hdf5'
X_full, y_full = load_data(data_path)

# Split data
x_train, x_test, y_train, y_test = split_data(X_full, y_full)

# Define model parameters
input_shape = [32, 32, 1]
n_class = 2
num_routing = 3

# Create CapsNet model #create_capsnet
model = create_capsnet(input_shape, n_class, num_routing)

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer=Adam(lr=0.001), metrics=['accuracy'])

# Train the model
train_model(model, (x_train, y_train), epoch_size_frac=1.0, epochs=10)

# Evaluate the model on test data
loss, accuracy = model.evaluate(x_test, y_test, verbose=0)
print("Test accuracy:", accuracy)

ValueError: Exception encountered when calling layer "digitcaps" (type CapsuleLayer).

in user code:

    File "<ipython-input-9-8b4cde22a264>", line 114, in call  *
        inputs_hat = tf.scan(lambda ac, x: K.batch_dot(x, self.W, [3, 2]),

    ValueError: Inconsistent shapes: saw (2048, 2, 1, 2, 16) but expected (2048, 2, 1, 16) 


Call arguments received by layer "digitcaps" (type CapsuleLayer):
  • inputs=tf.Tensor(shape=(None, 2048, 8), dtype=float32)
  • training=None