# Model Analysis

Use this Jupyter Notebook to analyse the performance of exported Lobe.ai models.

1. Export your Lobe.ai model as TensorFlow for Python
2. Copy the contents of your exported model (labels.txt, saved_model.pb, signature.json, and the variables and example folders) into the LobeModel/ directory.
3. Copy your labelled test images into MAIN/TEST1/, with one sub-folder per class named after its label (e.g. NORMAL/ and ABNORMAL/). Further test directories can be added under MAIN/ and the notebook will process them all automatically.
4. Run this notebook.

In [None]:
# Display names for the test datasets, used only for table & graph labelling
# (leave the actual directory names the same). Names are matched by position
# to the test sets in the order they are processed; a test set without an
# entry here falls back to a generic "Test N" label.

test_dataset_names = ["Test1","Test2"]

In [None]:
# Required imports

import sys
import os
import tensorflow.keras
import tensorflow as tf
import json
from PIL import Image
import numpy as np
from shutil import copyfile
from sklearn.metrics import accuracy_score, classification_report, roc_curve, auc
import matplotlib.pyplot as plt

In [None]:
# Directory variables (all resolved relative to the current working directory)

# Exported Lobe model: holds 'saved_model.pb', 'labels.txt', 'signature.json'
# and the 'variables' directory
model_dir = os.path.join(os.getcwd(), "LobeModel")

# Parent directory that contains all image directories
all_images_dir = os.path.join(os.getcwd(), "MAIN")

# First set of test images and the optional second set
test1_image_dir, test2_image_dir = (
    os.path.join(all_images_dir, subdir) for subdir in ("TEST1", "TEST2")
)

In [None]:
# Helper function

def index_in_list(a_list, index):
    """Return True when ``a_list[index]`` can be read without an IndexError.

    Negative indices follow normal Python semantics, so they are valid only
    down to ``-len(a_list)``; anything further out of range returns False.
    """
    return -len(a_list) <= index < len(a_list)

In [None]:
# This class is provided by Lobe.ai in the 'Example' folder when exporting a project to Python

# Version of the Lobe export format this class was written against; compared
# with 'export_model_version' from signature.json when a model is constructed.
EXPORT_MODEL_VERSION = 1

class TFModel:
    """Runs predictions with an exported TensorFlow SavedModel.

    The model is described by the 'signature.json' file found in ``model_dir``;
    that file supplies the model filename, the input/output tensor names, the
    SavedModel tags and the class labels.
    """

    def __init__(self, model_dir) -> None:
        """Read 'signature.json' from ``model_dir`` and check the model file exists.

        Raises:
            FileNotFoundError: if the file named by the signature's "filename"
                entry is not present in ``model_dir`` (``open`` raises the same
                error when 'signature.json' itself is missing).
        """
        self.model_dir = model_dir
        with open(os.path.join(model_dir, "signature.json"), "r") as f:
            self.signature = json.load(f)
        # make sure the model file named in the signature exists
        self.model_file = os.path.join(self.model_dir, self.signature.get("filename"))
        if not os.path.isfile(self.model_file):
            raise FileNotFoundError("Model file does not exist")
        self.inputs = self.signature.get("inputs")
        self.outputs = self.signature.get("outputs")
        # placeholder for the tensorflow session
        self.session = None

        # Look for the version in signature file.
        # If it's not found or doesn't match the expected one, print a message
        version = self.signature.get("export_model_version")
        if version is None or version != EXPORT_MODEL_VERSION:
            print(
                f"There has been a change to the model format. Please use a model with a signature 'export_model_version' that matches {EXPORT_MODEL_VERSION}."
            )

    def load(self) -> None:
        """(Re)create the TensorFlow session and load the SavedModel into it."""
        self.cleanup()
        # create a new tensorflow session
        self.session = tf.compat.v1.Session(graph=tf.Graph())
        # load our model into the session
        tf.compat.v1.saved_model.loader.load(sess=self.session, tags=self.signature.get("tags"), export_dir=self.model_dir)

    def predict(self, image: Image.Image) -> dict:
        """Classify a single PIL image.

        Returns:
            ``{"predictions": [{"label": ..., "confidence": ...}, ...]}`` with
            the list sorted by descending confidence.
        """
        # load the model if we don't have a session
        if self.session is None:
            self.load()

        image = self.process_image(image, self.inputs.get("Image").get("shape"))
        # create the feed dictionary that is the input to the model
        # first, add our image to the dictionary (comes from our signature.json file)
        feed_dict = {self.inputs["Image"]["name"]: [image]}

        # list the outputs we want from the model -- these come from our signature.json file
        # since we are using dictionaries that could have different orders, make tuples of (key, name) to keep track for putting
        # the results back together in a dictionary
        fetches = [(key, output["name"]) for key, output in self.outputs.items()]

        # run the model! there will be as many outputs from session.run as you have in the fetches list
        outputs = self.session.run(fetches=[name for _, name in fetches], feed_dict=feed_dict)
        return self.process_output(fetches, outputs)

    def process_image(self, image, input_shape) -> np.ndarray:
        """
        Given a PIL Image, center square crop and resize to fit the expected model input, and convert from [0,255] to [0,1] values.

        Returns the image as a float32 numpy array.
        """
        width, height = image.size
        # ensure image type is compatible with model and convert if not
        if image.mode != "RGB":
            image = image.convert("RGB")
        # center crop image (you can substitute any other method to make a square image, such as just resizing or padding edges with 0)
        if width != height:
            square_size = min(width, height)
            left = (width - square_size) / 2
            top = (height - square_size) / 2
            right = (width + square_size) / 2
            bottom = (height + square_size) / 2
            # Crop the center of the image
            image = image.crop((left, top, right, bottom))
        # now the image is square, resize it to be the right shape for the model input
        # NOTE(review): elements 1 and 2 of the shape are read as (width, height);
        # confirm this matches the axis order in signature.json for non-square inputs
        input_width, input_height = input_shape[1:3]
        if image.width != input_width or image.height != input_height:
            image = image.resize((input_width, input_height))

        # make 0-1 float instead of 0-255 int (that PIL Image loads by default)
        image = np.asarray(image) / 255.0
        # format input as model expects
        return image.astype(np.float32)

    def process_output(self, fetches, outputs) -> dict:
        """Turn the raw ``session.run`` results into a sorted prediction list.

        ``fetches`` is the list of (key, tensor name) tuples built in
        ``predict`` and ``outputs`` the matching list of arrays. The
        "Confidences" output is paired with the labels listed under
        ``classes -> Label`` in the signature.
        """
        # do a bit of postprocessing
        out_keys = ["label", "confidence"]
        results = {}
        # since we actually ran on a batch of size 1, index out the items from the returned numpy arrays
        for i, (key, _) in enumerate(fetches):
            val = outputs[i].tolist()[0]
            if isinstance(val, bytes):
                val = val.decode()
            results[key] = val
        confs = results["Confidences"]
        labels = self.signature.get("classes").get("Label")
        output = [dict(zip(out_keys, group)) for group in zip(labels, confs)]
        sorted_output = {"predictions": sorted(output, key=lambda k: k["confidence"], reverse=True)}
        return sorted_output

    def cleanup(self) -> None:
        """Close the TensorFlow session if one exists."""
        # getattr: cleanup() is also reached from __del__ on instances whose
        # __init__ raised before the session attribute was assigned
        if getattr(self, "session", None) is not None:
            self.session.close()
            self.session = None

    def __del__(self) -> None:
        self.cleanup()

In [None]:
# runModel is used to classify the images in the TEST directory

def runModel(test_image_dir, model, class_labels):
    """Classify every image under ``test_image_dir`` and collect the results.

    ``test_image_dir`` must contain one sub-directory per class. The
    upper-cased sub-directory name is looked up in ``class_labels`` to get the
    true label of each image inside it. Hidden entries (names starting with
    '.') are skipped.

    Args:
        test_image_dir: directory holding the per-class image folders.
        model: object whose ``predict(image)`` returns
            ``{"predictions": [{"label": ..., "confidence": ...}, ...]}``
            sorted by descending confidence.
        class_labels: mapping of label name to numeric label. The name with
            the highest numeric label is treated as the positive class.

    Returns:
        ``[testResults, mislabelled]`` where ``testResults`` is a dict with the
        parallel lists "true", "predicted" (numeric labels) and "confidence"
        (the model's confidence for the positive class), and ``mislabelled``
        is a list of ``{"image", "predicted", "actual"}`` dicts.
    """
    numiter = 0
    testResults = {"true": [], "predicted": [], "confidence": []}
    mislabelled = []

    # The score later used for the ROC curve must always refer to the same
    # class, so pick the positive class from class_labels instead of relying
    # on whichever folder happens to be listed first.
    positive_label = max(class_labels, key=class_labels.get, default=None)

    # sorted(): os.listdir() returns entries in arbitrary order
    for className in sorted(os.listdir(test_image_dir)):
        classDir = os.path.join(test_image_dir, className)
        if not os.path.isdir(classDir) or className.startswith('.'):
            continue

        # Inside each folder
        for imageName in sorted(os.listdir(classDir)):
            imagePath = os.path.join(classDir, imageName)
            if not os.path.isfile(imagePath) or imageName.startswith('.'):
                continue

            # For each image; the context manager releases the file handle
            with Image.open(imagePath) as image:
                outputs = model.predict(image)

            predictions = outputs["predictions"]
            predicted_label = predictions[0]["label"]

            # confidence the model assigned to the positive class
            confidence = next(
                (p["confidence"] for p in predictions if p["label"] == positive_label),
                0.0,
            )

            true_name = className.upper()
            true_label = class_labels[true_name]
            testResults["true"].append(true_label)
            testResults["predicted"].append(class_labels[predicted_label])
            testResults["confidence"].append(confidence)

            # compare by value; identity comparison of ints is unreliable
            if true_label != class_labels[predicted_label]:
                mislabelled.append({"image": imageName, "predicted": predicted_label,"actual": true_name})

            if numiter % 25 == 0 and numiter != 0:
                print(f"{numiter} images processed.")
            numiter += 1
    print(f"Finished! {numiter} images classified total.")
    return [testResults, mislabelled]


# plotROC is used to display an ROC graph
# Accepts multiple results: [[y_true, y_score, title], ...]

def plotROC(data):
    """Plot one ROC curve per entry of ``data`` on a single figure and show it.

    Each entry is indexed as ``[y_true, y_score, title]``; the title and the
    area under the curve (rounded to 3 places) form the legend entry.
    """
    plt.figure(figsize=(5,5), dpi=100)

    for entry in data:
        y_true, y_score, title = entry[0], entry[1], entry[2]
        fpr, tpr, _ = roc_curve(y_true, y_score)
        area = auc(fpr, tpr)
        legend_entry = f'{title} (auc = {round(area,3)})'
        plt.plot(fpr, tpr, marker='.', label=legend_entry)

    # axis labelling and legend are shared by all curves
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title("ROC")
    plt.legend()
    plt.show()

    print("\n\n")
    
    
# printResults displays the accuracy score and classification report for a set of results
    
def printResults(results, title):
    """Print ``title``, then the accuracy score and the classification report.

    ``results`` must provide the "true" and "predicted" label lists.
    """
    print(f"{title}\n")
    y_true, y_pred = results["true"], results["predicted"]
    accuracy = accuracy_score(y_true, y_pred)
    print(accuracy)
    print("\n")
    report = classification_report(results["true"], results["predicted"])
    print(report)
    print("\n\n")
    
    
# printMislabelled lists all of the incorrectly classified images in a dataset
    
def printMislabelled(mislabelledImages, title):
    """Print ``title`` followed by one line per wrongly classified image.

    Each entry of ``mislabelledImages`` must provide the keys "image" and
    "predicted".
    """
    print(f"{title}\n")
    for entry in mislabelledImages:
        image_name, predicted = entry['image'], entry['predicted']
        print(f"{image_name} was wrongly predicted to be {predicted}")
    print("\n\n")
  
    

In [None]:
# Class labels for the data
# Maps each class name to the numeric label used for the metrics. Test image
# folder names are upper-cased before being looked up here, while the labels
# predicted by the model are looked up as-is, so the keys must be upper-case
# and match the model's label names.

class_labels = {'NORMAL':0, 'ABNORMAL':1}

In [None]:
# Load the model using Lobe.ai's TFModel Class
# load() is called here so the TensorFlow session is created up front rather
# than lazily on the first predict() call.

model = TFModel(model_dir=model_dir)
model.load()

In [None]:
# Run model
# Every non-hidden sub-directory of all_images_dir is treated as one test set.
# The listing is sorted because os.listdir() returns entries in arbitrary
# order, and test_dataset_names is matched to the test sets by position.

allTestResults = []
allTestResultsProcessed = []

for test_set in sorted(os.listdir(all_images_dir)):
    test_set_dir = os.path.join(all_images_dir, test_set)
    if os.path.isdir(test_set_dir) and not test_set.startswith('.'):
        # Inside each folder
        testResult = runModel(test_set_dir, model, class_labels)
        allTestResults.append(testResult)
        # [true labels, predicted labels, confidences] for the ROC cell
        allTestResultsProcessed.append([testResult[0]["true"],testResult[0]["predicted"],testResult[0]["confidence"]])

In [None]:
# Accuracy Tables
# Prints the accuracy score and classification report of every test set,
# titled with its display name or a generic "Test N" fallback.

for index, testResult in enumerate(allTestResults):
    if index_in_list(test_dataset_names, index):
        resultName = f"{test_dataset_names[index]} Accuracy"
    else:
        resultName = f"Test {index + 1} Accuracy"
    printResults(testResult[0], resultName)

In [None]:
# ROC and AUC
# Builds one [y_true, y_score, title] entry per test set and plots all curves
# on a single figure.

roc_data = []
for index,testResult in enumerate(allTestResultsProcessed):
    # 1-based fallback name, consistent with the accuracy tables
    resultName = f"Test {index + 1} Dataset"
    if index_in_list(test_dataset_names, index):
        resultName = f"{test_dataset_names[index]} Dataset"
    # testResult is [true labels, predicted labels, confidences]
    roc_data.append([testResult[0],testResult[2],resultName])
    
plotROC(roc_data)

In [None]:
# Mislabelled images
# Lists the wrongly classified images of every test set.

for index,testResult in enumerate(allTestResults):
    # 1-based fallback name, consistent with the accuracy tables
    resultName = f"Mislabelled Images from Test {index + 1} Dataset:"
    if index_in_list(test_dataset_names, index):
        resultName = f"Mislabelled Images from {test_dataset_names[index]} Dataset:"
    # testResult is [testResults, mislabelled] as returned by runModel
    printMislabelled(testResult[1], resultName)