## Imports

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Environment sanity check: print the interpreter in use, whether this
# TensorFlow build was compiled with CUDA, and how many GPUs it can see.
import sys
print(sys.executable)
import tensorflow as tf
print("Built with CUDA:", tf.test.is_built_with_cuda())
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))


In [None]:
# Check GPU availability: print the list of physical GPU devices TensorFlow detects.
print("GPU Available: ", tf.config.list_physical_devices('GPU'))


## Prepare the Dataset

You will prepare the train and test sets a little differently this time. Instead of just normalizing the images, you will also introduce random noise and the generated images will be used as input to your model. The target or label will still be the clean images.

In [None]:
# NOTE(review): the batch generators created later in this notebook are given
# a batch size of 52 directly, so this constant appears unused in the visible
# code — confirm before relying on it.
BATCH_SIZE = 32

# Training image directory (judging by its name; not referenced again in the
# visible code — TODO confirm it is still needed).
train_dataset = "E:\\OIA-ODIR\\Training Set\\Preprocessed_Train_Images"


# Off-site test image directory (same caveat as above).
test_dataset = "E:\\OIA-ODIR\\Off-site Test Set\\Images"

In [None]:
# Persist the `extended` annotation table to an Excel file.
# NOTE(review): `extended` is only created by the data-augmentation cell
# further down this notebook, so that cell has to be run before this one.
extended.to_excel("E:\\OIA-ODIR\\Training Set\\Annotation\\extended.xlsx")

In [None]:
# Shuffle every row of `extended` (frac=1 samples all rows) and renumber the
# index from 0 so positional and label indexing agree again.
extended = extended.sample(frac=1).reset_index(drop=True)

In [None]:
import pandas as pd
# Read the annotation spreadsheets for the training, off-site test and
# on-site test splits into DataFrames.
annotations = pd.read_excel("E:\\OIA-ODIR\\Training Set\\Annotation\\training annotation (English).xlsx")
annotations_off_test = pd.read_excel("E:\\OIA-ODIR\\Off-site Test Set\\Annotation\\off-site test annotation (English).xlsx")
annotations_on_test = pd.read_excel("E:\\OIA-ODIR\\On-site Test Set\\Annotation\\on-site test annotation (English).xlsx")

In [None]:
# Display the training annotation table as the cell output.
annotations

In [None]:
# Human-readable class names, listed in the same order as the one-letter
# label columns selected below.
Classes = ["Normal", "Diabetic Retinopathy", "Glaucoma", "Cataract", "AMD", "Hypertension", "Myopia", "Other"]
# Label matrices: training labels come from `extended` (built by the
# augmentation cell), test labels from the two test annotation tables.
y = extended[["N", "D", "G", "C", "A", "H", "M", "O"]]
y_off_test = annotations_off_test[["N", "D", "G", "C", "A", "H", "M", "O"]]
y_on_test = annotations_on_test[["N", "D", "G", "C", "A", "H", "M", "O"]]
# y = old_anno[["N", "D", "G", "C", "A", "H", "M", "O"]]
# Display the training labels as the cell output.
y

In [None]:
# Model inputs per patient: the two fundus image file names plus age and sex.
X_train = extended[['Left-Fundus', "Right-Fundus", "Patient Age", "Patient Sex"]]
X_off_test = annotations_off_test[['Left-Fundus', "Right-Fundus", "Patient Age", "Patient Sex"]]
X_on_test = annotations_on_test[['Left-Fundus', "Right-Fundus", "Patient Age", "Patient Sex"]]
# X_train = old_anno[['Left-Fundus', "Right-Fundus"]]
# Display the training inputs as the cell output.
X_train

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt

def _read_rgb_image(image_path):
    """Read the image at *image_path* and return it converted from BGR to RGB."""
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports an unreadable/missing file by returning None;
        # fail here with the offending path instead of inside cvtColor.
        raise FileNotFoundError(f"Could not read image: {image_path}")
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)


def generate_batches(dataset_path, batch_size, X_train, y, data_size):
    """Yield shuffled training batches forever.

    Each pass reshuffles the first ``data_size`` row positions and walks them
    in steps of ``batch_size`` (the last batch of a pass may be smaller).

    Args:
        dataset_path: Directory the image file names are joined to.
        batch_size: Maximum number of samples per yielded batch.
        X_train: DataFrame with the columns 'Left-Fundus', 'Right-Fundus',
            'Patient Age' and 'Patient Sex'.
        y: DataFrame of labels, row-aligned with ``X_train``.
        data_size: Number of leading rows to draw from; positions are used
            with ``iloc``, so it must not exceed the number of rows.

    Yields:
        ``((left_images, right_images, ages, sexes), labels)`` where the image
        entries are arrays of RGB images, ``sexes`` is 1 where
        'Patient Sex' equals 'Male' and 0 otherwise, and ``labels`` is
        ``np.array`` of the matching rows of ``y``.

    Raises:
        ValueError: On first iteration, if ``data_size`` or ``batch_size`` is
            not positive (an empty dataset would otherwise loop forever
            without ever yielding).
        FileNotFoundError: If an image file cannot be read.
    """
    if data_size <= 0 or batch_size <= 0:
        raise ValueError(
            f"data_size and batch_size must be positive, got "
            f"data_size={data_size}, batch_size={batch_size}"
        )

    while True:
        indices = np.arange(data_size)
        np.random.shuffle(indices)
        for start in range(0, data_size, batch_size):
            end = min(start + batch_size, data_size)
            batch_indices = indices[start:end]

            batch_x = X_train.iloc[batch_indices]
            batch_y = y.iloc[batch_indices]

            left_images = [_read_rgb_image(os.path.join(dataset_path, key)) for key in batch_x['Left-Fundus']]
            right_images = [_read_rgb_image(os.path.join(dataset_path, key)) for key in batch_x['Right-Fundus']]

            ages = batch_x['Patient Age'].values
            sexes = (batch_x["Patient Sex"] == 'Male').astype(int).values

            yield (np.array(left_images), np.array(right_images), ages, sexes), np.array(batch_y)

In [None]:
# Image directories used by the generators below.
preprocessed_images = "E:\\OIA-ODIR\\Augmented\\Training Set\\Images"
off_test_path="E:\\OIA-ODIR\\Off-site Test Set\\off_site_preprocessed_images"
# Both generators use a batch size of 52. The last argument is the number of
# rows drawn from; NOTE(review): 500 and 18013 are hard-coded and presumably
# equal len(X_off_test) and len(X_train) — confirm, since a larger value makes
# `iloc` fail inside the generator.
off_test_generator = generate_batches(off_test_path, 52, X_off_test, y_off_test, 500)
# on_test_generator = generate_batches(on_test_path , 52, X_on_test, y_on_test, 1000)
train_generator = generate_batches(preprocessed_images, 52, X_train, y, 18013)

## New Model with HR Net Included

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Layer, Input, Conv2D, BatchNormalization, ReLU, Add, GlobalAveragePooling2D, Dense , Dropout , Concatenate ,Multiply , Flatten
from tensorflow.keras.models import Model
def conv3x3(out_planes, stride=1):
    """Build a bias-free 3x3 `Conv2D` layer with 'same' padding.

    Args:
        out_planes: Number of output filters.
        stride: Convolution stride (defaults to 1).

    Returns:
        The configured, not yet built, `Conv2D` layer.
    """
    layer_options = dict(kernel_size=3, strides=stride, padding='same', use_bias=False)
    return Conv2D(out_planes, **layer_options)

def bn_relu(x):
    """Apply a freshly created BatchNormalization layer, then a ReLU, to *x*."""
    normalized = BatchNormalization()(x)
    return ReLU()(normalized)

class BasicBlock(Layer):
    """Residual block: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, plus a skip path.

    Args:
        planes: Number of filters of both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional callable applied to the block input to produce
            the skip tensor; when None the input itself is added.
    """

    # Channel multiplier read by `make_layer` when sizing the skip projection.
    expansion = 1

    def __init__(self, planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3(planes, stride)
        self.bn1 = BatchNormalization()
        self.relu = ReLU()
        self.conv2 = conv3x3(planes)
        self.bn2 = BatchNormalization()
        self.downsample = downsample
        self.stride = stride

    def call(self, x):
        """Run the residual branch, add the skip tensor, and apply the final ReLU."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        # The skip path is projected only when a downsample module was given.
        shortcut = x if self.downsample is None else self.downsample(x)

        return self.relu(residual + shortcut)
def make_layer(block, blocks, planes, stride=1):
    """Stack *blocks* instances of *block* into one `tf.keras.Sequential`.

    Only the first block receives *stride*; when that stride is not 1 it also
    gets a 1x1 strided convolution + BatchNormalization as its skip projection.
    The remaining blocks are built with default stride and no projection.

    NOTE(review): no projection is created for stride 1, so the input is
    assumed to already have ``planes * block.expansion`` channels — confirm
    for new call sites.
    """
    if stride == 1:
        shortcut_projection = None
    else:
        shortcut_projection = tf.keras.Sequential([
            Conv2D(planes * block.expansion, kernel_size=1, strides=stride, use_bias=False),
            BatchNormalization(),
        ])

    stacked = [block(planes, stride, shortcut_projection)]
    stacked.extend(block(planes) for _ in range(blocks - 1))

    return tf.keras.Sequential(stacked)

class HighResolutionModule(Layer):
    """Two-branch module: a full-resolution branch and a reduced-resolution branch.

    The input is processed at its own resolution by ``layer1`` and, after 2x2
    average pooling, by the stride-2 ``layer2``. The low-resolution result is
    resized back to the high-resolution spatial size and both results are
    concatenated along the channel axis.

    Args:
        num_blocks: Number of `BasicBlock`s in each branch.
        planes: Number of filters used by each branch.
    """

    def __init__(self, num_blocks, planes):
        super(HighResolutionModule, self).__init__()
        self.layer1 = make_layer(BasicBlock, num_blocks, planes, stride=1)
        self.layer2 = make_layer(BasicBlock, num_blocks, planes, stride=2)
        # Created once here rather than on every forward pass inside `call`.
        self.downsample_pool = tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2)

    def call(self, x):
        """Return the channel-wise concatenation of both branch outputs."""
        high_res_input = x
        low_res_input = self.downsample_pool(x)

        high_res_output = self.layer1(high_res_input)
        low_res_output = self.layer2(low_res_input)

        # Upsample to match the high-resolution branch. Prefer the static
        # shape; fall back to the runtime shape when a spatial dimension is
        # unknown (None), which tf.image.resize cannot accept as a size.
        height, width = high_res_output.shape[1], high_res_output.shape[2]
        if height is None or width is None:
            target_size = tf.shape(high_res_output)[1:3]
        else:
            target_size = (height, width)
        low_res_output = tf.image.resize(low_res_output, target_size)

        return tf.concat([high_res_output, low_res_output], axis=-1)

class HRNet(Model):
    """Backbone: strided conv -> HighResolutionModule -> strided conv -> HighResolutionModule."""

    def __init__(self):
        super().__init__()
        self.initial_layer = Conv2D(64, kernel_size=3, strides=2, padding='same', activation='relu')
        self.stage1 = HighResolutionModule(num_blocks=4, planes=64)
        self.transition = Conv2D(128, kernel_size=3, strides=2, padding='same', activation='relu')
        self.stage2 = HighResolutionModule(num_blocks=3, planes=128)

    def call(self, inputs):
        """Pass *inputs* through the four stages in order and return the result."""
        features = inputs
        for stage in (self.initial_layer, self.stage1, self.transition, self.stage2):
            features = stage(features)
        return features

# TODO: align this attention block with the version defined in the later
# ResNet cell.
class CustomAttentionBlock(Layer):
    """Grouped 3x3 conv -> 1x1 conv -> BN -> ReLU, concatenated with the input.

    Args:
        filters: Filter count of both convolutions. The 3x3 convolution uses
            ``groups=filters``.
    """

    def __init__(self, filters):
        super().__init__()
        self.ds_conv = Conv2D(filters, kernel_size=(3, 3), padding='same', groups=filters)
        self.p_conv = Conv2D(filters=filters, kernel_size=(1, 1), padding='same')
        self.bn = BatchNormalization()
        self.relu = ReLU()
        self.concat = Concatenate(axis=-1)

    def call(self, inputs):
        """Return ``inputs`` concatenated (last axis) with the refined features."""
        refined = self.relu(self.bn(self.p_conv(self.ds_conv(inputs))))
        return self.concat([inputs, refined])
    
# Define the SENet (squeeze-and-excitation) block.
class SEBlock(Layer):
    """Rescale each channel of the input by a learned gate in (0, 1).

    Args:
        reduction: Divisor applied to the channel count to size the
            bottleneck Dense layer.
    """

    def __init__(self, reduction=16):
        super().__init__()
        self.reduction = reduction

    def build(self, input_shape):
        """Create the pooling and Dense sub-layers once the channel count is known."""
        channels = input_shape[-1]
        self.avg_pool = GlobalAveragePooling2D()
        self.fc1 = Dense(channels // self.reduction, activation='relu')
        self.fc2 = Dense(channels, activation='sigmoid')

    def call(self, inputs):
        """Return ``inputs`` multiplied by the per-channel gate."""
        gate = self.fc2(self.fc1(self.avg_pool(inputs)))
        # Reshape to (batch, 1, 1, channels) so the gate broadcasts over H and W.
        gate = tf.reshape(gate, [-1, 1, 1, inputs.shape[-1]])
        return inputs * gate

class DRBM(Layer):
    """Classification head: one ReLU hidden layer followed by sigmoid class outputs.

    Args:
        num_visible: Size of the input feature vector.
        num_hidden: Number of hidden units.
        num_classes: Number of output classes.
    """

    def __init__(self, num_visible, num_hidden, num_classes):
        super().__init__()
        self.num_visible = num_visible
        self.num_hidden = num_hidden
        self.num_classes = num_classes
        # The attribute is not called `weights` because that name is already
        # a property of the Keras Layer base class.
        self.weights_h = self.add_weight(shape=(num_visible, num_hidden), initializer="random_normal")
        self.class_weights_h = self.add_weight(shape=(num_hidden, num_classes), initializer="random_normal")
        self.hidden_bias = self.add_weight(shape=(num_hidden,), initializer="zeros")
        self.class_bias = self.add_weight(shape=(num_classes,), initializer="zeros")

    def call(self, inputs):
        """Return per-class sigmoid probabilities for *inputs*."""
        hidden_pre_activation = tf.matmul(inputs, self.weights_h) + self.hidden_bias
        hidden_activation = tf.nn.relu(hidden_pre_activation)
        logits = tf.matmul(hidden_activation, self.class_weights_h) + self.class_bias
        return tf.nn.sigmoid(logits)

    
def create_model(input_shape=(224, 224, 3), num_classes=8):
    """Build the two-image HRNet classifier.

    Both images go through the same HRNet, attention block and SE block
    instances (weights are shared between the two eyes). The two resulting
    feature maps are multiplied element-wise, average-pooled and classified
    by a DRBM head with 1024 hidden units.

    Args:
        input_shape: Shape of each input image.
        num_classes: Number of output classes.

    Returns:
        A Keras `Model` with inputs ``[input_left, input_right]``.
    """
    # One named input per eye.
    input_left = Input(shape=input_shape, name='input_left')
    input_right = Input(shape=input_shape, name='input_right')
    eye_inputs = [input_left, input_right]

    # Shared backbone.
    hrnet = HRNet()
    backbone_maps = [hrnet(eye) for eye in eye_inputs]

    # Shared attention block.
    attention_block = CustomAttentionBlock(filters=256)
    attended_maps = [attention_block(feature_map) for feature_map in backbone_maps]

    # Shared squeeze-and-excitation block.
    se_block = SEBlock()
    recalibrated_maps = [se_block(feature_map) for feature_map in attended_maps]

    # Fuse both eyes, then pool to one vector per sample.
    fused = Multiply()(recalibrated_maps)
    pooled = GlobalAveragePooling2D()(fused)
    pooled = Flatten()(pooled)

    head = DRBM(num_visible=pooled.shape[-1], num_hidden=1024, num_classes=num_classes)
    return Model(inputs=eye_inputs, outputs=head(pooled))

# Instantiate the two-image model with its default arguments and print its
# layer summary.
model = create_model()
model.summary()


In [None]:
# Compile the two-image model.
# 'binary_accuracy' is requested explicitly: the string 'Accuracy' selects the
# exact-match `Accuracy` metric, which tests the float sigmoid outputs for
# equality with the 0/1 labels and is therefore not informative here.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0006), loss='binary_crossentropy', metrics=['AUC', 'binary_accuracy'])


def _image_inputs_only(batches):
    """Adapt the 4-input generator batches to this model's two image inputs.

    The shared generators yield ``(left, right, ages, sexes)``; this model only
    has the inputs named 'input_left' and 'input_right', so age and sex are
    dropped and the images are passed by input name.
    """
    for (left_images, right_images, _ages, _sexes), labels in batches:
        yield {'input_left': left_images, 'input_right': right_images}, labels


# Determine the number of steps per epoch (number of batches).
num_samples = 18013  # Total number of samples in the training generator
batch_size = 52  # Must equal the batch size the generators were created with
steps_per_epoch = num_samples // batch_size
val_steps = 500 // batch_size
# Train the model using the adapted generators.
hist = model.fit(_image_inputs_only(train_generator), steps_per_epoch=steps_per_epoch, epochs=10,
         validation_data=_image_inputs_only(off_test_generator), validation_steps=val_steps)


## Data Augmentations

In [None]:
# Map the value in column 3 of each annotation row (used as the image name) to
# that row's values from column 7 onward (used as the label vector).
images_labels_dict = {}
for _, record in annotations.iterrows():
    images_labels_dict[record.iloc[3]] = record.iloc[7:].to_numpy()

images_labels_dict

In [None]:
import albumentations as A
import cv2
import os
import numpy as np
from collections import Counter

# Define augmentations: random rotation, brightness/contrast and
# hue/saturation/value jitter (each with probability 0.5), and horizontal flip.
augmentations = A.Compose([
    A.Rotate(limit=40),
    A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.5),
    A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),
    A.HorizontalFlip(),
])

# Directories: source images are read from `images_dir`, augmented copies are
# written to `augmented_images_dir`.
images_dir = "E:\\OIA-ODIR\\Training Set\\train_preprocessed_images"
augmented_images_dir = "E:\\OIA-ODIR\\Augmented\\Training Set\\Images"

# Ensure the augmented images directory exists
if not os.path.exists(augmented_images_dir):
    os.makedirs(augmented_images_dir)

# Initialize a dictionary to keep track of augmented labels
# NOTE(review): only referenced by a commented-out line below, so it stays empty.
augmented_labels_dict = {}

# Initialize the class counts (one counter per class index 0..7)
initial_class_counts = Counter({i: 0 for i in range(8)})

# Calculate initial class representation: count, per class index, how many
# entries of `images_labels_dict` have that label set to 1.
for labels in images_labels_dict.values():
    for i, label in enumerate(labels):
        if label == 1:
            initial_class_counts[i] += 1

# Define target counts for each class to balance the dataset
target_counts = np.array([3000] * 8)  # Let's say we want 3000 images per class

# Determine how many images are needed to reach the target for each class.
# A class that already has more than 3000 images gets a negative value here.
needed_counts = target_counts - np.array(list(initial_class_counts.values()))

# Main augmentation loop. `row` is the (index, Series) pair from iterrows(),
# hence the `row[1]` accesses below.
new_rows = []
for row in annotations.iterrows():
    image_file = row[1]['Left-Fundus']
    image = cv2.imread(os.path.join(images_dir, image_file))
    # The right-eye file name is derived by substituting "left" -> "right" in
    # the left-eye name rather than read from the 'Right-Fundus' column.
    # NOTE(review): neither imread result is checked before use — confirm all
    # files exist.
    image_right = cv2.imread(os.path.join(images_dir, image_file.replace("left", "right")))
    labels = row[1].iloc[7:]
    # Determine how many times this image should be augmented for each class
    augmentation_counts = [needed_counts[i] if label == 1 else 0 for i, label in enumerate(labels)]
    max_augmentations = max(augmentation_counts)

    # Perform the necessary augmentations (no iterations when the maximum is
    # zero or negative).
    for i in range(max_augmentations):
        # Check if we still need more images for this class
        if any(needed_counts[class_idx] > 0 for class_idx, label in enumerate(labels) if label == 1):
            
            # The pipeline is invoked separately for each eye, so the two
            # images receive independently sampled transforms.
            augmented_image = augmentations(image=image)['image']
            augmented_image_right = augmentations(image=image_right)['image']
            
            # The original file name (including its extension) is kept as the
            # prefix of the new file name.
            augmented_filename = f"{image_file}_{i}_augmented.jpg"
            augmented_filename_right = f"{image_file.replace('left', 'right')}_{i}_augmented.jpg"
            
            cv2.imwrite(os.path.join(augmented_images_dir, augmented_filename), augmented_image)
            cv2.imwrite(os.path.join(augmented_images_dir, augmented_filename_right), augmented_image_right)
#             augmented_labels_dict[augmented_filename] = labels
            # Annotation row for the augmented pair: new file names, with age,
            # sex and all eight labels copied from the source row.
            row_ = {'Left-Fundus':augmented_filename, "Right-Fundus":augmented_filename_right, 
                   'Patient Age':row[1]['Patient Age'], "Patient Sex":row[1]["Patient Sex"],
                       "N":row[1]["N"], "D":row[1]["D"], "G":row[1]["G"], "C":row[1]["C"], "A":row[1]["A"],
                        "H":row[1]["H"], "M":row[1]["M"], "O":row[1]["O"]}
            new_rows.append(row_)
            # Update the counts: every class of this image that still had a
            # positive per-image quota is decremented, both globally and locally.
            for class_idx, count in enumerate(augmentation_counts):
                if count > 0:
                    needed_counts[class_idx] -= 1
                    augmentation_counts[class_idx] -= 1

# `extended` holds only the augmented rows collected above; the original
# annotation rows are not appended here.
extended = pd.DataFrame(new_rows)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Count the number of images for each class in the augmented dataset.
# The label columns are selected by name: a positional slice starting at
# column 2 of `extended` would also pick up 'Patient Age' and 'Patient Sex',
# which are not labels and cannot all be cast to int.
label_columns = ["N", "D", "G", "C", "A", "H", "M", "O"]
class_counts = extended[label_columns].astype(int).sum(axis=0).to_numpy()

# Class names in the same order as `label_columns`.
Classes = ["Normal", "Diabetic Retinopathy", "Glaucoma", "Cataract", "AMD", "Hypertension", "Myopia", "Other"]

# Plot the class distribution
plt.figure(figsize=(12, 6))
plt.bar(Classes, class_counts)
plt.xlabel("Classes")
plt.ylabel("Number of Images")
plt.title("Class Distribution in Augmented Dataset")

plt.show()

## Model with ResNet feature extraction

In [None]:
import tensorflow as tf
from tensorflow.keras.applications.resnet import ResNet101, preprocess_input
from tensorflow.keras.models import Model
# Load ResNet-101 pre-trained on ImageNet, without the top classification layer.
# NOTE(review): `preprocess_input` is imported above but not applied anywhere
# in the visible code — confirm whether the images need it before this model.
base_model = ResNet101(weights='imagenet', include_top=False)

# Feature extractor that stops at the 'conv4_block23_out' layer; the original
# note here gives its output shape as (None, None, None, 1024).
model_custom = Model(inputs=base_model.input, outputs=base_model.get_layer('conv4_block23_out').output)

In [None]:
# Freeze every layer of the truncated ResNet so its weights are not updated
# during training.
for frozen_layer in model_custom.layers:
    frozen_layer.trainable = False

In [None]:
import tensorflow as tf
from tensorflow.keras import Model, Input
from tensorflow.keras.layers import Layer, DepthwiseConv2D, Reshape, Conv2D, BatchNormalization, ReLU, Concatenate, GlobalAveragePooling2D, Dense, Multiply
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

class CustomAttentionBlock(Layer):
    """Container for a depthwise 3x3 conv, a regularized 1x1 conv, BN, ReLU and a concat.

    NOTE(review): this class defines no `call` method, so none of the
    sub-layers created in `__init__` are applied by code in this class. What
    happens when the layer is invoked depends on the inherited `Layer.call` of
    the installed Keras version — confirm whether a forward pass was intended.

    Args:
        filters: Filter count of the 1x1 convolution.
        **kwargs: Forwarded to the Keras `Layer` constructor.
    """

    def __init__(self, filters, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.ds_conv = DepthwiseConv2D(kernel_size=(3, 3), padding='same')
        self.p_conv = Conv2D(filters=filters, kernel_size=(1, 1), padding='same', kernel_regularizer=l2(0.01))
        self.bn = BatchNormalization()
        self.relu = ReLU()
        self.concat = Concatenate(axis=-1)

    def get_config(self):
        """Return the base layer config extended with ``filters``."""
        return {**super().get_config(), 'filters': self.filters}

class SEBlock(Layer):
    """Squeeze-and-excitation block: rescale each channel by a gate in (0, 1).

    Args:
        reduction_ratio: Divisor applied to the channel count to size the
            bottleneck Dense layer.
        **kwargs: Forwarded to the Keras `Layer` constructor.
    """

    def __init__(self, reduction_ratio=16, **kwargs):
        super(SEBlock, self).__init__(**kwargs)
        self.reduction_ratio = reduction_ratio
        self.global_avg_pool = GlobalAveragePooling2D()
        # Created in build(), once the channel count is known.
        self.fc1 = None
        self.fc2 = None

    def build(self, input_shape):
        """Create the two Dense layers sized from the input's channel count."""
        channels = input_shape[-1]
        self.fc1 = Dense(channels // self.reduction_ratio, activation='relu')
        # The excitation gate must be bounded, so it uses a sigmoid (as the
        # other SEBlock in this notebook does); a ReLU here would produce
        # unbounded scale factors instead of a gate.
        self.fc2 = Dense(channels, activation='sigmoid')

    def call(self, inputs):
        """Return ``inputs`` multiplied by the per-channel gate."""
        squeeze = self.global_avg_pool(inputs)
        excitation = self.fc1(squeeze)
        excitation = self.fc2(excitation)
        # Reshape to (batch, 1, 1, channels) so the gate broadcasts over H and W.
        excitation = tf.reshape(excitation, (-1, 1, 1, inputs.shape[-1]))
        return inputs * excitation

    def get_config(self):
        """Return the base layer config extended with ``reduction_ratio``."""
        config = super(SEBlock, self).get_config()
        config.update({'reduction_ratio': self.reduction_ratio})
        return config

class DRBM(Layer):
    """Classification head: sigmoid hidden layer followed by per-class sigmoid outputs.

    Args:
        visible_units: Size of the input feature vector.
        hidden_units: Number of hidden units.
        num_classes: Number of output classes.
        **kwargs: Forwarded to the Keras `Layer` constructor.
    """

    def __init__(self, visible_units, hidden_units, num_classes, **kwargs):
        super(DRBM, self).__init__(**kwargs)
        self.visible_units = visible_units
        self.hidden_units = hidden_units
        self.num_classes = num_classes
        self.W = tf.Variable(tf.random.normal(shape=(visible_units, hidden_units), stddev=0.02))
        # NOTE: v_bias is created but not used by `call`; it is kept so the
        # layer's set of variables is unchanged.
        self.v_bias = tf.Variable(tf.random.normal(shape=(visible_units,), stddev=0.02))
        self.h_bias = tf.Variable(tf.zeros(shape=(hidden_units,)))
        self.class_weights = tf.Variable(tf.random.normal(shape=(hidden_units, num_classes), stddev=0.02))
        self.class_bias = tf.Variable(tf.zeros(shape=(num_classes,)))

    def call(self, v):
        """Return independent per-class probabilities for the feature batch *v*."""
        hidden_probs = tf.sigmoid(tf.matmul(v, self.W) + self.h_bias)
        class_logits = tf.matmul(hidden_probs, self.class_weights) + self.class_bias
        # Sigmoid rather than softmax: the model is trained with
        # binary_crossentropy on one 0/1 column per class, so each class needs
        # its own independent probability (softmax would force them to sum
        # to 1). This also matches the other DRBM head in this notebook.
        class_probs = tf.sigmoid(class_logits)
        return class_probs

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super(DRBM, self).get_config()
        config.update({
            'visible_units': self.visible_units,
            'hidden_units': self.hidden_units,
            'num_classes': self.num_classes
        })
        return config

def create_model(visible_units, hidden_units, num_classes):
    """Build the multimodal classifier (two fundus images + age + sex).

    Each image goes through the frozen `model_custom` feature extractor, then
    its own attention block and SE block. The two feature maps are
    concatenated along channels, scaled channel-wise by a Dense projection of
    the (age, sex) pair, average-pooled and classified by a DRBM head.

    Args:
        visible_units: Input size of the DRBM head; must equal the channel
            count of the concatenated image features.
        hidden_units: Number of hidden units of the DRBM head.
        num_classes: Number of output classes.

    Returns:
        A Keras `Model` with inputs
        ``[left_input, right_input, input_age, input_sex]``.
    """

    # Read Inputs
    left_input = Input(shape=(224, 224, 3))
    right_input = Input(shape=(224, 224, 3))
    input_age = Input(shape=(1,), name='input_age')  # Single value for age
    input_sex = Input(shape=(1,), name='input_sex')  # Single value for sex

    # Feature Extraction
    left_features = model_custom(left_input)
    right_features = model_custom(right_input)

    # Apply Attention Blocks (one independent block per eye)
    attention_out_left = CustomAttentionBlock(filters=1024)(left_features)
    attention_out_right = CustomAttentionBlock(filters=1024)(right_features)

    # Apply SE Blocks to the outputs of the Attention Blocks
    se_output_left = SEBlock()(attention_out_left)
    se_output_right = SEBlock()(attention_out_right)

    # Concatenate the outputs of the SE Blocks
    concatenated_features = Concatenate(axis=-1)([se_output_left, se_output_right])
    # Size the demographic projection from the actual channel count instead of
    # a hard-coded number, so it always matches the image features.
    feature_channels = concatenated_features.shape[-1]

    # Processing demographics data
    concatenated_demographics = Concatenate()([input_age, input_sex])

    # Feedforward network projecting (age, sex) to one scale factor per channel
    x = Dense(feature_channels, activation='relu')(concatenated_demographics)

    # Reshape to (1, 1, channels) so it broadcasts over the spatial dimensions
    x = Reshape((1, 1, feature_channels))(x)

    # Combine image features with patient data. A Keras Multiply layer is used
    # instead of a raw tf op so the operation is an ordinary layer of the
    # functional graph.
    combined_features = Multiply()([concatenated_features, x])

    # Pool the combined feature map to one vector per sample
    processed_features = GlobalAveragePooling2D()(combined_features)

    # Apply the DRBM
    output_probs = DRBM(visible_units, hidden_units, num_classes)(processed_features)

    # Define the model
    model = Model(inputs=[left_input, right_input, input_age, input_sex], outputs=output_probs)

    return model

# Head dimensions for the multimodal model.
visible_units = 2048
hidden_units = 128
num_classes = 8

# Build the model and print its layer summary.
model = create_model(visible_units, hidden_units, num_classes)
model.summary()

In [None]:
# Compile the multimodal model.
# 'binary_accuracy' is requested explicitly: the string 'Accuracy' selects the
# exact-match `Accuracy` metric, which tests the float model outputs for
# equality with the 0/1 labels and is therefore not informative here.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0006),
              loss='binary_crossentropy',
              metrics=[tf.keras.metrics.AUC(name='AUC', multi_label=True), 'binary_accuracy'])

# Determine the number of steps per epoch (number of batches)
num_samples = 18013  # Total number of samples in the training generator
batch_size = 52  # Batch size the generators were created with
steps_per_epoch = num_samples // (batch_size)
val_steps = 500 // (batch_size)
# Train the model using the generators
hist = model.fit(train_generator, steps_per_epoch=steps_per_epoch, epochs=30, 
         validation_data=off_test_generator, validation_steps=val_steps)

In [None]:
# Evaluate on the off-site test generator. That generator was created with a
# batch size of 52 over 500 samples, so the step count is derived from 52;
# dividing by 32 would request more batches than one pass contains and
# re-evaluate reshuffled samples.
model.evaluate(off_test_generator, steps=500 // 52)

In [None]:
# Save the trained multimodal model to disk.
# NOTE(review): the path has no file extension; whether that is accepted
# depends on the installed Keras version — confirm.
model.save("E://OIA-ODIR//Model_multimodal_aug")

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm

# Collect the true and predicted class index for up to `max_samples` rows.
true_labels = []
predicted_labels = []

# Label columns are selected by name so the lookup does not depend on where
# they sit in the DataFrame (their position differs between `annotations`
# and the augmented `extended` table).
label_columns = ["N", "D", "G", "C", "A", "H", "M", "O"]
threshold = 0.05  # Minimum top probability to accept a prediction
max_samples = 1000

# extended = annotations.sample(frac=1).reset_index(drop=True)
eval_rows = extended.head(max_samples)
for _, row in tqdm(eval_rows.iterrows(), total=len(eval_rows), desc='Processing', unit='Images'):
    left = cv2.cvtColor(cv2.imread(os.path.join(preprocessed_images, row['Left-Fundus'])), cv2.COLOR_BGR2RGB)
    right = cv2.cvtColor(cv2.imread(os.path.join(preprocessed_images, row['Right-Fundus'])), cv2.COLOR_BGR2RGB)
    age = row["Patient Age"]
    sex = 1 if row["Patient Sex"] == "Male" else 0

    # Make predictions for a single sample (each input gets a batch axis of 1)
    preds = model.predict([np.array([left]), np.array([right]), np.array([age]), np.array([sex])], verbose=0)

    # Take the most probable class, or -1 when even the top probability is
    # below the threshold (uncertain prediction)
    predicted_class = int(np.argmax(preds)) if np.max(preds) >= threshold else -1

    # The true class is the position of the largest label among the eight
    # label columns (the first one set, for multi-label rows)
    true_labels.append(int(np.argmax(row[label_columns].to_numpy(dtype=int))))
    predicted_labels.append(predicted_class)

In [None]:
# Compute the confusion matrix with an explicit label list so that it always
# has one row/column per class, in the order of `Classes`, even when a class
# never occurs among the evaluated samples. Uncertain predictions (-1) get
# their own row/column only when at least one is present.
label_ids = list(range(len(Classes)))
tick_names = list(Classes)
if -1 in predicted_labels:
    label_ids = [-1] + label_ids
    tick_names = ["Uncertain"] + tick_names
cm = confusion_matrix(true_labels, predicted_labels, labels=label_ids)

# Plot the confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=tick_names, yticklabels=tick_names)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()