In [126]:
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any

import numpy as np
import onnx
import onnxruntime as rt
import tensorflow as tf
import tf2onnx
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.ensemble import RandomForestClassifier

# Create mock models

In [118]:
# DecisionTree model
random_forest = RandomForestClassifier()
X, y = np.random.rand(10, 784).astype(np.float32), np.random.randint(0, 11, size=(10,))
random_forest.fit(X, y)

onx = convert_sklearn(
    random_forest, initial_types=[("input", FloatTensorType((None, X.shape[1])))],
    options={id(random_forest): {"zipmap": False}},
    target_opset=21
)
with open("../models/random_forest.onnx", "wb") as f:
    f.write(onx.SerializeToString())

# Check model
sess = rt.InferenceSession("../models/random_forest.onnx", providers=["CPUExecutionProvider"])

input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
pred_onx = sess.run([label_name], {input_name: X})
pred_onx

[array([2, 4, 8, 1, 6, 1, 6, 4, 8, 2], dtype=int64)]

In [119]:
# CNN model
inputs = tf.keras.Input(shape=(28, 28, 1))
x = tf.keras.layers.Conv2D(filters=1, kernel_size=3)(inputs)
x = tf.keras.layers.Reshape((-1,))(x)
x = tf.keras.layers.Dense(10, activation='softmax')(x)
model = tf.keras.Model(inputs=inputs, outputs=x)

opt = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(optimizer=opt, loss='MSE')
X, y = np.random.rand(10, 28, 28, 1).astype(np.float32), np.random.randint(0, 11, size=(10,))
model.fit(X, y)

input_signature = [tf.TensorSpec([None, 28, 28, 1], tf.float32, name='input')]
# Use from_function for tf functions
onnx_model, _ = tf2onnx.convert.from_keras(model, input_signature, opset=18)
onnx.save(onnx_model, "../models/cnn.onnx")

# Check model
sess = rt.InferenceSession("../models/cnn.onnx", providers=["CPUExecutionProvider"])

input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
pred_onx = sess.run([label_name], {input_name: X})[0]
pred_onx.shape

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 315ms/step - loss: 31.9386


I0000 00:00:1727047847.488959 10906995 devices.cc:76] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0 (Note: TensorFlow was not compiled with CUDA or ROCm support)
I0000 00:00:1727047847.519354 10906995 devices.cc:76] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0 (Note: TensorFlow was not compiled with CUDA or ROCm support)


(10, 10)

# Digit Classification

In [141]:
class DigitClassificationInterface(ABC):
    def __init__(self):
        self.__shape = (28, 28, 1)

    def predict(self, input_: np.ndarray) -> int:
        if not isinstance(input_, np.ndarray) or input_.shape[:] != self.__shape[:]:
            raise ValueError(f'The input must be a numpy nd-array of shape {self.__shape}')

        input_prepared = self._pre_process(input_)
        proba_prediction = self._predict(input_prepared)
        result = self._post_process(proba_prediction)

        return result

    @abstractmethod
    def _pre_process(self, input_: np.ndarray) -> Any:
        pass

    @abstractmethod
    def _predict(self, input_: Any) -> np.ndarray:
        pass

    def _post_process(self, proba_prediction: np.ndarray) -> int:
        return proba_prediction.argmax().item()


class DigitClassificationRandom(DigitClassificationInterface):
    def __init__(self):
        super().__init__()
        self.__seed = 42

    def _pre_process(self, input_: np.ndarray) -> Any:
        return input_[9: -9, 9: -9]  # center-crop

    def _predict(self, input_: Any) -> np.ndarray:
        prediction = np.random.rand(10)
        prediction_normalized = prediction / prediction.sum()

        return prediction_normalized


class DigitClassificationONNX(DigitClassificationInterface):
    def __init__(self, model_path):
        super().__init__()
        self._session = rt.InferenceSession(model_path, providers=["CPUExecutionProvider"])
        self._input_name = self._session.get_inputs()[0].name
        self._label_name = self._session.get_outputs()[0].name

    def _pre_process(self, input_: np.ndarray) -> Any:
        # insert sample dimension
        return input_.reshape((1, *input_.shape))

    def _predict(self, input_: Any) -> np.ndarray:
        return self._session.run([self._label_name], {self._input_name: input_})[0]

    def _post_process(self, proba_prediction: np.ndarray) -> int:
        # remove sample dimension
        return super()._post_process(proba_prediction[0])


class DigitClassificationCNN(DigitClassificationONNX):
    pass


class DigitClassificationRandomForest(DigitClassificationONNX):
    def __init__(self, model_path):
        super().__init__(model_path)
        # override target output
        self._label_name = self._session.get_outputs()[1].name

    def _pre_process(self, input_: np.ndarray) -> Any:
        input_ = super()._pre_process(input_)
        return input_.reshape((input_.shape[0], -1,))


class PredictionAlgorithms(Enum):
    RAND = DigitClassificationRandom()
    RANDOM_FOREST = DigitClassificationRandomForest(model_path='../models/random_forest.onnx')
    CNN = DigitClassificationCNN(model_path='../models/cnn.onnx')
    
    def __init__(self, value):
        if not isinstance(value, DigitClassificationInterface):
            raise ValueError(f'Invalid type of value: {type(value)}. Value must inherit from DigitClassificationInterface')

class DigitClassifier:
    def __init__(self, algorithm: PredictionAlgorithms):
        self.__algorithm = algorithm.value
    
    def predict(self, input_: np.ndarray):
        return self.__algorithm.predict(input_)

In [165]:
np.random.seed(42)
sample = np.random.rand(28, 28, 1).astype(np.float32)
classifier = DigitClassifier(PredictionAlgorithms.RAND)
classifier.predict(sample)

5