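# Spark-ONNX

Train a logistic-regression pipeline on the iris dataset with Spark (Scala), convert the fitted model to ONNX with onnxmltools, and check that ONNX Runtime reproduces Spark's predictions.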

Note: this notebook uses multiple kernels/languages:

* [sos-notebook](https://github.com/vatlab/sos-notebook): a super-kernel that hosts multiple sub-kernels in one notebook
    * [toree](https://toree.incubator.apache.org/): Scala kernel with Spark, used to train and save the model
    * [python](https://jupyter.org/): the default Jupyter Python kernel, used to export the data, reload the model with PySpark, and convert/score it with ONNX

In [1]:
# Export the iris dataset from sklearn to CSV so the Scala (Spark) kernel can read it.
import os

from sklearn.datasets import load_iris

import pandas as pd

iris = load_iris()
X, y = iris.data, iris.target

# Feature columns are the integers 0..3 and are written as the CSV header "0,1,2,3",
# which the Scala and PySpark cells reference by name.
df = pd.DataFrame(X)
df['label'] = y

# Create the output directory so the write also works on a fresh checkout.
os.makedirs('data', exist_ok=True)
# index=False: do not write the pandas row index as an extra unnamed column.
df.to_csv('data/iris.csv', index=False)

In [2]:
// Show the Spark version of the session bound to `spark` in the Scala (toree) kernel.
println(spark.version)

2.3.2


# Train a Spark model in Scala

In [3]:

// Read the CSV written by the Python cell. The header row supplies the column names
// ("0".."3", "label") and inferSchema types them (see the printed schema).
val dfLoad = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("data/iris.csv")

dfLoad.printSchema()
dfLoad.show(5)

root
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- label: integer (nullable = true)

+---+---+---+---+-----+
|  0|  1|  2|  3|label|
+---+---+---+---+-----+
|5.1|3.5|1.4|0.2|    0|
|4.9|3.0|1.4|0.2|    0|
|4.7|3.2|1.3|0.2|    0|
|4.6|3.1|1.5|0.2|    0|
|5.0|3.6|1.4|0.2|    0|
+---+---+---+---+-----+
only showing top 5 rows



dfLoad = [0: double, 1: double ... 3 more fields]


[0: double, 1: double ... 3 more fields]

In [4]:
import org.apache.spark.ml.feature.VectorAssembler

// Pack the four numeric columns "0".."3" into one vector column named "features",
// the column consumed by the LogisticRegression stage below.
val assembler = new VectorAssembler()
  .setInputCols((0 to 3).map(_.toString).toArray)
  .setOutputCol("features")

// Keep only the model inputs: the feature vector and the integer class label.
val df = assembler.transform(dfLoad).select("features", "label")

df.show(5)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|    0|
|[4.9,3.0,1.4,0.2]|    0|
|[4.7,3.2,1.3,0.2]|    0|
|[4.6,3.1,1.5,0.2]|    0|
|[5.0,3.6,1.4,0.2]|    0|
+-----------------+-----+
only showing top 5 rows



assembler = vecAssembler_f5fc70deebd6
df = [features: vector, label: int]


[features: vector, label: int]

In [5]:
// 70/30 train/test split. The fixed seed makes the split, and therefore the fitted
// model and the reported test metrics, reproducible across re-runs (given the same
// input partitioning).
val Array(train, test) = df.randomSplit(Array(0.7, 0.3), seed = 42L)

train = [features: vector, label: int]
test = [features: vector, label: int]


[features: vector, label: int]

In [6]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Logistic regression over the assembled "features" vector, predicting "label".
val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")

// Single-stage pipeline. Saving a PipelineModel lets the PySpark cells reload it.
val pipe = new Pipeline().setStages(Array(lr))

val model = pipe.fit(train)
model.write.overwrite().save("models/spark-lr")

// Score the held-out split.
val predictions = model.transform(test)
predictions.show(5)

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[4.4,2.9,1.4,0.2]|    0|[34.8163862311835...|[0.99999999999996...|       0.0|
|[4.6,3.1,1.5,0.2]|    0|[37.8819563292555...|[0.99999999999999...|       0.0|
|[4.6,3.2,1.4,0.2]|    0|[40.9887864583076...|[1.0,8.5418181641...|       0.0|
|[4.8,3.0,1.4,0.3]|    0|[33.5381692382255...|[0.99999999999974...|       0.0|
|[4.8,3.4,1.9,0.2]|    0|[41.3059111545101...|[1.0,1.5760202039...|       0.0|
+-----------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



lr = logreg_7bb8d7a88798
pipe = pipeline_160e25e5b788
model = pipeline_160e25e5b788
predictions = [features: vector, label: int ... 3 more fields]


[features: vector, label: int ... 3 more fields]

In [7]:
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.functions.col

// MulticlassMetrics takes an RDD of (prediction, label) pairs. A tuple Dataset maps
// columns by position, so "prediction" must be selected first. The label column is an
// integer, so cast it to double explicitly to match the tuple type.
val predictionAndLabels = predictions
  .select(col("prediction"), col("label").cast("double"))
  .as[(Double, Double)]
  .rdd

val metrics = new MulticlassMetrics(predictionAndLabels)
println(s"test set accuracy: ${metrics.accuracy}")

test set accuracy: 0.9761904761904762


metrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@13ed6538


org.apache.spark.mllib.evaluation.MulticlassMetrics@13ed6538

# Convert to ONNX model

- onnxmltools: https://github.com/onnx/onnxmltools
- Spark ML converters: https://github.com/onnx/onnxmltools/tree/master/onnxmltools/convert/sparkml
- Spark ML conversion example: https://github.com/onnx/onnxmltools/blob/master/docs/examples/plot_convert_sparkml.py


```bash
pip install onnxmltools onnxruntime findspark
```

In [8]:
# Start (or attach to) a PySpark session from the Python kernel.
# findspark.init() makes the local Spark installation importable, so it must run
# before pyspark is imported.
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("spark-onnx")
    .getOrCreate()
)

In [10]:
# Reload the pipeline saved by the Scala kernel and score the *full* iris dataset from
# PySpark, rebuilding the same "features" vector as the Scala cell.
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler

model = PipelineModel.load("models/spark-lr")

dfLoad = spark.read.csv("data/iris.csv", header=True, inferSchema=True)

assembler = VectorAssembler(inputCols=[str(i) for i in range(4)], outputCol="features")
df = assembler.transform(dfLoad).select("features", "label")

predictions = model.transform(df)
predictions.show(5)

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[5.1,3.5,1.4,0.2]|    0|[45.5310525224865...|[1.0,3.3008935103...|       0.0|
|[4.9,3.0,1.4,0.2]|    0|[34.5192147381927...|[0.99999999999986...|       0.0|
|[4.7,3.2,1.3,0.2]|    0|[41.1325197544599...|[1.0,1.1391290863...|       0.0|
|[4.6,3.1,1.5,0.2]|    0|[37.8819563292555...|[0.99999999999999...|       0.0|
|[5.0,3.6,1.4,0.2]|    0|[48.4941493553864...|[1.0,4.2034642524...|       0.0|
+-----------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [11]:
from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassMetrics takes (prediction, label) float pairs; the label column is an
# integer, so it is converted with float().
metrics = MulticlassMetrics(
    predictions.select("label", "prediction")
    .rdd
    .map(lambda row: (row["prediction"], float(row["label"])))
)
print(f"full set accuracy: {metrics.accuracy}")

full set accuracy: 0.98


In [12]:
# Convert the fitted Spark pipeline to ONNX and save it next to the Spark model.
from onnxmltools.convert.common.data_types import FloatTensorType
from onnxmltools import convert_sparkml

# The converter needs the name, type and shape of each column the pipeline reads.
# The pipeline's only stage (LogisticRegression) reads just the assembled "features"
# vector. "label" is only used for fitting, which is presumably why declaring it as an
# input raised an error. The vector width is the number of features the model was
# trained on (4 for iris); the batch dimension is left dynamic (None).
n_features = model.stages[-1].numFeatures
initial_types = [
    ("features", FloatTensorType([None, n_features])),
]

onnx_model = convert_sparkml(model, 'My Sparkml Pipeline', initial_types)

with open("models/spark-lr.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())

{'classlabels_ints': [0, 1, 2],
 'coefficients': [-5.433780543151039,
                  24.197187785847923,
                  -6.871113504673745,
                  -15.244235542823668,
                  3.9842347508985205,
                  -10.01934063212504,
                  -0.3318504026548301,
                  0.5345400721224732,
                  1.449545792252519,
                  -14.177847153722885,
                  7.2029639073285745,
                  14.709695470701195],
 'intercepts': [1.2215820571970788, 15.779321387299293, -17.000903444496373],
 'multi_class': 1,
 'name': 'LinearClassifier',
 'post_transform': 'LOGISTIC'}


In [24]:
# Reload the raw iris features and labels as arrays for scoring with ONNX Runtime.
from sklearn.datasets import load_iris

iris = load_iris()
X = iris["data"]
y = iris["target"]

In [20]:
# Score the same samples with ONNX Runtime, loading the in-memory ONNX model directly.
import onnxruntime
import numpy as np

sess = onnxruntime.InferenceSession(onnx_model.SerializeToString())

input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name

# The input was declared as FloatTensorType, so cast the features to float32.
# Only the predicted-label output is requested.
output = sess.run([label_name], {input_name: X.astype(np.float32)})[0]

In [21]:
# ONNX-predicted class label for each sample
output

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2], dtype=int64)

In [22]:
# True iris class labels, for comparison with the ONNX predictions above
y

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])

In [23]:
# Score the ONNX predictions against the true labels.
# NOTE: this uses all 150 samples, so the training split is included and the
# scores are optimistic compared with a held-out test set.
import sklearn.metrics

print(f"Accuracy: {sklearn.metrics.accuracy_score(y, output)}")
print(f"Bal. Acc: {sklearn.metrics.balanced_accuracy_score(y, output)}")

Accuracy: 0.98
Bal. Acc: 0.98


In [18]:
# Confusion matrix of ONNX predictions: rows are true classes, columns predicted classes.
sklearn.metrics.confusion_matrix(y_true=y, y_pred=output)

array([[50,  0,  0],
       [ 0, 48,  2],
       [ 0,  1, 49]])

In [19]:
# Compare Spark's and ONNX Runtime's predictions sample by sample.
# Flatten the collected Rows into a 1-D integer array instead of passing Row objects
# to sklearn.
# NOTE(review): this assumes `predictions` rows come back in CSV row order (the same
# order as `X`); Spark does not guarantee that in general. Confirm for multi-partition inputs.
spark_pred = np.array(
    [row.prediction for row in predictions.select("prediction").collect()],
    dtype=np.int64,
)
assert np.array_equal(spark_pred, output), "Spark and ONNX predictions differ"

sklearn.metrics.confusion_matrix(spark_pred, output)

array([[50,  0,  0],
       [ 0, 49,  0],
       [ 0,  0, 51]])