# NIH CXR8 Classifier
The following is a machine learning exercise on constructing a model that can detect cases of Cardiomegaly from the NIH CXR8 dataset and evaluating the effectiveness of the model. The following code is made for Google Collab, and may not necessarily run correctly as a simple Python Jupyter Notebook.

In [None]:
# Load the relevant libraries and dependencies

%matplotlib inline

import sklearn.metrics
import matplotlib.pyplot as plt
import os
import numpy as np
import pandas as pd
from PIL import Image
import shutil
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras import layers
from tensorflow.keras import Model
import os
from tensorflow.keras.preprocessing.image import ImageDataGenerator

LEARNING_RATE = 0.0001
repo_url = 'https://github.com/adleberg/medical-ai'
IMAGE_HEIGHT, IMAGE_WIDTH = 256, 256

def load_image_into_numpy_array(image):
    image = image.convert('RGB')
    (im_width, im_height) = image.size
    return np.array(image.getdata()).reshape(
        (im_height, im_width, 3)).astype(np.uint8)

print("Welcome! Downloading some things... this will take a minute.")

%cd -q /content
repo_dir_path = os.path.abspath(os.path.join('.', os.path.basename(repo_url)))
!git clone {repo_url} --quiet
%cd -q {repo_dir_path}
!git pull -q

print("Great! You clicked on it correctly. Now let's get started.")

# 1. Preparing the Data
The dataset is going to split into training data, which will represent 80% of the corpus, and testing data. In order to perform the classification, the dataset will also be split into negative and positive cases.

In [None]:
finding = "cardiomegaly"
finding = finding.capitalize()

In [None]:
df = pd.read_csv("/content/medical-ai/labels.csv")
df.head()

In [None]:
positives = df.loc[df["label"] == finding]
negatives = df.loc[df["label"] == "No Finding"]
n = len(positives)

if n == 0:
  print("No studies found! Maybe check your spelling?")
  assert (n > 0)

In [None]:
TRAIN_RATIO = 0.8
TEST_RATIO = 1 - TRAIN_RATIO
n = len(positives)
TRAIN_N = int(n*TRAIN_RATIO)
TEST_N = int(n*TEST_RATIO)
print(TRAIN_N, TEST_N)

In [None]:
train_labels = pd.concat([positives[:TRAIN_N], negatives[:TRAIN_N]])
test_labels = pd.concat([positives[TRAIN_N:], negatives[TRAIN_N:n]])

In [None]:
rootdir = "/content/medical-ai/images/"

dirs = ["/test/positive", "/test/negative", "/train/positive", "/train/negative" ]

for dir in dirs:
  os.makedirs(rootdir+finding+dir,  exist_ok=True)

In [None]:
# Copy images to new directories for training purposes

image_sets = [ (positives[:TRAIN_N], "/train/positive/"), (positives[TRAIN_N:],"/test/positive/"), (negatives[:TRAIN_N], "/train/negative/"), (negatives[TRAIN_N:n],"/test/negative/" )]

for image_set in image_sets:
  for idx, image in image_set[0].iterrows():
    source = rootdir+image["filename"]
    dst = rootdir+finding+image_set[1]+image["filename"]
    shutil.copy(source, dst)

print("Done moving "+str(n*2)+" images to positive and negative folders.")

In [None]:
# Load images into memory for visualization
positive_imgs, negative_imgs = [], []
IMAGE_HEIGHT, IMAGE_WIDTH = 256, 256
image_set_end = 6

image_sets = [(positives, positive_imgs), (negatives, negative_imgs)]

for image_set in image_sets:
  for idx, row in image_set[0][:image_set_end].iterrows():
    image_path = rootdir+row["filename"]
    image = Image.open(image_path).resize((IMAGE_WIDTH, IMAGE_HEIGHT))
    image_set[1].append(load_image_into_numpy_array(image))

In [None]:
image_sets = [positive_imgs, negative_imgs]

for index, image_set in enumerate(image_sets):
  for idx, img in enumerate(image_set[:6]):
    plt.subplot(2, 3, idx+1)
    plt.title(finding if index == 0 else "No Findings")
    plt.imshow(image_set[idx])
  plt.show()

# 2. Training the Model

The model used is InceptionV3, with a modified final few layers that involve flattening the model to 1 dimension, adding a 20% dropout, and applying Sigmoid activation for the classification.


In [None]:
pre_trained_model = InceptionV3(
    input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 3), weights='imagenet', include_top=False)

for layer in pre_trained_model.layers:
  layer.trainable = False

last_layer = pre_trained_model.get_layer('mixed7')
last_output = last_layer.output

x = layers.Flatten()(last_output) # Flatten the output layer to 1 dimension
x = layers.Dense(1024, activation='relu')(x) # Add a fully connected layer with 1,024 hidden units and ReLU activation
x = layers.Dropout(0.2)(x) # Add a dropout rate of 0.2
x = layers.Dense(1, activation='sigmoid')(x) # Add a final sigmoid layer for classification

model = Model(pre_trained_model.input, x) # Configure and compile the model
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['acc'])
print("Done compiling the model!")

In [None]:
# Define our example directories and files
base_dir = rootdir = "/content/medical-ai/images/"
train_dir = os.path.join(base_dir, finding, 'train')
test_dir = os.path.join(base_dir, finding, 'test')

train_pos_dir = os.path.join(train_dir, 'positive')
train_neg_dir = os.path.join(train_dir, 'negative')
test_pos_dir = os.path.join(test_dir, 'positive')
test_neg_dir = os.path.join(test_dir, 'negative')

In [None]:
# Add our data-augmentation parameters to ImageDataGenerator
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=False)

# Note that the test data should not be augmented!
val_datagen = ImageDataGenerator(rescale=1./255)

In [None]:
train_generator = train_datagen.flow_from_directory(
        train_dir, # This is the source directory for training images
        target_size=(IMAGE_WIDTH, IMAGE_HEIGHT),
        batch_size=1,
        class_mode='binary')

test_generator = val_datagen.flow_from_directory(
        test_dir,
        target_size=(IMAGE_WIDTH, IMAGE_HEIGHT),
        batch_size=1,
        class_mode='binary')

train_steps = len(os.listdir(train_pos_dir)) * 2
test_steps = len(os.listdir(test_pos_dir)) * 2

print(train_steps)
print(test_steps)

print("Done funneling data!")

# 3. Running the Model
Now that the model has been trained, it will be tested on the test dataset, and its accuracy and loss will be evaluated graphically. 

In [None]:
history = model.fit(
      train_generator,
      steps_per_epoch=train_steps,
      epochs=20,
      validation_data=test_generator,
      validation_steps=test_steps,
      verbose=2)

In [None]:
# Retrieve a list of accuracy results on training and test data
# sets for each training epoch
acc = history.history['acc']
val_acc = history.history['val_acc']

# Retrieve a list of list results on training and test data
# sets for each training epoch
loss = history.history['loss']
val_loss = history.history['val_loss']

# Get number of epochs
epochs = range(len(acc))

graph_vals = [(acc, val_acc, "Accuracy"), (loss, val_loss, "Loss")]

for index, graph_val in enumerate(graph_vals):
  plt.subplot(2,1,index + 1)
  plt.plot(epochs, graph_val[0], label="train")
  plt.plot(epochs, graph_val[1], label="test")
  plt.xlabel("Epochs")
  plt.ylabel(graph_val[2])
  plt.title('Training and test ' + graph_val[2])
  plt.legend(loc="lower right")

plt.show()


# 4. Evaluating the Model
In this section, the model will be evaluted more deeply to observe its performance using a histogram, and evaluating its specificity, sensitivity, and receiver operating characteristic (or area under the curve).

In [None]:
def predict_image(filename):
  image = Image.open(filename).resize((IMAGE_HEIGHT, IMAGE_WIDTH))
  image_np = load_image_into_numpy_array(image)
  exp = np.true_divide(image_np, 255.0)
  expanded = np.expand_dims(exp, axis=0)
  return model.predict(expanded)[0][0]

def show_df_row(row):
  image_path = row["filepath"]
  image = Image.open(image_path).resize((IMAGE_WIDTH, IMAGE_HEIGHT))
  img = load_image_into_numpy_array(image)
  exp = np.true_divide(img, 255.0)
  expanded = np.expand_dims(exp, axis=0)
  pred = model.predict(expanded)[0][0]
  guess = "neg"
  if pred > 0.5:
    guess = "pos"
  title = "Image: "+row["filename"]+" Label: "+row["label"]+" Guess: "+guess+" Score: "+str(pred)
  plt.title(title)
  plt.imshow(img)
  plt.show()
  return

In [None]:
results = []

image_vals = [(test_neg_dir, "neg"), (test_pos_dir, "pos")]

for image_val in image_vals:
  for image in os.listdir(image_val[0]):
    filename = image_val[0]+"/"+image
    confidence = predict_image(filename)
    guess = 'pos' if confidence > 0.5 else 'neg'
    results.append([filename, image, image_val[1], guess, confidence])

sorted_results = sorted(results, key=lambda x: x[4], reverse=True)
df = pd.DataFrame(data=sorted_results, columns=["filepath","filename","label","guess","confidence"])

print("Done inference!")

In [None]:
# Checking the dataframe
df.head()

In [None]:
# An example image
import random
n = random.randint(0, len(df)-1)
show_df_row(df.iloc[n])

In [None]:
# Show a table of images
df[::5][['filename', 'label',"guess","confidence"]]

In [None]:
# Show histogram
from matplotlib.ticker import FormatStrFormatter
pos = df.loc[df['label'] == "pos"]["confidence"]
neg = df.loc[df['label'] == "neg"]["confidence"]
fig, ax = plt.subplots()
n, bins, patches = plt.hist([pos,neg], np.arange(0.0, 1.1, 0.1).tolist(), edgecolor='black', linewidth=0.5, density=False, histtype='bar', stacked=True, color=['green', 'red'], label=[finding, 'Negative'])
plt.xlabel('Confidence')
plt.ylabel('N')
plt.xticks(bins)
ax.xaxis.set_major_formatter(FormatStrFormatter('%.2f'))
plt.title('Confidence scores for different values')
plt.legend(loc="lower right", fontsize=16)
plt.show()

In [None]:
# Specificity / Sensitvity Cutoff

cutoff = 0.79 #@param {type:"slider", min:0, max:1, step:0.01}

def create_with_cutoff(cutoff):
  __, ax = plt.subplots()
  TP = df.loc[(df['label'] == "pos") & (df["confidence"] > cutoff)]["confidence"]
  FP = df.loc[(df['label'] == "neg") & (df["confidence"] > cutoff)]["confidence"]
  FN = df.loc[(df['label'] == "pos") & (df["confidence"] < cutoff)]["confidence"]
  TN = df.loc[(df['label'] == "neg") & (df["confidence"] < cutoff)]["confidence"]
  plt.hist([TP,FP,TN,FN], np.arange(0.0, 1.1, 0.1).tolist(), \
           edgecolor='black', linewidth=0.5, density=False, histtype='bar', \
           stacked=True, color=['limegreen','forestgreen','orangered','salmon'], \
           label=['TP','FP','TN','FN'])
  plt.xlabel('Confidence')
  plt.ylabel('N')
  plt.xticks(bins)
  ax.xaxis.set_major_formatter(FormatStrFormatter('%.2f'))
  plt.title('Confidence scores for different values')
  plt.axvline(cutoff, color='k', linestyle='dashed', linewidth=2)
  plt.legend(loc="lower right", fontsize=16)
  sens = round(len(TP)/(len(TP)+len(FN)),2)
  spec = round(len(TN)/(len(TN)+len(FP)),2)
  stats = "sensitivity: "+str(sens)+"\n"+"specificity: "+str(spec)+"\n\n"+"TP: "+str(len(TP))+"\n"+"FP: "+str(len(FP))+"\n"+"TN: "+str(len(TN))+"\n"+"FN: "+str(len(FN))
  plt.text(0.05, 0.05, stats, fontsize=14, transform=ax.transAxes)
  plt.show()

create_with_cutoff(cutoff)

In [None]:
# ROC Curve

def create_auc_curve(classifications):
  squares = {}
  for x in classifications:
    conf = x[4]
    TP, FP, TN, FN = 0, 0, 0, 0
    for row in classifications:
      assert (row[2] == "neg" or row[2] == "pos")
      if row[2] == "neg":
        if float(row[4]) < conf: TN += 1
        else: FP += 1
      else:
        if float(row[4]) > conf: TP += 1
        else: FN += 1
    squares[conf] = [TP, FP, TN, FN]
  # now we have a list of stuff: convert to
  sens_spec = {}
  for entry in squares:
    sens = squares[entry][0] / float(squares[entry][0] + squares[entry][3])
    spec = squares[entry][2] / float(squares[entry][2] + squares[entry][1])
    sens_spec[entry] = (1-spec, sens)
  return squares, sens_spec

squares, sens_spec = create_auc_curve(sorted_results)

x = []
y = []
for point in sens_spec.keys():
  x.append(sens_spec[point][0])
  y.append(sens_spec[point][1])

auc = sklearn.metrics.auc(x, y)

plt.figure()
lw = 2
plt.plot(x, y, color='darkorange', lw=lw, label='ROC curve (area = %0.3f)' % auc)
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.ylabel('Sensitivity')
plt.xlabel('1-specificity')
plt.title('Receiver operating characteristic')
plt.legend(loc="lower right", fontsize=20)
plt.show()

# 5. Save the Model
The following code allows you to save the model from Google Collab

In [None]:
model.save('/content/export/'+finding)
!zip -r /content/{finding}.zip /content/export/{finding}