In [None]:
%matplotlib inline

import sklearn.metrics
import random
import tensorflow as tf
import matplotlib.pyplot as plt
import os
import io
import glob
import scipy.misc
import numpy as np
import pandas as pd
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
import shutil
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras import layers
from tensorflow.keras import Model
import matplotlib
from tensorflow.keras.optimizers import RMSprop
import zipfile
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.image as mpimg

# Repository that is cloned into the runtime below
repo_url = 'https://github.com/adleberg/medical-ai'
# Learning-rate hyperparameter
LEARNING_RATE = 1e-4
# Height and width, in pixels, used for the images
IMAGE_HEIGHT = 256
IMAGE_WIDTH = 256

def load_image_into_numpy_array(image):
    """Return the pixels of a PIL image as a uint8 array shaped (height, width, 3).

    The image is converted to RGB first, so every pixel contributes exactly
    three channel values regardless of the mode of the input image.
    """
    rgb_image = image.convert('RGB')
    width, height = rgb_image.size
    # getdata() yields the pixels as one flat sequence; reshape restores the grid
    flat_pixels = np.array(rgb_image.getdata())
    return flat_pixels.reshape((height, width, 3)).astype(np.uint8)


# Work from /content so the repository is cloned there
%cd -q /content
# Absolute path of the checkout: the current directory joined with the last
# component of the repository URL
repo_dir_path = os.path.abspath(os.path.join('.', os.path.basename(repo_url)))
# Clone the repository
!git clone {repo_url} --quiet
# Enter the checkout and pull
%cd -q {repo_dir_path}
!git pull -q

Screen Shot 2020-11-08 at 1.14.35 PM.png

Above are the labels we have available to us in this dataset.

In [None]:
# Pick any of the 8 conditions listed above. The name is capitalized before it is
# used to filter the `label` column and to name the image folders.
chosen_condition = "cardiomegaly"
finding = chosen_condition.capitalize()

In [None]:
# Read the label table from the cloned repository and preview its first rows
labels_csv_path = "/content/medical-ai/labels.csv"
df = pd.read_csv(labels_csv_path)
df.head()

In [None]:
# Rows labelled with the chosen finding, and rows labelled "No Finding"
label_column = df["label"]
positives = df[label_column.eq(finding)]
negatives = df[label_column.eq("No Finding")]
# Number of positive cases
n = positives.shape[0]
print(n)

In [None]:
# Fractions of the positive cases used for training and for testing
TRAIN_RATIO = 0.8
TEST_RATIO = 0.2
n = len(positives)
TRAIN_N = int(n*TRAIN_RATIO)
# The test split is "every positive after the first TRAIN_N" (positives[TRAIN_N:]),
# so its size is derived from TRAIN_N. Truncating n*TEST_RATIO separately can
# report one image fewer than the split really contains.
TEST_N = n - TRAIN_N
print(TRAIN_N, TEST_N)

In [None]:
# Label tables for the two splits.
# Training: the first TRAIN_N positives plus the first TRAIN_N negatives.
# Testing: the remaining positives plus the negatives at positions TRAIN_N..n-1.
train_parts = [positives.iloc[:TRAIN_N], negatives.iloc[:TRAIN_N]]
test_parts = [positives.iloc[TRAIN_N:], negatives.iloc[TRAIN_N:n]]
train_labels = pd.concat(train_parts)
test_labels = pd.concat(test_parts)

## Prepare the Data

Sort the images into `train` and `test` folders, each with one subfolder for negative cases and one for positive cases.

In [None]:
# Folder that holds the images; the per-finding folders are created inside it
rootdir = "/content/medical-ai/images/"
# One folder per (split, class) pair: <rootdir><finding>/<split>/<class>
for split_name in ("test", "train"):
  for class_name in ("positive", "negative"):
    os.makedirs(rootdir+finding+"/"+split_name+"/"+class_name, exist_ok=True)

In [None]:
def _copy_rows(rows, split, label):
  """Copy the image of every row in `rows` into <rootdir><finding>/<split>/<label>/.

  Returns the number of files copied.
  """
  target_dir = rootdir+finding+"/"+split+"/"+label+"/"
  copied = 0
  for _, row in rows.iterrows():
    shutil.copy(rootdir+row["filename"], target_dir+row["filename"])
    copied += 1
  return copied

# Positives: the first TRAIN_N rows go to train, the rest to test.
# Negatives: the first TRAIN_N rows go to train, rows TRAIN_N..n-1 to test, so at
# most as many negatives as positives are used.
copied_total = (
    _copy_rows(positives[:TRAIN_N], "train", "positive")
    + _copy_rows(positives[TRAIN_N:], "test", "positive")
    + _copy_rows(negatives[:TRAIN_N], "train", "negative")
    + _copy_rows(negatives[TRAIN_N:n], "test", "negative")
)

# Report the number of files actually copied instead of assuming n*2
print("Done moving "+str(copied_total)+" images to positive and negative folders.")

In [None]:
def _load_preview_images(rows):
  """Load the image of each row in `rows`, resized to IMAGE_WIDTH x IMAGE_HEIGHT.

  Returns a list with one array per row, as produced by load_image_into_numpy_array.
  """
  previews = []
  for _, row in rows.iterrows():
    # Close the file handle as soon as the resized copy exists
    with Image.open(rootdir+row["filename"]) as original:
      resized = original.resize((IMAGE_WIDTH, IMAGE_HEIGHT))
    previews.append(load_image_into_numpy_array(resized))
  return previews

# Keep the first 6 images of each class in memory for visualization
positive_imgs = _load_preview_images(positives[:6])
negative_imgs = _load_preview_images(negatives[:6])

In [None]:
# One 2x3 grid per class: first the cases with the finding, then the cases without it
for gallery, caption in ((positive_imgs, finding), (negative_imgs, "No Findings")):
  for slot, picture in enumerate(gallery[:6], start=1):
    plt.subplot(2, 3, slot)
    plt.title(caption)
    plt.imshow(picture)
  # Render the grid for this class before starting the next one
  plt.show()

## Train the Model

We use InceptionV3, an image-analysis model created by Google, as the pre-trained base for classifying our images.

In [None]:
# InceptionV3 with ImageNet weights and without its top classification layers
pre_trained_model = InceptionV3(
    input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 3), weights='imagenet', include_top=False)

# Freeze all layers in the pre-trained model to prevent further training
for layer in pre_trained_model.layers:
  layer.trainable = False

# The new classification head is attached to the output of the 'mixed7' layer
last_layer = pre_trained_model.get_layer('mixed7')
last_output = last_layer.output

# Flatten the output layer to 1 dimension
x = layers.Flatten()(last_output)
# Add a fully connected layer with 1,024 hidden units and ReLU activation
x = layers.Dense(1024, activation='relu')(x)
# Add a dropout rate of 0.2
x = layers.Dropout(0.2)(x)
# Add a final sigmoid layer for classification
x = layers.Dense(1, activation='sigmoid')(x)

# Configure and compile the model.
# The optimizer is built explicitly so that the LEARNING_RATE configured at the top
# of the notebook is actually used; the string 'adam' silently used Keras' default.
model = Model(pre_trained_model.input, x)
model.compile(
    loss='binary_crossentropy',
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    metrics=['acc'])

In [None]:
# Folder layout used for the splits: <base_dir>/<finding>/<split>/<class>
rootdir = "/content/medical-ai/images/"
base_dir = rootdir
train_dir, test_dir = (
    os.path.join(base_dir, finding, split) for split in ('train', 'test'))

train_pos_dir, train_neg_dir = (
    os.path.join(train_dir, label) for label in ('positive', 'negative'))
test_pos_dir, test_neg_dir = (
    os.path.join(test_dir, label) for label in ('positive', 'negative'))

In [None]:
# Augmentation settings applied to the training images only
augmentation_options = dict(
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=False,
)
# Both generators rescale pixel values by 1/255
train_datagen = ImageDataGenerator(rescale=1./255, **augmentation_options)

# The test images are rescaled but not augmented
val_datagen = ImageDataGenerator(rescale=1./255)

In [None]:
# Keras expects target_size as (height, width)
train_generator = train_datagen.flow_from_directory(
        train_dir, # This is the source directory for training images
        target_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
        batch_size=1, # Number of images per batch
        class_mode='binary') # Binary classification (positive or negative)

test_generator = val_datagen.flow_from_directory(
        test_dir,
        target_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
        batch_size=1,
        class_mode='binary')

# Number of batches in one pass over each split, taken from the generators
# themselves. Counting the positive folder and doubling it is only right when
# there are exactly as many negatives and the folders contain nothing else.
train_steps = len(train_generator)
test_steps = len(test_generator)

## Run the Model

Train on 80% of the labelled images, and check the model's accuracy on the remaining 20%.

In [None]:
# Train on the training generator and validate on the test generator
fit_options = dict(
    steps_per_epoch=train_steps,
    epochs=20,
    validation_data=test_generator,
    validation_steps=test_steps,
    verbose=2,
)
history = model.fit(train_generator, **fit_options)

In [None]:
# Plot accuracy and loss per epoch for the training and test data

# Per-epoch accuracy and loss recorded in the training history
acc, val_acc = history.history['acc'], history.history['val_acc']
loss, val_loss = history.history['loss'], history.history['val_loss']

# x-axis: one position per recorded epoch
epochs = range(len(acc))

# Top panel: accuracy. Bottom panel: loss (only the bottom panel labels the x-axis).
panels = (
    (1, acc, val_acc, "Accuracy", 'Training and Test Accuracy', None),
    (2, loss, val_loss, "Loss", 'Training and Test Loss', "Epochs"),
)
for position, train_curve, test_curve, y_label, heading, x_label in panels:
  plt.subplot(2, 1, position)
  plt.plot(epochs, train_curve, label="train")
  plt.plot(epochs, test_curve, label="test")
  if x_label is not None:
    plt.xlabel(x_label)
  plt.ylabel(y_label)
  plt.title(heading)
  plt.legend(loc="lower right")

plt.show()

## Evaluate Performance

In [None]:
def predict_image(filename):
  """Return the model's score for the image stored at `filename`.

  The image is resized to IMAGE_WIDTH x IMAGE_HEIGHT, converted to an RGB array,
  scaled to the range [0, 1] and passed to `model.predict` as a batch of one.
  """
  # PIL's resize takes (width, height). The file handle is closed as soon as
  # the resized copy exists.
  with Image.open(filename) as original:
    image = original.resize((IMAGE_WIDTH, IMAGE_HEIGHT))
  # Convert the image to a NumPy array
  image_np = load_image_into_numpy_array(image)
  # Normalize the image data
  exp = np.true_divide(image_np, 255.0)
  # Add an extra dimension to match model input
  expanded = np.expand_dims(exp, axis=0)
  # The model emits one value per image; return the value for our single image
  return model.predict(expanded)[0][0]

def show_df_row(row):
  """Display the image of one results row with its label, the model's guess and score.

  `row` must provide the keys "filepath", "filename" and "label".
  """
  image_path = row["filepath"]
  # Load the picture for display; the file handle is closed once the resized copy exists
  with Image.open(image_path) as original:
    image = original.resize((IMAGE_WIDTH, IMAGE_HEIGHT))
  img = load_image_into_numpy_array(image)

  # Score the image through predict_image instead of repeating its preprocessing here
  pred = predict_image(image_path)

  # Assign a guess based on the prediction
  guess = "pos" if pred > 0.5 else "neg"

  # Plot the image with a summary line as its title
  title = "Image: "+row["filename"]+" Label: "+row["label"]+" Guess: "+guess+" Score: "+str(pred)
  plt.title(title)
  plt.imshow(img)
  plt.show()

In [None]:
results = []
# Score every test image, negatives first and then positives. Each entry is
# [filepath, filename, true label, guess, confidence].
for directory, true_label in ((test_neg_dir, "neg"), (test_pos_dir, "pos")):
  for image in os.listdir(directory):
    filename = directory+"/"+image
    confidence = predict_image(filename)
    # A confidence above 0.5 counts as a positive guess
    if confidence > 0.5:
      guess = 'pos'
    else:
      guess = 'neg'
    results.append([filename, image, true_label, guess, confidence])

# Highest confidence first
sorted_results = sorted(results, key=lambda entry: entry[4], reverse=True)
# NOTE(review): this rebinds `df`, which until here held the table read from labels.csv
result_columns = ["filepath","filename","label","guess","confidence"]
df = pd.DataFrame(data=sorted_results, columns=result_columns)

In [None]:
# First rows of the results table (sorted by confidence, highest first)
df.head(5)

In [None]:
# Pick a random row position in the results table. A dedicated name is used
# instead of `n`, which earlier cells use for the number of positive cases.
random_row_idx = random.randint(0, len(df)-1)
# Display that image together with its prediction
show_df_row(df.iloc[random_row_idx])

In [None]:
# Every 5th row of the results table, limited to the summary columns
summary_columns = ['filename', 'label', "guess", "confidence"]
df.iloc[::5][summary_columns]

In [None]:
#  Show histogram
from matplotlib.ticker import FormatStrFormatter

# Get confidence scores for positive and negative cases
pos = df.loc[df['label'] == "pos"]["confidence"]
neg = df.loc[df['label'] == "neg"]["confidence"]

# Create a figure and axis for the histogram
fig, ax = plt.subplots()

# Stacked histogram over ten equal-width bins from 0.0 to 1.0.
# The per-bin counts are bound to `counts` rather than `n`, which earlier cells use
# for the number of positive cases. `bins` keeps its name because
# create_with_cutoff reads it for its x ticks.
counts, bins, patches = plt.hist([pos,neg], np.arange(0.0, 1.1, 0.1).tolist(), edgecolor='black', linewidth=0.5, density=False, histtype='bar', stacked=True, color=['green', 'red'], label=[finding, 'Negative'])
plt.xlabel('Confidence')
plt.ylabel('N')
# One tick per bin edge, printed with two decimals
plt.xticks(bins)
ax.xaxis.set_major_formatter(FormatStrFormatter('%.2f'))
plt.title('Confidence Scores for Different Values')
plt.legend(loc="upper right", fontsize=16)
plt.show()

In [None]:
# Decision threshold on the confidence score: create_with_cutoff treats scores
# above this value as positive calls.
# Adjust it to trade sensitivity against specificity.
# NOTE(review): the trailing `#@param` annotation presumably renders as a slider form in Colab — confirm
cutoff = 0.46 #@param {type:"slider", min:0, max:1, step:0.01}

In [None]:
def create_with_cutoff(cutoff):
  """Plot the confidence histogram split into TP/FP/TN/FN at `cutoff`.

  A row is a positive call when its confidence is strictly greater than `cutoff`
  and a negative call when it is less than or equal to it. The rows come from the
  module-level results table `df` (columns "label" and "confidence").
  Sensitivity, specificity and the four counts are written onto the plot; a rate
  whose denominator is zero is shown as nan.
  """
  # Imported here so the function does not depend on another cell having run the import
  from matplotlib.ticker import FormatStrFormatter

  __, ax = plt.subplots()

  is_pos = df['label'] == "pos"
  is_neg = df['label'] == "neg"
  called_pos = df["confidence"] > cutoff
  # `<=` rather than `<`, so a score exactly equal to the cutoff is not dropped
  # from all four groups
  called_neg = df["confidence"] <= cutoff

  # True Positives (TP), False Positives (FP), False Negatives (FN), True Negatives (TN)
  TP = df.loc[is_pos & called_pos]["confidence"]
  FP = df.loc[is_neg & called_pos]["confidence"]
  FN = df.loc[is_pos & called_neg]["confidence"]
  TN = df.loc[is_neg & called_neg]["confidence"]

  # Bin edges are computed locally instead of reading a `bins` global left behind
  # by another cell
  bin_edges = np.arange(0.0, 1.1, 0.1).tolist()

  # Create a histogram to visualize TP, FP, TN, and FN
  plt.hist([TP,FP,TN,FN], bin_edges, \
           edgecolor='black', linewidth=0.5, density=False, histtype='bar', \
           stacked=True, color=['limegreen','forestgreen','orangered','salmon'], \
           label=['TP','FP','TN','FN'])
  plt.xlabel('Confidence')
  plt.ylabel('N')
  plt.xticks(bin_edges)
  ax.xaxis.set_major_formatter(FormatStrFormatter('%.2f'))
  plt.title('Confidence scores for different values')

  # Add a vertical dashed line to indicate the cutoff value
  plt.axvline(cutoff, color='k', linestyle='dashed', linewidth=2)
  plt.legend(loc="upper right", fontsize=16)

  # Sensitivity = TP/(TP+FN), specificity = TN/(TN+FP); guard against a class
  # with no rows so the plot is still produced
  positives_total = len(TP)+len(FN)
  negatives_total = len(TN)+len(FP)
  sens = round(len(TP)/positives_total,2) if positives_total else float("nan")
  spec = round(len(TN)/negatives_total,2) if negatives_total else float("nan")
  stats = "sensitivity: "+str(sens)+"\n"+"specificity: "+str(spec)+"\n\n"+"TP: "+str(len(TP))+"\n"+"FP: "+str(len(FP))+"\n"+"TN: "+str(len(TN))+"\n"+"FN: "+str(len(FN))
  plt.text(0.05, 0.05, stats, fontsize=14, transform=ax.transAxes)
  plt.show()

# Call the function with the specified cutoff value
create_with_cutoff(cutoff)

In [None]:
def create_auc_curve(classifications):
  """Compute the confusion counts and the ROC point for every confidence threshold.

  Each distinct confidence value in `classifications` is used as a threshold. A row
  is predicted positive when its confidence is greater than or equal to the
  threshold; the same rule is applied to rows of both classes.

  Args:
    classifications: iterable of rows in which index 2 is the true label
      ("neg" or "pos") and index 4 is the confidence score.

  Returns:
    (squares, sens_spec): two dicts keyed by the confidence value used as threshold.
    squares[t] is [TP, FP, TN, FN]; sens_spec[t] is (1 - specificity, sensitivity).
    Sensitivity or specificity is reported as 0.0 when its class has no rows.

  Raises:
    AssertionError: if a row's label is neither "neg" nor "pos".
  """
  # Materialize once so the rows can be scanned once per threshold
  rows = list(classifications)
  squares = {}

  # Loop through each classification entry and use its confidence as threshold
  for x in rows:
    conf = x[4]
    threshold = float(conf)
    TP, FP, TN, FN = 0, 0, 0, 0

    # Count TP, FP, TN, and FN at this threshold
    for row in rows:
      assert row[2] in ("neg", "pos"), "unexpected label: %r" % (row[2],)
      predicted_pos = float(row[4]) >= threshold
      if row[2] == "neg":
        if predicted_pos: FP += 1
        else: TN += 1
      else:
        if predicted_pos: TP += 1
        else: FN += 1
    squares[conf] = [TP, FP, TN, FN]

  # Calculate sensitivity and specificity for each confidence threshold
  sens_spec = {}
  for conf, (TP, FP, TN, FN) in squares.items():
    # Guard the denominators so input with a single class does not raise
    sens = TP / float(TP + FN) if (TP + FN) else 0.0
    spec = TN / float(TN + FP) if (TN + FP) else 0.0
    sens_spec[conf] = (1-spec, sens)
  return squares, sens_spec

# Confusion counts and ROC points for every confidence threshold
squares, sens_spec = create_auc_curve(sorted_results)

# Each sens_spec value is a (1-specificity, sensitivity) pair: the x and y of one ROC point
x = [roc_point[0] for roc_point in sens_spec.values()]
y = [roc_point[1] for roc_point in sens_spec.values()]

# Area under the curve through those points
auc = sklearn.metrics.auc(x, y)

# Draw the ROC curve together with the diagonal from (0, 0) to (1, 1)
plt.figure()
lw = 2
roc_label = 'ROC curve (area = %0.3f)' % auc
plt.plot(x, y, color='darkorange', lw=lw, label=roc_label)
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.ylabel('Sensitivity')
plt.xlabel('1-specificity')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right", fontsize=20)
plt.show()

In [None]:
# Save the trained model under /content/export/<finding>
# NOTE(review): depending on the installed Keras version, model.save may require a
# file extension such as ".keras" — confirm against the runtime in use
model.save('/content/export/'+finding)
# Archive the exported model path into /content/<finding>.zip
!zip -r /content/{finding}.zip /content/export/{finding}