# Imports

In [None]:
import os
import pandas as pd
import shutil
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from pathlib import Path
from keras.models import load_model

from sklearn.model_selection import train_test_split

from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, mean_absolute_error, mean_squared_error, roc_curve, auc
from sklearn.utils.class_weight import compute_class_weight
from tensorflow import keras


In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset and model paths used below live there.
from google.colab import drive
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

In [None]:
# Dataset locations on the mounted Drive: one image folder per class,
# plus the spreadsheet that holds the per-patient labels.
anaemic_dir = '/content/drive/MyDrive/Hb_Nail/Filtered Data/new_split/Anaemic'
non_dir = '/content/drive/MyDrive/Hb_Nail/Filtered Data/new_split/Non-Anaemic'
labelfile = '/content/drive/MyDrive/Hb_Nail/Filtered Data/Labels.xlsx'

#Preparing Data


##Training Split Function

In [None]:
def split_images(input_folder, labels_file):
    """Load every image in ``input_folder`` with its labels and split 90/10 into train/test.

    The patient ID is parsed from the part of each file name before the first
    underscore and looked up in the ``PatientID`` column of the labels
    spreadsheet to obtain the ``Hb`` value and the ``Anemia`` flag.

    Args:
        input_folder: Directory containing the image files to load.
        labels_file: Path to an Excel file with ``PatientID``, ``Hb`` and
            ``Anemia`` columns.

    Returns:
        Tuple ``(train_data, test_data, train_haemoglobin_levels,
        test_haemoglobin_levels, train_anemia, test_anemia)`` of numpy arrays.
        Images are resized to 224x224 and their pixel values divided by 255;
        anemia labels are 1 where the spreadsheet says ``'Yes'`` and 0 otherwise.

    Raises:
        ValueError: If the folder contains no files, or a file's patient ID
            has no matching row in the labels spreadsheet.
    """
    labels_df = pd.read_excel(labels_file)

    images = []
    haemoglobin_levels = []
    anemia_labels = []

    # os.listdir() returns entries in arbitrary order; sorting makes the seeded
    # split below reproducible across runs and machines.
    for image_file in sorted(os.listdir(input_folder)):
        # File names start with the patient ID, followed by an underscore.
        patient_id = int(image_file.split('_')[0])

        label_row = labels_df.loc[labels_df['PatientID'] == patient_id]
        if label_row.empty:
            raise ValueError(
                f"No row with PatientID {patient_id} in {labels_file!r} (image {image_file!r})"
            )
        haemoglobin_levels.append(label_row['Hb'].values[0])
        anemia_labels.append(1 if label_row['Anemia'].values[0] == 'Yes' else 0)

        image_path = os.path.join(input_folder, image_file)
        img = tf.keras.preprocessing.image.load_img(image_path, target_size=(224, 224))
        images.append(tf.keras.preprocessing.image.img_to_array(img) / 255.0)

    if not images:
        raise ValueError(f"No images found in {input_folder!r}")

    # Stack into one (n_images, 224, 224, channels) array and convert the labels
    # to arrays so every returned split is a numpy array.
    data = np.stack(images)
    haemoglobin_levels = np.array(haemoglobin_levels)
    anemia_labels = np.array(anemia_labels)

    # NOTE(review): the split is per image. If one patient contributes several
    # images, the same patient can land in both train and test — confirm whether
    # a patient-level (grouped) split is needed.
    (train_data, test_data,
     train_haemoglobin_levels, test_haemoglobin_levels,
     train_anemia, test_anemia) = train_test_split(
        data, haemoglobin_levels, anemia_labels,
        test_size=0.1, random_state=42, shuffle=True,
    )

    return train_data, test_data, train_haemoglobin_levels, test_haemoglobin_levels, train_anemia, test_anemia

## Splitting Anaemic and Non-Anaemic images separately so the train and test sets both contain each class

In [None]:
# Load and split each class folder on its own; every call yields
# (train images, test images, train Hb, test Hb, train labels, test labels).
(x_train_non, x_test_non,
 hb_train_non, hb_test_non,
 y_train_non, y_test_non) = split_images(non_dir, labelfile)

(x_train_an, x_test_an,
 hb_train_an, hb_test_an,
 y_train_an, y_test_an) = split_images(anaemic_dir, labelfile)

In [None]:
# Merge the per-class splits into single arrays (non-anaemic samples first, then anaemic).
# Images
x_train = np.concatenate([x_train_non, x_train_an])
x_test = np.concatenate([x_test_non, x_test_an])
# Anemia labels
y_train = np.concatenate([y_train_non, y_train_an])
y_test = np.concatenate([y_test_non, y_test_an])
# Haemoglobin values
hb_train = np.concatenate([hb_train_non, hb_train_an])
hb_test = np.concatenate([hb_test_non, hb_test_an])

In [None]:
# Sanity check of the merged splits: array shapes first,
# then the first five labels and the first five Hb values.
for split_array in (x_train, x_test, y_train, y_test, hb_train):
    print(split_array.shape)
for preview_source in (y_train, hb_train):
    print(preview_source[:5])

(728, 224, 224, 3)
(81, 224, 224, 3)
(728,)
(81,)
(728,)
[0 0 0 0 0]
[15.8 16.4 13.9 12.2 13.2]


##Validation set

In [None]:
# Hold out 10% of the training images for validation. This one split serves both tasks:
# the Hb arrays are the regression targets and train_y / val_y are the classification labels.
train_data, val_data, train_haemoglobin_levels, val_haemoglobin_levels, train_y, val_y = train_test_split(x_train, hb_train, y_train, test_size=0.1, random_state=69, shuffle=True)

In [None]:
# Shape of the held-out validation images.
val_data.shape

(73, 224, 224, 3)

## SMOTE for classification

In [None]:
# Shape of the training images that remain after the validation split.
train_data.shape

(655, 224, 224, 3)

In [None]:
# SMOTE takes 2-D (samples, features) input, so flatten each image into one row.
# The sample count comes from the array itself rather than a hard-coded number,
# so this cell keeps working when the dataset or the split sizes change.
x_reshaped = train_data.reshape(len(train_data), -1)
x_reshaped.shape

(655, 150528)

In [None]:
from imblearn.over_sampling import SMOTE
# Oversample the flattened training images with SMOTE so the classification labels are
# balanced; random_state fixes the resampling.
# NOTE(review): SMOTE synthesises new rows from existing ones in raw pixel space — confirm
# that the synthetic images are acceptable training samples for this task.
sm = SMOTE(random_state= 2)
x_smotere, y_smote = sm.fit_resample(x_reshaped, train_y)

In [None]:
# Number of flattened samples after SMOTE resampling.
x_smotere.shape

(1114, 150528)

In [None]:
# Restore the image layout after SMOTE. -1 lets numpy infer the number of resampled
# images instead of hard-coding it, so the cell survives changes in dataset size.
x_smote = x_smotere.reshape(-1, 224, 224, 3)

#Training

##Imports & Data generator

In [None]:
from tensorflow.keras.applications import ResNet50, EfficientNetV2B3, InceptionResNetV2
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Flatten, GlobalAveragePooling2D
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
#from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.legacy import Adam

In [None]:
# Training configuration shared by the models below.
batch_size = 32
models_subfolder = '/content/drive/MyDrive/Hb_Nail/Filtered Data/Models'
Optimizer = Adam(learning_rate=0.001)

# Augmentation applied to training batches: random flips, rotations, shifts,
# shear and zoom, with newly exposed pixels filled from the nearest edge value.
train_datagen = ImageDataGenerator(
    horizontal_flip=True,
    rotation_range=20,
    shear_range=0.2,
    zoom_range=0.2,
    width_shift_range=0.2,
    height_shift_range=0.2,
    fill_mode='nearest',
)

##Data Generator

In [None]:
# Wrap the training arrays in augmenting batch generators:
# the regression one pairs images with Hb values, the classification one uses the SMOTE-resampled set.
regression_train_data_generator = train_datagen.flow(
    x=train_data,
    y=train_haemoglobin_levels,
    batch_size=batch_size,
)
classification_train_data_generator = train_datagen.flow(
    x=x_smote,
    y=y_smote,
    batch_size=batch_size,
)

##ResNet50

###Model Specifications

In [None]:
# Create ResNet50 base model: ImageNet weights, no classification top, 224x224 RGB input.
# NOTE(review): the images were divided by 255 when loaded; Keras documents a dedicated
# preprocess_input for ResNet50 — confirm that [0, 1] scaling is what this backbone should receive.
base_model1 = ResNet50(include_top=False, weights='imagenet', input_shape=(224, 224, 3))

# Freeze the layers of the pre-trained model so only the custom top layers are trained
for layer in base_model1.layers:
    layer.trainable = False



Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


###Regression

####Regression custom layers

In [None]:
# Add custom top layers for regression.
# The backbone output is a spatial feature map. Dense layers only act on the last axis,
# so without pooling the model would emit a whole grid of predictions per image instead
# of a single Hb value. Pool to one feature vector per image first.
regression_output = GlobalAveragePooling2D()(base_model1.output)
regression_output = Dense(128, activation='relu')(regression_output)
regression_output = Dense(1, activation='linear', name='regression_output')(regression_output)

# Create the regression model (MSE loss, MAE reported as a metric)
regression_model = Model(inputs=base_model1.input, outputs=regression_output)
regression_model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

####Epochs

In [None]:
# Fit the regression model on augmented batches. Training stops once validation loss
# has not improved for 3 epochs, and the best weights seen are restored.
regression_early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)
regression_model.fit(
    regression_train_data_generator,
    validation_data=(val_data, val_haemoglobin_levels),
    epochs=10,
    callbacks=[regression_early_stopping],
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10

KeyboardInterrupt: 

####Saving

In [None]:
# Build the output paths for the ResNet50 regression model and its error spreadsheet,
# then write the trained model to disk.
savepathresreg = os.path.join(models_subfolder, "resnet_regression.keras")
errorpathresreg = os.path.join(models_subfolder, "resnetregression_error.xlsx")
regression_model.save(savepathresreg)


###Classification

####Classification custom top layers

In [None]:
# Classification head on the frozen ResNet50 backbone:
# pooled features -> 128-unit ReLU layer -> single sigmoid probability.
pooled_features = GlobalAveragePooling2D()(base_model1.output)
hidden_features = Dense(128, activation='relu')(pooled_features)
classification_output = Dense(1, activation='sigmoid', name='classification_output')(hidden_features)

# Assemble and compile the binary classifier.
classification_model = Model(inputs=base_model1.input, outputs=classification_output)
classification_model.compile(
    optimizer=Optimizer,
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Balanced class weights derived from the SMOTE-resampled labels.
class_weights = compute_class_weight('balanced', classes=np.unique(y_smote), y=y_smote)
class_weight_dict = {0: class_weights[0], 1: class_weights[1]}

In [None]:
# Show the per-class weights computed from the SMOTE-resampled labels.
print(class_weight_dict)

{0: 1.0, 1: 1.0}


####Epochs

In [None]:
# Fit the classifier on the SMOTE-resampled, augmented batches. Training stops once
# validation loss has not improved for 5 epochs, and the best weights seen are restored.
# class_weight is not passed to fit().
classification_early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)
classification_model.fit(
    classification_train_data_generator,
    validation_data=(val_data, val_y),
    epochs=10,
    callbacks=[classification_early_stopping],
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10


<keras.src.callbacks.History at 0x7fdad2303970>

In [None]:
# Output paths for the ResNet50 classifier and its error spreadsheet.
savepathresc = os.path.join(models_subfolder, "resnet_classification.keras")
errorpathresc = os.path.join(models_subfolder, "resnetclassification_error.xlsx")
# Save the classifier itself. Saving regression_model here would leave the regression
# network in the "classification" file and make later classification tests meaningless.
classification_model.save(savepathresc)

##EfficientNet

###Model Specifications

In [None]:
# Create EfficientNetV2 base model: ImageNet weights, no classification top, 224x224 RGB input.
# NOTE(review): Keras documents that EfficientNetV2 includes its own input preprocessing by
# default and expects raw [0, 255] pixels; the images here were already divided by 255 when
# loaded — confirm the inputs are not being rescaled twice.
base_model2 = EfficientNetV2B3(include_top=False, weights='imagenet', input_shape=(224, 224, 3))

# Freeze the layers of the pre-trained model so only the custom top layers are trained
for layer in base_model2.layers:
    layer.trainable = False



Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/efficientnet_v2/efficientnetv2-b3_notop.h5


###Regression

####Regression custom top layers

In [None]:
# Add custom top layers for regression.
# The backbone output is a spatial feature map. Dense layers only act on the last axis,
# so without pooling the model emits a grid of predictions per image (the test run
# produced 3969 values for 81 images). Pool to one feature vector per image first.
regression_output = GlobalAveragePooling2D()(base_model2.output)
regression_output = Dense(128, activation='relu')(regression_output)
regression_output = Dense(1, activation='linear', name='regression_output')(regression_output)

# Create the regression model (MSE loss, MAE reported as a metric)
regression_model = Model(inputs=base_model2.input, outputs=regression_output)
regression_model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

####Epochs

In [None]:
# Fit the regression model on augmented batches. Training stops once validation loss
# has not improved for 3 epochs, and the best weights seen are restored.
regression_early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)
regression_model.fit(
    regression_train_data_generator,
    validation_data=(val_data, val_haemoglobin_levels),
    epochs=10,
    callbacks=[regression_early_stopping],
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7b31deb9bfa0>

####Saving

In [None]:
# Write the trained EfficientNetV2B3 regression model to disk and define the path of the
# spreadsheet that receives its per-image test errors.
savepatheff = os.path.join(models_subfolder, "efficientnetb3_regression.keras")
regression_model.save(savepatheff)
errorpatheff = os.path.join(models_subfolder, "efficientnetb3regression_error.xlsx")

###Classification

####Classification custom top layers

In [None]:
# Add custom top layers for classification:
# pooled backbone features -> 128-unit ReLU layer -> single sigmoid probability.
classification_output = GlobalAveragePooling2D()(base_model2.output)
classification_output = Dense(128, activation='relu')(classification_output)
classification_output = Dense(1, activation='sigmoid', name='classification_output')(classification_output)

# Create the classification model.
# Optimizers are stateful (step counter, per-variable moments). The shared `Optimizer`
# instance has already been used to train the ResNet classifier, so give this model a
# fresh Adam with the same learning rate instead of inheriting that state.
classification_model = Model(inputs=base_model2.input, outputs=classification_output)
classification_model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])

# Balanced class weights derived from the SMOTE-resampled labels
class_weights = compute_class_weight('balanced', classes=np.unique(y_smote), y=y_smote)
class_weight_dict = {0: class_weights[0], 1: class_weights[1]}

####Epochs

In [None]:
# Train the classifier for the full 10 epochs on the SMOTE-resampled, augmented batches.
# Neither early stopping nor class weights are passed to fit() here.
classification_model.fit(
    x=classification_train_data_generator,
    validation_data=(val_data, val_y),
    epochs=10,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7fe7ea4d5990>

####Saving

In [None]:
# Write the trained EfficientNetV2B3 classifier to disk and define the error-spreadsheet
# path that is later passed to clatester.
savepatheffc = os.path.join(models_subfolder, "efficientnetb3_classification.keras")
classification_model.save(savepatheffc)
errorpatheffc = os.path.join(models_subfolder, "efficientnetb3classification_error.xlsx")

#Testing

##Regression

###Core Functions

In [None]:
def calculate_absoluteerror_perimage(actual, predicted):
    """Return the element-wise absolute error between actual and predicted values.

    ``actual`` is flattened to 1-D, and ``predicted`` is cut down to the same
    number of leading entries before the two are compared.
    """
    flat_actual = actual.flatten()
    n_samples = len(flat_actual)
    return np.abs(flat_actual - predicted[:n_samples])

def regtester(path, test_data, test_hb, errorfile):
    """Evaluate a saved regression model and write its per-image errors to Excel.

    Prints the label and raw prediction shapes and the mean absolute error,
    then stores actual value, predicted value and absolute error per image.

    Args:
        path: Location of the saved Keras model to load.
        test_data: Images passed to ``model.predict``.
        test_hb: Array of measured Hb values, one per test image.
        errorfile: Path of the Excel file that receives the per-image results.
    """
    # Load model and predictions
    model = tf.keras.models.load_model(path)
    raw_predictions = model.predict(test_data)

    # Check shapes
    print("Shape of test_haemoglobin_levels:", test_hb.shape)
    print("Shape of regression_predictions:", raw_predictions.flatten().shape)

    # Reduce to exactly one prediction per image. A model without a pooling layer
    # returns a grid of values per image; truncating the flattened grid would pair
    # labels with values that belong to other images, so average within each image.
    values_per_image = int(np.prod(raw_predictions.shape[1:]))
    per_image = raw_predictions.reshape(raw_predictions.shape[0], values_per_image)
    if values_per_image > 1:
        print(f"Model returned {values_per_image} values per image; averaging them into one prediction.")
    predicted_hb = per_image.mean(axis=1)

    # Calculate absolute error & MAE
    imagewise_ae = calculate_absoluteerror_perimage(test_hb, predicted_hb)
    print(np.mean(imagewise_ae))

    # Store imagewise error in errorfile; the row count follows the labels
    # instead of a hard-coded test-set size.
    errordata = pd.DataFrame({
        'Actual_Hb': test_hb.flatten(),
        'Predicted_Hb': predicted_hb[:len(imagewise_ae)],
        'Absolute Error': imagewise_ae,
    })
    errordata.to_excel(errorfile, index=False)


###Testing Models

In [None]:
# Evaluate the saved ResNet50 regression model. The paths are defined in the ResNet
# saving cell as savepathresreg / errorpathresreg.
regtester(savepathresreg, x_test, hb_test, errorpathresreg)

In [None]:
# Evaluate the saved EfficientNetV2B3 regression model on the held-out test set.
regtester(savepatheff, x_test, hb_test, errorpatheff)

Shape of test_haemoglobin_levels: (81,)
Shape of regression_predictions: (3969,)
1.745387143264582


##Classification

###Core Functions

In [None]:
def clatester(path, test_data, test_y, errorfile):
    """Evaluate a saved binary classifier at 0.5 and at the Youden-optimal threshold.

    Prints the accuracy of the rounded probabilities, then the accuracy,
    classification report and confusion matrix obtained with the ROC threshold
    that maximises ``tpr - fpr``.

    Args:
        path: Location of the saved Keras model to load.
        test_data: Images passed to ``model.predict``.
        test_y: True binary labels (0 = no anemia, 1 = anemia).
        errorfile: Currently unused; nothing is written to it.
    """
    # Load the model and predict once. Every metric below must come from the model
    # stored at `path`, not from whichever model is currently in notebook memory.
    model = tf.keras.models.load_model(path)
    probabilities = model.predict(test_data).flatten()

    # Evaluate model on test data with the default rounding threshold
    classification_predictions = np.round(probabilities)
    accuracy = accuracy_score(test_y, classification_predictions)
    print("Accuracy for classification:", accuracy)

    # Calculate the ROC curve
    fpr, tpr, thresholds = roc_curve(test_y, probabilities)

    # Youden's index for each threshold; pick the threshold that maximises it.
    # NOTE(review): the threshold is tuned on the same data it is then scored on,
    # so the figures below are optimistic — confirm whether it should come from
    # the validation set instead.
    youden_index = tpr - fpr
    optimal_threshold = thresholds[np.argmax(youden_index)]

    # roc_curve counts a sample as positive when score >= threshold, so use >=
    # here as well; with > the sample sitting exactly on the threshold flips class.
    optimal_classification_predictions = (probabilities >= optimal_threshold).astype(int)

    # Define class names
    class_names = ['No Anemia', 'Anemia']

    # Accuracy with optimal threshold
    optimal_accuracy = accuracy_score(test_y, optimal_classification_predictions)
    print("Accuracy with optimal threshold:", optimal_accuracy)

    # Classification Report with optimal threshold
    print("Classification Report with optimal threshold:")
    optimal_classification_report = classification_report(test_y, optimal_classification_predictions, target_names=class_names, zero_division=0)
    print(optimal_classification_report)

    # Confusion Matrix with optimal threshold
    print("Confusion Matrix with optimal threshold:")
    optimal_conf_matrix = confusion_matrix(test_y, optimal_classification_predictions, labels=[0, 1])
    print(pd.DataFrame(optimal_conf_matrix, index=class_names, columns=class_names))




###Testing Models

In [None]:
# Evaluate both saved classifiers on the held-out test set:
# EfficientNetV2B3 first, then ResNet50.
clatester(savepatheffc, x_test, y_test, errorpatheffc)
clatester(savepathresc, x_test, y_test, errorpathresc)

Accuracy for ResNet classification: 0.8518518518518519
Accuracy with optimal threshold: 0.3950617283950617
Classification Report with optimal threshold:
              precision    recall  f1-score   support

   No Anemia       0.92      0.32      0.47        69
      Anemia       0.18      0.83      0.29        12

    accuracy                           0.40        81
   macro avg       0.55      0.58      0.38        81
weighted avg       0.81      0.40      0.45        81

Confusion Matrix with optimal threshold:
           No Anemia  Anemia
No Anemia         22      47
Anemia             2      10
