# Part 2: MapReduce on Ray

In [1]:
# Optional pip3 install 
# !pip3 install ipywidgets

Defaulting to user installation because normal site-packages is not writeable
Collecting ipywidgets
  Downloading ipywidgets-8.1.2-py3-none-any.whl (139 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 139.4/139.4 KB 3.3 MB/s eta 0:00:00
Collecting widgetsnbextension~=4.0.10
  Downloading widgetsnbextension-4.0.10-py3-none-any.whl (2.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.3/2.3 MB 63.3 MB/s eta 0:00:00
Collecting jupyterlab-widgets~=3.0.10
  Downloading jupyterlab_widgets-3.0.10-py3-none-any.whl (215 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 215.0/215.0 KB 27.6 MB/s eta 0:00:00
Installing collected packages: widgetsnbextension, jupyterlab-widgets, ipywidgets
Successfully installed ipywidgets-8.1.2 jupyterlab-widgets-3.0.10 widgetsnbextension-4.0.10


In [4]:
# Run this either in your Notebook or your shell.
# If you have done so already, skip it.
# !bash ./download_gutenberg.sh

In [7]:
import ray
import time
import os
from collections import defaultdict

In [2]:
# Ray Client address (ray:// scheme) of the cluster head node. Export RAY_ADDRESS
# in the environment to point this notebook at another cluster without editing it;
# when the variable is unset the original hard-coded head node is used.
RAY_CLUSTER_ADDRESS = os.environ.get("RAY_ADDRESS", "ray://172.31.22.219:10001")
ray.init(address=RAY_CLUSTER_ADDRESS)

Python version: 3.10.12
Ray version: 2.9.3
Dashboard: http://172.31.22.219:8265


In [3]:
# Query the cluster once and report its size; memory figures are converted from
# bytes to GiB for readability.
cluster_res = ray.cluster_resources()
GIB = 1024 * 1024 * 1024
summary_template = '''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    {} Memory resources in total
    {} Object store memory resources in total
    '''
print(summary_template.format(
    len(ray.nodes()),
    cluster_res['CPU'],
    cluster_res['memory'] / GIB,
    cluster_res['object_store_memory'] / GIB,
))

This cluster consists of
    5 nodes in total
    10.0 CPU resources in total
    25.173188403248787 Memory resources in total
    11.100003430619836 Object store memory resources in total
    


In [4]:
%%time

'''
(Optional testing code) 
Test your Ray cluster, see the outputs to check if all nodes are ready to work
the output should be like "123 tasks on 1.2.3.4".
By default, you have 5 nodes to execute remote tasks.
'''
import socket
from collections import Counter

@ray.remote
def f():
    """Sleep for 1 ms, then return the IPv4 address of the host executing this task."""
    time.sleep(0.001)
    # Return IP address.
    host_name = socket.gethostname()
    return socket.gethostbyname(host_name)

object_ids = [f.remote() for _ in range(1000)]
ip_addresses = ray.get(object_ids)

# Tally how many of the 1000 tasks landed on each node.
print('Tasks executed')
tasks_per_ip = Counter(ip_addresses)
for ip_address in tasks_per_ip:
    print('    {} tasks on {}'.format(tasks_per_ip[ip_address], ip_address))

Tasks executed
    122 tasks on 172.31.22.219
    309 tasks on 172.31.29.149
    252 tasks on 172.31.22.79
    208 tasks on 172.31.30.214
    109 tasks on 172.31.17.183
CPU times: user 407 ms, sys: 74.3 ms, total: 481 ms
Wall time: 1.63 s


In [5]:
%%time

# You should have a mr_txt folder created after running download_gutenberg.sh.
# Go ahead and read the files into a list.
# Do not modify this cell.
#
# Builds `corpus`: one flat list of word tokens from every file in mr_txt/.
# Every character that is not a letter, whitespace, or an apostrophe is replaced
# by a space, and the text is then split on whitespace. Case is preserved here.
# NOTE(review): os.listdir returns entries in arbitrary order, so the token order
# (and therefore which words land in which map bucket) may differ between
# machines; the final word counts do not depend on it.
corpus = []
for file_name in os.listdir('mr_txt'):
    file_path = os.path.join('mr_txt', file_name)
    # NOTE(review): a new handle is opened on every iteration but only the last
    # one is closed (by the close() after the loop); `with open(...)` would avoid
    # the leak. The file is decoded with the platform default encoding — TODO
    # confirm the downloaded texts match it.
    f_txt = open(file_path,'r')
    content = f_txt.read()
    filtered_content = ''.join([char if char.isalpha() or char.isspace() or char == "'" else ' ' for char in content])
    corpus.extend(filtered_content.split())

# NOTE(review): raises NameError if mr_txt is empty, since f_txt is never bound.
f_txt.close()

CPU times: user 8.14 s, sys: 694 ms, total: 8.83 s
Wall time: 9.16 s


In [6]:
'''
Default num_map_tasks = 8 and num_reduce_tasks = 4
TODO: Try modifying the number of map tasks and the number of reduce tasks to see how that affect performance
'''
# The input data is initially partitioned into N buckets where N = num_map_tasks.
num_map_tasks = 8
num_reduce_tasks = 4

# Every bucket gets `chunk` consecutive words; the final bucket also absorbs the
# remainder of len(corpus) / num_map_tasks, so no word is dropped.
chunk = len(corpus) // num_map_tasks
bucket_bounds = [i * chunk for i in range(num_map_tasks)] + [len(corpus)]
input_buckets = [
    corpus[start:stop] for start, stop in zip(bucket_bounds, bucket_bounds[1:])
]

In [26]:
'''
TODO: Task 1: Implement the map task with ray.remote() decorator
'''
@ray.remote(scheduling_strategy="SPREAD")
def do_map_task(map_input, num_reduce_tasks = 4): # by default num_reduce_tasks = 4
    """
    Launches a map task on the given 'map_input' and divides the emitted KV pairs
        into 'num_reduce_tasks' buckets, one bucket per reduce task.
    Steps:
    1. Initialize a list named intermediate_results holding num_reduce_tasks empty buckets.
    2. Call map_function on the map_input bucket to emit a list of key-value (KV) pairs.
    3. Hash-partition every KV pair across the M = 'num_reduce_tasks' buckets using the
        modulo (%) operation, and append it to the selected bucket.
    4. Return the list of intermediate result buckets.

    Why not the built-in hash(): Python salts hash() of str per process
    (PYTHONHASHSEED), and each map task may run in a different worker process
    or node. The same word could then be routed to different reducers by
    different map tasks, which breaks the "one key -> one reducer" property.
    A CRC32 of the UTF-8 encoded key yields the same bucket in every process.

    Parameters:
    - map_input (list): The input data to be mapped.
    - num_reduce_tasks (int): The number of buckets to divide the emitted KV pairs into (default is 4).

    Returns:
    - intermediate_results (list): 'num_reduce_tasks' lists of (key, value) tuples.
    """
    # Imported locally so the dependency is resolved inside the Ray worker.
    import zlib

    intermediate_results = [list() for _ in range(num_reduce_tasks)]
    # TODO: start your implementation below this line

    # step 2: Call the map_function on the map_input bucket to emit a list of key-value (KV) pairs.
    kv_pairs = map_function(map_input)

    # step 3: Route each KV pair to its reducer's bucket using a process-stable hash.
    for k, v in kv_pairs:
        reduce_idx = zlib.crc32(k.encode("utf-8")) % num_reduce_tasks
        intermediate_results[reduce_idx].append((k, v))

    # Do not modify the statement below
    return intermediate_results

In [9]:
'''
TODO: Task 2: Implement the map function
'''
def map_function(map_input):
    """
    Turns a partition of words into (word, 1) key-value pairs.

    Every word is lower-cased before it is emitted, so occurrences that differ
    only in case ("The", "the", "THE") end up under a single key.

    Parameters:
    - map_input (list): List of words representing a partition of the entire document txts.

    Returns:
    - list: One (lowercase_word, 1) tuple per input word, in input order.
    """
    # One pair per input word; order follows map_input.
    kv_list = [(word.lower(), 1) for word in map_input]

    # Do not modify the statement below
    return kv_list

In [10]:
'''
TODO: Task 3: Implement the reduce task with ray.remote() decorator
'''
@ray.remote(scheduling_strategy="SPREAD")
def do_reduce_task(*my_intermediate_results):
    """
    Launches a reduce task on the intermediate result buckets assigned to this reducer.
    Steps:
        1. Group values by key: for every (word, value) pair in every bucket, append
            the value to that word's list.
        2. For each word, call reduce_function() on its list of values and store the
            result in partial_output.

    Parameters:
    - *my_intermediate_results: One bucket per map task, each a list of (word, 1)
        KV pairs routed to this reducer.

    Returns:
    - partial_output (dict): Maps each word seen by this reducer to its count.
    """

    partial_output = dict()
    # TODO: start your implementation below this line

    # step 1: group values by key. defaultdict(list) creates the empty list the
    # first time a key is seen, so there is no separate "key missing" branch.
    values_by_key = defaultdict(list)
    for bucket in my_intermediate_results:
        for k, v in bucket:
            # Keep exactly the value the mapper emitted instead of discarding it
            # and appending a literal 1.
            values_by_key[k].append(v)

    # step 2: reduce each key's value list to its word count.
    for k, values in values_by_key.items():
        partial_output[k] = reduce_function(values)

    # Do not modify the statement below
    return partial_output

In [11]:
'''
TODO: Task 4: Implement the reduce function
'''
def reduce_function(val):
    """
    Returns how many entries 'val' holds.

    Every entry is a 1 emitted by the map phase, so the list length equals the
    word count. len() reads a list's stored size directly, whereas sum() would
    walk every element to reach the same number.

    Parameters:
    - val: A list of '1's.

    Returns:
    - count: number of '1's in val
    """
    # The list length is the number of occurrences of the word.
    count = len(val)

    # Do not modify the statement below
    return count

In [27]:
'''
Driver code to launch the entire MapReduce WordCount job.
It will first execute the map phase, then the reduce phase.
While this cell is running, you should keep an eye on the Ray dashboard to monitor the task execution progress.
'''

# Driver code to launch remote map tasks on the Ray cluster.
# Do not modify this cell!
# One map task per input bucket. num_returns=num_reduce_tasks makes Ray store each
# element of the returned list as its own object, so intermediate_results[i][j] is
# the ObjectRef of the bucket that map task i produced for reducer j.
# NOTE(review): with num_reduce_tasks == 1, num_returns=1 gives a single ObjectRef
# per map task rather than a list, so map_output[reducer_idx] below would likely fail.
intermediate_results = [
    do_map_task.options(num_returns = num_reduce_tasks) # each do_map_task should return a list of results
    .remote(input_bucket, num_reduce_tasks) # each remote task takes in two arguments: input_bucket and num_reduce_tasks
    for input_bucket in input_buckets
]

# Driver code to launch remote reduce tasks on the Ray cluster.
# Do not modify this cell!
# Reducer r receives bucket r from every map task. The ObjectRefs are passed as
# top-level arguments, so Ray hands the reduce task the resolved bucket values; the
# .remote() calls return ObjectRefs immediately without waiting for the work.
merged_partial_results = []
for reducer_idx in range(num_reduce_tasks):
    merged_partial_results.append(
        do_reduce_task.remote(*[map_output[reducer_idx] for map_output in intermediate_results])
    )

In [28]:
intermediate_results

[[ClientObjectRef(a15b7accf9aefd74ffffffffffffffffffffffff0100000001000000),
  ClientObjectRef(a15b7accf9aefd74ffffffffffffffffffffffff0100000002000000),
  ClientObjectRef(a15b7accf9aefd74ffffffffffffffffffffffff0100000003000000),
  ClientObjectRef(a15b7accf9aefd74ffffffffffffffffffffffff0100000004000000)],
 [ClientObjectRef(eefa36c40aa64f51ffffffffffffffffffffffff0100000001000000),
  ClientObjectRef(eefa36c40aa64f51ffffffffffffffffffffffff0100000002000000),
  ClientObjectRef(eefa36c40aa64f51ffffffffffffffffffffffff0100000003000000),
  ClientObjectRef(eefa36c40aa64f51ffffffffffffffffffffffff0100000004000000)],
 [ClientObjectRef(39a9c16c1b7014b3ffffffffffffffffffffffff0100000001000000),
  ClientObjectRef(39a9c16c1b7014b3ffffffffffffffffffffffff0100000002000000),
  ClientObjectRef(39a9c16c1b7014b3ffffffffffffffffffffffff0100000003000000),
  ClientObjectRef(39a9c16c1b7014b3ffffffffffffffffffffffff0100000004000000)],
 [ClientObjectRef(12e820145b29561effffffffffffffffffffffff010000000100000

In [29]:
merged_partial_results

[ClientObjectRef(427377ff619e0452ffffffffffffffffffffffff0100000001000000),
 ClientObjectRef(36ce8c2ef257df6affffffffffffffffffffffff0100000001000000),
 ClientObjectRef(42f2c5171ec3e4c8ffffffffffffffffffffffff0100000001000000),
 ClientObjectRef(c04bfe8c99742b41ffffffffffffffffffffffff0100000001000000)]

In [30]:
'''
Collect and aggregate reduce tasks' output results at the client.
'''
# Merge the per-reducer dicts into a single word -> count table. Counts are summed,
# so a word that appears in more than one reducer's output is still totalled.
wc = dict()
for partial in ray.get(merged_partial_results):
    for word, count in partial.items():
        wc[word] = wc.get(word, 0) + count

# Highest count first. sorted() stays stable with reverse=True, so tied words keep
# their merge order.
wc_sorted = sorted(wc.items(), key=lambda pair: pair[1], reverse=True)

# Print the top 20 words (for debugging purpose)
for word, count in wc_sorted[:20]:
    print(f"{word}, {count}")

# Save wc_sorted to a file: one "word count" line per entry, most frequent first.
with open('mr_wc.output', 'w') as out_file:
    out_file.writelines(f"{word} {count}\n" for word, count in wc_sorted)

the, 527056
and, 343262
of, 300909
to, 204749
a, 157829
in, 141234
i, 109727
that, 108740
he, 82750
it, 81941
for, 75291
with, 73204
his, 69617
was, 66737
is, 63174
you, 62459
be, 56286
not, 54910
as, 53968
but, 46252


In [31]:
'''
Run the wc test script (it returns either a PASS or a FAIL).
'''
!bash ./test_mr.sh

--- wc test: PASS


In [None]:
# Disconnect the ray cluster by ray.shutdown()
# ray.shutdown()