# workflows

Complex machine learning applications often require multi-stage pipelines (e.g., data loading, transforming, training, testing, iterating). **Workflows** in Spell allow you to manage these pipelines as a sequence of Spell runs, and are a lightweight alternative to tools like [Airflow](https://airflow.apache.org/) and [Luigi](https://github.com/spotify/luigi) for managing your model training pipelines.

Workflows can be launched using either the Spell CLI or the Spell Python API. In this tutorial we demonstrate both approaches by example.

## understanding workflows

Every workflow consists of one *master run* and one or more *worker runs*. The master run is responsible for control flow: that is, determining which worker runs should get executed when, and why. The worker runs then do all of the work required.

Our demo workflow consists of three steps:

1. downloading the dataset (a Project Gutenberg copy of _War and Peace_) and saving it to disk.
2. mounting that text corpus into a run, training the neural network on it, and saving the model to disk.
3. mounting the saved model into yet another run, sampling it for an interesting result, and streaming that output to logs.

To accomplish this, we will need one master run and three worker runs, arranged as follows:

![](https://i.imgur.com/W5Ugs0S.png)

For this simple example we will execute the steps consecutively, conditioning the start of each worker run in the workflow on the success of its predecessor. More complex workflows may require more complicated control flow.
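One example of more complicated control flow is a master that retries a stage which ends in a failing final state before giving up. The sketch below is plain Python and deliberately Spell-agnostic. `run_with_retries` and the `launch` callable it accepts are hypothetical names: `launch` stands in for "create a worker run, wait for it to finish, and return its final status". Retrying is our own illustration and not a built-in Spell feature.

```python
from typing import Callable

COMPLETE = "complete"  # assumed status value; in Spell, compare against client.runs.COMPLETE


def run_with_retries(launch: Callable[[], str], max_attempts: int = 3) -> int:
    """Call `launch` until it reports COMPLETE and return the attempt count.

    `launch` is a hypothetical stand-in for starting a worker run, blocking
    until it reaches a final state, and returning that state.
    """
    for attempt in range(1, max_attempts + 1):
        status = launch()
        if status == COMPLETE:
            return attempt
        print(f"attempt {attempt} ended in state {status!r}")
    raise OSError(f"stage failed after {max_attempts} attempts")


# Usage with a fake stage that fails once, then succeeds.
outcomes = iter(["failed", COMPLETE])
print(run_with_retries(lambda: next(outcomes)))  # reports attempt 1's failure, then prints 2
```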

While the instance type of the worker runs is configurable, the master run always executes on the basic `cpu` instance type. Try to keep any computationally intensive logic isolated to the workers!

## understanding the workflow script

In order to execute a workflow, we need to define a workflow script. The **workflow script** is what gets executed on the master run: a Python script using the Spell Python API to define worker jobs and the control flow logic surrounding them.

Here is the workflow script, saved as `workflow.py`, that we will use for this demo. Don't worry if you don't understand all of it right away; we'll walk through it step by step.

```python
# workflow.py
import spell.client
client = spell.client.from_environment()

# create the first run to download the dataset (War and Peace, by Leo Tolstoy)
# if desired, replace data_url with url to another plain text file to train on
data_url = "https://www.gutenberg.org/files/2600/2600-0.txt"
r = client.runs.new(
    command="wget -O input.txt {}".format(data_url)
)
print("waiting for run {} to complete".format(r.id))
r.wait_status(*client.runs.FINAL)
r.refresh()
if r.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r.id}")

# create the second run to train char-RNN on the dataset
data_dir = "/data"
r = client.runs.new(
    machine_type="V100",
    command="python train.py --data_dir={}".format(data_dir),
    attached_resources={
        "runs/{}/input.txt".format(r.id): "{}/input.txt".format(data_dir)
    },
    commit_label="char-rnn",
)
print("waiting for run {} to complete".format(r.id))
r.wait_status(*client.runs.FINAL)
r.refresh()
if r.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r.id}")

# create the third run that samples the model to generate some text
r = client.runs.new(
    machine_type="V100",
    command="python sample.py",
    attached_resources={"runs/{}/save".format(r.id): "save"},
    commit_label="char-rnn",
)
print("waiting for run {} to complete".format(r.id))
r.wait_status(*client.runs.FINAL)
r.refresh()
if r.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r.id}")

# print the logs from the last run
# generated text should be the last log line
print("Logs from run {}:".format(r.id))
for line in r.logs():
    if line.status == client.runs.RUNNING and not line.status_event:
        print(line)
```


Let's walk through this script step-by-step.

----

```python
import spell.client
client = spell.client.from_environment()
```

This initializes the client object. If you are not familiar with our Python API, check out the [Python API Reference](http://spell.run/docs/python) to learn more.

----

```python
data_url = "https://www.gutenberg.org/files/2600/2600-0.txt"
r = client.runs.new(
    command="wget -O input.txt {}".format(data_url)
)
print("waiting for run {} to complete".format(r.id))
```

These lines create a worker run that executes `wget -O input.txt https://www.gutenberg.org/files/2600/2600-0.txt`. The command uses the `wget` command-line tool to download a copy of _War and Peace_ and save it as `input.txt`.

----

```python
r.wait_status(*client.runs.FINAL)
r.refresh()
if r.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r.id}")
```

We can only proceed to the next stage of the workflow when the first stage completes successfully. This next bit of code is a control flow block that achieves this end.

Every run transitions through a sequence of states as part of its execution: `machine_requested`, `running`, `pushing`, and so on. Runs eventually transition to a so-called **final state**: the state that the run is assigned at the end of its execution. There are four different possible final states, the most important of which is `COMPLETE`. A run which terminates in the `COMPLETE` state is one which has successfully run all of its code and pushed all of its outputs to SpellFS.

The `wait_status` method blocks execution until the run API reports that the run has reached one of the final states in `client.runs.FINAL`. We then call `refresh` to update the information on the run object (this has to be done manually because it requires a network round trip) and check whether the `r.status` field reports that the run is `COMPLETE`. We only proceed with the rest of the script if it is. If it is not, for example because the run reached a failing final state (`FAILED`, `STOPPED`, or `INTERRUPTED`), we raise an error instead.
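The workflow script repeats this wait-refresh-check block after every worker run, so it can be factored into a helper. The sketch below uses only the calls that appear in this tutorial (`wait_status`, `refresh`, `status`, `id`, `client.runs.FINAL`, `client.runs.COMPLETE`). The name `wait_until_complete` is our own. The helper takes the client as a parameter, which lets you exercise it with stand-in objects instead of a live Spell account.

```python
def wait_until_complete(client, run):
    """Block until `run` reaches a final state; raise unless that state is COMPLETE.

    Assumes `run` exposes wait_status(), refresh(), status and id, and that
    `client.runs` exposes FINAL and COMPLETE, as used in workflow.py.
    """
    run.wait_status(*client.runs.FINAL)  # block until any final state is reached
    run.refresh()                        # fetch the latest status (a network round trip)
    if run.status != client.runs.COMPLETE:
        raise OSError(f"failed at run {run.id} (status: {run.status})")
    return run
```

In `workflow.py`, each four-line block could then be replaced by `wait_until_complete(client, r)`.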

----

```python
data_dir = "/data"
r = client.runs.new(
    machine_type="V100",
    command="python train.py --data_dir={}".format(data_dir),
    attached_resources={
        "runs/{}/input.txt".format(r.id): "{}/input.txt".format(data_dir)
    },
    commit_label="char-rnn",
)
```

We are once again creating a worker run, and as before we will block until it finishes. This time we are running the [train.py](https://github.com/sherjilozair/char-rnn-tensorflow/blob/master/train.py) script from the `sherjilozair/char-rnn-tensorflow` repo, using the _War and Peace_ text as our corpus.

We are careful to mount the data at the correct `data_dir` inside the new model training run using the `attached_resources` argument, because `train.py` reads its corpus from a file named `input.txt` inside `--data_dir`. Outputs that get saved to disk during a run are written to SpellFS when the run exits, which lets us reuse the text corpus downloaded by the previous run in this one.
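Each key of `attached_resources` in this script follows the pattern `runs/<run id>/<path>`, and each value is the path where that output appears inside the new run. The small helper below makes the pattern explicit so the two sides of the mapping stay consistent. `mount_from_run` is a hypothetical name and not part of the Spell API, and run ID 336 is only an example value.

```python
import posixpath


def mount_from_run(run_id, output_path, mount_path):
    """Build one attached_resources entry mapping a prior run's output into a new run.

    Follows the "runs/<id>/<path>" key format used in workflow.py.
    """
    return {posixpath.join("runs", str(run_id), output_path): mount_path}


data_dir = "/data"
print(mount_from_run(336, "input.txt", posixpath.join(data_dir, "input.txt")))
# {'runs/336/input.txt': '/data/input.txt'}
```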

Note that we are not passing a `--github-url`! Instead, this run gets its code environment from the `commit_label` field. If you skip ahead to the `spell workflow` command we use to launch the workflow, you will see that we pass `--repo char-rnn=char-rnn-tensorflow/` as a parameter. When the master run sees that a run has been created with the `char-rnn` `commit_label` set, it knows to copy the contents of this directory into the run.

Note that this is a copy operation, not a `git clone`: the directory that `commit_label` refers to must actually exist inside the master run.

This feature allows you to share code and data artifacts between the master run and its worker runs. For example, if a previous worker run generated some data that the master run then needed to modify, `--repo` and `commit_label` are a convenient way to pass the master's updated data to the next worker run.

If your master run and worker runs don't need to share any code, you can continue to use `--github-url` as usual.

----


```python
r.wait_status(*client.runs.FINAL)
r.refresh()
if r.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r.id}")
```

We've already seen how this works!

----

Next we create a third and final run that samples the trained model to generate some text:

```python
# create the third run that samples the model to generate some text
r = client.runs.new(
    machine_type="V100",
    command="python sample.py",
    attached_resources={"runs/{}/save".format(r.id): "save"},
    commit_label="char-rnn",
)
print("waiting for run {} to complete".format(r.id))
r.wait_status(*client.runs.FINAL)
r.refresh()
if r.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r.id}")
```

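This works much the same way as the previous run. The only new detail is the mount. The training run's `save` directory, where `train.py` writes its model checkpoints by default, is mounted at `save` in this run. That is where `sample.py` looks for a saved model by default.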
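The three `client.runs.new` calls in `workflow.py` differ only in their arguments, so the same master logic could also be written as a loop over stage definitions. This is a sketch of one possible restructuring, not something the tutorial requires. `run_stages` and `stages` are hypothetical names. Each stage is a function that receives the previous run's ID (`None` for the first stage) and returns keyword arguments for `client.runs.new`. Only the client calls shown in this tutorial are used.

```python
def run_stages(client, stages):
    """Launch each stage in order, passing the previous run's id to the next stage."""
    run, prev_id = None, None
    for make_kwargs in stages:
        run = client.runs.new(**make_kwargs(prev_id))
        print("waiting for run {} to complete".format(run.id))
        run.wait_status(*client.runs.FINAL)
        run.refresh()
        if run.status != client.runs.COMPLETE:
            raise OSError(f"failed at run {run.id}")
        prev_id = run.id
    return run  # the last run, or None if there were no stages


data_url = "https://www.gutenberg.org/files/2600/2600-0.txt"
stages = [
    # stage 1: download the corpus (no previous run to mount from)
    lambda _: {"command": "wget -O input.txt {}".format(data_url)},
    # stage 2: train on the corpus downloaded by stage 1
    lambda prev: {
        "machine_type": "V100",
        "command": "python train.py --data_dir=/data",
        "attached_resources": {"runs/{}/input.txt".format(prev): "/data/input.txt"},
        "commit_label": "char-rnn",
    },
    # stage 3: sample the model saved by stage 2
    lambda prev: {
        "machine_type": "V100",
        "command": "python sample.py",
        "attached_resources": {"runs/{}/save".format(prev): "save"},
        "commit_label": "char-rnn",
    },
]
```

Inside the master run, calling `run_stages(spell.client.from_environment(), stages)` should launch the same three worker runs as `workflow.py`. Adding a stage then only means appending one more function to the list.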

----

The `sample.py` script prints its results to `stdout`, so the last thing the script does before exiting is print the lines from that run's log:

```python
print("Logs from run {}:".format(r.id))
for line in r.logs():
    if line.status == client.runs.RUNNING and not line.status_event:
        print(line)
```
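The filter keeps only lines logged while the run was in the running state and skips status-change events, which leaves the script's own output. The same predicate can be pulled into a function and checked against fake log lines. `LogLine` and `program_output` below are hypothetical: `LogLine` is a stand-in carrying just the two attributes the loop reads (`status` and `status_event`) plus the text, not Spell's actual log line class.

```python
from typing import Iterable, List, NamedTuple


class LogLine(NamedTuple):
    """Hypothetical stand-in for a Spell log line."""
    status: str
    status_event: bool
    text: str


def program_output(lines: Iterable[LogLine], running_status: str) -> List[LogLine]:
    """Keep lines logged while running that are not status-change events."""
    return [line for line in lines if line.status == running_status and not line.status_event]


logs = [
    LogLine("building", False, "Building..."),
    LogLine("running", True, "Run is running"),
    LogLine("running", False, "CHAPTER XVIII"),
]
for line in program_output(logs, running_status="running"):
    print(line.text)  # prints "CHAPTER XVIII"
```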

## executing the workflow script

Now that we understand this script, it's time to run it.

First of all, since we're passing the character-level RNN code into the run using `--repo`/`commit_label`, we will want to clone that code to local disk so that it is included in the master run.

```bash
git clone https://github.com/sherjilozair/char-rnn-tensorflow.git
```

```
Cloning into 'char-rnn-tensorflow'...
remote: Enumerating objects: 404, done.
remote: Total 404 (delta 0), reused 0 (delta 0), pack-reused 404
Receiving objects: 100% (404/404), 508.45 KiB | 1.72 MiB/s, done.
Resolving deltas: 100% (238/238), done.
```
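Because `commit_label` copies a directory rather than cloning a repository, it can be worth checking locally that the clone succeeded before launching the workflow. The sketch below assumes the directory name passed to `--repo` (`char-rnn-tensorflow/`) and checks for the two scripts the worker runs invoke. The helper name `missing_files` is hypothetical.

```python
from pathlib import Path


def missing_files(repo_dir, required=("train.py", "sample.py")):
    """Return the names in `required` that are not files inside repo_dir."""
    root = Path(repo_dir)
    return [name for name in required if not (root / name).is_file()]


# An empty list means both scripts are present.
print(missing_files("char-rnn-tensorflow") or "char-rnn-tensorflow/ contains train.py and sample.py")
```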


Then, to actually run this workflow, we execute:

```bash
spell workflow \
    --repo char-rnn=char-rnn-tensorflow/ \
    "python workflow.py"
```

```
✨ Syncing repo char-rnn-tensorflow/.
Everything up-to-date
Enumerating objects: 9, done.
Counting objects: 100% (9/9), done.
Delta compression using up to 12 threads
Compressing objects: 100% (5/5), done.
Writing objects: 100% (5/5), 770 bytes | 770.00 KiB/s, done.
Total 5 (delta 4), reused 0 (delta 0)
To git.spell.run:aleksey/e6cee8710721a8ef6f3d2924713ac7d351c972ca.git
 * [new branch]      HEAD -> br_25a003b88233dda6a67d2e47db041b920965d26d
💫 Casting workflow #9…
✨ Following workflow at run 335.
✨ Stop viewing logs with ^C
✨ Machine_Requested… done
✨ Building… done
✨ Run is running
waiting for run 336 to complete
waiting for run 337 to complete
waiting for run 338 to complete
Logs from run 338:
Instructions for updating:
This class is equivalent as tf.keras.layers.LSTMCell, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
This cl
```

Checking the run logs in the web console, we see the following generated text (your sample will look different):

```
May 05, 2020, 11:35:55: running:  of Moscupe, who foll and since hoer. We are and still turned
May 05, 2020, 11:35:55: running: merely as the corner that argument in lors for so a quality that.
May 05, 2020, 11:35:55: running: Welllaration wehe return, raisements of
May 05, 2020, 11:35:55: running: such a Frenchmen inspecting for them tallow me with the same correct actions,
May 05, 2020, 11:35:55: running: fellows and well—or watching, in animation, gay others.
May 05, 2020, 11:35:55: running: 
May 05, 2020, 11:35:55: running: 
May 05, 2020, 11:35:55: running: 
May 05, 2020, 11:35:55: running: 
May 05, 2020, 11:35:55: running: 
May 05, 2020, 11:35:55: running: CHAPTER XVIII
May 05, 2020, 11:35:55: running:  Yasova and givein offers, and man—do not restraining the woode, they pause they seemed to many apply as he left cordier in
May 05, 2020, 11:35:55: running: which Now did not be week wiplocking France
```

And that concludes our demo!

For more code samples, refer to the `simple` and `video-generation-workflow` folders in this repository.