In [1]:
import json
import os

import numpy as np
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml import Pipeline

from onnxmltools import convert_sparkml
from onnxmltools.convert.sparkml.utils import buildInitialTypesSimple
import onnxruntime

### Setup Spark Context

In [2]:
# Spark configuration for an in-process ("local") run of this notebook.
# NOTE(review): spark.cores.max is documented for standalone/Mesos deployments;
# with master "local[*]" it likely has no effect. If a 2-core cap is intended,
# "local[2]" would be the way to express it -- confirm before changing.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .set("spark.cores.max", "2")
    .setAppName("Model Inferencing")
)

# SparkSession is the unified entry point (SQLContext is deprecated since
# Spark 3.0). It exposes the same .read / .createDataFrame API used below and
# is what convert_sparkml's `spark_session` argument is meant to receive.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Keep `sc` available: the final cell stops Spark through it.
sc = spark.sparkContext

### Load Data

In [3]:
# Load the same-named dataset twice: once into pandas (used later to pull a
# single sample record) and once into Spark (used to assemble the pipeline and
# to derive the ONNX input types). "isFraud" and "FROMACCTNBR" are removed from
# both so only model input columns remain.
# NOTE(review): the pandas read uses a local relative path while the Spark read
# uses a v3io:// path -- confirm both point at the same file.
pandas_df = pd.read_parquet("training_set.parquet").drop(
    columns=["isFraud", "FROMACCTNBR"]
)
df = (
    spark.read
    .parquet("v3io://users/nick/onnx/training_set.parquet")
    .drop("isFraud", "FROMACCTNBR")
)

### Load Spark Pipeline

In [4]:
# Input columns, in the order they are packed into the "features" vector.
numericCols = [
    'CARDTXNNBR', 'isPinned', 'TXNAMT', '2ndLastTrxnAmnt',
    'scenario1_withoutMerchant', 'scenario1_withMerchant',
    'isGoogle', 'isPlayStation', 'isAmazon', 'isApple', 'isMicrosoft',
    'isFbPay', 'isCashApp', 'isPaypal', 'isVenmo', 'isWellumpay',
    'secanrio3NoOfTrxn', 'secanrio3_Label',
    'scenario4_2trxn', 'scenario5_2trxn',
    'diffLastTrxn', 'isNewCustomer',
]

# Rows containing invalid values are skipped by the assembler rather than
# raising an error.
assembler = VectorAssembler(
    inputCols=numericCols,
    outputCol="features",
    handleInvalid="skip",
)

# Previously trained random-forest classifier, loaded from storage.
model = RandomForestClassificationModel.load("v3io://users/nick/onnx/model_v2")

# Wrap assembler + classifier into one PipelineModel. Both stages are
# already-fitted transformers, so no training is expected from fit() here;
# it is called to obtain the PipelineModel that gets exported to ONNX.
pipeline = Pipeline(stages=[assembler, model])
pipeline_model = pipeline.fit(df)

### Export Pipeline to ONNX

In [5]:
# Derive the ONNX graph inputs from the Spark DataFrame's columns, then convert
# the whole pipeline (assembler + classifier) into a single ONNX model.
initial_types = buildInitialTypesSimple(df)
onnx_model = convert_sparkml(
    pipeline_model,
    'Pyspark model',
    initial_types,
    spark_session=spark,
)

The maximum opset needed by this model is only 1.
The maximum opset needed by this model is only 4.


In [6]:
# Serialize the converted model to "model.onnx" in the working directory.
# (The former os.path.join("model.onnx") had a single argument and therefore
# returned the filename unchanged.)
with open("model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())

### Load ONNX Model

In [7]:
# Open the exported model with ONNX Runtime and cache its graph metadata so the
# timed cell below does not have to query it on every run.
session = onnxruntime.InferenceSession("model.onnx")
inputs = session.get_inputs()                             # input node descriptors
outputs = [node.name for node in session.get_outputs()]   # output node names

### Performance Comparison

In [8]:
# Take the first row as a {column: value} dict to use as the single inference
# sample. Slicing with head(1) before to_dict avoids converting every row of
# the dataset into a dict just to keep the first one; per-column dtypes (and
# therefore the resulting Python values) are unchanged.
record = pandas_df.head(1).to_dict(orient="records")[0]
record

{'CARDTXNNBR': 269633690,
 'isPinned': 0,
 'TXNAMT': 2.0,
 '2ndLastTrxnAmnt': 1.0,
 'scenario1_withoutMerchant': 0,
 'scenario1_withMerchant': 0,
 'isGoogle': 0,
 'isPlayStation': 0,
 'isAmazon': 0,
 'isApple': 0,
 'isMicrosoft': 0,
 'isFbPay': 0,
 'isCashApp': 0,
 'isPaypal': 0,
 'isVenmo': 0,
 'isWellumpay': 0,
 'secanrio3NoOfTrxn': 0,
 'secanrio3_Label': 0,
 'scenario4_2trxn': 0,
 'scenario5_2trxn': 0,
 'diffLastTrxn': 0,
 'isNewCustomer': 0}

#### Spark

In [9]:
%%timeit
pandas_inf_df = pd.DataFrame([record])
spark_inf_df = spark.createDataFrame(pandas_inf_df)
pred = pipeline_model.transform(spark_inf_df)
pred_json = pred.toJSON().collect()
pred_dict = json.loads(pred_json[0])
prediction, probability = pred_dict["prediction"], pred_dict["probability"]["values"]

230 ms ± 17.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
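# Variables assigned under %%timeit are not kept in the notebook namespace.
# To inspect the Spark result, remove %%timeit from the cell above, re-run it,
# and uncomment the line below.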
# print(f"Prediction: {prediction}, Probability: {probability}")

#### ONNX

In [11]:
%%timeit
pandas_inf_df = pd.DataFrame([record])
input_data= {i.name: v for i, v in zip(inputs, pandas_inf_df.values.reshape(len(inputs),1,1).astype(np.float32))}

pred = session.run(output_names = outputs, input_feed=input_data)
prediction, probability = [i[0] for i in pred]

1.22 ms ± 32 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [12]:
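# Variables assigned under %%timeit are not kept in the notebook namespace.
# To inspect the ONNX result, remove %%timeit from the cell above, re-run it,
# and uncomment the line below.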
# print(f"Prediction: {prediction}, Probability: {probability}")

### Stop Spark

In [13]:
# Shut down the SparkContext created in the setup cell.
sc.stop()