**Ground up 3D neural network for ADNI alzheimers classification**

In [1]:
import math
import json

import numpy as np
import numpy.ma as ma
import cv2

import scipy as scipy

import pandas as pd

import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.colors import ListedColormap

from mpl_toolkits.axes_grid1 import make_axes_locatable

import itertools

import keras
from keras import backend as K
from keras.utils import np_utils
from keras.engine import Layer

from keras.layers import Input, Dense, Convolution1D, Convolution2D, MaxPooling2D, Deconvolution2D, UpSampling2D, Reshape, Flatten, ZeroPadding2D, BatchNormalization, Lambda, Dropout, Activation
from keras.layers import Convolution3D, MaxPooling3D
from keras.models import Model, Sequential
from keras.models import model_from_json

from keras.optimizers import SGD, RMSprop, Adam
from keras.layers.advanced_activations import LeakyReLU
from keras.preprocessing import image

from keras.callbacks import Callback
from keras.models import load_model

from keras.callbacks import ModelCheckpoint
from keras.callbacks import EarlyStopping

from vis.utils import utils
#from vis.visualization import visualize_saliency

from mpl_toolkits.mplot3d import Axes3D

import nibabel as nib
from keras.utils import to_categorical

from scipy.ndimage import zoom

import os

from sklearn.model_selection import train_test_split


Using TensorFlow backend.


In [2]:
#Parameters (Modify as needed)
img_size_x = 192
img_size_y = 192
img_size_z = 160

batch_size = 128
nb_classes = 3
nb_epoch = 25

c = 0

learning_rate = 0.003
early_stopping_patience = 20

class_names = ["CN", "MCI", "AD"]

In [66]:
def show_slices(slices):
    """ Function to display row of image slices """
    fig, axes = plt.subplots(1, len(slices))
    for i, slice in enumerate(slices):
        axes[i].imshow(slice.T, cmap="gray", origin="lower")

In [1]:
def load_dataset(directory):
    
    patientData = np.loadtxt("ADNI1_Complete_1Yr_1.5T_10_26_2019.csv", 
                         dtype= 'str', skiprows=1, delimiter=',')
    
    integer_mapping = {x: i for i,x in enumerate(['AD', 'MCI', 'CN'])}
    y = np.asarray([integer_mapping[word] for word in patientData[:,1]])
    labels = to_categorical(y, num_classes=3, dtype='float32')
    
    
    
    xdim = 192
    ydim = 192
    zdim = 160
    X = np.zeros((637,xdim,ydim,zdim,1))
    Y = np.zeros((637,3))
    for i, filename in enumerate(os.listdir(directory)):
        subject = filename[:-4]
        
        epi_img = nib.load(directory + subject + '.nii')
        # Get voxel array
        epi_img_data = epi_img.get_fdata()
        n_i, n_j, n_k = epi_img_data.shape
        
        if (n_i != 192 or n_j != 192 or n_k != 160):
            #epi_img_data = cv2.resize(epi_img_data, (192, 192))
            #epi_img_data = epi_img_data[0:192,0:192,int(n_k/2)-80:int(n_k/2)+80]
            
            #resampling to make all MRI volumes the same dimensions
            epi_img_data = zoom(epi_img_data, (float(xdim/n_i), float(ydim/n_j), float(zdim/n_k)), order = 0)
        
        x = epi_img_data
        
        x = np.expand_dims(x, axis=3)
        
        X[i] = x
        
        ind = np.where(patientData[:,0] == subject[:-1])[0][0]
        y = labels[ind, :]
        y = y.reshape(1,-1)
        
        Y[i] = y
        
        if i % 50 == 0:
            print ("loaded " + i + "subjects")
        
        
    print (X.shape)
    print (Y.shape)
        
    return X, Y

In [6]:
X, Y = load_dataset(directory = "ADNI_TRAIN/")

(192, 192, 160, 1)
0
(192, 192, 160, 1)
1
(192, 192, 160, 1)
2
(192, 192, 160, 1)
3
(192, 192, 160, 1)
4
(192, 192, 160, 1)
5
(192, 192, 160, 1)
6
(192, 192, 160, 1)
7
(192, 192, 160, 1)
8
(192, 192, 160, 1)
9
(192, 192, 160, 1)
10
(192, 192, 160, 1)
11
(192, 192, 160, 1)
12


KeyboardInterrupt: 

In [93]:
def split_dataset(X, Y):
    
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.15, random_state=1)
    
    X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train, test_size=0.2, random_state=1)
    
    return X_train, Y_train, X_test, Y_test, X_val, Y_val

In [94]:
train_data, train_labels, test_data, test_labels, val_data, val_labels = split_dataset(X, Y)

KeyboardInterrupt: 

In [105]:
train_data = X[0:450]
train_labels = Y[0:450]
test_data = X[450:545]
test_labels = Y[450:545]
val_data = X[545:637]
val_labels = Y[545:637]

In [106]:
#Builds keras 3D CNN model
def build_cnn(dimension = '3d', activation = 'softmax', heatmap = False, w_path = None, compile_model = True):
    input_3d = (img_size_x, img_size_y, img_size_z, 1)
    input_2d = (img_size_x, img_size_y, 1)
    
    pool_3d = (2, 2, 2)
    pool_2d = (2, 2)
    
    def build_conv_3d():
        model = Sequential()
        
        model.add(Convolution3D(8, 3, 3, 3, name='conv1', input_shape=input_3d))
        model.add(MaxPooling3D(pool_size=pool_3d, name='pool1'))

        model.add(Convolution3D(8, 3, 3, 3, name='conv2'))
        model.add(MaxPooling3D(pool_size=pool_3d, name='pool2'))

        model.add(Convolution3D(8, 3, 3, 3, name='conv3'))
        model.add(MaxPooling3D(pool_size=pool_3d, name='pool3'))
        
        return model
        
    def build_conv_2d():
        model = Sequential()
        
        model.add(Convolution2D(8, 3, 3, name='conv1', input_shape=input_2d))
        model.add(MaxPooling2D(pool_size=pool_2d, name='pool1'))

        model.add(Convolution2D(8, 3, 3, name='conv2'))
        model.add(MaxPooling2D(pool_size=pool_2d, name='pool2'))

        model.add(Convolution2D(8, 3, 3, name='conv3'))
        model.add(MaxPooling2D(pool_size=pool_2d, name='pool3'))
        
        return model
    
    if(dimension == '3d'):
        model = build_conv_3d()
    else:
        model = build_conv_2d()
        
    model.add(Flatten())
    model.add(Dense(2000, activation='relu', name='dense1'))
    model.add(Dropout(0.5, name='dropout1'))

    model.add(Dense(500, activation='relu', name='dense2'))
    model.add(Dropout(0.5, name='dropout2'))

    model.add(Dense(nb_classes, activation=activation, name='softmax'))

    if w_path:
        model.load_weights(w_path)

    opt = keras.optimizers.Adadelta(clipnorm=1.)
    
    if(compile_model):
        model.compile(optimizer=opt,loss='categorical_crossentropy', metrics=['accuracy'])
    
    print ('Done building model.')

    return model

In [99]:
model = build_cnn(dimension = '3d')

  if sys.path[0] == '':
  from ipykernel import kernelapp as app


Done building model.


In [19]:
# Tracks model learning rate
class SGDLearningRateTracker(Callback):
    def on_epoch_end(self, epoch, logs={}):
        optimizer = self.model.optimizer
        lr = K.eval(optimizer.lr * (1. / (1. + optimizer.decay * optimizer.iterations)))
        print (str('\nLR: {:.6f}\n').format(float(lr)))

In [20]:
# Fitting model architecture to data, also runs loss/accuracy tracker
def fit_model(model, v, train_data, train_labels, val_data, val_labels):
    model_weights_file = 'img_classifier_weights_%s.h5' %v
    epoch_weights_file = 'img_classifier_weights_%s_{epoch:02d}_{val_acc:.2f}.hdf5' %v
    model_file = 'img_classifier_model_%s.h5' %v
    history_file = 'img_classifier_history_%s.json' %v
    
    def save_model_and_weights():
        model.save(model_file)
        model.save_weights(model_weights_file)
        
        return 'Saved model and weights to disk!'

    def save_model_history(m):
        with open(history_file, 'wb') as history_json_file:
            json.dump(m.history, history_json_file)
        
        return 'Saved model history to disk!'
    
    def visualise_accuracy(m):
        plt.plot(m.history['acc'])
        plt.plot(m.history['val_acc'])
        plt.title('model accuracy')
        plt.ylabel('accuracy')
        plt.xlabel('epoch')
        plt.legend(['train', 'validation'], loc='upper left')
        plt.show()
      
    def visualise_loss(m):
        plt.plot(m.history['loss'])
        plt.plot(m.history['val_loss'])
        plt.title('model loss')
        plt.ylabel('loss')
        plt.xlabel('epoch')
        plt.legend(['train', 'validation'], loc='upper left')
        plt.show()
    
    def model_callbacks():
        checkpoint = ModelCheckpoint(epoch_weights_file, monitor='val_acc', verbose=1, save_best_only=True, mode='max')
        early_stopping = EarlyStopping(monitor='val_loss', min_delta=0, patience=early_stopping_patience, verbose=1, mode='auto')
        lr_tracker = SGDLearningRateTracker()
        
        return [checkpoint,early_stopping,lr_tracker]
        
    callbacks_list = model_callbacks()

    m = model.fit(train_data,train_labels,batch_size=batch_size,nb_epoch=nb_epoch,verbose=1,shuffle=True,validation_data=(val_data,val_labels),callbacks=callbacks_list)
    
    print (save_model_and_weights())
    print (save_model_history(m))
    
    visualise_accuracy(m)
    visualise_loss(m)
    
    return m

In [22]:
# Evaluates test data performance and creates confusion matrix
def evaluate_model(m, weights, test_data, test_labels):    
    def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
        plt.imshow(cm, interpolation='nearest', cmap=cmap)
        plt.title(title)
        plt.colorbar()
        tick_marks = np.arange(len(classes))
        plt.xticks(tick_marks, classes, rotation=45)
        plt.yticks(tick_marks, classes)

        if normalize:
            cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

        thresh = cm.max() / 2.
        for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
            plt.text(j, i, cm[i, j],
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")

        plt.tight_layout()
        plt.ylabel('True label')
        plt.xlabel('Predicted label')
     
    plt.close('all')

    m.load_weights(weights)
    m.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
    
    print ("Done compiling model.")
    
    prediction = m.predict(test_data)
    prediction_labels = np_utils.to_categorical(np.argmax(prediction, axis=1), nb_classes)
    
    print ('Accuracy on test data:', accuracy_score(test_labels, prediction_labels))

    print ('Classification Report')
    print (classification_report(test_labels, prediction_labels, target_names = class_names))

    # Compute confusion matrix
    cnf_matrix = confusion_matrix(np.argmax(test_labels, axis=1), np.argmax(prediction, axis=1))
    np.set_printoptions(precision=2)

    # Plot non-normalized confusion matrix
    plt.figure()
    plot_confusion_matrix(cnf_matrix, classes = class_names, normalize=False, title='Confusion matrix')

    plt.show()


In [None]:
#This will take time
fit_model(model, "v1", train_data, train_labels, val_data, val_labels)



Train on 450 samples, validate on 92 samples
Epoch 1/25


In [None]:
#Load the fit model and evaluate performance on test
evaluate_model(model, 'img_classifier_weights_v1.h5', test_data, test_labels)