In [None]:
####$ ONNX-based recognizers $####

# -*- coding: utf-8 -*-
"""
Created on Sun Jan 30 21:29:02 2022

@author: KoRiF
"""
import os
import glob

import numpy as np
import onnx

class ONNXMNIST_Recognizer:
    def __init__(self, directory):
        self.path_to_model = directory
        self.model_filename = os.path.join(self.path_to_model, 'model.onnx' )
        # onnx_model is an in-memory ModelProto
        self.onnx_model = onnx.load(self.model_filename)
        return

    def image2input(self, image):
        import cv2
        if isinstance(image, str):
            image = cv2.imread(image) #'input.png'
        elif not isinstance(image, np.ndarray):
            image = np.array(image)
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        gray = cv2.resize(gray, (28,28)).astype(np.float32)/255
        return np.reshape(gray, (1,1,28,28))

    def run_session(self, model_inputs):
        pass


    def recognize(self, picture_data):
        from io import BytesIO
        imagedata = BytesIO(bytes(picture_data))

        from PIL import Image
        img = Image.open(imagedata)
        img.show()

        img = self.image2input(img)

        inputs = [img]

        outputs = self.run_session(inputs)
        print(outputs)
        print( "recognized as: ", np.argmax(outputs) )
        return int(np.argmax(outputs))

    def selfdiagnostic(self):
        from onnx import numpy_helper
        test_data_dir = 'test_data_set_{ix}'
        test_num = len(glob.glob(os.path.join(self.path_to_model, test_data_dir.format(ix='*'))))

        for no in range(test_num):
            print('------------------------------------------------------\n')

            test_data_path = os.path.join(self.path_to_model, test_data_dir.format(ix=no))
            print('Experiment for dataset from "{folder}"'.format(folder=test_data_path))

            # Load inputs
            inputs = []
            inputs_num = len(glob.glob(os.path.join(test_data_path, 'input_*.pb')))
            for i in range(inputs_num):
                input_file = os.path.join(test_data_path, 'input_{}.pb'.format(i))
                tensor = onnx.TensorProto()
                with open(input_file, 'rb') as f:
                    tensor.ParseFromString(f.read())
                inputs.append(numpy_helper.to_array(tensor))

            # Load reference outputs
            ref_outputs = []
            ref_outputs_num = len(glob.glob(os.path.join(test_data_path, 'output_*.pb')))
            for i in range(ref_outputs_num):
                output_file = os.path.join(test_data_path, 'output_{}.pb'.format(i))
                tensor = onnx.TensorProto()
                with open(output_file, 'rb') as f:
                    tensor.ParseFromString(f.read())
                ref_outputs.append(numpy_helper.to_array(tensor))

            # Run the model on the backend

            #outputs = list(backend.run_model(model, inputs))
            outputs = self.run_session(inputs)


            # Compare the results with reference outputs.
            for ref_o, o in zip(ref_outputs, outputs):
                print("result:", o)
                print("reference output:", ref_o)
                np.testing.assert_almost_equal(ref_o, o, 0)
            print( np.argmax(ref_outputs), "recognized as: ", np.argmax(outputs) )
            return

    def createRecognizer(recognizer_name, directory):
        if recognizer_name == 'TF':
            recognizer = TensorFlowONNXMNIST_Recognizer(directory)
        #elif ... - more ONNX-compatible ML library options 
        else:
            recognizer = RuntimeONNXMNIST_Recognizer(directory)
        return recognizer





class RuntimeONNXMNIST_Recognizer(ONNXMNIST_Recognizer):
    def run_session(self, model_inputs):
        from onnxruntime import InferenceSession
        sess = InferenceSession(self.onnx_model.SerializeToString())
        return sess.run(None, {'Input3' : model_inputs[0]})


class TensorFlowONNXMNIST_Recognizer(ONNXMNIST_Recognizer):
    def run_session(self, model_inputs):
        #import onnx_tf as xtf
        from onnx_tf.backend import prepare
        tf_rep = prepare(self.onnx_model)
        return tf_rep.run(model_inputs)


#class XXX_ONNXMNIST_Recognizer(ONNXMNIST_Recognizer) ...
#... implement recognizers for all supporting ML libraries



In [None]:
####$ application $####
shared_data = None
"""{
'''$
}"""
%store -r shared_data
diagnostic = shared_data['diagnostic'] if (shared_data) else True
"""{
$'''
}"""
"""%
shared_data = dict()
from delphi_module import delphi_form

diagnostic = delphi_form.isPictureEmpty
%"""

"""{
'''$
}"""
backend = shared_data['backend'] if (shared_data) else 'default' #use ONNX Runtime
path_to_model = shared_data['path_to_model'] if (shared_data) else '?'
"""{
$'''
}"""


"""%
backend = delphi_form.backendSwitchTag
shared_data['backend'] = backend
path_to_model = delphi_form.onnxDirectory
shared_data['path_to_model'] = path_to_model
%"""

recognizer = ONNXMNIST_Recognizer.createRecognizer(backend, path_to_model)

if diagnostic:
    recognizer.selfdiagnostic()
    exit
    
"""{
'''$
}"""
mnist_digit_pict = shared_data['mnist_digit_pict']
print(recognizer.recognize(mnist_digit_pict))
"""{
$'''
}"""    
    
    
"""%
mnist_digit_pict = delphi_form.PictureData
shared_data['mnist_digit_pict'] = mnist_digit_pict

delphi_form.RecognizedValue =  recognizer.recognize(mnist_digit_pict)

#%store shared_data
from IPython import get_ipython
ipython = get_ipython()
ipython.magic("store shared_data")
%"""


