In [None]:
from background_process import *

from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score
from skimage.filters import threshold_otsu as otsu
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torch.nn as nn
from scipy.ndimage import center_of_mass
from PIL import Image
import matplotlib.pyplot as plt
import re, ast

# CheXpert Processing

In [2]:
def processed_file_name(img_path):
    name = os.path.basename(img_path)
    pre = os.path.dirname(img_path)

    name = name.split('.')[0] + '.png' if not name.endswith('.png') else name
    name = 'PROCESSED_' + name
    return os.path.join(pre, name)

In [None]:
chex = pd.read_csv('/Data/CheX_Data/chexpertchestxrays-u20210408/train_cheXbert_w_views.csv')

# only frontal images and no empty pneumonia values
chex = chex[(chex.ViewAgreement == True) & (chex['Frontal/Lateral'] == 'Frontal') & (~chex.Pneumonia.isna())]

# after processing using CXR_Processor
chex['Processed_Path'] = chex.Path.apply(get_chex_img_path).apply(processed_file_name)
chex['Processed_Exists'] = chex['Processed_Path'].apply(os.path.isfile)
chex = chex[chex['Processed_Exists'] == True]     

# drop uncertain diagnoses
chex = chex[chex.Pneumonia.isin((0,1))]

chex['Patient'] = chex.Path.str[20:32]

chex

In [None]:
#chex['Pneumonia'].value_counts(normalize=True) * 100
chex['Pneumonia'].value_counts()

# VinDR Processing

In [9]:
# before processing

vindr = pd.read_csv('/Data/VinDR_Data/physionet.org/files/vindr-cxr/1.0.0/annotations/image_labels_merged.csv')

positive_df = vindr[vindr['Pneumonia'] == 1]
negative_df = vindr[vindr['Pneumonia'] == 0].sample(2 * len(positive_df), random_state=0)

new_df = pd.concat([positive_df, negative_df]).reset_index(drop=True)
new_df.to_csv('/Data/VinDR_Data/physionet.org/files/vindr-cxr/1.0.0/annotations/image_labels_merged_pneumonia.csv', index=False)

In [None]:
# after processing

vindr = pd.read_csv('/Data/VinDR_Data/physionet.org/files/vindr-cxr/1.0.0/annotations/image_labels_merged_pneumonia.csv')

vindr['Processed_Path'] = vindr.apply(get_vindr_img_path, axis=1).apply(processed_file_name)
vindr['Processed_Exists'] = vindr['Processed_Path'].apply(os.path.isfile)
vindr = vindr[vindr['Processed_Exists'] == True]

vindr['Patient'] = vindr.image_id
vindr

In [None]:
vindr['Pneumonia'].value_counts()

In [5]:
# Balance classes

vindr_positive = vindr[vindr['Pneumonia'] == 1]
vindr_negative = vindr[vindr['Pneumonia'] == 0].sample(len(vindr_positive), random_state=0)
#vindr_negative = vindr[vindr['Pneumonia'] == 0].sample(len(vindr_positive)*2, random_state=0) # twice as many negative 

vindr = pd.concat([vindr_positive, vindr_negative])

# PadChest Processing

In [None]:
# before processing

pad = pd.read_csv('/Data/BIMCV-PadChest-FULL/PADCHEST_chest_x_ray_images_labels_w_views.csv')
pad = pad[(pad.ViewAgreement == True) & (pad['Frontal/Lateral'] == 'Frontal') & (~pad.Labels.isna())]

pad['Pneumonia'] = pad.Labels.apply(lambda labels : 1 if 'pneumonia' in labels else 0)

positive_df = pad[pad['Pneumonia'] == 1]
negative_df = pad[pad['Pneumonia'] == 0].sample(2 * len(positive_df), random_state=0)
new_df = pd.concat([positive_df, negative_df])

new_df.to_csv('/Data/BIMCV-PadChest-FULL/PADCHEST_chest_x_ray_images_pneumonia.csv', index=False)

In [None]:
# after processing

pad = pd.read_csv('/Data/BIMCV-PadChest-FULL/PADCHEST_chest_x_ray_images_pneumonia.csv')

pad['Processed_Path'] = pad.apply(get_pad_img_path, axis=1).apply(processed_file_name)
pad['Processed_Exists'] = pad['Processed_Path'].apply(os.path.isfile)
pad = pad[pad['Processed_Exists'] == True]

pad['Patient'] = pad.PatientID
pad

In [None]:
#pad['Pneumonia'].value_counts(normalize=True) * 100
pad['Pneumonia'].value_counts()

In [None]:
# compare with MIMIC LLM + VinDr test set

pad_positive = pad[pad['Pneumonia'] == 1]
pad_negative = pad[pad['Pneumonia'] == 0].sample(len(pad_positive), random_state=0)
pad_merge = pd.concat([pad_positive, pad_negative])

pad_merge

In [None]:
# OR balance classes with CheXpert

pad_positive = pad[pad['Pneumonia'] == 1]
total_positive = len(pad_positive) + len(chex[chex['Pneumonia'] == 1])

diff = total_positive - len(chex[chex['Pneumonia'] == 0])
#diff = (2 * total_positive) - len(chex[chex['Pneumonia'] == 0]) # twice as many negative

pad_negative = pad[pad['Pneumonia'] == 0].sample(diff, random_state=0)

pad = pd.concat([pad_positive, pad_negative])

In [None]:
pad['Pneumonia'].value_counts(normalize=True) * 100

In [None]:
print(f'Pneumonia positive: {len(pad[pad["Pneumonia"] == 1]) + len(chex[chex["Pneumonia"] == 1])}')
print(f'Pneumonia negative: {len(pad[pad["Pneumonia"] == 0]) + len(chex[chex["Pneumonia"] == 0])}')

# NIH Processing

In [84]:
# before processing

nih = pd.read_csv('/Data/NIH_Data/Data_Entry_2017.csv')
nih['Pneumonia'] = nih['Finding Labels'].apply(lambda labels : 1 if 'Pneumonia' in labels else 0)

positive_df = nih[nih['Pneumonia'] == 1]
negative_df = nih[nih['Pneumonia'] == 0].sample(2 * len(positive_df), random_state=0)
new_df = pd.concat([positive_df, negative_df])

new_df.to_csv('/Data/NIH_Data/Data_Entry_2017_pneumonia.csv', index=False)

In [None]:
# after processing

nih = pd.read_csv('/Data/NIH_Data/Data_Entry_2017_pneumonia.csv')

nih['Processed_Path'] = nih['Image Index'].apply(get_nih_img_path).apply(processed_file_name)
nih['Processed_Exists'] = nih['Processed_Path'].apply(os.path.isfile)
nih = nih[nih['Processed_Exists'] == True]

nih['Patient'] = nih['Patient ID']
nih

In [None]:
nih.Pneumonia.value_counts()

In [None]:
# compare with MIMIC LLM + VinDr test set

nih_positive = nih[nih['Pneumonia'] == 1]
nih_negative = nih[nih['Pneumonia'] == 0].sample(len(nih_positive), random_state=0)
nih_merge = pd.concat([nih_positive, nih_negative])

nih_merge

# MIMIC Processing

In [None]:
mimic = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-pneumonia.csv')
mimic['dicom_id'] = mimic.Path.apply(lambda path: os.path.basename(path)[:-4])

mimic

In [None]:
# each image tagged with quality issues (bool), corrected views (FRONT/LAT), and gender mismatch in metadata (bool)
# performed in previous study of ours. doi: 10.1371/journal.pdig.0000835

mimic_meta = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-2.0.0-metadata-augmented-trimmed.csv')
mimic_meta

In [5]:
# only retain high-quality, frontal images
def mimic_include(dicom_id):
    row = mimic_meta[mimic_meta['dicom_id'] == dicom_id].iloc[0]
    if row['CorrectedView'] == 'FRONTAL' and not row['QualityIssue'] and not row['GenderMismatch']:
        return True
    return False

In [None]:
mimic['Keep'] = mimic['dicom_id'].apply(mimic_include)
mimic = mimic[mimic['Keep'] == True]
mimic

#mimic.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-pneumonia-frontal.csv', index=False)

In [None]:
# after processing

mimic = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-pneumonia-frontal.csv')

mimic['Processed_Path'] = mimic.Path.apply(processed_file_name)
mimic['Processed_Exists'] = mimic['Processed_Path'].apply(os.path.isfile)
mimic = mimic[mimic['Processed_Exists'] == True]   

mimic

#mimic.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-pneumonia-frontal-processed.csv', index=False)

In [None]:
mimic['Pneumonia'].value_counts()

In [5]:
# balance classes

mimic_positive = mimic[mimic['Pneumonia'] == 1]
mimic_negative = mimic[mimic['Pneumonia'] == 0].sample(len(mimic_positive), random_state=0)

mimic = pd.concat([mimic_positive, mimic_negative])

# MIMIC LLM Data

In [None]:
mimic_llm = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-pneumonia-full-wreports_llm_complete.csv')
mimic_llm = mimic_llm[mimic_llm.LLM_class.isin((0,1))]

mimic_llm

In [12]:
mimic_llm['Keep'] = mimic_llm['dicom_id'].apply(mimic_include)
mimic_llm = mimic_llm[mimic_llm['Keep'] == True]
mimic_llm

#mimic_llm.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-pneumonia-full-wreports_llm_complete_frontal.csv', index=False)

In [None]:
mimic_llm = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-pneumonia-full-wreports_llm_complete_frontal.csv')

mimic_llm['Processed_Path'] = mimic_llm.Path.apply(processed_file_name)
mimic_llm['Processed_Exists'] = mimic_llm['Processed_Path'].apply(os.path.isfile)
mimic_llm = mimic_llm[mimic_llm['Processed_Exists'] == True]

mimic_llm

In [None]:
mimic_llm.LLM_class.value_counts()

In [None]:
# balance classes

mimic_positive = mimic_llm[mimic_llm.LLM_class == 1]
mimic_negative = mimic_llm[mimic_llm.LLM_class == 0].sample(len(mimic_positive), random_state=0)

mimic = pd.concat([mimic_positive, mimic_negative])
mimic

# RSNA Processing

In [None]:
rsna = pd.read_csv('/Data/RSNA_Data/stage_2_train_labels.csv')

rsna['Processed_Path'] = rsna.patientId.apply(get_rsna_img_path).apply(processed_file_name)
rsna['Processed_Exists'] = rsna['Processed_Path'].apply(os.path.isfile)
rsna = rsna[rsna['Processed_Exists'] == True]

rsna['Patient'] = rsna.patientId
rsna['Pneumonia'] = rsna.Target

rsna

In [None]:
rsna.Pneumonia.value_counts()

In [None]:
rsna_positive = rsna[rsna['Pneumonia'] == 1]
rsna_negative = rsna[rsna['Pneumonia'] == 0].sample(len(rsna_positive), random_state=0)
rsna_merge = pd.concat([rsna_positive, rsna_negative])

rsna_merge

# Train-test-val split

In [19]:
mimic['Pneumonia'] = mimic.LLM_class # for LLM labels

In [42]:
#data_filtered = chex[['Processed_Path', 'Pneumonia', 'Patient']]
#data_filtered = pad[['Processed_Path', 'Pneumonia', 'Patient']]
#data_filtered = pad_merge[['Processed_Path', 'Pneumonia', 'Patient']]
#data_filtered = nih_merge[['Processed_Path', 'Pneumonia', 'Patient']]
#data_filtered = rsna_merge[['Processed_Path', 'Pneumonia', 'Patient']]
#data_filtered = mimic[['Processed_Path', 'Pneumonia', 'Patient']]
data_filtered = vindr[['Processed_Path', 'Pneumonia', 'Patient']]

data_filtered['Pneumonia'] = data_filtered['Pneumonia'].astype(int)

# assign label as the mode for each patient for stratification
patient_labels = data_filtered.groupby('Patient')['Pneumonia'].agg(lambda x: x.mode().iloc[0])
data_filtered = data_filtered.merge(patient_labels.rename('Patient_Label'), on='Patient')

unique_patients = data_filtered[['Patient', 'Patient_Label']].drop_duplicates()

# split patients (80-20) and balance labels
train_patients, val_test_patients = train_test_split(
    unique_patients,
    test_size=.2,
    stratify=unique_patients['Patient_Label'],
    random_state=0
)

# validation and test sets (50-50 split of 20%)
val_patients, test_patients = train_test_split(
    val_test_patients,
    test_size=.5,
    stratify=val_test_patients['Patient_Label'],
    random_state=0
)

train_data = data_filtered[data_filtered['Patient'].isin(train_patients['Patient'])]
val_data = data_filtered[data_filtered['Patient'].isin(val_patients['Patient'])]
test_data = data_filtered[data_filtered['Patient'].isin(test_patients['Patient'])]

In [None]:
train_distribution = train_data['Pneumonia'].value_counts(normalize=True) * 100
val_distribution = val_data['Pneumonia'].value_counts(normalize=True) * 100
test_distribution = test_data['Pneumonia'].value_counts(normalize=True) * 100

train_distribution, val_distribution, test_distribution

In [45]:
# no patient data leakage
train_patients_set = set(train_data['Patient'])
val_patients_set = set(val_data['Patient'])
test_patients_set = set(test_data['Patient'])

overlap_train_val = train_patients_set.intersection(val_patients_set)
overlap_train_test = train_patients_set.intersection(test_patients_set)
overlap_val_test = val_patients_set.intersection(test_patients_set)

assert len(overlap_train_val) == 0, 'leakage between train and val'
assert len(overlap_train_test) == 0, 'leakage between train and test'
assert len(overlap_val_test) == 0, 'leakage between val and test'

In [None]:
train_data.rename(columns={'Processed_Path': 'Path',
                           'Pneumonia': 'Label'}, inplace=True)
val_data.rename(columns={'Processed_Path': 'Path',
                           'Pneumonia': 'Label'}, inplace=True)
test_data.rename(columns={'Processed_Path': 'Path',
                           'Pneumonia': 'Label'}, inplace=True)

train_data

In [47]:
#train_data.to_csv('/Data/CheX_Data/train.csv', index=False)
#val_data.to_csv('/Data/CheX_Data/validation.csv', index=False)
#test_data.to_csv('/Data/CheX_Data/test.csv', index=False)

#train_data.to_csv('/Data/BIMCV-PadChest-FULL/train.csv', index=False)
#val_data.to_csv('/Data/BIMCV-PadChest-FULL/validation.csv', index=False)
#test_data.to_csv('/Data/BIMCV-PadChest-FULL/test.csv', index=False)

#train_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/train.csv', index=False)
#val_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/validation.csv', index=False)
#test_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test.csv', index=False)

#train_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/train_nlp.csv', index=False)
#val_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/validation_nlp.csv', index=False)
#test_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test_nlp.csv', index=False)
#train_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/train_llm.csv', index=False)
#val_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/validation_llm.csv', index=False)
#test_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test_llm.csv', index=False)

#train_data.to_csv('/Data/VinDR_Data/train.csv', index=False)
#val_data.to_csv('/Data/VinDR_Data/validation.csv', index=False)
#test_data.to_csv('/Data/VinDR_Data/test.csv', index=False)

#train_data.to_csv('/Data/NIH_Data/train.csv', index=False)
#val_data.to_csv('/Data/NIH_Data/validation.csv', index=False)
#test_data.to_csv('/Data/NIH_Data/test.csv', index=False)

train_data.to_csv('/Data/RSNA_Data/train.csv', index=False)
val_data.to_csv('/Data/RSNA_Data/validation.csv', index=False)
test_data.to_csv('/Data/RSNA_Data/test.csv', index=False)

In [39]:
# imbalanced sets

#train_data.to_csv('/Data/VinDR_Data/train_imbalanced.csv', index=False)
#val_data.to_csv('/Data/VinDR_Data/validation_imbalanced.csv', index=False)
#test_data.to_csv('/Data/VinDR_Data/test_imbalanced.csv', index=False)

#train_data.to_csv('/Data/BIMCV-PadChest-FULL/train_imbalanced.csv', index=False)
#val_data.to_csv('/Data/BIMCV-PadChest-FULL/validation_imbalanced.csv', index=False)
#test_data.to_csv('/Data/BIMCV-PadChest-FULL/test_imbalanced.csv', index=False)

train_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/train_imbalanced.csv', index=False)
val_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/validation_imbalanced.csv', index=False)
test_data.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test_imbalanced.csv', index=False)

# Final Data

In [None]:
chex_train = pd.read_csv('/Data/CheX_Data/train.csv')
pad_train = pd.read_csv('/Data/BIMCV-PadChest-FULL/train.csv')
mimic_train = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/train.csv')
vindr_train = pd.read_csv('/Data/VinDR_Data/train.csv')

chex_val = pd.read_csv('/Data/CheX_Data/validation.csv')
pad_val = pd.read_csv('/Data/BIMCV-PadChest-FULL/validation.csv')
mimic_val = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/validation.csv')
vindr_val = pd.read_csv('/Data/VinDR_Data/validation.csv')

chex_test = pd.read_csv('/Data/CheX_Data/test.csv')
pad_test = pd.read_csv('/Data/BIMCV-PadChest-FULL/test.csv')
mimic_test = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test.csv')
vindr_test = pd.read_csv('/Data/VinDR_Data/test.csv')

train_data = pd.concat([chex_train, pad_train, mimic_train, vindr_train])
val_data = pd.concat([chex_val, pad_val, mimic_val, vindr_val])
test_data = pd.concat([chex_test, pad_test, mimic_test, vindr_test])

train_data

In [None]:
test_data = pd.read_csv('/Data/RSNA_Data/test.csv')
test_data

In [None]:
# imbalanced datasets

chex_train = pd.read_csv('/Data/CheX_Data/train.csv')
pad_train = pd.read_csv('/Data/BIMCV-PadChest-FULL/train_imbalanced.csv')
mimic_train = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/train_imbalanced.csv')
vindr_train = pd.read_csv('/Data/VinDR_Data/train_imbalanced.csv')

chex_val = pd.read_csv('/Data/CheX_Data/validation.csv')
pad_val = pd.read_csv('/Data/BIMCV-PadChest-FULL/validation_imbalanced.csv')
mimic_val = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/validation_imbalanced.csv')
vindr_val = pd.read_csv('/Data/VinDR_Data/validation_imbalanced.csv')

chex_test = pd.read_csv('/Data/CheX_Data/test.csv')
pad_test = pd.read_csv('/Data/BIMCV-PadChest-FULL/test_imbalanced.csv')
mimic_test = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test_imbalanced.csv')
vindr_test = pd.read_csv('/Data/VinDR_Data/test_imbalanced.csv')

train_data = pd.concat([chex_train, pad_train, mimic_train, vindr_train])
val_data = pd.concat([chex_val, pad_val, mimic_val, vindr_val])
test_data = pd.concat([chex_test, pad_test, mimic_test, vindr_test])

train_data

In [2]:
mimic_test = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test_llm.csv')
vindr_test = pd.read_csv('/Data/VinDR_Data/test.csv')

test_data = pd.concat([mimic_test, vindr_test])

In [10]:
# compare with MIMIC LLM + VinDr test set

#vindr_test[:-1].to_csv('/Data/VinDR_Data/final_test.csv', index=False)

mimic_positive = mimic_test[mimic_test['Label'] == 1].sample(70, random_state=0)
mimic_negative = mimic_test[mimic_test['Label'] == 0].sample(70, random_state=0)

mimic_test = pd.concat([mimic_positive, mimic_negative])

mimic_test.to_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/final_test_llm.csv', index=False)

# Grad-CAM

In [4]:
class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None

        # patch forward method if needed
        self.patch_forward()

        # save activations and gradients
        self.target_layer.register_forward_hook(self.save_activation)
        self.target_layer.register_full_backward_hook(self.save_gradient)

    def patch_forward(self):
        """
        For models like DenseNet that use in-place ReLU operations, override the forward function to use a non in-place ReLU
        """
        if hasattr(self.model, 'features') and hasattr(self.model, 'classifier'):
            def modified_forward(model_self, x):
                features = model_self.features(x)
                out = F.relu(features, inplace=False)
                out = F.adaptive_avg_pool2d(out, (1, 1))
                out = torch.flatten(out, 1)
                out = model_self.classifier(out)
                return out

            self.model.forward = modified_forward.__get__(self.model, type(self.model))

    def save_activation(self, module, input, output):
        self.activations = output.detach()

    def save_gradient(self, module, grad_input, grad_output):
        self.gradients = grad_output[0].detach().clone()

    def __call__(self, x, class_idx=None):
        """
        Compute Grad-CAM for input image tensor 'x'
        If class_idx is provided, compute for that class; otherwise, use the predicted class
        """
        output = self.model(x) # forward pass
        probabilities = F.softmax(output, dim=1)

        # class to visualise
        if class_idx is None:
            predicted_conf, predicted_class = torch.max(probabilities, dim=1)
            class_idx = predicted_class.item()
            confidence = predicted_conf.item()
        else:
            confidence = probabilities[0, class_idx].item()

        # backward pass with one-hot encoding for the target class
        self.model.zero_grad()
        one_hot = torch.zeros_like(output).to(output.device)
        one_hot[0, class_idx] = 1.0
        output.backward(gradient=one_hot, retain_graph=True)

        # global average pool the gradients, combine with activations
        weights = self.gradients.mean(dim=(2, 3), keepdim=True)
        grad_cam = torch.sum(weights * self.activations, dim=1).squeeze(0)
        grad_cam = F.relu(grad_cam)  # keep only positive influence

        # normalise to [0, 1]
        grad_cam -= grad_cam.min()
        if grad_cam.max() > 0:
            grad_cam /= grad_cam.max()
        grad_cam = grad_cam.cpu().numpy()

        return grad_cam, class_idx, confidence

def overlay_heatmap(heatmap, img, threshold=0, cmap=cv2.COLORMAP_JET):
    heatmap[heatmap < threshold] = 0  # zero out activations below threshold
    
    # resize heatmap to image size
    heatmap = cv2.resize(heatmap, (img.size[0], img.size[1]))

    # heatmap to colour map
    heatmap_color = cv2.applyColorMap(np.uint8(255 * heatmap), cmap)

    # BGR for opencv
    img_np = np.array(img)
    img_bgr = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)

    # overlay heatmap
    overlay = cv2.addWeighted(img_bgr, .5, heatmap_color, .5, 0)

    # back to RGB for matplotlib
    overlay_rgb = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)

    return heatmap, overlay_rgb

def original_file_name(img_path):
    if 'PadChest' in img_path:
        return img_path.replace('PROCESSED_','')
    return img_path.replace('PROCESSED_','').replace('.png', '.jpg')

In [5]:
class_labels = {0: 'No pneumonia', 1: 'Pneumonia'}
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#model = models.efficientnet_b5(pretrained=True)
#model = models.efficientnet_b6(pretrained=True)
#model = models.efficientnet_v2_m(pretrained=True)
model = models.densenet121(pretrained=True)

# efficientnet
#model.classifier = nn.Sequential(
#    nn.Linear(model.classifier[1].in_features, 512), 
#    nn.ReLU(), 
#    nn.BatchNorm1d(512), 
#    nn.Dropout(p=.3),
#    nn.Linear(512, 2)
#)

model.classifier = nn.Sequential(
    nn.Linear(model.classifier.in_features, 512), 
    nn.ReLU(), 
    nn.BatchNorm1d(512), 
    nn.Dropout(p=.3),
    nn.Linear(512, 2)
)

model.load_state_dict(torch.load('/Data/mimic_llm_model.pth'))
model = model.to(device)
model.eval()

process = transforms.Compose([
    transforms.Resize((480, 480)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[.485, .456, .406], std=[.229, .224, .225])
])

In [None]:
test_data = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test_llm.csv')
test_data

In [None]:
data = test_data.reset_index(drop=True).loc[185]
img_path, truth = data.Path, data.Label

img = Image.open(img_path).convert('RGB')
img_tensor = process(img).unsqueeze(0).to(device)

# last convolutional layer
target_layer = model.features[-1] # efficientnet, densenet

grad_cam = GradCAM(model, target_layer)

heatmap, pred_class_idx, confidence = grad_cam(img_tensor)
heatmap, overlay_rgb = overlay_heatmap(heatmap, img, threshold=0)

true_class = class_labels[truth]
pred_class = class_labels[pred_class_idx]

_,ax = plt.subplots(ncols=2, figsize=(10,5), tight_layout=True)

ax[0].imshow(img)
ax[0].set_title(f'Ground truth: {true_class}')
ax[0].axis("off")

ax[1].imshow(overlay_rgb)
ax[1].set_title(f'Prediction: {pred_class} ({confidence:.2%})'.title())
ax[1].axis("off")

#plt.savefig('/Data/Figures/mimic_gradcam_2.svg')

In [8]:
def divide_lung(lung_mask):
    """Divides lung mask into three equal portions based on height"""
    
    # indices of lung pixels
    row_indices = torch.nonzero(lung_mask.sum(dim=1), as_tuple=True)[0]
    min_idx = row_indices.min().item()
    max_idx = row_indices.max().item()
    lung_height = max_idx - min_idx + 1

    # split boundaries
    split1 = min_idx + lung_height // 3
    split2 = min_idx + 2 * (lung_height // 3)

    # masks for each portion
    upper_mask = torch.zeros_like(lung_mask, dtype=torch.bool)
    middle_mask = torch.zeros_like(lung_mask, dtype=torch.bool)
    lower_mask = torch.zeros_like(lung_mask, dtype=torch.bool)

    upper_mask[min_idx:split1, :] = lung_mask[min_idx:split1, :]
    middle_mask[split1:split2, :] = lung_mask[split1:split2, :]
    lower_mask[split2:max_idx + 1, :] = lung_mask[split2:max_idx + 1, :]

    return upper_mask, middle_mask, lower_mask

def add_fraction_text(ax, mask, fraction, color='white'):
    """Adds text annotation to the centre of a lung mask to show the fraction of activation"""

    mask_np = mask.cpu().numpy()
    if mask_np.sum() == 0:
        return

    # centre of mass (y, x)
    cy, cx = center_of_mass(mask_np)

    text_str = f"{fraction:.2f}"
    ax.text(cx, cy, text_str,
            color=color, ha='center', va='center',
            fontweight='bold', fontsize=20)

In [None]:
data = test_data.reset_index(drop=True).loc[185]
path, truth = original_file_name(data.Path), data.Label

processor = CXR_Processor()

original, processed = processor.load_and_process(path)
cropped_img, cropped_mask, left_lung, right_lung = processor.crop_trunk(original, processed, .025, return_lungs=True)
outlier_mask = processor.find_outliers(cropped_img, cropped_mask, lower_percentile=.1, outlier_thresh=.05)
clahe_img = cv2.createCLAHE(clipLimit=2).apply(cropped_img)
if outlier_mask is not None:
    clahe_img[outlier_mask] = 0

img = Image.fromarray(clahe_img).convert('RGB')
img_tensor = process(img).unsqueeze(0).to(device)

heatmap, pred_class_idx, confidence = grad_cam(img_tensor)
heatmap, overlay_rgb = overlay_heatmap(heatmap, img, threshold=otsu(heatmap))

true_class = class_labels[truth]
pred_class = class_labels[pred_class_idx]

_,ax = plt.subplots(ncols=3, figsize=(15,10), constrained_layout=True)

ax[0].imshow(img)
ax[0].set_title(f'Input CXR\nGround truth: {true_class}')
ax[0].axis("off")

ax[1].imshow(overlay_rgb)
ax[1].set_title(f'Grad-CAM Overlay\nPrediction: {pred_class} ({confidence:.2%})')
ax[1].axis("off")

combined_lungs = left_lung | right_lung
heatmap[~combined_lungs.cpu()] = 0 # zero out activations outside lungs

# renormalise
heatmap -= heatmap.min()
if heatmap.max() > 0:
    heatmap /= heatmap.max()

upper_left, middle_left, lower_left = divide_lung(left_lung)
upper_right, middle_right, lower_right = divide_lung(right_lung)

# max activations per lung zone
frac_upper_left = heatmap[upper_left.cpu()].max().item()
frac_middle_left = heatmap[middle_left.cpu()].max().item()
frac_lower_left = heatmap[lower_left.cpu()].max().item()

frac_upper_right = heatmap[upper_right.cpu()].max().item()
frac_middle_right = heatmap[middle_right.cpu()].max().item()
frac_lower_right = heatmap[lower_right.cpu()].max().item()

ax[2].imshow(img)
ax[2].imshow(heatmap, cmap='jet', alpha=.5)

ax[2].contour(upper_left.cpu(), levels=[.5], colors='red', linewidths=2)
ax[2].contour(middle_left.cpu(), levels=[.5], colors='red', linewidths=2)
ax[2].contour(lower_left.cpu(), levels=[.5], colors='red', linewidths=2)

add_fraction_text(ax[2], upper_left,  frac_upper_left)
add_fraction_text(ax[2], middle_left,  frac_middle_left)
add_fraction_text(ax[2], lower_left,  frac_lower_left)

ax[2].contour(upper_right.cpu(), levels=[.5], colors='red', linewidths=2)
ax[2].contour(middle_right.cpu(), levels=[.5], colors='red', linewidths=2)
ax[2].contour(lower_right.cpu(), levels=[.5], colors='red', linewidths=2)

add_fraction_text(ax[2], upper_right,  frac_upper_right)
add_fraction_text(ax[2], middle_right,  frac_middle_right)
add_fraction_text(ax[2], lower_right,  frac_lower_right)

ax[2].axis("off")

#plt.savefig('/Data/Figures/mimic_pos_gradcam_loc.svg')

# MIMIC Localisation

In [None]:
def evaluate_gradcam(mapped_col, pred_col):
    """
    Evaluate Grad-CAM heatmaps against ground truth lung zones using various metrics
    
    Args:
        mapped_col : Column of dictionaries for ground truth
        pred_col : Column of dictionaries for Grad-CAM activations
    """

    all_mapped, all_pred = [], []
    
    # zone-level metrics
    total_precision_numerator = 0
    total_precision_denominator = 0
    total_recall_numerator = 0
    total_recall_denominator = 0
    total_intersection = 0
    total_union = 0
    
    for mapped_zones, pred_zones in zip(mapped_col, pred_col):
        mapped_array = np.array(list(mapped_zones.values()))
        pred_array = np.array(list(pred_zones.values()))
        
        # global lists for aggregate metrics
        all_mapped.extend(mapped_array)
        all_pred.extend(pred_array)
        
        # intersection and union for IoU
        intersection = np.minimum(mapped_array, pred_array).sum()
        union = np.maximum(mapped_array, pred_array).sum()
        total_intersection += intersection
        total_union += union
        
        # precision
        precision_numerator = (mapped_array * pred_array).sum()
        precision_denominator = pred_array.sum()
        total_precision_numerator += precision_numerator
        total_precision_denominator += precision_denominator
        
        # recall
        recall_numerator = (mapped_array * pred_array).sum()
        recall_denominator = mapped_array.sum()
        total_recall_numerator += recall_numerator
        total_recall_denominator += recall_denominator
    
    # aggregate computation
    all_mapped = np.array(all_mapped)
    all_pred = np.array(all_pred)
    
    # weighted match (sum of activations in ground truth zones)
    mapped_indices = all_mapped == 1
    weighted_match = all_pred[mapped_indices].sum() / mapped_indices.sum() if mapped_indices.sum() > 0 else 0
    
    # MSE
    mse = np.mean((all_mapped - all_pred) ** 2)
    
    # MAE
    mae = np.mean(np.abs(all_mapped - all_pred))
    
    # cosine similarity
    numerator = np.dot(all_mapped, all_pred)
    denominator = np.sqrt(np.sum(all_mapped ** 2)) * np.sqrt(np.sum(all_pred ** 2))
    cosine_similarity = numerator / denominator if denominator != 0 else 0
    
    # IoU
    iou = total_intersection / total_union if total_union > 0 else 0
    
    # precision
    localisation_precision = total_precision_numerator / total_precision_denominator if total_precision_denominator > 0 else 0
    
    # recall
    localisation_recall = total_recall_numerator / total_recall_denominator if total_recall_denominator > 0 else 0
    
    # F1 score
    if localisation_precision + localisation_recall > 0:
        localisation_f1 = 2 * (localisation_precision * localisation_recall) / (localisation_precision + localisation_recall)
    else:
        localisation_f1 = 0
    
    return {
        "Weighted Match": weighted_match,
        "MSE": mse,
        "MAE": mae,
        "Cosine Similarity": cosine_similarity,
        "IoU": iou,
        "Localisation Precision": localisation_precision,
        "Localisation Recall": localisation_recall,
        "Localisation F1 Score": localisation_f1,
    }

In [10]:
# map localisation strings to lung zones

def assign_zones(zones, zone_type, left, right, bilateral):
    if bilateral or not (left or right):  # no direction, map to both sides
        zones[f"{zone_type}_left"] = 1
        zones[f"{zone_type}_right"] = 1
    if left:
        zones[f"{zone_type}_left"] = 1
    if right:
        zones[f"{zone_type}_right"] = 1

def map_pneumonia_locs(location_text):
    # lung zones
    zones = {
        'upper_left': 0, 'middle_left': 0, 'lower_left': 0,
        'upper_right': 0, 'middle_right': 0, 'lower_right': 0
    }
    
    # split location into components
    components = [comp.strip().lower() for comp in location_text.split(',')]
    
    # map each component
    for comp in components:
        # directional flags
        left = 'left' in comp
        right = 'right' in comp
        bilateral = 'bilateral' in comp or 'both' in comp
    
        # lower zones
        if 'bas' in comp or 'lower' in comp or 'costophrenic' in comp or 'cardiac' in comp:
            assign_zones(zones, 'lower', left, right, bilateral)

        # upper zones
        if 'apical' in comp or 'upper' in comp:
            assign_zones(zones, 'upper', left, right, bilateral)

        # middle zones
        if 'mid' in comp:
            if 'middle lobe' in comp:
                zones['middle_right'] = 1  # specific to right lung
            else:
                assign_zones(zones, 'middle', left, right, bilateral)
        if 'hilar' in comp or 'bronchi' in comp:
            assign_zones(zones, 'middle', left, right, bilateral)
        if 'lingula' in comp:
            zones['middle_left'] = 1  # specific to left lung

        if set(zones.values()) == {0}: # no specific zones assigned
            if bilateral:
                zones = {key: 1 for key in zones}  # all zones
            if left:
                zones['upper_left'] = 1
                zones['middle_left'] = 1
                zones['lower_left'] = 1
            if right:
                zones['upper_right'] = 1
                zones['middle_right'] = 1
                zones['lower_right'] = 1
        
    # return zones if any are detected, else N/A
    return zones if any(zones.values()) else 'N/A'

In [None]:
test_data = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/test_llm.csv')

In [None]:
# parsing radiologist localisations from LLM responses and mapping them to lung zones

mimic_llm = pd.read_csv('/Data/physionet.org/files/mimic-cxr-jpg/2.0.0/mimic-cxr-pneumonia-full-wreports_llm_complete_frontal.csv')

mimic_llm['LLM_loc'] = mimic_llm.LLM_Pneumonia.apply(lambda response : re.findall(r'LOCATION>(.*?)</LOCATION>', response.upper())[-1])
mimic_llm['LLM_loc'] = mimic_llm['LLM_loc'].apply(lambda locs : ', '.join(ast.literal_eval(locs.strip())) if locs != 'NONE' and '[' in locs else 'N/A')
mimic_llm['Mapped_Zones'] = mimic_llm.LLM_loc.apply(map_pneumonia_locs)

mimic_llm['Processed_Path'] = mimic_llm.Path.apply(processed_file_name)
positive_rows = mimic_llm[(mimic_llm.Processed_Path.isin(test_data[test_data.Label == 1].Path)) & (mimic_llm.Mapped_Zones != 'N/A')].reset_index(drop=True)

positive_rows

In [None]:
data = positive_rows.iloc[88]
img_path,loc = data.Path,data.LLM_loc

processor = CXR_Processor()
original, processed = processor.load_and_process(img_path)
cropped_img, cropped_mask, left_lung, right_lung = processor.crop_trunk(original, processed, .025, return_lungs=True)
outlier_mask = processor.find_outliers(cropped_img, cropped_mask, lower_percentile=.1, outlier_thresh=.05)
clahe_img = cv2.createCLAHE(clipLimit=2).apply(cropped_img)
if outlier_mask is not None:
    clahe_img[outlier_mask] = 0

img = Image.fromarray(clahe_img).convert('RGB')
img_tensor = process(img).unsqueeze(0).to(device)

heatmap, pred_class_idx, confidence = grad_cam(img_tensor)
heatmap, overlay_rgb = overlay_heatmap(heatmap, img, threshold=otsu(heatmap))

if pred_class_idx == 1: # pneumonia-positive
    pred_class = 'Pneumonia-positive'
    combined_lungs = left_lung | right_lung
    heatmap[~combined_lungs.cpu()] = 0

    # renormalise
    heatmap -= heatmap.min()
    if heatmap.max() > 0:
        heatmap /= heatmap.max()

    upper_left, middle_left, lower_left = divide_lung(left_lung)
    upper_right, middle_right, lower_right = divide_lung(right_lung)

    frac_upper_left = heatmap[upper_left.cpu()].max().item()
    frac_middle_left = heatmap[middle_left.cpu()].max().item()
    frac_lower_left = heatmap[lower_left.cpu()].max().item()

    frac_upper_right = heatmap[upper_right.cpu()].max().item()
    frac_middle_right = heatmap[middle_right.cpu()].max().item()
    frac_lower_right = heatmap[lower_right.cpu()].max().item()

    mapping = {
    'Upper Left': [frac_upper_left, upper_left],
    'Middle Left': [frac_middle_left, middle_left],
    'Lower Left': [frac_lower_left, lower_left],
    'Upper Right': [frac_upper_right, upper_right],
    'Middle Right': [frac_middle_right, middle_right],
    'Lower Right': [frac_lower_right, lower_right]
    }
    locs,masks = zip(*[(key,mask) for key,(val,mask) in mapping.items() if val > .8])
    locs = ', '.join(locs)

    _,ax = plt.subplots(ncols=3, figsize=(12,8), constrained_layout=True)

    ax[0].imshow(img)
    ax[0].set_title(f'Input Processed CXR\nGround Truth: {loc.title()} Pneumonia')
    ax[0].axis("off")

    ax[1].imshow(overlay_rgb)
    ax[1].set_title(f'Grad-CAM Overlay\nDiagnosis: {pred_class} ({confidence:.2%})')
    ax[1].axis("off")

    _, overlay_rgb = overlay_heatmap(heatmap, img)
    ax[2].imshow(overlay_rgb)
    ax[2].set_title(f'Maximum Activation per Lung Zone\nLocalisation: {locs} Zone')

    ax[2].contour(upper_left.cpu(), levels=[.5], colors='white', linewidths=2)
    ax[2].contour(middle_left.cpu(), levels=[.5], colors='white', linewidths=2)
    ax[2].contour(lower_left.cpu(), levels=[.5], colors='white', linewidths=2)

    add_fraction_text(ax[2], upper_left,  frac_upper_left)
    add_fraction_text(ax[2], middle_left,  frac_middle_left)
    add_fraction_text(ax[2], lower_left,  frac_lower_left)

    ax[2].contour(upper_right.cpu(), levels=[.5], colors='white', linewidths=2)
    ax[2].contour(middle_right.cpu(), levels=[.5], colors='white', linewidths=2)
    ax[2].contour(lower_right.cpu(), levels=[.5], colors='white', linewidths=2)

    add_fraction_text(ax[2], upper_right,  frac_upper_right)
    add_fraction_text(ax[2], middle_right,  frac_middle_right)
    add_fraction_text(ax[2], lower_right,  frac_lower_right)

    for mask in masks:
        ax[2].contour(mask.cpu(), levels=[.5], colors='red', linewidths=2)
        
    ax[2].axis("off")

else:
    print('\nNegative prediction\n')

#plt.savefig('/Data/Figures/mimic_loc_3.svg')

In [None]:
processor = CXR_Processor()

all_zones = []

for i in range(len(positive_rows)):
    data = positive_rows.iloc[i]
    path = data.Path

    original, processed = processor.load_and_process(path)
    cropped_img, cropped_mask, left_lung, right_lung = processor.crop_trunk(original, processed, .025, return_lungs=True)
    outlier_mask = processor.find_outliers(cropped_img, cropped_mask, lower_percentile=.1, outlier_thresh=.05)
    clahe_img = cv2.createCLAHE(clipLimit=2).apply(cropped_img)
    if outlier_mask is not None:
        clahe_img[outlier_mask] = 0

    img = Image.fromarray(clahe_img).convert('RGB')
    img_tensor = process(img).unsqueeze(0).to(device)

    heatmap, pred_class_idx, confidence = grad_cam(img_tensor) # only positive predictions
    #heatmap, pred_class_idx, confidence = grad_cam(img_tensor, class_idx=1) # all predictions
    
    if pred_class_idx == 1: # pneumonia-positive
        heatmap, overlay_rgb = overlay_heatmap(heatmap, img, threshold=otsu(heatmap))

        combined_lungs = left_lung | right_lung
        heatmap[~combined_lungs.cpu()] = 0

        # renormalise
        heatmap -= heatmap.min()
        if heatmap.max() > 0:
            heatmap /= heatmap.max()

        upper_left, middle_left, lower_left = divide_lung(left_lung)
        upper_right, middle_right, lower_right = divide_lung(right_lung)

        frac_upper_left = heatmap[upper_left.cpu()].max().item()
        frac_middle_left = heatmap[middle_left.cpu()].max().item()
        frac_lower_left = heatmap[lower_left.cpu()].max().item()

        frac_upper_right = heatmap[upper_right.cpu()].max().item()
        frac_middle_right = heatmap[middle_right.cpu()].max().item()
        frac_lower_right = heatmap[lower_right.cpu()].max().item()
        
        thresh = .8
        frac_upper_left = int(frac_upper_left > thresh)
        frac_middle_left = int(frac_middle_left > thresh)
        frac_lower_left = int(frac_lower_left > thresh)
        frac_upper_right = int(frac_upper_right > thresh)
        frac_middle_right = int(frac_middle_right > thresh)
        frac_lower_right = int(frac_lower_right > thresh)

        zones = {
            'upper_left': frac_upper_left, 'middle_left': frac_middle_left, 'lower_left': frac_lower_left,
            'upper_right': frac_upper_right, 'middle_right': frac_middle_right, 'lower_right': frac_lower_right
        }
        all_zones.append(zones)
    else:
        all_zones.append('N/A')

all_zones

In [None]:
# predicted pneumonia-positive lung zones

positive_rows['Pred_Zones'] = all_zones
positive_rows = positive_rows[positive_rows.Pred_Zones != 'N/A'].reset_index(drop=True)

positive_rows

In [None]:
# compare reported and predicted lung zones

evaluate_gradcam(positive_rows['Mapped_Zones'], positive_rows['Pred_Zones'])

# Failed Processing Attempts

In [None]:
failures = pd.read_csv('/Data/CheX_Data/CheX_failed_cxrs.csv')
failures

In [None]:
missing_masks = failures[failures['Failure Reason'] == 'Lung mask(s) missing']['Image Path'].tolist()

axes = 5
indices = [np.random.randint(0,len(missing_masks)) for _ in range(axes)]

_,ax = plt.subplots(ncols=axes, figsize=(15,5), constrained_layout=True)
for i,idx in enumerate(indices):
    ax[i].imshow(cv2.imread(missing_masks[idx], cv2.IMREAD_GRAYSCALE), cmap='gray')

plt.suptitle('Missing Lung Mask(s)', y=.85, fontsize=15)

# Delete Processed Images

In [None]:
# entire dataset
chex = pd.read_csv('/Data/CheX_Data/chexpertchestxrays-u20210408/train_cheXbert_w_views.csv')

paths = chex.Path.apply(get_chex_img_path).tolist()
processed_paths = [processed_file_name(path) for path in paths]
existing_processed_paths = [path for path in processed_paths if os.path.isfile(path)]

existing_processed_paths

In [42]:
for path in existing_processed_paths:
    os.remove(path)