### Libraries

In [None]:
!pip uninstall pydicom -y
!pip install pydicom

!pip install pillow>=10.0
!pip install pylibjpeg>=2.0
!pip install pylibjpeg-libjpeg>=2.1
!pip install gdcm>=3.0.10

In [None]:
!pip install aif360
!pip install aif360[inFairness]
!pip install aif360[OptimalTransport]

In [None]:
!pip install fairlearn

In [None]:
import glob
import os
import pydicom
import json
import numpy as np
from skimage.transform import resize
from sklearn.model_selection import train_test_split
from collections import defaultdict
import itertools
import time

# Import necessary libraries
import numpy as np
import pandas as pd
import os
import pydicom
import json

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, Input
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.callbacks import Callback
from tensorflow import keras

from skimage.transform import resize
from sklearn.metrics import roc_auc_score, accuracy_score

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image
import cv2 as cv

# Set random seed for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

### Functions

This section contains the functions required for reading the DICOM image files and annotations, and for creating GradCAM heatmap visualizations.

In [None]:
# Module-level error bookkeeping shared by the loader functions in this cell:
# error_summary counts failures per category, error_details stores (path, message) pairs.
error_summary = defaultdict(int)
error_details = defaultdict(list)

# Loading annotations
def load_annotations(json_file):
    """Parse a JSON annotation file and return its content.

    Any exception (missing file, invalid JSON, ...) is counted in
    ``error_summary`` and recorded in ``error_details`` under
    'Annotation Load Errors'; an empty list is returned in that case
    instead of raising.
    """
    category = 'Annotation Load Errors'
    try:
        with open(json_file) as handle:
            return json.load(handle)
    except Exception as exc:
        error_summary[category] += 1
        error_details[category].append((json_file, str(exc)))
    return []

# Loading DICOM images
## The images are resized and converted to
## 3-channel images by stacking them
def load_dicom_image(file_path, img_size):
    """Read a DICOM file and return its pixels as an (img_size, img_size, 3) array.

    The pixel array is resized to ``img_size`` x ``img_size`` and then stacked
    three times along a new last axis (assumes a single-channel source image --
    TODO confirm). On any exception the failure is printed, recorded under
    'DICOM Load Errors' in ``error_summary``/``error_details`` and ``None`` is
    returned.
    """
    category = 'DICOM Load Errors'
    try:
        pixels = pydicom.dcmread(file_path).pixel_array
        resized = resize(pixels, (img_size, img_size), mode='constant', anti_aliasing=True)
        # Repeat the resized image three times along a new trailing axis
        return np.stack([resized, resized, resized], axis=-1)
    except Exception as exc:
        print("DICOM Load Errors")
        error_summary[category] += 1
        error_details[category].append((file_path, str(exc)))
        return None

# Recursively loading all DICOM files from the given directories
def load_dicom_files_from_folder(folder, max_files=10):
    """Return up to ``max_files`` paths of ``*.dcm`` files found under ``folder``.

    The folder is searched recursively. The matches are sorted before the
    subset is taken: ``glob`` returns paths in arbitrary filesystem order, so
    without sorting the selected subset could differ between runs or machines.

    Parameters:
        folder: root directory to search.
        max_files: maximum number of paths to return (``None`` returns all).

    Returns:
        A sorted list of file paths; an empty list if nothing matches or if an
        exception occurs (the exception is recorded under
        'DICOM Folder Load Errors').
    """
    try:
        pattern = os.path.join(folder, '**', '*.dcm')
        files = sorted(glob.glob(pattern, recursive=True))
        return files[:max_files]  # Limit to a subset for testing
    except Exception as e:
        print("DICOM Folder Load Errors")
        error_summary['DICOM Folder Load Errors'] += 1
        error_details['DICOM Folder Load Errors'].append((folder, str(e)))
        return []

In [None]:
## The GradCAM function for creating the heatmap visualizations for explainability purposes and evaluations
## The function is according to the Keras documents
def gradcam(img_array, model, last_conv_layer_name, model_last_layer, pred_index=None):
    """Compute a Grad-CAM heatmap for one output unit of ``model``.

    Parameters:
        img_array: input passed to the model (the heatmap is built from the
            first sample's feature maps).
        model: Keras model to explain.
        last_conv_layer_name: name of the layer whose feature maps are weighted.
        model_last_layer: name of the layer whose output is differentiated.
        pred_index: index of the output unit to explain; when ``None`` the
            arg-max of the first sample's prediction is used.

    Returns:
        A NumPy array with the heatmap, clipped at 0 and scaled by its maximum.

    NOTE(review): the pooled gradients are clamped to a minimum of 1e-8, so
    channels with negative average gradient contribute almost nothing. This
    differs from the plain Grad-CAM weighting -- confirm it is intended.
    """
    eps = 1e-8  # lower bound for the channel weights and guard against division by zero

    conv_layer = model.get_layer(last_conv_layer_name)
    output_layer = model.get_layer(model_last_layer)
    grad_model = keras.models.Model(model.inputs, [conv_layer.output, output_layer.output])

    # Record the forward pass so the class score can be differentiated
    # with respect to the convolutional feature maps.
    with tf.GradientTape() as tape:
        conv_maps, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_score = preds[:, pred_index]

    grads = tape.gradient(class_score, conv_maps)

    # One weight per channel: gradient averaged over batch, height and width,
    # then clamped from below (see the review note in the docstring).
    channel_weights = tf.maximum(tf.reduce_mean(grads, axis=(0, 1, 2)), eps)

    # Weighted sum of the first sample's feature maps over the channel axis
    heatmap = tf.squeeze(conv_maps[0] @ channel_weights[..., tf.newaxis])

    # Keep positive evidence only and scale by the maximum for visualization
    heatmap = tf.maximum(heatmap, 0)
    heatmap = heatmap / (tf.math.reduce_max(heatmap) + eps)
    return heatmap.numpy()

### Reading Images

This section loads the images from the dataset directories. The sample directories hold roughly the first 1000 instances of each dataset, and at most `sample_size` files are loaded from each of them.

Loading and Creating the datasets can take a while. Thus, please be patient.

In [None]:
sample_size = 200  # maximum number of DICOM files loaded from each directory
img_size = 128  # every image is resized to img_size x img_size pixels

In [None]:
# Paths to directories containing DICOM files
# NOTE: these are absolute Google Drive paths; adjust them to your own environment.
image_dirs = [
    r"/content/drive/MyDrive/Folders/+Projects/Dr Shahbazian/AL Codes and Data/Dataset/mdai_public_project_LxR6zdR2_images_2018-08-20-184248",
    r"/content/drive/MyDrive/Folders/+Projects/Dr Shahbazian/AL Codes and Data/Dataset/mdai_rsna_project_x9N20BZa_images_2018-07-20-153330"
]

# Sample directories, in the same order as image_dirs: index 0 = public, index 1 = rsna
image_dirs_samples = [
    r"/content/drive/MyDrive/Folders/+Projects/Dr Shahbazian/AL Codes and Data/Dataset/samples_public",
    r"/content/drive/MyDrive/Folders/+Projects/Dr Shahbazian/AL Codes and Data/Dataset/samples_rsna"
]

# Load a subset of all DICOM files from the specified directories (sampled instances)
# `directory` is the list that the loading cells iterate over.
directory = image_dirs_samples

In [None]:
# ## Counting the number of files and folders in the pneumonia dataset directories
# import os
# for dir in image_dirs_samples:
#   print(dir)
#   files = folders = 0
#   for _, dirnames, filenames in os.walk(dir):
#       files += len(filenames)
#       folders += len(dirnames)
#   print("{:,} files, {:,} folders".format(files, folders))


# # ## (1000, 1010) files (for now, as samples)

In [None]:
## Loading dicom files from the specified directories
## each directory is loaded individually in a dictionary format
## (key = position of the directory in `directory`, value = list of file paths)
dicom_files = dict()
for dataset_key, img_dir in enumerate(directory):
  # enumerate yields the position directly; directory.index() rescans the list on
  # every call and returns the first match, so duplicated paths would share one key.
  dicom_files[dataset_key] = load_dicom_files_from_folder(img_dir, max_files=sample_size)
  print(error_summary, error_details)
  print(f"Dataset {dataset_key} has completed.")

In [None]:
# Number of DICOM file paths collected for dataset 0 and dataset 1
len(dicom_files[0]), len(dicom_files[1])

In [None]:
## loading the images and extracting the relevant information from the dicom files
## each directory is loaded separately into the Dataset dictionary.
## the information extracted from the dicom files includes the patient's gender, age,
## and the study_instance_id which is used to map the images to their annotations.
## Each entry has the layout [study_instance_uid, image, gender, age].
Dataset = dict()
for key in dicom_files.keys():
  images = []
  skipped = 0
  for file_ in dicom_files[key]:
    img = load_dicom_image(file_, img_size)
    if img is None:
      # load_dicom_image already recorded the failure. Appending None would turn
      # the stacked image array into an object array later on, so skip the file.
      skipped += 1
      continue
    # Only the header is needed here; the pixel data was decoded above.
    dcm = pydicom.dcmread(file_, stop_before_pixels=True)
    gender = dcm.PatientSex
    age = dcm.PatientAge
    studyinstaneid = dcm.StudyInstanceUID
    # NOTE(review): int(age) assumes PatientAge is a plain number string -- confirm for other datasets.
    images.append([studyinstaneid, img, gender, int(age)])
  Dataset[key] = images
  print(f"Dataset {key} has completed.")
  if skipped:
    print(f"Dataset {key}: skipped {skipped} unreadable file(s).")

In [None]:
## To prevent any changes to the originally loaded data, we keep a deep copy of the
## Dataset dictionary in memory as a backup.
## If the original data has to be restored later in the notebook,
## the DICOM files do not have to be loaded again, which is time-consuming.
import copy
Dataset_backup = copy.deepcopy(Dataset)

### Plotting a Sample Image

In [None]:
# Displaying the first 9 images of dataset 0 as sample illustrations
# (fewer when fewer than 9 files were loaded, instead of failing with an IndexError)
plt.figure(figsize=(20,10))

n_preview = min(9, len(dicom_files[0]))
for i in range(n_preview):
    plt.subplot(3, 3, i + 1)

    # The raw pixel array is read again here so the image is shown at full resolution
    image_sample = pydicom.dcmread(dicom_files[0][i]).pixel_array
    plt.imshow(image_sample, cmap='gray')
    plt.axis('off')

plt.tight_layout()

### Reading Annotations

In this section, we load annotations and map them to the images according to their study instance IDs. These IDs are unique, so that the mapping can be done without problem.

In [None]:
# Paths to annotation files
# The order matters: the loading cell reads them by position
# (index 0 = adjudicated Kaggle annotations, 1 = original annotations, 2 = dataset mappings).
annotation_files = [
    r"/content/drive/MyDrive/Folders/+Projects/Dr Shahbazian/Projects-Ms.Movahed/Paper-Augmented/main data-original/pneumonia-challenge-annotations-adjudicated-kaggle_2018.json",
    r"/content/drive/MyDrive/Folders/+Projects/Dr Shahbazian/Projects-Ms.Movahed/Paper-Augmented/main data-original/pneumonia-challenge-annotations-original_2018.json",
    r"/content/drive/MyDrive/Folders/+Projects/Dr Shahbazian/Projects-Ms.Movahed/Paper-Augmented/main data-original/pneumonia-challenge-dataset-mappings_2018.json"
]

In [None]:
## Annotations1 => 'ChestX-ray14 subset: Pneumonia','x9N20BZa' => mdai_rsna_project_x9N20BZa_images_2018-07-20-153330 Dataset
## Annotations2 => 'RSNA Pneumonia Detection Challenge on Kaggle', 'LxR6zdR2' => mdai_public_project_LxR6zdR2_images_2018-08-20-184248 Dataset

## Annotations 1 => Dataset 1 => rsna
## Annotations 2 => Dataset 0 => public

## The three files are loaded in list order: annotations1 and annotations2 are
## dicts, annotations3 is a list.
path_annotations1, path_annotations2, path_annotations3 = annotation_files
annotations1 = load_annotations(path_annotations1)
annotations2 = load_annotations(path_annotations2)
annotations3 = load_annotations(path_annotations3)

In [None]:
# annotations1['labelGroups'][0]['labels']

In [None]:
# annotations1['datasets'][0]['annotations'][0:10]

In [None]:
# annotations2['datasets'][0]['annotations'][0:10]

In [None]:
# Inspect the top-level keys, name and id of the first annotation file
annotations1.keys(), annotations1['name'], annotations1['id']

In [None]:
# Inspect the top-level keys, name and id of the second annotation file
annotations2.keys(), annotations2['name'], annotations2['id']

In [None]:
# Show the first record of the dataset-mappings list
annotations3[0]

In [None]:
# study_instance_id = [i['StudyInstanceUID'] for i in  annotations3]
# series_instance_id = [i['SeriesInstanceUID'] for i in  annotations3]
# sop_instance_id = [i['SOPInstanceUID'] for i in  annotations3]

# len(study_instance_id) != len(set(study_instance_id)), len(series_instance_id) != len(set(series_instance_id)), len(sop_instance_id) != len(set(sop_instance_id))

## All instance ids are unique => can be used as image keys and annotations
# (False, False, False)

### Creating Dataset

In this section, we create the final version of the datasets by adding the annotations to each data instance.

The ratio between the original image size and the resized image size is also used to rescale the bounding boxes to the resized images.

In [None]:
# Scale factor between the annotation coordinates and the resized images
# (assumes the annotated images are 1024 px on each side -- TODO confirm)
ratio = 1024/img_size
print(f"Image Ratio: {ratio}")


def _index_annotations(annotation_list, scale):
    """Build the per-study lookup tables for one annotation file.

    Returns two dicts keyed by 'StudyInstanceUID': the integer label and the
    (x, y, width, height) box divided by ``scale``. Entries whose
    'annotationNumber' is None get label 0 and an all-zero box. If a study
    occurs more than once, the last entry wins.
    """
    labels_by_study = {}
    boxes_by_study = {}
    for entry in annotation_list:
        number = entry['annotationNumber']
        if number is None:
            label, box = 0, (0, 0, 0, 0)
        else:
            label = int(number)
            box = tuple(entry['data'][field]/scale for field in ('x', 'y', 'width', 'height'))
        study_uid = entry['StudyInstanceUID']
        labels_by_study[study_uid] = label
        boxes_by_study[study_uid] = box
    return labels_by_study, boxes_by_study


id_annot1, id_bbox1 = _index_annotations(annotations1['datasets'][0]['annotations'], ratio)
id_annot2, id_bbox2 = _index_annotations(annotations2['datasets'][0]['annotations'], ratio)

# Study id -> initial subset label taken from the dataset-mappings file
id_annot3 = {record['StudyInstanceUID']: record['subset_init_label'] for record in annotations3}

In [None]:
# Start from the untouched backup so re-running this cell does not append twice
Dataset = copy.deepcopy(Dataset_backup)
## Annotations 1 => Dataset 1
## Annotations 2 => Dataset 0
annotation_sources = {0: (id_annot2, id_bbox2), 1: (id_annot1, id_bbox1)}
for key in Dataset:
  source = annotation_sources.get(key)
  if source is not None:
    label_by_study, bbox_by_study = source
    for img in Dataset[key]:
      # img.append(id_annot3[img[0]])
      # img[0] is the study instance id; the label and then the box are appended
      img.extend((label_by_study[img[0]], bbox_by_study[img[0]]))
  print(f"Dataset {key} has completed.")

In [None]:
# Find the first entry of dataset 0 whose label (second-to-last field) is non-zero
for i, item in enumerate(Dataset[0]):
  if item[-2] != 0:
    print(f"The first Pneumonia case is:{i} \n {item[2:]}")
    break

### Displaying a sample image with bbox

In [None]:
# Displaying a sample image with bbox
## To change the image, set the dataset key (samples from the public or the rsna
## directory) and the image index below.
## For a positive pneumonia case a red box is drawn on the image; negative cases
## carry an all-zero box.
fig, ax = plt.subplots()

dataset_index = 0
img_index = 1
sample = Dataset[dataset_index][img_index]
print(f"Annotation: {sample[4]}")

image_sample = sample[1]
ax.imshow(image_sample)

# Create a Rectangle patch from the box stored as the last field of the entry
x, y, width, height = sample[-1]

print(f"Box: {(x, y, width, height)}")

rect = patches.Rectangle((x, y), width, height, linewidth=1, edgecolor='r', facecolor='none')

ax.add_patch(rect)
plt.tight_layout()

# Original Model

### Data Creation

In [None]:
# Flatten both datasets into one list of entries
# (entry layout: [study id, image, gender, age, label, bbox])
samples = [img for key in Dataset.keys() for img in Dataset[key]]

gender_codes = {'F':0, 'M':1}
image_list = [entry[1] for entry in samples]
gender_list = [gender_codes[entry[2]] for entry in samples]
label_list = [entry[4] for entry in samples]

num_classes = len(set(label_list))
print("Num Classes: ", num_classes)

Xray_images = np.array(image_list)

# Column vectors so that label and gender can be concatenated side by side
labels = np.array(label_list)[:, np.newaxis]
genders = np.array(gender_list)[:, np.newaxis]

# outputs[:, 0] = class label, outputs[:, 1] = gender code
outputs = np.concatenate([labels, genders], axis=1)

print(Xray_images.shape, outputs.shape)

In [None]:
# Split data into training and validation sets
# 80 % training / 20 % validation; random_state=42 makes the shuffle reproducible.
X_train, X_val, y_train, y_val = train_test_split(Xray_images, outputs, test_size=0.2, random_state=42)

In [None]:
# Keep the first two target columns: column 0 = class label, column 1 = gender code
y_train_original = y_train[:,0:2]
y_val_original = y_val[:,0:2]

In [None]:
# Report the shapes of the image and target arrays for both splits
print("Training Samples:", X_train.shape, y_train_original.shape)
print("Validation Samples:", X_val.shape, y_val_original.shape)

In [None]:
batch_size = 16  # number of samples per batch in the tf.data pipelines

In [None]:
def _batched_dataset(features, targets, size):
    """Wrap (features, targets) as a tf.data pipeline that yields batches of ``size``."""
    tensors = (tf.convert_to_tensor(features), tf.convert_to_tensor(targets))
    return tf.data.Dataset.from_tensor_slices(tensors).batch(size)


train_dataset_original = _batched_dataset(X_train, y_train_original, batch_size)
validation_dataset_original = _batched_dataset(X_val, y_val_original, batch_size)

### Training Loop

In [None]:
# Define the CNN model
# Baseline classifier: two conv/pool stages followed by a dense head.
input_shape = (img_size, img_size, 3)  # 3 channels because the images were stacked three times on load

original_model = Sequential([
    Input(input_shape),
    Conv2D(32, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    # One output unit per class.
    # NOTE(review): sigmoid (not softmax) activations are used together with
    # SparseCategoricalCrossentropy below -- confirm this is intended.
    Dense(num_classes, activation='sigmoid')
])

###################
# Custom training loop for the baseline model: one pass over the training
# batches per epoch, followed by an accuracy evaluation on the validation batches.

loss_fn = keras.losses.SparseCategoricalCrossentropy()
optimizer = Adam()
n_epochs = 30

val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

for epoch in range(n_epochs):
  print("\nEpoch %d" % (epoch,))
  start_time = time.time()
  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset_original):
    # Record the forward pass so the loss can be differentiated w.r.t. the weights
    with tf.GradientTape() as tape:
        logits = original_model(x_batch_train, training=True)  # Logits for this minibatch
        # Only column 0 (the class label) is used as the target; column 1 holds the gender code
        loss_value = loss_fn(y_batch_train[:,0], logits)

    grads = tape.gradient(loss_value, original_model.trainable_weights)
    optimizer.apply_gradients(zip(grads, original_model.trainable_weights))

    # Progress is logged every `batch_size` steps
    if step % batch_size == 0:
        print(
            "Training loss (for one batch) at step %d: %.4f"
            % (step, float(loss_value))
        )
        print("Seen so far: %s samples" % ((step + 1) * batch_size))


  # Validation pass: dropout is disabled via training=False
  for x_batch_val, y_batch_val in validation_dataset_original:
      val_logits = original_model(x_batch_val, training=False)
      val_acc_metric.update_state(y_batch_val[:,0], val_logits)

  val_acc = val_acc_metric.result()
  # Reset so the next epoch's accuracy starts from scratch
  val_acc_metric.reset_state()
  print("Validation acc: %.4f" % (float(val_acc),))
  print("Time taken: %.2fs" % (time.time() - start_time))


### GradCAM

In [None]:
# Print the layer-by-layer summary of the baseline model
original_model.summary()

In [None]:
# List every layer, then keep the names of those whose name contains 'conv'
for layer in original_model.layers:
  print(layer)

conv_layers_names = [layer.name for layer in original_model.layers if 'conv' in layer.name]

print("\nConvolution layer names:", conv_layers_names)

In [None]:
# Layers used by gradcam(): the last convolutional layer and the model's final layer
last_conv_layer_name = conv_layers_names[-1]
model_last_layer = original_model.layers[-1].name
last_conv_layer_name, model_last_layer

In [None]:
# Materialize the validation batches (images and targets) as Python lists
# so that a batch can be picked by position afterwards
batches, batches_labels = [], []
for x_batch_val, y_batch_val in validation_dataset_original:
  batches.append(x_batch_val)
  batches_labels.append(y_batch_val)

In [None]:
# Number of validation batches and the size of the first batch
len(batches_labels), len(batches_labels[0])

In [None]:
## In this cell, we specify a sample data instance to generate its GradCAM.
## Initially, we select a batch from the validation dataset, and then, specify the image index.
## The model prediction is done for the whole batch. However, we select one
## image and its label to produce the GradCAM heatmaps.

batch_index = 0
img = batches[batch_index]
label = batches_labels[batch_index][:,0]  # column 0 = class label
print("Batch Labels:", label)
print(img.shape, label.shape)

index = 0

predictions = original_model.predict(img)
print("Batch Predictions:", np.argmax(predictions, axis=1))
print("True Labels:", np.array(label))

print(f"Batch Index={batch_index}; Image Index:{index}")

# img, label = validation_dataset.get_single_element()
# Add a leading batch axis so the single image can be passed to the model
img_test = np.expand_dims(img[index], axis=0)
print(img_test.shape)
pred = original_model.predict(img_test)
# `pred` is the prediction for the selected image only, so the message says so
# (it previously claimed to show the vector for the whole batch).
print(f"Prediction Vector for Selected Image: {pred}; \n Predicted Class:{tf.argmax(pred, axis=1)}; True Class:{label[index]}")

GradCAM for index 0

In [None]:
# Grad-CAM heatmap of the selected image for output unit 0
heatmap = gradcam(img_test, original_model, last_conv_layer_name, model_last_layer, pred_index=0)
print(np.mean(heatmap))

#######################
# Quantize to 0..255 so the heatmap values can index the colormap table
heatmap_ = np.uint8(255 * heatmap)
jet = mpl.colormaps["jet"]

# Use RGB values of the colormap (the alpha column is dropped)
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap_]

print(img_test[0].shape, jet_heatmap.shape)

# Create an image with RGB colorized heatmap, resized to the input image size
side = img_test.shape[1]
jet_heatmap = keras.utils.img_to_array(
    keras.utils.array_to_img(jet_heatmap).resize((side, side))
)

# Superimpose the heatmap on original image
blended = jet_heatmap * 0.001 + img_test[0]
superimposed_img = keras.utils.array_to_img(blended).resize((256, 256))

# Display Grad CAM
# display(superimposed_img)
fig, (ax_heatmap, ax_overlay) = plt.subplots(1, 2, figsize=(10, 5))
ax_heatmap.matshow(heatmap_)
ax_heatmap.set_title("Heatmap")
ax_heatmap.axis('off')
ax_overlay.imshow(superimposed_img)
ax_overlay.set_title("Super Imposed Image")
ax_overlay.axis('off')
plt.show()

GradCAM for index 1

In [None]:
# Grad-CAM heatmap of the selected image for output unit 1
heatmap = gradcam(img_test, original_model, last_conv_layer_name, model_last_layer, pred_index=1)
print(np.mean(heatmap))

#######################
# 8-bit version of the heatmap, used as an index into the colormap table
heatmap_ = np.uint8(255 * heatmap)
jet = mpl.colormaps["jet"]

# Use RGB values of the colormap
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap_]

# Create an image with RGB colorized heatmap
target_size = (img_test.shape[1], img_test.shape[1])
heatmap_image = keras.utils.array_to_img(jet_heatmap)
heatmap_image = heatmap_image.resize(target_size)
jet_heatmap = keras.utils.img_to_array(heatmap_image)


# Superimpose the heatmap on original image
superimposed_img = keras.utils.array_to_img(jet_heatmap * 0.001 + img_test[0])
superimposed_img = superimposed_img.resize((256, 256))

# Display Grad CAM
# display(superimposed_img)
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].matshow(heatmap_)
axes[1].imshow(superimposed_img)
for panel, panel_title in zip(axes, ("Heatmap", "Super Imposed Image")):
    panel.set_title(panel_title)
    panel.axis('off')
plt.show()

GradCAM for index 2

In [None]:
# Grad-CAM heatmap of the selected image for output unit 2.
# The number of output units equals the number of distinct labels in the loaded
# sample, so unit 2 does not exist when fewer than three classes were found.
# Guard against that instead of failing with an out-of-range slice.
gradcam_class_index = 2
n_model_outputs = original_model.output_shape[-1]

if gradcam_class_index >= n_model_outputs:
    print(f"Skipping GradCAM for index {gradcam_class_index}: the model has only {n_model_outputs} output classes.")
else:
    heatmap = gradcam(img_test, original_model, last_conv_layer_name, model_last_layer, pred_index=gradcam_class_index)
    print(np.mean(heatmap))
    #######################
    # Quantize to 0..255 so the heatmap values can index the colormap table
    heatmap_ = np.uint8(255 * heatmap)
    jet = mpl.colormaps["jet"]

    # Use RGB values of the colormap
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap_]

    # Create an image with RGB colorized heatmap
    jet_heatmap = keras.utils.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img_test.shape[1], img_test.shape[1]))
    jet_heatmap = keras.utils.img_to_array(jet_heatmap)

    # Superimpose the heatmap on original image
    superimposed_img = jet_heatmap * 0.001 + img_test[0]
    superimposed_img = keras.utils.array_to_img(superimposed_img)
    superimposed_img = superimposed_img.resize((256, 256))

    # Display Grad CAM
    # display(superimposed_img)
    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    axes[0].matshow(heatmap_)
    axes[0].set_title("Heatmap")
    axes[0].axis('off')
    axes[1].imshow(superimposed_img)
    axes[1].set_title("Super Imposed Image")
    axes[1].axis('off')
    plt.show()

### Fairness

Evaluating the fairness of the model using the Demographic Parity and Equalized Odds metrics.

Fairness metrics are typically and principally used for binary classification. Hence, we have to map the true labels into a single label.

In [None]:
## Generating model predictions and recording the true labels for fairness evaluation
## Label 0 indicates the true classification label, and label 1 refers to the genders
predictions, true_labels = [], []

for x_batch_val, y_batch_val in validation_dataset_original:
  batch_scores = original_model.predict(x_batch_val)
  # Predicted class = index of the largest output per sample
  predictions.append(tf.argmax(batch_scores, axis=1).numpy())
  # Stored per batch as (class labels, gender codes)
  true_labels.append((y_batch_val[:,0].numpy(), y_batch_val[:,1].numpy()))

In [None]:
# Flatten the per-batch results:
# y_pred holds one predicted class per sample, y_true holds [class label, gender code] rows
y_pred = np.array([value for batch in predictions for value in batch])
y_true = np.array([
    [class_label, gender_code]
    for class_batch, gender_batch in true_labels
    for class_label, gender_code in zip(class_batch, gender_batch)
])

y_pred.shape, y_true.shape

In [None]:
# Multi-class accuracy of the baseline model on the validation set (column 0 = class label)
print("Accuracy:", accuracy_score(y_true[:,0], y_pred))

In [None]:
from fairlearn.metrics import demographic_parity_difference, demographic_parity_ratio
## These metrics are for binary classification. Hence, we map the true labels to 1, which indicates the positive case of Pneumonia

def binary_map(input_labels):
    """Collapse multi-class labels to binary ones.

    Every non-zero label becomes 1; zero labels are passed through unchanged.
    Returns a new list with one entry per input label.
    """
    return [1 if item != 0 else item for item in input_labels]

## The original multi-class labels
# img_preds = y_pred
# img_true = y_true[:,0]

## The binary classification labels
img_preds = binary_map(y_pred)
img_true = binary_map(y_true[:,0])

## Specifying gender as the protected group (column 1 holds the gender code)
gender_names = {0:'F', 1:'M'}
gender = [gender_names[code] for code in y_true[:,1]]

dp_difference = demographic_parity_difference(img_true, img_preds, sensitive_features=gender)
print("Demographic Parity Difference =", dp_difference)
dp_ratio = demographic_parity_ratio(img_true, img_preds, sensitive_features=gender)
print("Demographic Parity Ratio =", dp_ratio)

## Multi-class
# 0.02083333333333333
# 0.75

## Binary Mapped Values
# 0.09375
# 0.625

In [None]:
from fairlearn.metrics import equalized_odds_difference, equalized_odds_ratio

# Equalized-odds metrics on the binary-mapped labels, with gender as the sensitive feature
eo_difference = equalized_odds_difference(img_true, img_preds, sensitive_features=gender)
print("Equally Odds Difference =", eo_difference)
eo_ratio = equalized_odds_ratio(img_true, img_preds, sensitive_features=gender)
print("Equally Odds Ratio =", eo_ratio)

## Binary Mapped Values
# 0.20833333333333334
# 0.4444444444444444

### AUC and SPD

Evaluating the model based on AUC and SPD metrics.

The AUC metric assesses the model's accuracy, while SPD (statistical parity difference) examines the model's sensitivity towards biases.

In [None]:
from sklearn import metrics
from aif360.sklearn.metrics import statistical_parity_difference

# AUC is computed from the binary-mapped hard predictions
AUC = metrics.roc_auc_score(img_true, img_preds)
print("AUC = ", AUC)

# aif360 reads the protected attribute from the *index* of y_true. With a default
# RangeIndex the row number would be treated as the protected attribute and gender
# would never enter the computation. The labels are therefore indexed by the
# gender code (0 = 'F', 1 = 'M'); the privileged group is passed explicitly.
gender_index = pd.Index(y_true[:,1], name='gender')
y_true_by_gender = pd.Series(np.asarray(img_true), index=gender_index, name='label')
SPD = statistical_parity_difference(
    y_true_by_gender, np.array(img_preds), prot_attr='gender', priv_group=1
)
print("SPD = ", SPD)

# Augmented Learning: Fairness and Explainability

In [None]:
# Xray_images = np.array([img[1] for img in Dataset[key] for key in Dataset.keys()])
# labels = np.array([img[4] for img in Dataset[key] for key in Dataset.keys()])
# genders = np.array([{'F':0, 'M':1}[img[2]] for img in Dataset[key] for key in Dataset.keys()])
# bbox = np.array([img[-1] for img in Dataset[key] for key in Dataset.keys()])

# Collect images, labels, gender codes and bounding boxes from both datasets
# (entry layout: [study id, image, gender, age, label, bbox])
Xray_images = []
labels = []
genders = []
bbox = []
for key in Dataset.keys():
  for img in Dataset[key]:
    Xray_images.append(img[1])
    genders.append({'F':0, 'M':1}[img[2]])
    labels.append(img[4])
    bbox.append(img[5])

num_classes = len(set(labels))
print("Num Classes: ", num_classes)

Xray_images = np.array(Xray_images)
labels = np.array(labels)
genders = np.array(genders)
bbox = np.array(bbox)

# The boxes were divided by `ratio` when the annotations were indexed, i.e. they
# are expressed in pixels of the resized image. Dividing by img_size gives
# targets that match the sigmoid bbox head. The former division by the norm of
# the whole matrix made the scale depend on which samples were loaded and
# produced NaN when every box was zero.
bbox_normalized = bbox / img_size

labels = np.expand_dims(labels,axis=1)
genders = np.expand_dims(genders,axis=1)

# outputs columns: 0 = class label, 1 = gender code, 2..5 = normalized (x, y, width, height)
outputs = np.concatenate([labels, genders, bbox_normalized], axis=1)
print(Xray_images.shape, labels.shape, genders.shape, bbox.shape, outputs.shape)

In [None]:
# Show the normalized bounding-box columns of the first 10 samples
outputs[0:10][:,2:]

In [None]:
# Split data into training and validation sets
# random_state=42 matches the split of the baseline model, so both models are
# trained and evaluated on the same, reproducible partition.
# X_train, X_val, y_train, y_val = train_test_split(Xray_images, outputs, test_size=0.2, shuffle=False)
X_train, X_val, y_train, y_val = train_test_split(Xray_images, outputs, test_size=0.2, random_state=42)

In [None]:
# Report the shapes of the image and target arrays for both splits
print("Training Samples:", X_train.shape, y_train.shape)
print("Validation Samples:", X_val.shape, y_val.shape)

In [None]:
batch_size = 16  # number of samples per batch in the tf.data pipelines

In [None]:
# Batched tf.data pipelines over the (image, target) pairs of each split
train_tensors = (tf.convert_to_tensor(X_train), tf.convert_to_tensor(y_train))
train_dataset = tf.data.Dataset.from_tensor_slices(train_tensors).batch(batch_size)

validation_tensors = (tf.convert_to_tensor(X_val), tf.convert_to_tensor(y_val))
validation_dataset = tf.data.Dataset.from_tensor_slices(validation_tensors).batch(batch_size)

In [None]:
## Illustrating the samples in the validation dataset
## The code is commented to prevent unnecessary RAM usage.
## So, please uncomment the following codes in case the illustrations are required

# imgs = []
# for x_batch_val, y_batch_val in validation_dataset:
#   for img in x_batch_val:
#       imgs.append(img)

# plt.figure(figsize=(10,10))

# i = 0
# for img in imgs[0:100]:
#   plt.subplot(10, 10, i + 1)
#   plt.imshow(img, cmap='gray')
#   plt.axis('off')
#   i += 1

# plt.tight_layout()

## Multiple Outputs AGL Model

### Single Optimization

In [None]:
# Defining the CNN model
# The layer classes and `keras` are already imported from tensorflow.keras in the
# import cell, so they are used directly. The former `from keras.layers import *`
# polluted the namespace and mixed two import roots for the same classes.
input_shape = (img_size, img_size, 3)
inp = Input(shape=input_shape)

## Main layers (shared by all three output heads)
x = Conv2D(32, (3, 3), activation='relu')(inp)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)
x = Dense(128, activation='relu')(x)
x = Dropout(0.5)(x)

## output layers
out1 = Dense(num_classes, activation='sigmoid')(x) ## label classification
out2 = Dense(2, activation='sigmoid')(x) ## gender-based explanation
out3 = Dense(4, activation='sigmoid')(x) # bbox-based explanation

# The model returns the three heads in the order [label, gender, bbox]
model = keras.models.Model(inp, [out1,out2,out3])

## EarlyStopping
max_patience = 10  # epochs without improvement before training stops
best_loss = float('inf')
best_model_weights = None
patience = max_patience
early_stopping = False  # set to True to enable early stopping on the validation loss
val_loss_metric = keras.losses.SparseCategoricalCrossentropy()
# Running mean of the validation loss over all batches of an epoch.
# Previously only the loss of the last validation batch was compared.
val_loss_mean = keras.metrics.Mean()
###

loss_fn = keras.losses.SparseCategoricalCrossentropy()  # label and gender heads
loss_fn3 = keras.losses.MeanAbsoluteError()  # bbox head
optimizer = Adam()

n_epochs = 100

val_acc_metric1 = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric2 = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric3 = keras.metrics.MeanAbsoluteError()

for epoch in range(n_epochs):
  print("\nEpoch %d" % (epoch,))
  start_time = time.time()
  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
    with tf.GradientTape() as tape:
        logits1, logits2, logits3 = model(x_batch_train, training=True)  # Logits for this minibatch
        # Target columns: 0 = class label, 1 = gender code, 2.. = normalized bbox
        loss_value1 = loss_fn(y_batch_train[:,0], logits1)
        loss_value2 = loss_fn(y_batch_train[:,1], logits2) ## Adding constraint loss for explainability
        loss_value3 = loss_fn3(y_batch_train[:,2:], logits3) ## Adding the BBox loss for explainability
        # Single optimization: the three losses are summed and minimized jointly
        loss_value = loss_value1 + loss_value2 + loss_value3
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    if step % batch_size == 0:
        print(
            "Training loss (for one batch) at step %d: %.4f"
            % (step, float(loss_value))
        )
        print("Seen so far: %s samples" % ((step + 1) * batch_size))


  for x_batch_val, y_batch_val in validation_dataset:
      val_logits1, val_logits2, val_logits3 = model(x_batch_val, training=False)
      val_acc_metric1.update_state(y_batch_val[:,0], val_logits1)
      val_acc_metric2.update_state(y_batch_val[:,1], val_logits2)
      val_acc_metric3.update_state(y_batch_val[:,2:], val_logits3)

      # Weight each batch by its size so a smaller last batch does not skew the mean
      batch_val_loss = val_loss_metric(y_batch_val[:,0], val_logits1)
      val_loss_mean.update_state(batch_val_loss, sample_weight=int(x_batch_val.shape[0]))

  val_loss = val_loss_mean.result()
  val_loss_mean.reset_state()

  val_acc1 = val_acc_metric1.result()
  val_acc_metric1.reset_state()

  val_acc2 = val_acc_metric2.result()
  val_acc_metric2.reset_state()

  val_error3 = val_acc_metric3.result()
  val_acc_metric3.reset_state()
  print("Validation acc: %.4f %.4f %.4f" % (float(val_acc1),float(val_acc2), float(val_error3)))
  print("Time taken: %.2fs" % (time.time() - start_time))

  if early_stopping:
    # Early stopping on the epoch-mean classification loss
    if float(val_loss) < best_loss:
        best_loss = float(val_loss)
        best_model_weights = copy.deepcopy(model.get_weights())  # Deep copy here
        patience = max_patience  # Reset patience counter
        print(f"Early Stopping Restart")
    else:
        patience -= 1
        print(f"Early Stopping Patience {patience}")
        if patience == 0:
            break

model_weights_backup = model.get_weights() ## Storing the model weights as a backup

# best_model_weights stays None if no epoch was recorded (e.g. n_epochs == 0)
if early_stopping and best_model_weights is not None:
  model.set_weights(best_model_weights) ## loading the best model from early stopping

### Separate Optimization

In [None]:
from keras.models import Model
from keras.layers import *

# Defining the CNN model
# A shared convolutional trunk feeds three output heads; a fresh model is
# built here, so any previously trained `model` is replaced.
input_shape = (img_size, img_size, 3)
inp = Input(shape=input_shape)

## Main layers
x = Conv2D(32, (3, 3), activation='relu')(inp)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)
x = Dense(128, activation='relu')(x)
x = Dropout(0.5)(x)

## output layers
out1 = Dense(num_classes, activation='sigmoid')(x) ## image classification head
out2 = Dense(2, activation='sigmoid')(x) ## gender-based explanation
out3 = Dense(4, activation='sigmoid')(x) # bbox-based explanation

model = Model(inp, [out1,out2,out3])

#####################################
#####################################

# One loss and one optimizer per head, so each task is optimised on its own.
loss_fn1 = keras.losses.SparseCategoricalCrossentropy()
loss_fn2 = keras.losses.SparseCategoricalCrossentropy()
loss_fn3 = keras.losses.MeanAbsoluteError()
optimizer1 = Adam()
optimizer2 = Adam()
optimizer3 = SGD()

n_epochs = 100

# Validation metrics: accuracy for the two classification heads and mean
# absolute error for the bbox head.
val_acc_metric1 = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric2 = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric3 = keras.metrics.MeanAbsoluteError()

# Separate optimisation: each output head has its own loss and optimizer, and
# each optimizer updates the shared trunk plus the weights of its own head.
for epoch in range(n_epochs):
  print("\nEpoch %d" % (epoch,))
  start_time = time.time()
  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
    # One persistent tape records the forward pass once and is queried once
    # per head below. Gradients are taken outside the tape context so the
    # gradient and optimizer ops themselves are not recorded.
    with tf.GradientTape(persistent=True) as tape1:
      logits1, logits2, logits3 = model(x_batch_train, training=True)  # Model outputs for this minibatch

      loss_value1 = loss_fn1(y_batch_train[:,0], logits1) ## Loss for image classification
      loss_value2 = loss_fn2(y_batch_train[:,1], logits2) ## Adding constraint loss for explainability
      loss_value3 = loss_fn3(y_batch_train[:,2:], logits3) ## Adding the BBox loss for explainability

    # Layout of model.trainable_weights (kernel, bias per layer):
    #   [0:6]  shared trunk (Conv2D, Conv2D, Dense(128))
    #   [6:8]  classification head, [8:10] gender head, [10:] bbox head
    variables1 = model.trainable_weights[0:8] ##6,7
    variables2 = model.trainable_weights[0:6] + model.trainable_weights[8:10] ##8,9
    variables3 = model.trainable_weights[0:6] + model.trainable_weights[10:] ##10,11

    grads1 = tape1.gradient(loss_value1, variables1)
    optimizer1.apply_gradients(zip(grads1, variables1))

    grads2 = tape1.gradient(loss_value2, variables2)
    optimizer2.apply_gradients(zip(grads2, variables2))

    grads3 = tape1.gradient(loss_value3, variables3)
    optimizer3.apply_gradients(zip(grads3, variables3))

    # A persistent tape keeps its recorded state until it is dropped.
    del tape1

    if step % batch_size == 0:
        print(
            "Training loss (for one batch) at step %d: %.4f %.4f %.4f"
            % (step, float(loss_value1), float(loss_value2), float(loss_value3))
        )
        print("Seen so far: %s samples" % ((step + 1) * batch_size))


  # Validation pass: accumulate the per-head metrics over every batch.
  for x_batch_val, y_batch_val in validation_dataset:
      val_logits1, val_logits2, val_logits3 = model(x_batch_val, training=False)
      val_acc_metric1.update_state(y_batch_val[:,0], val_logits1)
      val_acc_metric2.update_state(y_batch_val[:,1], val_logits2)
      val_acc_metric3.update_state(y_batch_val[:,2:], val_logits3)

  # Read each metric, then reset it so the next epoch starts from zero.
  val_acc1 = val_acc_metric1.result()
  val_acc_metric1.reset_state()

  val_acc2 = val_acc_metric2.result()
  val_acc_metric2.reset_state()

  val_error3 = val_acc_metric3.result()
  val_acc_metric3.reset_state()
  print("Validation acc: %.4f %.4f %.4f" % (float(val_acc1),float(val_acc2), float(val_error3)))
  print("Time taken: %.2fs" % (time.time() - start_time))

### GradCAMs

In [None]:
# Print the layer list of the current model; the layer names shown here are
# the ones picked up by the Grad-CAM cells below.
model.summary()

In [None]:
# Show every layer of the model first ...
for layer in model.layers:
  print(layer)

# ... then keep the names of the layers whose name contains 'conv'.
conv_layers_names = [lyr.name for lyr in model.layers if 'conv' in lyr.name]

print("\nConvolution layer names:", conv_layers_names)

In [None]:
# Layers handed to gradcam(): the last convolution found above and the layer
# at position -3 of model.layers.
last_conv_layer_name = conv_layers_names[-1]
# NOTE(review): with Model(inp, [out1, out2, out3]) position -3 is presumably
# the first output Dense layer (classification head) — confirm with model.summary().
model_last_layer = model.layers[-3].name
last_conv_layer_name, model_last_layer

In [None]:
# Materialise the validation batches (images and label rows) into lists so a
# batch can be picked by position later on.
batches, batches_labels = [], []
for x_batch_val, y_batch_val in validation_dataset:
  img = x_batch_val
  label = y_batch_val
  batches.append(img)
  batches_labels.append(label)

In [None]:
# Number of collected validation batches and the number of rows in the first one.
len(batches_labels), len(batches_labels[0])

In [None]:
# Pick one validation batch and run the model on it.
batch_index = 1
img = batches[batch_index]
label = batches_labels[batch_index]
print(img.shape, label.shape)
predictions = model.predict(img)
# predictions[0] is the first model output (classification head); column 0 of
# the label rows holds the class label.
print("Batch Predictions:", np.argmax(predictions[0], axis=1))
print("True Labels:", np.array(label[:,0]))

# Pick a single image of that batch; a leading batch axis is added so the
# model can be called on it. `img_test`, `pred1..3`, `img`, `label` and `index`
# are reused by the Grad-CAM and BBOX cells below.
index = 4
img_test = np.expand_dims(img[index], axis=0)
print(img_test.shape)
pred1, pred2, pred3 = model.predict(img_test)
print(f"Prediction Vector: {pred1, pred2, pred3}; \nPredicted Class:{tf.argmax(pred1, axis=1)}; True Class:{label[index][0]}")

GradCAM index 0

In [None]:
# Grad-CAM heatmap of `img_test` for output unit 0 of `model_last_layer`.
heatmap = gradcam(img_test, model, last_conv_layer_name, model_last_layer, pred_index=0)
print(np.mean(heatmap))
#######################
# Quantise to 0-255 so the values can index the 256-entry colormap table.
# assumes gradcam() returns values in [0, 1] — TODO confirm
heatmap_ = np.uint8(255 * heatmap)
jet = mpl.colormaps["jet"]

# Use RGB values of the colormap
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap_]

# Create an image with RGB colorized heatmap
jet_heatmap = keras.utils.array_to_img(jet_heatmap)
# PIL's resize expects (width, height); img_test is (1, height, width, channels).
jet_heatmap = jet_heatmap.resize((img_test.shape[2], img_test.shape[1]))
jet_heatmap = keras.utils.img_to_array(jet_heatmap)

print(img_test[0].shape, jet_heatmap.shape)

# Superimpose the heatmap on original image
# 0.002 scales the 0-255 heatmap before it is added to the image.
superimposed_img = 0.002*jet_heatmap + img_test[0]
superimposed_img = keras.utils.array_to_img(superimposed_img)
superimposed_img = superimposed_img.resize((256, 256))

# Display Grad CAM
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].matshow(heatmap_)
axes[0].set_title("Heatmap")
axes[0].axis('off')
axes[1].imshow(superimposed_img)
axes[1].set_title("Super Imposed Image")
axes[1].axis('off')
plt.show()

GradCAM index 1

In [None]:
# Grad-CAM heatmap of `img_test` for output unit 1 of `model_last_layer`.
heatmap = gradcam(img_test, model, last_conv_layer_name, model_last_layer, pred_index=1)
print(np.mean(heatmap))
#######################
# Quantise to 0-255 so the values can index the 256-entry colormap table.
# assumes gradcam() returns values in [0, 1] — TODO confirm
heatmap_ = np.uint8(255 * heatmap)
jet = mpl.colormaps["jet"]

# Use RGB values of the colormap
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap_]

# Create an image with RGB colorized heatmap
jet_heatmap = keras.utils.array_to_img(jet_heatmap)
# PIL's resize expects (width, height); img_test is (1, height, width, channels).
jet_heatmap = jet_heatmap.resize((img_test.shape[2], img_test.shape[1]))
jet_heatmap = keras.utils.img_to_array(jet_heatmap)

print(img_test[0].shape, jet_heatmap.shape)

# Superimpose the heatmap on original image
# 0.002 scales the 0-255 heatmap before it is added to the image.
superimposed_img = 0.002*jet_heatmap + img_test[0]
superimposed_img = keras.utils.array_to_img(superimposed_img)
superimposed_img = superimposed_img.resize((256, 256))

# Display Grad CAM
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].matshow(heatmap_)
axes[0].set_title("Heatmap")
axes[0].axis('off')
axes[1].imshow(superimposed_img)
axes[1].set_title("Super Imposed Image")
axes[1].axis('off')
plt.show()

GradCAM index 2

In [None]:
# Grad-CAM heatmap of `img_test` for output unit 2 of `model_last_layer`.
heatmap = gradcam(img_test, model, last_conv_layer_name, model_last_layer, pred_index=2)
print(np.mean(heatmap))
#######################
# Quantise to 0-255 so the values can index the 256-entry colormap table.
# assumes gradcam() returns values in [0, 1] — TODO confirm
heatmap_ = np.uint8(255 * heatmap)
jet = mpl.colormaps["jet"]

# Use RGB values of the colormap
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap_]

# Create an image with RGB colorized heatmap
jet_heatmap = keras.utils.array_to_img(jet_heatmap)
# PIL's resize expects (width, height); img_test is (1, height, width, channels).
jet_heatmap = jet_heatmap.resize((img_test.shape[2], img_test.shape[1]))
jet_heatmap = keras.utils.img_to_array(jet_heatmap)

print(img_test[0].shape, jet_heatmap.shape)

# Superimpose the heatmap on original image
# 0.002 scales the 0-255 heatmap before it is added to the image.
superimposed_img = 0.002*jet_heatmap + img_test[0]
superimposed_img = keras.utils.array_to_img(superimposed_img)
superimposed_img = superimposed_img.resize((256, 256))

# Display Grad CAM
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].matshow(heatmap_)
axes[0].set_title("Heatmap")
axes[0].axis('off')
axes[1].imshow(superimposed_img)
axes[1].set_title("Super Imposed Image")
axes[1].axis('off')
plt.show()

### BBOX

Illustrating the true and the predicted BBOXes

In [None]:
## `index` is the image index specified in the GradCAM section.
## The image is sampled from the batch selected in the GradCAM section.

img_sample = img[index]
True_Class = label[index][0]
## The predicted BBOX requires denormalization because the original boxes were normalized to fit the neural network.
## There are two options to denormalize the box:
## 1) multiply the predicted values by the norm of the original bbox values
## 2) use the image size and treat the predicted values as ratios to be mapped onto the X-ray images.

# Option 1 is used here: np.linalg.norm(bbox) is the norm of the whole `bbox` array.
box_pred = pred3[0]*np.linalg.norm(bbox) #*img_size #
# Columns 2 onward of a label row hold the normalized box values.
box_true = label[index][2:] * np.linalg.norm(bbox)

box_true, box_pred

In [None]:
# Displaying a sample image with bbox
# Draws the annotated box in red and the predicted box in blue on top of the
# selected validation image.
fig, ax = plt.subplots()
ax.imshow(img_sample)
print(f"Annotation: {True_Class}; Predicted: {np.argmax(pred1[0])}")

## Create a Rectangle patch
# box_true is a tensor slice, hence .numpy(); the four values are used as
# (x, y, width, height) of the rectangle.
x_true = box_true[0].numpy()
y_true = box_true[1].numpy()
width_true = box_true[2].numpy()
height_true = box_true[3].numpy()

# box_pred comes from model.predict and is indexed directly.
# NOTE: this cell rebinds `y_true` and `y_pred` to scalars; the fairness cells
# below rebuild them as arrays.
x_pred = box_pred[0]
y_pred = box_pred[1]
width_pred = box_pred[2]
height_pred = box_pred[3]

print(f"True Box: {(x_true, y_true, width_true, height_true)}")

rect1 = patches.Rectangle((x_true, y_true), width_true, height_true, linewidth=1, edgecolor='r', facecolor='none')
rect2 = patches.Rectangle((x_pred, y_pred), width_pred, height_pred, linewidth=1, edgecolor='b', facecolor='none')

ax.add_patch(rect1)
ax.add_patch(rect2)
plt.tight_layout()

## Fairness Metrics, AUC, SPD

In [None]:
# Per validation batch, store the arg-max of the first two model outputs next
# to the matching label columns (column 0 and column 1 of the label rows).
predictions, true_labels = [], []

for x_batch_val, y_batch_val in validation_dataset:
  img, label = x_batch_val, y_batch_val
  pred1, pred2, _ = model.predict(img)
  predicted_pair = (tf.argmax(pred1, axis=1).numpy(), tf.argmax(pred2, axis=1).numpy())
  label_pair = (label[:,0].numpy(), label[:,1].numpy())
  predictions.append(predicted_pair)
  true_labels.append(label_pair)

In [None]:
# Flatten the per-batch tuples into one row per sample:
# column 0 comes from the first array of each tuple, column 1 from the second.
y_pred = []
y_true = []
for pred in predictions:
  y_pred.extend([first, second] for first, second in zip(pred[0], pred[1]))

for label in true_labels:
  y_true.extend([first, second] for first, second in zip(label[0], label[1]))

y_pred = np.array(y_pred)
y_true = np.array(y_true)

y_pred.shape, y_true.shape

In [None]:
from fairlearn.metrics import demographic_parity_difference, demographic_parity_ratio
## These metrics are for binary classification

def binary_map(input_labels):
    """Collapse multi-class labels into binary labels.

    Args:
        input_labels: iterable of scalar labels.

    Returns:
        A list of Python ints with one entry per input label: 0 where the
        label equals 0 and 1 for every other label. Zeros are returned as
        plain ints too, so the result never mixes ints with NumPy floats.
    """
    return [1 if item != 0 else 0 for item in input_labels]

## Multi-class classification
# img_preds = y_pred[:,0]
# img_true = y_true[:,0]

## Binary classification
# Column 0 of y_pred / y_true is mapped to 0/1 with binary_map; column 1 of
# y_true (0/1) is turned into the sensitive feature labels 'F' / 'M'.
img_preds = binary_map(y_pred[:,0])
img_true = binary_map(y_true[:,0])
gender = [{0:'F', 1:'M'}[l] for l in y_true[:,1]]

print("Demographic Parity Difference:", demographic_parity_difference(img_true, img_preds, sensitive_features=gender))
print("Demographic Parity Ratio:", demographic_parity_ratio(img_true, img_preds, sensitive_features=gender))

# Values noted by the author (presumably from earlier runs — not reproduced here):
## Multi-class
# # 0.03125
# # 0.8
# 0.02083333333333333
# 0.8333333333333334

## Binary Mapped Values
# 0.03125
# 0.875

In [None]:
from fairlearn.metrics import equalized_odds_difference, equalized_odds_ratio
# Equalized-odds difference and ratio between the gender groups, computed on
# the binary-mapped labels and predictions from the previous cell.
print("Equality Odds Difference:", equalized_odds_difference(img_true, img_preds, sensitive_features=gender))
print("Equality Odds Ratio:", equalized_odds_ratio(img_true, img_preds, sensitive_features=gender))

# Values noted by the author (presumably from earlier runs — not reproduced here):
## Binary Mapped Values
# 0.375
# 0.5

# 0.25
# 0.6666666666666666

In [None]:
from sklearn import metrics
from aif360.sklearn.metrics import statistical_parity_difference

# NOTE(review): the AUC below is computed from hard 0/1 predictions, not from
# predicted scores, so it reflects a single operating point.
AUC = metrics.roc_auc_score(img_true, img_preds)
print("AUC = ", AUC)

# aif360's sklearn-style metrics read the protected attribute from the index
# of y_true. Put the gender code (column 1 of y_true: 0 = F, 1 = M) there and
# name it explicitly; with a default RangeIndex the row number would be used
# as the protected attribute instead.
gender_index = pd.Index(y_true[:, 1].astype(int), name="gender")
img_true_by_gender = pd.Series(img_true, index=gender_index)

# priv_group=1 keeps the metric's default privileged value, which maps to 'M' here.
SPD = statistical_parity_difference(
    img_true_by_gender, np.array(img_preds), prot_attr="gender", priv_group=1
)
print("SPD = ", SPD)

# Enhanced Augmented Learning: Fairness and Explainability

### Data Preparation

In [None]:
# Flatten `Dataset` into parallel arrays. For every record the code reads
# position 1 as the image, 2 as the gender ('F'/'M'), 4 as the class label and
# 5 as the bounding box.
Xray_images = []
labels = []
genders = []
bbox = []
for key in Dataset.keys():
  for img in Dataset[key]:
    Xray_images.append(img[1])
    genders.append({'F':0, 'M':1}[img[2]])
    labels.append(img[4])
    bbox.append(img[5])

# Number of distinct class labels; used as the width of the classification head.
num_classes = len(set(labels))
print("Num Classes: ", num_classes)

Xray_images = np.array(Xray_images)
labels = np.array(labels)
genders = np.array(genders)
bbox = np.array(bbox)

# Scale the boxes by the norm of the whole bbox array; the BBOX cells multiply
# by the same norm to denormalize.
# bbox_normalized1 = normalize(bbox, ord=1)
bbox_normalized =  bbox / np.linalg.norm(bbox)
# bbox_denormalized = bbox_normalized2 * np.linalg.norm(bbox)

# Turn labels and genders into column vectors so they can be concatenated.
labels = np.expand_dims(labels,axis=1)
genders = np.expand_dims(genders,axis=1)

# Target row layout: column 0 = class label, column 1 = gender, columns 2+ = normalized bbox.
outputs = np.concatenate([labels, genders, bbox_normalized], axis=1)
print(Xray_images.shape, labels.shape, genders.shape, bbox.shape,outputs.shape)

In [None]:
# Split data into training and validation sets
# 80/20 split with a fixed random_state so the split is reproducible.
# X_train, X_val, y_train, y_val = train_test_split(Xray_images, outputs, test_size=0.2, shuffle=False)
X_train, X_val, y_train, y_val = train_test_split(Xray_images, outputs, test_size=0.2, random_state=42)

In [None]:
# Shapes of the image and target arrays for each split.
print("Training Samples:", X_train.shape, y_train.shape)
print("Validation Samples:", X_val.shape, y_val.shape)

In [None]:
# Mini-batch size for both datasets; the training loops also use it as the
# logging interval (a message every `batch_size` steps).
batch_size = 16

In [None]:
# Wrap the arrays in tf.data datasets and batch them. No shuffle is applied
# here, so batches are served in the same order every epoch.
train_dataset = tf.data.Dataset.from_tensor_slices((tf.convert_to_tensor(X_train), tf.convert_to_tensor(y_train)))
train_dataset = train_dataset.batch(batch_size)

validation_dataset = tf.data.Dataset.from_tensor_slices((tf.convert_to_tensor(X_val), tf.convert_to_tensor(y_val)))
validation_dataset = validation_dataset.batch(batch_size)

### Main Model

In [None]:
from keras.models import Model
from keras.layers import *


# Hyper-parameters of the constrained training loop below.
# NOTE(review): the names suggest an augmented-Lagrangian scheme
# (landas = multipliers, meu = penalty weight) — confirm with the author.
cons_landas = {key: 0 for key in range(2)}  # one multiplier per constraint (gender, bbox)
meu = 0.5  # weight of the quadratic constraint term
C = 0.8  # factor applied to meu when the constraint sum exceeds `threshold`
delta = 0.001  # weight of the L2 term on the model weights
threshold = 0.5

# Define the CNN model
# A shared convolutional trunk feeds three output heads.
input_shape = (img_size, img_size, 3)
inp = Input(shape=input_shape)

## Main layers
x = Conv2D(32, (3, 3), activation='relu')(inp)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)
x = Dense(128, activation='relu')(x)
x = Dropout(0.5)(x)

## output layers
out1 = Dense(num_classes, activation='sigmoid')(x) ## image classification head
out2 = Dense(2, activation='sigmoid')(x) ## gender-based explanation
out3 = Dense(4, activation='sigmoid')(x) # bbox-based explanation

model = Model(inp, [out1,out2,out3])

## EarlyStopping
# State for the optional early-stopping logic in the training loop
# (disabled while `early_stopping` is False).
best_loss = float('inf')
const_best_model_weights = None
patience_reset = 10
patience = patience_reset
early_stopping = False
val_loss_metric = keras.losses.SparseCategoricalCrossentropy()

###

# loss_fn is shared by the two classification heads; loss_fn3 is the bbox loss.
loss_fn = keras.losses.SparseCategoricalCrossentropy()
loss_fn3 = keras.losses.MeanAbsoluteError()
optimizer = Adam()

n_epochs = 30

# Validation metrics: accuracy for the two classification heads and mean
# absolute error for the bbox head.
val_acc_metric1 = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric2 = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric3 = keras.metrics.MeanAbsoluteError()

# Joint training: classification loss + L2 weight penalty + the two constraint
# terms (gender and bbox), each weighted by its multiplier in `cons_landas`
# and by the quadratic coefficient `meu`.
for epoch in range(n_epochs):
  print("\nEpoch %d" % (epoch,))
  start_time = time.time()
  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
    with tf.GradientTape() as tape:
        logits1, logits2, logits3 = model(x_batch_train, training=True)  # Model outputs for this minibatch
        loss_value1 = loss_fn(y_batch_train[:,0], logits1)

        loss_value2 = loss_fn(y_batch_train[:,1], logits2) ## Adding Gender constraint loss for explainability
        loss_value3 = loss_fn3(y_batch_train[:,2:6], logits3) ## Adding the BBox loss for explainability

        # Constraint terms: multiplier * loss + (meu / 2) * max(0, loss)^2
        consts_loss = [loss_value2, loss_value3]
        consts_0 = cons_landas[0]*consts_loss[0] + (meu/2)*(max(0, consts_loss[0])**2)
        consts_1 = cons_landas[1]*consts_loss[1] + (meu/2)*(max(0, consts_loss[1])**2)
        final_loss = sum([consts_0, consts_1])

        # L2 penalty over the trainable variables themselves. It has to read
        # the variables (not NumPy copies from get_weights()) so that the tape
        # can differentiate it and it actually regularises the weights.
        theta_loss = 0
        for weight in model.trainable_weights:
          theta_loss += delta*tf.nn.l2_loss(weight)

        loss_value = loss_value1 + theta_loss + final_loss

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))


    # Update the multipliers from the current constraint terms.
    cons_landas[0] += meu*max(0, consts_0)
    cons_landas[1] += meu*max(0, consts_1)

    sum_consts = 0
    sum_consts += max(0, consts_0)**2
    sum_consts += max(0, consts_1)**2

    # Scale meu by C whenever the squared constraint terms exceed the threshold.
    if sum_consts > threshold:
      meu *= C

    if step % batch_size == 0:
        print(
            "Training loss (for one batch) at step %d: %.4f %.4f %.4f %.4f"
            % (step, float(loss_value1), float(loss_value), float(final_loss), float(theta_loss))
        )
        print("Seen so far: %s samples" % ((step + 1) * batch_size))


  # Validation pass. The classification loss is averaged over all validation
  # samples (weighted by batch size, since the last batch may be smaller) so
  # that early stopping and the printout do not depend on the last batch only.
  val_loss_sum = 0.0
  val_sample_count = 0
  for x_batch_val, y_batch_val in validation_dataset:
      val_logits1, val_logits2, val_logits3 = model(x_batch_val, training=False)
      val_acc_metric1.update_state(y_batch_val[:,0], val_logits1)
      val_acc_metric2.update_state(y_batch_val[:,1], val_logits2)
      val_acc_metric3.update_state(y_batch_val[:,2:6], val_logits3)

      n_in_batch = int(x_batch_val.shape[0])
      val_loss_sum += float(val_loss_metric(y_batch_val[:,0], val_logits1)) * n_in_batch
      val_sample_count += n_in_batch

  val_loss = val_loss_sum / val_sample_count if val_sample_count else float("nan")

  # Read each metric, then reset it so the next epoch starts from zero.
  val_acc1 = val_acc_metric1.result()
  val_acc_metric1.reset_state()

  val_acc2 = val_acc_metric2.result()
  val_acc_metric2.reset_state()

  val_error3 = val_acc_metric3.result()
  val_acc_metric3.reset_state()
  print("Validation acc: %.4f %.4f %.4f %.4f" % (float(val_acc1),float(val_acc2), float(val_error3), float(val_loss)))
  print("Time taken: %.2fs" % (time.time() - start_time))

  if early_stopping:
    # Early stopping
    if float(val_loss) < best_loss:
        best_loss = float(val_loss)
        const_best_model_weights = copy.deepcopy(model.get_weights())  # Deep copy here
        patience = patience_reset  # Reset patience counter
        print(f"Early Stopping Restart")
    else:
        patience -= 1
        print(f"Early Stopping Patience {patience}")
        if patience == 0:
            break


# Keep a copy of the model as it is right after training, before the
# early-stopping weights (if any) are loaded into `model`.
# NOTE(review): `copy` is not among the imports visible at the top of the
# notebook — confirm it is imported before this cell runs.
model_backup = copy.deepcopy(model)

# With early stopping enabled, switch `model` to the best recorded weights.
if early_stopping:
  model.set_weights(const_best_model_weights)
  print("Best Loss:", best_loss)

In [None]:
# model.set_weights(model_backup.get_weights())

### GradCAMs

In [None]:
# Print the layer list of the current model; the layer names shown here are
# the ones picked up by the Grad-CAM cells below.
model.summary()

In [None]:
# Show every layer of the model first ...
for layer in model.layers:
  print(layer)

# ... then keep the names of the layers whose name contains 'conv'.
conv_layers_names = [lyr.name for lyr in model.layers if 'conv' in lyr.name]

print("\nConvolution layer names:", conv_layers_names)

In [None]:
# Layers handed to gradcam(): the last convolution found above and the layer
# at position -3 of model.layers.
last_conv_layer_name = conv_layers_names[-1]
# NOTE(review): with Model(inp, [out1, out2, out3]) position -3 is presumably
# the first output Dense layer (classification head) — confirm with model.summary().
model_last_layer = model.layers[-3].name
last_conv_layer_name, model_last_layer

In [None]:
# Materialise the validation batches (images and label rows) into lists so a
# batch can be picked by position later on.
batches, batches_labels = [], []
for x_batch_val, y_batch_val in validation_dataset:
  img = x_batch_val
  label = y_batch_val
  batches.append(img)
  batches_labels.append(label)

In [None]:
# Number of collected validation batches and the number of rows in the first one.
len(batches_labels), len(batches_labels[0])

In [None]:
# Pick one validation batch and run the model on it.
batch_index = 0
img = batches[batch_index]
label = batches_labels[batch_index]

predictions = model.predict(img)
# predictions[0] is the first model output (classification head); column 0 of
# the label rows holds the class label.
print("Batch Predictions:", np.argmax(predictions[0], axis=1))
print("True Labels:", np.array(label[:,0]))

# Pick a single image of that batch; a leading batch axis is added so the
# model can be called on it. `img_test`, `pred1..3`, `img`, `label` and `index`
# are reused by the Grad-CAM and BBOX cells below.
index = 0
print(f"Batch Index: {batch_index}; Image Index: {index}")
# img, label = validation_dataset.get_single_element()
img_test = np.expand_dims(img[index], axis=0)
pred1, pred2, pred3 = model.predict(img_test)
print(f"Prediction Vector: {pred1, pred2, pred3}; \nPredicted Class:{tf.argmax(pred1, axis=1)}; True Class:{label[index][0]}")

GradCAM index 0

In [None]:
# Grad-CAM heatmap of `img_test` for output unit 0 of `model_last_layer`.
heatmap = gradcam(img_test, model, last_conv_layer_name, model_last_layer, pred_index=0)
print(np.mean(heatmap))
#######################
# Quantise to 0-255 so the values can index the 256-entry colormap table.
# assumes gradcam() returns values in [0, 1] — TODO confirm
heatmap_ = np.uint8(255 * heatmap)
jet = mpl.colormaps["jet"]

# Use RGB values of the colormap
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap_]

# Create an image with RGB colorized heatmap
jet_heatmap = keras.utils.array_to_img(jet_heatmap)
# PIL's resize expects (width, height); img_test is (1, height, width, channels).
jet_heatmap = jet_heatmap.resize((img_test.shape[2], img_test.shape[1]))
jet_heatmap = keras.utils.img_to_array(jet_heatmap)

print(img_test[0].shape, jet_heatmap.shape)

# Superimpose the heatmap on original image
# 0.002 scales the 0-255 heatmap before it is added to the image.
superimposed_img = 0.002*jet_heatmap + img_test[0]
superimposed_img = keras.utils.array_to_img(superimposed_img)
superimposed_img = superimposed_img.resize((256, 256))

# Display Grad CAM
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].matshow(heatmap_)
axes[0].set_title("Heatmap")
axes[0].axis('off')
axes[1].imshow(superimposed_img)
axes[1].set_title("Super Imposed Image")
axes[1].axis('off')
plt.show()

GradCAM index 1

In [None]:
# Grad-CAM heatmap of `img_test` for output unit 1 of `model_last_layer`.
heatmap = gradcam(img_test, model, last_conv_layer_name, model_last_layer, pred_index=1)
print(np.mean(heatmap))
#######################
# Quantise to 0-255 so the values can index the 256-entry colormap table.
# assumes gradcam() returns values in [0, 1] — TODO confirm
heatmap_ = np.uint8(255 * heatmap)
jet = mpl.colormaps["jet"]

# Use RGB values of the colormap
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap_]

# Create an image with RGB colorized heatmap
jet_heatmap = keras.utils.array_to_img(jet_heatmap)
# PIL's resize expects (width, height); img_test is (1, height, width, channels).
jet_heatmap = jet_heatmap.resize((img_test.shape[2], img_test.shape[1]))
jet_heatmap = keras.utils.img_to_array(jet_heatmap)

print(img_test[0].shape, jet_heatmap.shape)

# Superimpose the heatmap on original image
# 0.002 scales the 0-255 heatmap before it is added to the image.
superimposed_img = 0.002*jet_heatmap + img_test[0]
superimposed_img = keras.utils.array_to_img(superimposed_img)
superimposed_img = superimposed_img.resize((256, 256))

# Display Grad CAM
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].matshow(heatmap_)
axes[0].set_title("Heatmap")
axes[0].axis('off')
axes[1].imshow(superimposed_img)
axes[1].set_title("Super Imposed Image")
axes[1].axis('off')
plt.show()

GradCAM index 2

In [None]:
# Grad-CAM heatmap of `img_test` for output unit 2 of `model_last_layer`.
heatmap = gradcam(img_test, model, last_conv_layer_name, model_last_layer, pred_index=2)
print(np.mean(heatmap))
#######################
# Quantise to 0-255 so the values can index the 256-entry colormap table.
# assumes gradcam() returns values in [0, 1] — TODO confirm
heatmap_ = np.uint8(255 * heatmap)
jet = mpl.colormaps["jet"]

# Use RGB values of the colormap
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap_]

# Create an image with RGB colorized heatmap
jet_heatmap = keras.utils.array_to_img(jet_heatmap)
# PIL's resize expects (width, height); img_test is (1, height, width, channels).
jet_heatmap = jet_heatmap.resize((img_test.shape[2], img_test.shape[1]))
jet_heatmap = keras.utils.img_to_array(jet_heatmap)

print(img_test[0].shape, jet_heatmap.shape)

# Superimpose the heatmap on original image
# 0.002 scales the 0-255 heatmap before it is added to the image.
superimposed_img = 0.002*jet_heatmap + img_test[0]
superimposed_img = keras.utils.array_to_img(superimposed_img)
superimposed_img = superimposed_img.resize((256, 256))

# Display Grad CAM
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].matshow(heatmap_)
axes[0].set_title("Heatmap")
axes[0].axis('off')
axes[1].imshow(superimposed_img)
axes[1].set_title("Super Imposed Image")
axes[1].axis('off')
plt.show()

### BBOX

Illustrating a sample BBOX on an X-ray image

In [None]:
# `img`, `label` and `index` are the batch and image selected in the GradCAM section.
img_sample = img[index]
True_Class = label[index][0]
## The predicted BBOX requires denormalization because the original boxes were normalized to fit the neural network.
## There are two options to denormalize the box:
## 1) multiply the predicted values by the norm of the original bbox values
## 2) use the image size and treat the predicted values as ratios to be mapped onto the X-ray images.


# Option 1 is used here: np.linalg.norm(bbox) is the norm of the whole `bbox` array.
box_pred = pred3[0]*np.linalg.norm(bbox)
# box_pred = pred3[0]*img_size

# `d` holds the same values as box_pred and is only printed.
d = pred3[0]*np.linalg.norm(bbox)
print("Denormalized Box info:", d)
# Columns 2 onward of a label row hold the normalized box values.
box_true = label[index][2:] * np.linalg.norm(bbox)

box_true, box_pred

In [None]:
# Displaying a sample image with bbox
# Draws the annotated box in red and the predicted box in blue on top of the
# selected validation image.
fig, ax = plt.subplots()
ax.imshow(img_sample)
print(f"Annotation: {True_Class}; Predicted: {np.argmax(pred1[0])}")

## Create a Rectangle patch
# box_true is a tensor slice, hence .numpy(); the four values are used as
# (x, y, width, height) of the rectangle.
x_true = box_true[0].numpy()
y_true = box_true[1].numpy()
width_true = box_true[2].numpy()
height_true = box_true[3].numpy()

# box_pred comes from model.predict and is indexed directly.
# NOTE: this cell rebinds `y_true` and `y_pred` to scalars; the fairness cells
# below rebuild them as arrays.
x_pred = box_pred[0]
y_pred = box_pred[1]
width_pred = box_pred[2]
height_pred = box_pred[3]

print(f"True Box: {(x_true, y_true, width_true, height_true)}")

rect1 = patches.Rectangle((x_true, y_true), width_true, height_true, linewidth=1, edgecolor='r', facecolor='none')
rect2 = patches.Rectangle((x_pred, y_pred), width_pred, height_pred, linewidth=1, edgecolor='b', facecolor='none')

ax.add_patch(rect1)
ax.add_patch(rect2)
plt.tight_layout()

### Fairness Metrics

In [None]:
# Per validation batch, store the arg-max of the first two model outputs next
# to the matching label columns (column 0 and column 1 of the label rows).
predictions, true_labels = [], []

## If the next code threw an error unexpectedly, this code will work.
for x_batch_val, y_batch_val in validation_dataset:
  img, label = x_batch_val, y_batch_val
  pred1, pred2, _ = model.predict(img)
  predicted_pair = (tf.argmax(pred1, axis=1).numpy(), tf.argmax(pred2, axis=1).numpy())
  label_pair = (label[:,0].numpy(), label[:,1].numpy())
  predictions.append(predicted_pair)
  true_labels.append(label_pair)

In [None]:
# Flatten the per-batch tuples into one row per sample:
# column 0 comes from the first array of each tuple, column 1 from the second.
y_pred = []
y_true = []
for pred in predictions:
  y_pred.extend([first, second] for first, second in zip(pred[0], pred[1]))

for label in true_labels:
  y_true.extend([first, second] for first, second in zip(label[0], label[1]))

y_pred = np.array(y_pred)
y_true = np.array(y_true)

y_pred.shape, y_true.shape

In [None]:
# Multi-class accuracy of the classification head on the validation set
# (column 0 of the true and predicted rows).
print("Accuracy: ", accuracy_score(y_true[:,0], y_pred[:,0]))

In [None]:
from fairlearn.metrics import demographic_parity_difference, demographic_parity_ratio
## These metrics are for binary classification

def binary_map(input_labels):
    """Map multi-class labels to binary labels.

    Every label different from 0 becomes 1; labels equal to 0 are converted
    with int(). The result is a list with one entry per input label.
    """
    return [1 if item != 0 else int(item) for item in input_labels]

## Multi-class predictions
# img_preds = y_pred[:,0]
# img_true = y_true[:,0]

## Binary classification
# Column 0 of y_pred / y_true is mapped to 0/1 with binary_map; column 1 of
# y_true (0/1) is turned into the sensitive feature labels 'F' / 'M'.
img_preds = binary_map(y_pred[:,0])
img_true = binary_map(y_true[:,0])
gender = [{0:'F', 1:'M'}[l] for l in y_true[:,1]]

print("Demographic Parity Difference =", demographic_parity_difference(img_true, img_preds, sensitive_features=gender))
print("Demographic Parity Ratio =",demographic_parity_ratio(img_true, img_preds, sensitive_features=gender))

# Values noted by the author (presumably from earlier runs — not reproduced here):
## Multi-class
## 0.03125
## 0.8
# 0.02083333333333333
# 0.8333333333333334

## Binary Mapped Values
# 0.03125
# 0.875

In [None]:
from fairlearn.metrics import equalized_odds_difference, equalized_odds_ratio
# Equalized-odds difference and ratio between the gender groups, computed on
# the binary-mapped labels and predictions from the previous cell.
print("Equally Odds Difference =", equalized_odds_difference(img_true, img_preds, sensitive_features=gender))
print("Equally Odds Ratio =", equalized_odds_ratio(img_true, img_preds, sensitive_features=gender))

# Values noted by the author (presumably from earlier runs — not reproduced here):
## Binary Mapped Values
# 0.375
# 0.5

# 0.25
# 0.6666666666666666

### AUC and SPD

In [None]:
from sklearn import metrics
from aif360.sklearn.metrics import statistical_parity_difference

# NOTE(review): the AUC below is computed from hard 0/1 predictions, not from
# predicted scores, so it reflects a single operating point.
AUC = metrics.roc_auc_score(img_true, img_preds)
print("AUC = ", AUC)

# aif360's sklearn-style metrics read the protected attribute from the index
# of y_true. Put the gender code (column 1 of y_true: 0 = F, 1 = M) there and
# name it explicitly; with a default RangeIndex the row number would be used
# as the protected attribute instead.
gender_index = pd.Index(y_true[:, 1].astype(int), name="gender")
img_true_by_gender = pd.Series(img_true, index=gender_index)

# priv_group=1 keeps the metric's default privileged value, which maps to 'M' here.
SPD = statistical_parity_difference(
    img_true_by_gender, np.array(img_preds), prot_attr="gender", priv_group=1
)
print("SPD = ", SPD)