In [None]:
# Mount Google Drive at /content/drive so the zipped image dataset stored there can be read.
# `google.colab` is only importable inside a Google Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Import Necessary Libraries
# Standard library
import gc
import os
import random
import statistics
import zipfile

# Third-party
import cv2
import h5py
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.metrics import precision_score, recall_score, confusion_matrix
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import EfficientNetB3
from tensorflow.keras.layers import BatchNormalization, Dense, Dropout, GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2

In [None]:
# Enter the Google Drive path where the image dataset (zip file) is stored.
# The archive is extracted onto the local Colab disk so that the images can be read quickly later on.
# The `with` block closes the archive even if the extraction fails part-way through.
with zipfile.ZipFile("/content/drive/MyDrive/Colab-Notebooks/CMIL-Assessment.zip", 'r') as zip_ref:
    zip_ref.extractall("/content/dataset")

In [None]:
# Function to convert every image present in a folder (with path "files_path") to a vector
def image_vector(files_path):
    """Load every readable image in a folder as an RGB array.

    Parameters
    ----------
    files_path : str
        Directory that contains the image files. A trailing path separator
        is optional.

    Returns
    -------
    image_vector_list : list
        One RGB image array per file that could be decoded, in sorted
        file-name order.
    image_shapes : list
        The ``.shape`` of every array in ``image_vector_list`` (same order).
    """
    # os.listdir returns names in arbitrary order; sorting makes runs repeatable.
    file_names = sorted(os.listdir(files_path))
    image_vector_list = []
    for file_name in file_names:
        # os.path.join works whether or not files_path ends with a separator.
        full_path = os.path.join(files_path, file_name)
        image = cv2.imread(full_path)
        # cv2.imread signals failure by returning None (sub-folders, hidden
        # files, corrupt images); skip those instead of crashing in cvtColor.
        if image is None:
            print("Skipping unreadable file:", full_path)
            continue
        # Convert BGR (default in OpenCV) to RGB
        image_vector_list.append(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    # Shape of every loaded image, in the same order as the images
    image_shapes = [image.shape for image in image_vector_list]
    return image_vector_list, image_shapes

In [None]:
# Function for Resizing and Padding images while retaining maximum information
def reshape_image_size(vector_list, target_height, target_width):
    """Fit every image onto a ``target_height`` x ``target_width`` canvas.

    A side that is at least as long as its target is resized to the target;
    a side that is shorter is left as it is. Each side is handled on its own,
    so the aspect ratio is not necessarily kept. The result is then centred
    on the canvas with a constant border (any odd leftover pixel goes to the
    bottom / right).

    Parameters
    ----------
    vector_list : iterable of image arrays
        Images whose first two axes are height and width.
    target_height, target_width : int
        Size of the output canvas.

    Returns
    -------
    numpy.ndarray
        The padded images stacked in input order.
    """
    padded_images = []
    for image in vector_list:
        source_height, source_width = image.shape[0], image.shape[1]
        # Never enlarge: each side is capped at its target length.
        new_width = min(source_width, target_width)
        new_height = min(source_height, target_height)
        shrunk = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
        # Border needed on each axis to reach the target size
        missing_rows = target_height - shrunk.shape[0]
        missing_cols = target_width - shrunk.shape[1]
        top = missing_rows // 2
        left = missing_cols // 2
        padded_images.append(
            cv2.copyMakeBorder(
                shrunk,
                top,
                missing_rows - top,
                left,
                missing_cols - left,
                cv2.BORDER_CONSTANT,
            )
        )
    return np.array(padded_images)

In [None]:
# Applies transformation on a given array of image vectors for data augmentation to avoid overfitting
def apply_transformation(image_vectors, additional_records_size):
    """Create augmented copies of randomly chosen images.

    ``additional_records_size`` distinct images are drawn at random. Each copy
    is multiplied by a random factor drawn from [0.8, 1.3) and then has two
    independent zero-mean Gaussian noise fields (standard deviation 0.1 and
    0.05) added. The input array is not modified.

    Parameters
    ----------
    image_vectors : numpy.ndarray
        Array whose first axis indexes the images.
    additional_records_size : int
        Number of augmented images to produce. It must not exceed the number
        of images, because the images are sampled without replacement.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(additional_records_size, *image_vectors.shape[1:])``
        with the same dtype as the input. For integer dtypes the values are
        rounded and clipped to the range of the dtype so they cannot wrap.
    """
    random_indices = np.random.choice(image_vectors.shape[0], size=additional_records_size, replace=False)
    # Indexing with an index array yields a copy, so the originals stay intact.
    selected_vectors = image_vectors[random_indices]
    augmented_vectors = np.empty_like(selected_vectors)
    integer_input = np.issubdtype(selected_vectors.dtype, np.integer)
    for position, vector in enumerate(selected_vectors):
        # Apply random scaling
        scaled_vector = vector * np.random.uniform(0.8, 1.3)
        # Apply two layers of random Gaussian noise
        # NOTE(review): the noise std is in the same unit as the pixel values;
        # on 0-255 data it is barely visible - confirm the intended scale.
        jittered_vector = scaled_vector + np.random.normal(loc=0, scale=0.1, size=vector.shape)
        noisy_vector = jittered_vector + np.random.normal(loc=0, scale=0.05, size=vector.shape)
        if integer_input:
            limits = np.iinfo(selected_vectors.dtype)
            noisy_vector = np.clip(np.rint(noisy_vector), limits.min, limits.max)
        # Store the result in the output array. Merely rebinding the loop
        # variable would throw the augmented image away.
        augmented_vectors[position] = noisy_vector
    return augmented_vectors

In [None]:
# Function for individual analysis of Widths and Heights of all the image vectors
def image_vector_shape_analysis(image_shapes_list):
    """Print summary statistics of the heights and widths of a set of images.

    Maximum, minimum, mean, median and sample standard deviation are printed
    for the heights and for the widths. Nothing is returned.

    Parameters
    ----------
    image_shapes_list : sequence
        One shape per image; index 0 is read as the height and index 1 as the
        width (for example the ``image_shapes`` list returned by
        ``image_vector``). Entries may have different lengths.

    Raises
    ------
    ValueError
        If ``image_shapes_list`` is empty.
    """
    if len(image_shapes_list) == 0:
        raise ValueError("image_shapes_list is empty; there is nothing to analyse")
    # Plain Python ints keep the `statistics` module away from NumPy scalar
    # types, and reading the entries one by one also accepts shapes of mixed
    # length (e.g. grayscale next to colour images).
    heights = [int(shape[0]) for shape in image_shapes_list]
    widths = [int(shape[1]) for shape in image_shapes_list]

    def _sample_std(values):
        # The sample variance needs at least two values; report nan otherwise.
        if len(values) < 2:
            return float("nan")
        return statistics.variance(values) ** 0.5

    # Individual analysis of Widths and Heights of all the image vectors
    print("maximum height:", max(heights), "maximum width:", max(widths))
    print("minimum height:", min(heights), "minimum width:", min(widths))
    print("mean height:", sum(heights)/len(heights), "mean width:", sum(widths)/len(widths))
    print("median height:", statistics.median(heights), "median width:", statistics.median(widths))
    print("standard deviation of height:", _sample_std(heights), "standard deviation of width:", _sample_std(widths))

In [None]:
# Function for comparing original image and resized image
def compare_image_vectors(initial_vector_list, final_vector_list):
    """Display one randomly chosen image next to its resized counterpart.

    The same random index is used in both lists. Each panel title shows the
    first two entries of the image's shape.

    Parameters
    ----------
    initial_vector_list : sequence of image arrays
        The images before resizing.
    final_vector_list : sequence of image arrays
        The resized images, in the same order.

    Returns
    -------
    int
        The index of the image that was displayed.
    """
    # One row, two panels: original on the left, resized on the right
    fig, (left_panel, right_panel) = plt.subplots(1, 2, figsize=(10, 5))
    picked_index = random.randint(0, len(initial_vector_list) - 1)
    before = initial_vector_list[picked_index]
    after = final_vector_list[picked_index]
    left_panel.imshow(before)
    left_panel.set_title(f'Original - {before.shape[0]}x{before.shape[1]}')
    right_panel.imshow(after)
    right_panel.set_title(f'Resized - {after.shape[0]}x{after.shape[1]}')
    plt.show()
    return picked_index

In [None]:
# Locating Datasets
# Folders on the local Colab disk that were created by extracting the dataset zip file.
gsg_src = "/content/dataset/CMIL-Assessment/globally_sclerotic_glomeruli/"
# The non globally sclerotic class must come from its own folder. Pointing this variable at the
# sclerotic folder loads the same images twice and labels them both 1 and 0.
# NOTE(review): folder name inferred from the sibling folder's naming - confirm against the extracted archive.
ngsg_src = "/content/dataset/CMIL-Assessment/non_globally_sclerotic_glomeruli/"

In [None]:
# For globally sclerotic glomeruli (gsg)
# Load every image of the gsg folder as an RGB array; the second value is the list of the image shapes.
gsg_im_vector, gsg_im_vector_shapes = image_vector(gsg_src)

In [None]:
# RAM optimization: drop the list of gsg image shapes and run the garbage collector.
del gsg_im_vector_shapes
gc.collect()

In [None]:
# Resize / pad every gsg image to 300x300, the input size the model is built with later on.
resized_gsg_im_vector = reshape_image_size(gsg_im_vector, 300, 300)

In [None]:
# Data augmentation on the resized gsg images.
# As many extra samples are requested as there are gsg images, so the gsg class is doubled
# (the ngsg class is only grown by a quarter further down).
resized_gsg_im_vector_with_data_aug = apply_transformation(resized_gsg_im_vector, len(resized_gsg_im_vector))

In [None]:
# Final gsg array: the resized originals followed by the augmented samples, joined along the first (sample) axis.
final_gsg_im_vector = np.concatenate((resized_gsg_im_vector, resized_gsg_im_vector_with_data_aug), axis=0)

In [None]:
# Visual check: show one random globally sclerotic image before and after resizing.
# The returned value is the index of the image that was displayed.
sample = compare_image_vectors(gsg_im_vector, resized_gsg_im_vector)

In [None]:
# RAM optimization: the raw, the resized and the augmented-only gsg arrays are all
# contained in final_gsg_im_vector by now, so the intermediate names are released together.
del gsg_im_vector, resized_gsg_im_vector, resized_gsg_im_vector_with_data_aug
gc.collect()

In [None]:
# RAM optimization: drop the index returned by the gsg comparison plot and run the garbage collector.
del sample
gc.collect()

In [None]:
# Label array for the gsg class: one entry per image in final_gsg_im_vector.
# Glomeruli of type = 1 means that it is Globally Sclerotic.
gsg_glomeruli = np.array([1] * len(final_gsg_im_vector))

In [None]:
# For non globally sclerotic glomeruli (ngsg)
# Load every image of the ngsg folder as an RGB array; the second value is the list of the image shapes.
ngsg_im_vector, ngsg_im_vector_shapes = image_vector(ngsg_src)

In [None]:
# RAM optimization: drop the list of ngsg image shapes and run the garbage collector.
del ngsg_im_vector_shapes
gc.collect()

In [None]:
# Resize / pad every ngsg image to 300x300, the same size that is used for the gsg images.
resized_ngsg_im_vector = reshape_image_size(ngsg_im_vector, 300, 300)

In [None]:
# Data augmentation on the resized ngsg images.
# Only a quarter (integer division) of the number of ngsg images is requested as extra samples.
resized_ngsg_im_vector_with_data_aug = apply_transformation(resized_ngsg_im_vector, len(resized_ngsg_im_vector)//4)

In [None]:
# Final ngsg array: the resized originals followed by the augmented samples, joined along the first (sample) axis.
final_ngsg_im_vector = np.concatenate((resized_ngsg_im_vector, resized_ngsg_im_vector_with_data_aug), axis=0)

In [None]:
# Visual check: show one random non globally sclerotic image before and after resizing.
# The returned value is the index of the image that was displayed.
sample1 = compare_image_vectors(ngsg_im_vector, resized_ngsg_im_vector)

In [None]:
# RAM optimization: everything needed from the ngsg pipeline now lives in final_ngsg_im_vector,
# so the intermediate arrays and the plot index are released in one statement.
del ngsg_im_vector, resized_ngsg_im_vector, resized_ngsg_im_vector_with_data_aug, sample1
gc.collect()

In [None]:
# Label array for the ngsg class: one entry per image in final_ngsg_im_vector.
# Glomeruli of type = 0 means that it is Non Globally Sclerotic.
ngsg_glomeruli = np.array([0] * len(final_ngsg_im_vector))

In [None]:
# Stacking both classes: "final_gsg_im_vector" first, then "final_ngsg_im_vector".
# Images and labels are concatenated in the same order, so row i of the image array belongs to label i.
combined_final_im_vector = np.concatenate((final_gsg_im_vector, final_ngsg_im_vector), axis=0)
combined_final_glomeruli = np.concatenate((gsg_glomeruli, ngsg_glomeruli), axis=0)

In [None]:
# RAM optimization: the per-class image and label arrays have been merged into the
# combined arrays, so their names are released together.
del final_gsg_im_vector, final_ngsg_im_vector, gsg_glomeruli, ngsg_glomeruli
gc.collect()

In [None]:
# Shuffling the samples: one random ordering of the row indices is drawn and applied to
# both the images and the labels, which keeps every image paired with its own label.
shuffled_indices = np.random.permutation(combined_final_im_vector.shape[0])
combined_final_im_vector = combined_final_im_vector[shuffled_indices]
combined_final_glomeruli = combined_final_glomeruli[shuffled_indices]

In [None]:
# RAM optimization: drop the shuffled index array and run the garbage collector.
del shuffled_indices
gc.collect()

In [None]:
# Optional: save the combined (shuffled) image vectors and their glomeruli labels into an HDF5 file.
# Uncomment the lines below if you want to keep the data for future use.
# with h5py.File("Dataset.h5", 'w') as hf:
#             # Write both arrays into the new .h5 file
#             hf.create_dataset('image_vector', data=combined_final_im_vector)
#             hf.create_dataset('glomeruli_type', data=combined_final_glomeruli)

In [None]:
# Optional: load the data for training the deep learning model, if the dataset was saved earlier.
# If you saved the data in an earlier session, uncomment the lines below; they restore the two arrays under the names used by the following cells.
# with h5py.File("Dataset.h5", 'r') as hf:
#     combined_final_im_vector = hf['image_vector'][:]
#     combined_final_glomeruli = hf['glomeruli_type'][:]

In [None]:
# Preparing the pixel data for the network
# The Keras EfficientNet models rescale their input internally and expect raw pixel values in
# the [0, 255] range, so the images are NOT divided by 255 here (that would rescale them twice).
# They are only cast to float32, which also avoids the float64 copy (8 bytes per value) a plain
# division would create. Re-running this cell leaves the values unchanged.
combined_final_im_vector = combined_final_im_vector.astype(np.float32)

In [None]:
# Stratified 70/30 split: `stratify` keeps the class proportions the same in the train and the test set,
# and the fixed random_state makes the split repeatable.
# NOTE(review): the augmented samples were added before this split, so an image and its augmented
# copy can end up on different sides of it - confirm whether augmentation should happen after the split.
x_train, x_test, y_train, y_test = train_test_split(combined_final_im_vector, combined_final_glomeruli, test_size=0.3, stratify=combined_final_glomeruli, random_state=35)

In [None]:
# Defining the input layer: 300x300 images with 3 colour channels, the size the images were resized / padded to.
input_layer = Input(shape=(300, 300, 3))

In [None]:
# Loading the EfficientNetB3 backbone (without its classification top) with ImageNet pre-trained weights.
# The following cells freeze the backbone and train only the new output head. That approach needs
# a backbone that already extracts useful features; with weights=None the frozen layers would
# hold random values and the head would be trained on noise.
base_model = EfficientNetB3(include_top=False, input_tensor=input_layer, weights='imagenet')

In [None]:
# Adding a custom classification head on top of the base model.
# Global average pooling -> Dense(512, relu) -> batch normalization -> dropout (rate 0.3)
# -> a single sigmoid unit (with an L2 kernel regularizer) that outputs the probability of class 1.
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(512, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.3)(x)
output_layer = Dense(1, activation='sigmoid', kernel_regularizer=l2(0.01))(x)

In [None]:
# Creating the initial model: base model input -> custom output head.
model = Model(inputs=base_model.input, outputs=output_layer)
# Freezing every layer of the base model, so that only the layers of the new head are trained at first.
for layer in base_model.layers:
    layer.trainable = False
# Compiling the model for binary classification (binary cross-entropy loss, accuracy metric).
model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Setting main Hyperparameters for initial training
# `batches` is passed to model.fit as the batch size, `total_epochs` as the number of epochs.
batches = 16
total_epochs = 5

In [None]:
# Initial training with the frozen base model.
# The test split doubles as validation data; the returned History object is kept for the loss plot below.
b3ic = model.fit(
    x_train, y_train,
    validation_data=(x_test, y_test),
    epochs=total_epochs,
    batch_size = batches,
    )

In [None]:
# Analyzing the training: training loss and validation loss of the initial model over the epochs.
plt.plot(b3ic.history['loss'])
plt.plot(b3ic.history['val_loss'])
plt.title('b3ic Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper right')
plt.show()

In [None]:
# Fine-tuning step: mark every layer of the model except its last 10 as trainable.
# NOTE(review): the slice [:-10] selects all layers *before* the last 10, so most of the base model
# is unfrozen here. If the intent was to unfreeze only the top of the network, the slice would be
# [-10:] - confirm.
for layer in model.layers[:-10]:
    layer.trainable = True
# Compiling the model again after changing the trainable flags
model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Setting main Hyperparameters for Fine tuning
# `fine_tune_batches` is passed to model.fit as the batch size, `fine_tune_epochs` as the number of epochs.
fine_tune_batches = 16
fine_tune_epochs = 7

In [None]:
# Re-training (fine-tuning) the model with the unfrozen layers.
# The test split is again used as validation data; the History object is kept for the loss plot below.
fine_tuned_b3ic = model.fit(
    x_train, y_train,
    validation_data=(x_test, y_test),
    epochs=fine_tune_epochs,
    batch_size = fine_tune_batches
)

In [None]:
# Analyzing the fine-tuning: training loss and validation loss of the fine tuned model over the epochs.
plt.plot(fine_tuned_b3ic.history['loss'])
plt.plot(fine_tuned_b3ic.history['val_loss'])
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper right')
plt.show()

In [None]:
# Predicting on the test dataset; the sigmoid outputs are turned into class labels in the next cell
# and then evaluated with Precision, Recall and a confusion matrix.
y_pred = model.predict(x_test)

In [None]:
# Thresholding the predictions: values greater than 0.5 become 1, everything else becomes 0.
# Note that this overwrites the raw probabilities stored in y_pred.
y_pred = (y_pred > 0.5).astype(int)
print(y_pred)

In [None]:
# Calculating precision and recall.
# average='weighted' averages the per-class scores weighted by the number of true samples of each class,
# so these values describe both classes together rather than only the positive (sclerotic) class.
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
# Computing confusion matrix (rows: true labels, columns: predicted labels)
conf_matrix = confusion_matrix(y_test, y_pred)

In [None]:
# Printing the weighted precision, the weighted recall and the confusion matrix.
print(f'Precision: {precision}\nRecall: {recall}\nConfusion Matrix:\n {conf_matrix}')

In [None]:
# Plotting the confusion matrix as an annotated heatmap (integer counts in every cell).
# Rows are the true labels, columns the predicted labels; "Positive" is class 1 (globally sclerotic).
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['Predicted Negative', 'Predicted Positive'], yticklabels=['Actual Negative', 'Actual Positive'])
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix Heatmap')
plt.show()

In [None]:
# RAM optimization: training and evaluation are finished, so the four split arrays
# are released in one statement before the garbage collector is run.
del x_train, x_test, y_train, y_test
gc.collect()