# A maximally simple solution to CT / CTA detection!

In [None]:
import numpy as np
import pandas as pd
from IPython.display import display
# Use the fully-qualified option key: the bare 'precision' pattern matches more than one
# option in newer pandas releases and is rejected as ambiguous.
pd.set_option('display.precision', 2)
import matplotlib.pyplot as plt
%matplotlib inline
import pydicom
import os
from glob import glob

import keras
from keras.layers import Convolution2D, Conv2D, MaxPooling2D, Dropout, Flatten, Dense, Activation
from keras.models import Sequential

from skimage.transform import downscale_local_mean

In [None]:
import sys
# Last expression of the cell: the notebook echoes the interpreter version string.
sys.version

## Loading DICOM from the SHAIP environment

In [None]:
# Directory (relative to the working directory) that Cohort globs for '*.dcm' files.
DICOM_PATH = 'data/dicom_dir/'
# Smiling-face emoji (U+1F603) printed by the inline tests when they pass.
SMILY = u'\U0001F603'

In [None]:
class Cohort(object):
    """ 
    Manages a SHAIP-like cohort of datasets, finding what datasets are available, reading data and GT.
    Deals only with the raw input data - no normalization happens here.  
    Accessors generally present lazy evaluation semantics.
    """
    def __init__(self, dicom_path=None):
        """ Scan the data directory to find what data is present and set up a
        list and dictionary of dataset ids and paths.  It does not *read* the data.

        :param dicom_path: directory holding the '*.dcm' files.  When omitted the
            module-level DICOM_PATH is used.
        """
        self.dicom_path = DICOM_PATH if dicom_path is None else dicom_path
        # Glob the directory this instance was configured with, so that
        # self.dicom_path and self.filepaths always agree.
        self.filepaths = glob(os.path.join(self.dicom_path, '*.dcm'))
        # The dataset id is the first 7 characters of the file name, e.g. 'ID_0087'.
        self.ids = [os.path.basename(fp)[:7] for fp in self.filepaths]
        self.id_to_path_map = dict(zip(self.ids, self.filepaths))
        self.size = len(self.ids)
        
        # Private cache storage, filled on first access by the properties below
        self._images = self._dicoms = self._groundtruth = None
        
    @property
    def dicoms(self):
        """ Lazily read and return a list of dicom objects in the same order as self.ids """
        if self._dicoms is None:
            self._dicoms = [pydicom.dcmread(fp) for fp in self.filepaths]
        return self._dicoms
        
    @property
    def images(self):
        """ Lazily extract and return a list of images (the pixel_array of each dicom)
        in the same order as self.ids """
        if self._images is None:
            self._images = [dcm.pixel_array for dcm in self.dicoms]
        return self._images
    
    @staticmethod
    def _filename_to_contrast_gt(fname):
        """ Extract the contrast flag from a file name.

        Filenames look like this: "ID_0087_AGE_0044_CONTRAST_0_CT.dcm".  The flag is
        read from fixed character positions, so the ID and AGE fields must keep the
        widths shown.

        :param fname: base name of the file (no directory part)
        :return: 0 or 1
        :raises AssertionError: if the name does not have the expected layout
        """
        assert fname[17:25] == 'CONTRAST', "Unexpected file name layout: %r" % (fname,)
        flag = fname[26]
        assert flag in ('0', '1'), "Unexpected contrast flag %r in %r" % (flag, fname)
        return int(flag)
        
    @property
    def groundtruth(self):
        """ Return a list of ground-truth values as {0, 1} integers in the same order as self.ids"""
        if self._groundtruth is None:
            self._groundtruth = [Cohort._filename_to_contrast_gt(os.path.basename(fp)) for fp in self.filepaths]
        return self._groundtruth
            

def test_cohort_init():
    """ Check that constructing a Cohort finds the expected ids and file paths. """
    ch = Cohort()
    assert len(ch.ids) == 100
    first_id = ch.ids[0]
    assert len(first_id) == 7 and first_id.startswith('ID_')
    first_path = ch.filepaths[0]
    assert os.path.exists(first_path)
    print(SMILY, "test_cohort_init passed.")
    
def test_cohort_accessors():
    """ Check that every Cohort accessor yields one entry per dataset, of the expected form. """
    cohort = Cohort()
    # All per-dataset lists must have the same length as the cohort itself.
    assert len(cohort.dicoms) == len(cohort.ids) == len(cohort.images) == \
           len(cohort.groundtruth) == len(cohort.filepaths) == cohort.size
    assert all('PixelData' in dcm for dcm in cohort.dicoms)
    assert all(im.shape == (512, 512) for im in cohort.images)
    assert all(im.dtype in (np.int16, np.uint16) for im in cohort.images)
    assert all(gt in (0, 1) for gt in cohort.groundtruth)
    print(SMILY, "test_cohort_accessors passed.")
                    
def test__filename_to_contrast_gt():
    """ Check that both contrast labels are parsed correctly from example file names. """
    expected_by_name = {
        'ID_0087_AGE_0044_CONTRAST_0_CT.dcm': 0,
        'ID_0003_AGE_0061_CONTRAST_1_CT.dcm': 1,
    }
    for fname, expected in expected_by_name.items():
        gt = Cohort._filename_to_contrast_gt(fname)
        assert gt == expected
    print(SMILY, "test__filename_to_contrast_gt passed.")
    

# Run the Cohort tests as part of the cell so that failures surface when it is executed.
test__filename_to_contrast_gt()
test_cohort_init()
test_cohort_accessors()
    

In [None]:
def explore_cohort(cohort=None):
    """ Tabulate per-dataset pixel ranges and DICOM metadata and display the table.

    One row is produced per dataset with its id, the minimum and maximum raw pixel
    value, RescaleSlope, RescaleIntercept, pixel spacing and (when the element is
    present) PixelPaddingValue.  A `describe` summary is displayed first, followed
    by the full table.

    :param cohort: the Cohort to explore; a fresh Cohort() is built when omitted.
    :raises AssertionError: if a dataset has different pixel spacing in x and y.
    """
    if cohort is None:
        cohort = Cohort()

    columns = ['ID', 'MinV', 'MaxV', 'Slope', 'Incpt', 'MmPerPix', 'Padding']
    rows = []
    for id_, image, dcm in zip(cohort.ids, cohort.images, cohort.dicoms):
        padding = dcm.data_element('PixelPaddingValue').value if 'PixelPaddingValue' in dcm else None
        slope = dcm.data_element('RescaleSlope').value
        intercept = dcm.data_element('RescaleIntercept').value
        mmpp_x, mmpp_y = dcm.data_element('PixelSpacing').value
        assert mmpp_x == mmpp_y
        rows.append((id_, np.min(image), np.max(image), slope, intercept, mmpp_x, padding))

    # Build the frame in one go rather than growing it row by row with df.loc;
    # this also lets pandas infer the column dtypes from the collected values.
    df = pd.DataFrame(rows, columns=columns)

    display(df.describe(include='all'))
    display(df)
        
explore_cohort()

In [None]:
def show_images(cohort):
    """ Show up to the first 16 images of a cohort in a 4x4 grid, each with a colour bar.

    Each panel is titled with the dataset id and its ground-truth value.  Panels for
    which the cohort has no image are switched off rather than raising IndexError.

    :param cohort: object exposing `ids`, `images` and `groundtruth` in matching order.
    """
    fig, axes = plt.subplots(nrows=4, ncols=4, figsize=(16, 16))
    all_axes = list(axes.flat)
    n_shown = 0
    # zip stops at the shorter of "grid cells" and "available datasets"
    for ax, id_, im, gt in zip(all_axes, cohort.ids, cohort.images, cohort.groundtruth):
        pltim = ax.imshow(im)
        ax.set_title("%s GT=%d" % (id_, gt))
        fig.colorbar(pltim, ax=ax)
        n_shown += 1
    for ax in all_axes[n_shown:]:
        ax.axis('off')   # leave unused grid cells blank
    plt.show()
        
show_images(cohort=Cohort())

## Preprocessing

In [None]:
class PreprocessedCohort(object):
    """ 
    Represents a cohort of data with basic pre-processing applied: the raw pixel values are
    rescaled with the DICOM RescaleSlope / RescaleIntercept, clamped to a fixed range and
    downscaled.  At this stage we are no longer concerned with file formats, directories etc.
    """
    downsample_factor = (4, 4)
    # Shape of a preprocessed image, assuming 512x512 raw images
    imshape = tuple(512//dsf for dsf in downsample_factor)
    # Range the rescaled pixel values are clamped to before downscaling
    clip_min = -200.0
    clip_max = 1000.0
    
    def __init__(self, cohort):
        """ Take ids, ground truth and dicoms from `cohort`; no image processing happens here.

        :param cohort: object exposing `size`, `ids`, `groundtruth` and `dicoms`.
        """
        self.size = cohort.size   # Number of images
        self.ids = cohort.ids
        self.groundtruth = cohort.groundtruth
        self.dicoms = cohort.dicoms
        
        # Cache filled on first access of the `images` property
        self._preprocessed_images = None
        
    def _preprocess_one_dicom(self, dcm):
        """ Return a rescaled, clamped and downscaled image for one dicom object.

        :param dcm: dicom object providing `pixel_array`, RescaleSlope and RescaleIntercept.
        :return: the image after downscale_local_mean with `self.downsample_factor`.
        """
        slope = dcm.data_element('RescaleSlope').value
        intercept = dcm.data_element('RescaleIntercept').value
        
        image = np.asarray(dcm.pixel_array, dtype=np.float32) * slope + intercept
        # Force float32 again in case the rescale arithmetic promoted the dtype
        image = np.asarray(image, dtype=np.float32)
       
        # It seems that padding value lies!  So we'll just clamp image values and hope for the best!
        # `image` is a fresh array produced by the arithmetic above, so clipping in place is safe.
        np.clip(image, self.clip_min, self.clip_max, out=image)
        
        # Finally, downscale !
        return downscale_local_mean(image, self.downsample_factor)
        
    @property
    def images(self):
        """ Lazily preprocess every dicom; the list is in the same order as self.dicoms """
        if self._preprocessed_images is None:
            self._preprocessed_images = [self._preprocess_one_dicom(dcm) for dcm in self.dicoms]
        return self._preprocessed_images
    
def test__preprocess_one_dicom():
    """ Preprocess the first dicom of the cohort, check the output shape and plot the result. """
    raw = Cohort()
    preprocessed = PreprocessedCohort(raw)
    first_dicom = raw.dicoms[0]
    result = preprocessed._preprocess_one_dicom(first_dicom)
    assert result.shape == PreprocessedCohort.imshape
    # Visual sanity check of the preprocessed image
    plt.imshow(result)
    plt.colorbar()
    plt.show()
    print(SMILY, "test__preprocess_one_dicom passed.")
    
def test_preprocessed_cohort_accessors():
    """ Check that images, ids and ground truth all have one entry per dataset. """
    preprocessed = PreprocessedCohort(Cohort())
    n_images = len(preprocessed.images)
    n_ids = len(preprocessed.ids)
    n_gt = len(preprocessed.groundtruth)
    assert n_images == n_ids == n_gt == preprocessed.size
    print(SMILY, "test_preprocessed_cohort_accessors passed.")
    
# Run the preprocessing tests as part of the cell so failures surface when it is executed.
test__preprocess_one_dicom()
test_preprocessed_cohort_accessors()

In [None]:
# Same 4x4 overview as for the raw cohort, but of the preprocessed images.
show_images(PreprocessedCohort(Cohort()))

## Convolutional Neural Network

In [None]:
# Derive the network input size from the preprocessing output (plus one grey-level
# channel) so that changing PreprocessedCohort.downsample_factor cannot silently
# leave the model expecting a different image size.
input_shape = PreprocessedCohort.imshape + (1,)

# Two convolution / max-pooling stages followed by a small dense classifier with a
# 2-way softmax output (one unit per ground-truth class).
model = Sequential([
    Conv2D(32, kernel_size=(5, 5), strides=(2, 2),
           activation='relu',
           input_shape=input_shape),
    MaxPooling2D(pool_size=(3, 3)),
    Conv2D(64, (5, 5), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Flatten(),
    Dense(100, activation='relu'),
    Dense(2, activation='softmax'),
])

In [None]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:
# Categorical cross-entropy matches the one-hot, 2-way softmax output; accuracy is
# tracked as the reported metric.
model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.categorical_crossentropy,
    metrics=['accuracy'],
)

In [None]:
# Preprocessed view of the whole cohort; its images are computed lazily on first access.
ppch = PreprocessedCohort(Cohort())

In [None]:
siz = PreprocessedCohort.imshape
# Stack the images into one array with a trailing single-channel axis, then
# apply a fixed linear rescale of the pixel values.
stacked_shape = (-1, siz[0], siz[1], 1)
x_data = np.asarray(ppch.images).reshape(stacked_shape) / 500.0 - 0.5
print(x_data.shape, x_data.min(), x_data.max())

In [None]:
# One-hot encode the {0, 1} ground truth into two columns.
y_data = keras.utils.to_categorical(ppch.groundtruth, num_classes=2)
y_data.shape

In [None]:
ntrain = 70

# Split on a seeded random permutation rather than on position.  The sample order
# comes from the file listing, which is filesystem-dependent and may follow the file
# names (which embed the label), so a positional split can give a class-skewed
# train or test set.  The fixed seed keeps the split repeatable for a given order.
shuffled = np.random.RandomState(0).permutation(len(x_data))
train_idx, test_idx = shuffled[:ntrain], shuffled[ntrain:]

x_train = x_data[train_idx]
x_test = x_data[test_idx]

y_train = y_data[train_idx]
y_test = y_data[test_idx]

In [None]:
class AccuracyHistory(keras.callbacks.Callback):
    """ Keras callback recording training and validation accuracy after every epoch.

    After training, `acc` and `val_acc` hold one entry per epoch (None when the
    corresponding value was not present in the epoch logs).
    """
    def on_train_begin(self, logs=None):
        """ Reset the recorded histories at the start of training. """
        self.acc = []
        self.val_acc = []

    def on_epoch_end(self, batch, logs=None):
        """ Append this epoch's accuracies.

        :param batch: unused; kept under its original name for compatibility.
            NOTE(review): Keras' base Callback names this argument `epoch`.
        :param logs: metrics dictionary supplied by Keras for the finished epoch.
        """
        logs = logs or {}
        # Older Keras reports 'acc' / 'val_acc', newer releases 'accuracy' / 'val_accuracy'.
        self.acc.append(logs.get('acc', logs.get('accuracy')))
        self.val_acc.append(logs.get('val_acc', logs.get('val_accuracy')))

history = AccuracyHistory()

In [None]:
# Only validation_data is passed: Keras lets validation_data override validation_split,
# so the validation_split=0.2 previously given here had no effect.
# NOTE(review): the per-epoch "validation" figures are therefore computed on the test set.
model.fit(x_train, y_train,
          batch_size=10,
          epochs=20,
          verbose=1,
          validation_data=(x_test, y_test),
          callbacks=[history])

In [None]:
# Evaluate on the held-out data; with metrics=['accuracy'] the returned list is
# [loss, accuracy].
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss: %5.3f' % score[0])
print('Test accuracy: %5.3f' % score[1])

# Plot the per-epoch accuracies recorded by the AccuracyHistory callback.
epochs = range(1, len(history.acc)+1)
plt.plot(epochs, history.acc, label='Train')
plt.plot(epochs, history.val_acc, label='Validation')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()