In [1]:
import os
import random
import json

import cloudpickle
import pandas as pd
import numpy as np
import requests


In [2]:
# Work from the serving example directory so the relative paths used in this
# notebook (input CSV, pickled artifacts) resolve.
os.chdir("/app/examples/iris/serving/")

# Base URL that the request cells below post to.
IRIS_SERVING_URL = "http://iris_serving:3000"

# Dataset schema: feature columns and the label column name.
LABEL = "species"
FEATURES = [
    "sepal-length",
    "sepal-width",
    "petal-length",
    "petal-width",
]

# Number of rows drawn by `gen_data_drift` for each generated batch.
GEN_SIZE = 1000

# Source data that the drifted samples are drawn from.
INPUT_DATA = "static/data/inputs.csv"
raw_data = pd.read_csv(INPUT_DATA)

# Helper Functions

- `load_logreg`: use a logistic-regression model to label the new data
- `gen_data_drift`: add white noise (independent Gaussian perturbations) to data randomly drawn from the original 150 data points


In [3]:
def load_logreg():
    """Load the pickled logistic-regression model and its scaler.

    Both artifacts are read with ``cloudpickle`` from the current working
    directory (``logreg.pkl`` and ``scaler.pkl``).

    Returns:
        tuple: ``(scaler, model)``, in that order.
    """
    # NOTE: unpickling executes arbitrary code; only load trusted artifacts.
    artifacts = {}
    for key, path in (("model", "logreg.pkl"), ("scaler", "scaler.pkl")):
        with open(path, "rb") as artifact_io:
            artifacts[key] = cloudpickle.load(artifact_io)
    return artifacts["scaler"], artifacts["model"]


def gen_data_drift(drift_mean=0.5, drift_std=0.2):
    """Resample the raw feature rows and perturb them with Gaussian noise.

    ``GEN_SIZE`` rows are drawn (with replacement, the ``np.random.choice``
    default) from the ``FEATURES`` columns of ``raw_data``; independent normal
    noise is then added to every cell.

    Args:
        drift_mean: Mean of the added noise; a scalar, or a sequence that
            broadcasts against the feature columns.
        drift_std: Standard deviation of the added noise; same shape rules
            as ``drift_mean``.

    Returns:
        pandas.DataFrame: The perturbed sample. It keeps the index labels of
        the drawn rows, so labels may repeat.
    """
    features = raw_data[FEATURES]
    n_rows = len(features)
    picked_rows = np.random.choice(range(n_rows), GEN_SIZE)
    resampled = features.iloc[picked_rows]
    perturbation = np.random.normal(
        loc=drift_mean,
        scale=drift_std,
        size=resampled.shape,
    )
    return resampled + perturbation


# Request for Inference

In this demo, the IRIS ML system serves real-time inference through RESTful APIs. A daemon process runs alongside it to monitor the incoming data and the performance of the inferences.

For batch inference applications, inference can be scheduled with Airflow DAGs instead of being served through a RESTful API.

We will attack the online ML system by injecting perturbed features.

In current production configuration:
- data monitoring is triggered for every `1,000` new data points
- performance monitoring is triggered for every `3,000` new data points

If data skew is detected, there is a high probability that the model performance is distorted.
Therefore, performance monitoring is also triggered ad hoc whenever data skew is detected.
If the performance score is lower than the configured threshold (defined in the model evaluation configuration in the model bundle), the model retraining pipeline will be triggered.

Before starting this section, check the model version up for service in:
http://localhost:3000/model_ver

Check data monitoring reports in:
http://localhost:3000/data_reports

Check performance monitoring reports in:
http://localhost:3000/performance_reports

In [4]:
# Per-feature drift presets (one entry per column in FEATURES). Keep exactly
# one DRIFT_MEAN / DRIFT_STD pair active; the commented lines are milder
# alternatives.
# DRIFT_MEAN = [0, 0, 0, 0]
# DRIFT_MEAN = [0.5, 0.5, 0.5, 0.5]
DRIFT_MEAN = [1, 1, 1, 1]
# DRIFT_STD = [0, 0, 0, 0]
# DRIFT_STD = [0.3, 0.2, 0.5, 0.5]
DRIFT_STD = [0.6, 0.5, 1.0, 1.0]

input_data = gen_data_drift(drift_mean=DRIFT_MEAN, drift_std=DRIFT_STD)

# The frame is serialised to a "split"-oriented JSON string, which `json=`
# then JSON-encodes again as the request body.
# NOTE(review): presumably the server expects this doubly-encoded payload;
# confirm before changing it.
response = requests.post(
    f"{IRIS_SERVING_URL}/inference",
    json=input_data.to_json(orient="split"),
    params={"prediction_key": "classes"},
)
# Fail loudly on an HTTP error instead of hitting an opaque KeyError or
# JSON decode error on the error body below.
response.raise_for_status()

response_json = response.json()
anomaly = response_json["Anomaly"]
prediction = response_json["classes"]
print(prediction)

# Label data and send back to server

Use the logistic-regression model to label the data and upload it to the IRIS server.

In [5]:
scaler, logreg = load_logreg()

# Label the drifted inputs with the logistic-regression model. The label
# column name comes from the LABEL constant rather than a repeated literal.
label_data = input_data.copy()
label_data[LABEL] = logreg.predict(scaler.transform(input_data))

response = requests.post(
    f"{IRIS_SERVING_URL}/upload_label_data",
    json=label_data.to_json(orient="split"),
)
# Surface a failed upload as an HTTP error instead of silently printing
# the error body.
response.raise_for_status()
print(response.text)

Label data uploaded.
