# Ray Concepts - Data Parallelism (Part 2)

The previous lesson explored Ray's core concepts and how they work. We learned how to define Ray _tasks_, run them, and retrieve the results. We also began learning how Ray schedules tasks in a distributed environment.

This lesson completes the discussion of Ray tasks by exploring how task dependencies are handled. We'll also look under the hood at Ray's architecture and runtime behavior.

> **Tip:** Recall that the [Ray Package Reference](https://ray.readthedocs.io/en/latest/package-ref.html) in the [Ray Docs](https://ray.readthedocs.io/en/latest/) is useful for exploring the API features we'll learn.

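```python
# Set up, like the previous lesson, but adding NumPy:

import ray, time, sys    # New notebook, so new process
import numpy as np       # Used for examples
sys.path.append('..')    # Import our own libraries starting in the project root directory

from util.printing import p

# Redefined from the previous lesson (reconstruction): sleep `n` seconds, return `(n, duration)`.
@ray.remote
def expensive_task(n):
    start = time.time()
    time.sleep(n)
    return (n, time.time() - start)
```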

```python
ray.init(ignore_reinit_error=True)
```

Let's work with a new remote function. Previously, our `expensive` and `expensive_task` functions returned tuples that included time durations. The durations were useful for understanding how long the functions took to execute. Now it will be more convenient to return only the data values we care about, without "metadata" like this, because we are going to pass those values to other tasks. This creates _dependencies_ between tasks, and we'll learn how Ray handles dependent, asynchronous computations.

So, let's define a task to return a random NumPy array of some size `n`:

```python
@ray.remote
def make_array(n):
    return np.random.standard_normal(n)
```

Now define a task that can add two NumPy arrays together. The arrays need to be the same size, but we'll ignore any checking for this requirement.

```python
@ray.remote
def add_array(a1, a2):
    return np.add(a1, a2)
```

Now let's use them!

```python
start = time.time()
id1 = make_array.remote(50)
id2 = make_array.remote(50)
id3 = add_array.remote(id1, id2)
p(0, time.time() - start)  # The integer argument to `p()` is arbitrary.
```

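The printed time should be tiny: all three `.remote()` calls return immediately, even though `add_array` can't run until both `make_array` tasks finish. We passed the futures `id1` and `id2` directly to `add_array.remote()`, and Ray takes care of the rest: it waits for those objects to be ready, then calls `add_array` with the actual NumPy arrays, not the IDs. Only `ray.get()` blocks.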

```python
ray.get(id3)   # Blocks until add_array (and the make_array tasks it depends on) finish

p(1, time.time() - start)
```

TODO

## `ray.wait()` - A Better Alternative to `ray.get()`

TODO

```python
start_all = time.time()
ids = [expensive_task.remote(n) for n in range(5)]
for n2, duration in ray.get(ids):    # Retrieve all the values for a list of futures
    p(n2, duration)
print("Total time:")
p(10, time.time() - start_all)
```

## How Distributed Task Management Works

> **Note:** If you just want to learn the Ray API, you can safely skip the rest of this lesson (notebook) for now. It continues the exploration of how Ray works internally, which we started in the previous lesson. However, you should come back to this material at some point, so you'll develop a better understanding of how Ray works.

To see more clearly what's happening in the Ray dashboard, run the following cells to determine the number of CPU hardware threads on your laptop, each of which is running a `ray` worker process. We've expanded this code over several cells so you can see what each step returns, but you could write it all at once: `num_cpus = ray.nodes()[0]['Resources']['CPU']`.


```python
nodes = ray.nodes()  # Get a list of dicts with metadata about all the nodes in your "cluster".
nodes                # On your laptop, a list with one node.
```

```python
node = nodes[0]    # Get the single node
node
```

```python
resources = node['Resources']   # Get the resources for the node
resources
```

```python
num_cpus = resources['CPU']  # Get the number of CPU hardware threads
num_cpus
```

The final number will be `8.0`, `16.0`, etc. The next cell is one of our previous examples of calling `expensive_task`, but now the loop counter is `2*int(num_cpus)` instead of `5`. This will mean that half of the tasks will have to wait for an open slot. Now run the following cell and watch the Ray dashboard. (You'll know the cell is finished when all the `ray` workers return to `IDLE`.)

What's the total time now? How about the individual times?

```python
start_all = time.time()
ids = []
for n in range(2*int(num_cpus)):     # What's changed!
    ref = expensive_task.remote(n)   # `ref`, not `id`, to avoid shadowing the built-in id()
    ids.append(ref)
    p(n, time.time() - start_all)

for n2, duration in ray.get(ids):    # Retrieve all the values for a list of futures
    p(n2, duration)
print("Total time:")
p(10, time.time() - start_all)
```

On my 8-worker machine, 16 tasks were run.

Look at the first set of times, for the submissions. They are still fast and nonblocking, but on my machine they took about 0.02 seconds to complete, so some competition for CPU time occurred.

As before, each asynchronous task still takes roughly `n` seconds to finish (for `n` from 0 through 15). This makes sense, because each `expensive_task` does essentially nothing but sleep, and since each worker runs only one task at a time, there should be no appreciable difference in the individual times, as before.

However, the whole process took about 22 seconds, not roughly 15 seconds, as we might have expected from our previous experience (i.e., the duration of the longest task). This reflects the fact that half the tasks had to wait for an available worker.

In fact, we can explain the 22 seconds exactly. Here is how my 16 tasks, with durations 0 to 15 seconds, were allocated to the 8 workers. Keep in mind that the scheduling happened in order for increasing `n`.

The first 8 tasks, of duration 0 to 7 seconds, were scheduled immediately on the 8 available workers. The 0-second task finished immediately, so the next waiting task, the 8-second task, was scheduled on that worker. It finished in 8 seconds, so the _total_ time for the 0-second and 8-second tasks was about 8 seconds. Similarly, after the 1-second task finished, the 9-second task was scheduled. Total time: 10 seconds. Using induction ;), the last worker started with the 7-second task followed by the 15-second task, for a total of 22 seconds!

Here's a table showing this in detail, where `n1` and `n2` refer to the first and second tasks on each worker, with durations of `n1` seconds and `n2` seconds, for a total of `n1+n2` seconds. For consistency, the `ray` workers are numbered from zero:

| Worker | n1 (s) | n2 (s) | Total Time (s) |
| -----: | -: | -: | ---------: |
| 0 | 0 |  8 |  8 |
| 1 | 1 |  9 | 10 |
| 2 | 2 | 10 | 12 |
| 3 | 3 | 11 | 14 |
| 4 | 4 | 12 | 16 |
| 5 | 5 | 13 | 18 |
| 6 | 6 | 14 | 20 |
| 7 | 7 | 15 | 22 |
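The table can be reproduced with a tiny simulation. This is plain Python, not Ray code, and it assumes the simplified model described above: the first eight tasks start at once, one per worker, and every later task goes, in submission order, to whichever worker frees up first. The helper name `simulate_fifo` is hypothetical.

```python
import heapq

def simulate_fifo(durations, num_workers):
    """Hypothetical helper: assumes len(durations) >= num_workers."""
    durations = list(durations)
    # The first `num_workers` tasks start immediately, one per worker, at t=0.
    # Heap entries are (time when the worker becomes free, worker id).
    free_at = [(durations[w], w) for w in range(num_workers)]
    heapq.heapify(free_at)
    assignments = {w: [durations[w]] for w in range(num_workers)}
    # Each remaining task goes to whichever worker frees up first.
    for d in durations[num_workers:]:
        t, w = heapq.heappop(free_at)
        assignments[w].append(d)
        heapq.heappush(free_at, (t + d, w))
    makespan = max(t for t, _ in free_at)
    return assignments, makespan

assignments, makespan = simulate_fifo(range(16), 8)
for w, tasks in assignments.items():
    print(f"Worker {w}: tasks {tasks}, total {sum(tasks)} s")
print("Total time:", makespan)
```

The longest-running worker (7 seconds plus 15 seconds) determines the overall 22-second wall-clock time.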




Of course, a real-world scheduling scenario would be more complicated, but hopefully you now have a better sense of how Ray distributes work, whether you're working on a single laptop or a large cluster!