In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import tensorflow as tf
import pandas as pd
import sklearn as sk
import cv2
from PIL import Image, ImageEnhance
from sklearn.model_selection import train_test_split
from google.colab.patches import cv2_imshow

## LOADING DATASET

In [None]:
# MOUNTING THE DRIVE
from google.colab import drive

drive.mount('/content/drive')

os.chdir('/content/drive/My Drive/Data/public/')

# LOADING DATASET FROM CSV
dataset = pd.read_csv('public.csv')

# REPLACING IMAGE FILENAME WITH PATH
# THE CLASS FOLDER IS CHOSEN FROM THE LABEL: 1 -> GLOBALLY SCLEROTIC, ANYTHING ELSE -> NON-GLOBALLY SCLEROTIC.
# DONE AS ONE VECTORIZED COLUMN ASSIGNMENT SO IT COVERS EVERY ROW OF THE CSV (NO HARD-CODED ROW COUNT)
# AND AVOIDS CHAINED-INDEXING WRITES (dataset['name'][i] = ...), WHICH PANDAS DOES NOT GUARANTEE TO
# WRITE BACK INTO THE DATAFRAME.
class_folder = dataset['ground truth'].eq(1).map({
    True: 'globally_sclerotic_glomeruli/',
    False: 'non_globally_sclerotic_glomeruli/',
})
dataset['name'] = class_folder + dataset['name']

## PREPROCESSING DATASET

In [None]:
# SEPARATE THE TARGET (LABEL) FROM THE REMAINING COLUMNS (IMAGE FILE PATH)
y = dataset['ground truth']
X = dataset.drop(columns='ground truth')

# HOLD OUT 15% FOR TESTING, THEN 15% OF WHAT REMAINS FOR VALIDATION (SAME FIXED SEED FOR BOTH SPLITS)
split_options = dict(test_size=0.15, random_state=42)
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, **split_options)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, **split_options)

# TURN EACH LABEL SERIES INTO A SINGLE-COLUMN DATAFRAME
y_test, y_val, y_train = (labels.to_frame() for labels in (y_test, y_val, y_train))

## PREPROCESSING THE IMAGES BY ENHANCING THE COLORS

In [None]:
def preprocess_images_by_color(X, y, path):
  """Colour-enhance every image listed in X and save it into a per-class folder.

  Each image is read, resized to 224x224, histogram-equalized on its luma (Y)
  channel, saturation-boosted by a factor of 1.5, and written under its original
  file name to ``path/sclerotic`` or ``path/non_sclerotic``.

  Args:
    X: DataFrame with a 'name' column holding the image file paths.
    y: DataFrame with a 'ground truth' column, looked up with the same index
      labels as X (1 -> 'sclerotic', anything else -> 'non_sclerotic').
    path: Destination directory; both class sub-folders are created if missing.

  Raises:
    FileNotFoundError: If OpenCV cannot read one of the listed images.
  """
  # CREATE BOTH CLASS FOLDERS ONCE INSTEAD OF ON EVERY ITERATION
  class_dirs = {
      True: os.path.join(path, 'sclerotic'),
      False: os.path.join(path, 'non_sclerotic'),
  }
  for class_dir in class_dirs.values():
    os.makedirs(class_dir, exist_ok=True)

  for i in X.index:
    src_path = X.at[i, 'name']

    # cv2.imread RETURNS None (NO EXCEPTION) WHEN THE FILE IS MISSING OR UNREADABLE
    img_bgr = cv2.imread(src_path)
    if img_bgr is None:
      raise FileNotFoundError(f'Could not read image: {src_path}')

    img_bgr = cv2.resize(img_bgr, (224, 224)) # FOR ResNet50

    # APPLYING COLOR ENHANCEMENTS
    # EQUALIZED HISTOGRAM ON THE LUMA CHANNEL.
    # OpenCV DECODES TO uint8 IN BGR ORDER, SO CONVERT FROM BGR (NOT RGB) AND DO NOT
    # RESCALE BY 255: THE PIXELS ARE ALREADY IN [0, 255] AND THE MULTIPLICATION OVERFLOWED uint8.
    img_yuv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YUV)
    img_yuv[:, :, 0] = cv2.equalizeHist(img_yuv[:, :, 0])
    # CONVERT STRAIGHT TO RGB, THE CHANNEL ORDER PIL EXPECTS
    img_rgb = cv2.cvtColor(img_yuv, cv2.COLOR_YUV2RGB)

    # SATURATION ENHANCEMENT
    pil_img = Image.fromarray(img_rgb)
    pil_img = ImageEnhance.Color(pil_img).enhance(1.5)  # INCREASED SATURATION BY 1.5 TIMES

    # SAVE UNDER THE ORIGINAL FILE NAME IN THE FOLDER MATCHING THE LABEL
    dest_path = class_dirs[bool(y.at[i, 'ground truth'] == 1)]
    pil_img.save(os.path.join(dest_path, os.path.basename(src_path)))

# ROOT DIRECTORY THAT HOLDS THE FIRST PREPROCESSED DATASET
base_dir = '/content/drive/MyDrive/processed_dataset'

# ONE DIRECTORY PER SPLIT: TRAIN, VALIDATION, TEST
train_dir, validation_dir, test_dir = (
    os.path.join(base_dir, split_name)
    for split_name in ('train', 'validation', 'test')
)

# EVERY SPLIT GETS A SUBDIRECTORY FOR GLOBALLY SCLEROTIC AND NON-GLOBALLY SCLEROTIC IMAGES
# (os.makedirs ALSO CREATES THE MISSING PARENT DIRECTORIES, I.E. base_dir AND THE SPLIT DIRECTORY)
subdirs = ['sclerotic', 'non_sclerotic']
for split_dir in (train_dir, validation_dir, test_dir):
    for subdir in subdirs:
        os.makedirs(os.path.join(split_dir, subdir), exist_ok=True)

# RUN THE PREPROCESSING FUNCTION ON EACH SPLIT, IN ORDER: TRAIN, VALIDATION, TEST
for split_X, split_y, split_out_dir in (
    (X_train, y_train, train_dir),
    (X_val, y_val, validation_dir),
    (X_test, y_test, test_dir),
):
    preprocess_images_by_color(split_X, split_y, split_out_dir)

In [None]:
# BUILD THE TENSOR DATASETS (BATCH SIZE 32) FOR THE FIRST PREPROCESSED DATASET
batch_size = 32
img_size = (224, 224)

# LOADER SETTINGS SHARED BY ALL THREE SPLITS
loader_options = dict(shuffle=True,
                      color_mode='rgb',
                      batch_size=batch_size,
                      image_size=img_size)

train_dataset_1 = tf.keras.utils.image_dataset_from_directory(train_dir, **loader_options)

validation_dataset_1 = tf.keras.utils.image_dataset_from_directory(validation_dir, **loader_options)

test_dataset_1 = tf.keras.utils.image_dataset_from_directory(test_dir, **loader_options)

# PREFETCHING FOR THE DATASET_1 WITH AN AUTOTUNED BUFFER SIZE
autotune = tf.data.AUTOTUNE
train_dataset_1 = train_dataset_1.prefetch(buffer_size=autotune)
validation_dataset_1 = validation_dataset_1.prefetch(buffer_size=autotune)
test_dataset_1 = test_dataset_1.prefetch(buffer_size=autotune)

In [None]:
# DATA AUGMENTATION PIPELINE: RANDOM HORIZONTAL/VERTICAL FLIPS FOLLOWED BY RANDOM ROTATION (FACTOR 0.15)
data_augmentation = tf.keras.Sequential()
data_augmentation.add(tf.keras.layers.RandomFlip('horizontal_and_vertical'))
data_augmentation.add(tf.keras.layers.RandomRotation(0.15))

## BUILDING THE MODEL

In [None]:
# BASE LEARNER 1: RESNET50V2 WITH IMAGENET WEIGHTS AND WITHOUT ITS TOP (CLASSIFICATION) LAYERS
IMG_SHAPE = (*img_size, 3)  # IMAGE HEIGHT AND WIDTH PLUS 3 COLOR CHANNELS
base_model_1 = tf.keras.applications.ResNet50V2(
    weights='imagenet',
    include_top=False,
    input_shape=IMG_SHAPE,
)

In [None]:
# REPORT HOW MANY LAYERS THE RESNET50V2 BASE MODEL CONTAINS
layer_count = len(base_model_1.layers)
print("Number of layers in the ResNet50V2: ", layer_count)

In [None]:
# FINE-TUNING SETUP: MARK THE WHOLE BASE MODEL TRAINABLE, THEN FREEZE EVERY LAYER BEFORE INDEX
# train_from_layers SO THAT ONLY THE LAYERS FROM THAT INDEX ONWARD GET FITTED TO THE DATASET
# (THE ORIGINAL NOTE CALLS THIS "TOP 50 LAYERS" - CONFIRM AGAINST THE LAYER COUNT PRINTED EARLIER)
base_model_1.trainable = True

train_from_layers = 140

frozen_layers = base_model_1.layers[:train_from_layers]
for frozen_layer in frozen_layers:
  frozen_layer.trainable = False

base_model_1.summary()

In [None]:
# HEAD LAYERS THAT SHAPE THE OUTPUT: GLOBAL AVERAGE POOLING AND A SINGLE SIGMOID UNIT
global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
prediction_layer = tf.keras.layers.Dense(1, activation = 'sigmoid')

# COMPLETE MODEL: INPUT -> AUGMENTATION -> RESNET-V2 PREPROCESSING -> BASE MODEL (CALLED WITH training=False)
# -> GLOBAL AVERAGE POOLING -> DROPOUT (0.3) -> PREDICTION
inputs = tf.keras.Input(shape=(224, 224, 3))
augmented = data_augmentation(inputs)
preprocessed = tf.keras.applications.resnet_v2.preprocess_input(augmented)
feature_maps = base_model_1(preprocessed, training=False)
pooled = global_average_layer(feature_maps)
dropped = tf.keras.layers.Dropout(0.3)(pooled)
outputs = prediction_layer(dropped)
model_1 = tf.keras.Model(inputs, outputs)

# SUMMARY OF THE UPDATED MODEL
model_1.summary()

In [None]:
# COMPILING THE MODEL: SGD OPTIMIZER, BINARY CROSS-ENTROPY ON PROBABILITIES, ACCURACY AT A 0.5 THRESHOLD
base_learning_rate = 0.001
sgd_optimizer = tf.keras.optimizers.SGD(learning_rate=base_learning_rate)
bce_loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)
accuracy_metric = tf.keras.metrics.BinaryAccuracy(threshold=0.5, name='accuracy')
model_1.compile(optimizer=sgd_optimizer, loss=bce_loss, metrics=[accuracy_metric])

# CHECKPOINT CALLBACK THAT KEEPS THE BEST PERFORMING MODEL ACROSS EPOCHS
checkpoint_options = dict(
    filepath='run2_best_resnet50V2_model.keras',  # Path where the model will be saved
    monitor='val_loss',                           # Metric to monitor
    mode='min',                                   # Lower monitored value is better
    save_best_only=True,                          # Save only the best model
    save_weights_only=False,                      # Save the entire model
    verbose=1,                                    # Verbosity mode, 0 or 1
)
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(**checkpoint_options)

# TRAINING THE MODEL FOR 7 EPOCHS WITH VALIDATION AFTER EACH EPOCH
history = model_1.fit(
    train_dataset_1,
    validation_data=validation_dataset_1,
    epochs=7,
    callbacks=[checkpoint_callback],
)

## ASSESSING THE RESULTS

In [None]:
# PULL THE PER-EPOCH CURVES OUT OF THE TRAINING HISTORY
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

# TWO STACKED PANELS: ACCURACY ON TOP, LOSS BELOW
fig, (acc_ax, loss_ax) = plt.subplots(2, 1, figsize=(8, 8))

acc_ax.plot(acc, label='Training Accuracy')
acc_ax.plot(val_acc, label='Validation Accuracy')
acc_ax.legend(loc='upper right')
acc_ax.set_ylabel('Accuracy')
acc_ax.set_ylim([min(acc_ax.get_ylim()), 1])  # CAP THE ACCURACY AXIS AT 1
acc_ax.set_title('Training and Validation Accuracy')

loss_ax.plot(loss, label='Training Loss')
loss_ax.plot(val_loss, label='Validation Loss')
loss_ax.legend(loc='upper right')
loss_ax.set_ylabel('Cross Entropy')
loss_ax.set_ylim([0, max(loss_ax.get_ylim())])  # START THE LOSS AXIS AT 0
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('epoch')
plt.show()

In [None]:
# PERSIST THE MODEL AS IT IS AFTER THE LAST EPOCH (THE BEST MODEL SAVED BY CHECKPOINT_CALLBACK IS THE ONE WE'LL USE)
last_model_path = 'run2_last_resnet50V2_model.keras'
model_1.save(last_model_path)