# Parallel mini-batch processing with Seldon & Ray

## Prerequisites

 * Kind cluster with Seldon installed
 * Docker to build the image
 * kubectl

## About the example

In this example we will explore how to use Seldon with Ray to split an incoming request into smaller mini-batches that can be computed in parallel.

In addition to the usual `Seldon Deployment` we will create a `Ray Cluster` and connect to it.

![Component Connections](figures/ray-proxy-1.svg "Component Connections")

The idea is quite simple. The user sends a single request that contains many feature vectors for which they want to compute inference.

The assumption is that the user expects the inference to be computed as soon as possible and the result returned in the response to that request.

To achieve this we split the incoming request into smaller mini-batches, which can be computed in parallel using a pool of Ray Actors.
We then combine the computed results and return them to the user.

![Component Connections](figures/ray-proxy-2.svg "Component Connections")
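
The split, compute, and combine steps can be sketched without Ray at all. The snippet below is only an illustration: `fake_predict` and `predict_in_mini_batches` are hypothetical names, and the keyword check stands in for real model inference. The splitting rule mirrors the one used in `ProxyModel` later in this example.

```python
import numpy as np


def fake_predict(batch):
    # Hypothetical stand-in for model inference: label 1 if the text contains "good".
    return np.array([1 if "good" in text else 0 for text in batch])


def predict_in_mini_batches(data, batch_size):
    # At least one batch; np.array_split keeps the batches nearly equal in size,
    # so a batch can hold slightly more than `batch_size` rows.
    n_batches = max(data.shape[0] // batch_size, 1)
    batches = np.array_split(data, n_batches)
    # Each batch is independent, so this loop is the part that can run in parallel.
    partial_results = [fake_predict(batch) for batch in batches]
    # Concatenating in batch order restores the original row order.
    return np.concatenate(partial_results)


texts = np.array(["good film", "dull plot", "good cast", "bad pacing", "so good"])
labels = predict_in_mini_batches(texts, batch_size=2)
```

Because the batches are concatenated in the order they were created, the combined output lines up row by row with the input, exactly as if the whole request had been processed in one call.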

In this example we will perform sentiment analysis using a RoBERTa model.

## Setup Seldon Core

Use the [Setup Cluster](../../seldon_core_setup.ipynb) notebook to set up Seldon Core with an ingress.

In [1]:
!kubectl create namespace distributed-roberta

namespace/distributed-roberta created


In [2]:
!kubectl config set-context $(kubectl config current-context) --namespace=distributed-roberta

Context "kind-kind" modified.


## Prepare Model and Docker Image

We start by preparing our model. We assume that the model binary will be present at `/microservice/pytorch_model.bin`.

The `RobertaModel` class below simply loads the binary and serves predictions. This class could be used directly with `Seldon`.

To offload computation to `Ray Actors` we prepare a second class, `ProxyModel`, that:
- connects to the running Ray Cluster
- converts `RobertaModel` into `Ray Actors`
- serves predictions by proxying computation to the `Ray Actors`

The number of actors and the mini-batch size can be controlled via environment variables at runtime. We will also switch between `RobertaModel` and `ProxyModel` using an environment variable.
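
As a rough sketch of how these settings interact, the snippet below reads the three variables with the standard library only and derives the number of mini-batches for a request. `env_flag` and `number_of_batches` are hypothetical helpers written for this illustration (the real code uses `getenv_as_bool` from `seldon_core.utils`), and the set of strings treated as true is an assumption.

```python
import os


def env_flag(name, default=False):
    # Hypothetical stdlib stand-in for a boolean environment-variable helper.
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes")


def number_of_batches(n_rows, batch_size):
    # Same rule as ProxyModel.predict: never fewer than one batch.
    return max(n_rows // batch_size, 1)


# Values used by the Ray-proxy deployment later in this example.
os.environ.update({"RAY_PROXY": "true", "BATCH_SIZE": "50", "NUM_ACTORS": "10"})

use_proxy = env_flag("RAY_PROXY")
batch_size = int(os.environ.get("BATCH_SIZE", "100"))
num_actors = int(os.environ.get("NUM_ACTORS", "10"))

# A 500-row request with BATCH_SIZE=50 yields one mini-batch per actor.
n_batches = number_of_batches(500, batch_size)
```

With these values a 500-row request is split into as many mini-batches as there are actors, so every actor receives exactly one batch.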

In [3]:
%%writefile Model.py
import logging
import time
import os

import ray

import numpy as np
from seldon_core.utils import getenv_as_bool


RAY_PROXY = getenv_as_bool("RAY_PROXY", default=False)
MODEL_FILE = "/microservice/pytorch_model.bin"

BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
NUM_ACTORS = int(os.environ.get("NUM_ACTORS", "10"))


class RobertaModel:
    def __init__(self, load_on_init=False):
        if load_on_init:
            self.load()

    def load(self):
        import torch
        from simpletransformers.model import TransformerModel

        logging.info("starting RobertaModel...")
        model = TransformerModel(
            "roberta",
            "roberta-base",
            args=({"fp16": False, "use_multiprocessing": False}),
            use_cuda=False,
        )
        model.model.load_state_dict(torch.load(MODEL_FILE))
        self.model = model
        logging.info("... started RobertaModel")

    def predict(self, data, names=[], meta={}):
        logging.info(f"received inference request: {data}")
        data = data.astype("U")
        output = self.model.predict(data)[1].argmax(axis=1)
        logging.info("finished calculating prediction")
        return output


class ProxyModel:
    def load(self):
        ray.init(address="auto")

        self.actors = [
            ray.remote(RobertaModel).remote(load_on_init=True)
            for _ in range(NUM_ACTORS)
        ]

        self.pool = ray.util.ActorPool(self.actors)

    def predict(self, data, names=[], meta={}):
        logging.info(f"data received: {data}")
        batches = np.array_split(data, max(data.shape[0] // BATCH_SIZE, 1))
        logging.info(f"split into {len(batches)} batches")

        t1 = time.perf_counter()
        results = list(self.pool.map(lambda a, v: a.predict.remote(v), batches))
        results = np.concatenate(results).tolist()
        t2 = time.perf_counter()

        return {"time-taken": t2 - t1, "results": results}


if not RAY_PROXY:
    logging.info("Model = RobertaModel")
    Model = RobertaModel
else:
    logging.info("Model = ProxyModel")
    Model = ProxyModel


Overwriting Model.py
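
To experiment with the proxy pattern without a Ray cluster, one can approximate the actor pool with a thread pool. This is a minimal sketch under stated assumptions, not the notebook's implementation: `EchoModel` and `proxy_predict` are hypothetical names, and batches are handed out round-robin, whereas `ray.util.ActorPool` gives the next batch to whichever actor is free. What the sketch does share with `ActorPool.map` is that results come back in submission order.

```python
from concurrent.futures import ThreadPoolExecutor


class EchoModel:
    # Hypothetical stand-in for RobertaModel: "predicts" the length of each text.
    def predict(self, batch):
        return [len(text) for text in batch]


def proxy_predict(models, batches):
    # Hand batches to the workers round-robin and collect results in submission order.
    with ThreadPoolExecutor(max_workers=len(models)) as pool:
        futures = [
            pool.submit(models[i % len(models)].predict, batch)
            for i, batch in enumerate(batches)
        ]
        results = [future.result() for future in futures]
    # Flatten the per-batch results back into one list.
    return [label for batch_result in results for label in batch_result]


models = [EchoModel() for _ in range(2)]
batches = [["a", "bb"], ["ccc"], ["dddd", "eeeee"]]
flat = proxy_predict(models, batches)
```

Preserving order matters here: the caller receives one flat list and relies on position to match each prediction to its input row.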


## Prepare Docker image

We will use the same image for the `Seldon Deployment` and for the nodes in the `Ray Cluster`.

This image is available as `seldonio/distributed-roberta:0.1`.

In [4]:
%%writefile requirements.txt
ray==1.0.0

simpletransformers==0.48.5
tensorboardX==1.9

# newer version seems to be missing wheels
sentencepiece==0.1.91

transformers==3.2.0
torch==1.6.0

Overwriting requirements.txt


In [5]:
%%writefile Dockerfile
FROM seldonio/seldon-core-s2i-python37-ubi8:1.7.0-dev

# Add Model
COPY training/outputs/pytorch_model.bin /microservice/pytorch_model.bin

# Install requirements
COPY requirements.txt /microservice
RUN pip install -r requirements.txt && rm -r ~/.cache/pip

RUN pip install ray[tune]

# Add file that will download Roberta cache inside container
COPY import_for_cache_download.py /tmp
RUN python3 /tmp/import_for_cache_download.py

# Add Seldon Model
ENV MODEL_NAME Model
ENV API_TYPE REST

COPY Model.py /microservice/
CMD seldon-core-microservice $MODEL_NAME $API_TYPE

Overwriting Dockerfile


## Setup Ray Cluster

The setup of Ray Cluster is based on [Ray documentation on advanced k8s usage](https://docs.ray.io/en/releases-1.0.0/cluster/kubernetes.html) (Ray 1.0.0).

We will use our `distributed-roberta:0.1` image and run 2 worker replicas that each request 5 CPUs, alongside the single head node.

In [6]:
%%writefile deploy-ray.yaml
# Ray head node service, allowing worker pods to discover the head node.
apiVersion: v1
kind: Service
metadata:
  namespace: distributed-roberta
  name: ray-head
spec:
  ports:
    # Redis ports.
    - name: redis-primary
      port: 6379
      targetPort: 6379
    - name: redis-shard-0
      port: 6380
      targetPort: 6380
    - name: redis-shard-1
      port: 6381
      targetPort: 6381
    # Ray internal communication ports.
    - name: object-manager
      port: 12345
      targetPort: 12345
    - name: node-manager
      port: 12346
      targetPort: 12346
    - name: http
      port: 8000
      protocol: TCP
      targetPort: 8000
  selector:
    component: ray-head

---

apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: distributed-roberta
  name: ray-head
spec:
  # Do not change this - Ray currently only supports one head node per cluster.
  replicas: 1
  selector:
    matchLabels:
      component: ray-head
      type: ray
  template:
    metadata:
      labels:
        component: ray-head
        type: ray
    spec:
      # If the head node goes down, the entire cluster (including all worker
      # nodes) will go down as well. If you want Kubernetes to bring up a new
      # head node in this case, set this to "Always," else set it to "Never."
      restartPolicy: Always

      # This volume allocates shared memory for Ray to use for its plasma
      # object store. If you do not provide this, Ray will fall back to
      # /tmp, which causes slowdowns if it is not a shared memory volume.
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      containers:
        - name: ray-head
          image: seldonio/distributed-roberta:0.1
          imagePullPolicy: Always
          command: [ "/bin/bash", "-c", "--" ]
          args:
            - "ray start --head --node-ip-address=$MY_POD_IP --port=6379 --redis-shard-ports=6380,6381 --num-cpus=$MY_CPU_REQUEST --object-manager-port=12345 --node-manager-port=12346 --block"
          ports:
            - containerPort: 6379 # Redis port.
            - containerPort: 6380 # Redis port.
            - containerPort: 6381 # Redis port.
            - containerPort: 12345 # Ray internal communication.
            - containerPort: 12346 # Ray internal communication.
            - containerPort: 8000
              protocol: TCP
          # This volume allocates shared memory for Ray to use for its plasma
          # object store. If you do not provide this, Ray will fall back to
          # /tmp, which causes slowdowns if it is not a shared memory volume.
          volumeMounts:
            - mountPath: /dev/shm
              name: dshm
          env:
            - name: MY_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            # This is used in the ray start command so that Ray can spawn the
            # correct number of processes. Omitting this may lead to degraded
            # performance.
            - name: MY_CPU_REQUEST
              valueFrom:
                resourceFieldRef:
                  resource: requests.cpu
          resources:
            requests:
              cpu: 1
              memory: 1024Mi

---

apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: distributed-roberta
  name: ray-worker
spec:
  # Change this to scale the number of worker nodes started in the Ray cluster.
  replicas: 2
  selector:
    matchLabels:
      component: ray-worker
      type: ray
  template:
    metadata:
      labels:
        component: ray-worker
        type: ray
    spec:
      restartPolicy: Always
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      containers:
      - name: ray-worker
        image: seldonio/distributed-roberta:0.1
        imagePullPolicy: Always
        command: ["/bin/bash", "-c", "--"]
        args:
          - "ray start --node-ip-address=$MY_POD_IP --num-cpus=$MY_CPU_REQUEST --address=$RAY_HEAD_SERVICE_HOST:$RAY_HEAD_SERVICE_PORT_REDIS_PRIMARY --object-manager-port=12345 --node-manager-port=12346 --block"
        ports:
          - containerPort: 12345 # Ray internal communication.
          - containerPort: 12346 # Ray internal communication.
        volumeMounts:
          - mountPath: /dev/shm
            name: dshm
        env:
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          # This is used in the ray start command so that Ray can spawn the
          # correct number of processes. Omitting this may lead to degraded
          # performance.
          - name: MY_CPU_REQUEST
            valueFrom:
              resourceFieldRef:
                resource: requests.cpu
        resources:
          requests:
            cpu: 5
            memory: 512Mi

Overwriting deploy-ray.yaml


## Setup Seldon Deployment

We will create two Seldon Deployments in this example: a standard one, and one that proxies to the Ray cluster.

In [7]:
%%writefile deploy-seldon.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: seldon-model-ray-proxy
  namespace: distributed-roberta
spec:
  name: mock-deployment
  predictors:
  - componentSpecs:
    - spec:
        volumes:
        - name: dshm
          emptyDir:
            medium: Memory
        containers:
        - name: model
          image: seldonio/distributed-roberta:0.1
          imagePullPolicy: Always
          securityContext:
            runAsUser: 0
          command: [ "/bin/bash", "-c", "--" ]
          args:
            - "ray start --node-ip-address=$MY_POD_IP --num-cpus=0 --address=$RAY_HEAD_SERVICE_HOST:$RAY_HEAD_SERVICE_PORT_REDIS_PRIMARY --object-manager-port=12345 --node-manager-port=12346 &&
              seldon-core-microservice $MODEL_NAME $API_TYPE"
          ports:
            - containerPort: 12345 # Ray internal communication.
            - containerPort: 12346 # Ray internal communication.
          volumeMounts:
            - mountPath: /dev/shm
              name: dshm
          env:
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          - name: SELDON_LOG_LEVEL
            value: DEBUG
          - name: GUNICORN_THREADS
            value: "1"
          - name: RAY_PROXY
            value: "true"
          - name: BATCH_SIZE
            value: "50"
          - name: NUM_ACTORS
            value: "10"
    graph:
      name: model
      type: MODEL
    name: default
    replicas: 1

---

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: seldon-model
  namespace: distributed-roberta
spec:
  name: mock-deployment
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - name: model
          image: seldonio/distributed-roberta:0.1
          imagePullPolicy: Always
          securityContext:
            runAsUser: 0
          env:
          - name: SELDON_LOG_LEVEL
            value: DEBUG
          - name: GUNICORN_THREADS
            value: "1"
          - name: RAY_PROXY
            value: "false"
    graph:
      name: model
      type: MODEL
    name: default
    replicas: 1

Overwriting deploy-seldon.yaml


## Deploy to k8s

In [8]:
%%bash
kubectl apply -f deploy-ray.yaml
kubectl apply -f deploy-seldon.yaml

service/ray-head created
deployment.apps/ray-head created
deployment.apps/ray-worker created
seldondeployment.machinelearning.seldon.io/seldon-model-ray-proxy created
seldondeployment.machinelearning.seldon.io/seldon-model created


In [11]:
!kubectl rollout status deploy/$(kubectl get deploy -l seldon-deployment-id=seldon-model -o jsonpath='{.items[0].metadata.name}')

deployment "seldon-model-default-0-model" successfully rolled out


In [12]:
!kubectl rollout status deploy/$(kubectl get deploy -l seldon-deployment-id=seldon-model-ray-proxy -o jsonpath='{.items[0].metadata.name}')

deployment "seldon-model-ray-proxy-default-0-model" successfully rolled out


## Test Deployment

In [13]:
import numpy as np
import requests

In [14]:
from alibi.datasets import fetch_movie_sentiment

In [15]:
movies = fetch_movie_sentiment()
data = movies.data

In [16]:
payload = data[:500]

### Send requests (only Seldon)

In [17]:
%%time
model_namespace = "distributed-roberta"
model_name = "seldon-model"

endpoint = (
    f"http://localhost:8003/seldon/{model_namespace}/{model_name}/api/v1.0/predictions"
)

r = requests.post(url=endpoint, json={"data": {"ndarray": payload}})

CPU times: user 8.77 ms, sys: 97 µs, total: 8.87 ms
Wall time: 1min 25s


In [18]:
d1 = r.json()["data"]["ndarray"]

### Send requests (proxy to Ray)

In [19]:
%%time
model_namespace = "distributed-roberta"
model_name = "seldon-model-ray-proxy"

endpoint = (
    f"http://localhost:8003/seldon/{model_namespace}/{model_name}/api/v1.0/predictions"
)

r = requests.post(url=endpoint, json={"data": {"ndarray": payload}})

CPU times: user 3.05 ms, sys: 725 µs, total: 3.78 ms
Wall time: 11.2 s


In [20]:
d2 = r.json()["jsonData"]["results"]
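
The two deployments answer in different shapes: the plain model returns its array under `data.ndarray`, while `ProxyModel` returns a dictionary, which arrives under `jsonData`. A small helper can hide that difference from calling code. The sketch below is illustrative only: `extract_predictions` is a hypothetical name, and the sample responses are hand-written, not captured output.

```python
def extract_predictions(response_json):
    # Array-style response, as read from the plain RobertaModel deployment.
    if "data" in response_json:
        return response_json["data"]["ndarray"]
    # Dictionary-style response, as read from the ProxyModel deployment.
    if "jsonData" in response_json:
        return response_json["jsonData"]["results"]
    raise ValueError("unrecognised response shape")


# Hand-written sample responses, for illustration only.
plain = {"data": {"names": [], "ndarray": [1, 0, 1]}}
proxied = {"jsonData": {"time-taken": 0.5, "results": [1, 0, 1]}}

same = extract_predictions(plain) == extract_predictions(proxied)
```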

### Check

As we can see, both methods yield the same results.

In [21]:
d1 == d2

True

As expected, using 10 actors gives a large speedup: wall time drops from 1 min 25 s to 11.2 s, roughly 7.6x, which is in the neighbourhood of the ideal 10x.
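
The measured speedup can be worked out directly from the two wall times reported above. The helper names below are hypothetical. The figures come from a single run each and include request overhead, so they should be read as indicative only.

```python
def speedup(baseline_seconds, parallel_seconds):
    # How many times faster the parallel run finished.
    return baseline_seconds / parallel_seconds


def parallel_efficiency(baseline_seconds, parallel_seconds, workers):
    # Fraction of the ideal linear speedup that was actually achieved.
    return speedup(baseline_seconds, parallel_seconds) / workers


baseline = 1 * 60 + 25  # "Wall time: 1min 25s" for the plain Seldon deployment
proxied = 11.2          # "Wall time: 11.2 s" for the Ray-proxy deployment

measured_speedup = speedup(baseline, proxied)
efficiency = parallel_efficiency(baseline, proxied, workers=10)
```

Here the speedup comes out at about 7.6x, which corresponds to a parallel efficiency of roughly 76% across the 10 actors.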

## Resource cleanup

In [22]:
%%bash
kubectl delete -f deploy-ray.yaml
kubectl delete -f deploy-seldon.yaml

service "ray-head" deleted
deployment.apps "ray-head" deleted
deployment.apps "ray-worker" deleted
seldondeployment.machinelearning.seldon.io "seldon-model-ray-proxy" deleted
seldondeployment.machinelearning.seldon.io "seldon-model" deleted
