# Ray Concepts - Task Parallelism (Part 1)

Now let's explore Ray's core concepts and understand how they work. As much as possible, Ray tries to leverage familiar Python idioms, extending them as necessary.

This lesson covers how to define Ray _tasks_, run them, and retrieve the results. We'll also end with an optional section to help you understand how Ray schedules tasks in a distributed environment.

The next lesson will complete the discussion of Ray tasks by exploring how task dependencies are handled and by looking under the hood at Ray's architecture and runtime behavior.

First, we need to import `ray`, and we'll also import the `time` module. (If you get an error in the next cell, make sure you set up the tutorial as described in the project [README](../README.md).)

> **Tip:** The [Ray Package Reference](https://ray.readthedocs.io/en/latest/package-ref.html) in the [Ray Docs](https://ray.readthedocs.io/en/latest/) is useful for exploring the API features we'll learn.

In [None]:
# If you are running on Google Colab, uncomment and run the following lines
# to install the necessary dependencies.

# print("Setting up colab environment")
# !pip install -q ray

In [2]:
import ray, time, sys

Here are some convenience functions, `pnd()` and `pd()`, for printing a number and a time duration, or just a time duration, respectively. Both also take an optional `prefix` argument for the beginning of the line.

In [3]:
def pnd(n, duration, prefix=''):
    """Print an integer and a time duration, with an optional prefix."""
    prefix2 = prefix if len(prefix) == 0 else prefix+' '
    print('{:s}n: {:2d}, duration: {:6.3f} seconds'.format(prefix2, n, duration))

def pd(duration, prefix=''):
    """Print a time duration, with an optional prefix."""
    prefix2 = prefix if len(prefix) == 0 else prefix+' '
    print('{:s}duration: {:6.3f} seconds'.format(prefix2, duration))

Now consider the following Python function, where we simulate doing something that's slow to complete using `time.sleep()`. A real-world example might do a complex calculation (like a training step for machine learning) or call an external web service where a response could take many milliseconds. We'll use more interesting examples later.

In [4]:
def expensive(n):
    start = time.time()       # Let's time how long this takes.
    time.sleep(n)             # Sleep for n seconds
    return (n, time.time() - start)   # Return n and the duration in seconds

In [5]:
(n, duration) = expensive(2)
pnd(n, duration)

n:  2, duration:  2.000 seconds


You should see the output `n: 2, duration: 2.00X seconds`, where `X` is a small digit that varies from run to run. As we might expect, it took about two seconds to execute.

Now suppose we need to fire off five of these at once:

In [6]:
start_all = time.time()
for n in range(5):
    n2, duration = expensive(n)
    pnd(n, duration)
pd(time.time() - start_all, prefix="Total time:")

n:  0, duration:  0.000 seconds
n:  1, duration:  1.002 seconds
n:  2, duration:  2.001 seconds
n:  3, duration:  3.001 seconds
n:  4, duration:  4.005 seconds
Total time: duration: 10.009 seconds


It takes about 10 seconds (0 + 1 + 2 + 3 + 4) to run, because we make the calls _synchronously_, one after another. We don't need to do this. Each call to `expensive()` is independent of the others, so ideally we should run them _asynchronously_, in _parallel_, so all of them finish more quickly.

Ray makes this easy. Let's define a new function and annotate it with `@ray.remote` ([documentation](https://ray.readthedocs.io/en/latest/package-ref.html#ray.remote)). In Ray terminology, the annotation converts the function to a _task_, because we'll now be able to let Ray schedule this "task" (i.e., unit of work) on any CPU core in our laptop or in our cluster when we use one.

> **Key Point:** Annotating a function with `@ray.remote` turns it into a remote _task_.

In [7]:
@ray.remote
def expensive_task(n):
    return expensive(n)

Note that the task simply calls `expensive()`; we don't have to redefine it.

Now when we invoke `expensive_task`, we have to use `expensive_task.remote(n)` instead of `expensive_task(n)`. Python is malleable; the Ray team could have instrumented `expensive_task` so that we can call it like a normal function, but the explicit `.remote` is a reminder to the reader of which code uses Ray and which is normal Python code.
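To make the `.remote` calling convention concrete, here is a toy sketch of how a decorator *could* attach a `.remote` method to a function. It is **not** Ray's implementation: `toy_remote`, `_POOL`, and `slow_double` are hypothetical names, and a local thread pool stands in for Ray's worker processes. Also unlike Ray, which raises an error if you call a remote function directly, this toy keeps the plain call working so you can compare the two.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for Ray's workers: a local thread pool (NOT how Ray works).
_POOL = ThreadPoolExecutor(max_workers=4)

def toy_remote(func):
    """Toy decorator (hypothetical): adds a .remote(...) method that submits
    func to the thread pool and immediately returns a concurrent.futures.Future."""
    def remote(*args, **kwargs):
        return _POOL.submit(func, *args, **kwargs)
    func.remote = remote   # the original function stays callable as usual
    return func

@toy_remote
def slow_double(n):
    time.sleep(0.1)        # simulate slow work
    return 2 * n

print(slow_double(3))          # direct call: blocks, then prints 6
fut = slow_double.remote(4)    # returns a Future right away, without waiting
print(fut.result())            # blocks until the work is done, then prints 8
```

The `.result()` call plays roughly the role that `ray.get()` plays for Ray's futures.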

Okay, let's try the same loop as before. But first, we have to initialize Ray with `ray.init()` ([documentation](https://ray.readthedocs.io/en/latest/package-ref.html#ray.init)). There are optional keyword arguments you can provide. We'll explore many of them later, but for now, we'll just pass an option that allows us to re-initialize Ray without triggering an error. This is useful if you decide to reevaluate the following cell for some reason.

In [8]:
ray.init(ignore_reinit_error=True)

2020-04-14 11:35:01,024	INFO resource_spec.py:212 -- Starting Ray with 12.79 GiB memory available for workers and up to 6.41 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-04-14 11:35:01,346	INFO services.py:1148 -- View the Ray dashboard at localhost:8265


{'node_ip_address': '192.168.1.102',
 'redis_address': '192.168.1.102:57581',
 'object_store_address': '/tmp/ray/session_2020-04-14_11-35-01_014711_50605/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-04-14_11-35-01_014711_50605/sockets/raylet',
 'webui_url': 'localhost:8265',
 'session_dir': '/tmp/ray/session_2020-04-14_11-35-01_014711_50605'}

> **Troubleshooting**
>
> 1. If you get an error like `... INFO services.py:... -- Failed to connect to the redis server, retrying.`, it probably means you are running a VPN on your machine. [At this time](https://github.com/ray-project/ray/issues/6573), you can't use `ray.init()` with a VPN running. You'll have to stop your VPN to run `ray.init()`, then once it finishes, you can restart your VPN.
> 
> 2. If `ray.init()` worked (for example, you see a message like _View the Ray dashboard at localhost:8265_) and you're using a Mac, you may get several annoying dialogs asking you if you want to allow incoming connections for `python` and/or `redis-server`. Click "Accept" for each one and they shouldn't appear again during this lesson. macOS is trying to verify whether these executables have been properly signed. Ray uses Redis. If you installed Python using Anaconda or another mechanism, then it probably isn't properly signed from the point of view of macOS. To permanently fix this problem, [see this StackExchange post](https://apple.stackexchange.com/questions/3271/how-to-get-rid-of-firewall-accept-incoming-connections-dialog).

If `ray.init()` worked successfully, you'll see a dictionary with information such as the `node_ip_address` and `webui_url`.

A separate message tells you that URL is for the [Ray Dashboard](https://ray.readthedocs.io/en/latest/ray-dashboard.html#ray-dashboard). Open it now in a separate browser tab. It should look something like this:
![Ray Dashboard screenshot](../images/Ray-Dashboard.png)

> **Tip:** You can ask Ray for this URL later if needed. Use `ray.get_webui_url()`.
> 
> **Note:** There are many options you can pass to `ray.init()`. See the [ray.init](https://ray.readthedocs.io/en/latest/package-ref.html#ray.init) and [configuration](https://ray.readthedocs.io/en/latest/configure.html) documentation for details, some of which we'll explore in later lessons.

My laptop has four cores, each of which has two hardware _threads_, for a total of eight. Ray started a `ray` worker process for each hardware thread. These workers are used to run tasks. Click around the dashboard, especially when we run tasks like we're about to do. We'll explore the dashboard more later on. Many laptops have eight cores, so you may see 16 `ray` processes.
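If you want to check the hardware-thread count from Python itself, the standard library's `os.cpu_count()` reports the number of logical CPUs (hardware threads). By default, Ray typically sizes its `CPU` resource to this number, though that is an assumption worth confirming against the dashboard. This is a quick sanity check, not a Ray API, and it can return `None` if the count can't be determined.

```python
import os

threads = os.cpu_count()  # logical CPUs (hardware threads), or None if unknown
print(f"Logical CPUs (hardware threads): {threads}")
```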

Now let's run our new Ray task!

In [9]:
expensive_task.remote(2)

ObjectID(45b95b1c8bd3a9c4ffffffff010000c801000000)

What's this `ObjectID` thing? Recall that `expensive()` returned `(n, seconds)`. Now, when we invoke a task, it is executed _asynchronously_, so instead of the tuple we will eventually want, we get an `ObjectID`, a reference that acts as a _future_ (similar in spirit to a Python [Future](https://docs.python.org/3/library/asyncio-future.html)). We'll use it to retrieve the tuple when the task has completed. One way to do this is to use `ray.get()` ([documentation](https://ray.readthedocs.io/en/latest/package-ref.html#ray.get)). So, let's modify our previous loop to use the task and retrieve the values using the futures.

> **Key Point:** You call a task with `mytask.remote(...)`. It immediately returns with an ID for a future that you'll use to access the result of the task, after it finishes.

In [10]:
start_all = time.time()
for n in range(5):
    obj_id = expensive_task.remote(n)  # Call the remote task (avoid shadowing the built-in id)
    n2, duration = ray.get(obj_id)     # Retrieve the value using the future
    pnd(n, duration)
pd(time.time() - start_all, prefix="Total time:")

n:  0, duration:  0.000 seconds
n:  1, duration:  1.002 seconds
n:  2, duration:  2.002 seconds
n:  3, duration:  3.004 seconds
n:  4, duration:  4.004 seconds
Total time: duration: 10.395 seconds


Shouldn't Ray make everything go faster? The performance is the same! 

The reason is that we used `ray.get()` incorrectly. It's a _blocking call_; we're telling Ray, "I need the value and I'm going to wait until the task is done and you can return it to me." Calling it inside the loop makes each iteration wait for the previous task to finish, which defeats the goal of leveraging asynchrony.

Instead, we need to "fire off" all the asynchronous calls, building up a list of futures, and then wait for all of them at once:

In [11]:
start_all = time.time()
ids = []
for n in range(5):
    obj_id = expensive_task.remote(n)   # Avoid shadowing the built-in id
    ids.append(obj_id)
    pnd(n, time.time() - start_all)

print('Calling ray.get:')
for n2, duration in ray.get(ids):    # Retrieve all the values for a list of futures
    pnd(n2, duration)
pd(time.time() - start_all, prefix="Total time:")

n:  0, duration:  0.001 seconds
n:  1, duration:  0.002 seconds
n:  2, duration:  0.002 seconds
n:  3, duration:  0.003 seconds
n:  4, duration:  0.004 seconds
Calling ray.get:
n:  0, duration:  0.000 seconds
n:  1, duration:  1.005 seconds
n:  2, duration:  2.001 seconds
n:  3, duration:  3.004 seconds
n:  4, duration:  4.001 seconds
Total time: duration:  4.006 seconds


Notice what happened. In the first loop, when we called `expensive_task.remote(n)`, each call returned immediately, so the "durations" were tiny. Then you probably noticed that nothing happened for about four seconds, and then suddenly everything was printed, for a total elapsed time of about four seconds.

Why four? When we pass a list of futures to `ray.get()`, it blocks until the results are available for _all_ of them. Our longest task was four seconds, so once that one finished, the others were already done and all could be returned immediately.
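The same difference shows up with Python's standard `concurrent.futures` module, which makes for a handy Ray-free experiment. This is an analogy, not Ray code: a `ThreadPoolExecutor` stands in for the Ray workers, `fake_task` is a hypothetical stand-in for `expensive()`, and the sleeps are scaled down to tenths of a second. Waiting on each future inside the loop costs roughly the *sum* of the durations; submitting everything first and then collecting costs roughly the *maximum*. The results also come back in submission order, just as `ray.get(ids)` returns values in the order of `ids`.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_task(n):
    """Hypothetical stand-in for expensive(): sleeps n/10 seconds."""
    time.sleep(n / 10)
    return n

def blocking_loop(pool, ns):
    # Anti-pattern: wait for each result before submitting the next task.
    return [pool.submit(fake_task, n).result() for n in ns]

def submit_then_collect(pool, ns):
    # Fire off every task first, then block on each future in submission order.
    futures = [pool.submit(fake_task, n) for n in ns]
    return [f.result() for f in futures]

def timed(fn, *args):
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start

with ThreadPoolExecutor(max_workers=5) as pool:
    ns = range(5)
    r1, t1 = timed(blocking_loop, pool, ns)        # roughly 0 + 0.1 + ... + 0.4 = 1.0 s
    r2, t2 = timed(submit_then_collect, pool, ns)  # roughly the longest task, 0.4 s
    print(f"blocking: {t1:.2f} s, submit-then-collect: {t2:.2f} s")
```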

Run the next cell, which is basically the same calculation, but it uses a more idiomatic list comprehension for the `expensive_task` invocations and doesn't log the times for those calls, as we now know these times are trivial.

**However**, as soon as the call starts, switch to the Ray Dashboard browser tab and watch what happens. You'll notice instances of `expensive_task` being executed by the different `ray` processes. You might try using a larger number than `5` so it's easier to watch.

In [12]:
start_all = time.time()
ids = [expensive_task.remote(n) for n in range(5)]  # Fire off the asynchronous tasks
for n2, duration in ray.get(ids):                   # Retrieve all the values from the list of futures
    pnd(n2, duration)
pd(time.time() - start_all, prefix="Total time:")

n:  0, duration:  0.000 seconds
n:  1, duration:  1.003 seconds
n:  2, duration:  2.000 seconds
n:  3, duration:  3.004 seconds
n:  4, duration:  4.004 seconds
Total time: duration:  4.009 seconds


## Exercise 1

Let's make sure you understand how to use Ray's task parallelism. In the following two cells, we define a new Python function and then use it several times to perform work. Modify both cells to use Ray. The third cell uses `assert` statements to check your work.

> **Tip:** The solution is in the `solutions` folder.

In [13]:
def slow_square(n):
    time.sleep(n)
    return n*n

In [14]:
@ray.remote
def fast_square(n):
    return slow_square(n)

In [15]:
start = time.time()
ids = [fast_square.remote(n) for n in range(4)]  # Fire off the asynchronous tasks
squares = ray.get(ids)                           # Wait for all of the results
duration = time.time() - start

In [16]:
assert squares == [0, 1, 4, 9], f'Did you retrieve the objects using the ids? squares = {squares}'
assert duration < 4.1, f'Did you exploit Ray parallelism? The duration is too long: {duration} seconds' 

## A Closer Look at Scheduling

> **Note:** If you just want to learn the Ray API, you can safely skip the rest of this lesson (notebook) for now. It begins our exploration of how Ray works internally. However, you should come back to it at some point, so you'll develop a better understanding of how Ray works.

To better see what's happening with the dashboard, run the following cells to determine the number of CPU hardware threads on your laptop, each of which is running a `ray` process. We've expanded this code over several cells so you can see what each step returns, but you could write it all at once, `num_cpus = ray.nodes()[0]['Resources']['CPU']`.

In [17]:
nodes = ray.nodes()  # Get a list of dicts with metadata about all the nodes in your "cluster".
nodes                # On your laptop, a list with one node.

[{'NodeID': 'cc7a4b4a6162aaa5e3bfb8bf9b09ebcf2f36a613',
  'Alive': True,
  'NodeManagerAddress': '192.168.1.102',
  'NodeManagerHostname': 'setzer.hsd1.co.comcast.net',
  'NodeManagerPort': 58277,
  'ObjectManagerPort': 58335,
  'ObjectStoreSocketName': '/tmp/ray/session_2020-04-14_11-35-01_014711_50605/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2020-04-14_11-35-01_014711_50605/sockets/raylet',
  'Resources': {'object_store_memory': 90.0,
   'node:192.168.1.102': 1.0,
   'CPU': 12.0,
   'memory': 262.0},
  'alive': True}]

In [18]:
node = nodes[0]    # Get the single node
node

{'NodeID': 'cc7a4b4a6162aaa5e3bfb8bf9b09ebcf2f36a613',
 'Alive': True,
 'NodeManagerAddress': '192.168.1.102',
 'NodeManagerHostname': 'setzer.hsd1.co.comcast.net',
 'NodeManagerPort': 58277,
 'ObjectManagerPort': 58335,
 'ObjectStoreSocketName': '/tmp/ray/session_2020-04-14_11-35-01_014711_50605/sockets/plasma_store',
 'RayletSocketName': '/tmp/ray/session_2020-04-14_11-35-01_014711_50605/sockets/raylet',
 'Resources': {'object_store_memory': 90.0,
  'node:192.168.1.102': 1.0,
  'CPU': 12.0,
  'memory': 262.0},
 'alive': True}

In [19]:
resources = node['Resources']   # Get the resources for the node
resources

{'object_store_memory': 90.0,
 'node:192.168.1.102': 1.0,
 'CPU': 12.0,
 'memory': 262.0}

In [20]:
num_cpus = resources['CPU']  # Get the number of CPU hardware threads
num_cpus

12.0
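The cells above read only the first node, which is all a laptop has. On a multi-node cluster you might instead total the `CPU` resource over every live node. Here's a minimal sketch that works on the list-of-dicts shape returned by `ray.nodes()`; `total_cpus` is a hypothetical helper, and the sample data is trimmed from this notebook's own output so the snippet runs without Ray. (Ray also provides `ray.cluster_resources()`, which reports aggregate resources directly.)

```python
def total_cpus(nodes):
    """Sum the 'CPU' resource over all live nodes in a ray.nodes()-style list."""
    return sum(node['Resources'].get('CPU', 0.0)
               for node in nodes
               if node.get('Alive', False))

# Sample data trimmed from the ray.nodes() output shown in this lesson.
sample_nodes = [
    {'NodeID': 'cc7a4b4a6162aaa5e3bfb8bf9b09ebcf2f36a613',
     'Alive': True,
     'Resources': {'object_store_memory': 90.0,
                   'node:192.168.1.102': 1.0,
                   'CPU': 12.0,
                   'memory': 262.0}},
]

print(total_cpus(sample_nodes))  # 12.0
```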

The exact number depends on your machine; it was `12.0` for the output shown here, and values like `4.0`, `8.0`, `12.0`, or `16.0` are common. The next cell is one of our previous examples of calling `expensive_task`, but now the loop counter is `2*int(num_cpus)` instead of `5`. This means that half of the tasks will have to wait for a free worker. Now run the following cell and watch the Ray dashboard. (You'll know the cell is finished when all the `ray` workers return to `IDLE`.)

What's the total time now? How about the individual times?

In [21]:
start_all = time.time()
ids = []
for n in range(2*int(num_cpus)):     # This is what changed!
    obj_id = expensive_task.remote(n)
    ids.append(obj_id)
    pnd(n, time.time() - start_all)

print('Calling ray.get:')
for n2, duration in ray.get(ids):    # Retrieve all the values for a list of futures
    pnd(n2, duration)
pd(time.time() - start_all, prefix="Total time:")

n:  0, duration:  0.001 seconds
n:  1, duration:  0.002 seconds
n:  2, duration:  0.003 seconds
n:  3, duration:  0.004 seconds
n:  4, duration:  0.004 seconds
n:  5, duration:  0.004 seconds
n:  6, duration:  0.005 seconds
n:  7, duration:  0.006 seconds
n:  8, duration:  0.007 seconds
n:  9, duration:  0.007 seconds
n: 10, duration:  0.008 seconds
n: 11, duration:  0.011 seconds
n: 12, duration:  0.013 seconds
n: 13, duration:  0.016 seconds
n: 14, duration:  0.020 seconds
n: 15, duration:  0.021 seconds
n: 16, duration:  0.022 seconds
n: 17, duration:  0.022 seconds
n: 18, duration:  0.023 seconds
n: 19, duration:  0.023 seconds
n: 20, duration:  0.024 seconds
n: 21, duration:  0.024 seconds
n: 22, duration:  0.024 seconds
n: 23, duration:  0.025 seconds
Calling ray.get:
n:  0, duration:  0.000 seconds
n:  1, duration:  1.004 seconds
n:  2, duration:  2.003 seconds
n:  3, duration:  3.003 seconds
n:  4, duration:  4.001 seconds
n:  5, duration:  5.003 seconds
n:  6, duration:  6.003

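The discussion below is based on a run on my 8-worker machine, where 16 tasks were run. (The captured output above, which is cut off, came from a machine with 12 hardware threads, so it submitted 24 tasks.)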

Look at the first set of times, for the submissions. They are still fast and nonblocking, but on my machine they took about 0.02 seconds in total, so submitting tasks isn't entirely free: there was some competition for CPU time.

As before, each asynchronous task still takes roughly `n` seconds to finish (for `n` from 0 through 15). This makes sense, because each `expensive_task` does essentially nothing but sleep, and since each worker runs only one task at a time, there should be no appreciable difference in the individual times compared with before.

However, the whole process took about 22 seconds, not 15, as we might have expected from our previous experience (i.e., the time for the longest task). This reflects the fact that half the tasks had to wait for an available worker.

In fact, we can explain the 22 seconds exactly. Here is how my 16 tasks, with durations 0 to 15 seconds, were allocated to the 8 workers. Keep in mind that the scheduling happened in order for increasing `n`.

The first 8 tasks, of duration 0 to 7 seconds, were scheduled immediately on the 8 available workers. The 0-second task finished immediately, so the next waiting task, the 8-second task, was scheduled on that worker. It finished in 8 seconds, so the _total_ time for the 0-second and 8-second tasks was about 8 seconds. Similarly, after the 1-second task finished, the 9-second task was scheduled. Total time: 10 seconds. Using induction ;), the last worker started with the 7-second task followed by the 15-second task, for a total of 22 seconds!
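The same reasoning generalizes. Assuming $W$ workers, $2W$ tasks with durations $0, 1, \ldots, 2W-1$ seconds submitted in order, and negligible scheduling overhead, worker $i$ runs tasks $i$ and $i + W$, so the total time is set by the last worker:

$$
T_{\text{total}} = \max_{0 \le i < W} \big( i + (i + W) \big) = (W - 1) + (2W - 1) = 3W - 2
$$

For $W = 8$ this gives $22$ seconds; for $W = 12$ it predicts $34$ seconds.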

Here's a table showing this in detail, where `n1` and `n2` refer to the first and second tasks on each worker, with durations of `n1` and `n2` seconds, for a total of `n1+n2` seconds. For consistency, the `ray` workers are numbered from zero:

| Worker | n1 | n2 | Total Time |
| -----: | -: | -: | ---------: |
| 0 | 0 |  8 |  8 |
| 1 | 1 |  9 | 10 |
| 2 | 2 | 10 | 12 |
| 3 | 3 | 11 | 14 |
| 4 | 4 | 12 | 16 |
| 5 | 5 | 13 | 18 |
| 6 | 6 | 14 | 20 |
| 7 | 7 | 15 | 22 |



If your number of workers is different from 8, try changing the previous table to match your system and see if you get a total-time number in the last row that matches what actually happened.
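Instead of editing the table by hand, you can reproduce it with a small simulation. This is a simplified model, not Ray's actual scheduler: it assumes tasks are dispatched in submission order to whichever worker becomes free first (ties go to the worker that has been idle longest), and it ignores scheduling overhead. `simulate_fifo_schedule` is a hypothetical helper; set `num_workers` to your own CPU count.

```python
import heapq
import itertools

def simulate_fifo_schedule(durations, num_workers):
    """Simplified model (not Ray's scheduler): dispatch tasks in submission order
    to the worker that becomes free earliest; ties go to the worker that has been
    idle longest. Returns (total_time, {worker: [task durations]})."""
    order = itertools.count()  # tie-breaker: order in which workers became free
    free = [(0, next(order), w) for w in range(num_workers)]  # (free_at, order, worker)
    heapq.heapify(free)
    assigned = {w: [] for w in range(num_workers)}
    finish = {w: 0 for w in range(num_workers)}
    for d in durations:
        start, _, w = heapq.heappop(free)   # earliest-available worker
        assigned[w].append(d)
        finish[w] = start + d
        heapq.heappush(free, (finish[w], next(order), w))
    return max(finish.values()), assigned

num_workers = 8   # change this to match your machine
total, assigned = simulate_fifo_schedule(range(2 * num_workers), num_workers)
print('| Worker | n1 | n2 | Total Time |')
for w, tasks in assigned.items():
    print(f'| {w} | {tasks[0]} | {tasks[1]} | {sum(tasks)} |')
print('Total:', total)   # 22 for 8 workers
```

A heap keyed on each worker's next free time makes "which worker is available first?" a cheap lookup, which is the essence of this greedy list-scheduling model.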

Of course a real-world scheduling scenario would be more complicated, but hopefully you have a better sense of how Ray distributes work, whether you're working on a single laptop or a large cluster!