# Ray Concepts - Data Parallelism (Part 2)

The previous lesson explored Ray's core concepts and how they work. We learned how to define Ray _tasks_, run them, and retrieve the results. We also started learning about how Ray schedules tasks in a distributed environment.

This lesson completes the discussion of Ray tasks by exploring how task dependencies are handled. We'll also look under the hood at Ray's architecture and runtime behavior.

> **Tip:** Recall that the [Ray Package Reference](https://ray.readthedocs.io/en/latest/package-ref.html) in the [Ray Docs](https://ray.readthedocs.io/en/latest/) is useful for exploring the API features we'll learn.

In [1]:
# Import libraries and initialize Ray. We're adding NumPy for the examples and the tutorial `util` library:

import ray, time, sys    # New notebook, so new process
import numpy as np       # Used for examples
sys.path.append('..')    # Import our own libraries starting in the project root directory

from util.printing import pnd, pd

In [2]:
ray.init(ignore_reinit_error=True)

2020-04-06 13:29:41,448	INFO resource_spec.py:204 -- Starting Ray with 4.05 GiB memory available for workers and up to 2.03 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-04-06 13:29:41,786	INFO services.py:1146 -- View the Ray dashboard at localhost:8266


{'node_ip_address': '192.168.1.149',
 'redis_address': '192.168.1.149:52078',
 'object_store_address': '/tmp/ray/session_2020-04-06_13-29-41_439823_29297/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-04-06_13-29-41_439823_29297/sockets/raylet',
 'webui_url': 'localhost:8266',
 'session_dir': '/tmp/ray/session_2020-04-06_13-29-41_439823_29297'}

Let's work with a new remote function. Previously, our `expensive` and `expensive_task` functions returned tuples that included time durations, which were useful for understanding how long the functions took to execute. Now it will be more convenient to return only the data values we care about, not "metadata" like this, because we are going to pass them to other functions.

Hence, we'll define _dependency_ relationships between tasks. We'll learn how Ray handles these dependent, asynchronous computations.

So, let's define a task to return a random NumPy array of some size `n`. As before, we'll add a sleep time, one tenth the size of `n`:

In [3]:
@ray.remote
def make_array(n):
    time.sleep(n/10.0)
    return np.random.standard_normal(n)

Now define a task that can add two NumPy arrays together. The arrays need to be the same size, but we'll ignore any checking for this requirement.

In [4]:
@ray.remote
def add_arrays(a1, a2):
    time.sleep(a1.size/10.0)
    return np.add(a1, a2)

Now let's use them!

In [5]:
start = time.time()
id1 = make_array.remote(20)
id2 = make_array.remote(20)
id3 = add_arrays.remote(id1, id2)
print(ray.get(id3))
pd(time.time() - start, prefix="Total time:")

[ 1.46278599  1.3091778   0.83837591  1.53840337 -0.30042672 -0.06983262
  0.12076512 -1.19392557  0.25440688 -0.12887382 -1.05815889  1.11084168
 -1.53943636  0.26230243 -0.72355366 -1.64677165  0.46320472 -1.11021867
  1.35706638 -1.99027404]
Total time: duration:  4.032 seconds


Something subtle and "magical" happened here: when we called `add_arrays`, we passed the object ids `id1` and `id2` directly, without calling `ray.get()` on them first, even though `add_arrays` expects NumPy arrays. Because `add_arrays` is a Ray task, Ray automatically does the extraction for us, so we can write code that looks more natural.

Furthermore, note that the `add_arrays` task effectively depends on the outputs of the two `make_array` tasks. Ray won't run `add_arrays` until the other tasks are finished. Hence, _Ray handles task dependencies automatically for us._ 

This is why the elapsed time is about 4 seconds. We used a size of 20, so we slept 2 seconds in each call to `make_array`, but those happened in parallel, _followed_ by a second sleep of 2 seconds in `add_arrays`.
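
The arithmetic behind that 4-second figure can be sketched in plain Python. This is only a back-of-the-envelope model: the helper names `sleep_time` and `expected_elapsed` are made up for illustration, and it assumes Ray has a free worker for every task and ignores scheduling overhead.

```python
def sleep_time(n):
    """Seconds slept by make_array(n), or by add_arrays on arrays of size n."""
    return n / 10.0


def expected_elapsed(sizes):
    """Estimate wall-clock time for parallel make_array calls followed by one add_arrays.

    Assumption: every make_array call gets its own worker, so the slowest
    one determines how long the first phase takes.
    """
    make_phase = max(sleep_time(n) for n in sizes)  # the parallel tasks overlap
    add_phase = sleep_time(sizes[0])                # add_arrays sleeps a1.size / 10.0
    return make_phase + add_phase


print(expected_elapsed([20, 20]))  # 4.0
```

If the two `make_array` calls ran one after the other instead, the same model would give 2 + 2 + 2 = 6 seconds, which is why the measured 4 seconds indicates that they overlapped.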

Recall from the previous lesson that we explored when to call `ray.get()` to avoid forcing tasks to become synchronous when they should be asynchronous. This additional example illustrates two key points:

* _Don't ask for results you don't need._
* _Don't ask for the results you need until you really need them._

We don't need to see the objects for `id1` and `id2`. We only need the final array for `id3`.

## Using ray.wait() with ray.get()

We've seen several examples of the best idiomatic way to use `ray.get()`. Here again is an example from the last lesson:

```python
start = time.time()
ids = [expensive_task.remote(n) for n in range(5)]  # Fire off the asynchronous tasks
for n2, duration in ray.get(ids):                   # Retrieve all the values from the list of futures
    p(n2, duration)
pd(time.time() - start, prefix="Total time:")
```

Let's try it again with our new tasks:

In [7]:
start = time.time()
array_ids = [make_array.remote(n*10) for n in range(5)]
added_array_ids = [add_arrays.remote(id, id) for id in array_ids]
for array in ray.get(added_array_ids):
    print(f'{array.size}: {array}')
pd(time.time() - start, prefix="Total time:")

0: []
10: [-2.70758035 -0.99925313 -1.84091163  1.96740565  2.4847109   0.98100963
 -1.24172508 -1.70306979 -1.90439842  2.30524534]
20: [-1.07595426  1.3081596  -1.68500055 -0.14567988 -0.08601576  0.95707809
 -0.01646987  1.45861077 -3.42676922 -1.51516786  3.82055149 -0.99646118
  3.50673127 -3.91386345 -1.33745696 -1.99430198  0.37948619  0.49195842
  1.28710605  0.04740637]
30: [-1.7477596  -0.57319718  0.6634652  -2.67788299 -2.76149058  3.49889132
  3.78105266 -0.19833039 -3.89597939 -1.60106731  0.63028907  1.86981812
 -1.69830529  0.73105321 -3.48515744 -0.46035605 -0.94157455 -0.71955543
 -0.44421647  0.00737847  1.22214649 -2.05272376 -4.71482278 -1.1625598
  2.56262296  0.1909566  -1.77348868  0.24206547  0.53507591 -2.23155352]
40: [ 3.66916878  1.34677088  0.09755048 -1.89966179 -0.76563557 -2.29965443
 -0.67844454 -2.97577402 -0.76400462 -0.30970419  0.20638629 -2.59799329
 -0.07680009  1.73103041  2.87109796  0.17240239 -0.16506798  0.52234051
  5.35822363  0.5717204  -

On my machine, I waited 8 seconds and then everything was printed at once.

There are two fundamental problems with the way we've used `ray.get()` so far:

1. There's no timeout, in case something gets "hung".
2. We have to wait for _all_ the objects to be available before `ray.get()` returns.

The ability to specify a timeout is essential in production code as a defensive measure. Many potential problems could happen in a real production system, any one of which could cause the task we're waiting on to take an abnormally long time to complete or never complete. Our application would be deadlocked waiting on this task. Hence, it's **strongly recommended** in production software to always use timeouts on blocking calls, so that the application can attempt some sort of recovery in situations like this, or at least report the error and "degrade gracefully".

Actually, there _is_ a `timeout=<value>` option you can pass to `ray.get()` ([documentation](https://ray.readthedocs.io/en/latest/package-ref.html#ray.get)), but it will most likely be removed in a future release of Ray. Why remove it if timeouts are important? This change will simplify the implementation of `ray.get()` and encourage the use of `ray.wait()` for waiting ([documentation](https://ray.readthedocs.io/en/latest/package-ref.html#ray.wait)) instead, followed by using `ray.get()` to retrieve values for tasks that `ray.wait()` tells us are finished. 

Using `ray.wait()` is also the way to fix the second problem with using `ray.get()` by itself, that we have to wait for all tasks to finish before we get any values back. Some of those tasks might finish quickly, like our contrived examples that sleep for short durations compared to other invocations. 

When you have a list of asynchronous tasks, you want to process their results as soon as they become available, even while others continue to run. Use `ray.wait()` for this purpose.

Therefore, while `ray.get()` is simple and convenient, for _production code_, we recommend using `ray.wait()`, **with** timeouts, for blocking on running tasks. Then use `ray.get()` to retrieve values of completed tasks. Now we'll learn how to use these two together. For a longer discussion on `ray.wait()`, see [this blog post](https://medium.com/distributed-computing-with-ray/ray-tips-and-tricks-part-i-ray-wait-9ed7a0b9836d).

Here is the previous example rewritten to use `ray.wait()`:

In [10]:
start = time.time()
array_ids = [make_array.remote(n*10) for n in range(5)]
added_array_ids = [add_arrays.remote(id, id) for id in array_ids]

waiting_ids = list(added_array_ids)        # Assign a working list to the full list of ids
while len(waiting_ids) > 0:                # Loop until all tasks have completed
    # Call ray.wait with:
    #   1. the list of ids we're still waiting to complete,
    #   2. tell it to return immediately as soon as one of them completes,
    #   3. tell it to wait up to 10 seconds before timing out.
    ready_ids, remaining_ids = ray.wait(waiting_ids, num_returns=1, timeout=10.0)
    print('Returned {:3d} completed tasks. (elapsed time: {:6.3f})'.format(len(ready_ids), time.time() - start))
    for array in ray.get(ready_ids):
        print(f'{array.size}: {array}')
        
    waiting_ids = remaining_ids  # Reset this list; don't include the completed ids in the list again!
    
pd(time.time() - start, prefix="Total time:")

Returned   1 completed tasks. (elapsed time:  0.005)
0: []
Returned   1 completed tasks. (elapsed time:  2.012)
10: [ 0.72887255  2.75195066 -2.49829625 -0.26751108  5.23830508 -0.67140578
  0.98708413  2.42503449  1.97375834  0.02753384]
Returned   1 completed tasks. (elapsed time:  4.015)
20: [-2.41666655e+00 -3.80435235e+00 -2.07063148e+00 -2.93908226e-01
 -2.99361546e-03 -2.79436701e+00  4.12693904e+00 -1.12538571e-01
  3.80724688e+00 -3.29507009e+00  1.38222164e+00 -2.52271210e+00
  1.54011302e+00  2.53180030e+00 -2.77913815e-01  1.45905084e+00
  2.61237500e-01 -2.02075131e+00  2.14265696e+00 -1.13976267e+00]
Returned   1 completed tasks. (elapsed time:  6.012)
30: [-0.20290182  2.36678258 -1.28286253  2.07838619 -1.16608706 -0.56844014
  4.72363961 -1.00907559 -1.94756655 -0.37628311 -1.05179824 -3.51640776
  1.05171619  1.18265669  0.36203459 -0.21969228 -0.24842565 -1.3490668
 -4.69822701  3.50000913  0.45740144  0.74539234  0.09017152  0.61303288
 -0.83737517 -2.74525008 -1.77

Now it still takes about 8 seconds to complete (4 seconds for the longest invocation of `make_array` and 4 seconds for the corresponding invocation of `add_arrays`), but since the others complete more quickly, we see their results as soon as they become available, at roughly 0, 2, 4, and 6 seconds, with the last result arriving at about 8 seconds.

> **Warning:** For each call to `ray.wait()` in a loop like this, it's important to remove the ids that have completed. Otherwise, `ray.wait()` will return immediately with the same list containing the first completed item, over and over again, and you'll loop forever! Resetting the list is easy, since the second list returned by `ray.wait()` holds the rest of the items that are still running. So, that's what we use.

Now let's try it with `num_returns = 2`:

In [13]:
start = time.time()
array_ids = [make_array.remote(n*10) for n in range(5)]
added_array_ids = [add_arrays.remote(id, id) for id in array_ids]

waiting_ids = list(added_array_ids)        # Assign a working list to the full list of ids
while len(waiting_ids) > 0:                # Loop until all tasks have completed
    # Call ray.wait with:
    #   1. the list of ids we're still waiting to complete,
    #   2. tell it to return immediately as soon as TWO of them complete,
    #   3. tell it to wait up to 10 seconds before timing out.
    return_n = 2 if len(waiting_ids) >= 2 else 1   # See discussion in the next cell
    ready_ids, remaining_ids = ray.wait(waiting_ids, num_returns=return_n, timeout=10.0)
    print('Returned {:3d} completed tasks. (elapsed time: {:6.3f})'.format(len(ready_ids), time.time() - start))
    for array in ray.get(ready_ids):
        print(f'{array.size}: {array}')
        
    waiting_ids = remaining_ids  # Reset this list; don't include the completed ids in the list again!
    
pd(time.time() - start, prefix="Total time:")

Returned   2 completed tasks. (elapsed time:  2.010)
0: []
10: [ 1.7215186  -1.20758138 -0.44587926 -1.9540483   0.42110723 -1.10633336
 -0.69527172 -1.82910789 -1.5393299   0.83943492]
Returned   2 completed tasks. (elapsed time:  6.009)
20: [ 1.52389709 -1.5249968  -3.31701453  1.47170745  0.17517923 -0.22834857
  3.08451281 -2.64095245  0.56596446 -2.46669565 -1.29576393 -1.30380599
  0.84097717  0.02547861 -0.91041547  0.6328555  -4.02808826  2.71486281
 -2.29350491  1.40191421]
30: [-1.32017215  3.59105792 -0.87426021  0.58457614  2.0363863   2.25183006
  3.84991622 -0.19640412  1.13065231  0.2807348   0.22267571  0.18652756
  1.34203745  0.10917007 -0.28581186 -0.31800248  1.13338547 -1.0040262
 -3.16359625 -0.73747263  1.74298806  2.11467092 -1.46946596  0.01565229
  0.81415958 -0.71352269 -0.88725705 -1.69699777 -0.19842466 -4.92212589]
Returned   1 completed tasks. (elapsed time:  8.014)
40: [ 3.55486039  1.29231059  2.71110267 -1.1542018  -0.98062425  2.85048659
  0.57192423 

Now we get the output two at a time. Note that we don't actually pass `num_returns=2` every time. If you ask for more items than the length of the input list, you get an error. So, we compute `return_n`, using `2` except when there's only one task left to wait on, in which case we use `1`. That's why the output for `40` was a single task result: we started with `5` tasks and processed two at a time.
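
The timestamps in that output (about 2, 6, and 8 seconds) can be reproduced with a small plain-Python model. This is a sketch, not Ray code: `batch_return_times` is a hypothetical helper, and it assumes all five task chains run in parallel, that no timeout fires, and that scheduling overhead is negligible.

```python
def batch_return_times(completion_times, num_returns):
    """Model when each ray.wait() call in the loop returns.

    Returns a list of (time_of_return, tasks_in_batch) pairs.
    """
    pending = sorted(completion_times)
    batches = []
    while pending:
        k = min(num_returns, len(pending))      # same idea as return_n in the cell above
        ready, pending = pending[:k], pending[k:]
        batches.append((ready[-1], ready))      # the call returns when the k-th task finishes
    return batches


# make_array(n*10) sleeps n seconds, then add_arrays sleeps another n seconds.
chain_times = [2 * n for n in range(5)]         # [0, 2, 4, 6, 8]

for when, ready in batch_return_times(chain_times, num_returns=2):
    print(f"t={when}s: {len(ready)} task(s) ready")
# t=2s: 2 task(s) ready
# t=6s: 2 task(s) ready
# t=8s: 1 task(s) ready
```

Calling the same helper with `num_returns=1` yields returns at 0, 2, 4, 6, and 8 seconds, which matches the earlier one-at-a-time run.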

## Exercise 2

The following cell is identical to the last one. Modify it to use a timeout of `2.5` seconds, shorter than our longest tasks. What happens now? Try using other times.

In [None]:
start = time.time()
array_ids = [make_array.remote(n*10) for n in range(5)]
added_array_ids = [add_arrays.remote(id, id) for id in array_ids]

waiting_ids = list(added_array_ids)        # Assign a working list to the full list of ids
while len(waiting_ids) > 0:                # Loop until all tasks have completed
    # Call ray.wait with:
    #   1. the list of ids we're still waiting to complete,
    #   2. tell it to return immediately as soon as TWO of them complete,
    #   3. tell it to wait up to 10 seconds before timing out.
    return_n = 2 if len(waiting_ids) >= 2 else 1   # See the discussion before this exercise
    ready_ids, remaining_ids = ray.wait(waiting_ids, num_returns=return_n, timeout=10.0)
    print('Returned {:3d} completed tasks. (elapsed time: {:6.3f})'.format(len(ready_ids), time.time() - start))
    for array in ray.get(ready_ids):
        print(f'{array.size}: {array}')
        
    waiting_ids = remaining_ids  # Reset this list; don't include the completed ids in the list again!
    
pd(time.time() - start, prefix="Total time:")

In conclusion:

> **Tips:**
>
> 1. Use `ray.wait()` with a timeout to wait for one or more running tasks. Then use `ray.get()` to retrieve the values for the finished tasks.
> 2. Don't ask for results you don't need.
> 3. Don't ask for the results you need until you really need them.
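
One way to package the first tip is a small generator that yields results as they become ready and gives up after repeated timeouts. This is a sketch, not part of Ray's API: `results_as_completed` and `max_timeouts` are hypothetical names, and the `wait` and `get` parameters are assumed to behave like `ray.wait` and `ray.get` (in particular, a timed-out wait is assumed to return an empty ready list).

```python
def results_as_completed(ids, wait, get, timeout=10.0, max_timeouts=3):
    """Yield each task's result as soon as it is ready.

    `wait` and `get` are expected to behave like `ray.wait` and `ray.get`.
    Raises TimeoutError after `max_timeouts` consecutive waits return nothing.
    """
    waiting = list(ids)
    consecutive_timeouts = 0
    while waiting:
        ready, waiting = wait(waiting, num_returns=1, timeout=timeout)
        if not ready:                        # the wait timed out with nothing finished
            consecutive_timeouts += 1
            if consecutive_timeouts >= max_timeouts:
                raise TimeoutError(f"{len(waiting)} task(s) still pending")
            continue
        consecutive_timeouts = 0
        yield from get(ready)                # only fetch values that are known to be ready


# Intended use with the tasks defined earlier (requires a running Ray session):
#   for array in results_as_completed(added_array_ids, ray.wait, ray.get):
#       print(f'{array.size}: {array}')
```

Passing `wait` and `get` as arguments keeps the helper independent of Ray, so it can be exercised with simple stand-in functions. A real application would probably want to log or retry rather than simply raise.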

## How Distributed Task Management Works

> **Note:** If you just want to learn the Ray API, you can safely skip the rest of this lesson (notebook) for now. It continues the exploration of how Ray works internally, which we started in the previous lesson. However, you should come back to this material at some point, so you'll develop a better understanding of how Ray works.

At the end of the last lesson, we examined Ray task scheduling at a high level, by watching the Ray Dashboard and analyzing the performance times. Now we'll walk through some images that show the process Ray follows to place tasks around a cluster.

Assume we will invoke the `make_array` task twice, then invoke `add_arrays` to sum the returned NumPy arrays. Graphically, it looks as follows:
![Ray under the hood 1](../images/Ray-Cluster/Ray-Cluster.001.jpeg)

How does this get scheduled in a cluster? Here we'll assume a three-node cluster that has resources for running two Ray worker tasks per node (underpowered compared to what we saw in the Ray Dashboard last lesson!).
![Ray under the hood 2](../images/Ray-Cluster/Ray-Cluster.002.jpeg)

First, assume that the driver program is running on Node 1, so it invokes the local scheduler to schedule the three tasks.
![Ray under the hood 3](../images/Ray-Cluster/Ray-Cluster.003.jpeg)

Immediately, the ids for the task futures are returned. The _Global Control Store_ tracks where every task is running and where every object is stored in the local _Object Stores_.
![Ray under the hood 4](../images/Ray-Cluster/Ray-Cluster.004.jpeg)

Suppose the local scheduler has available capacity in the first worker on the same node. It schedules the first `make_array` task there.
![Ray under the hood 5](../images/Ray-Cluster/Ray-Cluster.005.jpeg)

It decides to schedule the second `make_array` task in a worker on Node 2.
![Ray under the hood 6](../images/Ray-Cluster/Ray-Cluster.006.jpeg)

When the two tasks finish, they place their result objects in their local object stores.
![Ray under the hood 7](../images/Ray-Cluster/Ray-Cluster.007.jpeg)

Now `add_arrays` can be scheduled, because the two tasks it depends on are done. Let's suppose it gets scheduled in the second worker on Node 1.
![Ray under the hood 8](../images/Ray-Cluster/Ray-Cluster.008.jpeg)
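
The rule that a task becomes schedulable only once its inputs exist can be modeled in a few lines of plain Python. This is a conceptual sketch, not how Ray's scheduler is implemented: the `dependencies` table and the `runnable_tasks` helper are invented for illustration.

```python
# Each task lists the tasks whose outputs it consumes (names follow the example above).
dependencies = {
    "make_array_1": [],
    "make_array_2": [],
    "add_arrays": ["make_array_1", "make_array_2"],
}


def runnable_tasks(dependencies, finished):
    """Return the unfinished tasks whose dependencies have all finished."""
    return [
        task
        for task, deps in dependencies.items()
        if task not in finished and all(d in finished for d in deps)
    ]


print(runnable_tasks(dependencies, finished=set()))
# ['make_array_1', 'make_array_2']
print(runnable_tasks(dependencies, finished={"make_array_1"}))
# ['make_array_2']
print(runnable_tasks(dependencies, finished={"make_array_1", "make_array_2"}))
# ['add_arrays']
```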

The first object it needs is already on the same node, in the object store, so the `add_arrays` task can _read it directly from shared memory_. No copying is required to the worker's process space.
![Ray under the hood 9](../images/Ray-Cluster/Ray-Cluster.009.jpeg)

However, the second object is on a different node, so Ray copies it to the local object store. 
![Ray under the hood 10](../images/Ray-Cluster/Ray-Cluster.010.jpeg)

Now it can also be read from shared memory.
![Ray under the hood 11](../images/Ray-Cluster/Ray-Cluster.011.jpeg)
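
The local-read versus remote-copy distinction in the last three steps can be mimicked with a toy model. Everything here is hypothetical and greatly simplified: `ToyCluster`, `put`, and `fetch` are illustrative names, plain dictionaries stand in for the object stores and for the location tracking, and no real shared memory or networking is involved.

```python
import copy


class ToyCluster:
    """Illustrative only: per-node object stores plus a global location table."""

    def __init__(self, nodes):
        self.stores = {node: {} for node in nodes}  # node -> {object_id: value}
        self.locations = {}                         # object_id -> set of nodes holding it

    def put(self, node, object_id, value):
        """Store a value on a node and record where it lives."""
        self.stores[node][object_id] = value
        self.locations.setdefault(object_id, set()).add(node)

    def fetch(self, node, object_id):
        """Return (value, copied), where copied says whether a transfer was needed."""
        if object_id in self.stores[node]:
            return self.stores[node][object_id], False   # already local: read in place
        source = next(iter(self.locations[object_id]))   # any node that has the object
        self.put(node, object_id, copy.copy(self.stores[source][object_id]))
        return self.stores[node][object_id], True        # copied into the local store


cluster = ToyCluster(["node1", "node2", "node3"])
cluster.put("node1", "obj1", [1.0, 2.0])  # result of the first make_array
cluster.put("node2", "obj2", [3.0, 4.0])  # result of the second make_array

print(cluster.fetch("node1", "obj1")[1])  # False: obj1 is already on node1
print(cluster.fetch("node1", "obj2")[1])  # True: obj2 had to be copied from node2
print(cluster.fetch("node1", "obj2")[1])  # False: the copy is now local
```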

When `add_arrays` is finished, it writes its results to the local object store.
![Ray under the hood 12](../images/Ray-Cluster/Ray-Cluster.012.jpeg)

At this point, if the driver calls `ray.get(id3)`, it will return `obj3`.
![Ray under the hood 13](../images/Ray-Cluster/Ray-Cluster.013.jpeg)

Whew! Hopefully you have a better sense of what Ray does under the hood. Scheduling tasks on other nodes and copying objects between object stores is efficient, but it incurs unavoidable network overhead.