# Demonstrator 2023-ecp-f2f

Refer to `README.md` for installation instructions.

## 0. Configuration

Let's start off by sourcing secrets and defining the locations of the various required services:

```bash
# source secrets
source .env
```

#### **List TES instances**

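Let's see which TES instances we have defined. They are read from the file
`.tes_instances`, which is expected to contain one `name,URL` pair per line: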

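```bash
unset TES_INSTANCES
declare -A TES_INSTANCES
# read one "name,URL" pair per line; the second test also keeps a last line
# that lacks a trailing newline
while IFS=',' read -r KEY URL || [ -n "$KEY" ]; do
    TES_INSTANCES["$KEY"]="$URL"
done < .tes_instances

for key in "${!TES_INSTANCES[@]}"; do
    export TES="${TES_INSTANCES[$key]}"
    echo "$key: $TES"
done
```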
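
If you want to reuse the same instance list outside the shell, the file can be parsed in a few lines of Python. This is a minimal sketch, not part of the demo, assuming the `name,URL` line format that the loop above expects; `parse_tes_instances` is a hypothetical helper.

```python
def parse_tes_instances(text: str) -> dict[str, str]:
    """Parse 'name,URL' lines into a name -> URL mapping (hypothetical helper)."""
    instances: dict[str, str] = {}
    for line in text.splitlines():
        if "," not in line:
            continue  # skip blank or malformed lines
        # split at the first comma only, like `IFS=',' read -r KEY URL`
        key, url = line.split(",", 1)
        instances[key] = url
    return instances


# usage: parse_tes_instances(open(".tes_instances", encoding="utf-8").read())
```

Splitting only at the first comma mirrors `read`, which assigns the remainder of the line to the last variable.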

If you are running the demo on the ELIXIR Cloud infrastructure, the demo
makes use of the following TES and cloud storage deployments.

**Figure 0. ELIXIR Cloud deployments.**

![Figure 0](images/figure_0.svg)

> This setup will of course differ if you deployed your own nodes.

## 1. Executing tasks via the GA4GH TES API

In this section, we will use both the shell and a dedicated Python library to
send tasks to the defined TES instances.

### Using the shell

Here, we will use the `curl` command-line tool to send requests to the TES
APIs. It should be easy to adapt the calls for use with other tools, such as
Postman, or with your favorite programming language's HTTP request libraries.

#### **Running a minimal task**

Now we will submit a very simple task to each of these instances (Figure 1A).

**Figure 1A. Executing a minimal task.**

![Figure 1A](images/figure_1A.svg)

The task we use here defines neither inputs nor outputs, so we do not need to
read from or write to any storage instances.

The payload for the task needs to be provided in JSON format. Nicely formatted,
it looks like this:

```json
{
  "executors": [
    {
      "image": "alpine",
      "command": [
        "echo",
        "hello"
      ]
    }
  ]
}
```

With these instructions, we are asking the TES instances to execute the command
`echo hello` in (the default version of) an Alpine Linux container.

Let's minify the JSON payload and assign it to a variable:

```bash
export PAYLOAD='{"executors":[{"image":"alpine","command":["echo","hello"]}]}'
```
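
Minifying by hand gets error-prone for larger payloads. As a sketch (not part of the demo), the same compact string can be produced with Python's standard `json` module; in the shell, `jq -c` yields compact output as well.

```python
import json

payload = {
    "executors": [
        {"image": "alpine", "command": ["echo", "hello"]},
    ]
}
# separators without spaces give the most compact ("minified") encoding
minified = json.dumps(payload, separators=(",", ":"))
print(minified)
```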

Now we are ready to submit the tasks:

```bash
unset TASKS
declare -A TASKS  # maps task ID -> TES instance URL
for key in "${!TES_INSTANCES[@]}"; do
    export TES="${TES_INSTANCES[$key]}"
    echo "Submitting task to $key ($TES)..."
    TASK_ID=$( \
        curl \
            --silent \
            --request "POST" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            --data "$PAYLOAD" \
            "${TES%/}/v1/tasks" | \
        jq -r ".id"
    )
    # empty: no valid JSON came back; "null": the response had no "id" field
    if [ -z "$TASK_ID" ] || [ "$TASK_ID" == "null" ]; then
        echo "FAILED"
    else
        echo "Task ID: $TASK_ID"
        TASKS["$TASK_ID"]="$TES"
    fi
    echo "================================================================================"
done
echo "DONE"
```
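
For readers who want to issue the same call from a programming language, here is a minimal Python sketch using only the standard library. It builds, but does not send, a `POST /v1/tasks` request with the same headers and the HTTP Basic credentials that `curl --user` sends. The base URL and credentials are placeholders, and `build_create_task_request` is a hypothetical helper.

```python
import base64
import json
import urllib.request


def build_create_task_request(base_url, payload, user, password):
    """Build (but do not send) a TES 'create task' request with HTTP Basic auth."""
    # like "${TES%/}/v1/tasks": drop trailing slashes before appending the path
    url = base_url.rstrip("/") + "/v1/tasks"
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        method="POST",
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


req = build_create_task_request(
    "https://tes.example.org/ga4gh/tes/",  # hypothetical TES base URL
    {"executors": [{"image": "alpine", "command": ["echo", "hello"]}]},
    "user",
    "secret",
)
# to send it: urllib.request.urlopen(req, timeout=30), then read the "id"
# field from the JSON response body
```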

Let's see how the execution of successfully submitted tasks is progressing:

```bash
for TASK_ID in "${!TASKS[@]}"; do
    export TES="${TASKS[$TASK_ID]}"
    echo "Checking state of task '$TASK_ID' ($TES)..."
    RESPONSE=$( \
        curl \
            --silent \
            --request "GET" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            "${TES%/}/v1/tasks/${TASK_ID}" \
    )
    echo -n "Task State: "
    echo "$RESPONSE" | jq ".state"
    echo "================================================================================"
done
echo "DONE"
```

#### **Running a task with inputs and outputs**

Let's try a slightly more realistic task with an input (from the web) and an
output (written to an FTP instance), as depicted in Figure 1B.

**Figure 1B. Data flow for TES execution.** TES calls are as in Figure 1A and
have been omitted for clarity.

![Figure 1B](images/figure_1B.svg)

We define the following payload:

```json
{
  "name": "md5sum",
  "description": "calculate md5sum of input file and write to output file",
  "tags": {
    "project": "2023-ecp-f2f Demonstrator",
    "project_owner": "ELIXIR Cloud & AAI"
  },
  "executors": [
    {
      "command": [
        "md5sum",
        "/data/input"
      ],
      "image": "alpine",
      "stdout": "/data/output",
      "workdir": "/data"
    }
  ],
  "inputs": [
    {
      "url": "{{INPUT_FILE}}",
      "path": "/data/input"
    }
  ],
  "outputs": [
    {
      "path": "/data/output",
      "url": "{{FTP_INSTANCE}}/2023-ecp-f2f/md5sum",
      "type": "FILE"
    }
  ],
  "resources": {
    "cpu_cores": 1,
    "disk_gb": 1,
    "preemptible": false,
    "ram_gb": 1
  }
}
```

As you can see, here we determine the MD5 sum of an input file, write it to an
output file inside the container, and finally copy it over to an FTP server.

Let's minify that and replace the placeholders `{{INPUT_FILE}}` and
`{{FTP_INSTANCE}}` with some actual values.

> Note that because Funnel currently only allows [passing FTP storage
> credentials via the FTP
> URL](https://ohsu-comp-bio.github.io/funnel/docs/storage/ftp/), while TESK
> does not support FTP URLs with credentials, we need to use different payloads
> for the two services!

```bash
PAYLOAD_RAW='{"name":"md5sum","description":"calculate md5sum of input file and write to output file","tags":{"project":"2023-ecp-f2f Demonstrator","project_owner":"ELIXIR Cloud & AAI"},"executors":[{"command":["md5sum","/data/input"],"image":"alpine","stdout":"/data/output","workdir":"/data"}],"inputs":[{"url":"{{INPUT_FILE}}","path":"/data/input"}],"outputs":[{"path":"/data/output","url":"{{FTP_INSTANCE}}/2023-ecp-f2f/md5sum","type":"FILE"}],"resources":{"cpu_cores":1,"disk_gb":1,"preemptible":false,"ram_gb":1}}'
# fill in the input file URL
PAYLOAD_TMP=$(sed 's#{{INPUT_FILE}}#https://raw.githubusercontent.com/elixir-cloud-aai/elixir-cloud-demos/df5be391faf992ebcd5ec2b2aad581c99de26101/LICENSE#' <<< "$PAYLOAD_RAW")
# TESK: FTP output URL without credentials
export PAYLOAD_TESK=$(sed "s|{{FTP_INSTANCE}}|${FTP_INSTANCE%/}|" <<< "$PAYLOAD_TMP")
# Funnel: FTP output URL with credentials embedded
export PAYLOAD_FUNNEL=$(sed "s|ftp://|ftp://${FTP_USER}:${FTP_PASSWORD}@|g" <<< "$PAYLOAD_TESK")
```
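
The same substitutions can also be done without `sed`. The following Python sketch assumes the values from `.env` are passed in as plain strings; `make_payloads` is a hypothetical helper, not part of the demo. Unlike the `sed` calls, it percent-encodes the FTP credentials, which matters if the user name or password contains characters such as `@` or `/` (the shell version assumes they do not).

```python
import json
from urllib.parse import quote


def make_payloads(template: str, input_file: str, ftp_instance: str,
                  ftp_user: str, ftp_password: str) -> tuple[str, str]:
    """Return (TESK payload, Funnel payload) filled in from a JSON template string."""
    payload_tesk = (
        template.replace("{{INPUT_FILE}}", input_file)
        # like ${FTP_INSTANCE%/}: avoid a double slash before the output path
        .replace("{{FTP_INSTANCE}}", ftp_instance.rstrip("/"))
    )
    # Funnel reads FTP credentials from the URL (ftp://user:password@host/...);
    # percent-encode them so that '@', '/' or ':' cannot break the URL
    userinfo = f"{quote(ftp_user, safe='')}:{quote(ftp_password, safe='')}@"
    payload_funnel = payload_tesk.replace("ftp://", "ftp://" + userinfo)
    for payload in (payload_tesk, payload_funnel):
        json.loads(payload)  # sanity check: the result must still be valid JSON
    return payload_tesk, payload_funnel
```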

Let's submit as before (but setting the payload according to the service):

```bash
unset TASKS
declare -A TASKS
for key in "${!TES_INSTANCES[@]}"; do
    export TES="${TES_INSTANCES[$key]}"
    # Funnel instances (identified by their name) get the FTP credentials
    if [[ $key =~ "Funnel" ]]; then
        export PAYLOAD="$PAYLOAD_FUNNEL"
    else
        export PAYLOAD="$PAYLOAD_TESK"
    fi
    echo "Submitting task to $key ($TES)..."
    TASK_ID=$( \
        curl \
            --silent \
            --request "POST" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            --data "$PAYLOAD" \
            "${TES%/}/v1/tasks" | \
        jq -r ".id"
    )
    if [ -z "$TASK_ID" ] || [ "$TASK_ID" == "null" ]; then
        echo "FAILED"
    else
        echo "Task ID: $TASK_ID"
        TASKS["$TASK_ID"]="$TES"
    fi
    echo "================================================================================"
done
echo "DONE"
```

And check the states, this time in more detail (`view=BASIC`), while stripping
the FTP credentials from the output:

```bash
VIEW=BASIC
for TASK_ID in "${!TASKS[@]}"; do
    export TES="${TASKS[$TASK_ID]}"
    echo "Checking state of task '$TASK_ID' ($TES)..."
    RESPONSE=$( \
        curl \
            --silent \
            --request "GET" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            "${TES%/}/v1/tasks/${TASK_ID}?view=${VIEW}" | \
        sed "s|${FTP_USER}:${FTP_PASSWORD}@||g"  # remove FTP credentials from logs
    )
    echo "$RESPONSE" | jq "."
    echo "================================================================================"
done
echo "DONE"
```
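
The `sed` call above only works if the credentials contain no characters that are special in `sed` patterns. A pattern-based alternative removes any `user:password@` part from URLs without knowing the credentials. This is a minimal Python sketch; `redact_credentials` is a hypothetical helper, not part of the demo.

```python
import re

# a URL scheme ("ftp://", "https://", ...) followed by a "userinfo@" part
_USERINFO = re.compile(r"(?P<scheme>[a-zA-Z][a-zA-Z0-9+.-]*://)[^/@\s\"]+@")


def redact_credentials(text: str) -> str:
    """Remove 'user:password@' from URLs, whatever the credential values are."""
    return _USERINFO.sub(r"\g<scheme>", text)
```

Because the userinfo part may not contain `/`, an `@` later in a URL path is left alone.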

#### **Other TES operations**

We have seen how we can submit tasks and get summary or detailed information on
individual tasks.

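The full list of operations currently defined by the TES API is given below
(paths are relative to the versioned API base URL, e.g., `${TES%/}/v1` in the
calls above):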

| HTTP Method | Endpoint | Description |
| --- | --- | --- |
| GET | `/service-info` | Fetch information about the service and its optional capabilities |
| POST | `/tasks` | Create a task |
| GET | `/tasks` | Fetch a list of all tasks |
| GET | `/tasks/{task_id}` | Fetch details about a specific task |
| POST | `/tasks/{task_id}:cancel` | Cancel a task |
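
The table maps directly to a small lookup structure. As a sketch, here is a Python helper that turns an operation name into an HTTP method and URL; the operation names and `tes_endpoint` are hypothetical, while the methods and paths are those from the table. Note the `:cancel` suffix, which is part of the path rather than a query parameter.

```python
from urllib.parse import quote, urlencode

# TES operations from the table: name -> (HTTP method, path template)
TES_OPERATIONS = {
    "service_info": ("GET", "/service-info"),
    "create_task": ("POST", "/tasks"),
    "list_tasks": ("GET", "/tasks"),
    "get_task": ("GET", "/tasks/{task_id}"),
    "cancel_task": ("POST", "/tasks/{task_id}:cancel"),
}


def tes_endpoint(base_url, operation, task_id="", **query):
    """Return (method, URL) for a TES operation; base_url is the versioned API root."""
    method, template = TES_OPERATIONS[operation]
    path = template.format(task_id=quote(task_id, safe=""))
    url = base_url.rstrip("/") + path
    if query:
        url += "?" + urlencode(query)  # e.g., view=BASIC
    return method, url
```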

### Using the `py-tes` Python library

In this section, we submit the minimal task from above using the Python TES
client `py-tes` (Figure 1C).

**Figure 1C. Submitting tasks via the Python library.**

![Figure 1C](images/figure_1C.svg)

Usage is simple (from the [`py-tes` repository](https://github.com/ohsu-comp-bio/py-tes)):

```python
import tes

task = tes.Task(
    executors=[
        tes.Executor(
            image="alpine",
            command=["echo", "hello"]
        )
    ]
)

cli = tes.HTTPClient("http://funnel.example.com", timeout=5)
task_id = cli.create_task(task)
res = cli.get_task(task_id)
```

To do so, we execute the Python script `task_submission.py`, which submits our
minimal task to each of the available TES instances and then checks the task
states periodically until either all tasks have reached a final state (one of
`COMPLETE`, `EXECUTOR_ERROR`, `SYSTEM_ERROR`, or `CANCELED`) or six checks have
been made, whichever occurs first.

```bash
./task_submission.py
```
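
The stopping rule of the script can be sketched as follows. This is not the code of `task_submission.py`, only a minimal illustration: `get_state` is a hypothetical callable (e.g., one wrapping `cli.get_task(task_id).state` from `py-tes`), and the 10-second interval is an assumption.

```python
import time
from typing import Callable, Dict, Iterable

# final (terminal) TES task states; the TES specification spells "CANCELED"
FINAL_STATES = {"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}


def poll_until_finished(
    task_ids: Iterable[str],
    get_state: Callable[[str], str],
    max_checks: int = 6,
    interval: float = 10.0,
) -> Dict[str, str]:
    """Poll task states until all are final or max_checks rounds have been made."""
    states: Dict[str, str] = {}
    pending = list(task_ids)
    for check in range(max_checks):
        for task_id in pending:
            states[task_id] = get_state(task_id)
        # only keep polling tasks that are not finished yet
        pending = [t for t in pending if states[t] not in FINAL_STATES]
        if not pending:
            break
        if check < max_checks - 1:
            time.sleep(interval)  # no need to wait after the last check
    return states
```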

Note that the `py-tes` library is used in various TES tools, including the
[proTES](https://github.com/elixir-cloud-aai/proTES) gateway (see below) and
the [cwl-tes](https://github.com/uniqueg/cwl-tes) and
[Snakemake](https://snakemake.readthedocs.io/) workflow engines.

## 2. Sending tasks to a TES gateway

In this section, instead of sending tasks directly to the TES instances, we
will send them to the [proTES](https://github.com/elixir-cloud-aai/proTES)
gateway. proTES, which acts both as a TES server and as a TES client, then
relays incoming tasks to the TES instances it knows about (currently, these
need to be configured when deploying proTES).

To determine to which TES instance proTES relays a given task, two distribution
logics have been implemented:
- Random distribution
- Distance-based distribution

We will start off with randomly distributing tasks.

### Random task distribution

In the random task distribution, the known TES instances are randomly shuffled
into a ranked list for each incoming task. proTES then attempts to forward the
task to the first instance in the list. If the request succeeds and a task
identifier is returned, proTES returns its own task identifier to the client.
If a task submission fails (e.g., because the TES instance is down), proTES
attempts to submit the task to the next TES instance in the list.
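
The shuffle-then-fall-back logic can be illustrated with a short Python sketch. It is not proTES's implementation: `submit` is a hypothetical callable that forwards the task to one TES instance and returns the remote task ID (or raises on connection errors), and mapping the remote ID to proTES's own task identifier is left out.

```python
import random
from typing import Callable, List, Optional, Tuple


def forward_randomly(
    instances: List[str],
    submit: Callable[[str], Optional[str]],
    rng: Optional[random.Random] = None,
) -> Optional[Tuple[str, str]]:
    """Try instances in random order; return (instance, remote task ID) of the first success."""
    if rng is None:
        rng = random.Random()
    ranked = instances[:]  # copy, so the caller's list is not reordered
    rng.shuffle(ranked)
    for instance in ranked:
        try:
            remote_id = submit(instance)
        except OSError:  # e.g., connection refused: try the next instance
            continue
        if remote_id:
            return instance, remote_id
    return None  # every instance failed
```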

After submitting a task, proTES will continue monitoring the execution of the
task on the remote service in the background. In this way, the client can query
the task status conveniently via proTES. Indeed, proTES is a complete TES API
implementation, so the task list, task cancellation and service info endpoints
are available as well. Thus, from the client's point of view, proTES is
indistinguishable from a "real" TES instance, i.e., one that does the actual
computation of the task as requested.

For the random task distribution, we will use the minimal task definition from
before, as it is sufficient to demonstrate the principle. Moreover, proTES
picks the random task distribution middleware by default when it receives
tasks without inputs. The service calls are depicted in Figure 2A.

**Figure 2A. Task distribution via the proTES gateway.**

![Figure 2A](images/figure_2A.svg)

```bash
export PAYLOAD='{"executors":[{"image":"alpine","command":["echo","hello"]}]}'
```

Now let's set the gateway as the TES instance once and for all:

```bash
export TES="$TES_GATEWAY"
```

Okay. Let's start off with a single task submission:

```bash
echo "Submitting task to TES gateway ($TES)..."
TASK_ID=$( \
    curl \
        --silent \
        --request "POST" \
        --header "accept: application/json" \
        --header "Content-Type: application/json" \
        --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
        --data "$PAYLOAD" \
        "${TES%/}/ga4gh/tes/v1/tasks" | \
    jq -r ".id"
)
if [ -z "$TASK_ID" ] || [ "$TASK_ID" == "null" ]; then
    echo "FAILED"
else
    echo "Task ID: $TASK_ID"
fi
echo "================================================================================"
echo "DONE"
```

And let's inspect the logs in detail:

```bash
VIEW=FULL
echo "Checking state of task '$TASK_ID' ($TES)..."
RESPONSE=$( \
    curl \
        --silent \
        --request "GET" \
        --header "accept: application/json" \
        --header "Content-Type: application/json" \
        --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
        "${TES%/}/ga4gh/tes/v1/tasks/${TASK_ID}?view=${VIEW}" \
)
echo "$RESPONSE" | jq "."
echo "================================================================================"
echo "DONE"
```

Because tasks are distributed randomly, a single submission tells us little. To
see tasks being relayed to different TES instances, let's send off several task
requests:

```bash
N_TASKS=8
unset TASKS
declare -A TASKS
# brace expansion ({1..N}) cannot use variables, so use an arithmetic loop
for ((i = 1; i <= N_TASKS; i++)); do
    echo "Submitting task $i of $N_TASKS to TES gateway ($TES)..."
    TASK_ID=$( \
        curl \
            --silent \
            --request "POST" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            --data "$PAYLOAD" \
            "${TES%/}/ga4gh/tes/v1/tasks" | \
        jq -r ".id"
    )
    if [ -z "$TASK_ID" ] || [ "$TASK_ID" == "null" ]; then
        echo "FAILED"
    else
        echo "Task ID: $TASK_ID"
        TASKS["$TASK_ID"]="$TES"
    fi
    echo "================================================================================"
done
echo "DONE"
```

```bash
for TASK_ID in "${!TASKS[@]}"; do
    echo "Checking state of task '$TASK_ID' ($TES)..."
    RESPONSE=$( \
        curl \
            --silent \
            --request "GET" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            "${TES%/}/ga4gh/tes/v1/tasks/${TASK_ID}?view=BASIC" \
    )
    echo -n "Task State: "
    echo "$RESPONSE" | jq ".state"
    # proTES records where it forwarded the task in the task logs
    echo -n "Executed at: "
    echo "$RESPONSE" | jq '.logs[0].metadata.forwarded_to.url'
    echo -n "External task ID: "
    echo "$RESPONSE" | jq '.logs[0].metadata.forwarded_to.id'
    echo "================================================================================"
done
echo "DONE"
```
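
If you process gateway responses programmatically, the three `jq` lookups translate into a small Python function. This sketch assumes the response layout used by the `jq` paths above (`logs[0].metadata.forwarded_to` with `url` and `id`); `forwarding_info` is a hypothetical helper that, like `jq`, yields nothing (`None`) for missing fields.

```python
import json
from typing import Optional, Tuple


def forwarding_info(response_text: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Return (state, forwarded-to URL, external task ID) from a proTES task response."""
    task = json.loads(response_text)
    logs = task.get("logs") or [{}]
    forwarded = (logs[0].get("metadata") or {}).get("forwarded_to") or {}
    return task.get("state"), forwarded.get("url"), forwarded.get("id")
```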

### Distance-based task distribution

With this distribution logic, proTES determines the approximate locations of
all known TES instances and of all input files of a task via IP geolocation.
For each TES instance, it then calculates the geographic distance to each input
file and sums up the distances over all files. Finally, it ranks the available
TES instances by total distance in ascending order. proTES then tries to
forward the incoming task to the top entry of the resulting list, falling back
to the next entries as described for the random distribution logic.

In this way, the total geographic distance that files (but not bytes, as the
input file sizes are unknown!) have to travel is minimized.
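
The ranking step can be illustrated with the haversine (great-circle) distance. This is a sketch, not proTES's implementation: it assumes the (latitude, longitude) coordinates have already been obtained from an IP geolocation lookup, and the distance formula proTES actually uses may differ.

```python
import math
from typing import Dict, List, Tuple

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    """Great-circle distance between two (latitude, longitude) points in km."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    h = (
        math.sin((lat2 - lat1) / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2
    )
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(h))


def rank_by_total_distance(
    tes_locations: Dict[str, Tuple[float, float]],
    input_locations: List[Tuple[float, float]],
) -> List[str]:
    """Order TES instances by summed distance to all input files (closest first)."""
    totals = {
        name: sum(haversine_km(loc, inp) for inp in input_locations)
        for name, loc in tes_locations.items()
    }
    return sorted(totals, key=totals.get)
```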

Of course, we need a task with inputs. We will use our previous task again, as
one input file is sufficient to prove our point, and a single input also makes
it easier to check whether the distribution logic chose the right TES instance.
To make it a bit more interesting, we will run the task several times, each
time with an input file at a different location. The exercise is visualized in
Figure 2B.

**Figure 2B. Data flow for distance-based task distribution.** Service calls
are as in Figure 2A and have been omitted for clarity.

![Figure 2B](images/figure_2B.svg)

Let's define our payload as before, but let's create one version per input
file, with files hosted in the Czech Republic, Finland, Greece and Switzerland:

```bash
PAYLOAD_RAW='{"name":"md5sum","description":"calculate md5sum of input file and write to output file","tags":{"project":"2023-ecp-f2f Demonstrator","project_owner":"ELIXIR Cloud & AAI"},"executors":[{"command":["md5sum","/data/input"],"image":"alpine","stdout":"/data/output","workdir":"/data"}],"inputs":[{"url":"{{INPUT_FILE}}","path":"/data/input","type":"FILE"}],"outputs":[{"path":"/data/output","url":"{{FTP_INSTANCE}}/2023-ecp-f2f/md5sum","type":"FILE"}],"resources":{"cpu_cores":1,"disk_gb":1,"preemptible":false,"ram_gb":1}}'
# fill in the FTP output location, including credentials
PAYLOAD_TMP_1=$(sed "s|{{FTP_INSTANCE}}|${FTP_INSTANCE%/}|" <<< "$PAYLOAD_RAW")
PAYLOAD_TMP_2=$(sed "s|ftp://|ftp://${FTP_USER}:${FTP_PASSWORD}@|g" <<< "$PAYLOAD_TMP_1")
# create one payload per input file location
unset PAYLOADS
declare -A PAYLOADS
PAYLOADS["Czech Republic"]=$(sed 's#{{INPUT_FILE}}#https://is.muni.cz/pics/design/r/znak_MU.png#' <<< "$PAYLOAD_TMP_2")
PAYLOADS["Finland"]=$(sed 's#{{INPUT_FILE}}#https://www.csc.fi/o/csc-theme/images/csc-logo-teksti-fi.png#' <<< "$PAYLOAD_TMP_2")
PAYLOADS["Greece"]=$(sed 's#{{INPUT_FILE}}#https://www.athenarc.gr/sites/default/files/picture1_0.png#' <<< "$PAYLOAD_TMP_2")
PAYLOADS["Switzerland"]=$(sed 's#{{INPUT_FILE}}#https://www.sib.swiss//templates/sib/images/SIB_LogoQ_GBv.svg#' <<< "$PAYLOAD_TMP_2")
```

```bash
unset TASKS
declare -A TASKS  # maps task ID -> country of the input file
for COUNTRY in "${!PAYLOADS[@]}"; do
    PAYLOAD="${PAYLOADS[$COUNTRY]}"
    echo "Submitting task with input data in $COUNTRY ($TES)..."
    TASK_ID=$( \
        curl \
            --silent \
            --request "POST" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            --data "$PAYLOAD" \
            "${TES%/}/ga4gh/tes/v1/tasks" | \
        jq -r ".id"
    )
    if [ -z "$TASK_ID" ] || [ "$TASK_ID" == "null" ]; then
        echo "FAILED"
    else
        echo "Task ID: $TASK_ID"
        TASKS["$TASK_ID"]="$COUNTRY"
    fi
    echo "================================================================================"
done
echo "DONE"
```

It took a little while to do all the IP geolocation lookups. Let's see if it
was worth the wait:

```bash
for TASK_ID in "${!TASKS[@]}"; do
    export COUNTRY="${TASKS[$TASK_ID]}"
    echo "Checking state of task '$TASK_ID' (input data in: $COUNTRY)..."
    RESPONSE=$( \
        curl \
            --silent \
            --request "GET" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            "${TES%/}/ga4gh/tes/v1/tasks/${TASK_ID}?view=BASIC" \
    )
    echo -n "Task State: "
    echo "$RESPONSE" | jq ".state"
    echo -n "Executed at: "
    echo "$RESPONSE" | jq '.logs[0].metadata.forwarded_to.url'
    echo -n "External task ID: "
    echo "$RESPONSE" | jq '.logs[0].metadata.forwarded_to.id'
    echo "================================================================================"
done
echo "DONE"
```

You should see that each task was computed in the TES instance that is
geographically closest to the location of the particular input data files.
Nice!

## 3. Executing workflows via the TES network

In this section, we will demonstrate how a workflow engine with a TES backend
can make use of the TES gateway to execute workflows on a network of TES
instances.

We will use the [`cwl-tes`](https://github.com/uniqueg/cwl-tes) workflow engine
for running workflows written in the [Common Workflow Language
(CWL)](https://www.commonwl.org/). `cwl-tes` extends the
[`cwltool`](https://github.com/common-workflow-language/cwltool) CWL reference
runner by adding a TES backend and cloud storage handlers.

### Running CWL workflows with `cwl-tes`

For the demo, we will run the "hash splitter" workflow (Figure 3A), a simple
workflow with a scatter-gather step.

**Figure 3A. DAG representation of the "hash splitter" workflow.**

![Figure 3A](images/figure_3A.svg)

**Figure 3B. Service calls and data flow for running the hash splitter
workflow.** Numbers on labels for service calls, inputs and outputs match the
step numbers in Figure 3A.

![Figure 3B](images/figure_3B.svg)

```bash
unset TASK_IDS
# embed the FTP credentials in the remote storage URL
REMOTE_STORAGE_URL=$(sed "s|ftp://|ftp://${FTP_USER}:${FTP_PASSWORD}@|" <<< "${FTP_INSTANCE}")
echo "Starting hash splitter CWL workflow on cwl-tes via TES backend (TES gateway at ${TES_GATEWAY})..."
# cwl-tes logs to stderr: route stderr into the pipe and discard stdout, then
# show the log and extract the TES task IDs from it
TASK_IDS=$(
    cwl-tes \
         --remote-storage-url "$REMOTE_STORAGE_URL" \
         --tes "${TES_GATEWAY%/}/ga4gh/tes" \
         --user "$FUNNEL_SERVER_USER" \
         --password "$FUNNEL_SERVER_PASSWORD" \
         --timeout "30" \
         --outdir "results/" \
         --tmpdir-prefix "results/tmp_" \
         --tmp-outdir-prefix "results/tmp-out_" \
         --leave-outputs \
         "workflows/cwl_hashsplitter/hashsplitter-workflow.cwl" \
         "workflows/cwl_hashsplitter/hashsplitter-config.yml" \
         2>&1 >/dev/null \
    | tee /dev/tty \
    | grep "task id:" \
    | sed -n 's/^.*task id: //p'
)
echo "================================================================================"
echo "DONE"
```
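
The `grep`/`sed` pipeline keeps whatever follows `task id: ` on each log line. The same extraction in Python might look as follows. This sketch only assumes that marker, exactly as the pipeline does; the exact format of `cwl-tes` log lines is not documented here, and `extract_task_ids` is a hypothetical helper.

```python
from typing import Iterable, List

MARKER = "task id: "  # the marker the grep/sed pipeline looks for


def extract_task_ids(log_lines: Iterable[str]) -> List[str]:
    """Collect whatever follows the last 'task id: ' on each log line."""
    ids = []
    for line in log_lines:
        if MARKER in line:
            # like the greedy sed pattern '^.*task id: ', keep the text after
            # the LAST occurrence of the marker
            ids.append(line.rstrip("\n").rsplit(MARKER, 1)[1])
    return ids
```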

```bash
for TASK_ID in $TASK_IDS; do
    echo "Checking state of task '$TASK_ID'..."
    RESPONSE=$( \
        curl \
            --silent \
            --request "GET" \
            --header "accept: application/json" \
            --header "Content-Type: application/json" \
            --user "${FUNNEL_SERVER_USER}:${FUNNEL_SERVER_PASSWORD}" \
            "${TES_GATEWAY%/}/ga4gh/tes/v1/tasks/${TASK_ID}?view=BASIC" \
    )
    echo -n "Task State: "
    echo "$RESPONSE" | jq ".state"
    echo -n "Executed at: "
    echo "$RESPONSE" | jq '.logs[0].metadata.forwarded_to.url'
    echo -n "External task ID: "
    echo "$RESPONSE" | jq '.logs[0].metadata.forwarded_to.id'
    echo "================================================================================"
done
echo "DONE"
```