In [None]:
import glob
import numpy as np
import pandas as pd
import os
import shutil
import matplotlib.pyplot as plt
import random
import keras
from keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array, array_to_img
from sklearn.preprocessing import LabelEncoder
from keras.applications.resnet50 import ResNet50
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, InputLayer, Lambda, GlobalAveragePooling2D, BatchNormalization
from keras.models import Model, Sequential
from keras import optimizers
from keras import backend as K
from keras.datasets import mnist
%matplotlib inline

In [None]:
# Root folder that holds one sub-directory of images per condition.
# Set the CXR_DATA_DIR environment variable to point somewhere else instead of
# editing the notebook; the default keeps the original local layout.
DATA_DIR = os.environ.get(
    'CXR_DATA_DIR',
    '/Users/austinau-yeung/Documents/Georgia Tech/1/ECE6254/project')

# Sorted so the file order (and anything sampled from it) does not depend on
# the order in which the filesystem happens to list directory entries.
car = sorted(glob.glob(os.path.join(DATA_DIR, 'cardiomegaly2', '*')))
ede = sorted(glob.glob(os.path.join(DATA_DIR, 'edema2', '*')))

# parameters
car_train_num = 400  # number of cardiomegaly files drawn for training
ede_train_num = 400  # number of edema files drawn for training
epochs = 20          # number of training epochs (presumably meant for model.fit)

In [None]:
num_classes = 2

# Dedicated, seeded generator so the train/test split is identical on every
# run and does not disturb NumPy's global random state.
SPLIT_SEED = 0
split_rng = np.random.RandomState(SPLIT_SEED)

# select random subset of images for training (drawn without replacement);
# sorting first makes the draw independent of the incoming file order
car_train = split_rng.choice(sorted(car), size=car_train_num, replace=False)
ede_train = split_rng.choice(sorted(ede), size=ede_train_num, replace=False)

# Every file that was not drawn for training goes to the test set.
# `car` / `ede` are deliberately NOT rebound here: overwriting them made this
# cell non-idempotent (a second run sampled the training set from what used to
# be the test set). Sorting replaces the arbitrary ordering of list(set(...)).
car_train_set = set(car_train)
ede_train_set = set(ede_train)
car_test = sorted(path for path in car if path not in car_train_set)
ede_test = sorted(path for path in ede if path not in ede_train_set)

car_test_num = len(car_test)
ede_test_num = len(ede_test)

In [None]:
IMG_WIDTH = 224
IMG_HEIGHT = 224
# load_img's target_size is ordered (height, width). The previous
# (width, height) ordering only worked because the target is square.
IMG_DIM = (IMG_HEIGHT,IMG_WIDTH)


def load_grayscale_images(paths):
    """Load each file in ``paths`` as a resized single-channel image array.

    Every file is read with ``load_img`` in grayscale mode, resized to
    ``IMG_DIM`` and converted with ``img_to_array``.

    Returns a list with one array per entry of ``paths``, in the same order.
    """
    return [img_to_array(load_img(path,target_size=IMG_DIM,color_mode="grayscale"))
            for path in paths]


# load training images
car_train_imgs = load_grayscale_images(car_train)
ede_train_imgs = load_grayscale_images(ede_train)

# stack cardiomegaly images first, then edema, and divide pixel values by 255
train_imgs = np.array(car_train_imgs+ede_train_imgs)
train_imgs_scaled = train_imgs.astype('float32')/255
# create corresponding labels, in the same order as the stacked images;
# counts come from the loaded lists so labels always line up with the images
train_labels = len(car_train_imgs)*['c']+len(ede_train_imgs)*['e']

# load test images and create corresponding labels
car_test_imgs = load_grayscale_images(car_test)
ede_test_imgs = load_grayscale_images(ede_test)
test_imgs = np.array(car_test_imgs+ede_test_imgs)
test_imgs_scaled = test_imgs.astype('float32')/255
test_labels = len(car_test_imgs)*['c']+len(ede_test_imgs)*['e']

# last axis of the stacked array is taken as the channel count
# (presumably 1 for grayscale input -- confirm against the loaded data)
input_shape = (IMG_HEIGHT,IMG_WIDTH,train_imgs.shape[3])

In [None]:
# Turn the string class labels into integer codes (0/1). The encoder is fitted
# on the training labels only and then reused unchanged for the test labels.
le = LabelEncoder().fit(train_labels)
train_labels_enc, test_labels_enc = (
    le.transform(label_list) for label_list in (train_labels, test_labels)
)

In [None]:
# siamese network referenced from the following:
# https://github.com/keras-team/keras/blob/master/examples/mnist_siamese.py

def euclidean_distance(vects):
    """Return the Euclidean distance between two batches of vectors.

    ``vects`` must unpack into exactly two backend tensors. Squared
    differences are summed along axis 1 (the summed axis is kept with size 1),
    and the sum is clamped from below at ``K.epsilon()`` before the square
    root is taken.
    """
    left, right = vects
    squared_gap = K.square(left - right)
    total = K.sum(squared_gap, axis=1, keepdims=True)
    clamped = K.maximum(total, K.epsilon())
    return K.sqrt(clamped)

def eucl_dist_output_shape(shapes):
    """Return the output shape of the distance layer: ``(batch, 1)``.

    ``shapes`` must unpack into exactly two shapes; only the leading entry of
    the first one is used.
    """
    first_shape, _ = shapes
    batch_dim = first_shape[0]
    return (batch_dim, 1)

def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf

    Where ``y_true`` is 1 the squared prediction is penalised; where it is 0
    the squared shortfall of the prediction below the margin (40) is
    penalised. The mean over all elements is returned.
    '''
    margin = 40
    similar_term = y_true * K.square(y_pred)
    shortfall = K.maximum(margin - y_pred, 0)
    dissimilar_term = (1 - y_true) * K.square(shortfall)
    return K.mean(similar_term + dissimilar_term)

def compute_accuracy(y_true, y_pred, threshold=0.5):
    '''Compute classification accuracy with a fixed threshold on distances.

    Parameters
    ----------
    y_true : array-like of 0/1 pair labels, any shape (flattened internally).
    y_pred : array-like of predicted distances, any shape (flattened
        internally); must hold the same number of elements as ``y_true``.
    threshold : distances strictly below this value are predicted as label 1.
        Defaults to 0.5, the value that was previously hard-coded.
        NOTE(review): ``contrastive_loss`` in this file uses margin = 40, so
        0.5 may be a very strict cut-off -- confirm the intended value.

    Returns
    -------
    Fraction of pairs whose thresholded prediction equals the label.
    '''
    pred = np.asarray(y_pred).ravel() < threshold
    # Flatten the labels as well: comparing a (n,) prediction vector with a
    # (n, 1) label column would broadcast to an (n, n) matrix and silently
    # report a wrong accuracy.
    truth = np.asarray(y_true).ravel()
    return np.mean(pred == truth)

def accuracy(y_true, y_pred):
    '''Backend (tensor) version of the thresholded accuracy.

    Predictions below 0.5 are cast to ``y_true``'s dtype, compared
    element-wise with ``y_true``, and the mean of the matches is returned.
    '''
    below_threshold = K.cast(y_pred < 0.5, y_true.dtype)
    matches = K.equal(y_true, below_threshold)
    return K.mean(matches)

def create_pairs(x, digit_indices):
    '''Positive and negative pair creation.
    Alternates between positive and negative pairs.

    Parameters
    ----------
    x : indexable collection of samples (e.g. an array of images).
    digit_indices : sequence with one entry per class; entry ``d`` holds the
        positions in ``x`` of the samples belonging to class ``d``.

    Returns
    -------
    (pairs, labels) as NumPy arrays. For every class ``d`` and every
    ``i < n`` (``n`` = size of the smallest class minus one) two pairs are
    emitted, in this order:
      * a positive pair: samples ``i`` and ``i + 1`` of class ``d`` (label 1)
      * a negative pair: sample ``i`` of class ``d`` with sample ``i`` of a
        randomly chosen other class (label 0)

    The number of classes is taken from ``len(digit_indices)`` so the function
    no longer depends on a module-level ``num_classes`` variable.
    '''
    n_classes = len(digit_indices)
    n = min(len(class_idx) for class_idx in digit_indices) - 1

    pairs = []
    labels = []
    for d in range(n_classes):
        for i in range(n):
            anchor = digit_indices[d][i]
            # positive pair: two consecutive samples of the same class
            pairs.append([x[anchor], x[digit_indices[d][i + 1]]])
            # negative pair: a non-zero offset guarantees a different class
            inc = random.randrange(1, n_classes)
            dn = (d + inc) % n_classes
            pairs.append([x[anchor], x[digit_indices[dn][i]]])
            labels += [1, 0]
    return np.array(pairs), np.array(labels)

def resnet_base(input_shape):
    """Build the shared embedding network of the siamese model.

    A ResNet50 is created without its classification top, without loading
    weights (``weights=None``) and without a pooling layer; the output of its
    last layer is flattened. Every layer is marked trainable.

    Returns a ``Model`` mapping an input of ``input_shape`` to the flattened
    feature vector.
    """
    backbone = ResNet50(include_top=False,
                        weights=None,
                        input_shape=input_shape,
                        pooling=None)
    flat_features = keras.layers.Flatten()(backbone.layers[-1].output)

    embedder = Model(backbone.input, flat_features)
    for lyr in embedder.layers:
        lyr.trainable = True

    return Model(embedder.input, embedder.output)


In [None]:
def _indices_per_class(encoded_labels):
    """Return one index array per class: positions where the label equals it."""
    return [np.where(encoded_labels==cls)[0] for cls in range(num_classes)]


# positive/negative pairs for the training split
idx = _indices_per_class(train_labels_enc)
tr_pairs, tr_y = create_pairs(train_imgs_scaled,idx)

# positive/negative pairs for the test split
idx = _indices_per_class(test_labels_enc)
te_pairs, te_y = create_pairs(test_imgs_scaled,idx)

# siamese network: one shared base network applied to both inputs,
# with the euclidean distance between the two embeddings as final layer
base_network = resnet_base(input_shape)

input_a = Input(shape=input_shape)
input_b = Input(shape=input_shape)

# calling the same model object twice shares its weights between both branches
processed_a, processed_b = base_network(input_a), base_network(input_b)

distance_layer = Lambda(euclidean_distance,output_shape=eucl_dist_output_shape)
distance = distance_layer([processed_a, processed_b])

model = Model([input_a, input_b], distance)

model.summary()

# setup image augmentation on pairs of images using ImageDataGenerator, referenced from the following:
# https://github.com/keras-team/keras/issues/3386#issuecomment-237555199

def _paired_batches(datagen, X, I, Y, batch_size):
    """Yield ``([x_batch, i_batch], y_batch)`` forever, reshuffling every pass.

    On each pass a fresh random permutation of the sample positions is drawn.
    ``X`` and ``Y`` are fed through ``datagen.flow`` in that order (with the
    iterator's own shuffling disabled), and the matching rows of ``I`` are
    selected with the same permutation so the two images of a pair stay
    aligned. Only ``X`` passes through ``datagen``; ``I`` is returned as
    stored.
    """
    n_samples = X.shape[0]
    while True:
        # shuffled indices for this pass over the data
        order = np.random.permutation(n_samples)
        batches = datagen.flow(X[order], Y[order], batch_size=batch_size, shuffle=False)
        start = 0
        for x_batch, y_batch in batches:
            stop = start + x_batch.shape[0]

            yield [x_batch, I[order[start:stop]]], y_batch

            start = stop
            # flow() iterates endlessly, so stop once every sample was served
            if stop >= n_samples:
                break


def trainGenerator( X, I, Y, batch_size=64):
    """Endless batch generator for training pairs with image augmentation.

    Parameters
    ----------
    X : array of first images of each pair; these are augmented (small
        rotations and shifts, random horizontal flips, constant fill of 0).
    I : array of second images of each pair; returned without augmentation.
    Y : array of pair labels.
    batch_size : samples per batch (default 64, the previously fixed value).

    Yields ``([x_batch, i_batch], y_batch)``.
    """
    # the augmentation settings never change, so build the generator once
    datagen = ImageDataGenerator(
            fill_mode='constant',
            cval=0,
            rescale=1./1,
            featurewise_center=False,  # set input mean to 0 over the dataset
            samplewise_center=False,  # set each sample mean to 0
            featurewise_std_normalization=False,  # divide inputs by std of the dataset
            samplewise_std_normalization=False,  # divide each input by its std
            zca_whitening=False,  # apply ZCA whitening
            rotation_range=5,  # randomly rotate images in the range (degrees, 0 to 180)
            width_shift_range=0.05,  # randomly shift images horizontally (fraction of total width)
            height_shift_range=0.05,  # randomly shift images vertically (fraction of total height)
            horizontal_flip=True,  # randomly flip images horizontally
            vertical_flip=False)  # no vertical flips
    return _paired_batches(datagen, X, I, Y, batch_size)


def testGenerator( X, I, Y, batch_size=64):
    """Endless batch generator for pairs without any augmentation.

    Same parameters and yielded structure as ``trainGenerator``; the image
    generator is configured with ``rescale=1./1`` only.
    """
    datagen = ImageDataGenerator(
            rescale=1./1)
    return _paired_batches(datagen, X, I, Y, batch_size)


In [None]:
rms = optimizers.RMSprop()
model.compile(loss=contrastive_loss, optimizer=rms, metrics=[accuracy])

# use unedited training images
# The epoch count comes from the `epochs` setting in the parameters cell
# instead of a hard-coded 1, so that setting is no longer silently ignored.
# The returned history is kept for later inspection of the training curves.
history = model.fit([tr_pairs[:, 0], tr_pairs[:, 1]], tr_y,
                    batch_size=128,
                    epochs=epochs,
                    validation_data=([te_pairs[:, 0], te_pairs[:, 1]], te_y))

# use augmented training images
# history = model.fit_generator(trainGenerator(tr_pairs[:, 0],tr_pairs[:, 1],tr_y), 
#                               steps_per_epoch=2, 
#                               epochs=10,
#                               validation_data=testGenerator(te_pairs[:, 0],te_pairs[:, 1],te_y), 
#                               validation_steps=1, 
#                               verbose=1)

# Predicted distances for both splits. Train predictions get their own name so
# `y_pred` unambiguously holds the test-set predictions afterwards.
tr_pred = model.predict([tr_pairs[:, 0], tr_pairs[:, 1]])
tr_acc = compute_accuracy(tr_y, tr_pred)
y_pred = model.predict([te_pairs[:, 0], te_pairs[:, 1]])
te_acc = compute_accuracy(te_y, y_pred)

print('* Accuracy on training set: %0.2f%%' % (100 * tr_acc))
print('* Accuracy on test set: %0.2f%%' % (100 * te_acc))

In [None]:
# Display the model outputs last stored in `y_pred` (the predictions for the
# test pairs computed in the previous cell).
y_pred