In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
#from tf_keras_vis.gradcam import Gradcam
from tf_keras_vis.utils import normalize
#from keras import backend as K
#from hyperopt import hp, fmin, tpe, Trials
from tensorflow.keras.layers import Dense, BatchNormalization
from keras.models import load_model
import sklearn
import matplotlib.pyplot as plt
from skimage import exposure

In [2]:
from PIL import Image

In [3]:
from keras.preprocessing.image import img_to_array, load_img
from skimage import exposure
import numpy as np

In [4]:
import util
import os
import cv2


In [5]:
# Record the installed TensorFlow version alongside the results for reproducibility.
print(tf.__version__)

2.16.1


In [6]:
# Look the default GPU device name up once; an empty result is treated as "no GPU".
gpu_name = tf.test.gpu_device_name()
if not gpu_name:
    print("No GPU device found")
else:
    print("Default GPU device:", gpu_name)

No GPU device found


In [7]:
# Load the pre-split train / validation / test tables from the working directory.
train_df = pd.read_csv("chexnet_train.csv")
val_df = pd.read_csv("chexnet_val.csv")
test_df = pd.read_csv("chexnet_test.csv")



In [8]:
# Disabled on purpose: the train / validation / test splits are read from the prepared
# CSV files in the previous cell. No `df` is defined anywhere in this notebook, so running
# this split would raise NameError (and, with a `df` present, would overwrite those splits).
# train_df, inter_df = train_test_split(df, test_size=0.2, random_state=42)
# val_df, test_df = train_test_split(inter_df, test_size=0.4, random_state=42)

'train_df, inter_df = train_test_split(df, test_size=0.2, random_state=42)\nval_df, test_df = train_test_split(inter_df, test_size=0.4, random_state=42)'

In [9]:
# Number of label classes (the class_names list derived from the CSV header has 14 entries).
num_classes = 14
# Images per batch for the validation and test generators.
batch_size = 32

In [10]:
# Every column after the first is used as a label column
# (assumes the first column is the image identifier 'id' -- TODO confirm against the CSV).
class_names = train_df.columns[1:].to_list()

In [11]:
# Show the label columns in the order they will be fed to the generators as y_col.
print(class_names)

['Atelectasis', 'Cardiomegaly', 'Consolidation', 'Edema', 'Effusion', 'Emphysema', 'Fibrosis', 'Infiltration', 'Mass', 'Nodule', 'Pleural_Thickening', 'Pneumonia', 'Pneumothorax', 'No Finding']


In [12]:
# Finding names in hand-written order; 'No Finding' is intentionally not part of this list.
labels = [
    'Cardiomegaly', 'Emphysema', 'Effusion', 'Infiltration', 'Mass',
    'Nodule', 'Atelectasis', 'Pneumothorax', 'Pleural_Thickening',
    'Pneumonia', 'Fibrosis', 'Edema', 'Consolidation',
]

In [13]:
def check_for_leakage(df1, df2, patient_col):
    """Tell whether two dataframes share any value in the given column.

    Args:
        df1: First dataframe.
        df2: Second dataframe.
        patient_col: Name of the column whose values identify a patient/sample.

    Returns:
        True if at least one value of ``patient_col`` occurs in both frames,
        otherwise False.
    """
    ids_first = set(df1[patient_col].values)
    ids_second = set(df2[patient_col].values)
    shared_ids = ids_first & ids_second
    return bool(shared_ids)
# Compare the held-out test split against the other two splits on the 'id' column.
# NOTE(review): 'id' is also the column the image path is built from, so this looks like a
# per-image check rather than a per-patient one -- confirm this is the intended granularity.
train_test_leak = check_for_leakage(train_df, test_df, 'id')
valid_test_leak = check_for_leakage(val_df, test_df, 'id')
print("leakage between train and test: {}".format(train_test_leak))
print("leakage between valid and test: {}".format(valid_test_leak))

leakage between train and test: False
leakage between valid and test: False


In [14]:
def z_score_normalization(image):
    """Standardize an image to zero mean and (approximately) unit variance.

    The statistics are computed over all elements of ``image``. A small
    constant is added to the standard deviation so a constant image does not
    cause a division by zero.
    """
    centered = image - np.mean(image)
    spread = np.std(image) + 1e-7
    return centered / spread

In [15]:
def convert_to_grayscale(img_array):
    """Collapse an (H, W, 3+) channel-last array into a single luminance plane.

    Channels 0, 1 and 2 are read as red, green and blue and combined with the
    weights 0.2126 / 0.7152 / 0.0722.
    """
    red = img_array[:, :, 0]
    green = img_array[:, :, 1]
    blue = img_array[:, :, 2]
    luminance = 0.2126 * red + 0.7152 * green + 0.0722 * blue
    return luminance

In [16]:
def preprocess_image(image):
    """Contrast-enhance one image with CLAHE; used as the generators' preprocessing_function.

    Args:
        image: Numeric image array with non-negative pixel values (any positive scale).

    Returns:
        float32 array of the same shape as ``image`` with values in [0, 1].
    """
    image = np.asarray(image, dtype=np.float32)
    peak = np.max(image)
    if peak <= 0:
        # A blank image has nothing to equalize and would otherwise divide by zero.
        return np.zeros_like(image)

    # equalize_adapthist expects floats scaled to [0, 1] and also returns floats in [0, 1],
    # so the result must NOT be divided by 255 again (that squeezed inputs into ~[0, 0.004]).
    equalized = exposure.equalize_adapthist(image / peak)
    return equalized.astype(np.float32)

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from skimage import exposure

# Load the training table (same file name as the initial load cell, so it also resolves
# on case-sensitive file systems).
train_df = pd.read_csv('chexnet_train.csv')

# Specify the path to the image directory
image_dir = r"Images/"

# Only preview a handful of images: looping over the whole table would open one
# figure per training row.
n_preview = 5

for index, row in train_df.head(n_preview).iterrows():
    # Load the image; cv2.imread returns None (no exception) when the file cannot be read.
    image_path = os.path.join(image_dir, row['id'])
    image = cv2.imread(image_path)
    if image is None:
        print(f"Could not read {image_path}; skipping")
        continue

    # OpenCV loads channels as BGR; matplotlib expects RGB. Scale to [0, 1] floats.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0

    # Apply adaptive histogram equalization
    equalized_image = exposure.equalize_adapthist(image)

    # Draw each image on its own axes (drawing both through plt.* put them on the same
    # axes, so only the equalized image was visible).
    fig, (ax_original, ax_equalized) = plt.subplots(1, 2, figsize=(10, 5))
    ax_original.imshow(image)
    ax_original.set_title('Original Image')
    ax_equalized.imshow(equalized_image)
    ax_equalized.set_title('Equalized Image')
    plt.show()
    plt.close(fig)  # release the figure so repeated previews do not accumulate memory

In [20]:
# Training images get random zoom/shear augmentation on top of the CLAHE preprocessing.
datagen = ImageDataGenerator(
    preprocessing_function=preprocess_image,
    zoom_range=0.2,
    shear_range=0.2,
    horizontal_flip=False)

# Validation and test images must be preprocessed identically but NOT randomly augmented,
# otherwise the reported metrics change from run to run and do not reflect the real images.
eval_datagen = ImageDataGenerator(preprocessing_function=preprocess_image)

# Arguments shared by all three generators: file names come from the 'id' column and the
# targets are the raw 0/1 values of the label columns, in class_names order.
flow_kwargs = dict(
    directory="Images/",
    x_col='id',
    y_col=class_names,
    target_size=(224, 224),
    batch_size=batch_size,
    class_mode="raw",
)

train_generator = datagen.flow_from_dataframe(dataframe=train_df, **flow_kwargs)

val_generator = eval_datagen.flow_from_dataframe(dataframe=val_df, **flow_kwargs)

# shuffle=False keeps predictions aligned row-by-row with test_generator.labels.
test_generator = eval_datagen.flow_from_dataframe(
    dataframe=test_df,
    shuffle=False,
    **flow_kwargs)


# ImageNet-pretrained ResNet50V2 backbone without its classification head.
base_model = ResNet50V2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze every backbone layer except the last two.
for layer in base_model.layers[:-2]:
    layer.trainable = False

# Classification head: pooled backbone features -> two regularized dense layers -> outputs.
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(512, activation='gelu', kernel_regularizer=l2(0.005))(x)
x = Dropout(0.1)(x)
x = Dense(256, activation='gelu', kernel_regularizer=l2(0.005))(x)
x = Dropout(0.2)(x)

# The targets are separate 0/1 columns (class_mode="raw"), so each output is an independent
# sigmoid trained with binary cross-entropy. Softmax + categorical cross-entropy forces the
# outputs to sum to 1 and is only valid when exactly one column is set per image.
# NOTE(review): assumes an image may carry more than one positive label -- confirm against
# the CSVs; sigmoid/binary cross-entropy remains valid for single-label rows as well.
predictions = Dense(num_classes, activation='sigmoid')(x)
model = Model(inputs=base_model.input, outputs=predictions)

# Compile the model
model.compile(optimizer=Adam(learning_rate=0.001),
              loss='binary_crossentropy',
              metrics=['accuracy', 'precision', 'recall'])




Found 28430 validated image filenames.
Found 226 validated image filenames.
Found 450 validated image filenames.


In [None]:
# Print the layer-by-layer architecture and parameter counts of the compiled model.
model.summary()

In [None]:
# Label columns in the order used as y_col for the generators, i.e. the order of the model
# outputs. Kept as a plain list, consistent with the earlier class_names definition.
class_names = train_df.columns[1:].to_list()
selected_columns = list(class_names)

# Reduce every label row to one class index (position of the first maximal column).
# The index must refer to class_names order: indexing through the hand-written `labels`
# list (different order, no 'No Finding') produced weights keyed to the wrong outputs.
# NOTE(review): rows with several positive columns are counted under the first one only.
class_labels = train_df[selected_columns].values.argmax(axis=1)

present_classes = np.unique(class_labels)
class_weights = sklearn.utils.compute_class_weight(class_weight='balanced',
                                                  classes=present_classes,
                                                  y=class_labels)
# Key each weight by its actual class index; enumerate() would shift the keys whenever a
# class never occurs in the training split.
class_weight_dict = {int(cls): float(weight)
                     for cls, weight in zip(present_classes, class_weights)}


In [None]:
# Inspect the balanced class weights computed in the previous cell.
print(class_weights)

In [None]:
# Scale the learning rate by 0.3 when val_loss has not improved for 2 epochs,
# with 0.0005 as the lower bound.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=2, min_lr=0.0005)

# Early stopping is currently switched off:
# early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

training_callbacks = [reduce_lr]

# Train for 6 epochs, validating on val_generator after each epoch.
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=6,
    callbacks=training_callbacks,
)



In [None]:
# Persist the trained model to an HDF5 file in the working directory.
model.save('chexres50.h5')


In [None]:
# Evaluate the model on the test set.
# evaluate() returns the loss followed by one value per compiled metric. The model is
# compiled with three metrics (accuracy, precision, recall), so unpacking into exactly two
# names raises ValueError; collect the remaining metric values separately.
loss, accuracy, *other_metrics = model.evaluate(test_generator)
print(f'Test accuracy: {accuracy:.3f}')

In [None]:
# Model outputs for the test images; test_generator is created with shuffle=False,
# so the rows are expected to line up with test_generator.labels.
predicted_vals = model.predict(test_generator)


In [None]:
# Display names for the ROC curves. Column i of the generator labels and of the model
# predictions corresponds to class_names[i] (the y_col order given to the generators), so
# the names must follow that order. The previous hand-written list had 15 entries
# (including 'Hernia', which is not a label column) in a different order, which mislabeled
# every curve and indexed past the last column.
labels = list(class_names)

In [None]:
def get_roc_curve(labels, predicted_vals, generator):
    """Plot one ROC curve per class on a shared figure and return the AUC values.

    Args:
        labels: Class display names; ``labels[i]`` names column ``i``.
        predicted_vals: 2-D array of predicted scores, one column per class.
        generator: Object whose ``labels`` attribute is a 2-D array of ground-truth
            values with the same column layout as ``predicted_vals``.

    Returns:
        List of AUC values for the classes whose curve could be computed, in class
        order. Classes that were skipped do not contribute an entry.
    """
    # Local import: these helpers are not imported in the notebook's import cells, and the
    # previous bare `except` silently turned the resulting NameError into a "lacks enough
    # examples" message for every class.
    from sklearn.metrics import roc_auc_score, roc_curve

    truth = np.asarray(generator.labels)
    scores = np.asarray(predicted_vals)
    n_columns = min(truth.shape[1], scores.shape[1])

    auc_roc_vals = []
    plt.figure(1, figsize=(10, 10))
    plt.plot([0, 1], [0, 1], 'k--')  # chance-level diagonal
    for i, label in enumerate(labels):
        if i >= n_columns:
            print(f"Skipping {label}: no label/prediction column {i} is available.")
            continue
        gt = truth[:, i]
        pred = scores[:, i]
        try:
            auc_roc = roc_auc_score(gt, pred)
            fpr_rf, tpr_rf, _ = roc_curve(gt, pred)
        except ValueError:
            # Raised by sklearn e.g. when only one class is present in the ground truth.
            print(
                f"Error in generating ROC curve for {label}. "
                f"Dataset lacks enough examples."
            )
            continue
        auc_roc_vals.append(auc_roc)
        plt.plot(fpr_rf, tpr_rf,
                 label=label + " (" + str(round(auc_roc, 3)) + ")")

    plt.xlabel('False positive rate')
    plt.ylabel('True positive rate')
    plt.title('ROC curve')
    if auc_roc_vals:
        plt.legend(loc='best')
    plt.show()
    return auc_roc_vals

In [None]:
# Same call as the earlier prediction cell; recomputes predicted_vals before plotting.
predicted_vals = model.predict(test_generator)


In [None]:
# Uses get_roc_curve from the project-local `util` module, not the function of the same
# name defined in this notebook.
auc_rocs = util.get_roc_curve(labels, predicted_vals, test_generator)

In [None]:
import pickle

In [None]:
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.densenet import preprocess_input, decode_predictions
from tensorflow.keras.models import Model

def preprocess_image(image_path):
    """Load an image file and turn it into a one-image batch for inference.

    Args:
        image_path: Path of the image file to load.

    Returns:
        Array with a leading batch dimension, resized to 224x224 and passed through the
        DenseNet ``preprocess_input``.

    NOTE(review): this reuses the name of the array-based ``preprocess_image`` that is
    passed to ImageDataGenerator earlier in the notebook; re-running the generator cell
    after this cell would pick up this path-based version instead.
    NOTE(review): the trained model is a ResNet50V2 fed with CLAHE-equalized images;
    DenseNet preprocessing here may not match the training inputs -- confirm.
    """
    # `load_img` / `img_to_array` are imported directly; no `image` module is in scope.
    img = load_img(image_path, target_size=(224, 224))
    img_array = img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)  # Add batch dimension
    return tf.keras.applications.densenet.preprocess_input(img_array)



def get_gradcam(model, img, layer_name):
    """Overlay a Grad-CAM heatmap for the top predicted class on an image.

    Args:
        model: Keras model taking 224x224 RGB inputs.
        img: Input image as a PIL image or an array accepted by ``Image.fromarray``.
        layer_name: Name of the convolutional layer whose activations are visualized.

    Returns:
        PIL RGB image of the same size as ``img``: the input blended 50/50 with the
        jet-colored heatmap.

    NOTE(review): inputs go through the imported ``preprocess_input``; confirm that this
    matches the preprocessing the model was trained with.
    """
    # Normalize the input to an RGB PIL image so grayscale/RGBA inputs can be blended.
    original_img = img if isinstance(img, Image.Image) else Image.fromarray(np.asarray(img))
    original_img = original_img.convert("RGB")

    # The model has a fixed 224x224 input, so resize before building the batch.
    img_array = img_to_array(original_img.resize((224, 224)))
    img_array = np.expand_dims(img_array, axis=0)
    img_array = preprocess_input(img_array)

    grad_model = Model(inputs=model.inputs,
                       outputs=[model.get_layer(layer_name).output, model.output])

    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        class_idx = tf.argmax(predictions[0])
        # Differentiate the score of the predicted class only; differentiating the whole
        # prediction vector mixes the evidence for all classes into one map.
        class_score = predictions[:, class_idx]

    output = conv_outputs[0]
    grads = tape.gradient(class_score, conv_outputs)[0]
    # Keep gradients only where both the activation and the gradient are positive.
    guided_grads = tf.cast(output > 0, 'float32') * tf.cast(grads > 0, 'float32') * grads

    # Channel weights = spatial mean of the guided gradients; CAM = weighted channel sum.
    weights = tf.reduce_mean(guided_grads, axis=(0, 1))
    cam = tf.reduce_sum(tf.multiply(weights, output), axis=-1).numpy()

    heatmap = np.maximum(cam, 0)
    peak = heatmap.max()
    if peak > 0:
        # Scale to [0, 1]; an all-zero map is left as is instead of dividing by zero.
        heatmap = heatmap / peak
    heatmap_rgb = plt.cm.jet(heatmap)[..., :3]

    # Resize the heatmap to match the original image size
    heatmap_img = Image.fromarray((heatmap_rgb * 255).astype(np.uint8))
    heatmap_img = heatmap_img.resize(original_img.size)

    # Overlay the heatmap on the original image
    return Image.blend(original_img, heatmap_img, 0.5)

In [None]:
def custom_decode_predictions(predictions, class_labels):
    """Return the three highest-scoring classes for every prediction row.

    Args:
        predictions: Iterable of 1-D score arrays (one per sample).
        class_labels: Sequence mapping a class index to its display name.

    Returns:
        One list per sample holding ``(label, score)`` tuples, best score first.
    """
    def _top_three(scores):
        # Ascending argsort: take the last three indices and flip to best-first.
        ranked = scores.argsort()[-3:][::-1]
        return [(class_labels[k], scores[k]) for k in ranked]

    return [_top_three(scores) for scores in predictions]

# Layer visualized by Grad-CAM: the final activation of the Keras ResNet50V2 backbone.
# (`layer_name` was referenced below without ever being defined.)
GRADCAM_LAYER_NAME = "post_relu"


def classify_image(img):
    """Classify one image and build its Grad-CAM overlay.

    Args:
        img: Input image as a PIL image or an array accepted by ``Image.fromarray``.

    Returns:
        Tuple ``(decoded_predictions, overlay_img)``: the top-3 ``(label, score)`` pairs
        per sample and the heatmap overlay (rendered at the 224x224 model resolution).

    NOTE(review): inputs go through the imported ``preprocess_input``; confirm that this
    matches the preprocessing the model was trained with.
    """
    # Bring the input to the fixed 224x224 RGB shape the model was built with.
    pil_img = img if isinstance(img, Image.Image) else Image.fromarray(np.asarray(img))
    resized_img = pil_img.convert("RGB").resize((224, 224))

    img_array = img_to_array(resized_img)
    img_array = np.expand_dims(img_array, axis=0)
    img_array = preprocess_input(img_array)

    predictions1 = model.predict(img_array)
    decoded_predictions = custom_decode_predictions(predictions1, class_names)
    overlay_img = get_gradcam(model, np.asarray(resized_img), GRADCAM_LAYER_NAME)

    # Return the decoded predictions and the overlayed image
    return decoded_predictions, overlay_img
# Gradio interface
# `gr` is used below but is not imported in any visible cell of this notebook.
import gradio as gr

iface = gr.Interface(
    fn=classify_image,
    inputs="image",
    outputs=["text", "image"],  # Add an "image" output for the overlayed image
    title="Xray Classification - KIMS",
    # Describe the classes the model actually predicts (class_names) instead of a
    # hand-written list that advertised 20 classes the model has no outputs for.
    description=(
        f"Classify cxr into one of {len(class_names)} classes - "
        f"{', '.join(class_names)}. Built by Dr Sai and Dr Ajavindu"
    ),
)

# Launch the interface
iface.launch()