In [1]:
import requests
import numpy as np
import json
import time
from kubernetes import client
import pandas as pd
from IPython.core.interactiveshell import InteractiveShell

from kfserving import KFServingClient
from kfserving import constants
from kfserving import utils
from kfserving import V1beta1InferenceService
from kfserving import V1beta1InferenceServiceSpec
from kfserving import V1beta1PredictorSpec
from kfserving import V1beta1TFServingSpec
from kfserving import V1beta1LoggerSpec

from kubernetes.client import V1ResourceRequirements
from kubernetes.client import V1ServiceAccount

from kfserving import KFServingClient
KFServing = KFServingClient()

In [2]:
NAMESPACE = 'kfserving-test'
MODEL_NAME = 'product-recommender'
KFSERVING_VERSION = 'v1beta1'

## Deploy Service Account with AWS Credentials

In [None]:
def creds():
    KFServing.set_credentials(storage_type='S3',
                              namespace=NAMESPACE,
                              credentials_file="C:\\Users\\user\\.aws\\credentials", # base64 coded AWS Credentials
                              service_account='sa',
                              s3_profile='default',
                              s3_endpoint='s3-eu-central-1.amazonaws.com',
                              s3_region='eu-central-1',
                              s3_use_https='1',
                              s3_verify_ssl='1')
#creds()

## Deploy TensorFlow Model

In [22]:
api_version = constants.KFSERVING_GROUP + '/' + KFSERVING_VERSION

isvc = V1beta1InferenceService(api_version=api_version,
                               kind=constants.KFSERVING_KIND,
                               metadata=client.V1ObjectMeta(
                                   name=MODEL_NAME,
                                   namespace=NAMESPACE,
                                   # annotations= {"prometheus.io/scrape" : "true",
                                   #              "prometheus.io/port" : "8082"}
                               ),
                               spec=V1beta1InferenceServiceSpec(
                                   predictor=V1beta1PredictorSpec(
                                       service_account_name='sa',  # service account for aws credentials
                                       min_replicas=1,  # if 0: replica will scale down to 0 when there are no requests
                                       # logger=V1beta1LoggerSpec(
                                       #    mode='all',
                                       #    url='http://cloudevent-service.kfserving-test.svc.cluster.local:3000'),
                                       tensorflow=(V1beta1TFServingSpec(
                                           runtime_version='2.4.0',  # TF Serving Version
                                           storage_uri='s3://bucket-fex/0/f83dc82ab858401dac634a2ec3e35c82/artifacts/saved_model/tfmodel/')
                                       )))
                               )
isvc

{'api_version': 'serving.kubeflow.org/v1beta1',
 'kind': 'InferenceService',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': None,
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'labels': None,
              'managed_fields': None,
              'name': 'product-recommender',
              'namespace': 'kfserving-test',
              'owner_references': None,
              'resource_version': None,
              'self_link': None,
              'uid': None},
 'spec': {'explainer': None,
          'predictor': {'active_deadline_seconds': None,
                        'affinity': None,
                        'automount_service_account_token': None,
                        'batcher': None,
                        'canary_traffic_percent': None,
                     

In [23]:
KFServing = KFServingClient()
KFServing.create(isvc, version=KFSERVING_VERSION)

{'apiVersion': 'serving.kubeflow.org/v1beta1',
 'kind': 'InferenceService',
 'metadata': {'creationTimestamp': '2021-01-14T16:13:26Z',
  'generation': 1,
  'name': 'product-recommender',
  'namespace': 'kfserving-test',
  'resourceVersion': '664708',
  'selfLink': '/apis/serving.kubeflow.org/v1beta1/namespaces/kfserving-test/inferenceservices/product-recommender',
  'uid': 'ce788f85-a50b-46d8-b4ab-4a004db4cdd8'},
 'spec': {'predictor': {'minReplicas': 1,
   'serviceAccountName': 'sa',
   'tensorflow': {'name': 'kfserving-container',
    'resources': {'limits': {'cpu': '1', 'memory': '2Gi'},
     'requests': {'cpu': '1', 'memory': '2Gi'}},
    'runtimeVersion': '2.4.0',
    'storageUri': 's3://bucket-fex/0/f83dc82ab858401dac634a2ec3e35c82/artifacts/saved_model/tfmodel/'}}}}

In [24]:
KFServing.get(MODEL_NAME, namespace=NAMESPACE, watch=True, timeout_seconds=120, version=KFSERVING_VERSION)

NAME                 READY      PREDICTOR_CANARY_TRAFFIC  URL                                                              
product-recommender  Unknown                                                                                               
product-recommender  Unknown                                                                                               
product-recommender  Unknown                                                                                               
product-recommender  Unknown                                                                                               
product-recommender  True                                 http://product-recommender.kfserving-test.example.com            


## Add Canary to InferenceService by patching a new InferenceService

In [11]:
isvc = V1beta1InferenceService(api_version=api_version,
                               kind=constants.KFSERVING_KIND,
                               metadata=client.V1ObjectMeta(
                                   name=MODEL_NAME,
                                   namespace=NAMESPACE,
                                   # annotations= {"prometheus.io/scrape" : "true",
                                   #              "prometheus.io/port" : "8082"}
                               ),
                               spec=V1beta1InferenceServiceSpec(
                                   predictor=V1beta1PredictorSpec(
                                       canary_traffic_percent=10,
                                       service_account_name='sa',  # service account for aws credentials
                                       min_replicas=1,  # if 0: replica will scale down to 0 when there are no requests
                                       # logger=V1beta1LoggerSpec(
                                       #    mode='all',
                                       #    url='http://cloudevent-service.kfserving-test.svc.cluster.local:3000'),
                                       tensorflow=(V1beta1TFServingSpec(
                                           runtime_version='2.4.0',  # TF Serving Version
                                           storage_uri="s3://bucket-fex/0/00f118c323604546985c24871736d5d7/artifacts/saved_model/tfmodel/")
                                       )))
                               )

KFServing.patch(MODEL_NAME, isvc, namespace=NAMESPACE,
                version=KFSERVING_VERSION)

{'apiVersion': 'serving.kubeflow.org/v1beta1',
 'kind': 'InferenceService',
 'metadata': {'creationTimestamp': '2021-01-14T16:00:02Z',
  'finalizers': ['inferenceservice.finalizers'],
  'generation': 3,
  'name': 'product-recommender',
  'namespace': 'kfserving-test',
  'resourceVersion': '660026',
  'selfLink': '/apis/serving.kubeflow.org/v1beta1/namespaces/kfserving-test/inferenceservices/product-recommender',
  'uid': '01cb2e6d-d418-46d1-b74b-8f786d898ba8'},
 'spec': {'predictor': {'canaryTrafficPercent': 10,
   'minReplicas': 1,
   'serviceAccountName': 'sa',
   'tensorflow': {'name': 'kfserving-container',
    'resources': {'limits': {'cpu': '1', 'memory': '2Gi'},
     'requests': {'cpu': '1', 'memory': '2Gi'}},
    'runtimeVersion': '2.4.0',
    'storageUri': 's3://bucket-fex/0/00f118c323604546985c24871736d5d7/artifacts/saved_model/tfmodel/'}}},
 'status': {'address': {'url': 'http://product-recommender.kfserving-test.svc.cluster.local/v1/models/product-recommender:predict'},
  '

In [12]:
KFServing.get(MODEL_NAME, namespace=NAMESPACE, watch=True, timeout_seconds=120, version=KFSERVING_VERSION)

NAME                 READY      PREDICTOR_CANARY_TRAFFIC  URL                                                              
product-recommender  True                                 http://product-recommender.kfserving-test.example.com            


In [21]:
KFServing.delete(MODEL_NAME, namespace=NAMESPACE)

{'apiVersion': 'serving.kubeflow.org/v1alpha2',
 'kind': 'InferenceService',
 'metadata': {'creationTimestamp': '2021-01-14T16:00:02Z',
  'deletionGracePeriodSeconds': 0,
  'deletionTimestamp': '2021-01-14T16:11:46Z',
  'finalizers': ['inferenceservice.finalizers'],
  'generation': 4,
  'name': 'product-recommender',
  'namespace': 'kfserving-test',
  'resourceVersion': '663626',
  'selfLink': '/apis/serving.kubeflow.org/v1alpha2/namespaces/kfserving-test/inferenceservices/product-recommender',
  'uid': '01cb2e6d-d418-46d1-b74b-8f786d898ba8'},
 'spec': {'canaryTrafficPercent': 10,
  'default': {'predictor': {'minReplicas': 1,
    'tensorflow': {'resources': {'limits': {'cpu': '1', 'memory': '2Gi'},
      'requests': {'cpu': '1', 'memory': '2Gi'}},
     'runtimeVersion': '2.4.0',
     'storageUri': 's3://bucket-fex/0/00f118c323604546985c24871736d5d7/artifacts/saved_model/tfmodel/'}}}},
 'status': {}}

# Test the Service

To execute this cell, you need to have 'list_sessions_padded.npy', 'list_last_clicked.npy' and 'ID_Mapping.csv' files from the Notebook 'session_based_recommender_with_mlflow' in this directory.

In [6]:
InteractiveShell.ast_node_interactivity = "all"
np.set_printoptions(precision=5)

sessions_padded = np.load('list_sessions_padded.npy')
print(sessions_padded.shape)
last_clicked = np.load('list_last_clicked.npy')
print(last_clicked.shape)
n_output_features = int(last_clicked.max())
n_unique_input_ids = int(sessions_padded[:, :, 0].max())
window_length = sessions_padded.shape[1]
n_input_features = sessions_padded.shape[2]

id_mapping = pd.read_csv('ID_Mapping.csv')
id_mapping

(45916, 207, 52)
(45916,)


Unnamed: 0.1,Unnamed: 0,Item_ID,Mapped_ID,category_code
0,0,2402273,1,appliances.personal.massager
1,1,20100164,2,apparel.trousers
2,2,21400264,3,electronics.clocks
3,3,1005239,4,construction.tools.light
4,4,5100885,5,computers.notebook
...,...,...,...,...
38510,38510,6902925,38511,electronics.telephone
38511,38511,2602249,38512,appliances.kitchen.refrigerators
38512,38512,20100099,38513,apparel.trousers
38513,38513,21405267,38514,electronics.clocks


In [27]:
def request_kf_serving(np_array, MODEL_NAME, NAMESPACE, INGRESS_HOST, INGRESS_PORT):
    data = json.dumps({"instances": np_array.tolist()})
    headers = {  # "content-type": "application/json",
        'Host': '{}.{}.example.com'.format(MODEL_NAME, NAMESPACE)}
    json_response = requests.post(
        'http://{}:{}/v1/models/{}:predict'.format(INGRESS_HOST, INGRESS_PORT, MODEL_NAME), data=data, headers=headers)

    try:
        predictions = json.loads(json_response.text)['predictions']
    except Exception as e:
        raise e
    return np.array(predictions).astype(np.float32)

# When using Kubernetes Port Forwarding of the ingressgateway: kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80
#INGRESS_HOST = 'localhost'
#INGRESS_PORT = 8080


INGRESS_HOST_LIST = !kubectl get po - l istio = ingressgateway - n istio-system - o jsonpath = {.items[0].status.hostIP}
INGRESS_HOST = INGRESS_HOST_LIST[0]  # eg. '192.168.52.86'
# print(INGRESS_HOST)
INGRESS_PORT = 30083

example = 6  # 5,6,7, 13
start = time.time()

pred = request_kf_serving(sessions_padded[example][np.newaxis, :, :],
                          MODEL_NAME, NAMESPACE, INGRESS_HOST, INGRESS_PORT)
end = time.time()
print(end - start)

top = pred.argsort()[0][::-1][:5]
print("Session:")
session = pd.DataFrame()
session['category_code'] = [id_mapping['category_code']
                            [int(i)-1] for i in sessions_padded[example, :, 0] if i > 0]
session['Item_ID'] = [id_mapping['Item_ID']
                      [int(i)-1] for i in sessions_padded[example, :, 0] if i > 0]
session['Item_ID_Mapped'] = [int(i)
                             for i in sessions_padded[example, :, 0] if i > 0]
session

print("Prediction:")
prediction = pd.DataFrame()
prediction['category_code'] = [
    id_mapping['category_code'][int(i)-1] for i in top if i > 0]
prediction['Item_ID'] = [id_mapping['Item_ID'][int(i)-1] for i in top if i > 0]
prediction['Item_ID_Mapped'] = [int(i) for i in top if i > 0]
prediction['probability'] = pred[0, top]
prediction
print("Ground Truth:", last_clicked[example])

0.04846978187561035
Session:


Unnamed: 0,category_code,Item_ID,Item_ID_Mapped
0,appliances.personal.massager,1801766,649
1,appliances.personal.massager,1801766,649
2,appliances.personal.massager,1801806,1308
3,appliances.personal.massager,1801806,1308


Prediction:


Unnamed: 0,category_code,Item_ID,Item_ID_Mapped,probability
0,appliances.personal.massager,1801906,4134,0.227082
1,appliances.personal.massager,1802038,2743,0.178314
2,appliances.personal.massager,1802033,2781,0.130538
3,appliances.personal.massager,1801841,18030,0.124758
4,appliances.personal.massager,1801940,1369,0.122702


Ground Truth: 2743


## Test autoscaling

In [None]:
import threading

while True:
    for _ in range(10):
        x = threading.Thread(target=request_kf_serving, args=(np.ones((1, 207, 52), dtype=np.float32),
                                                              MODEL_NAME, NAMESPACE, INGRESS_HOST, INGRESS_PORT))
        x.start()
    time.sleep(0.3)