## Pipeline Orchestrations Tutorial

Wallaroo provides data connections, orchestrations, and tasks so that organizations can create and manage automated tasks that run on demand, on a regular schedule, or as a service that responds to requests.

| Object | Description |
|---|---|
| Orchestration | A set of instructions written as a Python script together with its library requirements.  Orchestrations are uploaded to the Wallaroo instance. |
| Task | An implementation of an orchestration.  Tasks are run either once when requested, on a repeating schedule, or as a service. |
| Connector | Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source.  Usually paired with orchestrations. |

A typical flow in the orchestration, task and connector life cycle is:

1. (Optional) A connector is defined with information such as username, connection URL, tokens, etc.
1. One or more connectors are applied to a workspace for users to implement in their code or orchestrations.
1. An orchestration is created to perform a set of instructions.  For example:
    1. Deploy a pipeline, request data from an external service, store the results in an external database, then undeploy the pipeline.
    1. Download an ML model, then replace a current pipeline step with the new version.
    1. Collect log files from a deployed pipeline once every hour and submit them to Kafka or another service.
1. A task is created that specifies the orchestration to perform and the schedule:
    1. Run once.
    1. Run on a schedule (based on `cron`-like settings).
    1. Run as a service to be run whenever requested.
1. Once the use for a task is complete, it is killed and its schedule or service removed.
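
The `cron`-like schedule mentioned in the list above is assumed here to follow the common five-field layout (minute, hour, day of month, month, day of week). The sketch below is a minimal shape check that could be run before creating a scheduled task. The helper name `is_five_field_cron` is hypothetical and not part of the Wallaroo SDK, and it only accepts numeric fields; it does not check value ranges or names such as `MON`.

```python
def is_five_field_cron(expression: str) -> bool:
    """Return True if the expression has exactly five numeric cron fields.

    Only the shape is checked (minute, hour, day of month, month, day of week);
    legal value ranges are not verified.
    """
    allowed = set("0123456789*/,-")
    fields = expression.split()
    return len(fields) == 5 and all(set(field) <= allowed for field in fields)


# "every minute", the schedule this tutorial aims for
every_minute = "*/1 * * * *"
print(is_five_field_cron(every_minute))
```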

## Tutorial Goals

The tutorial will demonstrate the following:

1. Create a simple connection to retrieve an Apache Arrow table file from a GitHub repository.
1. Create an orchestration that retrieves the Apache Arrow table file from the location defined by the connection, deploys a pipeline, performs an inference, then undeploys the pipeline.
1. Implement the orchestration as a task that runs every minute.
1. Display the logs from the pipeline after 5 minutes to verify the task is running.

## Required Libraries

The following libraries are required and are included by default in a Wallaroo instance's JupyterHub service.

* [wallaroo](https://pypi.org/project/wallaroo/):  The Wallaroo SDK.
* [pandas](https://pypi.org/project/pandas/): The pandas data analysis library.
* [pyarrow](https://pypi.org/project/pyarrow/): The Apache Arrow Python library.

The specific versions used are set in the file `./resources/requirements.txt`.  Supported libraries can be installed with the `pip` or `conda` commands.  For example, from the root of this tutorial's folder:

```shell
pip install -r ./resources/requirements.txt
```
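
To confirm which versions are actually present in the current environment, a short check with the standard library's `importlib.metadata` can help. This is only a sketch; the helper name `installed_versions` is hypothetical, and the package names are the three listed above.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each package name to its installed version, or None if it is missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(["wallaroo", "pandas", "pyarrow"]).items():
    print(f"{name}: {version or 'not installed'}")
```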



Using pipeline orchestrations consists of these steps:
1. [Write orchestration code](#1.-Write-orchestration-code)
2. [Create archive](#2.-Create-archive)
3. [Upload archive](#3.-Upload-archive)
4. [Run task](#4.-Run-task)

[Task Management](#Task-Management)

## Connect to the Wallaroo Instance

The first step is to connect to a Wallaroo instance.  We'll load the libraries and set our client connection settings.

### Workspace, Model and Pipeline Setup

For this tutorial, we'll create a workspace, upload our sample model and deploy a pipeline.  We'll perform some quick sample inferences to verify that everything is working.

In [None]:
import wallaroo
from wallaroo.object import EntityNotFoundError

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa

import os
# Used for the Wallaroo SDK version 2023.1
os.environ["ARROW_ENABLED"]="True"

In [None]:
# Login through local Wallaroo instance

# wl = wallaroo.Client()

# SSO login through keycloak
# Replace the prefix and suffix below with the values for your own instance

# wallarooPrefix = "YOUR PREFIX"
# wallarooSuffix = "YOUR SUFFIX"

wallarooPrefix = "doc-test"
wallarooSuffix = "wallaroocommunity.ninja"

wl = wallaroo.Client(api_endpoint=f"https://{wallarooPrefix}.api.{wallarooSuffix}", 
                    auth_endpoint=f"https://{wallarooPrefix}.keycloak.{wallarooSuffix}", 
                    auth_type="sso")

In [None]:
# Setting variables for later steps

workspace_name = 'orchestrationworkspace'
pipeline_name = 'orchestrationpipeline'
model_name = 'orchestrationmodel'
model_file_name = './models/rf_model.onnx'

### Helper Methods

The following helper methods are used to either create or get workspaces and pipelines.

In [None]:
# helper methods to retrieve workspaces and pipelines

def get_workspace(name):
    # return the workspace with this name, creating it if it does not exist
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    # return the first pipeline with this name, building it if none is found
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

### Create the Workspace and Pipeline

We'll now create our workspace and pipeline for the tutorial.  If this tutorial has been run previously, this will retrieve the existing ones on the assumption that they were created for this tutorial.

We'll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.

In [None]:
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)

pipeline = get_pipeline(pipeline_name)

### Upload the Model and Deploy Pipeline

We'll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it's ready to accept inference requests.

In [None]:
# Upload the model

housing_model_control = wl.upload_model(model_name, model_file_name).configure()

# Add the model as a pipeline step

pipeline.add_model_step(housing_model_control)

In [None]:
#deploy the pipeline
pipeline.deploy()

### Sample Inferences

We'll perform some quick sample inferences using an Apache Arrow table as the input.  Once that's finished, we'll undeploy the pipeline and return the resources back to the Wallaroo instance.

In [None]:
# sample inferences

batch_inferences = pipeline.infer_from_file('./data/xtest-1k.arrow')

large_inference_result =  batch_inferences.to_pandas()
display(large_inference_result.head(20))

In [None]:
# undeploy the pipeline

pipeline.undeploy()

## Create Wallaroo Connection




Connections are created at the Wallaroo instance level, typically by an MLOps or DevOps engineer, then applied to a workspace.

For this section:

1. We will create a sample connection that just has a URL to the same Arrow table file we used in the previous step.
1. We'll apply the data connection to the workspace above.
1. For a quick demonstration, we'll use the connection to retrieve the Arrow table file and use it for a quick sample inference.

### Create Connection

Connections are created with the Wallaroo client command [`add_connection`](https://staging.docs.wallaroo.ai/wallaroo-developer-guides/wallaroo-sdk-guides/wallaroo-sdk-essentials-guide/wallaroo-sdk-essentials-dataconnectors/#create-data-connection) with the following parameters.

| Parameter | Type | Description |
| --- | --- | ---|
| **name** | string (Required) | The name of the connector. |
| **type** | string (Required) | The user defined type of connector. |
| **details** | Dict (Required) | User defined configuration details for the data connection.  These can be `{'username':'dataperson', 'password':'datapassword', 'port': 3339}`, or `{'token':'abcde123==', 'host':'example.com', 'port': 1234}`, or other user defined combinations.  |

We'll create the connection named `houseprice_arrow_table`, set it to the type `HTTPFILE`, and provide the details as `'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/blob/20230314_2023.2_updates/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow?raw=true'` - the location for our sample Arrow table inference input.

In [None]:
wl.add_connection("houseprice_arrow_table", 
                  "HTTPFILE", 
                  {'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/blob/20230314_2023.2_updates/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow?raw=true'}
                  )
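
For the quick demonstration mentioned in the steps above, the file behind the connection's `host` URL has to be downloaded before it can be used for an inference. The sketch below uses only the standard library; the helper name `fetch_bytes` is hypothetical, and how the `details` dictionary is read back from the connection object is an assumption not shown in this tutorial.

```python
import urllib.request


def fetch_bytes(url: str, timeout: float = 30.0) -> bytes:
    """Download the resource at `url` and return its raw bytes."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read()


# Assumption: inside the notebook the URL would come from the connection's
# details, for example details["host"]; here it is left as a commented call.
# data = fetch_bytes(details["host"])
```

If the downloaded file is in the Arrow IPC file format, `pyarrow.ipc.open_file(data).read_all()` should turn the bytes into a table that can be passed to the pipeline.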

## 1. Write orchestration code

The orchestration code runs in an environment very similar to the Wallaroo JupyterLab experience. The idea is that code should work the same in a notebook as it does in an orchestration task.

* Same Python version
* Same Wallaroo SDK version
* Code may assume it will run in an empty `/home/jovyan` directory (any desired code or artifacts must be included explicitly)
* Pip dependencies can be specified
* `wallaroo.Client` constructor `auth_type` argument is ignored - it's okay to pass nothing
* New functions:
    * `wallaroo.in_task()` returns `True` if the code is running in an Orchestrator task
    * `wallaroo.task_args()` returns a `Dict` of invocation-specific arguments passed to the `run_` calls
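
Because the same code is meant to run both in a notebook and in a task, one common pattern is to keep notebook defaults and let task arguments override them. The sketch below illustrates that pattern with plain values; `resolve_arguments`, the default dictionary, and the `prod_table` name are hypothetical and only for illustration.

```python
def resolve_arguments(in_task: bool, task_args: dict, defaults: dict) -> dict:
    """Merge notebook defaults with task arguments; task arguments win in a task."""
    resolved = dict(defaults)
    if in_task:
        resolved.update(task_args)
    return resolved


# Hypothetical defaults used when experimenting in a notebook
defaults = {"connection-name": "houseprice_arrow_table"}

# In an orchestration, the first two values would come from the SDK's
# in_task() and task_args() functions described above.
notebook_args = resolve_arguments(False, {}, defaults)
task_run_args = resolve_arguments(True, {"connection-name": "prod_table"}, defaults)
print(notebook_args["connection-name"])
print(task_run_args["connection-name"])
```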

Example `requirements.txt`
--------------------------
```text
dbt-bigquery==1.4.3
dbt-core==1.4.5
dbt-extractor==0.4.1
dbt-postgres==1.4.5
google-api-core==2.8.2
google-auth==2.11.0
google-auth-oauthlib==0.4.6
google-cloud-bigquery==3.3.2
google-cloud-bigquery-storage==2.15.0
google-cloud-core==2.3.2
google-cloud-storage==2.5.0
google-crc32c==1.5.0
google-pasta==0.2.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.4
```

Example orchestrator
--------------------
```python
from google.cloud import bigquery
import wallaroo

wl = wallaroo.Client()

# Default connection name used outside a task (for example, in a notebook)
conn_name = "houseprice_arrow_table"

if wl.in_task():
    # Read the connection name from the invocation-specific task arguments
    conn_name = wl.task_args()["connection-name"]
    print(f"I am running in a task and I will read from {conn_name}")

# Get a database connection
workspace = wl.get_current_workspace()
prodcon = workspace.get_connection(conn_name)

# Deploy a pipeline

# Do some queries and inferences
# etc...
```

Here are some other SDK functions useful inside an orchestration task.

In [None]:
# This will always return False in a notebook, because we are not in a task
wl.in_task()

In [None]:
# If we were in a task, we could get our arguments like this
wl.task_args()

## 2. Create archive

The uploaded artifact must be a ZIP file which contains:

* User code. If `main.py` exists, then that will be used as the task entrypoint. Otherwise, if only one `.py` file exists, then that will be the entrypoint.
* Optional: A standard Python `requirements.txt` for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be mentioned.
* Optional: Any other artifacts desired for runtime, including data or code.

The ZIP file should not contain any directories: only files at the top level.

**Note** - In future versions the SDK may help with packaging. In this version, our JupyterLab includes the `zip` program for use in a terminal tab, or the ZIP file can be created elsewhere.

### Procedure

In a terminal, whether in JupyterLab or on a desktop, assemble the artifacts as above and then create the archive.
```shell
$ zip hello.zip main.py requirements.txt 
  adding: main.py (deflated 47%)
  adding: requirements.txt (deflated 52%)
```

The first argument to `zip` is the name of the archive you want, while the rest are all of the contents to add.
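
The same archive can also be built from Python with the standard library's `zipfile` module, which makes it easy to guarantee that every entry sits at the top level. This is a sketch rather than an official packaging tool; the helper name `build_flat_archive` is hypothetical.

```python
import zipfile
from pathlib import Path


def build_flat_archive(archive_path, files):
    """Write `files` into a ZIP archive with every entry at the top level."""
    with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for file_path in map(Path, files):
            # arcname drops any directory prefix so the archive stays flat
            archive.write(file_path, arcname=file_path.name)
    # Re-open the archive and report the entry names that were stored
    with zipfile.ZipFile(archive_path) as archive:
        return archive.namelist()


# Usage, assuming main.py and requirements.txt exist in the working directory:
# build_flat_archive("hello.zip", ["main.py", "requirements.txt"])
```

Because directory prefixes are dropped, two input files with the same base name would collide, so file names should be unique.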

## 3. Upload archive

With the ZIP archive in hand, we can use the SDK to upload it to Wallaroo.

In [None]:
import wallaroo
import os
os.environ["WALLAROO_SDK_CREDENTIALS"] = "creds.json"
wl = wallaroo.Client(auth_type="user_password")
orc1 = wl.upload_orchestration(path="hello.zip")

At this point, Wallaroo performs a packaging step where it downloads and installs all the dependencies listed in `requirements.txt`. The status can be observed either in the orchestration list or by examining an individual orchestration object. The state transitions from `packaging pending` to `ready`. The `orchestration.status()` method can also be polled to wait until packaging is finished.
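
A polling loop without a limit can hang forever if packaging fails, so a timeout is worth adding. The sketch below wraps any status-returning callable; `wait_until_ready` is a hypothetical helper, and the 5 second interval and 300 second timeout are arbitrary assumptions.

```python
import time


def wait_until_ready(get_status, ready_state="ready", interval=5.0, timeout=300.0):
    """Poll `get_status()` until it returns `ready_state` or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == ready_state:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still '{status}' after {timeout} seconds")
        time.sleep(interval)


# Usage with the orchestration uploaded above:
# wait_until_ready(orc1.status)
```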

In [None]:
orc1

In [None]:
wl.list_orchestrations()

In [None]:
import time

# poll every 5 seconds until packaging has finished
while orc1.status() != 'ready':
    print("waiting")
    time.sleep(5)

## 4. Run task

Once the orchestration is `ready`, it can be launched as a task in one of three ways.

| Type | SDK Call | How triggered | Purpose |
|---|---|---|---|
| Once | `orc.run_once()` | User makes one API call. Task runs once and exits. | Single batch, experimentation |
| Scheduled | `orc.run_scheduled()` | User provides a schedule. Task runs and exits whenever the schedule dictates. | Recurrent batch ETL |
| Continuous | `orc.run_continuously()` | User provides a listen port. Task runs forever and can listen on that port if needed. | User defined network service, continuous ETL, queue processor |

All take a `json_args` parameter, where the user can pass an invocation-specific `Dict` of arguments which will be available in the running task.
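
Assuming the arguments are sent to the task as JSON (the parameter name suggests this, but it is not spelled out here), it can be useful to check that a dictionary survives a JSON round trip before launching a task. `check_task_args` is a hypothetical helper, not an SDK function.

```python
import json


def check_task_args(args: dict) -> dict:
    """Return `args` after a JSON round trip; raise TypeError if it cannot be encoded."""
    return json.loads(json.dumps(args))


print(check_task_args({"connection-name": "houseprice_arrow_table", "limit": 10}))
```

Note that the round trip turns tuples into lists and non-string keys into strings, which is what a task would most likely receive as well.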

In [None]:
# Example: run once
task = orc1.run_once({"a":"b"})
task

## Task Management

### Under the hood - Kubernetes log retrieval

In upcoming releases, task logs will be retrievable in a friendlier manner. In the meantime, some `kubectl` magic is necessary. The cloud host's Kubernetes log console can also be used.

When a task is launched, its ID is shown in the task object as above.  With that in hand, list the Kubernetes namespaces:

```shell
$ kubectl get namespace
NAME                       STATUS   AGE
default                    Active   530d
kube-system                Active   530d
wallaroo                   Active   16d
tasks-00d86236-d0fe-4a-1   Active   26s
```

Notice the last namespace, which is named after the task ID.  There will be one pod in that namespace:

```shell
$ kubectl -n tasks-00d86236-d0fe-4a-1 get pod
NAME                                                READY   STATUS             RESTARTS       AGE
00d-exec-orch-oneshot-arb-one-exe-6f95c84ff-99586   0/1     Running            5 (107s ago)   6m19s
```

You can now list logs from that pod.

```shell
$ kubectl -n tasks-00d86236-d0fe-4a-1 logs 00d-exec-orch-oneshot-arb-one-exe-6f95c84ff-99586
```
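
The manual steps above can be sketched in Python for use in a script. The two helpers below are hypothetical: one picks the task namespaces out of `kubectl get namespace` output, assuming they all start with `tasks-` as in the listing above, and the other builds the argument list for the `logs` command.

```python
def find_task_namespaces(kubectl_output: str, prefix: str = "tasks-"):
    """Return namespace names from `kubectl get namespace` output that start with `prefix`."""
    names = []
    for line in kubectl_output.splitlines()[1:]:  # skip the header row
        columns = line.split()
        if columns and columns[0].startswith(prefix):
            names.append(columns[0])
    return names


def logs_command(namespace: str, pod: str):
    """Build the kubectl argument list for reading one pod's logs."""
    return ["kubectl", "-n", namespace, "logs", pod]


sample = """NAME                       STATUS   AGE
default                    Active   530d
kube-system                Active   530d
wallaroo                   Active   16d
tasks-00d86236-d0fe-4a-1   Active   26s"""

print(find_task_namespaces(sample))
```

The list returned by `logs_command` can be passed to `subprocess.run(..., capture_output=True, text=True)` on a machine where `kubectl` is configured for the cluster.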

In [None]:
# What is the status of all tasks
wl.list_tasks()

In [None]:
# Kill one task
task.kill()

In [None]:
# Kill all tasks
for t in wl.list_tasks():
    t.kill()

In [None]:
# Pipeline Orchestration: upload the archive through the REST API
import requests

url = f"{wl.api_endpoint}/v1/api/orchestration/upload"

# the context manager closes the archive once the upload finishes
with open("hello.zip", "rb") as fp:
    resp = requests.post(
        url,
        headers={"Authorization": wl.auth._bearer_token_str()},
        files=[
            ("file", ("hello2.zip", fp, "application/octet-stream")),
            ("metadata", ("metadata", '{"workspace_id": 1}', "application/json")),
        ],
    )

assert resp.status_code == 202
(resp.status_code, resp.text)

In [None]:
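# List the orchestrations in a workspace through the REST API
headers = {"Authorization": wl.auth._bearer_token_str()}
# Assumption: the numeric workspace ID is read from the SDK workspace object
wsid = wl.get_current_workspace().id()
url = f"{wl.api_endpoint}/v1/api/orchestration/list"
resp = requests.post(url, headers=headers, json={'workspace_id': wsid})
assert resp.status_code == 200
assert resp.text == '[]'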

In [None]:
url=f"{wl.api_endpoint}/v1/api/orchestration/list"
resp=requests.post(url, headers=headers)
#assert resp.status_code == 422

In [None]:
resp.text

In [None]:
# `orchid` must hold the ID of an uploaded orchestration before this cell is run
data = {"workspace_id": wsid, "orch_id": orchid, "json": {"aa":"bb"}}
url=f"{wl.api_endpoint}/v1/api/orchestration/task/run_once"
resp=requests.post(url, headers=headers, json=data)
resp.json()

In [None]:
wl.list_workspaces()[0]

In [None]:
wl.get_current_workspace()

In [None]:
wl.list_orchestrations()

In [None]:
orch = wl.list_orchestrations()[0]

In [None]:
orch.status()

In [None]:
x = orch.run_once(json={"hello": 34})