# Iris Flower Classification and Serving Using SkLearn, HopsML, and the Hopsworks Feature Store

In this notebook we will:

1. Load the Iris Flower dataset from HopsFS
2. Do feature engineering on the dataset
3. Save the features to the feature store
4. Read the feature data from the feature store
5. Train a KNN Model using SkLearn
6. Save the trained model to HopsFS
7. Launch a serving instance to serve the trained model
8. Send some prediction requests to the served model
9. Monitor the predictions through Kafka


### Imports

In [9]:
from sklearn.neighbors import KNeighborsClassifier
import joblib
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
import numpy as np
import time
import json
from hops import kafka, hdfs, featurestore, serving
from confluent_kafka import Producer, Consumer, KafkaError
import random

### Load Dataset

In [10]:
project_path = hdfs.project_path()
iris_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load(
    project_path + "TourData/iris/iris.csv")

In [11]:
iris_df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)

### Feature Engineering

The dataset is already well prepared. The only feature engineering we need to do is convert the `variety` column to a numeric label and save a lookup table, so that we can later convert the numeric representation back to the categorical one.

In [12]:
encoder = StringIndexer(inputCol="variety", outputCol="label")
model = encoder.fit(iris_df)
iris_df1 = model.transform(iris_df)
lookup_df = iris_df1.select(["variety", "label"]).distinct()
iris_df2 = iris_df1.drop("variety")
iris_df3 = iris_df2.withColumn("label", iris_df2["label"].cast(IntegerType()))
iris_df3.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- label: integer (nullable = true)

In [13]:
iris_df3.show(5)

+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    2|
|         4.9|        3.0|         1.4|        0.2|    2|
|         4.7|        3.2|         1.3|        0.2|    2|
|         4.6|        3.1|         1.5|        0.2|    2|
|         5.0|        3.6|         1.4|        0.2|    2|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows

In [14]:
lookup_df.show(3)

+----------+-----+
|   variety|label|
+----------+-----+
| Virginica|  0.0|
|Versicolor|  1.0|
|    Setosa|  2.0|
+----------+-----+
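The lookup table can also be applied outside Spark. The following is a minimal plain-Python sketch of the encode/decode round trip. The helper names `build_lookup`, `encode`, and `decode` are hypothetical, and the index order is copied from the table above rather than computed with `StringIndexer`'s ordering rules.

```python
# Index order copied from the lookup table above (an assumption of this sketch;
# it is not derived the way StringIndexer derives it).
VARIETIES = ["Virginica", "Versicolor", "Setosa"]


def build_lookup(varieties):
    """Return two dictionaries: variety -> label and label -> variety."""
    to_label = {name: idx for idx, name in enumerate(varieties)}
    to_variety = {idx: name for name, idx in to_label.items()}
    return to_label, to_variety


def encode(values, to_label):
    """Map categorical varieties to their numeric labels."""
    return [to_label[value] for value in values]


def decode(labels, to_variety):
    """Map numeric labels (int or float such as 2.0) back to varieties."""
    return [to_variety[int(label)] for label in labels]


to_label, to_variety = build_lookup(VARIETIES)
encoded = encode(["Setosa", "Virginica", "Versicolor"], to_label)
decoded = decode(encoded, to_variety)
print(encoded)
print(decoded)
```

Casting with `int(label)` in `decode` lets the same dictionary handle the `double` labels stored in the lookup table and the integer labels stored with the features.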

### Save Features to the Feature Store

We save two feature groups (Hive tables): `iris_features`, which contains the iris features and the corresponding numeric label, and `iris_labels_lookup`, which is used to convert the numeric iris label back to the categorical one.

**Note**: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the "Settings" tab in your project, select the feature store service and click "Save". 

In [15]:
featurestore.create_featuregroup(iris_df3, "iris_features")

computing descriptive statistics for : iris_features, version: 1
computing feature correlation for: iris_features, version: 1
computing feature histograms for: iris_features, version: 1
computing cluster analysis for: iris_features, version: 1
Running sql: use demo_deep_learning_admin000_featurestore
Feature group created successfully

In [16]:
featurestore.create_featuregroup(lookup_df, "iris_labels_lookup", feature_correlation=False, 
                                 feature_histograms=False, cluster_analysis=False)

computing descriptive statistics for : iris_labels_lookup, version: 1
Running sql: use demo_deep_learning_admin000_featurestore
Feature group created successfully

### Read the Iris Training Dataset from the Feature Store

In [17]:
train_df = featurestore.get_featuregroup("iris_features", dataframe_type="pandas")

Running sql: use demo_deep_learning_admin000_featurestore
SQL string for the query created successfully
Running sql: SELECT * FROM iris_features_1

In [18]:
train_df.describe()

       sepal_length  sepal_width  petal_length  petal_width       label
count    150.000000   150.000000    150.000000   150.000000  150.000000
mean       5.843333     3.057333      3.758000     1.199333    1.000000
std        0.828066     0.435866      1.765298     0.762238    0.819232
min        4.300000     2.000000      1.000000     0.100000    0.000000
25%        5.100000     2.800000      1.600000     0.300000    0.000000
50%        5.800000     3.000000      4.350000     1.300000    1.000000
75%        6.400000     3.300000      5.100000     1.800000    2.000000
max        7.900000     4.400000      6.900000     2.500000    2.000000

In [19]:
x_df = train_df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]
y_df = train_df[["label"]]
X = x_df.values
y = y_df.values.ravel()

### Train a KNN Model using the Feature Data

In [20]:
iris_knn = KNeighborsClassifier()
iris_knn.fit(X, y)

KNeighborsClassifier(algorithm='auto', leaf_size=30, metric='minkowski',
                     metric_params=None, n_jobs=None, n_neighbors=5, p=2,
                     weights='uniform')
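The printed parameters (`n_neighbors=5`, `metric='minkowski'` with `p=2`, `weights='uniform'`) describe a plain majority vote among the five nearest points by Euclidean distance. As a rough illustration of that idea, here is a small pure-Python sketch. The function `knn_predict` and the toy data are hypothetical, and tie-breaking may differ from scikit-learn's implementation.

```python
import math
from collections import Counter


def euclidean(a, b):
    """Euclidean distance, i.e. Minkowski distance with p=2."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def knn_predict(train_X, train_y, query, k=5):
    """Classify `query` by an unweighted majority vote among its k nearest points."""
    neighbours = sorted(
        zip(train_X, train_y),
        key=lambda pair: euclidean(pair[0], query),
    )[:k]
    votes = Counter(label for _, label in neighbours)
    return votes.most_common(1)[0][0]


# Toy 2-D points for illustration only (not the Iris data).
toy_X = [(1.0, 1.0), (1.2, 0.8), (0.9, 1.1), (5.0, 5.0), (5.2, 4.9), (4.8, 5.1)]
toy_y = [0, 0, 0, 1, 1, 1]

near_first_cluster = knn_predict(toy_X, toy_y, (1.1, 1.0), k=3)
near_second_cluster = knn_predict(toy_X, toy_y, (5.1, 5.0), k=3)
print(near_first_cluster, near_second_cluster)
```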

### Save the Trained Model to HopsFS

In [21]:
joblib.dump(iris_knn, "iris_knn.pkl")
hdfs.copy_to_hdfs("iris_knn.pkl", "Resources", overwrite=True)

Started copying local path iris_knn.pkl to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/iris_knn.pkl

Finished copying

### Constants

In [22]:
SERVING_NAME = "IrisFlowerClassifier"
SERVING_VERSION = 1

### Export the Trained Model to Hopsworks Model Directory

It is not required, but it is a best practice to put trained models in the **Models** dataset in Hopsworks and to indicate model versions through the directory structure. The `hops` module has a utility function for this. (You can also do it manually with the `hdfs` module and file operations, or by drag-and-drop in the Hopsworks UI.)

Below is the code for exporting the model saved in `Resources/iris_knn.pkl` to `Models/IrisFlowerClassifier/1/iris_knn.pkl` using the `serving` module from `hops`.

In [23]:
model_path = "Resources/iris_knn.pkl"
serving.export(model_path, SERVING_NAME, SERVING_VERSION, overwrite=True)

'hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/'
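The versioned layout returned above follows the pattern `Models/<name>/<version>/`. A minimal sketch of building such paths by hand is shown below. The helper `model_artifact_path` is hypothetical and is not part of the `hops` library; it only illustrates the naming convention.

```python
import posixpath


def model_artifact_path(model_name, version, filename, root="Models"):
    """Build the relative path <root>/<model_name>/<version>/<filename>."""
    version = int(version)
    if version < 1:
        raise ValueError("version must be a positive integer")
    # posixpath keeps forward slashes regardless of the local operating system,
    # which matches HDFS-style paths.
    return posixpath.join(root, model_name, str(version), filename)


path = model_artifact_path("IrisFlowerClassifier", 1, "iris_knn.pkl")
print(path)
```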

### Serve the Trained Model

To serve a scikit-learn model, write a Python script that defines a `Predict` class. The constructor downloads the model from HDFS and stores it as an instance attribute, and the class implements the methods `predict`, `classify`, and `regress`, like this:

```python
import joblib
from hops import hdfs

class Predict(object):

    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        # Path of the exported model, relative to the project root
        self.model_path = "Models/IrisFlowerClassifier/1/iris_knn.pkl"
        print("Copying SkLearn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local SkLearn model for serving")
        self.model = joblib.load("./iris_knn.pkl")
        print("Initialization Complete")


    def predict(self, inputs):
        """ Serves a prediction request using a trained model"""
        return self.model.predict(inputs).tolist() # NumPy arrays are not JSON serializable

    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"

    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"
```
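The `.tolist()` call in `predict` matters because the response is returned as JSON. The following sketch, assuming NumPy is installed, shows the difference. The array is a hand-written stand-in for the output of `model.predict(...)`.

```python
import json

import numpy as np

# Stand-in for the array a scikit-learn model returns from predict().
predictions = np.array([0, 2, 1])

try:
    json.dumps({"predictions": predictions})
    raw_array_serializable = True
except TypeError:
    # The standard json module does not know how to encode ndarray objects.
    raw_array_serializable = False

# tolist() converts the array into a plain Python list of ints.
payload = json.dumps({"predictions": predictions.tolist()})
print(raw_array_serializable)
print(payload)
```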

Then upload this Python script to a folder in your project and go to the "Model Serving" service in Hopsworks:

![sklearn_serving1.png](./../../images/sklearn_serving1.png)

Then click on "create serving" and configure your serving:

![sklearn_serving2.png](./../../images/sklearn_serving2.png)

Once the serving is created, you can start it and view information such as the server logs and the Kafka topic to which it logs inference requests.

![sklearn_serving3.png](./../../images/sklearn_serving3.png)

A prepared example script for serving the scikit-learn IrisFlowerClassifier model can be found here: https://github.com/logicalclocks/hops-examples/tree/master/tensorflow/notebooks/Serving

It is a best practice to store the script together with the trained model. Below is the code for exporting the script from `Jupyter/Serving/sklearn/iris_flower_classifier.py` to `Models/IrisFlowerClassifier/1/iris_flower_classifier.py`.

In [24]:
script_path = "Jupyter/Serving/sklearn/iris_flower_classifier.py"
serving.export(script_path, SERVING_NAME, SERVING_VERSION, overwrite=True)

'hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/'

Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using `serving.create_or_update()`.

In [25]:
for p in hdfs.ls("Models/" + SERVING_NAME, recursive=True):
    print(p)

hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1
hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py
hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/iris_knn.pkl

To update an existing serving, set the argument `update=True` in `serving.create_or_update()`. To delete an existing serving, call `serving.delete()`. 

In [26]:
script_path = "Models/" + SERVING_NAME + "/" + str(SERVING_VERSION) + "/iris_flower_classifier.py"
if serving.exists(SERVING_NAME):
    serving.delete(SERVING_NAME)
serving.create_or_update(script_path, SERVING_NAME, serving_type="SKLEARN", 
                                 model_version=SERVING_VERSION)

No serving with name IrisFlowerClassifier was found in the project demo_deep_learning_admin000
Creating a serving for model IrisFlowerClassifier ...
Serving for model IrisFlowerClassifier successfully created

After the serving has been created, you can find it in the Hopsworks UI by going to the "Model Serving" tab. You can also use the Python module to query the Hopsworks REST API for information about the existing servings, using methods like:

- `get_all()`
- `get_id(serving_name)`
- `get_artifact_path(serving_name)`
- `get_type(serving_name)`
- `get_version(serving_name)`
- `get_kafka_topic(serving_name)`
- `get_status(serving_name)`
- `exists(serving_name)`


In [27]:
for s in serving.get_all():
    print(s.name)

IrisFlowerClassifier
mnist

In [28]:
serving.get_id(SERVING_NAME)

8

In [29]:
serving.get_artifact_path(SERVING_NAME)

'/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py'

In [30]:
serving.get_type(SERVING_NAME)

'SKLEARN'

In [31]:
serving.get_version(SERVING_NAME)

1

In [32]:
serving.get_kafka_topic(SERVING_NAME)

'IrisFlowerClassifier-inf9621'

In [33]:
serving.get_status(SERVING_NAME)

'Stopped'

You can start and stop the serving instance either from the Hopsworks UI or from the Python/REST API, as demonstrated below:

In [34]:
serving.start(SERVING_NAME)

Starting serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully started

In [35]:
time.sleep(5) # Let the serving startup correctly

In [36]:
serving.stop(SERVING_NAME)

Stopping serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully stopped

In [37]:
time.sleep(5) # Let the serving stop and cleanup correctly

In [38]:
serving.start(SERVING_NAME)

Starting serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully started

In [39]:
time.sleep(10) # Let the serving startup correctly before sending inference requests
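Fixed sleeps like the ones above are simple but can be either too short or needlessly long. A possible alternative is to poll until a condition holds. The helper `wait_for` below is a hypothetical sketch and is not part of `hops`; the status value `"Running"` used in the comment is also an assumption, since only `'Stopped'` appears in this notebook.

```python
import time


def wait_for(condition, timeout=60.0, interval=1.0):
    """Poll `condition()` until it returns True or `timeout` seconds elapse.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# In the notebook this could look like (assumed status string):
#   wait_for(lambda: serving.get_status(SERVING_NAME) == "Running")

# Self-contained demonstration with a fake sequence of status values.
fake_statuses = iter(["Starting", "Starting", "Running"])
became_ready = wait_for(lambda: next(fake_statuses) == "Running",
                        timeout=5.0, interval=0.01)
timed_out = wait_for(lambda: False, timeout=0.05, interval=0.01)
print(became_ready, timed_out)
```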

### Send Prediction Requests to the Served Model using Hopsworks REST API

#### Constants

In [40]:
TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)
NUM_FEATURES = 4

For making inference requests you can use the utility method `serving.make_inference_request`

In [41]:
for i in range(20):
    data = {"inputs" : [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)

{'predictions': [0]}
{'predictions': [0]}
{'predictions': [1]}
{'predictions': [1]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [1]}
{'predictions': [0]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
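The loop above draws every feature uniformly from the interval 1 to 8, which goes beyond the ranges seen in training (for example, `petal_width` has a maximum of 2.5 in the `describe()` output). As a hedged alternative, the sketch below samples each feature within its observed minimum and maximum. The helper `random_request` is hypothetical; the payload shape `{"inputs": [[...]]}` is the one used above.

```python
import random

# (name, min, max) per feature, copied from the train_df.describe() output above.
FEATURE_RANGES = [
    ("sepal_length", 4.3, 7.9),
    ("sepal_width", 2.0, 4.4),
    ("petal_length", 1.0, 6.9),
    ("petal_width", 0.1, 2.5),
]


def random_request(rng):
    """Build one inference request with features inside the observed ranges."""
    row = [rng.uniform(low, high) for _, low, high in FEATURE_RANGES]
    return {"inputs": [row]}


rng = random.Random(42)  # seeded so that repeated runs produce the same requests
request = random_request(rng)
print(request)
```

Such a request could then be passed to `serving.make_inference_request(SERVING_NAME, request)` in place of `data`.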

### Monitor Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner.

**Note**: The code below (in particular the Avro parsing) has only been tested on Python 2.7.
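As a rough illustration of what tracking predictions can look like once the logged events have been parsed into dictionaries, the sketch below counts how often each variety was predicted. The sample events are hand-written and contain only the `inferenceResponse` field, the label mapping is copied from the `iris_labels_lookup` table shown earlier, and `count_predictions` is a hypothetical helper.

```python
import json
from collections import Counter

# Numeric label -> variety, copied from the lookup table shown earlier.
LABELS = {0: "Virginica", 1: "Versicolor", 2: "Setosa"}


def count_predictions(events, labels):
    """Count predicted varieties across parsed inference-log events."""
    counts = Counter()
    for event in events:
        # The response is stored as a JSON string inside the event.
        response = json.loads(event["inferenceResponse"])
        for prediction in response["predictions"]:
            counts[labels[int(prediction)]] += 1
    return counts


# Hand-written sample events for illustration only.
sample_events = [
    {"inferenceResponse": '{"predictions": [0]}'},
    {"inferenceResponse": '{"predictions": [0]}'},
    {"inferenceResponse": '{"predictions": [1]}'},
]
counts = count_predictions(sample_events, LABELS)
print(counts)
```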

##### Setup Kafka Consumer and Subscribe to the Topic containing the Inference Logs

In [42]:
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

##### Read Kafka Avro Schema From Hopsworks and setup an Avro Reader

In [43]:
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

##### Read Lookup Table from the Feature Store for Converting Numerical Labels to Categorical

In [44]:
iris_labels_lookup_df = featurestore.get_featuregroup("iris_labels_lookup", dataframe_type="pandas")

Running sql: use demo_deep_learning_admin000_featurestore
SQL string for the query created successfully
Running sql: SELECT * FROM iris_labels_lookup_1

##### Read 10 Messages from the Kafka Topic, parse them with the Avro Schema and print the results

In [45]:
for i in range(0, 10):
    msg = consumer.poll(timeout=1.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
            prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, 
                                                         'variety'].iloc[0]
            print("serving: {}, version: {}, timestamp: {},"\
                  "\nrequest: {},\nprediction:{}, prediction_label:{}, http_response_code: {},"\
                  " serving_type: {}\n".format(
                                                                   event_dict["modelName"],
                                                                   event_dict["modelVersion"],
                                                                   event_dict["requestTimestamp"],
                                                                   event_dict["inferenceRequest"],
                                                                   prediction,
                                                                   prediction_label,
                                                                   event_dict["responseHttpCode"],
                                                                   event_dict["servingType"]
            ))
        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")

serving: IrisFlowerClassifier, version: 1, timestamp: 1560339963528,
request: {"inputs": [[4.441035498836311, 1.0405422685903551, 4.950509272511928, 7.201979298717179]]},
prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN

serving: IrisFlowerClassifier, version: 1, timestamp: 1560339963656,
request: {"inputs": [[1.1031470267993444, 7.509936088602124, 7.228032694461817, 3.7487129040092997]]},
prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN

serving: IrisFlowerClassifier, version: 1, timestamp: 1560339963767,
request: {"inputs": [[2.235706745836112, 3.093572695191493, 2.7323921162142804, 2.0689962421316657]]},
prediction:1, prediction_label:Versicolor, http_response_code: 200, serving_type: SKLEARN

serving: IrisFlowerClassifier, version: 1, timestamp: 1560339963896,
request: {"inputs": [[1.3205991163619348, 2.0595736232497166, 4.985849386074168, 1.4408294851114367]]},
prediction:1, prediction_label:Versicolor