# Part 2: MapReduce on Ray

In [1]:
# Optional pip3 install 
# !pip3 install ipywidgets

In [2]:
# Run this either in your Notebook or your shell.
# If you have done so already, skip it.
# !bash ./download_gutenberg.sh

In [3]:
import ray
import time
import os

In [4]:
# Connect this notebook to the Ray cluster at the hard-coded ray:// address below.
# NOTE(review): the address is specific to one cluster (presumably the head node's
# Ray Client endpoint) -- confirm and update it before running elsewhere.
ray.init(address="ray://172.31.2.53:10001")

SIGTERM handler is not set because current thread is not the main thread.


0,1
Python version:,3.10.12
Ray version:,2.9.3
Dashboard:,http://127.0.0.1:8265


In [5]:
# Print a short summary of the cluster: node count plus the CPU, memory and
# object-store totals reported by ray.cluster_resources().
# The two memory totals are divided by 1024**3 before printing
# (presumably bytes -> GiB; confirm against the Ray version in use).
bytes_per_gib = 1024 * 1024 * 1024
cluster_summary = '''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    {} Memory resources in total
    {} Object store memory resources in total
    '''
print(cluster_summary.format(
    len(ray.nodes()),
    ray.cluster_resources()['CPU'],
    ray.cluster_resources()['memory'] / bytes_per_gib,
    ray.cluster_resources()['object_store_memory'] / bytes_per_gib,
))

This cluster consists of
    5 nodes in total
    10.0 CPU resources in total
    25.069031909108162 Memory resources in total
    11.055059049278498 Object store memory resources in total
    


In [6]:
%%time

'''
(Optional testing code) 
Test your Ray cluster: check the output to confirm that all nodes are ready to work.
Each output line should look like "123 tasks on 1.2.3.4".
By default, you have 5 nodes to execute remote tasks.
'''
import socket
from collections import Counter

@ray.remote
def f():
    """Sleep for one millisecond, then return the IP address of the executing host.

    The address is obtained by resolving the local hostname, so the result
    identifies whichever machine ran this remote task.
    """
    time.sleep(0.001)
    local_hostname = socket.gethostname()
    return socket.gethostbyname(local_hostname)

# Launch 1000 tiny remote tasks and block until every one has reported the IP
# address of the host it ran on.
object_ids = []
for _ in range(1000):
    object_ids.append(f.remote())
ip_addresses = ray.get(object_ids)

# Tally how many tasks each host executed and print one line per host.
tasks_per_host = Counter(ip_addresses)
print('Tasks executed')
for ip_address in tasks_per_host:
    num_tasks = tasks_per_host[ip_address]
    print('    {} tasks on {}'.format(num_tasks, ip_address))

Tasks executed
    14 tasks on 172.31.12.93
    664 tasks on 172.31.13.109
    321 tasks on 172.31.1.0
    1 tasks on 172.31.2.53
CPU times: user 422 ms, sys: 92 ms, total: 514 ms
Wall time: 2.62 s


In [7]:
%%time

# You should have a mr_txt folder created after running download_gutenberg.sh. 
# Go ahead read the files into a list.
# Do not modify this cell.
# Build the corpus: a flat list of every whitespace-separated token found in the
# files under 'mr_txt'. Any character that is not a letter, whitespace or an
# apostrophe is replaced by a space before splitting.
corpus = []
for file_name in os.listdir('mr_txt'):
    file_path = os.path.join('mr_txt', file_name)
    # The context manager closes each file as soon as it has been read. The
    # previous single close() after the loop only closed the last file and
    # raised NameError when the directory was empty.
    with open(file_path, 'r') as f_txt:
        content = f_txt.read()
    filtered_content = ''.join([char if char.isalpha() or char.isspace() or char == "'" else ' ' for char in content])
    corpus.extend(filtered_content.split())

CPU times: user 8.69 s, sys: 631 ms, total: 9.32 s
Wall time: 9.36 s


In [8]:
'''
Default num_map_tasks = 8 and num_reduce_tasks = 4
TODO: Try modifying the number of map tasks and the number of reduce tasks to see how that affects performance
'''
# Split the corpus into N contiguous buckets, one per map task (N = num_map_tasks).
num_map_tasks = 8
num_reduce_tasks = 4

# Every bucket holds 'chunk' words except the last one, which also absorbs the
# remainder left over by the integer division.
chunk = len(corpus) // num_map_tasks
bucket_bounds = [i * chunk for i in range(num_map_tasks)] + [len(corpus)]
input_buckets = [
    corpus[lower:upper]
    for lower, upper in zip(bucket_bounds, bucket_bounds[1:])
]

In [9]:
'''
TODO: Task 1: Implement the map task with ray.remote() decorator
'''
@ray.remote
def do_map_task(map_input, num_reduce_tasks = 4): # by default num_reduce_tasks = 4
    """
    Launches a map task on the given 'map_input' and divides the emitted keys into
        'num_reduce_tasks' buckets, one bucket per reduce task.
    Steps:
    1. Call map_function on the map_input bucket to emit a list of key-value (KV) pairs.
    2. Initialize intermediate_results as a list of 'num_reduce_tasks' empty buckets.
    3. For each KV pair, compute a process-independent hash of the key (CRC32 of its
        UTF-8 bytes), take it modulo 'num_reduce_tasks', and append the pair to the
        bucket at that index.
    4. Return the list of intermediate result buckets.

    Parameters:
    - map_input (list): The input data to be mapped.
    - num_reduce_tasks (int): The number of buckets to divide the emitted KV pairs
        into (default is 4).

    Returns:
    - intermediate_results (list): A list of 'num_reduce_tasks' buckets, each a list
        of (key, value) tuples.
    """
    # Imported locally so the task does not depend on a notebook-level import.
    import zlib

    emitted_kv_pairs = map_function(map_input)
    intermediate_results = [list() for _ in range(num_reduce_tasks)]
    for key, value in emitted_kv_pairs:
        # The built-in hash() of a str is salted per interpreter process unless
        # PYTHONHASHSEED is fixed, so map tasks running in different worker
        # processes could route the same word to different reducers. CRC32 over
        # the key's UTF-8 bytes yields the same bucket index in every process.
        reduce_task_index = zlib.crc32(str(key).encode("utf-8")) % num_reduce_tasks
        # Append the KV pair to the bucket owned by that reduce task.
        intermediate_results[reduce_task_index].append((key, value))
    # Do not modify the statement below
    return intermediate_results

In [10]:
'''
TODO: Task 2: Implement the map function
'''
def map_function(map_input):
    """
    Emit one (word, 1) key-value pair for every word in 'map_input'.

    Each word is lowercased before it is emitted so that occurrences differing
    only in case share the same key. The output keeps the input order and has
    exactly one pair per input word.

    Parameters:
    - map_input (list): List of words representing a partition of the entire document txts.

    Returns:
    - list: A list of (lowercased word, 1) tuples.
    """
    return [(token.lower(), 1) for token in map_input]

In [40]:
'''
TODO: Task 3: Implement the reduce task with ray.remote() decorator
'''
@ray.remote
def do_reduce_task(*my_intermediate_results):
    """
    Run one reduce task over the intermediate buckets assigned to this reducer.

    Phase 1 groups values by key: every (key, value) pair from every bucket is
    collected into a dict mapping the key to the list of its values.
    Phase 2 collapses each list by calling reduce_function() on it and stores
    the result under the same key.

    Parameters:
    - *my_intermediate_results: The intermediate buckets for this reducer, passed as
        positional arguments; each bucket is an iterable of (key, value) pairs.

    Returns:
    - partial_output (dict): Maps each key seen in the buckets to the value that
        reduce_function() returned for that key's list of values.
    """
    # Phase 1: gather all values emitted for each key.
    values_by_key = {}
    for bucket in my_intermediate_results:
        for word, one in bucket:
            values_by_key.setdefault(word, []).append(one)

    # Phase 2: reduce each key's list of values to a single result.
    partial_output = {
        word: reduce_function(ones) for word, ones in values_by_key.items()
    }
    return partial_output

In [41]:
'''
TODO: Task 4: Implement the reduce function
'''
def reduce_function(val):
    """
    Add up the entries of 'val' and return the total.

    When 'val' is a list of 1s, as produced by the map phase, the total equals
    the number of entries, i.e. the word count.

    Parameters:
    - val: A list of '1's.

    Returns:
    - count: The sum of the entries in val (0 for an empty list).
    """
    count = 0
    for one in val:
        count = count + one
    return count

In [None]:
'''
Driver code to launch the entire MapReduce WordCount job.
It will first execute the map phase, then the reduce phase.
While this cell is running, you should keep an eye on the Ray dashboard to monitor the task execution progress.
'''

# Driver code to launch remote map tasks on the Ray cluster.
# Do not modify this cell! 
# Map phase: launch one remote map task per input bucket. Each task is configured
# with num_returns = num_reduce_tasks and receives its bucket plus the reducer count.
intermediate_results = []
for input_bucket in input_buckets:
    map_task = do_map_task.options(num_returns = num_reduce_tasks)
    intermediate_results.append(map_task.remote(input_bucket, num_reduce_tasks))

# Reduce phase: launch one remote reduce task per reducer index. Reducer r is
# handed element r of every map task's output.
merged_partial_results = [
    do_reduce_task.remote(
        *(map_output[reducer_idx] for map_output in intermediate_results)
    )
    for reducer_idx in range(num_reduce_tasks)
]

In [37]:
#intermediate_results

In [38]:
#merged_partial_results

In [39]:
'''
Collect and aggregate reduce tasks' output results at the client.
'''
# Merge the per-reducer dicts into a single word -> count dict on the client.
# Counts for a word that appears in more than one partial result are added together.
wc = dict()
wc_sorted = []
for partial_counts in ray.get(merged_partial_results):
    for word, count in partial_counts.items():
        wc[word] = wc.get(word, 0) + count

# Order the (word, count) pairs by count, largest first.
wc_sorted = list(wc.items())
wc_sorted.sort(key=lambda pair: pair[1], reverse=True)

# Show the 20 most frequent words (for debugging purposes).
for word, count in wc_sorted[:20]:
    print(f"{word}, {count}")

# Write every "word count" pair, one per line, to mr_wc.output.
with open('mr_wc.output', 'w') as out_file:
    out_file.writelines(f"{word} {count}\n" for word, count in wc_sorted)

RayTaskError(TypeError): [36mray::do_reduce_task()[39m (pid=1177, ip=172.31.12.93)
  File "/tmp/ipykernel_1465/1033647018.py", line 32, in do_reduce_task
  File "/tmp/ipykernel_1465/3088041794.py", line 18, in reduce_function
TypeError: 'int' object is not iterable

In [None]:
'''
Run the wc test script (it returns either a PASS or a FAIL).
'''
!bash ./test_mr.sh

In [None]:
# Disconnect this notebook from the Ray cluster once the job is finished.
# Any later cell that uses Ray needs ray.init() to be run again first.
ray.shutdown()