In [1]:
# Standard library
import os
import re
from csv import reader

# Third-party
import numpy as np
import h5py
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import scale
from tensorflow.keras import models, layers, backend as K
from tensorflow.keras.layers import Conv1D, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.utils import to_categorical

In [2]:
def _standardize_per_image(imgs):
    """Scale every image in an (N, H, W, C) stack to zero mean / unit variance."""
    imgs = np.asarray(imgs, dtype=np.float64)
    mean = imgs.mean(axis=(1, 2), keepdims=True)
    std = imgs.std(axis=(1, 2), keepdims=True)
    # A constant image has std == 0; dividing by 1 keeps it at all zeros
    # instead of producing NaN/inf.
    std[std == 0] = 1.0
    return (imgs - mean) / std


def extract_data(source, kind, reshape, standardize):
    """Load the 'train' and 'valid' image sets of one study kind.

    Parameters
    ----------
    source : str
        Folder that contains the 'MURA-v1.1' directory.
    kind : str
        Suffix of the CSV path lists to read (e.g. 'humerus').
    reshape : tuple
        Target size handed to cv2.resize for every image.
    standardize : bool
        When truthy, every image is standardized with its own mean and
        standard deviation.

    Returns
    -------
    (train_imgs, train_vals, test_imgs, test_vals)

    Notes
    -----
    The previous code computed one mean/std per *training* image
    (axis=(1, 2)) and then subtracted those from the validation images,
    which only broadcasts when both sets happen to have the same length
    (and then pairs unrelated images). Statistics are now computed per
    image for each set independently, so the training result is unchanged
    and the validation set is handled correctly.
    """
    train_imgs, train_vals = extract_helper(source, 'train', kind, reshape)
    test_imgs, test_vals = extract_helper(source, 'valid', kind, reshape)
    if standardize:
        train_imgs = _standardize_per_image(train_imgs)
        test_imgs = _standardize_per_image(test_imgs)
    return train_imgs, train_vals, test_imgs, test_vals

def extract_helper(source, torv, kind, reshape):
    """Open the CSV path list for one split and load its images.

    Parameters
    ----------
    source : str
        Folder that contains the 'MURA-v1.1' directory. Should be an
        absolute path because this function changes the working directory.
    torv : str
        Split prefix used in folder and file names ('train' or 'valid').
    kind : str
        Suffix of the CSV file name.
    reshape : tuple
        Target size forwarded to `extract`.

    Returns
    -------
    Whatever `extract` returns for the opened file.

    Side effects
    ------------
    Leaves the process working directory at
    '<source>/MURA-v1.1/<torv>_specific_paths'.
    """
    os.chdir(os.path.join(source, 'MURA-v1.1'))
    os.chdir(torv+'_specific_paths')
    # The context manager guarantees the handle is released even when
    # `extract` raises part-way through the file.
    with open(torv+'_image_paths_'+kind+'.csv') as file:
        return extract(source, file, reshape)
    
def extract(source, file, reshape):
    """Read every image listed in an open CSV file and label it.

    Parameters
    ----------
    source : str
        Folder that the paths in the first CSV column are relative to.
    file : file object
        Open CSV file; it is closed before this function returns or raises.
    reshape : tuple
        Size passed to cv2.resize (OpenCV interprets it as (width, height)).

    Returns
    -------
    imgs : numpy.ndarray
        Grayscale images stacked with a trailing channel axis (N, H, W, 1).
    vals : numpy.ndarray
        1 where the path contains 'positive', otherwise 0.

    Raises
    ------
    FileNotFoundError
        If an image listed in the CSV cannot be read.
    """
    imgs = []
    vals = []
    try:
        for row in reader(file):
            if not row:
                # csv.reader yields [] for blank lines; nothing to load.
                continue
            rel_path = row[0]
            full_path = os.path.join(source, rel_path)
            im = cv2.imread(full_path, cv2.IMREAD_GRAYSCALE)
            if im is None:
                # cv2.imread signals failure with None rather than raising.
                raise FileNotFoundError('Could not read image: ' + full_path)
            imgs.append(cv2.resize(im, reshape))
            vals.append(1 if 'positive' in rel_path else 0)
    finally:
        file.close()

    if not imgs:
        # Keep the 4-D layout for an empty list so callers can still index
        # the usual axes.
        width, height = reshape
        return np.empty((0, height, width, 1), dtype=np.uint8), np.array([], dtype=int)

    imgs = np.expand_dims(np.array(imgs), axis=3)
    vals = np.array(vals)
    return imgs, vals

class patient:
    """Plain record bundling one patient's images and labels."""

    def __init__(self, imgs, vals, value):
        # imgs  : image stack belonging to this patient
        # vals  : one label per image
        # value : single label used for the patient as a whole
        self.imgs, self.vals, self.value = imgs, vals, value
        
def patient_code(path):
    """Return the digits that follow 'patient' in an image path.

    The first occurrence of 'patient' that is directly followed by digits
    is used, so unrelated folder names such as 'patients' are skipped and
    identifiers of any length are returned in full.

    Raises
    ------
    ValueError
        If the path contains no 'patient<digits>' component. (The previous
        slice-based version silently returned an unrelated 5-character
        fragment of the path in that case.)
    """
    match = re.search(r'patient(\d+)', path)
    if match is None:
        raise ValueError('No patient identifier found in path: ' + path)
    return match.group(1)

def patient_value(path):
    """Label derived from the path: 1 if it contains 'positive', else 0."""
    return 1 if 'positive' in path else 0
        
def extract_data_patients(source, kind, reshape, standardize):
    """Load the 'train' and 'valid' sets grouped per patient.

    Parameters
    ----------
    source : str
        Folder that contains the 'MURA-v1.1' directory.
    kind : str
        Suffix of the CSV path lists to read (e.g. 'humerus').
    reshape : tuple
        Target size handed to cv2.resize for every image.
    standardize : bool
        When truthy, every image is standardized with its own mean and
        standard deviation.

    Returns
    -------
    (train_patients, test_patients) : two lists of `patient` objects.

    Notes
    -----
    The previous standardization path could not run: it called
    np.concatenate(a, b) (the second positional argument is the axis, not
    another array) and then tried to broadcast statistics of the pooled
    training images against each patient's own image stack. Images are now
    standardized one by one, which needs no pooling and works for any
    number of images per patient.
    """
    train_patients = extract_helper_patients(source, 'train', kind, reshape)
    test_patients = extract_helper_patients(source, 'valid', kind, reshape)
    if standardize:
        for p in train_patients + test_patients:
            imgs = np.asarray(p.imgs, dtype=np.float64)
            mean = imgs.mean(axis=(1, 2), keepdims=True)
            std = imgs.std(axis=(1, 2), keepdims=True)
            # Constant images have std == 0; divide by 1 to avoid NaN/inf.
            std[std == 0] = 1.0
            p.imgs = (imgs - mean) / std
    return train_patients, test_patients
    
def extract_helper_patients(source, torv, kind, reshape):
    """Open the CSV path list for one split and load it grouped by patient.

    Parameters
    ----------
    source : str
        Folder that contains the 'MURA-v1.1' directory. Should be an
        absolute path because this function changes the working directory.
    torv : str
        Split prefix used in folder and file names ('train' or 'valid').
    kind : str
        Suffix of the CSV file name.
    reshape : tuple
        Target size forwarded to `extract_patients`.

    Returns
    -------
    Whatever `extract_patients` returns for the opened file.

    Side effects
    ------------
    Leaves the process working directory at
    '<source>/MURA-v1.1/<torv>_specific_paths'.
    """
    os.chdir(os.path.join(source, 'MURA-v1.1'))
    os.chdir(torv+'_specific_paths')
    # The context manager guarantees the handle is released even when
    # `extract_patients` raises part-way through the file.
    with open(torv+'_image_paths_'+kind+'.csv') as file:
        return extract_patients(source, file, reshape)

def _build_patient(imgs, vals):
    """Pack the collected images/labels of one patient into a `patient`."""
    imgs = np.expand_dims(np.array(imgs), axis=3)
    vals = np.array(vals)
    # The label of the first image is used as the patient-level label.
    return patient(imgs, vals, vals[0])


def extract_patients(source, file, reshape):
    """Read the images listed in an open CSV file, grouped per patient.

    Consecutive rows whose paths yield the same `patient_code` are put
    into the same `patient`; a new group starts whenever the code changes.

    Parameters
    ----------
    source : str
        Folder that the paths in the first CSV column are relative to.
    file : file object
        Open CSV file; it is closed before this function returns or raises.
    reshape : tuple
        Size passed to cv2.resize for every image.

    Returns
    -------
    list of patient
        One entry per run of rows with the same patient code; an empty
        list when the CSV has no rows (previously StopIteration escaped).

    Raises
    ------
    FileNotFoundError
        If an image listed in the CSV cannot be read.
    """
    patients = []
    imgs = []
    vals = []
    prev_patient = None
    try:
        for row in reader(file):
            if not row:
                # csv.reader yields [] for blank lines; nothing to load.
                continue
            rel_path = row[0]
            curr_patient = patient_code(rel_path)
            if imgs and curr_patient != prev_patient:
                # Patient changed: flush the finished group.
                patients.append(_build_patient(imgs, vals))
                imgs = []
                vals = []
            prev_patient = curr_patient

            full_path = os.path.join(source, rel_path)
            im = cv2.imread(full_path, cv2.IMREAD_GRAYSCALE)
            if im is None:
                # cv2.imread signals failure with None rather than raising.
                raise FileNotFoundError('Could not read image: ' + full_path)
            imgs.append(cv2.resize(im, reshape))
            vals.append(patient_value(rel_path))
    finally:
        file.close()

    if imgs:
        patients.append(_build_patient(imgs, vals))
    return patients

In [15]:
def classic_validation(model, data_x, data_y, rate, batch_size, number_of_epochs, class_weights):
    """Hold-out validation followed by a final fit of `model`.

    The first `rate` fraction of the data is the training part, the rest
    the validation part; each part is shuffled independently.

    * class_weights == True: a compiled clone is fitted on the training
      part (with class weights) and scored on the validation part, then
      `model` itself is fitted on all data with class weights.
    * otherwise: `model` is fitted on the training part, scored on the
      validation part, then fitted further on the validation part only.

    NOTE(review): the two branches finish differently (all data vs.
    validation part only) - confirm that this asymmetry is intended.

    Returns
    -------
    (score, model) where score is the value returned by `conf_matrix`.
    """
    split = int(len(data_y) * rate)
    train_x, train_y = shuffler(data_x[:split], data_y[:split])
    valid_x, valid_y = shuffler(data_x[split:], data_y[split:])
    fit_options = {'batch_size': batch_size, 'epochs': number_of_epochs}

    if class_weights == True:
        candidate = copy_model(model)
        candidate.fit(train_x, train_y, class_weight=class_weight(train_y), **fit_options)
        score = conf_matrix(candidate, valid_x, valid_y)
        model.fit(data_x, data_y, class_weight=class_weight(data_y), **fit_options)
    else:
        model.fit(train_x, train_y, **fit_options)
        score = conf_matrix(model, valid_x, valid_y)
        model.fit(valid_x, valid_y, **fit_options)
    return score, model

def k_fold_cross_validation(k, model, data_x, data_y, batch_size, number_of_epochs, class_weights):
    """k-fold cross-validation followed by a final fit.

    The shuffled data is cut into k folds of len(data_y)//k samples each
    (samples beyond k full folds are not placed in any fold). For every
    held-out fold a fresh compiled clone of `model` is fitted on the other
    folds, one fold after another, and its accuracy on the held-out fold
    is accumulated.

    Afterwards, with class_weights == True, `model` is fitted on all the
    shuffled data using class weights; otherwise the clone from the last
    round is fitted on the last fold and returned in place of `model`.

    Returns
    -------
    (mean accuracy over the k rounds, fitted model)
    """
    data_x, data_y = shuffler(data_x, data_y)
    n_samples = len(data_y)
    folds_x = [data_x[(n_samples//k)*i: (n_samples//k)*(i+1)] for i in range(k)]
    folds_y = [data_y[(n_samples//k)*i: (n_samples//k)*(i+1)] for i in range(k)]

    accuracy_sum = 0
    for held_out in range(k):
        model_copy = copy_model(model)
        for j in range(k):
            if j == held_out:
                continue
            fit_options = {'batch_size': batch_size, 'epochs': number_of_epochs}
            if class_weights == True:
                fit_options['class_weight'] = class_weight(folds_y[j])
            model_copy.fit(folds_x[j], folds_y[j], **fit_options)
        # evaluate() returns [loss, accuracy]; index 1 is the accuracy metric.
        accuracy_sum += model_copy.evaluate(folds_x[held_out], folds_y[held_out])[1]

    if class_weights == True:
        model.fit(data_x, data_y, batch_size=batch_size, epochs=number_of_epochs,
                  class_weight=class_weight(data_y))
    else:
        # The last clone has seen every fold except the last one.
        model = model_copy
        model.fit(folds_x[k-1], folds_y[k-1], batch_size=batch_size, epochs=number_of_epochs)
    return accuracy_sum/k, model

def shuffler(data_x, data_y):
    """Apply one random permutation to both arrays so pairs stay aligned.

    Returns a tuple (shuffled_x, shuffled_y); the inputs are not modified.
    """
    order = np.random.permutation(len(data_y))
    shuffled_x = data_x[order]
    shuffled_y = data_y[order]
    return shuffled_x, shuffled_y

def conf_matrix(model, data_x, data_y):
    """Predict on `data_x`, print accuracy and confusion table, return accuracy.

    Predictions are flattened and rounded to the nearest integer with the
    built-in round().

    `labels=[0, 1]` is passed to confusion_matrix so the result is always
    2x2; without it the matrix shrinks to 1x1 when only one class occurs
    in both the labels and the predictions, and `print_conf_matrix` fails
    when indexing it.
    """
    y_pred = [round(p) for p in model.predict(data_x).flatten().tolist()]
    y_true = data_y.tolist()
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    return print_conf_matrix(cm, y_true, y_pred)

def patients_conf_matrix(model, test_patients):
    """Patient-level evaluation: one prediction per patient.

    For every patient the model's predictions over all of that patient's
    images are averaged and rounded; the result is compared with
    `p.value`. Prints accuracy and the confusion table and returns the
    accuracy.

    `labels=[0, 1]` keeps the confusion matrix 2x2 even when only one
    class occurs, which `print_conf_matrix` needs for indexing.
    """
    y_true = []
    y_pred = []
    for p in test_patients:
        y_true.append(p.value)
        # Convert to a plain float first so round() yields a Python int
        # regardless of how the numpy scalar type implements __round__.
        mean_prediction = float(np.mean(model.predict(p.imgs)))
        y_pred.append(int(round(mean_prediction)))
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    return print_conf_matrix(cm, y_true, y_pred)
    
def print_conf_matrix(cm, y_true, y_pred):
    """Print accuracy and a 2x2 confusion table; return the accuracy.

    Parameters
    ----------
    cm : 2-D sequence
        Confusion matrix laid out as [[tn, fp], [fn, tp]].
    y_true, y_pred : sequences
        True and predicted labels, compared element-wise for the accuracy.

    Returns
    -------
    float
        Fraction of positions where y_true equals y_pred; 0.0 for empty
        input (previously a ZeroDivisionError).

    Notes
    -----
    If `cm` is not 2x2 (for example 1x1 because only one class occurred),
    the four counts are derived directly from the labels, treating 1 as
    positive and 0 as negative, instead of failing on the index lookup.

    Printed layout: rows P/N are predicted positive/negative, columns T/F
    are correct/incorrect predictions.
    """
    total = len(y_true)
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    score = correct / total if total else 0.0
    print('Accuracy: '+str(score))

    if len(cm) == 2 and len(cm[0]) == 2 and len(cm[1]) == 2:
        tn, fp, fn, tp = cm[0][0], cm[0][1], cm[1][0], cm[1][1]
    else:
        pairs = list(zip(y_true, y_pred))
        tn = sum(1 for t, p in pairs if t == 0 and p == 0)
        fp = sum(1 for t, p in pairs if t == 0 and p == 1)
        fn = sum(1 for t, p in pairs if t == 1 and p == 0)
        tp = sum(1 for t, p in pairs if t == 1 and p == 1)

    print('     T       F')
    print('P    '+str(tp)+' '*(8-len(str(tp)))+str(fp))
    print('N    '+str(tn)+' '*(8-len(str(tn)))+str(fn))
    return score

def save_model(model, source, name):
    """Write `model` to '<name>.h5' inside the MURA 'models' folder.

    Changes the working directory to that folder as a side effect.
    """
    models_dir = source+'\\MURA-v1.1\\models'
    os.chdir(models_dir)
    target_file = name+'.h5'
    model.save(target_file)
    
def load_model(source, name):
    """Load and return the model stored as '<name>.h5' in the MURA 'models' folder.

    Changes the working directory to that folder as a side effect.
    """
    models_dir = source+'\\MURA-v1.1\\models'
    os.chdir(models_dir)
    stored_file = name+'.h5'
    return models.load_model(stored_file)

def copy_model(model):
    """Return a compiled clone of `model`.

    The clone is always compiled with binary cross-entropy loss, the Adam
    optimizer and the accuracy metric, whatever `model` was compiled with.

    NOTE(review): Keras documents clone_model as creating new layers (and
    thus new weights) rather than copying trained values - confirm that
    starting from fresh weights is what the callers expect.
    """
    clone = models.clone_model(model)
    clone.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return clone

def class_weight(data_y):
    """Compute class weights for binary 0/1 labels.

    Each class gets 1 + (count of the other class) / (count of this
    class), i.e. total / own count, so the rarer class weighs more.

    Returns
    -------
    dict
        {0: weight_for_0, 1: weight_for_1} with plain Python floats.
        When one class is absent (or the input is empty) both weights are
        1.0; the previous code divided by zero there and produced inf/NaN
        weights. The weight of a class that is present was already 1.0 in
        that situation, so its effective weighting is unchanged.
    """
    total = int(np.size(data_y))
    positive = int(np.sum(data_y))
    negative = total - positive
    if positive <= 0 or negative <= 0:
        return {0: 1.0, 1: 1.0}
    return {0: 1 + positive/negative, 1: 1 + negative/positive}

def heatmap(source, image_name, kind, reshape, model, last_conv_index, heatmap_name = None):
    """Write a Grad-CAM-style heatmap overlay for one image.

    The image '<image_name>.png' is read from
    '<source>/MURA-v1.1/heatmaps/<kind>/images', and the overlay is written
    to '<source>/MURA-v1.1/heatmaps/<kind>/heatmaps'.

    Parameters
    ----------
    source : str
        Folder that contains the 'MURA-v1.1' directory.
    image_name : str
        File name of the input image without the '.png' extension.
    kind : str
        Sub-folder of 'heatmaps' to work in.
    reshape : tuple
        Size the image is resized to before being fed to the model.
    model : Keras model
        Model whose output is explained.
    last_conv_index : int
        Index (from 0) of the layer whose activations are visualised.
    heatmap_name : str, optional
        Output file name without extension; defaults to `image_name`.

    Raises
    ------
    FileNotFoundError
        If the input image cannot be read.

    Side effects
    ------------
    Changes the working directory (it ends in the output folder).

    NOTE(review): K.gradients/K.function are graph-mode APIs; under
    TensorFlow 2 eager execution tf.gradients is reported as unsupported -
    confirm the TensorFlow version / execution mode this is run with.
    """
    base_dir = os.path.join(source, 'MURA-v1.1', 'heatmaps', kind)
    os.chdir(os.path.join(base_dir, 'images'))
    original = cv2.imread(image_name+'.png', cv2.IMREAD_GRAYSCALE)
    if original is None:
        # cv2.imread signals failure with None rather than raising.
        raise FileNotFoundError('Could not read image: ' + image_name + '.png')
    model_input = np.expand_dims(cv2.resize(original, reshape), axis = 2)

    last_conv_layer = model.get_layer(index = last_conv_index) # indexed from 0
    grads = K.gradients(model.output, last_conv_layer.output)[0]
    # One importance weight per channel: gradient averaged over batch and space.
    pooled_grads = K.mean(grads, axis=(0, 1, 2))
    iterate = K.function([model.input], [pooled_grads, last_conv_layer.output[0]])
    pooled_grads_value, conv_layer_output_value = iterate(np.array([model_input]))
    # Broadcasting over the last axis scales every channel by its weight.
    weighted_activations = conv_layer_output_value * pooled_grads_value

    activation_map = np.maximum(np.mean(weighted_activations, axis=-1), 0)
    peak = np.max(activation_map)
    if peak > 0:
        # Skip normalisation for an all-zero map; dividing would give NaN.
        activation_map = activation_map / peak

    image = cv2.cvtColor(original, cv2.COLOR_GRAY2RGB)
    activation_map = cv2.resize(activation_map, (image.shape[1], image.shape[0]))
    activation_map = np.uint8(255 * activation_map)
    activation_map = cv2.applyColorMap(activation_map, cv2.COLORMAP_JET)
    superimposed = activation_map * 0.7 + image

    os.chdir(os.path.join(base_dir, 'heatmaps'))
    # The fallback previously referenced an undefined variable `name`.
    output_name = heatmap_name if heatmap_name is not None else image_name
    cv2.imwrite(output_name+'.png', superimposed)

In [6]:
# Folder that contains the 'MURA-v1.1' directory. Set the MURA_SOURCE
# environment variable to point somewhere else without editing the notebook;
# the fallback is the original hard-coded location.
source = os.environ.get('MURA_SOURCE', 'C:\\Users\\Admin\\Desktop\\python')

In [49]:
# Example: load the 'humerus' images twice - once as flat, shuffled arrays
# and once grouped per patient (no standardization in either case).
reshape = (512, 512)
train_x, train_y, test_x, test_y = extract_data(
    source, kind='humerus', reshape=reshape, standardize=False)
train_x, train_y = shuffler(data_x=train_x, data_y=train_y)
test_x, test_y = shuffler(data_x=test_x, data_y=test_y)
train_patients, test_patients = extract_data_patients(
    source, kind='humerus', reshape=reshape, standardize=False)

In [16]:
# Example: load a stored model and write a heatmap overlay for one image.
model = load_model(source, name='conv3HUMERUS')
heatmap(
    source,
    image_name='positive1',
    kind='humerus',
    reshape=(512,512),
    model=model,
    last_conv_index=12,
    heatmap_name='conv3_positive1',
)