# Edge Cloud Joint Inference with Seldon Core and Tempo

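This notebook trains two Iris classifiers, a logistic-regression model intended for an edge node and an XGBoost model intended for a cloud node, and combines them in a Tempo pipeline. The pipeline first asks the edge model and, based on a simple rule on its prediction, either returns that prediction or falls back to the cloud model. The models and the pipeline are then deployed to Kubernetes with Seldon Core and queried.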

## Setup Environment

In [7]:
# Create the conda environment for this example from the spec file under ./conda.
!conda env create --name edge-cloud-inference --file ./conda/edge-cloud-inference.yaml

In [6]:
# NOTE(review): a `!` command runs in its own subshell, so this does not change the
# environment of the running kernel. Activate `edge-cloud-inference` before starting
# Jupyter (or select it as the notebook kernel) - confirm the intended workflow.
!conda activate edge-cloud-inference

## Train models 

In [None]:
!pip install git+https://github.com/SachinVarghese/tempo.git@tempo-k8s-nodename#egg=mlops-tempo&subdirectory=tempo

In [1]:
import logging
import os

from tempo.utils import logger

# Keep the notebook output quiet: only errors are reported, both through the
# standard logging configuration and through Tempo's own logger.
logging.basicConfig(level=logging.ERROR)
logger.setLevel(logging.ERROR)

# Folder under the current working directory that holds every model artifact.
ARTIFACTS_FOLDER = os.getcwd() + "/artifacts"

In [3]:
# %load src/train.py
from src.data import IrisData
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier
import joblib

# Sub-folder names, relative to the artifacts folder, where each trained model is written.
EdgeModelFolder = "edge"
CloudModelFolder = "cloud"


def train_edge_model(data: IrisData, artifacts_folder: str):
    """Train the edge classifier and persist it with joblib.

    Fits a ``LogisticRegression(C=1e5)`` on ``data.X`` / ``data.y`` and writes it to
    ``{artifacts_folder}/{EdgeModelFolder}/model.joblib``. The target folder is
    created if it does not exist yet.

    Args:
        data: Dataset exposing the features as ``X`` and the labels as ``y``.
        artifacts_folder: Root folder under which the edge model folder lives.
    """
    import os  # local import: the imports at the top of this module do not include os

    logreg = LogisticRegression(C=1e5)
    logreg.fit(data.X, data.y)

    # Create the target folder on demand so a fresh checkout does not fail with
    # FileNotFoundError when the model file is opened for writing.
    model_dir = f"{artifacts_folder}/{EdgeModelFolder}"
    os.makedirs(model_dir, exist_ok=True)
    with open(f"{model_dir}/model.joblib", "wb") as f:
        joblib.dump(logreg, f)


def train_cloud_model(data: IrisData, artifacts_folder: str):
    """Train the cloud classifier and persist it in XGBoost's own format.

    Fits a default ``XGBClassifier`` on ``data.X`` / ``data.y`` and saves it to
    ``{artifacts_folder}/{CloudModelFolder}/model.bst``. The target folder is
    created if it does not exist yet.

    Args:
        data: Dataset exposing the features as ``X`` and the labels as ``y``.
        artifacts_folder: Root folder under which the cloud model folder lives.
    """
    import os  # local import: the imports at the top of this module do not include os

    clf = XGBClassifier()
    clf.fit(data.X, data.y)

    # Create the target folder on demand so saving does not fail on a fresh checkout.
    model_dir = f"{artifacts_folder}/{CloudModelFolder}"
    os.makedirs(model_dir, exist_ok=True)
    clf.save_model(f"{model_dir}/model.bst")


In [4]:
from src.data import IrisData
# NOTE: this import rebinds both training functions to the versions in src/train.py,
# replacing the ones defined by the `%load`-ed cell above.
from src.train import train_edge_model, train_cloud_model
# Instantiate the dataset wrapper and train both models; each call writes its model
# file under ARTIFACTS_FOLDER.
data = IrisData()
train_edge_model(data, ARTIFACTS_FOLDER)
train_cloud_model(data, ARTIFACTS_FOLDER)






## Create Tempo artifacts

In [5]:
# %load src/tempo.py
from typing import Tuple

import numpy as np
from src.train import CloudModelFolder, EdgeModelFolder

from tempo.serve.metadata import ModelFramework, RuntimeOptions, KubernetesOptions
from tempo.serve.model import Model
from tempo.serve.pipeline import Pipeline, PipelineModels
from tempo.serve.utils import pipeline

# Sub-folder, relative to the artifacts folder, used as the pipeline's local folder.
PipelineFolder = "joint-classifier"
# Tags returned next to a prediction to tell which model produced it.
EdgePredictionTag = "edge prediction"
CloudPredictionTag = "cloud prediction"

def _runtime_options_for_node(node_name: str, replicas: int) -> RuntimeOptions:
    """Build runtime options whose Kubernetes settings target ``node_name``.

    The namespace (``production``) and auth secret (``minio-secret``) are the same
    for every deployment in this example; only the node name and the replica count
    vary.
    """
    options = RuntimeOptions()
    options.k8s_options = KubernetesOptions(
        replicas=replicas,
        nodeName=node_name,
        namespace="production",
        authSecretName="minio-secret",
    )
    return options


# Options for workloads placed on the edge node (a single replica).
edgeKubernetesOptions = _runtime_options_for_node("edge-compute", replicas=1)

# Options for workloads placed on the cloud node (two replicas).
# NOTE(review): the node name looks specific to one GKE cluster - adjust per cluster.
cloudKubernetesOptions = _runtime_options_for_node(
    "gke-kubeedge-cloudcore-default-pool-4dbe91a1-2t80", replicas=2
)


def get_tempo_artifacts(artifacts_folder: str) -> Tuple[Pipeline, Model, Model]:
    """Build the Tempo pipeline and models for edge/cloud joint inference.

    Args:
        artifacts_folder: Root folder containing the edge, cloud and pipeline
            sub-folders used as the local folders of the artifacts.

    Returns:
        ``(classifier, edge_model, cloud_model)``: the joint pipeline followed by
        the edge model and the cloud model it wraps.
    """
    # scikit-learn model, configured with the edge runtime options.
    sklearn_edge = Model(
        name="edge-model",
        platform=ModelFramework.SKLearn,
        local_folder=f"{artifacts_folder}/{EdgeModelFolder}",
        uri="s3://tempo/joint-inference/edge",
        description="An Edge based Iris classification model",
        runtime_options=edgeKubernetesOptions,
    )

    # XGBoost model, configured with the cloud runtime options.
    xgboost_cloud = Model(
        name="cloud-model",
        platform=ModelFramework.XGBoost,
        local_folder=f"{artifacts_folder}/{CloudModelFolder}",
        uri="s3://tempo/joint-inference/cloud",
        description="An Cloud based Iris classification model",
        runtime_options=cloudKubernetesOptions,
    )

    # The keyword names become the attributes used inside the pipeline function.
    joint_models = PipelineModels(
        edge_inference=sklearn_edge,
        cloud_inference=xgboost_cloud,
    )

    # NOTE(review): the pipeline URI sits under `basic/` while both models use
    # `joint-inference/` - confirm whether that is intentional.
    @pipeline(
        name="joint-classifier",
        uri="s3://tempo/basic/pipeline",
        local_folder=f"{artifacts_folder}/{PipelineFolder}",
        models=joint_models,
        description="A pipeline to make an edge based prediction or cloud based joint prediction for Iris classification",
        runtime_options=edgeKubernetesOptions,
    )
    def classifier(payload: np.ndarray) -> Tuple[np.ndarray, str]:
        # Placeholder rule; custom logic for hard example mining (threshold, IBT,
        # cross entropy, ...) would go here.
        edge_result = classifier.models.edge_inference(input=payload)

        # Keep the edge answer only when its first element equals 1.
        if edge_result[0] == 1:
            return edge_result, EdgePredictionTag

        # Otherwise defer to the cloud model.
        cloud_result = classifier.models.cloud_inference(input=payload)
        return cloud_result, CloudPredictionTag

    return classifier, sklearn_edge, xgboost_cloud


In [2]:
from src.tempo import get_tempo_artifacts
# Build the pipeline and its two models, using ARTIFACTS_FOLDER as the root of their
# local folders.
classifier, edge_model, cloud_model = get_tempo_artifacts(ARTIFACTS_FOLDER)

## Save Classifier

In [7]:
from tempo.serve.loader import save
# Save the pipeline. Per the recorded output below, this packs a conda environment
# into `environment.tar.gz` inside the pipeline's local folder.
save(classifier)

Collecting packages...
Packing environment at '/home/sachin/miniconda3/envs/tempo-7207925c-a439-463b-a8f1-f9eb9a9f122f' to '/home/sachin/projects/mlops/edge-cloud-inference/artifacts/joint-classifier/environment.tar.gz'
[########################################] | 100% Completed | 26.7s


## Deploy to Kubernetes

In [8]:
from tempo.examples.minio import create_minio_rclone
import os
# Pass the path of an `rclone.conf` in the current working directory to Tempo's
# MinIO example helper.
# NOTE(review): presumably this writes the rclone settings used to reach MinIO - confirm.
create_minio_rclone(os.getcwd()+"/rclone.conf")

In [9]:
from tempo.serve.loader import upload
# Upload the two models and the pipeline.
# NOTE(review): presumably each artifact's local folder is copied to its `uri` - confirm.
upload(edge_model)
upload(cloud_model)
upload(classifier)

In [3]:
!kubectl create ns production
!kubectl apply -f src/rbac -n production

Error from server (AlreadyExists): namespaces "production" already exists
secret/minio-secret configured
serviceaccount/tempo-pipeline unchanged
role.rbac.authorization.k8s.io/tempo-pipeline unchanged
rolebinding.rbac.authorization.k8s.io/tempo-pipeline-rolebinding unchanged


In [4]:
# NOTE: KubernetesOptions is imported here but not used in this cell.
from tempo.serve.metadata import KubernetesOptions
from tempo.seldon.k8s import SeldonCoreOptions
# Seldon Core options with their default values; passed to `deploy` and to the
# Kubernetes runtime in the cells that follow.
runtime_options = SeldonCoreOptions()

In [6]:
from tempo import deploy
# Deploy the pipeline with the Seldon Core options. `remote_model` is the handle the
# later cells use to predict and to undeploy.
remote_model = deploy(classifier, options=runtime_options)

In [36]:
import numpy as np

# Send two single-row payloads of four values each to the deployed pipeline and
# print what it returns.
print(remote_model.predict(payload=np.array([[0, 0, 0, 0]])))
print(remote_model.predict(payload=np.array([[1, 2, 3, 4]])))

In [16]:
from tempo.seldon.k8s import SeldonKubernetesRuntime

# List what Tempo reports for the namespace and index the entries by name, so the
# pipeline can be selected explicitly rather than by its position in the listing.
k8s_runtime = SeldonKubernetesRuntime(runtime_options)
models = k8s_runtime.list_models(namespace="production")
models_by_name = {}
print("Name\t\tDescription")
for model in models:
    details = model.get_tempo().model_spec.model_details
    print(f"{details.name}\t{details.description}")
    models_by_name[details.name] = model

# Look the pipeline up by name: `models[2]` only worked while the listing happened to
# return exactly three entries with the pipeline last.
models_by_name["joint-classifier"].predict(payload=np.array([[1, 2, 3, 4]]))


Name		Description
cloud-model	An Cloud based Iris classification model
edge-model	An Edge based Iris classification model
joint-classifier	A pipeline to make an edge based prediction or cloud based joint prediction for Iris classification


{'output0': array([[0.00847207, 0.03168793, 0.95984   ]], dtype=float32),
 'output1': 'cloud prediction'}

In [35]:
# Tear the deployment down; the recorded log below shows the pipeline and both models
# being undeployed.
remote_model.undeploy()

INFO:tempo:Undeploying joint-classifier
DEBUG:tempo:Loading external kubernetes config
INFO:tempo:Undeploying edge-model
DEBUG:tempo:Loading external kubernetes config
INFO:tempo:Undeploying cloud-model
DEBUG:tempo:Loading external kubernetes config
