# Leveraging AsyncIO in inference pipelines

Tempo includes experimental support for `asyncio`, which provides a way to optimise pipelines.
In particular, `asyncio` can be beneficial in scenarios where most of the heavy lifting is done by downstream models and the pipeline just orchestrates calls across these models.
In this case, most of the time within the pipeline will be spent waiting for the requests from downstream models to come back.
`asyncio` will allow us to process other incoming requests during this waiting time.

This example will walk us through the process of setting up an asynchronous pipeline.
As you will see, it's quite similar to the usual synchronous pipelines.

## Prerequisites

This notebooks needs to be run in the `tempo-examples` conda environment defined below. Create from project root folder:

```bash
conda env create --name tempo-examples --file conda/tempo-examples.yaml
```

## Project Structure

In [1]:
!tree -P "*.py"  -I "__init__.py|__pycache__" -L 2

[01;34m.[00m
├── [01;34martifacts[00m
└── [01;34msrc[00m
    ├── constants.py
    ├── data.py
    ├── tempo.py
    └── train.py

2 directories, 4 files


## Train Models

This section is where as a data scientist you do your work of training models and creating artfacts.
For this example, we will train two sklearn and xgboost classification models using the iris dataset.

These models will be used by our inference pipeline.

In [1]:
import logging
from tempo.utils import logger

logger.setLevel(logging.ERROR)
logging.basicConfig(level=logging.ERROR)

In [6]:
# %load src/train.py
import joblib
import os

from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier

from src.data import IrisData
from src.constants import SKLearnFolder, XGBoostFolder


def train_sklearn(data: IrisData):
    logreg = LogisticRegression(C=1e5)
    logreg.fit(data.X, data.y)

    model_path = os.path.join(SKLearnFolder, "model.joblib")
    with open(model_path, "wb") as f:
        joblib.dump(logreg, f)


def train_xgboost(data: IrisData):
    clf = XGBClassifier()
    clf.fit(data.X, data.y)

    model_path = os.path.join(XGBoostFolder, "model.json")
    clf.save_model(model_path)


In [8]:
from src.data import IrisData
from src.train import train_sklearn, train_xgboost

data = IrisData()

train_sklearn(data)
train_xgboost(data)





## Create Tempo Artifacts

Here we create the Tempo models and orchestration Pipeline for our final service using our models.
For illustration the final service will call the sklearn model and based on the result will decide to return that prediction or call the xgboost model and return that prediction instead.

In [9]:
from src.tempo import inference_pipeline

ImportError: cannot import name 'PipelineModels' from 'tempo' (/home/agm/Seldon/tempo/tempo/__init__.py)

In [None]:
# %load src/tempo.py
from typing import Tuple

import numpy as np
from src.train import SKLearnFolder, XGBoostFolder

from tempo.serve.metadata import ModelFramework
from tempo.serve.model import Model
from tempo.serve.pipeline import Pipeline, PipelineModels
from tempo.serve.utils import pipeline

PipelineFolder = "classifier"
SKLearnTag = "sklearn prediction"
XGBoostTag = "xgboost prediction"


def get_tempo_artifacts(artifacts_folder: str) -> Tuple[Pipeline, Model, Model]:

    sklearn_model = Model(
        name="test-iris-sklearn",
        platform=ModelFramework.SKLearn,
        local_folder=f"{artifacts_folder}/{SKLearnFolder}",
        uri="s3://tempo/basic/sklearn",
        description="An SKLearn Iris classification model",
    )

    xgboost_model = Model(
        name="test-iris-xgboost",
        platform=ModelFramework.XGBoost,
        local_folder=f"{artifacts_folder}/{XGBoostFolder}",
        uri="s3://tempo/basic/xgboost",
        description="An XGBoost Iris classification model",
    )

    @pipeline(
        name="classifier",
        uri="s3://tempo/basic/pipeline",
        local_folder=f"{artifacts_folder}/{PipelineFolder}",
        models=PipelineModels(sklearn=sklearn_model, xgboost=xgboost_model),
        description="A pipeline to use either an sklearn or xgboost model for Iris classification",
    )
    def classifier(payload: np.ndarray) -> Tuple[np.ndarray, str]:
        res1 = classifier.models.sklearn(input=payload)

        if res1[0] == 1:
            return res1, SKLearnTag
        else:
            return classifier.models.xgboost(input=payload), XGBoostTag

    return classifier, sklearn_model, xgboost_model
