# Ray Tasks Revisited

© 2019-2021, Anyscale. All Rights Reserved

![Anyscale Academy](../images/AnyscaleAcademyLogo.png)

The [Ray Crash Course](../ray-crash-course/00-Ray-Crash-Course-Overview.ipynb) introduced the core concepts of Ray's API and how they parallelize work. Specifically, we learned how to define Ray _tasks_ and _actors_, run them, and retrieve the results. 

This lesson explores Ray tasks in greater depth, including the following:

* How task dependencies are handled automatically by Ray
* Usage patterns for `ray.get()` and `ray.wait()`
* Specifying limits on the number of invocations and retries on failure
* An exploration of task granularity considerations
* Profiling tasks

In [None]:
import ray, time, os, sys 
import numpy as np 
sys.path.append("..")
from util.printing import pd, pnd  # convenience methods for printing results.

In [None]:
ray.init(ignore_reinit_error=True)

The Ray Dashboard URL is printed above. It is also available in the output dictionary under the key `webui_url`.

(When using the Anyscale platform, use the URL provided by your instructor to access the Ray Dashboard.)

## Ray Task Dependencies

Let's define a few remote tasks, which will have _dependency_ relationships. We'll learn how Ray handles these dependent, asynchronous computations.

One task will return a random NumPy array of some size `n`, and the other task will add two such arrays. We'll also add a sleep time in seconds, one tenth the size of `n`, to simulate an expensive computation.

> **Note:** Dependencies and how Ray implements handling of them are explored in depth in the [03: Ray Internals](03-Ray-Internals.ipynb) lesson. 

In [None]:
@ray.remote
def make_array(n):
    time.sleep(n/10.0)
    return np.random.standard_normal(n)

Now define a task that can add two NumPy arrays together. The arrays need to be the same size, but we'll ignore any checking for this requirement.

In [None]:
@ray.remote
def add_arrays(a1, a2):
    time.sleep(a1.size/10.0)
    return np.add(a1, a2)

Now let's use them!

In [None]:
start = time.time()
ref1 = make_array.remote(20)
ref2 = make_array.remote(20)
ref3 = add_arrays.remote(ref1, ref2)
print(ray.get(ref3))
pd(time.time() - start, prefix="Total time:")

Something subtle and "magical" happened here: when we called `add_arrays`, we didn't need to call `ray.get()` first for `ref1` and `ref2`, even though `add_arrays` expects NumPy arrays. Because `add_arrays` is a Ray task, Ray automatically does the extraction for us, so we can write code that looks more natural and Pythonic.

Furthermore, note that the `add_arrays` task effectively depends on the outputs of the two `make_array` tasks. Ray won't run `add_arrays` until the other tasks are finished. Hence, Ray automatically handles task dependencies for us.

This is why the elapsed time is about 4 seconds. We used a size of 20, so we slept 2 seconds in each call to `make_array`, but those happened in parallel, _followed_ by a second sleep of 2 seconds in `add_arrays`.
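
The four-second figure follows from simple arithmetic on the task graph. The sketch below reproduces it without Ray. The names `sleep_seconds` and `expected_elapsed` are hypothetical helpers introduced only for this illustration, and the model assumes that enough workers are free to run every `make_array` call at once and that Ray's scheduling overhead is negligible.

```python
def sleep_seconds(n):
    """Sleep time used by make_array and add_arrays for arrays of size n."""
    return n / 10.0


def expected_elapsed(sizes):
    """Rough wall-clock estimate: one make_array per size, then one add_arrays.

    Assumption: all make_array calls run in parallel and all arrays share one size.
    """
    make_phase = max(sleep_seconds(n) for n in sizes)  # parallel, so the longest one dominates
    add_phase = sleep_seconds(sizes[0])                # add_arrays sleeps a1.size / 10.0
    return make_phase + add_phase


# Two parallel 2-second make_array tasks, followed by a 2-second add_arrays task.
print(expected_elapsed([20, 20]))
```

The measured time in the cell above will be slightly larger than this estimate, because the model leaves out task scheduling and data transfer.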

Even though three task invocations occurred, we only used one call to `ray.get()`, when we actually needed the final results. Eliminating unnecessary `ray.get()` calls helps avoid forcing tasks to become synchronous when they could be asynchronous. So, keep these two key points in mind:

* _Don't ask for results you don't need._
* _Don't ask for the results you need until you really need them._

We don't need to see the objects for `ref1` and `ref2`. We only need the final array for `ref3`.

## Using ray.wait() with ray.get()

Here is an idiomatic way to use `ray.get()`, where we fire all five asynchronous tasks at once, then ask for all the results at once with `ray.get()`:

In [None]:
start = time.time()

# List comprehension: creates five object references (futures) for NumPy arrays
array_refs = [make_array.remote(n*10) for n in range(5)]

# List comprehension: creates object references (futures) for the results of the additions
added_array_refs = [add_arrays.remote(ref, ref) for ref in array_refs]

# Retrieve all the results at once, then iterate over the resulting arrays
for array in ray.get(added_array_refs):
    print(f'{array.size}: {array}')
pd(time.time() - start, prefix="Total time:")

This takes about eight seconds: four seconds for the longest invocation of `make_array`, `make_array(40)`, and four seconds for the longest invocation of `add_arrays`, when passed the results of `make_array(40)`.

We did the right thing inside each list comprehension. We started the asynchronous tasks all at once and allowed Ray to handle the dependencies. Then we waited on one `ray.get()` call for all the output. 

However, you see no output at first, and then everything is printed at once after eight seconds.

There are two fundamental problems with the way we've used `ray.get()` so far:

1. There's no timeout, in case something gets "hung".
2. We have to wait for _all_ the objects to be available before `ray.get()` returns.

The ability to specify a timeout is essential in production code as a defensive measure. Many potential problems could happen in a real production system, any one of which could cause the task we're waiting on to take an abnormally long time to complete or never complete. Our application would be deadlocked waiting on this task. Hence, it's **strongly recommended** in production software to always use timeouts on blocking calls, so that the application can attempt some sort of recovery in situations like this, or at least report the error and "degrade gracefully".

Actually, there _is_ a `timeout=<value>` option you can pass to `ray.get()` ([documentation](https://ray.readthedocs.io/en/latest/package-ref.html#ray.get)), but it will most likely be removed in a future release of Ray. Why remove it if timeouts are important? This change will simplify the implementation of `ray.get()` and encourage the use of `ray.wait()` for waiting ([documentation](https://ray.readthedocs.io/en/latest/package-ref.html#ray.wait)) instead, followed by using `ray.get()` to retrieve values for tasks that `ray.wait()` tells us are finished. 

Using `ray.wait()` is also the way to fix the second problem with using `ray.get()` by itself, that we have to wait for all tasks to finish before we get any values back. Some of those tasks finish more quickly in our contrived example. We would like to process those results as soon as they are available, even while others continue to run. We'll use `ray.wait()` for this purpose.

Therefore, while `ray.get()` is simple and convenient, for _production code_, we recommend using `ray.wait()`, **with** timeouts, for blocking on running tasks. Then use `ray.get()` to retrieve values of completed tasks. 

Here is the previous example rewritten to use `ray.wait()`:

In [None]:
start = time.time()
array_refs = [make_array.remote(n*10) for n in range(5)]
added_array_refs = [add_arrays.remote(ref, ref) for ref in array_refs]

arrays = []
waiting_refs = list(added_array_refs)  # Assign a working list to the full list of refs
while len(waiting_refs) > 0:           # Loop until all tasks have completed
    # Call ray.wait with:
    #   1. the list of refs we're still waiting to complete,
    #   2. tell it to return immediately as soon as one of them completes,
    #   3. tell it to wait up to 10 seconds before timing out.
    ready_refs, remaining_refs = ray.wait(waiting_refs, num_returns=1, timeout=10.0)
    print('Returned {:3d} completed tasks. (elapsed time: {:6.3f})'.format(len(ready_refs), time.time() - start))
    new_arrays = ray.get(ready_refs)
    arrays.extend(new_arrays)
    for array in new_arrays:
        print(f'{array.size}: {array}')
    waiting_refs = remaining_refs  # Reset this list; don't include the completed refs in the list again!
    
print(f"\nall arrays: {arrays}")
pd(time.time() - start, prefix="Total time:")

It still takes about 8 seconds to complete: 4 seconds for the longest invocation of `make_array` and 4 seconds for the corresponding invocation of `add_arrays`. However, since the other tasks complete more quickly, we see their results as soon as they become available, at roughly 0, 2, 4, and 6 seconds, with the last result arriving at about 8 seconds.
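
The arrival times can be estimated with a little arithmetic. Each chain is one `make_array(n)` call followed by one `add_arrays` call on its result, and each of those sleeps `n/10.0` seconds. The helper `chain_finish_time` below is hypothetical and assumes that all five chains start at once on free workers, ignoring Ray's overhead.

```python
def chain_finish_time(n):
    """Estimated seconds until add_arrays finishes for an array of size n."""
    make_time = n / 10.0  # sleep inside make_array
    add_time = n / 10.0   # sleep inside add_arrays, which starts only after make_array ends
    return make_time + add_time


sizes = [n * 10 for n in range(5)]  # 0, 10, 20, 30, 40
finish_times = [chain_finish_time(n) for n in sizes]

print(finish_times)       # one estimated arrival time per chain
print(max(finish_times))  # the slowest chain sets the total elapsed time
```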

> **Warning:** For each call to `ray.wait()` in a loop like this, it's important to remove the refs that have completed. Otherwise, `ray.wait()` will return immediately with the same list containing the first completed item, over and over again; you'll loop forever! Resetting the list is easy, since the second list returned by `ray.wait()` is the rest of the items that are still running. So, that's what we use.
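
The wait-then-shrink pattern can be packaged as a small generator, so the bookkeeping lives in one place. The sketch below is one hypothetical way to do it: `iter_completed` is not part of Ray. It takes the wait and get functions as parameters so that it can be exercised without a cluster, using the stand-ins `fake_wait` and `fake_get`. It assumes the wait function accepts `num_returns` and `timeout` keywords and returns a `(ready, not_ready)` pair, as `ray.wait()` does in the cell above. Raising `TimeoutError` when nothing completes in time is a design choice of this sketch, not Ray behavior.

```python
def iter_completed(refs, wait, get, timeout=10.0):
    """Yield results one at a time, in completion order (hypothetical helper)."""
    pending = list(refs)
    while pending:
        # Always hand back the shrunken "not ready" list on the next iteration.
        ready, pending = wait(pending, num_returns=1, timeout=timeout)
        if not ready:
            # The timeout expired with nothing finished; let the caller decide how to recover.
            raise TimeoutError(f"{len(pending)} task(s) still running after {timeout} seconds")
        yield from get(ready)


# Stand-ins so the sketch runs without Ray. They are assumptions for illustration only.
def fake_wait(refs, num_returns=1, timeout=None):
    return refs[:num_returns], refs[num_returns:]


def fake_get(refs):
    return [f"value of {ref}" for ref in refs]


results = list(iter_completed(["ref-a", "ref-b", "ref-c"], fake_wait, fake_get))
print(results)
```

In this notebook, the equivalent call would be `for array in iter_completed(added_array_refs, ray.wait, ray.get): ...`.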

Now let's try it with `num_returns = 2`:

In [None]:
start = time.time()
array_refs = [make_array.remote(n*10) for n in range(5)]
added_array_refs = [add_arrays.remote(ref, ref) for ref in array_refs]

arrays = []
waiting_refs = list(added_array_refs)  # Assign a working list to the full list of refs
while len(waiting_refs) > 0:           # Loop until all tasks have completed
    # Call ray.wait with:
    #   1. the list of refs we're still waiting to complete,
    #   2. tell it to return immediately as soon as TWO of them complete,
    #   3. tell it to wait up to 10 seconds before timing out.
    return_n = 2 if len(waiting_refs) > 1 else 1
    ready_refs, remaining_refs = ray.wait(waiting_refs, num_returns=return_n, timeout=10.0)
    print('Returned {:3d} completed tasks. (elapsed time: {:6.3f})'.format(len(ready_refs), time.time() - start))
    new_arrays = ray.get(ready_refs)
    arrays.extend(new_arrays)
    for array in new_arrays:
        print(f'{array.size}: {array}')
    waiting_refs = remaining_refs  # Reset this list; don't include the completed refs in the list again!
    
print(f"\nall arrays: {arrays}")
pd(time.time() - start, prefix="Total time:")

Now the output arrives two results at a time. Note that we don't actually pass `num_returns=2` every time. If you ask for more items than the length of the input list, you get an error. So, we compute the value for `num_returns`, using `2` except when there's only one task left to wait on, in which case we use `1`. As a result, the output for size `40` arrived as a single task result, because we started with `5` tasks and processed two at a time.
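
The batch-size calculation is equivalent to `min(2, len(waiting_refs))`. As a minimal sketch, the hypothetical helper `plan_batches` below lists the `num_returns` values such a loop would use, assuming every `ray.wait()` call returns a full batch and never times out.

```python
def plan_batches(num_tasks, batch_size):
    """Return the num_returns value for each loop iteration (hypothetical helper)."""
    batches = []
    remaining = num_tasks
    while remaining > 0:
        n = min(batch_size, remaining)  # never ask for more refs than are still pending
        batches.append(n)
        remaining -= n
    return batches


print(plan_batches(5, 2))  # five tasks, waiting for two at a time
```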

For a longer discussion on `ray.wait()`, see [this blog post](https://medium.com/distributed-computing-with-ray/ray-tips-and-tricks-part-i-ray-wait-9ed7a0b9836d).

## Exercise 1

The following cell is identical to the last one. Modify it to use a timeout of `2.5` seconds, shorter than our longest tasks. What happens now? Try using other times.

See the [solutions notebook](solutions/Advanced-Ray-Solutions.ipynb) for a discussion of this exercise and the subsequent exercises.

In [None]:
start = time.time()
array_refs = [make_array.remote(n*10) for n in range(5)]
added_array_refs = [add_arrays.remote(ref, ref) for ref in array_refs]

arrays = []
waiting_refs = list(added_array_refs)  # Assign a working list to the full list of refs
while len(waiting_refs) > 0:           # Loop until all tasks have completed
    # Call ray.wait with:
    #   1. the list of refs we're still waiting to complete,
    #   2. tell it to return immediately as soon as TWO of them complete,
    #   3. tell it to wait up to 10 seconds before timing out.
    return_n = 2 if len(waiting_refs) > 1 else 1
    ready_refs, remaining_refs = ray.wait(waiting_refs, num_returns=return_n, timeout=10.0)
    print('Returned {:3d} completed tasks. (elapsed time: {:6.3f})'.format(len(ready_refs), time.time() - start))
    new_arrays = ray.get(ready_refs)
    arrays.extend(new_arrays)
    for array in new_arrays:
        print(f'{array.size}: {array}')
    waiting_refs = remaining_refs  # Reset this list; don't include the completed refs in the list again!
    
print(f"\nall arrays: {arrays}")
pd(time.time() - start, prefix="Total time:")

In conclusion:

> **Tips:**
>
> 1. Use `ray.wait()` with a timeout to wait for one or more running tasks. Then use `ray.get()` to retrieve the values for the finished tasks.
> 2. When looping over calls to `ray.wait()` with a list of object refs for running tasks, remove the previously-completed and retrieved objects from the list.
> 3. Don't ask for results you don't need.
> 4. Don't ask for the results you need until you really need them.

## Exercise 2

Let's practice converting a slow loop to Ray, including using `ray.wait()`. Change the function to be a Ray task. Change the invocations to use the `ray.wait()` idiom. You can just use the default values for `num_returns` and `timeout` if you want. The second cell uses `assert` statements to check your work.

In [None]:
def slow_square(n):
    time.sleep(n)
    return n*n

start = time.time()
squares = [slow_square(n) for n in range(4)]
for square in squares:
    print (f'finished: {square}')
duration = time.time() - start

In [None]:
assert squares == [0, 1, 4, 9], f'Did you use ray.get() to retrieve the values? squares = {squares}'
assert duration < 4.1, f'Did you use Ray to parallelize the work? duration = {duration}' 

## Limiting Task Invocations and Retries on Failure

> **Note:** This feature may change in a future version of Ray. See the latest details in the [Ray documentation](https://docs.ray.io/en/latest/package-ref.html#ray.remote). 

Two options you can pass to `ray.remote` when defining a task control how many times a worker can execute it and how often it is retried on failure:

* `max_calls`: This specifies the maximum number of times that a given worker can execute the given remote function before it must exit. This can be used to address memory leaks in third-party libraries or to reclaim resources that cannot easily be released, e.g., GPU memory that was acquired by TensorFlow. By default this is infinite.
* `max_retries`: This specifies the maximum number of times that the remote function should be rerun when the worker process executing it crashes unexpectedly. The minimum valid value is 0, the default is 4, and a value of -1 indicates infinite retries are allowed.

Example:

```python
@ray.remote(max_calls=10000, max_retries=10)
def foo():
    pass
```

See the [ray.remote()](https://docs.ray.io/en/latest/package-ref.html#ray.remote) documentation for all the keyword arguments supported.

### Overriding with options()

Remote task and actor objects returned by `@ray.remote` can also be dynamically modified with the same arguments supported by `ray.remote()` using `options()` as in the following examples:

```python
# Note: older Ray releases spelled the last argument `num_return_vals`.
@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
def f():
    return 1, 2
g = f.options(num_gpus=2, max_calls=None)
```

## What Is the Optimal Task Granularity?

How fine-grained should Ray tasks be? There's no fixed rule of thumb, but Ray clearly adds some overhead for task management and using object stores in a cluster. Therefore, it makes sense that tasks which are too small will perform poorly.

We'll explore this topic over several more lessons, but for now, let's get a sense of the overhead while running in your setup.

We'll continue to use NumPy arrays to create "load", but remove the `sleep` calls:

In [None]:
def noop(n):
    return n

def local_make_array(n):
    return np.random.standard_normal(n)

@ray.remote
def remote_make_array(n):
    return local_make_array(n)

Let's do `trials` runs for each experiment, to average out background noise:

In [None]:
trials=100
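
The cells that follow report the total time for all trials. If you prefer a per-call figure, a small timing helper can compute the mean. The sketch below is only an illustration: `mean_seconds` is a hypothetical helper, not part of this course's `util` package. It uses `time.perf_counter()`, which is generally better suited to measuring short intervals than `time.time()`.

```python
import time


def mean_seconds(fn, trials=100):
    """Call fn() `trials` times and return the mean wall-clock seconds per call."""
    start = time.perf_counter()
    for _ in range(trials):
        fn()
    return (time.perf_counter() - start) / trials


# Example with a cheap, purely local workload.
per_call = mean_seconds(lambda: sum(range(1000)), trials=100)
print(f"{per_call:.9f} seconds per call")
```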

First, let's use `noop` to baseline local function calls. Note that we call `print` for the duration, rather than `pd`, because the overhead is so low that the `pd` formatting would print `0.000`:

In [None]:
start = time.time()
[noop(t) for t in range(trials)]
print(f'{time.time() - start} seconds')

Let's try the same run with `local_make_array(n)` for `n = 100000`:

In [None]:
start = time.time()
[local_make_array(100000) for _ in range(trials)]
print(f'{time.time() - start} seconds')

So, we can safely ignore the "noop" overhead for now. For completeness, here's what happens with remote execution:

In [None]:
start = time.time()
refs = [remote_make_array.remote(100000) for _ in range(trials)]
ray.get(refs)
print(f'{time.time() - start} seconds')

For arrays of size 100000, using Ray is faster (at least on this test machine). The benefits of parallel computation, rather than synchronous, already outweigh the Ray overhead.

So, let's run some trials with increasingly large array sizes, to compare the performance of local vs. remote execution. First, we'll collect the timings, and then we'll plot them with Bokeh:

In [None]:
local_durations = []
remote_durations = []
# These n values were determined by experimentation on this test machine. 
# If you are using an old machine, and this cell takes a long time to execute,
# you could set the `trials` value above to a smaller number. 
ns = [i*(10**j) for j in range(2,5) for i in [1,2,3,5,8]]
for n in ns:
    start_local = time.time()
    [local_make_array(n) for _ in range(trials)]
    local_durations.append(time.time() - start_local)
    
    start_remote = time.time()
    refs = [remote_make_array.remote(n) for _ in range(trials)]
    ray.get(refs)
    remote_durations.append(time.time() - start_remote)
(ns, local_durations, remote_durations)

In [None]:
import numpy as np

from bokeh.layouts import gridplot
from bokeh.plotting import figure, output_file, show

import bokeh.io
# The next two lines prevent Bokeh from opening the graph in a new window.
bokeh.io.reset_output()
bokeh.io.output_notebook()

tooltips = [
    ("name", "$name"),
    ("array size", "$x"),
    ("time", "$y")]
p1 = figure(x_axis_type="log", y_axis_type="log", title="Execution Times", tooltips=tooltips)
p1.grid.grid_line_alpha=0.3
p1.xaxis.axis_label = 'array size'
p1.yaxis.axis_label = 'time'

p1.line(ns, local_durations, color='#A6CEE3', legend_label='local', name='local')
p1.circle(ns, local_durations, color='darkgrey', size=4)
p1.line(ns, remote_durations, color='#B2DF8A', legend_label='remote', name='remote')
p1.square(ns, remote_durations, color='darkgrey', size=4)
p1.legend.location = "top_left"

show(gridplot([[p1]], plot_width=800, plot_height=400))

Can't see the graph? Here's [one from a prior test run](../images/Execution-Times-Local-v-Remote.png). Your results may look a lot different!

Let's confirm what the graph shows as the crossing point:

In [None]:
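i = 0
while i < len(ns) and local_durations[i] < remote_durations[i]:
    i = i + 1
if i < len(ns):
    print('The Ray times are faster starting at n = {:d}, local = {:6.3f} vs. remote = {:6.3f}'.format(
        ns[i], local_durations[i], remote_durations[i]))
else:
    # No crossing point in this run: local execution was faster for every n tested.
    print('Local execution was faster for all tested array sizes.')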
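
If measurements like these show that your tasks are too small to overcome Ray's overhead, one common remedy is to batch several small work items into each task. The sketch below illustrates only the batching step in plain Python. The names `chunked` and `process_chunk` are hypothetical; in a Ray program, `process_chunk` would be the body of a remote task that receives one chunk per invocation.

```python
def chunked(items, chunk_size):
    """Split a list into consecutive chunks of at most chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


def process_chunk(chunk):
    """Stand-in for a coarser-grained task that handles many items per call."""
    return [n * n for n in chunk]


work = list(range(10))
chunks = chunked(work, 4)  # three larger units of work instead of ten tiny ones
print(chunks)

results = [r for chunk in chunks for r in process_chunk(chunk)]
print(results)
```

The right chunk size depends on your workload and cluster, so treat it as a parameter to tune with timing experiments like the ones above.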

## Profiling Tasks with ray.timeline()

Sometimes you need to debug performance problems in Ray tasks. Calling `ray.timeline(file)` ([documentation](https://ray.readthedocs.io/en/latest/package-ref.html#ray.timeline)) captures profiling information for subsequent task execution to the specified file. Afterwards, you can view the data in a Chrome web browser. The format used is unique to Chrome, so Chrome has to be used to view the data.

Let's try it with our `make_array` and `add_arrays` tasks in the following code. First, some potential cleanup:

In [None]:
timeline_file = 'task-timeline.txt' # Will be found in the same directory as this notebook.
if os.path.isfile(timeline_file):   # Delete old one, if an old one exists already.
    os.remove(timeline_file)

In [None]:
ray.timeline(timeline_file)
start = time.time()
array_refs = [make_array.remote(n*10) for n in range(5)]
added_array_refs = [add_arrays.remote(ref, ref) for ref in array_refs]
for array in ray.get(added_array_refs):
    print(f'{array.size}: {array}')
pd(time.time() - start, prefix="Total time:")

Now, to view the data:

1. Open Chrome and enter chrome://tracing.
2. Click the _Load_ button to load the `task-timeline.txt` file, which will be in this notebook's directory. 
3. To zoom in or out, click the "asymmetric" up-down arrow button. Then hold the mouse button in the graph and roll the mouse scroll wheel up or down. (On a laptop trackpad, press and hold, then use another finger to slide up and down.)
4. To move around, click the crossed arrow and drag a section in view. 
5. Click on a box in the timeline to see details about it. 

Look for blocks corresponding to long-running tasks and look for idle periods, which reflect processing outside the context of Ray.

Here is a screen grab profiling the previous code, zoomed in on one block of tasks and with one task selected. Note the processes shown on the left for drivers (more than one notebook was running at this time) and workers.

![Ray Trace Example](../images/Ray-Trace-Example.png)
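
You can also inspect the timeline file programmatically. The sketch below assumes that the file is a JSON list of Chrome trace events, where each completed event is a dictionary with a `name` field and a `dur` field in microseconds; check your own file before relying on this layout. The helper `longest_events` is hypothetical, and the example runs against a small synthetic trace rather than real Ray output.

```python
import json
import os
import tempfile


def longest_events(trace_path, top=3):
    """Return (name, seconds) for the `top` longest events in a trace file."""
    with open(trace_path) as f:
        events = json.load(f)
    timed = [e for e in events if "dur" in e]          # skip events without a duration
    timed.sort(key=lambda e: e["dur"], reverse=True)   # longest first
    return [(e.get("name", "?"), e["dur"] / 1e6) for e in timed[:top]]


# Synthetic events in the assumed layout (durations in microseconds).
sample = [
    {"name": "make_array", "ph": "X", "ts": 0, "dur": 2_000_000},
    {"name": "add_arrays", "ph": "X", "ts": 2_000_000, "dur": 4_000_000},
    {"name": "short_task", "ph": "X", "ts": 0, "dur": 1_000},
]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample-timeline.txt")
    with open(path, "w") as f:
        json.dump(sample, f)
    top_two = longest_events(path, top=2)

print(top_two)
```

To try it on the file generated above, call `longest_events(timeline_file)` before shutting Ray down.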

In [None]:
ray.shutdown()  # "Undo ray.init()".

The next lesson, [Ray Actors Revisited](02-Ray-Actors-Revisited.ipynb), revisits actors. It provides a more in-depth look at actor characteristics and profiling actor performance using the _Ray Dashboard_.