In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Layer
import tensorflow.keras as K
import tensorflow.keras.backend as Kback


DATA LOADING AND PREPROCESSING

In [5]:
import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import random

from tensorflow.keras import layers, models, optimizers, Input
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split

# Set dataset paths
desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")
labelled_dataset_path = os.path.join(desktop_path, "Dataset xray", "Dataset labelled")

IMG_SIZE = 256
BATCH_SIZE = 16
EPOCHS = 10

# Load image paths and labels
def load_dataset(base_path):
    categories = ['Normal', 'Pneumonia']
    data = []
    for label, category in enumerate(categories):
        category_path = os.path.join(base_path, category)
        for fname in os.listdir(category_path):
            if fname.lower().endswith(('.png', '.jpg', '.jpeg')):
                data.append((os.path.join(category_path, fname), label))
    random.shuffle(data)
    return data

dataset = load_dataset(labelled_dataset_path)
X_paths, y_labels = zip(*dataset)
X_train, X_test, y_train, y_test = train_test_split(X_paths, y_labels, test_size=0.2, stratify=y_labels, random_state=42)

# TF Data pipeline
def preprocess_image(filename, label):
    image = tf.io.read_file(filename)
    image = tf.image.decode_png(image, channels=3) 
    image = tf.image.resize(image, [IMG_SIZE, IMG_SIZE])
    image = tf.cast(image, tf.float32) / 255.0
    return image, tf.one_hot(label, 2)

train_ds = tf.data.Dataset.from_tensor_slices((list(X_train), list(y_train)))
train_ds = train_ds.map(preprocess_image).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

test_ds = tf.data.Dataset.from_tensor_slices((list(X_test), list(y_test)))
test_ds = test_ds.map(preprocess_image).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

FCSSAM MODEL DEFINITION

In [7]:
from tensorflow.keras import backend as Kback

def SAM_avg(x, cam):
    channel = x.shape[-1]
    x = layers.SeparableConv2D(channel, kernel_size=1, padding="same",
                               depthwise_initializer='he_normal',
                               pointwise_initializer='he_normal')(x)
    x = layers.SeparableConv2D(channel, kernel_size=3, padding="same",
                               depthwise_initializer='he_normal',
                               pointwise_initializer='he_normal')(x)
    x = layers.BatchNormalization()(x)
    x = x * cam
    x1 = tf.reduce_mean(x, axis=-1, keepdims=True)
    feats = layers.Conv2D(1, kernel_size=7, padding="same", activation="sigmoid",
                          kernel_initializer='he_normal')(x1)
    feats = layers.Multiply()([x, feats])
    return feats

def SAM_max(x, cam):
    channel = x.shape[-1]
    x = layers.SeparableConv2D(channel, kernel_size=1, padding="same",
                               depthwise_initializer='he_normal',
                               pointwise_initializer='he_normal')(x)
    x = layers.SeparableConv2D(channel, kernel_size=3, padding="same",
                               depthwise_initializer='he_normal',
                               pointwise_initializer='he_normal')(x)
    x = layers.BatchNormalization()(x)
    x = x * cam
    x2 = tf.reduce_max(x, axis=-1, keepdims=True)
    feats = layers.Conv2D(1, kernel_size=7, padding="same", activation="sigmoid")(x2)
    feats = layers.Multiply()([x, feats])
    return feats


def CSSAM(x, cam):
    x_avg = SAM_avg(x, cam)
    x_max = SAM_max(x, cam)
    x = layers.Concatenate()([x_avg, x_max, cam])
    x = ChannelDropout(drop_ratio=0.5)(x)
    return x

def CAM(x, ratio=8):
    channel = x.shape[-1]
    l1 = layers.Dense(channel // ratio, activation="relu", use_bias=False)
    l2 = layers.Dense(channel, use_bias=False)
    
    x1 = layers.GlobalAveragePooling2D()(x)
    x1 = l1(x1)
    x1 = l2(x1)

    x2 = layers.GlobalMaxPooling2D()(x)
    x2 = l1(x2)
    x2 = l2(x2)

    feats = layers.Add()([x1, x2])
    feats = layers.Activation("sigmoid")(feats)
    feats = layers.Multiply()([x, feats])
    return feats

class ChannelDropout(tf.keras.layers.Layer):
    def __init__(self, drop_ratio=0.2):
        super(ChannelDropout, self).__init__()
        self.drop_ratio = drop_ratio

    def build(self, input_shape):
        self.channels = input_shape[-1]
        self.mask = RichardsSigmoid(units=1)(self.add_weight("mask", shape=(1, 1, 1, self.channels), initializer="ones", trainable=True))

    def call(self, x):
        mask = tf.tile(self.mask, [tf.shape(x)[0], 1, 1, 1])
        x = x * mask
        num_channels_to_keep = int(self.channels // 1.25)
        sorted_x, _ = tf.nn.top_k(x, k=num_channels_to_keep, sorted=True)
        return sorted_x

class RichardsSigmoid(tf.keras.layers.Layer):
    def __init__(self, units=1, **kwargs):
        super(RichardsSigmoid, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.A = self.add_weight(name='A', shape=(self.units,), initializer='uniform', trainable=True)
        self.Q = self.add_weight(name='Q', shape=(self.units,), initializer='uniform', trainable=True)
        self.mu = self.add_weight(name='mu', shape=(self.units,), initializer='uniform', trainable=True)
        super(RichardsSigmoid, self).build(input_shape)

    def call(self, x):
        return 1 / (1 + tf.exp(-self.A * tf.exp(-self.Q * (x - self.mu))))

    def compute_output_shape(self, input_shape):
        return input_shape[:-1] + (self.units,)

# Build the model
def build_model():
    input_layer = Input(shape=(IMG_SIZE, IMG_SIZE, 3))
    base_model = tf.keras.applications.DenseNet169(include_top=False, weights="imagenet", input_tensor=input_layer)
    
    for layer in base_model.layers:
        layer.trainable = True

    feat_img = base_model.output
    cam = CAM(feat_img)
    feat_img = CSSAM(feat_img, cam)
    flat = layers.GlobalAveragePooling2D()(feat_img)
    flat = layers.Dropout(0.35)(flat)
    output = layers.Dense(2, activation='softmax')(flat)

    model = models.Model(inputs=input_layer, outputs=output)
    return model

# Instantiate and compile model
model = build_model()
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
              loss='categorical_crossentropy',
              metrics=[tf.keras.metrics.CategoricalAccuracy()])
model.summary()


ValueError: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.ops`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```


In [10]:
import tensorflow as tf
from tensorflow.keras import layers, models, Input

# -- Custom helper layers for reduce_mean and reduce_max --
class ReduceMeanLayer(tf.keras.layers.Layer):
    def __init__(self, axis, keepdims):
        super().__init__()
        self.axis = axis
        self.keepdims = keepdims

    def call(self, inputs):
        return tf.reduce_mean(inputs, axis=self.axis, keepdims=self.keepdims)

class ReduceMaxLayer(tf.keras.layers.Layer):
    def __init__(self, axis, keepdims):
        super().__init__()
        self.axis = axis
        self.keepdims = keepdims

    def call(self, inputs):
        return tf.reduce_max(inputs, axis=self.axis, keepdims=self.keepdims)

# -- Attention modules --
def SAM_avg(x, cam):
    channel = x.shape[-1]
    x = layers.SeparableConv2D(channel, kernel_size=1, padding="same",
                               depthwise_initializer='he_normal',
                               pointwise_initializer='he_normal')(x)
    x = layers.SeparableConv2D(channel, kernel_size=3, padding="same",
                               depthwise_initializer='he_normal',
                               pointwise_initializer='he_normal')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Multiply()([x, cam])
    x1 = ReduceMeanLayer(axis=-1, keepdims=True)(x)
    feats = layers.Conv2D(1, kernel_size=7, padding="same", activation="sigmoid",
                          kernel_initializer='he_normal')(x1)
    feats = layers.Multiply()([x, feats])
    return feats

def SAM_max(x, cam):
    channel = x.shape[-1]
    x = layers.SeparableConv2D(channel, kernel_size=1, padding="same",
                               depthwise_initializer='he_normal',
                               pointwise_initializer='he_normal')(x)
    x = layers.SeparableConv2D(channel, kernel_size=3, padding="same",
                               depthwise_initializer='he_normal',
                               pointwise_initializer='he_normal')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Multiply()([x, cam])
    x2 = ReduceMaxLayer(axis=-1, keepdims=True)(x)
    feats = layers.Conv2D(1, kernel_size=7, padding="same", activation="sigmoid")(x2)
    feats = layers.Multiply()([x, feats])
    return feats

def CSSAM(x, cam):
    x_avg = SAM_avg(x, cam)
    x_max = SAM_max(x, cam)
    x = layers.Concatenate()([x_avg, x_max, cam])
    x = ChannelDropout(drop_ratio=0.5)(x)
    return x

def CAM(x, ratio=8):
    channel = x.shape[-1]
    l1 = layers.Dense(channel // ratio, activation="relu", use_bias=False)
    l2 = layers.Dense(channel, use_bias=False)
    
    x1 = layers.GlobalAveragePooling2D()(x)
    x1 = l1(x1)
    x1 = l2(x1)

    x2 = layers.GlobalMaxPooling2D()(x)
    x2 = l1(x2)
    x2 = l2(x2)

    feats = layers.Add()([x1, x2])
    feats = layers.Activation("sigmoid")(feats)
    feats = layers.Multiply()([x, feats])
    return feats

# -- Custom ChannelDropout and RichardsSigmoid layers --
class ChannelDropout(tf.keras.layers.Layer):
    def __init__(self, drop_ratio=0.2):
        super(ChannelDropout, self).__init__()
        self.drop_ratio = drop_ratio
        self.richards_sigmoid = RichardsSigmoid(units=1)

    def build(self, input_shape):
        self.channels = input_shape[-1]
        self.mask_weight = self.add_weight(
            name="mask_weight",
            shape=(1, 1, 1, self.channels),
            initializer="ones",
            trainable=True
        )

    def call(self, x):
        mask = self.richards_sigmoid(self.mask_weight)
        mask = tf.tile(mask, [tf.shape(x)[0], 1, 1, 1])
        x = x * mask
        num_channels_to_keep = int(self.channels // 1.25)
        sorted_x, _ = tf.nn.top_k(x, k=num_channels_to_keep, sorted=True)
        return sorted_x

class RichardsSigmoid(tf.keras.layers.Layer):
    def __init__(self, units=1, **kwargs):
        super(RichardsSigmoid, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.A = self.add_weight(name='A', shape=(self.units,), initializer='uniform', trainable=True)
        self.Q = self.add_weight(name='Q', shape=(self.units,), initializer='uniform', trainable=True)
        self.mu = self.add_weight(name='mu', shape=(self.units,), initializer='uniform', trainable=True)
        super(RichardsSigmoid, self).build(input_shape)

    def call(self, x):
        return 1 / (1 + tf.exp(-self.A * tf.exp(-self.Q * (x - self.mu))))

    def compute_output_shape(self, input_shape):
        return input_shape[:-1] + (self.units,)

# -- Build the final model --
def build_model(input_shape=(256, 256, 3), num_classes=2):
    input_layer = Input(shape=input_shape)
    base_model = tf.keras.applications.DenseNet169(include_top=False, weights="imagenet", input_tensor=input_layer)
    
    for layer in base_model.layers:
        layer.trainable = True

    feat_img = base_model.output
    cam = CAM(feat_img)
    feat_img = CSSAM(feat_img, cam)
    flat = layers.GlobalAveragePooling2D()(feat_img)
    flat = layers.Dropout(0.35)(flat)
    output = layers.Dense(num_classes, activation='softmax')(flat)

    model = models.Model(inputs=input_layer, outputs=output)
    return model


In [11]:
model = build_model()
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
              loss='categorical_crossentropy',
              metrics=[tf.keras.metrics.CategoricalAccuracy()])
model.summary()


In [12]:
history = model.fit(train_ds, epochs=EPOCHS, validation_data=test_ds)


Epoch 1/10
[1m463/463[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1584s[0m 3s/step - categorical_accuracy: 0.6043 - loss: 0.5435 - val_categorical_accuracy: 0.8486 - val_loss: 0.4023
Epoch 2/10
[1m463/463[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3359s[0m 7s/step - categorical_accuracy: 0.8618 - loss: 0.3930 - val_categorical_accuracy: 0.8897 - val_loss: 0.2606
Epoch 3/10
[1m463/463[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2047s[0m 4s/step - categorical_accuracy: 0.8787 - loss: 0.2599 - val_categorical_accuracy: 0.8962 - val_loss: 0.2288
Epoch 4/10
[1m463/463[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1418s[0m 3s/step - categorical_accuracy: 0.9045 - loss: 0.1957 - val_categorical_accuracy: 0.8518 - val_loss: 0.4083
Epoch 5/10
[1m463/463[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1428s[0m 3s/step - categorical_accuracy: 0.9262 - loss: 0.1557 - val_categorical_accuracy: 0.8816 - val_loss: 0.2477
Epoch 6/10
[1m463/463[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37