# MapReduce

**GOAL:** The goal of this exercise is to show how to implement a toy version of the MapReduce system on top of Ray.

[MapReduce](https://en.wikipedia.org/wiki/MapReduce) is a computational pattern for computing aggregate statistics over large datasets. It is the core primitive in systems like Google's MapReduce, Hadoop, and Spark.

At its core, MapReduce consists of two primitives:
- The **map** transformation takes a dataset and a function and applies the function to each data point.
- The **reduce** transformation aggregates the output of the map stage.

For example, suppose that our starting point is a collection of documents. If we wish to count the number of occurrences of each word across all of the documents, we can first apply a "map" transformation, which turns each document into a dictionary mapping words to the number of occurrences within that document. Then we can apply the "reduce" transformation, which sums the per-document counts for each word.
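As a concrete, serial illustration of the word-count example, here is a minimal sketch in plain Python (no Ray involved). The sample `documents` and the helper names `count_words` and `merge_counts` are made up for illustration.

```python
from collections import Counter
from functools import reduce

# Made-up sample data for illustration.
documents = [
    "the quick brown fox",
    "the lazy dog",
    "the fox jumps over the dog",
]


def count_words(document):
    """Map step: turn one document into a word -> count mapping."""
    return Counter(document.split())


def merge_counts(left, right):
    """Reduce step: sum the counts for each word."""
    return left + right


per_document_counts = [count_words(doc) for doc in documents]
total_counts = reduce(merge_counts, per_document_counts)

print(total_counts["the"])  # 4
print(total_counts["fox"])  # 2
```

The map step touches each document independently, which is what makes it easy to parallelize. The reduce step only needs a way to merge two partial results.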

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [2]:
if not ray.is_initialized():
    ray.init(num_cpus=4, include_dashboard=False, ignore_reinit_error=True)

Below is a serial implementation of the **map** function. Note that Python already has a [built-in map function](https://docs.python.org/2/library/functions.html#map).

In [3]:
def map_serial(function, xs):
    return [function(x) for x in xs]

**EXERCISE:** Implement a parallel version of the map function. You'll need to modify the function below.

**NOTE:** Because we want `map_parallel` to be non-blocking, the function signature for `map_parallel` should be different from the signature of `map_serial`.
- The argument `function` must be a Ray remote function instead of a regular Python function.
- The return value should be a list of Ray `ObjectRef`s (named `ObjectID` in older Ray releases) instead of a list of the actual transformed values (so that `map_parallel` can return immediately).
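The same "return handles now, fetch values later" idea can be sketched with the standard library alone. This is only an analogy, not Ray: `concurrent.futures.Future` objects stand in for the references returned by a remote call, and the names `map_with_futures` and `slow_increment` are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Four worker threads, loosely mirroring num_cpus=4 (an assumption for this sketch).
executor = ThreadPoolExecutor(max_workers=4)


def map_with_futures(function, xs):
    """Submit one call per element and return the futures without waiting."""
    return [executor.submit(function, x) for x in xs]


def slow_increment(x):
    time.sleep(0.2)
    return x + 1


start = time.time()
futures = map_with_futures(slow_increment, [1, 2, 3, 4])
submit_seconds = time.time() - start  # submitting does not wait for the sleeps

# Blocking happens only here, when the values are requested.
values = [future.result() for future in futures]
executor.shutdown()

print(values)
```

Submitting returns almost immediately because no result is awaited. The caller pays for the work only when it asks for the values, which is the behavior `map_parallel` is meant to have.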

In [4]:
if not ray.is_initialized():
    ray.init('auto')

def map_parallel(function, xs):
    """Apply a remote function to each element of a list."""
    if not isinstance(xs, list):
        raise ValueError('The xs argument must be a list.')
    
    if not hasattr(function, 'remote'):
        raise ValueError('The function argument must be a remote function.')

    return [function.remote(x) for x in xs]

def increment_regular(x):
    return x + 1

@ray.remote
def increment_remote(x):
    return x + 1

xs = [1, 2, 3, 4, 5]
result_ids = map_parallel(increment_remote, xs)
assert isinstance(result_ids, list), 'The output of "map_parallel" must be a list.'
assert all([isinstance(x, ray.ObjectRef) for x in result_ids]), 'The output of map_parallel must be a list of ObjectRefs.'
assert ray.get(result_ids) == map_serial(increment_regular, xs)
print('Congratulations, the test passed!')  


**EXERCISE:** Run the cell below and verify that `map_parallel` runs instantaneously and that fetching the result takes the expected amount of time for a simple task that sleeps.

In [9]:
if not ray.is_initialized():
    ray.init('auto')

def sleep_regular(x):
    time.sleep(1)
    return x + 1


@ray.remote
def sleep_remote(x):
    time.sleep(1)
    return x + 1


# Regular sleep should take 4 seconds.
print('map_serial:')
%time results_serial = map_serial(sleep_regular, [1, 2, 3, 4])

# Initiating map_parallel should be instantaneous.
print('\ncalling map_parallel:')
%time result_ids = map_parallel(sleep_remote, [1, 2, 3, 4])

# Fetching the results from map_parallel should take 1 second
# (since we started Ray with num_cpus=4).
print('\ngetting results from map_parallel:')
%time results_parallel = ray.get(result_ids)

assert results_parallel == results_serial

map_serial:
CPU times: user 170 ms, sys: 40.5 ms, total: 210 ms
Wall time: 4 s

calling map_parallel:
CPU times: user 8.55 ms, sys: 0 ns, total: 8.55 ms
Wall time: 7.56 ms

getting results from map_parallel:
CPU times: user 42.5 ms, sys: 26.8 ms, total: 69.3 ms
Wall time: 1.01 s


**EXERCISE:** Take a look at the task timeline and verify that the four map tasks executed in parallel. Do this by running the next cell and clicking *"View task timeline"*.

**NOTE:** This will show all tasks that have been executed since `ray.init()` was called, which may be a lot.

To navigate the timeline:
- Click and drag to move.
- Scroll to zoom.
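If the graphical timeline is not at hand, the raw event list can also be checked for overlap directly. The following is a minimal sketch, assuming each event is a dict with `'cat'`, `'ts'`, and `'dur'` fields in microseconds (as in the `ray.timeline()` output shown in this notebook) and that events with category `'task:execute'` mark task execution. The helper `max_concurrent` and the sample events are made up for illustration.

```python
def max_concurrent(events, category="task:execute"):
    """Return the largest number of `category` events that overlap in time."""
    boundaries = []
    for event in events:
        if event.get("cat") != category:
            continue
        start = event["ts"]
        end = start + event["dur"]
        boundaries.append((start, 1))   # an event begins
        boundaries.append((end, -1))    # an event ends
    # Sort by time; at equal times, process ends (-1) before starts (+1).
    boundaries.sort()
    running = best = 0
    for _, delta in boundaries:
        running += delta
        best = max(best, running)
    return best


# Made-up events: four one-second tasks that start almost together...
parallel_events = [
    {"cat": "task:execute", "ts": i * 1000.0, "dur": 1_000_000.0}
    for i in range(4)
]
# ...and four one-second tasks that run back to back.
serial_events = [
    {"cat": "task:execute", "ts": i * 1_000_000.0, "dur": 1_000_000.0}
    for i in range(4)
]

print(max_concurrent(parallel_events))  # 4
print(max_concurrent(serial_events))    # 1
```

A result of 1 means the tasks never overlapped, while a result equal to the number of tasks means they all ran at the same time.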

In [9]:
ray.timeline()

[{'cat': 'wait_for_function',
  'name': 'wait_for_function',
  'pid': '192.168.1.105',
  'tid': 'worker:ef16cfd065fec969ae1ecad845a8359dbdd9cd81',
  'ts': 1609845812139905.2,
  'dur': 5986.690521240234,
  'ph': 'X',
  'cname': 'detailed_memory_dump',
  'args': {}},
 {'cat': 'task:deserialize_arguments',
  'name': 'task:deserialize_arguments',
  'pid': '192.168.1.105',
  'tid': 'worker:ef16cfd065fec969ae1ecad845a8359dbdd9cd81',
  'ts': 1609845812146389.5,
  'dur': 195.50323486328125,
  'ph': 'X',
  'cname': 'rail_load',
  'args': {}},
 {'cat': 'wait_for_function',
  'name': 'wait_for_function',
  'pid': '192.168.1.105',
  'tid': 'worker:ef16cfd065fec969ae1ecad845a8359dbdd9cd81',
  'ts': 1609845793487254.0,
  'dur': 9683.370590209961,
  'ph': 'X',
  'cname': 'detailed_memory_dump',
  'args': {}},
 {'cat': 'task:deserialize_arguments',
  'name': 'task:deserialize_arguments',
  'pid': '192.168.1.105',
  'tid': 'worker:ef16cfd065fec969ae1ecad845a8359dbdd9cd81',
  'ts': 1609845793497068.8,
 

Below is a serial implementation of a simple **reduce** function. Note that Python 2 has a [built-in reduce function](https://docs.python.org/2/library/functions.html#reduce); in Python 3 it is available as `functools.reduce`.

The reduce function essentially aggregates all of the elements in a list (e.g., by summing them together).
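For example, the standard library's `functools.reduce` sums a list when given an addition function. A minimal sketch:

```python
from functools import reduce
import operator

# Fold the list from left to right: ((((1 + 2) + 3) + 4) + ...) + 8
total = reduce(operator.add, [1, 2, 3, 4, 5, 6, 7, 8])
print(total)  # 36
```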

In [10]:
def reduce_serial(function, xs):
    if len(xs) == 1:
        return xs[0]
    
    result = xs[0]
    for i in range(1, len(xs)):
        result = function(result, xs[i])

    return result


def add_regular(x, y):
    time.sleep(0.3)
    return x + y


assert reduce_serial(add_regular, [1, 2, 3, 4, 5, 6, 7, 8]) == 36

**EXERCISE:** Implement `reduce_parallel` below by modifying the serial reduce implementation to simply invoke `function` remotely (via `function.remote`).

Note that the underlying assumption here is that `function` is commutative and associative. That is, the order in which the elements are aggregated should not affect the result (this is what makes it possible to run parts of the reduction in parallel).
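To see why this assumption matters, compare two ways of grouping the same four values. The helpers `left_to_right` and `pairwise` below are hypothetical and exist only for this sketch. Addition gives the same answer under both groupings, while subtraction (which is neither associative nor commutative) does not.

```python
import operator


def left_to_right(f, a, b, c, d):
    """Group as f(f(f(a, b), c), d)."""
    return f(f(f(a, b), c), d)


def pairwise(f, a, b, c, d):
    """Group as f(f(a, b), f(c, d))."""
    return f(f(a, b), f(c, d))


print(left_to_right(operator.add, 1, 2, 3, 4))  # 10
print(pairwise(operator.add, 1, 2, 3, 4))       # 10

print(left_to_right(operator.sub, 1, 2, 3, 4))  # -8
print(pairwise(operator.sub, 1, 2, 3, 4))       # 0
```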

In [17]:
def reduce_parallel(function, xs):
    if not isinstance(xs, list):
        raise ValueError('The xs argument must be a list.')

    if not hasattr(function, 'remote'):
        raise ValueError('The function argument must be a remote function.')

    if len(xs) == 1:
        return xs[0]

    result = xs[0]
    for i in range(1, len(xs)):
        result = function.remote(result, xs[i])

    return result


@ray.remote
def add_remote(x, y):
    time.sleep(0.3)
    return x + y

if not ray.is_initialized():
    ray.init(address='auto')

xs = [1, 2, 3, 4, 5, 6, 7, 8]
result_id = reduce_parallel(add_remote, xs)
assert ray.get(result_id) == reduce_serial(add_regular, xs)
print('Congratulations, the test passed!')

Congratulations, the test passed!


**EXERCISE:** Take a look at the timeline for the above computation graph from the call to `reduce_parallel`. Is there any parallelism?

In [20]:
ray.timeline()

[{'cat': 'fetch_and_run_function',
  'name': 'fetch_and_run_function',
  'pid': '192.168.1.105',
  'tid': 'worker:7d5ccf4fba261357c9b142e23677a1981241439b',
  'ts': 1609845787570135.8,
  'dur': 224.59030151367188,
  'ph': 'X',
  'cname': 'detailed_memory_dump',
  'args': {}},
 {'cat': 'fetch_and_run_function',
  'name': 'fetch_and_run_function',
  'pid': '192.168.1.105',
  'tid': 'worker:7d5ccf4fba261357c9b142e23677a1981241439b',
  'ts': 1609845787570414.5,
  'dur': 80.58547973632812,
  'ph': 'X',
  'cname': 'detailed_memory_dump',
  'args': {}},
 {'cat': 'wait_for_function',
  'name': 'wait_for_function',
  'pid': '192.168.1.105',
  'tid': 'worker:7d5ccf4fba261357c9b142e23677a1981241439b',
  'ts': 1609847120892560.8,
  'dur': 1161.5753173828125,
  'ph': 'X',
  'cname': 'detailed_memory_dump',
  'args': {}},
 {'cat': 'task:deserialize_arguments',
  'name': 'task:deserialize_arguments',
  'pid': '192.168.1.105',
  'tid': 'worker:7d5ccf4fba261357c9b142e23677a1981241439b',
  'ts': 1609847

There is no parallelism above. The tasks were submitted in a for loop, and the output of each task was passed as an argument to the next one, so every task had to wait for its predecessor to finish. In order to expose more parallelism, we need to reduce the elements in a different order.

To illustrate the issue, note that we cannot execute the calls to `f` in parallel in

```python
f(f(f(f(f(f(f(1, 2), 3), 4), 5), 6), 7), 8)
```

which is what the above implementation does. However, we can execute some of the calls to `f` in parallel in

```python
f(f(f(1, 2), f(3, 4)), f(f(5, 6), f(7, 8)))
```

which is what a **tree reduce** does. Note that the two computations above are not equivalent in general. However, when `f` is associative and commutative, they are equivalent.

**EXERCISE:** Modify the reduce implementation to expose more parallelism.

In [21]:
def reduce_parallel_tree(function, xs):
    if not isinstance(xs, list):
        raise ValueError('The xs argument must be a list.')
    
    if not hasattr(function, 'remote'):
        raise ValueError('The function argument must be a remote function.')

        
    while len(xs) > 1:
        result_id = function.remote(xs[0], xs[1])
        xs = xs[2:]
        xs.append(result_id)
    
    return xs[0]


xs = [1, 2, 3, 4, 5, 6, 7, 8]
result_id = reduce_parallel_tree(add_remote, xs)
assert ray.get(result_id) == reduce_serial(add_regular, xs)
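To check which grouping the pairing loop above actually produces, the same loop can be run locally with a function that builds a string instead of adding numbers. This is a sketch without Ray, and `reduce_tree_local` and `show_call` are hypothetical names.

```python
def reduce_tree_local(function, xs):
    """Same pairing loop as reduce_parallel_tree, but calling `function` locally."""
    xs = list(xs)
    while len(xs) > 1:
        result = function(xs[0], xs[1])  # combine the two oldest items
        xs = xs[2:]
        xs.append(result)                # queue the partial result at the end
    return xs[0]


def show_call(x, y):
    """Record the call as text instead of computing anything."""
    return "f({}, {})".format(x, y)


expression = reduce_tree_local(show_call, [1, 2, 3, 4, 5, 6, 7, 8])
print(expression)  # f(f(f(1, 2), f(3, 4)), f(f(5, 6), f(7, 8)))

odd_expression = reduce_tree_local(show_call, [1, 2, 3])
print(odd_expression)  # f(3, f(1, 2))
```

For eight elements the loop reproduces the balanced tree. For three elements the arguments end up reordered, which is why the function has to be commutative as well as associative.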

**EXERCISE:** Take a look at the timeline and see if the tasks for `reduce_parallel_tree` were executed in parallel.

In [22]:
ray.timeline()

[{'cat': 'wait_for_function',
  'name': 'wait_for_function',
  'pid': '192.168.1.105',
  'tid': 'worker:7d5ccf4fba261357c9b142e23677a1981241439b',
  'ts': 1609845812141988.0,
  'dur': 2457.857131958008,
  'ph': 'X',
  'cname': 'detailed_memory_dump',
  'args': {}},
 {'cat': 'task:deserialize_arguments',
  'name': 'task:deserialize_arguments',
  'pid': '192.168.1.105',
  'tid': 'worker:7d5ccf4fba261357c9b142e23677a1981241439b',
  'ts': 1609845812145132.0,
  'dur': 280.14183044433594,
  'ph': 'X',
  'cname': 'rail_load',
  'args': {}},
 {'cat': 'task:deserialize_arguments',
  'name': 'task:deserialize_arguments',
  'pid': '192.168.1.105',
  'tid': 'worker:7d5ccf4fba261357c9b142e23677a1981241439b',
  'ts': 1609847132092735.0,
  'dur': 223.1597900390625,
  'ph': 'X',
  'cname': 'rail_load',
  'args': {}},
 {'cat': 'task:execute',
  'name': 'task:execute',
  'pid': '192.168.1.105',
  'tid': 'worker:7d5ccf4fba261357c9b142e23677a1981241439b',
  'ts': 1609847132092965.2,
  'dur': 300420.999526

**EXERCISE:** Run the cell below and verify that `reduce_parallel_tree` runs instantaneously and that fetching the result takes the expected amount of time.

Each task takes 0.3 seconds, and reducing 8 elements requires 7 calls, so both `reduce_serial` and `reduce_parallel` should take about `7 * 0.3 = 2.1` seconds. The tree should have depth 3, so `reduce_parallel_tree` should take about `3 * 0.3 = 0.9` seconds.
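The expected wall times can be worked out with a small model. This is a sketch under two assumptions: each call costs 0.3 seconds, and enough CPUs are free to run every level of the tree at once. A chain over `n` elements needs `n - 1` sequential calls, while a balanced tree needs `ceil(log2(n))` levels. The helper names are hypothetical.

```python
import math

TASK_SECONDS = 0.3  # the sleep inside add_regular / add_remote


def chain_seconds(n):
    """n - 1 calls, each waiting on the previous one."""
    return (n - 1) * TASK_SECONDS


def tree_seconds(n):
    """One task duration per tree level, assuming each level runs fully in parallel."""
    return math.ceil(math.log2(n)) * TASK_SECONDS


print(round(chain_seconds(8), 2))  # 2.1
print(round(tree_seconds(8), 2))   # 0.9
```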

In [23]:
# reduce_serial should take about 2.1 seconds (7 calls at 0.3 seconds each).
print('reduce_serial:')
%time results_serial = reduce_serial(add_regular, [1, 2, 3, 4, 5, 6, 7, 8])

# Calling reduce_parallel should be nearly instantaneous.
print('\ncalling reduce_parallel:')
%time result_ids = reduce_parallel(add_remote, [1, 2, 3, 4, 5, 6, 7, 8])

# Fetching the result from reduce_parallel should take about 2.1 seconds,
# since each task depends on the previous one and they run one after another.
print('\ngetting results from reduce_parallel:')
%time results_parallel = ray.get(result_ids)

assert results_parallel == results_serial

# Calling reduce_parallel_tree should be nearly instantaneous.
print('\ncalling reduce_parallel_tree:')
%time result_tree_ids = reduce_parallel_tree(add_remote, [1, 2, 3, 4, 5, 6, 7, 8])

# Fetching the result from reduce_parallel_tree should take about 0.9 seconds
# (three levels of the tree at 0.3 seconds each).
print('\ngetting results from reduce_parallel_tree:')
%time results_parallel_tree = ray.get(result_tree_ids)

assert results_parallel_tree == results_serial

reduce_serial:
CPU times: user 84.5 ms, sys: 37.3 ms, total: 122 ms
Wall time: 2.1 s

calling reduce_parallel:
CPU times: user 6.2 ms, sys: 286 µs, total: 6.48 ms
Wall time: 4.03 ms

getting results from reduce_parallel:
CPU times: user 116 ms, sys: 20.2 ms, total: 136 ms
Wall time: 2.12 s

calling reduce_parallel_tree:
CPU times: user 6.08 ms, sys: 261 µs, total: 6.35 ms
Wall time: 3.95 ms

getting results from reduce_parallel_tree:
CPU times: user 53.1 ms, sys: 8.5 ms, total: 61.6 ms
Wall time: 909 ms
