# Exercise 6 - Handling Slow Tasks

**GOAL:** Learn how to use `ray.wait` to work with results as they become available instead of blocking on slow tasks.

See the documentation for `ray.wait` at https://ray.readthedocs.io/en/latest/api.html#ray.wait.

### Concepts for this Exercise - ray.wait

After launching a number of tasks, you may want to know which ones have finished executing. This can be done with `ray.wait`. The function works as follows.

```python
ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout=None)
```

**Arguments:**
- `object_ids`: This is a list of object IDs.
- `num_returns`: This is the maximum number of object IDs to wait for. The default value is `1`.
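- `timeout`: This is the maximum amount of time in milliseconds to wait. `ray.wait` blocks until either `num_returns` objects are ready or `timeout` milliseconds have passed. The default value `None` means that there is no time limit. Newer Ray releases interpret `timeout` in seconds, so check the documentation for the version you have installed.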

**Return values:**
- `ready_ids`: This is a list of object IDs that are available in the object store.
- `remaining_ids`: This is a list of the IDs that were in `object_ids` but are not in `ready_ids`. Together, the IDs in `ready_ids` and `remaining_ids` make up all the IDs in `object_ids`.
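
The ready/remaining split described above can be tried out without a Ray cluster. The following is a minimal sketch that imitates those semantics with the standard library's `concurrent.futures`; it is not Ray's implementation. The helper `wait_like_ray` and the task `slow_square` are hypothetical names introduced for illustration, and the timeout here is in seconds.

```python
import concurrent.futures as cf
import time


def wait_like_ray(futures, num_returns=1, timeout=None):
    """Hypothetical stand-in that mimics the ready/remaining split.

    Blocks until `num_returns` futures are done or `timeout` seconds pass.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    pending = set(futures)
    done = set()
    while len(done) < num_returns and pending:
        left = None if deadline is None else max(0.0, deadline - time.monotonic())
        newly_done, pending = cf.wait(
            pending, timeout=left, return_when=cf.FIRST_COMPLETED)
        done |= newly_done
        if deadline is not None and time.monotonic() >= deadline:
            break
    # Keep the input order and return at most `num_returns` ready futures.
    ready = [fut for fut in futures if fut in done][:num_returns]
    ready_set = set(ready)
    remaining = [fut for fut in futures if fut not in ready_set]
    return ready, remaining


def slow_square(x, delay):
    # Illustrative task: sleep for `delay` seconds, then return x squared.
    time.sleep(delay)
    return x * x


with cf.ThreadPoolExecutor(max_workers=4) as pool:
    delays = [0.4, 0.05, 0.3, 0.1]
    futures = [pool.submit(slow_square, i, d) for i, d in enumerate(delays)]
    # Ask for the first two tasks to finish, whichever they are.
    ready, remaining = wait_like_ray(futures, num_returns=2)
    first_two = sorted(fut.result() for fut in ready)

print(first_two)
```

The two lists never overlap, and together they contain every input future, which mirrors the relationship between `ready_ids` and `remaining_ids`.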

In [1]:
import os
os.system('pip install ray')

0

In [0]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [3]:
ray.init(num_cpus=6, include_webui=False, ignore_reinit_error=True)

2019-05-16 11:45:50,488	INFO node.py:469 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-16_11-45-50_121/logs.
2019-05-16 11:45:50,598	INFO services.py:407 -- Waiting for redis server at 127.0.0.1:11916 to respond...
2019-05-16 11:45:50,737	INFO services.py:407 -- Waiting for redis server at 127.0.0.1:32277 to respond...
2019-05-16 11:45:50,744	INFO services.py:804 -- Starting Redis shard with 2.58 GB max memory.
2019-05-16 11:45:50,778	INFO node.py:483 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-16_11-45-50_121/logs.
2019-05-16 11:45:50,781	INFO services.py:1427 -- Starting the Plasma object store with 3.87 GB memory using /dev/shm.


{'node_ip_address': '172.28.0.2',
 'object_store_address': '/tmp/ray/session_2019-05-16_11-45-50_121/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-05-16_11-45-50_121/sockets/raylet',
 'redis_address': '172.28.0.2:11916',
 'webui_url': None}

Define a remote function that takes a variable amount of time to run.

In [0]:
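@ray.remote
def f(i):
    # Seed the generator so that each task's sleep time is reproducible.
    np.random.seed(5 + i)
    x = np.random.uniform(0, 4)
    time.sleep(x)
    # Return the task index and the time at which the task finished.
    return i, time.time()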

**EXERCISE:** Using `ray.wait`, change the code below so that `initial_results` consists of the outputs of the first three tasks to complete instead of the first three tasks that were submitted.

In [12]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# This launches 6 tasks, each of which takes a random amount of time to
# complete.
result_ids = [f.remote(i) for i in range(6)]
# Wait for the first 3 tasks to finish rather than for a fixed subset of
# tasks, then fetch their results.
ready_ids, remaining_ids = ray.wait(result_ids, num_returns=3, timeout=None)
initial_results = ray.get(ready_ids)
end_time = time.time()
duration = end_time - start_time
print("ready", initial_results)
print("remaining", remaining_ids)

ready [(0, 1558007481.0837038), (2, 1558007480.5026531), (4, 1558007480.2386622)]
remaining [ObjectID(0100000087fbd26f5a2c2ba3ddf50804cd65ace7), ObjectID(010000003f3bc4889b86e7aa0ec8a7344c0457c6), ObjectID(010000001eb0269a9da52e1d00683b361c268f12)]


**EXERCISE:** Change the code below so that `remaining_results` consists of the outputs of the last three tasks to complete.

In [0]:
# Wait for the remaining tasks to complete.
remaining_results = ray.get(remaining_ids)
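
Calling `ray.get` on all remaining IDs blocks until the slowest task is done. When each result should be handled as soon as it arrives, a common pattern is to wait for one result at a time in a loop. The sketch below shows that loop with the standard library's `concurrent.futures` as a stand-in so that it runs without Ray; the task `work` and its delays are illustrative assumptions, and the Ray call it imitates is named in a comment.

```python
import concurrent.futures as cf
import time


def work(i, delay):
    # Illustrative task: sleep for `delay` seconds, then return its index.
    time.sleep(delay)
    return i


completion_order = []
with cf.ThreadPoolExecutor(max_workers=3) as pool:
    pending = {pool.submit(work, i, d) for i, d in enumerate([0.3, 0.05, 0.15])}
    while pending:
        # Roughly analogous to:
        #   ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)
        done, pending = cf.wait(pending, return_when=cf.FIRST_COMPLETED)
        for fut in done:
            completion_order.append(fut.result())

print(completion_order)
```

The results are collected in the order in which the tasks finish, not the order in which they were submitted.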

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [14]:
assert len(initial_results) == 3
assert len(remaining_results) == 3

initial_indices = [result[0] for result in initial_results]
initial_times = [result[1] for result in initial_results]
remaining_indices = [result[0] for result in remaining_results]
remaining_times = [result[1] for result in remaining_results]

assert set(initial_indices + remaining_indices) == set(range(6))

assert duration < 1.5, ('The initial batch of three tasks was retrieved in '
                        '{} seconds. This is too slow.'.format(duration))

assert duration > 0.8, ('The initial batch of three tasks was retrieved in '
                        '{} seconds. This is too fast.'.format(duration))

# Make sure the initial results actually completed first.
assert max(initial_times) < min(remaining_times)

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.8934965133666992 seconds.
