# End to End Machine Learning Pipeline for Income Prediction with Seldon Deploy

We use [demographic features from the 1996 US census](https://archive.ics.uci.edu/ml/datasets/census+income) to build an end to end machine learning pipeline. The pipeline is also annotated so it can be run as a [Kubeflow Pipeline](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) using the [Kale](https://github.com/kubeflow-kale/kale) pipeline generator.

The notebook/pipeline stages are:

 1. Setup 
   * Imports
   * pipeline-parameters
   * minio client test
 1. Train a simple sklearn model and push to minio
 1. Prepare an Anchors explainer for model and push to minio
 1. Test Explainer
 1. Train an isolation forest outlier detector for model and push to minio
 1. Deploy a Seldon model and test using the Seldon Deploy Enterprise API
 1. Deploy an outlier detector with the Seldon Deploy Enterprise API
 1. Test the outlier detector



In [22]:
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from alibi.explainers import AnchorTabular
from alibi.datasets import fetch_adult
from minio import Minio
from minio.error import ResponseError
from joblib import dump, load
import dill
import time
import json
from subprocess import run, Popen, PIPE
from alibi_detect.utils.data import create_outlier_batch
import swagger_client
from swagger_client.rest import ApiException
import yaml
import json
import urllib3
urllib3.disable_warnings()

In [23]:
MINIO_HOST="minio-service.kubeflow:9000"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
MINIO_MODEL_BUCKET="seldon"
INCOME_MODEL_PATH="sklearn/income/model"
EXPLAINER_MODEL_PATH="sklearn/income/explainer"
OUTLIER_MODEL_PATH="sklearn/income/outlier"
DEPLOY_NAMESPACE="admin"
DEPLOY_SERVER = "https://x.x.x.x/seldon-deploy/"
DEPLOY_USER = "admin@seldon.io"
DEPLOY_PASSWORD = "12341234"

In [24]:
def get_minio():
    return Minio(MINIO_HOST,
                    access_key=MINIO_ACCESS_KEY,
                    secret_key=MINIO_SECRET_KEY,
                    secure=False)

In [25]:
def get_swagger_configuration():
    configuration = swagger_client.Configuration()
    configuration.host = 'http://seldon-deploy.seldon-system/seldon-deploy/api/v1alpha1'
    return configuration

In [26]:
import requests

from urllib.parse import urlparse

KF_SESSION_COOKIE_NAME = "authservice_session"


class SessionAuthenticator:
    """
    Returns the cookie token.
    """

    def __init__(self, server: str):
        self._server = server

        url = urlparse(server)
        self._host = f"{url.scheme}://{url.netloc}"

    def authenticate(self, user: str, password: str) -> str:
        auth_path = self._get_auth_path()
        success_path = self._submit_auth(auth_path, user, password)
        session_cookie = self._get_session_cookie(success_path)
        return session_cookie

    def _get_auth_path(self) -> str:
        # Send unauthenticated request
        res = requests.get(self._server, allow_redirects=False, verify=False)

        # Follow the 302 redirect
        oidc_path = res.headers["Location"]
        oidc_endpoint = f"{self._host}{oidc_path}"
        res = requests.get(oidc_endpoint, allow_redirects=False, verify=False)

        return res.headers["Location"]

    def _submit_auth(self, auth_path: str, user: str, password: str) -> str:
        auth_endpoint = f"{self._host}{auth_path}"
        auth_payload = {"login": user, "password": password}
        res = requests.post(auth_endpoint, auth_payload, allow_redirects=False, verify=False)
        
        login_path = res.headers["Location"]
        login_endpoint = f"{self._host}{login_path}"
        res = requests.get(login_endpoint, allow_redirects=False, verify=False)

        return res.headers["Location"]

    def _get_session_cookie(self, success_path: str) -> str:
        success_endpoint = f"{self._host}{success_path}"
        res = requests.get(success_endpoint, allow_redirects=False, verify=False)
        print(res.cookies)
        return res.cookies[KF_SESSION_COOKIE_NAME]

def authenticate():
    authenticator = SessionAuthenticator(DEPLOY_SERVER)

    cookie = authenticator.authenticate(DEPLOY_USER, DEPLOY_PASSWORD)
    return cookie


In [27]:
minioClient = get_minio()
buckets = minioClient.list_buckets()
for bucket in buckets:
    print(bucket.name, bucket.creation_date)

mlpipeline 2020-10-02 09:50:51.490000+00:00
seldon 2020-09-19 09:09:50.060000+00:00


In [28]:
if not minioClient.bucket_exists(MINIO_MODEL_BUCKET):
    minioClient.make_bucket(MINIO_MODEL_BUCKET)

## Train Model

In [6]:
adult = fetch_adult()
adult.keys()

dict_keys(['data', 'target', 'feature_names', 'target_names', 'category_map'])

In [7]:
data = adult.data
target = adult.target
feature_names = adult.feature_names
category_map = adult.category_map

Note that for your own datasets you can use our utility function [gen_category_map](../api/alibi.utils.data.rst) to create the category map:

In [8]:
from alibi.utils.data import gen_category_map

Define shuffled training and test set

In [9]:
np.random.seed(0)
data_perm = np.random.permutation(np.c_[data, target])
data = data_perm[:,:-1]
target = data_perm[:,-1]

In [10]:
idx = 30000
X_train,Y_train = data[:idx,:], target[:idx]
X_test, Y_test = data[idx+1:,:], target[idx+1:]

### Create feature transformation pipeline
Create feature pre-processor. Needs to have 'fit' and 'transform' methods. Different types of pre-processing can be applied to all or part of the features. In the example below we will standardize ordinal features and apply one-hot-encoding to categorical features.

Ordinal features:

In [13]:
ordinal_features = [x for x in range(len(feature_names)) if x not in list(category_map.keys())]
ordinal_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='median')),
                                      ('scaler', StandardScaler())])

Categorical features:

In [14]:
categorical_features = list(category_map.keys())
categorical_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='median')),
                                          ('onehot', OneHotEncoder(handle_unknown='ignore'))])

Combine and fit:

In [15]:
preprocessor = ColumnTransformer(transformers=[('num', ordinal_transformer, ordinal_features),
                                               ('cat', categorical_transformer, categorical_features)])

### Train Random Forest model

Fit on pre-processed (imputing, OHE, standardizing) data.

In [16]:
np.random.seed(0)
clf = RandomForestClassifier(n_estimators=50)

In [17]:
model=Pipeline(steps=[("preprocess",preprocessor),("model",clf)])
model.fit(X_train,Y_train)

Pipeline(memory=None,
     steps=[('preprocess', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
         transformer_weights=None,
         transformers=[('num', Pipeline(memory=None,
     steps=[('imputer', SimpleImputer(copy=True, fill_value=None, missing_values=nan,
       strategy='median', verbose...obs=None,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False))])

Define predict function

In [18]:
def predict_fn(x):
    return model.predict(x)

In [19]:
#predict_fn = lambda x: clf.predict(preprocessor.transform(x))
print('Train accuracy: ', accuracy_score(Y_train, predict_fn(X_train)))
print('Test accuracy: ', accuracy_score(Y_test, predict_fn(X_test)))

Train accuracy:  0.9655333333333334
Test accuracy:  0.855859375


In [20]:
dump(model, 'model.joblib') 

['model.joblib']

In [21]:
print(get_minio().fput_object(MINIO_MODEL_BUCKET, f"{INCOME_MODEL_PATH}/model.joblib", 'model.joblib'))

('694bfd00872125a7906e3a12413a3c71-7', None)


## Train Explainer

In [22]:
model.predict(X_train)
explainer = AnchorTabular(predict_fn, feature_names, categorical_names=category_map)

Discretize the ordinal features into quartiles

In [23]:
explainer.fit(X_train, disc_perc=[25, 50, 75])

In [24]:
with open("explainer.dill", "wb") as dill_file:
    dill.dump(explainer, dill_file)    
    dill_file.close()
print(get_minio().fput_object(MINIO_MODEL_BUCKET, f"{EXPLAINER_MODEL_PATH}/explainer.dill", 'explainer.dill'))

('eae1953a2d48b3fe5ff6a9592a8c01ba-2', None)


## Get Explanation

Below, we get an anchor for the prediction of the first observation in the test set. An anchor is a sufficient condition - that is, when the anchor holds, the prediction should be the same as the prediction for this instance.

In [25]:
model.predict(X_train)
idx = 0
class_names = adult.target_names
print('Prediction: ', class_names[explainer.predict_fn(X_test[idx].reshape(1, -1))[0]])

Prediction:  <=50K


We set the precision threshold to 0.95. This means that predictions on observations where the anchor holds will be the same as the prediction on the explained instance at least 95% of the time.

In [26]:
explanation = explainer.explain(X_test[idx], threshold=0.95)
print('Anchor: %s' % (' AND '.join(explanation['names'])))
print('Precision: %.2f' % explanation['precision'])
print('Coverage: %.2f' % explanation['coverage'])

Anchor: Marital Status = Separated AND Sex = Female
Precision: 0.96
Coverage: 0.11


## Train Outlier Detector

In [27]:
from alibi_detect.od import IForest

od = IForest(
    threshold=0.,
    n_estimators=200,
)


In [28]:
od.fit(X_train)



In [29]:
np.random.seed(0)
perc_outlier = 5
threshold_batch = create_outlier_batch(X_train, Y_train, n_samples=1000, perc_outlier=perc_outlier)
X_threshold, y_threshold = threshold_batch.data.astype('float'), threshold_batch.target
#X_threshold = (X_threshold - mean) / stdev
print('{}% outliers'.format(100 * y_threshold.mean()))

5.0% outliers


In [30]:
od.infer_threshold(X_threshold, threshold_perc=100-perc_outlier)
print('New threshold: {}'.format(od.threshold))
threshold = od.threshold

New threshold: 0.029017499251428627


In [31]:
X_outlier = [[300,  4,  4,  2,  1,  4,  4,  0,  0,  0, 600,  9]]

In [32]:
od.predict(
    X_outlier
)

{'data': {'instance_score': array([0.04198649]),
  'feature_score': None,
  'is_outlier': array([1])},
 'meta': {'name': 'IForest',
  'detector_type': 'offline',
  'data_type': 'tabular'}}

In [33]:
from alibi_detect.utils.saving import save_detector, load_detector
from os import listdir
from os.path import isfile, join

filepath="ifoutlier"
save_detector(od, filepath) 
onlyfiles = [f for f in listdir(filepath) if isfile(join(filepath, f))]
for filename in onlyfiles:
    print(filename)
    print(get_minio().fput_object(MINIO_MODEL_BUCKET, f"{OUTLIER_MODEL_PATH}/{filename}", join(filepath, filename)))

W0926 08:56:23.527403 139652675462976 saving.py:73] Directory ifoutlier does not exist and is now created.


meta.pickle
('e57a0ae93b75c8a169b2d003231243f8', None)
IForest.pickle
('c9c344f140bd36fc2520ed5abcf67d0b', None)


## Deploy Seldon Core Model with API

In [8]:
secret = f"""apiVersion: v1
kind: Secret
metadata:
  name: seldon-init-container-secret
  namespace: {DEPLOY_NAMESPACE}
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: {MINIO_ACCESS_KEY}
  AWS_SECRET_ACCESS_KEY: {MINIO_SECRET_KEY}
  AWS_ENDPOINT_URL: http://{MINIO_HOST}
  USE_SSL: "false"
"""
with open("secret.yaml","w") as f:
    f.write(secret)
run("cat secret.yaml | kubectl apply -f -", shell=True)

CompletedProcess(args='cat secret.yaml | kubectl apply -f -', returncode=0)

In [9]:
sa = f"""apiVersion: v1
kind: ServiceAccount
metadata:
  name: minio-sa
  namespace: {DEPLOY_NAMESPACE}
secrets:
  - name: seldon-init-container-secret
"""
with open("sa.yaml","w") as f:
    f.write(sa)
run("kubectl apply -f sa.yaml", shell=True)

CompletedProcess(args='kubectl apply -f sa.yaml', returncode=0)

In [10]:
configuration = get_swagger_configuration()
# create an instance of the API class
dep_instance = swagger_client.MLDeploymentsApi(swagger_client.ApiClient(configuration))
namespace = 'admin' # str | Namespace provides a logical grouping of resources

Create the Seldon deployment with the Deploy API

In [11]:
model_name = "income-classifier"
model_yaml=f"""apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: {model_name}
  namespace: {DEPLOY_NAMESPACE}
spec:
  predictors:
  - componentSpecs:
    graph:
      implementation: SKLEARN_SERVER
      modelUri: s3://{MINIO_MODEL_BUCKET}/{INCOME_MODEL_PATH}
      envSecretRefName: seldon-init-container-secret
      name: classifier
      logger:
         mode: all
         url: http://default-broker
    explainer:
      type: AnchorTabular
      modelUri: s3://{MINIO_MODEL_BUCKET}/{EXPLAINER_MODEL_PATH}
      envSecretRefName: seldon-init-container-secret
    name: default
    replicas: 1
"""
d = yaml.safe_load(model_yaml)
model_json = json.dumps(d)
print(model_json)
created = dep_instance.create_seldon_deployment(model_json, namespace)

{"apiVersion": "machinelearning.seldon.io/v1", "kind": "SeldonDeployment", "metadata": {"name": "income-classifier", "namespace": "admin"}, "spec": {"predictors": [{"componentSpecs": null, "graph": {"implementation": "SKLEARN_SERVER", "modelUri": "s3://seldon/sklearn/income/model", "envSecretRefName": "seldon-init-container-secret", "name": "classifier", "logger": {"mode": "all", "url": "http://default-broker"}}, "explainer": {"type": "AnchorTabular", "modelUri": "s3://seldon/sklearn/income/explainer", "envSecretRefName": "seldon-init-container-secret"}, "name": "default", "replicas": 1}]}}


Wait for the Deployment to be ready

In [12]:
state = ""
while not state == "Available":
    res = dep_instance.list_seldon_deployments(namespace)
    for sd in res.items:
        state = sd.status.state
        print(sd.status.state)
    time.sleep(2)
time.sleep(10)

Creating
Creating
Creating
Creating
Creating
Creating
Creating
Creating
Available


Make a prediction request with the Seldon Deploy API

In [13]:
cookie = authenticate()
payload='{"data": {"ndarray": [[53,4,0,2,8,4,4,0,0,0,60,9]]}}'
cookie_str = f"{KF_SESSION_COOKIE_NAME}={cookie}"
predict_instance = swagger_client.PredictApi(swagger_client.ApiClient(configuration,cookie=cookie_str))
prediction = predict_instance.predict_seldon_deployment(model_name,namespace, prediction={"data": {"ndarray": [[53,4,0,2,8,4,4,0,0,0,60,9]]}})
print(prediction)

<RequestsCookieJar[<Cookie authservice_session=MTYwMTcwOTk5MHxOd3dBTkU1SVZrbFpVRkJIVFZKWFJWVkpRa0ZFVVU5WVZGWkpXamRUVXpjMVdESlRORWcwTmtOVFJGTkhSbHBUVkVKQ1RWbFFTa0U9fDOikC7Z6EmKPEGJ8EKLegfZFJfXB78nDQvYfW4Kjmyq for 34.91.115.100/>]>
{'data': {'names': ['t:0', 't:1'], 'ndarray': [[0.88, 0.12]]}, 'meta': {}}


In [14]:
explain_instance = swagger_client.ExplainerApi(swagger_client.ApiClient(configuration,cookie=cookie_str))
tries = 0
try:
    explanation = explain_instance.explain_seldon_deployment(namespace,model_name,explaindata={"data": {"ndarray": [[53,4,0,2,8,4,4,0,0,0,60,9]]}})
    print(explanation)
except ApiException as e:
    print(e)
    if tries > 5:
        raise e
    print("Retrying")
    tries = tries +1
    time.sleep(5)

{'coverage': 0.1047, 'meta': {'name': 'AnchorTabular'}, 'names': ['Marital Status = Separated', 'Sex = Female'], 'precision': 0.9772727272727273, 'raw': {'all_precision': 0, 'coverage': [0.1791, 0.1047], 'examples': [{'covered': [[20, 'Private', 'High School grad', 'Separated', 'Professional', 'Own-child', 'White', 'Male', 'Capital Gain <= 0.00', 'Capital Loss <= 0.00', 15, 'United-States'], [34, 'Private', 'High School grad', 'Separated', 'Admin', 'Not-in-family', 'White', 'Female', 'Capital Gain <= 0.00', 'Capital Loss <= 0.00', 40, 'United-States'], [36, 'Private', 'High School grad', 'Separated', 'White-Collar', 'Husband', 'White', 'Male', 'Capital Gain <= 0.00', 'Capital Loss <= 0.00', 50, 'United-States'], [49, 'Local-gov', 'Dropout', 'Separated', 'Service', 'Unmarried', 'Black', 'Female', 'Capital Gain <= 0.00', 'Capital Loss <= 0.00', 40, 'United-States'], [24, 'Private', 'High School grad', 'Separated', 'Blue-Collar', 'Husband', 'White', 'Male', 'Capital Gain <= 0.00', 'Capita

## Deploy Outier Detector

In [15]:
configuration = get_swagger_configuration()
outlier = swagger_client.OutlierDetectorApi(swagger_client.ApiClient(configuration))
outlier_params = {
"params": {
    "event_source": "io.seldon.serving.incomeod",
    "event_type": "io.seldon.serving.inference.outlier",
    "http_port": "8080",
    "model_name": "adultod",
    "protocol": "seldon.http",
    "reply_url": "http://default-broker",
    "storage_uri": "s3://seldon/sklearn/income/outlier",
    "env_secret_ref": "seldon-init-container-secret"
  }
}
res = outlier.create_outlier_detector_seldon_deployment(model_name, namespace, outlier_detector=outlier_params)

## Deploy KNative Eventing Event Display

In [16]:
event_display=f"""apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: {DEPLOY_NAMESPACE}          
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: event-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: helloworld-go
          # Source code: https://github.com/knative/eventing-contrib/tree/master/cmd/event_display
          image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display@sha256:f4628e97a836c77ed38bd3b6fd3d0b06de4d5e7db6704772fe674d48b20bd477
---
kind: Service
apiVersion: v1
metadata:
  name: event-display
  namespace: {DEPLOY_NAMESPACE}
spec:
  selector:
    app: event-display
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
---
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: income-outlier-display
  namespace: {DEPLOY_NAMESPACE}
spec:
  broker: default
  filter:
    attributes:
      type: io.seldon.serving.inference.outlier
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: event-display
"""
with open("event_display.yaml","w") as f:
    f.write(event_display)
run("kubectl apply -f event_display.yaml", shell=True)

CompletedProcess(args='kubectl apply -f event_display.yaml', returncode=0)

In [17]:
run(f"kubectl rollout status -n {DEPLOY_NAMESPACE} deploy/event-display -n {DEPLOY_NAMESPACE}", shell=True)

CompletedProcess(args='kubectl rollout status -n admin deploy/event-display -n admin', returncode=0)

## Test Outlier Detection

In [45]:
def predict():
    configuration = get_swagger_configuration()
    cookie = authenticate()
    cookie_str = f"{KF_SESSION_COOKIE_NAME}={cookie}"
    predict_instance = swagger_client.PredictApi(swagger_client.ApiClient(configuration,cookie=cookie_str))
    prediction = predict_instance.predict_seldon_deployment(model_name,namespace, prediction={"data": {"ndarray": [[3000,4,4,2,1,4,4,0,0,0,600,9]]}})
    print(prediction)

In [42]:
def get_outlier_event_display_logs():
    cmd=f"kubectl logs $(kubectl get pod -l app=event-display -o jsonpath='{{.items[0].metadata.name}}' -n {DEPLOY_NAMESPACE}) -n {DEPLOY_NAMESPACE}"
    ret = Popen(cmd, shell=True,stdout=PIPE)
    res = ret.stdout.read().decode("utf-8").split("\n")
    data= []
    for i in range(0,len(res)):
        if res[i] == 'Data,':
            j = json.loads(json.loads(res[i+1]))
            print(j)
            if "is_outlier"in j["data"].keys():
                data.append(j)
    if len(data) > 0:
        return data[-1]
    else:
        return None
j = None
while j is None:
    predict()
    print("Waiting for outlier logs, sleeping")
    time.sleep(2)
    j = get_outlier_event_display_logs()
    
print(j)
print("Outlier",j["data"]["is_outlier"]==[1])

<RequestsCookieJar[<Cookie authservice_session=MTYwMTcxODE1MnxOd3dBTkZJM05saFNXRUUxUTFCVlUxTkdWelpPVGsxV1VEWkVURXRVTjFKRFZESlpORWxRU2xGVVUxQlpOMHBaUzAxU1VGbENURkU9fBn2PC5oubuZ8PaGDJgqkkvACTt2nP654oLkGFKwVJci for 34.91.115.100/>]>
{'data': {'names': ['t:0', 't:1'], 'ndarray': [[0.92, 0.08]]}, 'meta': {}}
Waiting for outlier logs, sleeping
{'data': {'instance_score': None, 'feature_score': None, 'is_outlier': [1]}, 'meta': {'name': 'IForest', 'detector_type': 'offline', 'data_type': 'tabular'}}
{'data': {'instance_score': None, 'feature_score': None, 'is_outlier': [1]}, 'meta': {'name': 'IForest', 'detector_type': 'offline', 'data_type': 'tabular'}}
{'data': {'instance_score': None, 'feature_score': None, 'is_outlier': [0]}, 'meta': {'name': 'IForest', 'detector_type': 'offline', 'data_type': 'tabular'}}
{'data': {'instance_score': None, 'feature_score': None, 'is_outlier': [0]}, 'meta': {'name': 'IForest', 'detector_type': 'offline', 'data_type': 'tabular'}}
{'data': {'instance_score': 

## Clean Up Resources

In [46]:
outlier.delete_outlier_detector_seldon_deployment(model_name, namespace, _preload_content=False)
dep_instance.delete_seldon_deployment(model_name, namespace, _preload_content=False)

<urllib3.response.HTTPResponse at 0x7fa51069b668>

In [47]:
run(f"kubectl delete sa  minio-sa -n {DEPLOY_NAMESPACE}", shell=True)
run(f"kubectl delete secret seldon-init-container-secret -n {DEPLOY_NAMESPACE}", shell=True)
run(f"kubectl delete deployment event-display -n {DEPLOY_NAMESPACE}", shell=True)
run(f"kubectl delete svc event-display -n {DEPLOY_NAMESPACE}", shell=True)
run(f"kubectl delete trigger income-outlier-display -n {DEPLOY_NAMESPACE}", shell=True)

CompletedProcess(args='kubectl delete trigger income-outlier-display -n admin', returncode=1)