# Exercise 6 - Process Tasks in Order of Completion

**GOAL:** The goal of this exercise is to show how to use `ray.wait` to process tasks in the order that they finish.

See the documentation for ray.wait at http://ray.readthedocs.io/en/latest/api.html#waiting-for-a-subset-of-tasks-to-finish.

The code below runs 10 tasks and retrieves the results in the order that the tasks were launched. However, since each task takes a random amount of time to finish, we could instead process the tasks in the order that they finish.

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [2]:
ray.init(num_cpus=5, redirect_output=True)

Waiting for redis server at 127.0.0.1:15123 to respond...
Waiting for redis server at 127.0.0.1:20382 to respond...
Starting local scheduler with 5 CPUs, 0 GPUs

View the web UI at http://localhost:8891/notebooks/ray_ui59731.ipynb?token=a027916a6cc3d38c9e440d0016362d420c28b929f7f10077



{'local_scheduler_socket_names': ['/tmp/scheduler65144554'],
 'node_ip_address': '127.0.0.1',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store90797388', manager_name='/tmp/plasma_manager92935720', manager_port=36299)],
 'redis_address': '127.0.0.1:15123',
 'webui_url': 'http://localhost:8891/notebooks/ray_ui59731.ipynb?token=a027916a6cc3d38c9e440d0016362d420c28b929f7f10077'}

In [3]:
@ray.remote
def f():
    time.sleep(np.random.uniform(0, 5))
    return time.time()

**EXERCISE:** Change the code below to use `ray.wait` to get the results of the tasks in the order that they complete.

**NOTE:** It would be a simple modification to maintain a pool of 10 experiments and to start a new experiment whenever one finishes.

In [4]:
ray.wait(result_ids, num_returns=len(result_ids))

NameError: name 'result_ids' is not defined

In [5]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

result_ids = [f.remote() for _ in range(10)]

# Get the results.
results = []
while len(result_ids) >= 1:
    result_w, remaining = ray.wait(result_ids) # two lists
    result = ray.get(result_w)[0]
    results.append(result)
    print('Processing result which finished after {} seconds. {}'.format(result - start_time, result_w))
    result_ids = result_ids[1:]
    
end_time = time.time()
duration = end_time - start_time

Processing result which finished after 0.39975500106811523 seconds. [ObjectID(eb746ac0f732421c73a1905a2a56bf0990b0ea96)]
Processing result which finished after 1.9497718811035156 seconds. [ObjectID(bf15f2d51c301c739a6f0cc1599396bfb8e7dd40)]
Processing result which finished after 1.9497718811035156 seconds. [ObjectID(bf15f2d51c301c739a6f0cc1599396bfb8e7dd40)]
Processing result which finished after 1.9497718811035156 seconds. [ObjectID(bf15f2d51c301c739a6f0cc1599396bfb8e7dd40)]
Processing result which finished after 1.9497718811035156 seconds. [ObjectID(bf15f2d51c301c739a6f0cc1599396bfb8e7dd40)]
Processing result which finished after 3.538628101348877 seconds. [ObjectID(cf18fe13620c90c44987ba28eeed5f0a96fe12a8)]
Processing result which finished after 3.538628101348877 seconds. [ObjectID(cf18fe13620c90c44987ba28eeed5f0a96fe12a8)]
Processing result which finished after 3.6061460971832275 seconds. [ObjectID(99c2d35125916ae63f64c2e7c2c8dfeb41d72acd)]
Processing result which finished after 6.

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [6]:
assert results == sorted(results), ('The results were not processed in the '
                                    'order that they finished.')

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 6.941716909408569 seconds.
