# Seldon Kafka Integration Example with CIFAR10 Model

In this example we run SeldonDeployments for a CIFAR10 TensorFlow model that take their inputs from a Kafka topic and push their outputs to another Kafka topic. We experiment with both REST and gRPC Seldon graphs. For REST we load the input topic with TensorFlow JSON requests, and for gRPC we load TensorFlow `PredictRequest` protocol buffers.

## Requirements

 * [Install gsutil](https://cloud.google.com/storage/docs/gsutil_install)


In [1]:
# !pip install -r requirements.txt
!pip install gsutil



## Setup Kafka

Install Strimzi on the cluster.

In [1]:
!helm repo add strimzi https://strimzi.io/charts/

"strimzi" already exists with the same configuration, skipping


In [3]:
!helm uninstall my-release
!helm install my-release strimzi/strimzi-kafka-operator

release "my-release" uninstalled
NAME: my-release
LAST DEPLOYED: Thu Dec 22 14:12:01 2022
NAMESPACE: seldon
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing strimzi-kafka-operator-0.32.0

To create a Kafka cluster refer to the following documentation.

https://strimzi.io/docs/operators/latest/deploying.html#deploying-cluster-operator-helm-chart-str


Set `clusterType` below according to whether you are running a local Kind cluster or a cloud-based cluster.

In [5]:
# On Kind, the Strimzi operator image may need to be loaded into the cluster first:
# kind load docker-image quay.io/strimzi/operator:0.32.0 --name seldon
clusterType = "kind"
# clusterType="cloud"

In [29]:
# Create the seldon namespace and make it the default for the current context
!kubectl create namespace seldon
!kubectl config set-context $(kubectl config current-context) --namespace=seldon
!kubectl config view | grep namespace

Error from server (AlreadyExists): namespaces "seldon" already exists
Context "kind-seldon" modified.
    namespace: seldon


In [58]:
%%writefile kafka-jbod.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.1.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: loadbalancer
        tls: false
      - name: tls
        port: 9093
        type: loadbalancer
        tls: true
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      auto.create.topics.enable: true
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Overwriting kafka-jbod.yaml
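
The manifest above runs a single broker, so every replication setting in `config` is pinned to 1. The following is a minimal sketch of a sanity check for that relationship, assuming the `config` block has been copied into a plain Python dict. `check_replication` is a hypothetical helper for illustration, not part of Strimzi or Seldon.

```python
def check_replication(replicas, config):
    """Return a list of human-readable problems; an empty list means the values are consistent."""
    problems = []
    factor_keys = (
        "default.replication.factor",
        "offsets.topic.replication.factor",
        "transaction.state.log.replication.factor",
    )
    # A replication factor larger than the broker count cannot be satisfied.
    for key in factor_keys:
        if key in config and config[key] > replicas:
            problems.append(f"{key}={config[key]} exceeds broker replicas={replicas}")
    # min.insync.replicas above the replication factor blocks acks=all producers.
    if config.get("min.insync.replicas", 1) > config.get("default.replication.factor", 1):
        problems.append("min.insync.replicas exceeds default.replication.factor")
    if config.get("transaction.state.log.min.isr", 1) > config.get(
        "transaction.state.log.replication.factor", 1
    ):
        problems.append(
            "transaction.state.log.min.isr exceeds transaction.state.log.replication.factor"
        )
    return problems


# Values copied from the spec.kafka section of kafka-jbod.yaml.
kafka_config = {
    "offsets.topic.replication.factor": 1,
    "transaction.state.log.replication.factor": 1,
    "transaction.state.log.min.isr": 1,
    "default.replication.factor": 1,
    "min.insync.replicas": 1,
}
print(check_replication(1, kafka_config))
```

If you scale `replicas` up later, the same check can be rerun with the new values before applying the manifest.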


In [75]:
if clusterType == "kind":
    !kubectl delete -f ./seldon-core/examples/kafka/cifar10/cluster-kind.yaml
    !kubectl apply -f ./seldon-core/examples/kafka/cifar10/cluster-kind.yaml
else:
    # !kubectl delete -f ./seldon-core/examples/kafka/cifar10/cluster-cloud.yaml
    !kubectl apply -f ./seldon-core/examples/kafka/cifar10/cluster-cloud.yaml

kafka.kafka.strimzi.io "my-cluster" deleted
kafka.kafka.strimzi.io/my-cluster created


In [76]:
# get Kafka and check
!kubectl get Kafka
!kubectl get svc

my-cluster   1                        1                             
NAME                                              TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
model-server-triton-default                       ClusterIP   10.96.250.111   <none>        8000/TCP,5001/TCP   44h
model-server-triton-default-model-server-triton   ClusterIP   10.96.209.237   <none>        9000/TCP,9500/TCP   44h


Get broker endpoint.

In [57]:
if clusterType == "kind":
    res = !kubectl get service my-cluster-kafka-plain-bootstrap -n seldon -o=jsonpath='{.status.loadBalancer.ingress[0].ip}'
    ip = res[0]
    %env BROKER=$ip:9092
else:
    res = !kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}'
    if len(res) == 1:
        hostname = res[0]
        %env BROKER=$hostname:9094
    else:
        res = !kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}'
        ip = res[0]
        %env BROKER=$ip:9094

env: BROKER=Error from server (NotFound): services "my-cluster-kafka-plain-bootstrap" not found:9092
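
The output above shows that when `kubectl` fails, its error message is stored in `BROKER` verbatim. A small guard can catch this before the value is used. This is a sketch only: `broker_address` is a hypothetical helper, and the hostname pattern is a deliberately loose approximation.

```python
import ipaddress
import re

# Loose hostname pattern: dot-separated labels of letters, digits and hyphens.
_HOSTNAME_RE = re.compile(r"^(?=.{1,253}$)([A-Za-z0-9-]{1,63}\.)*[A-Za-z0-9-]{1,63}$")


def broker_address(kubectl_lines, port):
    """Return 'host:port' if kubectl printed exactly one plausible host, else raise ValueError."""
    if len(kubectl_lines) != 1 or not kubectl_lines[0].strip():
        raise ValueError(f"expected one line from kubectl, got: {kubectl_lines!r}")
    host = kubectl_lines[0].strip()
    try:
        ipaddress.ip_address(host)  # accepts IPv4 and IPv6 literals
    except ValueError:
        if not _HOSTNAME_RE.match(host):
            raise ValueError(f"not an IP address or hostname: {host!r}") from None
    return f"{host}:{port}"


print(broker_address(["10.96.66.27"], 9092))
```

In the notebook, `res` from the `!kubectl get service ...` line could be passed as `kubectl_lines`, and the returned string assigned with `os.environ["BROKER"] = ...` instead of `%env`.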


In [36]:
%%writefile topics.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cifar10-rest-input
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cifar10-rest-output
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cifar10-grpc-input
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cifar10-grpc-output
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1

Overwriting topics.yaml


In [38]:
!kubectl delete -f topics.yaml
!kubectl apply -f topics.yaml

kafkatopic.kafka.strimzi.io "cifar10-rest-input" deleted
kafkatopic.kafka.strimzi.io "cifar10-rest-output" deleted
kafkatopic.kafka.strimzi.io "cifar10-grpc-input" deleted
kafkatopic.kafka.strimzi.io "cifar10-grpc-output" deleted
kafkatopic.kafka.strimzi.io/cifar10-rest-input created
kafkatopic.kafka.strimzi.io/cifar10-rest-output created
kafkatopic.kafka.strimzi.io/cifar10-grpc-input created
kafkatopic.kafka.strimzi.io/cifar10-grpc-output created


In [62]:
## Get Topics 
!kubectl get KafkaTopic

NAME                  CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
cifar10-grpc-input    my-cluster   2            1                    
cifar10-grpc-output   my-cluster   2            1                    
cifar10-rest-input    my-cluster   2            1                    
cifar10-rest-output   my-cluster   2            1                    
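
The `READY` column in the listing above is empty, so it can be worth confirming that the topics have been reconciled before producing to them. The sketch below assumes the topic list is fetched with `kubectl get kafkatopic -o json` and that each item reports a `Ready` condition under `status.conditions`. `topics_not_ready` is a hypothetical helper, and the sample data is illustrative rather than captured from a real cluster.

```python
import json


def topics_not_ready(kafkatopic_list):
    """Return the names of KafkaTopic items that do not report Ready=True."""
    pending = []
    for item in kafkatopic_list.get("items", []):
        conditions = item.get("status", {}).get("conditions", [])
        ready = any(
            c.get("type") == "Ready" and c.get("status") == "True" for c in conditions
        )
        if not ready:
            pending.append(item["metadata"]["name"])
    return pending


# Illustrative sample: one reconciled topic and one with no status yet.
sample = json.loads("""
{
  "items": [
    {"metadata": {"name": "cifar10-rest-input"},
     "status": {"conditions": [{"type": "Ready", "status": "True"}]}},
    {"metadata": {"name": "cifar10-rest-output"}}
  ]
}
""")
print(topics_not_ready(sample))

# In the notebook, something like this would supply real data:
#   res = !kubectl get kafkatopic -o json
#   print(topics_not_ready(json.loads("\n".join(res))))
```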


## Install Seldon

  * [Install Seldon via Ansible](https://github.com/SeldonIO/ansible-k8s-collection)

## Download Test Request Data
We have two example datasets, each containing 50,000 requests in TensorFlow Serving format for CIFAR10: one in JSON format and one as length-encoded protocol buffers.

In [39]:
!gsutil cp gs://seldon-datasets/cifar10/requests/tensorflow/cifar10_tensorflow.json.gz cifar10_tensorflow.json.gz
!gunzip cifar10_tensorflow.json.gz
!gsutil cp gs://seldon-datasets/cifar10/requests/tensorflow/cifar10_tensorflow.proto cifar10_tensorflow.proto

Copying gs://seldon-datasets/cifar10/requests/tensorflow/cifar10_tensorflow.json.gz...
If you experience problems with multiprocessing on MacOS, they might be related to https://bugs.python.org/issue33725. You can disable multiprocessing by editing your .boto config or by adding the following flag to your command: `-o "GSUtil:parallel_process_count=1"`. Note that multithreading is still available even if you disable multiprocessing.

| [1 files][324.3 MiB/324.3 MiB]    7.5 MiB/s                                   
Operation completed over 1 objects/324.3 MiB.                                    
cifar10_tensorflow.json already exists -- do you wish to overwrite (y or n)? ^C
Copying gs://seldon-datasets/cifar10/requests/tensorflow/cifar10_tensorflow.proto...
If you experience problems with multiprocessing on MacOS, they might be related to https://bugs.python.org/issue33725. You can disable multiprocessing by editing your .boto config or by adding the following flag to your command: `-o "
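
Before uploading, it can help to peek at the first few requests in the JSON file. The sketch below assumes the file holds one JSON object per line, which is not stated in this notebook, so treat it as an assumption. `summarize_requests` is a hypothetical helper, and the demo runs on a tiny stand-in file rather than the real dataset.

```python
import json
import os
import tempfile
from itertools import islice


def summarize_requests(path, limit=3):
    """Return the sorted top-level keys of the first `limit` non-empty lines."""
    summaries = []
    with open(path) as f:
        non_empty = (line for line in f if line.strip())
        for line in islice(non_empty, limit):
            summaries.append(sorted(json.loads(line).keys()))
    return summaries


# Demo on a stand-in file with made-up values (ASSUMPTION: one request per line).
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    tmp.write('{"instances": [[0.1, 0.2]]}\n')
    tmp.write("\n")
    tmp.write('{"instances": [[0.3, 0.4]]}\n')
demo_keys = summarize_requests(tmp.name)
os.remove(tmp.name)
print(demo_keys)
```

Against the downloaded data, the call would be `summarize_requests("cifar10_tensorflow.json")`.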

## Test CIFAR10 REST Model

Upload the TensorFlow Serving REST requests to Kafka. This may take some time, depending on your network connection.

In [67]:
# !pip install confluent_kafka
# !pip install tensorflow-serving-api
!python ./seldon-core/util/kafka/test-client.py produce $BROKER cifar10-rest-input --file cifar10_tensorflow.json

2022-12-21 17:27:11.167444: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
%4|1671623835.971|CONFWARN|TI-MAC-088.local#producer-1| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%4|1671623835.971|CONFWARN|TI-MAC-088.local#producer-1| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
%3|1671623835.971|FAIL|TI-MAC-088.local#producer-1| [thrd:Error from server (NotFound): services "my-cluster-kafka-plain-]: Error from server (NotFound): services "my-cluster-kafka-plain-bootstrap" not found:9092:9092/bootstrap: Failed to resolve 'Error from server (NotFound): services "my-cluster-kafka-plain-

In [64]:
res = !kubectl get service my-cluster-kafka-plain-bootstrap -o=jsonpath='{.spec.clusterIP}'
ip = res[0]
%env BROKER_CIP=$ip

env: BROKER_CIP=Error from server (NotFound): services "my-cluster-kafka-plain-bootstrap" not found


In [41]:
%%writefile cifar10_rest.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: tfserving-cifar10
spec:
  protocol: tensorflow
  transport: rest
  serverType: kafka  
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - args: 
          - --port=8500
          - --rest_api_port=8501
          - --model_name=resnet32
          - --model_base_path=gs://seldon-models/tfserving/cifar10/resnet32
          - --enable_batching
          image: tensorflow/serving
          name: resnet32
          ports:
          - containerPort: 8501
            name: http
    svcOrchSpec:
      env:
      - name: KAFKA_BROKER
        value: BROKER_IP
      - name: KAFKA_INPUT_TOPIC
        value: cifar10-rest-input
      - name: KAFKA_OUTPUT_TOPIC
        value: cifar10-rest-output
    graph:
      name: resnet32
      type: MODEL
      endpoint:
        service_port: 8501
    name: model
    replicas: 1

Overwriting cifar10_rest.yaml


In [42]:
!cat cifar10_rest.yaml | sed s/BROKER_IP/$BROKER_CIP:9092/ | kubectl apply -f -

error: error parsing STDIN: error converting YAML to JSON: yaml: line 27: mapping values are not allowed in this context
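
The parse error above is consistent with `BROKER_CIP` holding a `kubectl` error message rather than an address, so the `sed` substitution wrote invalid YAML. As a sketch, the substitution can be done in Python with a check on the broker value first. `render_manifest` is a hypothetical helper, and the pattern only checks for a rough `host:port` shape.

```python
import re

# Rough host:port shape: letters, digits, dots and hyphens, then a numeric port.
_BROKER_RE = re.compile(r"^[A-Za-z0-9.-]+:\d{1,5}$")


def render_manifest(template, broker, placeholder="BROKER_IP"):
    """Replace the placeholder with the broker address, refusing malformed values."""
    if not _BROKER_RE.match(broker):
        raise ValueError(f"broker does not look like host:port: {broker!r}")
    if placeholder not in template:
        raise ValueError(f"placeholder {placeholder!r} not found in template")
    return template.replace(placeholder, broker)


snippet = "      - name: KAFKA_BROKER\n        value: BROKER_IP\n"
print(render_manifest(snippet, "10.96.66.27:9092"))

# To apply the rendered manifest from the notebook (requires kubectl on PATH):
#   import subprocess
#   rendered = render_manifest(open("cifar10_rest.yaml").read(), broker)
#   subprocess.run(["kubectl", "apply", "-f", "-"], input=rendered, text=True, check=True)
```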


On the Seldon metrics dashboard you should see the throughput being achieved. For a single replica on GKE with n1-standard-4 nodes we see roughly 150 requests per second being processed.

![rest](tensorflow-rest-kafka.png)

In [25]:
!kubectl delete -f cifar10_rest.yaml

Error from server (NotFound): error when deleting "cifar10_rest.yaml": seldondeployments.machinelearning.seldon.io "tfserving-cifar10" not found


## Test CIFAR10 gRPC Model

Upload the TensorFlow Serving gRPC requests to Kafka. The input is a file of `tensorflow.serving.PredictRequest` protocol buffers ([defn](https://github.com/tensorflow/serving/blob/master/tensorflow_serving/apis/predict.proto)). Each binary protocol buffer is prefixed by its length in bytes. Our test-client Python script reads them and sends them to our topic. This may take some time, depending on your network connection.
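
To illustrate length-prefixed framing in general, here is a minimal sketch that writes and reads records with a length header. The exact prefix encoding used in `cifar10_tensorflow.proto` is not specified here. This sketch assumes a fixed 4-byte big-endian length, which may differ from the real file (a varint prefix is another common choice). `write_framed` and `read_framed` are hypothetical names.

```python
import io
import struct

# ASSUMPTION: 4-byte big-endian unsigned length; the real file may use another encoding.
PREFIX = struct.Struct(">I")


def write_framed(stream, payloads):
    """Write each payload preceded by its length in bytes."""
    for payload in payloads:
        stream.write(PREFIX.pack(len(payload)))
        stream.write(payload)


def read_framed(stream):
    """Yield payloads one at a time until the stream is exhausted."""
    while True:
        header = stream.read(PREFIX.size)
        if not header:
            return  # clean end of stream
        if len(header) < PREFIX.size:
            raise EOFError("truncated length prefix")
        (size,) = PREFIX.unpack(header)
        body = stream.read(size)
        if len(body) < size:
            raise EOFError("truncated record")
        yield body


# Round trip over an in-memory buffer with placeholder payloads.
buf = io.BytesIO()
write_framed(buf, [b"first", b"", b"third record"])
buf.seek(0)
records = list(read_framed(buf))
print(records)
```

In a real reader, each yielded payload would be the serialized bytes of one `PredictRequest`, ready to be sent as a Kafka message value.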

In [41]:
!python ./seldon-core/util/kafka/test-client.py produce $BROKER cifar10-grpc-input --file cifar10_tensorflow.proto --proto_name tensorflow.serving.PredictRequest

2022-05-19 15:23:40.161293: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-05-19 15:23:40.161322: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
%4|1652970223.000|CONFWARN|clive-T470p#producer-1| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%4|1652970223.000|CONFWARN|clive-T470p#producer-1| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
Messages sent: 100
Messages sent: 200
Messages sent: 300
Messages sent: 400
Messages sent: 500
Messages sent: 600
Messages sent: 700
Messages sent: 800
Messages sent: 900
Messages sent: 1000
Messages sent: 1100
Messages sent: 1200
Messages sent: 1300
Messages sent: 1400
Mess

Messages sent: 36900
Messages sent: 37000
Messages sent: 37100
Messages sent: 37200
Messages sent: 37300
Messages sent: 37400
Messages sent: 37500
Messages sent: 37600
Messages sent: 37700
Messages sent: 37800
Messages sent: 37900
Messages sent: 38000
Messages sent: 38100
Messages sent: 38200
Messages sent: 38300
Messages sent: 38400
Messages sent: 38500
Messages sent: 38600
Messages sent: 38700
Messages sent: 38800
Messages sent: 38900
Messages sent: 39000
Messages sent: 39100
Messages sent: 39200
Messages sent: 39300
Messages sent: 39400
Messages sent: 39500
Messages sent: 39600
Messages sent: 39700
Messages sent: 39800
Messages sent: 39900
Messages sent: 40000
Messages sent: 40100
Messages sent: 40200
Messages sent: 40300
Messages sent: 40400
Messages sent: 40500
Messages sent: 40600
Messages sent: 40700
Messages sent: 40800
Messages sent: 40900
Messages sent: 41000
Messages sent: 41100
Messages sent: 41200
Messages sent: 41300
Messages sent: 41400
Messages sent: 41500
Messages sent

In [16]:
res = !kubectl get service my-cluster-kafka-plain-bootstrap -o=jsonpath='{.spec.clusterIP}'
ip = res[0]
%env BROKER_CIP=$ip

env: BROKER_CIP=10.96.66.27


In [42]:
%%writefile cifar10_grpc.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: tfserving-cifar10
spec:
  protocol: tensorflow
  transport: grpc
  serverType: kafka  
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - args: 
          - --port=8500
          - --rest_api_port=8501
          - --model_name=resnet32
          - --model_base_path=gs://seldon-models/tfserving/cifar10/resnet32
          - --enable_batching          
          image: tensorflow/serving
          name: resnet32
          ports:
          - containerPort: 8500
            name: grpc
          - containerPort: 8501
            name: http
    svcOrchSpec:
      env:
      - name: KAFKA_BROKER
        value: BROKER_IP
      - name: KAFKA_INPUT_TOPIC
        value: cifar10-grpc-input
      - name: KAFKA_OUTPUT_TOPIC
        value: cifar10-grpc-output
    graph:
      name: resnet32
      type: MODEL
      endpoint:
        grpcPort: 8500
        httpPort: 8501
    name: model
    replicas: 1

Overwriting cifar10_grpc.yaml


In [43]:
!cat cifar10_grpc.yaml | sed s/BROKER_IP/$BROKER_CIP:9092/ | kubectl apply -f -

seldondeployment.machinelearning.seldon.io/tfserving-cifar10 created


On the Seldon metrics dashboard you should see the throughput being achieved. For a single replica on GKE with n1-standard-4 nodes we see around 220 requests per second being processed.

![grpc](tensorflow-grpc-kafka.png)
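
As a rough back-of-the-envelope comparison, the quoted rates (about 150 requests per second for REST and about 220 for gRPC) can be turned into an estimated time to drain the 50,000-request dataset. This assumes the rate stays constant and ignores start-up and batching effects.

```python
TOTAL_REQUESTS = 50_000  # size of each example dataset
REST_RPS = 150           # approximate REST throughput quoted in this notebook
GRPC_RPS = 220           # approximate gRPC throughput quoted in this notebook

rest_seconds = TOTAL_REQUESTS / REST_RPS
grpc_seconds = TOTAL_REQUESTS / GRPC_RPS
speedup = GRPC_RPS / REST_RPS

print(f"REST: {rest_seconds:.0f} s, gRPC: {grpc_seconds:.0f} s, ratio: {speedup:.2f}x")
# prints: REST: 333 s, gRPC: 227 s, ratio: 1.47x
```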

In [39]:
!kubectl delete -f cifar10_grpc.yaml

seldondeployment.machinelearning.seldon.io "tfserving-cifar10" deleted
