# Crossflow 101
An introduction to the fundamentals of Crossflow

Workflows are a common feature of much computational science. In a workflow, the work to be done requires more than one piece of software, and the output from one becomes the input to the next, in some form of chain. Classically one would write some sort of bash script or similar to do the job, e.g.:

```bash
#!/usr/bin/env bash
input_file=input.dat
intermediate_file=intermediate.dat
result_file=result.dat

executable1 -i $input_file -o $intermediate_file
executable2 -i $intermediate_file -o $result_file

```
This is OK for basic use but:
* what if your workflow has loops, conditional executions, etc?
* what happens if you want to do things at scale?

Crossflow is designed to make this easier. Key points are:

1. The workflow becomes a Python program, and can make use of all the usual control-flow constructs (loops, if/then/else, etc.).
2. To do this, it provides a simple approach to turning command line tools into Python functions - this is `crossflow.tasks`.
3. It provides a way to hand the processing of individual workflow steps out to a distributed cluster of workers - this is `crossflow.clients`.
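
To make the first point concrete, here is a minimal sketch of a two-stage chain driven from Python, with the second stage repeated until a condition is met. It uses only the standard library `subprocess` module, not Crossflow. The two "executables" are hypothetical stand-ins (short Python one-liners) so that the example runs anywhere.

```python
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-ins for executable1 and executable2: each reads the file
# named by its first argument and writes the file named by its second.
UPPERCASE = "import sys; open(sys.argv[2], 'w').write(open(sys.argv[1]).read().upper())"
HALVE = "import sys; t = open(sys.argv[1]).read(); open(sys.argv[2], 'w').write(t[:len(t) // 2])"


def run_step(code, src, dst):
    """Run one stand-in tool as a separate process, failing loudly on error."""
    subprocess.run([sys.executable, "-c", code, str(src), str(dst)], check=True)


with tempfile.TemporaryDirectory() as tmp:
    work = Path(tmp)
    current = work / "input.dat"
    current.write_text("abcdefghijklmnop")

    # Stage 1: always runs once.
    intermediate = work / "intermediate.dat"
    run_step(UPPERCASE, current, intermediate)
    current = intermediate

    # Stage 2: repeated until the data is small enough.
    n_iterations = 0
    while len(current.read_text()) > 4:
        halved = work / f"halved_{n_iterations}.dat"
        run_step(HALVE, current, halved)
        current = halved
        n_iterations += 1

    result = current.read_text()

print(result, n_iterations)
```

The loop and its exit condition are ordinary Python; what this sketch lacks, and what the rest of this document covers, is a tidy way to wrap the tools and to run the steps on a cluster.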

Here we look at each of these components in turn.

--------------------
## Crossflow Tasks

The `crossflow.tasks` subpackage provides methods to turn tools that would usually be used via the command line into Python functions. The basic concept is that a tool that is used from the command line like this:
```bash
my_tool -i input.dat -o output.dat
```
becomes, in Python:
```python
output = my_tool_task('input.dat')
```
where `my_tool_task` is a `crossflow.SubprocessTask` for `my_tool` and `output` is a `crossflow.FileHandle`, which behaves much like a Python `Path` object (see the [pathlib documentation](https://docs.python.org/3/library/pathlib.html)).

### Creating a crossflow.SubprocessTask

This is a three step process:

1. The task is created on the basis of a `template`, a string with a generalised version of the command you wish to execute.
2. The inputs for the task are specified.
3. The outputs from the task are specified.

Thus:
```python
my_tool_task = crossflow.tasks.SubprocessTask('my_tool -i x.in -o x.out')
my_tool_task.set_inputs(['x.in'])
my_tool_task.set_outputs(['x.out'])
```
Note that the names of files used in the template string are arbitrary: `'my_tool -i a -o b'` would do just as well, as long as the corresponding names (`'a'`, `'b'`) were used in `.set_inputs()` and `.set_outputs()`.

If the tool takes multiple files as inputs, and/or produces multiple output files, the process is the same:
```python
my_othertool_task = crossflow.tasks.SubprocessTask('my_othertool -x x.in -y y.in -o x.out -l logfile')
my_othertool_task.set_inputs(['x.in', 'y.in'])
my_othertool_task.set_outputs(['x.out', 'logfile'])
```

The inputs and outputs may appear in any order in the template string. However, the resulting task expects its inputs in the order given to `.set_inputs()`, and returns its outputs as a tuple in the order given to `.set_outputs()`.
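
The ordering rule can be illustrated with a small stand-in class. This is a sketch of the idea only: `TemplateTask` is a hypothetical name, it is not Crossflow's `SubprocessTask`, and it makes no claim about how Crossflow works internally. It assumes a POSIX shell with `cat` and `wc` available.

```python
import shutil
import subprocess
import tempfile
from pathlib import Path


class TemplateTask:
    """Hypothetical stand-in for illustration only; not Crossflow's SubprocessTask."""

    def __init__(self, template):
        self.template = template
        self.inputs = []
        self.outputs = []

    def set_inputs(self, names):
        self.inputs = list(names)

    def set_outputs(self, names):
        self.outputs = list(names)

    def __call__(self, *sources):
        if len(sources) != len(self.inputs):
            raise TypeError(f"expected {len(self.inputs)} inputs, got {len(sources)}")
        with tempfile.TemporaryDirectory() as tmp:
            # Positional arguments are matched to template names in set_inputs() order.
            for name, source in zip(self.inputs, sources):
                shutil.copy(source, Path(tmp) / name)
            subprocess.run(self.template, shell=True, cwd=tmp, check=True)
            # Results are returned in set_outputs() order.
            return tuple((Path(tmp) / name).read_text() for name in self.outputs)


# The template mentions y.in before x.in, and joined.out before count.out ...
task = TemplateTask("cat y.in x.in > joined.out && wc -l < joined.out > count.out")
task.set_inputs(["x.in", "y.in"])              # ... but arguments follow this order
task.set_outputs(["count.out", "joined.out"])  # ... and results follow this order

with tempfile.TemporaryDirectory() as data_dir:
    x = Path(data_dir) / "first.txt"
    y = Path(data_dir) / "second.txt"
    x.write_text("from x\n")
    y.write_text("from y\n")
    count, joined = task(x, y)

print(int(count))
print(joined)
```

Here `task(x, y)` binds `x` to `x.in` and `y` to `y.in` regardless of where those names sit in the template, and the returned tuple starts with the line count because `count.out` was listed first.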

For more advanced aspects of `SubprocessTask` creation, see elsewhere.

### Running a crossflow.SubprocessTask
Although it is primarily expected that tasks will be run via a `crossflow.Client` (see below), they can also be executed directly:
```python
output, logfile = my_othertool_task(x, y)
```
As explained above, `output` and `logfile` will be 'Path-like' objects (but with more limited functionality than real `Path` objects). So to save the output to a local file:
```python
output.save('output.dat')
```
Or to look at the contents of the logfile directly:
```python
print(logfile.read_text())
```
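
For comparison, the nearest operations on a genuine `pathlib.Path` look like the sketch below. It uses the standard library only; the file names are invented for the example. A real `Path` has `.read_text()` but no `.save()` method, so copying with `shutil.copy` is the closest equivalent.

```python
import shutil
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # A stand-in for a task's log output.
    logfile = Path(tmp) / "logfile"
    logfile.write_text("step 1 ok\nstep 2 ok\n")

    # Reading the contents works the same way as in the example above.
    text = logfile.read_text()

    # Path has no .save(); copying the file is the nearest plain-pathlib equivalent.
    saved = Path(shutil.copy(logfile, Path(tmp) / "saved.log"))
    saved_text = saved.read_text()

print(saved_text)
```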

--------------------
## Crossflow Clients
The `crossflow.clients` sub-package provides a Client through which one can execute tasks on distributed resources. At its heart a `crossflow.clients.Client()` is a [dask.distributed](https://distributed.dask.org/en/latest/) client, and new users are strongly encouraged to read the documentation there to understand how Crossflow works.

### Creating a crossflow.Client

A Crossflow client provides access to a cluster of workers. These may be remote machines, or a set of worker processes on the current compute resource (see the dask documentation for more details). The cluster may be already up and running, in which case the crossflow.Client just needs to know where it is (the address of its scheduler):

```python
my_client = crossflow.clients.Client(scheduler_file='scheduler.json')
```

Alternatively (typically for testing purposes), a local cluster may be created on the fly, to serve the Client:
```python
my_client = crossflow.clients.Client()
```

More generally, there are ways of creating compatible `cluster` objects on a wide variety of resources from clouds to HPC systems. See for example [dask jobqueue](https://jobqueue.dask.org/en/latest/) and [dask kubernetes](https://kubernetes.dask.org/en/latest/).


### Using a crossflow.Client

A crossflow.Task is sent to a crossflow.Client for execution using the client's .submit() or .map() method.


#### Running a single job:
```python
output_future, logfile_future = my_client.submit(my_othertool_task, x, y)
```
Compare this with the direct call above. The outputs (`output_future`, `logfile_future`) are now Futures; see the dask documentation for more detail. Note one difference: dask's `.submit()` method always returns a single Future, while Crossflow's returns one Future per expected output.
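
For readers new to Futures, the standard library's `concurrent.futures` module offers the same idea in miniature. The sketch below is an analogy only and uses neither dask nor Crossflow; `split_words` is a hypothetical stand-in for a task with two outputs. Like dask's `.submit()`, the executor returns a single Future for the whole return value, so the tuple can only be unpacked once the result is in.

```python
from concurrent.futures import ThreadPoolExecutor


def split_words(text):
    """Stand-in task with two outputs: the words, and how many there are."""
    words = text.split()
    return words, len(words)


with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns immediately with a Future, a placeholder for the result.
    future = executor.submit(split_words, "one two three")

    # One Future wraps the whole return value; result() blocks until it is ready.
    words, n_words = future.result()

print(words, n_words)
```

Having one Future per output, as Crossflow does, means each output can be passed on to a later step separately without first unpacking a combined result.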

#### Running a set of jobs in parallel:
```python
xs = [x1, x2, x3, x4]
ys = [y1, y2, y3, y4]
output_futures, logfile_futures = my_client.map(my_othertool_task, xs, ys)
```
In this case the `.map()` method returns lists of Futures. The individual jobs are scheduled to the workers in the compute cluster in whatever way is most efficient; if there are enough workers to run all four jobs in parallel, they will be.
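
The way the argument lists are paired up element by element can be seen with the standard library's `Executor.map()`. Again this is an analogy, not Crossflow or dask code, and `combine` is a hypothetical stand-in task. Unlike the `.map()` described above, the standard library version yields finished results rather than Futures, so the sketch transposes them into one list per output by hand.

```python
from concurrent.futures import ThreadPoolExecutor


def combine(x, y):
    """Stand-in task with two outputs: a label and a size."""
    return f"{x}+{y}", len(x) + len(y)


xs = ["a", "bb", "ccc", "dddd"]
ys = ["w", "x", "y", "z"]

with ThreadPoolExecutor(max_workers=4) as executor:
    # Job i receives (xs[i], ys[i]); results come back in submission order.
    results = list(executor.map(combine, xs, ys))

# Transpose the list of (label, size) pairs into one list per output.
labels, sizes = (list(column) for column in zip(*results))

print(labels)
print(sizes)
```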

-------------
## A simple demonstration

Here we create a `SubprocessTask` that uses the `rev` utility to reverse the characters on each line of a file, submit the job to a local `Client`, and then retrieve and view the result.

```python
from crossflow import clients, tasks
from pathlib import Path

# Create a short text file:
here = Path('.')
inp_file = here / 'input.txt'
with inp_file.open('w') as f:
    for i in range(10):
        f.write('line {}\n'.format(i))

print('Original file:')
print(inp_file.read_text())

# Create a SubprocessTask that will reverse the lines in a file:
reverser = tasks.SubprocessTask('rev input > output')
reverser.set_inputs(['input'])
reverser.set_outputs(['output'])

# Create a local client to run the job, and submit it:
client = clients.Client()
output = client.submit(reverser, inp_file)

# output is a Future; collect its result(), which is a 'Path-like'
# FileHandle, list its contents, then save to a file:
print('New file:')
print(output.result().read_text())
output.result().save(here / 'output.txt')
```

## Dealing with errors

If you call the `.result()` method on a future that comes from a task that has failed, this will raise an exception. An uncaught exception produces a long traceback and stops the workflow, so it is good practice to write your code to catch these.

One option is to wrap tasks in try/except blocks, another to test the value of the `.status` attribute of the future, once the task has completed (using `distributed`'s `wait` function).

e.g.:

```python
bad_command = tasks.SubprocessTask('foo input > output')
bad_command.set_inputs(['input'])
bad_command.set_outputs(['output'])
```

```python
# The messy way:
output = client.submit(bad_command, inp_file)
print(output.result())
```

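```python
# The clean way #1:
output = client.submit(bad_command, inp_file)
try:
    print(output.result())
except Exception:
    # Catch Exception rather than using a bare except, so that
    # KeyboardInterrupt and SystemExit still propagate.
    print(output.exception())
```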

```python
# The clean way #2:
from distributed import wait

output = client.submit(bad_command, inp_file)
wait(output)
if output.status != 'error':
    print(output.result())
else:
    print('there was an error!')
```
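
When many jobs are submitted at once, the same idea extends to sorting the futures into successes and failures after waiting for all of them. The sketch below shows the pattern with the standard library's `concurrent.futures`, as an analogy that runs without a cluster; `flaky` is a hypothetical stand-in for a task that sometimes fails. Standard library Futures have no `.status` attribute, so the test here is whether `.exception()` returns `None`.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def flaky(n):
    """Stand-in task that fails for odd inputs."""
    if n % 2:
        raise ValueError(f"job {n} failed")
    return n * 10


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(flaky, n) for n in range(4)]
    # Block until every job has either finished or failed.
    wait(futures)

# exception() returns None for a successful job, so no try/except is needed.
results = [f.result() for f in futures if f.exception() is None]
errors = [str(f.exception()) for f in futures if f.exception() is not None]

print(results)
print(errors)
```

Collecting the failures this way lets the rest of the batch proceed, and leaves a record of which jobs need attention.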