In [2]:
###############
# Hip Implant Detector Master Code Repository
# 
# (C) Jaret Karnuta 2020
#
###############

In [3]:
# Imports
from abc import ABC, abstractmethod
import h5py
import numpy as np
from keras.models import load_model, Model
from keras.preprocessing import image
import matplotlib.pyplot as plt
from keras import backend as K
import pandas as pd
import os
from tqdm.notebook import tqdm_notebook
%matplotlib inline
K.clear_session()

Using TensorFlow backend.


In [1]:
#####
# Image preprocessing
# Statics to avoid memory overhead 

class ImageProcessor:
    """
    Static helpers that load an image from disk and turn it into a
    single-image batch tensor preprocessed for a specific Keras architecture.

    Everything is a staticmethod, so no instance is ever required.
    """

    @staticmethod
    def load_image(image_path, target_size=None):
        """Load the image at image_path via keras, optionally resized to target_size."""
        loaded = image.load_img(image_path, target_size=target_size)
        return loaded

    @staticmethod
    def _process_image(img, processing_function):
        """
        Convert a loaded image into a preprocessed batch of one.

        img is converted to an array ([x, y, channels]), given a leading batch
        axis ([1, x, y, channels]) and then passed through processing_function,
        whose result is returned.
        """
        pixels = image.img_to_array(img)
        # prepend the batch axis so the model sees a batch holding one image
        batch = np.expand_dims(pixels, 0)
        return processing_function(batch)

    @staticmethod
    def _load_process(image_path, target_size, processing_function):
        """Load image_path at target_size and run it through _process_image."""
        return ImageProcessor._process_image(
            ImageProcessor.load_image(image_path, target_size),
            processing_function,
        )

    @staticmethod
    def inceptionV3(image_path):
        """Load image_path as a 299x299 batch preprocessed for Inception V3."""
        from keras.applications.inception_v3 import preprocess_input as prepare
        size = (299, 299)
        return ImageProcessor._load_process(image_path, size, prepare)

    @staticmethod
    def resnet50(image_path):
        """Load image_path as a 224x224 batch preprocessed for ResNet50."""
        from keras.applications.resnet50 import preprocess_input as prepare
        size = (224, 224)
        return ImageProcessor._load_process(image_path, size, prepare)

    @staticmethod
    def vgg16(image_path):
        """Load image_path as a 224x224 batch preprocessed for VGG16."""
        from keras.applications.vgg16 import preprocess_input as prepare
        size = (224, 224)
        return ImageProcessor._load_process(image_path, size, prepare)

    @staticmethod
    def show_image(image):
        """Draw the given image with matplotlib's imshow."""
        # note: the parameter shadows the keras `image` module inside this method only
        plt.imshow(image)

In [5]:
# Abstract predictor
class AbstractPredictor(ABC):
    """
    Common interface for every predictor in this notebook.

    Concrete predictors must override predict(); the class cannot be
    instantiated on its own.
    """

    @abstractmethod
    def predict(self, image):
        """
        Predict the class of the given image.

        Implementations return the predictions in 1D list form,
        e.g. [class_0_score, class_1_score, class_n_score].
        """

In [6]:
# Class defining Inception predictor
class Predictor(AbstractPredictor):
    """
    Keras-backed predictor.

    Wraps a keras model together with a {CLASS_LABEL: class_index} mapping and
    offers helpers to turn raw prediction arrays into ranked, labelled tables.
    """

    # Class-level defaults; the load_* methods overwrite them per instance.
    model = None
    loaded = False
    labels = None

    def __init__(self):
        pass

    def load_model(self, model_definition):
        """Load a complete saved keras model and mark the predictor as loaded."""
        self.model = load_model(model_definition)
        self.loaded = True

    def load_architecture(self, architecture):
        """
        Use an already-built keras Model as the underlying model.

        Raises:
            ValueError: if architecture is not a keras.models.Model.
        """
        if not isinstance(architecture, Model):
            raise ValueError("architecture must be keras.models.Model")
        self.model = architecture
        self.loaded = True

    def load_weights(self, weights):
        """Load weights into the current model (a model must already be set)."""
        self.model.load_weights(weights)

    def load_labels(self, labels_file):
        """
        Load the class-label mapping from a .npy file holding a single object.

        SECURITY NOTE: allow_pickle=True unpickles the file content, which can
        execute arbitrary code. Only load label files from a trusted source.
        """
        self.labels = np.load(labels_file, allow_pickle=True).item()

    def is_loaded(self):
        """Return True once a model has been loaded."""
        return self.loaded

    def get_model(self):
        """Return the underlying model (None if nothing was loaded)."""
        return self.model

    def _dict_predictions(self, preds, class_labels):
        """
        Map prediction scores onto their class labels.

        Params:
            preds: predictions (class_0_score, class_1_score, ...), optionally
                   wrapped in a leading batch axis of size 1
            class_labels: {CLASS_0_LABEL: index_0, CLASS_1_LABEL: index_1, ...}
        Return:
            {CLASS_0_LABEL: class_0_score, CLASS_1_LABEL: class_1_score, ...}
        """
        # atleast_1d keeps a single-class output iterable (squeeze alone makes it 0-d)
        scores = np.atleast_1d(np.squeeze(preds))
        # pair by class index rather than by dict insertion order, so the i-th
        # score always goes to the label whose index is i
        ordered_labels = sorted(class_labels, key=class_labels.get)
        return dict(zip(ordered_labels, scores))

    def _top_k(self, zipped_preds, k = 5):
        """
        Return the k best (label, score) pairs, sorted descending by score.

        When k is at least the number of entries (including infinity), every
        entry is returned.
        """
        ranked = sorted(zipped_preds.items(), key = lambda item: item[1], reverse = True)
        if k >= len(ranked):
            return ranked
        # int() so that a whole-number float k can still be used as a slice bound
        return ranked[:int(k)]

    def predict(self, processed_image):
        """
        Run the model on an already-preprocessed image.

        processed_image must be preprocessed per the specific model definition
        and be a 4D array (1, x, y, channels); see ImageProcessor for
        inceptionV3, resnet50, and vgg16 implementations.

        Raises:
            Exception: if no model has been loaded.
        """
        if not self.is_loaded():
            raise Exception("Predictor has no valid model loaded")
        return self.model.predict(processed_image)

    def preds_df(self, preds, k = np.inf):
        """
        Build a DataFrame with columns ['category', 'probability'] holding the
        top k predictions (all of them by default), best first.

        Requires labels to have been set (see load_labels).
        """
        # np.inf rather than the np.Inf alias, which NumPy 2.0 removed
        dict_preds = self._dict_predictions(preds, self.labels)
        top = self._top_k(dict_preds, k = k)
        return pd.DataFrame(top, columns=['category', 'probability'])

    ##########
    # BEGIN STATIC FUNCTIONS
    ##########

    @staticmethod
    def get_predicted_index(preds):
        """Return the index of the highest score in preds."""
        return np.argmax(np.squeeze(preds))

    @staticmethod
    def get_predicted_class(preds_df):
        """Return the category in the first row of a preds_df() table."""
        return preds_df.loc[0, 'category']

    @staticmethod
    def plot_predictions(preds_df):
        """
        Draw a horizontal bar plot of a preds_df() table and return the axes.

        Grid lines and all four spines are hidden.
        """
        import seaborn as sns
        f = sns.barplot(x='probability', y='category', data=preds_df, color="red")
        sns.set_style(style='white')
        f.grid(False)
        for side in ("top", "right", "bottom", "left"):
            f.spines[side].set_visible(False)
        shown = preds_df.shape[0]
        f.set_title('Top {} Prediction{}:'.format(shown, "" if shown == 1 else "s"))
        return f
        

In [7]:
# Class defining Ludwig predictor (resnet50)

class LudwithPredictor(AbstractPredictor):
    """
    Placeholder for the Ludwig-based predictor.

    Ludwig requires its own prediction file that handles the ludwig-side
    logic; that file is called through a pipe using a ludwig-specific virtual
    environment. Ludwig is very restrictive about tensorflow versions
    (it does not work with TF v2).

    Because of those constraints the real implementation lives in a separate
    file, and the methods here intentionally do nothing.
    """

    def __init__(self):
        return None

    def predict(self, *args, **kwargs):
        # no-op stub: accepts any arguments and yields None
        return None
    

In [8]:
# Class for extracting activation (attention) heatmaps from a model

class ActivationMap:
    """
    This is one way to visualize the gradient flow through a CNN

    This code takes an arbitrary model and creates a heatmap of
    class activations

    The heatmap is a 2D matrix of scores associated with
    the given class at the last convolutional layer
    """
    # CNN model used to generate the activation mapping
    # Must be pre-instantiated
    model = None

    # Name of the last convolutional layer containing activation gradients
    last_conv_layer_name = None

    # Architecture shortcuts accepted by __init__ and the layer they stand for
    _KNOWN_LAST_CONV_LAYERS = {
        'inceptionv3': 'conv2d_94',
        'resnet50': 'res5c_branch2c',
        'vgg16': 'block5_conv3',
    }

    def __init__(self, model, last_conv_layer):
        """
        Params:
            model: pre-instantiated CNN exposing get_layer(name)
            last_conv_layer: either one of the shortcuts 'inceptionv3',
                'resnet50', 'vgg16', or the literal name of the layer to use
        Raises:
            whatever model.get_layer raises for an unknown layer name
            (ValueError for keras models)
        """
        self.model = model
        self.last_conv_layer_name = self._KNOWN_LAST_CONV_LAYERS.get(
            last_conv_layer, last_conv_layer)

        # validate the layer name up front; the lookup error propagates as-is
        self.model.get_layer(self.last_conv_layer_name)

    def _pooled_gradients(self, argmax, last_conv_layer):
        """Return the channel-pooled gradients of class `argmax` w.r.t. last_conv_layer."""
        output = self.model.output[:, argmax]
        gradients = K.gradients(output, last_conv_layer.output)[0]
        # average over every axis except the channel axis
        # (original author's note: size is (192,) for inceptionV3)
        pooled_gradients = K.mean(gradients, axis = (0, 1, 2))
        return pooled_gradients

    def _calculate_gradient_importances(self,
                                        pooled_grads,
                                        last_conv_layer,
                                        image_tensor_scaled):
        """
        Evaluate the pooled gradients and the last conv layer's feature map for
        the given image and weight each channel by its pooled gradient.

        Return:
            the weighted feature map of the first image in the batch
        """
        # access values within last convolutional layer and pooled gradients
        iterate = K.function([self.model.input], [pooled_grads, last_conv_layer.output[0]])
        pooled_grads_value, conv_layer_output_value = iterate([image_tensor_scaled])
        # multiply each channel in the feature map by its importance, proxied
        # by the gradient value (high gradient = high importance); broadcasting
        # over the trailing channel axis replaces the per-channel loop
        conv_layer_output_value *= pooled_grads_value
        return conv_layer_output_value

    def make_heatmap(self, conv_layer_output_value):
        """
        Collapse a weighted feature map into a 2D heatmap scaled to [0, 1].

        Channels are averaged, negatives are clipped to 0 and the result is
        divided by its maximum. If nothing is positive the all-zero map is
        returned unscaled instead of dividing by zero.
        """
        h = np.mean(conv_layer_output_value, axis = -1)
        h = np.maximum(h, 0) # remove negatives
        peak = np.max(h)
        if peak > 0:
            h /= peak # scale to [0,1]
        return h

    def activate(self, image_tensor_scaled, class_index):
        """
        Params:
            image_tensor_scaled: scaled image tensor (see ImageProcessor for
                calculating image tensors)
            class_index: class index of the prediction to explain
        Return:
            Heatmap of activations at last convolutional layer
        """
        last_conv_layer = self.model.get_layer(self.last_conv_layer_name)
        pooled_gradients = self._pooled_gradients(class_index, last_conv_layer)
        grad_importances = self._calculate_gradient_importances(pooled_gradients,
                                                                last_conv_layer,
                                                                image_tensor_scaled)
        return self.make_heatmap(grad_importances)

    @staticmethod
    def superimpose(image_path, heatmap):
        """
        Given image_path to the ORIGINAL image (not tensor) and a heatmap
        calculated from ActivationMap.activate, return the original image
        superimposed with the heatmap.

        Raises:
            IOError: if the image at image_path cannot be read
        """
        import cv2
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising
            raise IOError("Could not read image: {}".format(image_path))
        # resize heatmap to fit image
        # i.e. inception heatmap is 8x8 for 299x299 image input
        h = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
        # convert [0,1] colors to [0,256)
        h = np.uint8(255 * h)
        # Apply colormap, JET provides nice contrast
        h = cv2.applyColorMap(h, cv2.COLORMAP_JET)
        # heatmap intensity factor, 0.5 still allows for visualization of original image
        hif = .5
        # apply heatmap over original image
        superimposed_img = h * hif + img
        return superimposed_img

    @staticmethod
    def save(image, output):
        """Write image to the path `output` using cv2.imwrite."""
        import cv2
        cv2.imwrite(output, image)

In [9]:
class FileOps:
    """
    Class containing static file operation helper functions
    """
    import re

    # raw string: "\." is an invalid escape sequence in a normal string literal
    IMAGE_REGEX = re.compile(r".*?\.(png|jpe?g)$", flags = re.IGNORECASE)
    EXTENSION = ".png"
    RENAME_TEMPLATE = '{CLASS}_{VIEW}_{INDEX}_{RESIZE}_{SET}_{AUGMENTID}.{EXTENSION}'

    @staticmethod
    def traverse_directory_tree(path):
        """Return the paths of all files (of any type) found anywhere below path."""
        return [
            os.path.join(root, name)
            for root, _dirs, names in os.walk(path)
            for name in names
        ]

    @staticmethod
    def _search_list(files, regex):
        """
        Return the entries of files that regex.search() matches.

        A regex of None matches nothing, so an empty list is returned.
        """
        if regex is None:
            return []
        return [f for f in files if regex.search(f)]

    @staticmethod
    def get_filename_data(path):
        """
        Split a file path into its naming-convention fields.

        Expected format:
        '{DIRECTORY}/{CLASS}_{VIEW}_{INDEX}_{RESIZE}_{SET}_{AUGMENTID}.png'

        Return:
            dict with keys DIRECTORY, CLASS, VIEW, INDEX, RESIZE, SET and
            AUGMENTID; INDEX, RESIZE and SET are converted to int.
        Raises:
            IndexError: if the name has fewer than six '_'-separated fields
            ValueError: if INDEX, RESIZE or SET is not an integer
        """
        # only the final extension is removed, earlier dots stay part of the name
        stem = os.path.splitext(os.path.basename(path))[0]
        fields = stem.split('_')
        return {
            # name of the immediate parent directory
            'DIRECTORY': os.path.basename(os.path.dirname(path)),
            'CLASS': fields[0],
            'VIEW': fields[1],
            'INDEX': int(fields[2]),
            'RESIZE': int(fields[3]),
            'SET': int(fields[4]),
            'AUGMENTID': fields[5],
        }

    @staticmethod
    def make_filename(dictionary):
        """
        Build a file name from a get_filename_data()-style dictionary using
        RENAME_TEMPLATE and EXTENSION.
        """
        names = ('CLASS', 'VIEW', 'INDEX', 'RESIZE', 'SET', 'AUGMENTID')
        values = {name: dictionary[name] for name in names}
        # the template already supplies the '.', so drop the one in EXTENSION
        # (otherwise the result ends in '..png')
        return FileOps.RENAME_TEMPLATE.format(
            EXTENSION = FileOps.EXTENSION.lstrip('.'),
            **values
        )
    

In [11]:
def inception_wrapper(model_saved = "KERAS/inception_raw_trained/inception_299_trained_raw.h5",
                      labels = "KERAS/inception_raw_trained/CLASS_LABELS.npy",
                      pics = '/home/premramkumar/Desktop/jmk/ArthroplastyID/HIPS/8_VGG_224/test',
                      save_activation_dir = 'ACTIVATION_MAPS',
                      output_csv = 'inception_raw_final.csv'):
    """
    Run the trained Inception model over every file below `pics`, save an
    activation map per image and write one summary row per image to a CSV.

    Params (all default to the values originally hard-coded here):
        model_saved: path of the saved keras model
        labels: path of the .npy class-label file
        pics: root directory that is walked for input images
        save_activation_dir: directory receiving the '*.saliency.png' files
            (created if missing)
        output_csv: path of the CSV summary
    Return:
        the summary DataFrame that was written to output_csv
    """
    # instantiate models
    print("Loading {}...".format(model_saved))
    model = Predictor()
    model.load_model(model_saved)
    print("Loaded model")
    model.load_labels(labels)

    os.makedirs(save_activation_dir, exist_ok = True)

    all_images = FileOps.traverse_directory_tree(pics)

    column_names = ["implant_class", "implant_id", "implant_path",
                    "top_1_class", "top_1_score",
                    "top_2_class", "top_2_score",
                    "top_3_class", "top_3_score",
                    "correct", "k_3_correct",
                    "activation_map_path",
                   ]

    # the activation mapper only depends on the model, so build it once
    am = ActivationMap(model.get_model(), 'inceptionv3')

    # rows are collected in a list and turned into a DataFrame once at the end;
    # DataFrame.append was removed in pandas 2.0 and copied the frame per row
    records = []
    for image_path in tqdm_notebook(all_images):
        _data = FileOps.get_filename_data(image_path)
        row = {
            # the parent directory name is taken as the true class
            'implant_class': _data['DIRECTORY'],
            'implant_id': _data["INDEX"],
            'implant_path': image_path,
        }

        # load image
        img = ImageProcessor.inceptionV3(image_path)

        # get predictions
        preds = model.predict(img)

        # populate top_n_class and top_n_score
        preds_df = model.preds_df(preds, k = 3)
        for rank in range(3):
            row['top_{}_class'.format(rank + 1)] = preds_df.loc[rank, 'category']
            row['top_{}_score'.format(rank + 1)] = preds_df.loc[rank, 'probability']

        # calculate correct variables
        row['correct'] = 1 if row['implant_class'] == row['top_1_class'] else 0
        top_3 = preds_df['category'].tolist()
        row['k_3_correct'] = 1 if row['implant_class'] in top_3 else 0

        # calculate attention map and save to directory
        output_base = os.path.basename(image_path).split('.')[0] + '.saliency.png'
        output_relative = os.path.join(save_activation_dir, output_base)
        output_absolute = os.path.abspath(output_relative)

        argmax = Predictor.get_predicted_index(preds)
        heatmap = am.activate(img, argmax)
        superimg = ActivationMap.superimpose(image_path, heatmap)
        ActivationMap.save(superimg, output_relative)

        row['activation_map_path'] = output_absolute
        records.append(row)

    # columns= fixes the column order and keeps the header for an empty run
    df = pd.DataFrame(records, columns = column_names)
    df.to_csv(output_csv, index = False)
    return df



Loading KERAS/inception_raw_trained/inception_299_trained_raw.h5...
Loaded model


HBox(children=(FloatProgress(value=0.0, max=206.0), HTML(value='')))




