In [1]:
import pandas as pd
import numpy as np

In [344]:
from numpy.testing import assert_allclose
from onnx.version_converter import convert_version
from pandas import DataFrame
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from skl2onnx import to_onnx

# For the custom converter
from skl2onnx import update_registered_converter
from skl2onnx.common.utils import check_input_and_output_numbers
from skl2onnx.algebra.onnx_ops import OnnxSlice, OnnxSub, OnnxDiv, OnnxMul, OnnxCastLike, OnnxAbs, OnnxIdentity, OnnxConstant
from skl2onnx.helpers import add_onnx_graph
import onnxscript
from onnxscript import opset18 as op
# To check discrepancies
from onnx.reference import ReferenceEvaluator
from onnxruntime import InferenceSession


In [345]:
def create_feature(df: pd.DataFrame) -> None:
    """Add the engineered feature columns to ``df`` in place and drop ``product_id``.

    Columns read: ``air_temperature``, ``process_temperature``,
    ``rotational_speed``, ``torque``, ``tool_wear`` and ``product_id``.
    Columns written (in this order): ``air_process_diff``, ``speed_power``,
    ``torque_power``, ``tool_process``, ``temp_ratio``, ``product_id_num``.

    The frame is mutated; nothing is returned.
    """
    # Both "power" features share the same scaled speed and the same product
    # term, so compute them once.
    # NOTE(review): 2*pi/60 is presumably an rpm -> rad/s conversion - confirm.
    scaled_speed = df["rotational_speed"] * (2 * np.pi / 60)
    speed_times_torque = scaled_speed * df["torque"]

    df["air_process_diff"] = (
        df["air_temperature"] - df["process_temperature"]
    ).abs()

    # Algebraically these reduce to 1 / torque and 1 / scaled_speed respectively
    # (NaN where the cancelled factor is zero).
    df["speed_power"] = scaled_speed / speed_times_torque
    df["torque_power"] = df["torque"] / speed_times_torque

    df["tool_process"] = df["tool_wear"] * df["process_temperature"]
    df["temp_ratio"] = df["process_temperature"] / df["air_temperature"]

    # Everything after the first character of product_id is parsed as a number.
    id_digits = df["product_id"].str.slice(start=1)
    df["product_id_num"] = pd.to_numeric(id_digits)

    df.drop(columns="product_id", inplace=True)

In [346]:
# Load the raw dataset and split off the target: ``y`` holds the
# "machine_failure" column, ``data`` keeps every other column.
data = pd.read_csv("./data/raw/total_data.csv")
y = data.pop("machine_failure")

In [347]:
from skl2onnx.common.data_types import FloatTensorType, StringTensorType
from skl2onnx.common.data_types import Int64TensorType

def convert_dataframe_schema(df, drop=None):
    """Build the skl2onnx ``initial_types`` list for the columns of ``df``.

    Each kept column becomes one ``(column_name, tensor_type)`` pair whose
    tensor type has shape ``[None, 1]``:

    * any integer dtype (``int64``, ``int32``, ...) -> ``Int64TensorType``
    * any floating dtype (``float64``, ``float32``, ...) -> ``FloatTensorType``
    * everything else -> ``StringTensorType``

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns and dtypes describe the model inputs.
    drop : container of column names, optional
        Columns to leave out of the schema.

    Returns
    -------
    list of (name, tensor type) tuples, in column order.
    """
    inputs = []
    for name, dtype in df.dtypes.items():
        if drop is not None and name in drop:
            continue
        # Match on the dtype *kind* rather than the exact strings "int64" /
        # "float64"; otherwise e.g. a float32 column would fall through to the
        # string branch.
        if pd.api.types.is_integer_dtype(dtype):
            tensor_type = Int64TensorType([None, 1])
        elif pd.api.types.is_float_dtype(dtype):
            tensor_type = FloatTensorType([None, 1])
        else:
            tensor_type = StringTensorType([None, 1])
        inputs.append((name, tensor_type))
    return inputs

In [348]:
class AbsDiffCalculator(BaseEstimator, TransformerMixin):
    """Stateless transformer producing ``|air_temperature - process_temperature|``.

    ``transform`` looks the two columns up by name, so ``X`` must be a pandas
    DataFrame (or similar) that has ``"air_temperature"`` and
    ``"process_temperature"`` columns.
    """

    def __init__(self):
        pass

    def calculate_abs_diff(self, x, y):
        """Return ``abs(x - y)``; works on scalars and on aligned Series."""
        return abs(x - y)

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Return the absolute temperature difference as an ``(n_rows, 1)`` array."""
        # Whole-column arithmetic instead of ``X.apply(..., axis=1)``: same
        # values, without one Python-level call per row.
        diff = self.calculate_abs_diff(X["air_temperature"], X["process_temperature"])
        return diff.to_numpy().reshape((-1, 1))
    
def abs_diff_shape_calculator(operator):
    """Declare the output of the abs-diff operator as one column of the input's tensor type.

    Requires exactly one input and one output on ``operator``.
    """
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    source = operator.inputs[0]
    # Reuse the input's tensor class so the output element type follows the input.
    tensor_cls = type(source.type)
    # Keep the input's first dimension and emit a single column.
    n_rows = source.get_first_dimension()
    operator.outputs[0].type = tensor_cls([n_rows, 1])

def abs_diff_converter(scope, operator, container):
    """Emit ONNX nodes computing ``Abs(X[:, 0:1] - X[:, 1:2])``.

    The two operands are taken by position (columns 0 and 1 of the single
    input), not by name. The estimator holds no fitted state, so
    ``operator.raw_operator`` is not consulted.
    """
    opset = container.target_opset
    features = operator.inputs[0]

    # Slice(data, starts, ends, axes) bounds; ``idx1`` doubles as the axes
    # argument (axis 1 = columns).
    idx0 = np.array([0], dtype=np.int64)
    idx1 = np.array([1], dtype=np.int64)
    idx2 = np.array([2], dtype=np.int64)

    first_col = OnnxSlice(features, idx0, idx1, idx1, op_version=opset)
    second_col = OnnxSlice(features, idx1, idx2, idx1, op_version=opset)

    difference = OnnxSub(first_col, second_col, op_version=opset)
    absolute = OnnxAbs(
        difference,
        op_version=opset,
        output_names=operator.outputs[0],
    )
    absolute.add_to(scope, container)


# Register AbsDiffCalculator with skl2onnx under the alias
# "AliasAbsDiffCalculator", together with its shape calculator and converter.
update_registered_converter(
    AbsDiffCalculator,
    "AliasAbsDiffCalculator",
    abs_diff_shape_calculator,
    abs_diff_converter,
)

In [349]:
class SpeedConverter(BaseEstimator, TransformerMixin):
    """Stateless transformer producing the ``speed_power`` feature.

    ``transform`` looks the operands up by name, so ``X`` must be a pandas
    DataFrame (or similar) with ``"rotational_speed"`` and ``"torque"`` columns.
    """

    def __init__(self):
        pass

    def calculate_power(self, x, y):
        """Return ``x * k / (x * k * y)`` with ``k = 2 * pi / 60``.

        ``x`` is the rotational speed and ``y`` the torque (scalars or aligned
        Series). Algebraically this is ``1 / y`` wherever ``x`` is non-zero.
        """
        return x * (2 * np.pi / 60) / (x * (2 * np.pi / 60) * y)

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Return the feature as an ``(n_rows, 1)`` array."""
        # Whole-column arithmetic instead of ``X.apply(..., axis=1)``: same
        # values, without one Python-level call per row.
        feature = self.calculate_power(X["rotational_speed"], X["torque"])
        return feature.to_numpy().reshape((-1, 1))


def speed_power_shape_calculator(operator):
    """Declare the output of the speed-power operator as one column of the input's tensor type.

    Requires exactly one input and one output on ``operator``.
    """
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    source = operator.inputs[0]
    # Reuse the input's tensor class so the output element type follows the input.
    tensor_cls = type(source.type)
    # Keep the input's first dimension and emit a single column.
    n_rows = source.get_first_dimension()
    operator.outputs[0].type = tensor_cls([n_rows, 1])

def speed_power_converter(scope, operator, container):
    """Emit ONNX nodes computing ``(X0 * k) / ((X0 * k) * X1)`` with ``k = 2*pi/60``.

    ``X0`` and ``X1`` are columns 0 and 1 of the single input, taken by
    position; the ColumnTransformer feeds them as rotational_speed and torque.
    """
    opset = container.target_opset
    features = operator.inputs[0]

    # Slice(data, starts, ends, axes) bounds; ``idx1`` doubles as the axes
    # argument (axis 1 = columns).
    idx0 = np.array([0], dtype=np.int64)
    idx1 = np.array([1], dtype=np.int64)
    idx2 = np.array([2], dtype=np.int64)
    # The scaling factor is stored as a float32 scalar.
    factor = np.array(2 * np.pi / 60, dtype=np.float32)

    speed = OnnxSlice(features, idx0, idx1, idx1, op_version=opset)
    torque = OnnxSlice(features, idx1, idx2, idx1, op_version=opset)

    # The scaled speed is the numerator and is reused inside the denominator.
    scaled_speed = OnnxMul(speed, factor, op_version=opset)
    scaled_speed_times_torque = OnnxMul(scaled_speed, torque, op_version=opset)

    quotient = OnnxDiv(
        scaled_speed,
        scaled_speed_times_torque,
        op_version=opset,
        output_names=operator.outputs[0],
    )
    quotient.add_to(scope, container)

# Register SpeedConverter with skl2onnx under the alias "AliasSpeedConverter",
# together with its shape calculator and converter.
update_registered_converter(
    SpeedConverter,
    "AliasSpeedConverter",
    speed_power_shape_calculator,
    speed_power_converter,
)


class TorqueConverter(BaseEstimator, TransformerMixin):
    """Stateless transformer producing the ``torque_power`` feature.

    ``transform`` looks the operands up by name, so ``X`` must be a pandas
    DataFrame (or similar) with ``"torque"`` and ``"rotational_speed"`` columns.
    """

    def __init__(self):
        pass

    def calculate_power(self, x, y):
        """Return ``x / (y * k * x)`` with ``k = 2 * pi / 60``.

        ``x`` is the torque and ``y`` the rotational speed (scalars or aligned
        Series). Algebraically this is ``1 / (y * k)`` wherever ``x`` is non-zero.
        """
        return x / (y * (2 * np.pi / 60) * x)

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Return the feature as an ``(n_rows, 1)`` array."""
        # Whole-column arithmetic instead of ``X.apply(..., axis=1)``: same
        # values, without one Python-level call per row.
        feature = self.calculate_power(X["torque"], X["rotational_speed"])
        return feature.to_numpy().reshape((-1, 1))
    
def torque_power_shape_calculator(operator):
    """Declare the output of the torque-power operator as one column of the input's tensor type.

    Requires exactly one input and one output on ``operator``.
    """
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    source = operator.inputs[0]
    # Reuse the input's tensor class so the output element type follows the input.
    tensor_cls = type(source.type)
    # Keep the input's first dimension and emit a single column.
    n_rows = source.get_first_dimension()
    operator.outputs[0].type = tensor_cls([n_rows, 1])

def torque_power_converter(scope, operator, container):
    """Emit ONNX nodes computing ``X0 / ((X1 * k) * X0)`` with ``k = 2*pi/60``.

    ``X0`` and ``X1`` are columns 0 and 1 of the single input, taken by
    position; the ColumnTransformer feeds them as torque and rotational_speed.
    """
    opset = container.target_opset
    features = operator.inputs[0]

    # Slice(data, starts, ends, axes) bounds; ``idx1`` doubles as the axes
    # argument (axis 1 = columns).
    idx0 = np.array([0], dtype=np.int64)
    idx1 = np.array([1], dtype=np.int64)
    idx2 = np.array([2], dtype=np.int64)
    # The scaling factor is stored as a float32 scalar.
    factor = np.array(2 * np.pi / 60, dtype=np.float32)

    torque = OnnxSlice(features, idx0, idx1, idx1, op_version=opset)
    speed = OnnxSlice(features, idx1, idx2, idx1, op_version=opset)

    # Denominator: (speed * k) * torque.
    scaled_speed = OnnxMul(speed, factor, op_version=opset)
    scaled_speed_times_torque = OnnxMul(scaled_speed, torque, op_version=opset)

    quotient = OnnxDiv(
        torque,
        scaled_speed_times_torque,
        op_version=opset,
        output_names=operator.outputs[0],
    )
    quotient.add_to(scope, container)

# Register TorqueConverter with skl2onnx under the alias "AliasTorqueConverter",
# together with its shape calculator and converter.
update_registered_converter(
    TorqueConverter,
    "AliasTorqueConverter",
    torque_power_shape_calculator,
    torque_power_converter,
)


    # df["tool_process"] = df["tool_wear"] * df["process_temperature"]


class MulCalculator(BaseEstimator, TransformerMixin):
    """Stateless transformer producing ``tool_wear * process_temperature``.

    ``transform`` looks the operands up by name, so ``X`` must be a pandas
    DataFrame (or similar) with ``"tool_wear"`` and ``"process_temperature"``
    columns.
    """

    def __init__(self):
        pass

    def calculate_multiply(self, x, y):
        """Return ``x * y``; works on scalars and on aligned Series."""
        return x * y

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Return the product as an ``(n_rows, 1)`` array."""
        # Whole-column arithmetic instead of ``X.apply(..., axis=1)``: same
        # values, without one Python-level call per row.
        product = self.calculate_multiply(X["tool_wear"], X["process_temperature"])
        return product.to_numpy().reshape((-1, 1))
    
def multiply_shape_calculator(operator):
    """Declare the output of the multiply operator as one column of the input's tensor type.

    Requires exactly one input and one output on ``operator``.
    """
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    source = operator.inputs[0]
    # Reuse the input's tensor class so the output element type follows the input.
    tensor_cls = type(source.type)
    # Keep the input's first dimension and emit a single column.
    n_rows = source.get_first_dimension()
    operator.outputs[0].type = tensor_cls([n_rows, 1])

def multiply_converter(scope, operator, container):
    """Emit ONNX nodes computing ``X[:, 0:1] * X[:, 1:2]``.

    The operands are columns 0 and 1 of the single input, taken by position.
    """
    opset = container.target_opset
    features = operator.inputs[0]

    # Slice(data, starts, ends, axes) bounds; ``idx1`` doubles as the axes
    # argument (axis 1 = columns).
    idx0 = np.array([0], dtype=np.int64)
    idx1 = np.array([1], dtype=np.int64)
    idx2 = np.array([2], dtype=np.int64)

    left = OnnxSlice(features, idx0, idx1, idx1, op_version=opset)
    right = OnnxSlice(features, idx1, idx2, idx1, op_version=opset)

    product = OnnxMul(
        left,
        right,
        op_version=opset,
        output_names=operator.outputs[0],
    )
    product.add_to(scope, container)

# Register MulCalculator with skl2onnx under the alias "AliasMulCalculator",
# together with its shape calculator and converter.
update_registered_converter(
    MulCalculator,
    "AliasMulCalculator",
    multiply_shape_calculator,
    multiply_converter,
)

    # df["temp_ratio"] = df["process_temperature"] / df["air_temperature"]

class DivCalculator(BaseEstimator, TransformerMixin):
    """Stateless transformer producing ``process_temperature / air_temperature``.

    ``transform`` looks the operands up by name, so ``X`` must be a pandas
    DataFrame (or similar) with ``"process_temperature"`` and
    ``"air_temperature"`` columns.
    """

    def __init__(self):
        pass

    def calculate_divide(self, x, y):
        """Return ``x / y``; works on scalars and on aligned Series."""
        return x / y

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Return the ratio as an ``(n_rows, 1)`` array."""
        # Whole-column arithmetic instead of ``X.apply(..., axis=1)``: same
        # values, without one Python-level call per row.
        ratio = self.calculate_divide(X["process_temperature"], X["air_temperature"])
        return ratio.to_numpy().reshape((-1, 1))
    
def divide_shape_calculator(operator):
    """Declare the output of the divide operator as one column of the input's tensor type.

    Requires exactly one input and one output on ``operator``.
    """
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    source = operator.inputs[0]
    # Reuse the input's tensor class so the output element type follows the input.
    tensor_cls = type(source.type)
    # Keep the input's first dimension and emit a single column.
    n_rows = source.get_first_dimension()
    operator.outputs[0].type = tensor_cls([n_rows, 1])

def divide_converter(scope, operator, container):
    """Emit ONNX nodes computing ``X[:, 0:1] / X[:, 1:2]``.

    The operands are columns 0 and 1 of the single input, taken by position.
    """
    opset = container.target_opset
    features = operator.inputs[0]

    # Slice(data, starts, ends, axes) bounds; ``idx1`` doubles as the axes
    # argument (axis 1 = columns).
    idx0 = np.array([0], dtype=np.int64)
    idx1 = np.array([1], dtype=np.int64)
    idx2 = np.array([2], dtype=np.int64)

    dividend = OnnxSlice(features, idx0, idx1, idx1, op_version=opset)
    divisor = OnnxSlice(features, idx1, idx2, idx1, op_version=opset)

    ratio = OnnxDiv(
        dividend,
        divisor,
        op_version=opset,
        output_names=operator.outputs[0],
    )
    ratio.add_to(scope, container)

# Register DivCalculator with skl2onnx under the alias "AliasDivCalculator",
# together with its shape calculator and converter.
update_registered_converter(
    DivCalculator,
    "AliasDivCalculator",
    divide_shape_calculator,
    divide_converter,
)
    # df["product_id_num"] = pd.to_numeric(df["product_id"].str.slice(start=1))
def preprocess_product_id(df):
    """Replace ``product_id`` with a numeric ``product_id_num`` column, in place.

    Everything after the first character of each ``product_id`` string is
    parsed as a number. ``df`` is mutated; nothing is returned.
    """
    id_digits = df["product_id"].str.slice(start=1)
    df["product_id_num"] = pd.to_numeric(id_digits)
    df.drop(columns=["product_id"], inplace=True)

In [350]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
import skl2onnx
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType, StringTensorType
from skl2onnx.common.data_types import Int64TensorType


In [351]:
# Raw input columns, grouped by how they are preprocessed.
categorical_features = ["machine_type"]
numerical_features = ['air_temperature','process_temperature', 'rotational_speed', 'torque', 'tool_wear']

# Numeric columns: fill missing values with the median, then standardize.
numeric_transformer = Pipeline(
    steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
)

# Categorical columns: ordinal-encode; categories not seen during fit raise an error.
# NOTE(review): the step is named "onehot" but holds an OrdinalEncoder.
categorical_transformer = Pipeline(
    steps=[
        # --- SimpleImputer is not available for strings in ONNX-ML specifications.
        # ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ("onehot", OrdinalEncoder(handle_unknown="error"))
    ]
)

In [382]:
# Define ColumnTransformer to include the new features and apply numeric transformer
# Each engineered feature is (output name, calculator, the two input columns in
# the order the calculator and its ONNX converter expect them).
engineered_specs = [
    ("air_process_diff", AbsDiffCalculator(), ["air_temperature", "process_temperature"]),
    ("speed_power", SpeedConverter(), ["rotational_speed", "torque"]),
    ("torque_power", TorqueConverter(), ["torque", "rotational_speed"]),
    ("tool_process", MulCalculator(), ["tool_wear", "process_temperature"]),
    ("temp_ratio", DivCalculator(), ["process_temperature", "air_temperature"]),
]

# Every engineered feature is computed, then passed through the numeric transformer.
engineered_transformers = [
    (
        feature_name,
        Pipeline([("calculator", calculator), ("scaler", numeric_transformer)]),
        source_columns,
    )
    for feature_name, calculator, source_columns in engineered_specs
]

mapper = ColumnTransformer(
    transformers=[
        *engineered_transformers,
        # Existing numerical columns go straight through the numeric transformer.
        ("num", numeric_transformer, numerical_features),
        # Categorical columns go through the categorical transformer.
        ("cat", categorical_transformer, categorical_features),
    ],
    remainder='drop',
    verbose_feature_names_out=False,
)

# NOTE(review): no random_state is passed, so the fitted forest can differ between runs.
pipe_tr = Pipeline([("mapper", mapper), ("classifier", RandomForestClassifier())])
# Train on the first 5000 rows only.
pipe_tr.fit(data[:5000], y[:5000])

In [383]:
# Keep only the columns the pipeline consumes (categorical first, then
# numerical) and derive the ONNX input schema from their dtypes.
model_columns = categorical_features + numerical_features
selected_df = data[model_columns].copy()
initial_inputs = convert_dataframe_schema(selected_df)

In [384]:
initial_inputs

[('machine_type', StringTensorType(shape=[None, 1])),
 ('air_temperature', FloatTensorType(shape=[None, 1])),
 ('process_temperature', FloatTensorType(shape=[None, 1])),
 ('rotational_speed', Int64TensorType(shape=[None, 1])),
 ('torque', FloatTensorType(shape=[None, 1])),
 ('tool_wear', Int64TensorType(shape=[None, 1]))]

In [385]:
from skl2onnx import convert_sklearn
# Convert the fitted pipeline to ONNX (opset 12).
# A conversion failure is deliberately left uncaught. Printing and swallowing
# it would leave ``model_onnx`` undefined, or stale from an earlier run, and
# the save cell below would then fail or write the wrong model.
model_onnx = convert_sklearn(
    pipe_tr, "pipe_tr", initial_inputs, target_opset=12
)

In [386]:
# And save.
# Serialize the converted model to disk.
serialized_model = model_onnx.SerializeToString()
with open("pipe_tr.onnx", "wb") as onnx_file:
    onnx_file.write(serialized_model)

In [387]:
# Reference predictions from the scikit-learn pipeline for rows 10000-10009.
sklearn_batch = selected_df[10000:10010]
print("predict", pipe_tr.predict(sklearn_batch))
print("predict_proba", pipe_tr.predict_proba(sklearn_batch))

predict [0 0 0 0 0 0 0 0 0 0]
predict_proba [[1.   0.  ]
 [1.   0.  ]
 [1.   0.  ]
 [1.   0.  ]
 [1.   0.  ]
 [1.   0.  ]
 [1.   0.  ]
 [0.86 0.14]
 [1.   0.  ]
 [1.   0.  ]]


In [388]:
# Build the ONNX Runtime feed: one (n_rows, 1) array per input column, cast to
# the element type declared in the ONNX schema (int64 / float32); other
# columns keep their dtype.
int64_columns = ("rotational_speed", "tool_wear")
float32_columns = ("air_temperature", "process_temperature", "torque")

inputs = {}
for column in selected_df.columns:
    values = selected_df[column].values
    if column in int64_columns:
        values = values.astype(np.int64)
    elif column in float32_columns:
        values = values.astype(np.float32)
    inputs[column] = values.reshape((values.shape[0], 1))

In [390]:
# Run the exported model on the CPU provider and show the same ten rows that
# were printed for the scikit-learn pipeline.
sess = InferenceSession("pipe_tr.onnx", providers=["CPUExecutionProvider"])
pred_onx = sess.run(None, inputs)

shown_rows = slice(10000, 10010)
onnx_labels = pred_onx[0]
onnx_probabilities = pred_onx[1]
print("predict", onnx_labels[shown_rows])
print("predict_proba", onnx_probabilities[shown_rows])

predict [0 0 0 0 0 0 0 0 0 0]
predict_proba [{0: 1.0, 1: 0.0}, {0: 1.0, 1: 0.0}, {0: 1.0, 1: 0.0}, {0: 1.0, 1: 0.0}, {0: 1.0, 1: 0.0}, {0: 1.0, 1: 0.0}, {0: 1.0, 1: 0.0}, {0: 0.8600000143051147, 1: 0.14000000059604645}, {0: 1.0, 1: 0.0}, {0: 1.0, 1: 0.0}]
