# Exercise 2 - Parallel Data Processing with Task Dependencies

**GOAL:** The goal of this exercise is to show how to pass object IDs into remote functions to encode dependencies between tasks.

In this exercise, we construct a sequence of tasks each of which depends on the previous mimicking a data parallel application. Within each sequence, tasks are executed serially, but multiple sequences can be executed in parallel.

In this exercise, you will use Ray to parallelize the computation below and speed it up.

### Concept for this Exercise - Task Dependencies

Suppose we have two remote functions defined as follows.

```python
@ray.remote
def f(x):
    return x
```

Arguments can be passed into remote functions as usual.

```python
>>> x1_id = f.remote(1)
>>> ray.get(x1_id)
1

>>> x2_id = f.remote([1, 2, 3])
>>> ray.get(x2_id)
[1, 2, 3]
```

**Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be a retrieved as a regular Python object**.

```python
>>> y1_id = f.remote(x1_id)
>>> ray.get(y1_id)
1

>>> y2_id = f.remote(x2_id)
>>> ray.get(y2_id)
[1, 2, 3]
```

So when implementing a remote function, the function should expect a regular Python object regardless of whether the caller passes in a regular Python object or an object ID.

**Task dependencies affect scheduling.** In the example above, the task that creates `y1_id` depends on the task that creates `x1_id`. This has the following implications.

- The second task will not be executed until the first task has finished executing.
- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to `x1_id`) will be copied over the network to the machine where the second task is scheduled.

In [3]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [4]:
ray.init(num_cpus=4, redirect_output=True)

Waiting for redis server at 127.0.0.1:19699 to respond...
Waiting for redis server at 127.0.0.1:52173 to respond...
Starting local scheduler with the following resources: {'CPU': 4, 'GPU': 0}.

View the web UI at http://localhost:8890/notebooks/ray_ui14943.ipynb?token=db9147cfd73bb4c4afe6ceb76a4dbf224ec40b1fcf03b389



{'local_scheduler_socket_names': ['/tmp/scheduler7091341'],
 'node_ip_address': '127.0.0.1',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store60616400', manager_name='/tmp/plasma_manager53787345', manager_port=64137)],
 'redis_address': '127.0.0.1:19699',
 'webui_url': 'http://localhost:8890/notebooks/ray_ui14943.ipynb?token=db9147cfd73bb4c4afe6ceb76a4dbf224ec40b1fcf03b389'}

These are some helper functions that mimic an example pattern of a data parallel application.

**EXERCISE:** You will need to turn these functions into remote functions. When you turn these functions into remote function, you do not have to worry about whether the caller passes in an object ID or a regular object. In both cases, the arguments will be regular objects when the function executes. This means that even if you pass in an object ID, you **do not need to call `ray.get`** inside of these remote functions.

In [6]:
@ray.remote
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))
@ray.remote
def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)
@ray.remote
def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])
@ray.remote
def compute_loss(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

**EXERCISE:** The loop below takes too long. Parallelize the four passes through the loop by turning `load_data`, `normalize_data`, `extract_features`, and `compute_loss` into remote functions and then retrieving the losses with `ray.get`.

**NOTE:** You should only use **ONE** call to `ray.get`. For example, the object ID returned by `load_data` should be passed directly into `normalize_data` without needing to be retrieved by the driver.

In [8]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

losses = []
for filename in ['file1', 'file2', 'file3', 'file4']:
    data = load_data.remote(filename)
    normalized_data = normalize_data.remote(data)
    features = extract_features.remote(normalized_data)
    loss = compute_loss.remote(features)
    losses.append(loss)

loss = sum(ray.get(losses))

end_time = time.time()
duration = end_time - start_time

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [9]:
assert loss == 4000
assert duration < 0.8, ('The loop took {} seconds. This is too slow.'
                        .format(duration))
assert duration > 0.4, ('The loop took {} seconds. This is too fast.'
                        .format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.4794468879699707 seconds.


**EXERCISE:** Use the UI to view the task timeline and to verify that the relevant tasks were executed in parallel. After running the cell below, you'll need to click on **View task timeline**".
- Using the **second** button, you can click and drag to **move** the timeline.
- Using the **third** button, you can click and drag to **zoom**. You can also zoom by holding "alt" and scrolling.

In the timeline, click on **View Options** and select **Flow Events** to visualize tasks dependencies.

In [10]:
import ray.experimental.ui as ui
ui.task_timeline()