# Scalable Data Processing Using Ray


## 1: Learning the Ray API

Let's start by digging into the Ray API and familiarizing ourselves with some of its functionality. Again, please refer to the [Ray documentation](https://ray.readthedocs.io/en/latest/) and the [Ray codebase](https://github.com/ray-project/ray) if you have any questions.

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import re
import time
import urllib.request

from utils import timeit

import ray
from colorama import Fore

import warnings
warnings.filterwarnings("ignore")


%matplotlib inline

To start Ray, we first import `ray` and then call `ray.init`. We will use 8 CPUs.



In [3]:
ray.init(
    num_cpus=8, # We will be using 8 workers
    include_webui=False,  
    plasma_directory='/tmp', # The object store is mounted to local file system
    ignore_reinit_error=True,
    object_store_memory=int(2*1e9), 
)



2019-05-06 23:00:34,621	INFO node.py:423 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-06_23-00-34_107/logs.
2019-05-06 23:00:34,741	INFO services.py:363 -- Waiting for redis server at 127.0.0.1:51074 to respond...
2019-05-06 23:00:34,886	INFO services.py:363 -- Waiting for redis server at 127.0.0.1:32007 to respond...
2019-05-06 23:00:34,894	INFO services.py:760 -- Starting Redis shard with 0.43 GB max memory.
2019-05-06 23:00:34,953	INFO services.py:1384 -- Starting the Plasma object store with 2.0 GB memory using /tmp.


{'node_ip_address': None,
 'redis_address': '10.244.6.85:51074',
 'object_store_address': '/tmp/ray/session_2019-05-06_23-00-34_107/sockets/plasma_store',
 'webui_url': None,
 'raylet_socket_name': '/tmp/ray/session_2019-05-06_23-00-34_107/sockets/raylet'}

In [4]:
# x is a python object within this process.
x = 42  
print(f"""
    x's type is {Fore.BLUE} {type(x)} {Fore.RESET}
    and its value is {Fore.BLUE} {x} {Fore.RESET}
""")


    x's type is <class 'int'>
    and its value is 42



Now we copy x to Ray's object store. This lets you share x among different Ray worker processes.

In [5]:
# Copy x to Ray's object store. 
# This lets you share x among different Ray worker processes.
x_id = ray.put(x)  
print(f"""
    x_id's type is {Fore.BLUE} {type(x_id)} {Fore.RESET}
    and its value is {Fore.BLUE} {x_id} {Fore.RESET}
""")


    x_id's type is <class 'ray._raylet.ObjectID'>
    and its value is ObjectID(ffffffffa878b161ad66d275054aef7fe08ba819)



We can retrieve the object from Ray's object store into the current process by passing `x_id` to `ray.get`.

In [6]:
x_copy = ray.get(x_id)  
print(f"""
    x_copy's type is {Fore.BLUE} {type(x_copy)} {Fore.RESET}
    and its value is {Fore.BLUE} {x_copy} {Fore.RESET}
""")


    x_copy's type is <class 'int'>
    and its value is 42



You can pass a list of `ObjectID`s to `ray.get` in order to retrieve a list of objects stored in Ray's object store:

In [7]:
# Store 49 in Ray's object store.
y_id = ray.put(49)  
# Retrieve a list of ObjectIDs from Ray's object store.
x_copy, y_copy = ray.get([x_id, y_id])  

print(f"""
    x_copy's type is {Fore.BLUE} {type(x_copy)} {Fore.RESET}
    and its value is {Fore.BLUE} {x_copy} {Fore.RESET}
    
    y_copy's type is {Fore.BLUE} {type(y_copy)} {Fore.RESET}
    and its value is {Fore.BLUE} {y_copy} {Fore.RESET}
""")


    x_copy's type is <class 'int'>
    and its value is 42

    y_copy's type is <class 'int'>
    and its value is 49
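To build some intuition for what `ray.put` and `ray.get` do, here is a toy model in plain Python. It is only a sketch: `ToyObjectStore` is a hypothetical class that lives inside one process, whereas Ray's real object store is shared between worker processes. The one idea it borrows is that `put` files a serialized value under an opaque ID and `get` hands back a fresh copy.

```python
import pickle
import uuid


class ToyObjectStore:
    """A toy, in-process stand-in for an object store (not Ray's implementation)."""

    def __init__(self):
        self._objects = {}

    def put(self, value):
        # Serialize the value and file it under a fresh, opaque ID.
        object_id = uuid.uuid4().hex
        self._objects[object_id] = pickle.dumps(value)
        return object_id

    def get(self, object_ids):
        # Accept a single ID or a list of IDs, mirroring the calls above.
        if isinstance(object_ids, list):
            return [self.get(object_id) for object_id in object_ids]
        return pickle.loads(self._objects[object_ids])


store = ToyObjectStore()
x_id = store.put(42)
y_id = store.put([1, 2, 3])

x_copy, y_copy = store.get([x_id, y_id])
y_copy.append(4)        # Mutating the retrieved copy...
print(store.get(y_id))  # ...leaves the stored object unchanged.
```

Because every `get` deserializes the stored bytes again, changes made to a retrieved copy never leak back into the store.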



### Fooling around 


In [8]:
a = "Go bears!"
b = [i for i in range(10)]
c = {"Berkeley": "#1", "Stanford": "#2"}

a_id = ray.put(a)
b_id = ray.put(b)
c_id = ray.put(c)

a_copy = ray.get(a_id)
b_copy = ray.get(b_id)
c_copy = ray.get(c_id)

In [10]:
def square(x):
    return x**2

@ray.remote
def square_ray(x):
    return x**2

We'll call functions that we wish to parallelize *remote functions* because we intend to run them remotely in a different process instead of in the current process (the remote process could be on the same machine or on a different machine). A [*process*](https://en.wikipedia.org/wiki/Process_(computing)) consists of program code and activity. Programs, including those written with Ray, may consist of multiple processes which allow them to execute code in parallel.

Remote functions are invoked with the `.remote()` method. Arguments passed to `.remote()` are forwarded to the function; the call immediately returns an `ObjectID` for the function's return value and launches a *task* that executes the function. In other words, Ray hands back an `ObjectID` representing the result before the function has finished executing, which is useful when you'd like a long-running function to execute in the background while you run other computations.

In [11]:
result_id = square_ray.remote(42)

result = ray.get(result_id)

print(f"""
>>> result_id = square_ray.remote(42)
        result_id's type is {Fore.BLUE} {type(result_id)} {Fore.RESET}
        and its value is {Fore.BLUE} {result_id} {Fore.RESET}
>>> result = ray.get(result_id)    
        result's type is {Fore.BLUE} {type(result)} {Fore.RESET}
        and its value is {Fore.BLUE} {result} {Fore.RESET}
>>> 42**2
       {Fore.BLUE} {square(42)}  {Fore.RESET}
""")


>>> result_id = square_ray.remote(42)
        result_id's type is <class 'ray._raylet.ObjectID'>
        and its value is ObjectID(0100000014308b5df42857f339241fe0b37b8fce)
>>> result = ray.get(result_id)    
        result's type is <class 'int'>
        and its value is 1764
>>> 42**2
       1764



### More fooling around


In [12]:
# This function is a proxy for a more interesting and computationally intensive function.
def slow_function():
    time.sleep(0.2)
    return 42

In [13]:
# Hint: use a decorator here
@ray.remote
def slow_function_ray():
    time.sleep(0.2)
    return 42

result = ray.get(slow_function_ray.remote())

### Verification

Now let's verify that remote functions are actually running in parallel. `slow_function` takes around 0.2 seconds to run. If we execute 2 slow functions sequentially, executing both should take $ 2 \times 0.2 = 0.4 $ seconds. However, if we run 2 slow functions in parallel, executing both should still take only 0.2 seconds.

Let's try it out! 

In [15]:
start_time = time.perf_counter()

# YOUR CODE HERE: 
# Make the following line execute in 0.2 seconds using Ray
# results = [slow_function() for _ in range(2)]

result_ids = [slow_function_ray.remote() for _ in range(2)]
results = ray.get(result_ids)

end_time = time.perf_counter()

print(f"Time to compute results: {Fore.BLUE} {end_time - start_time :.5f} {Fore.RESET} seconds")

Time to compute results: 0.20891 seconds
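The same experiment can be reproduced with the standard library alone, which may help separate the idea from Ray's API. This sketch swaps in `concurrent.futures.ThreadPoolExecutor` for Ray: `submit` plays roughly the role of `.remote()` and `Future.result()` the role of `ray.get`. It is an analogy, not a description of how Ray schedules tasks.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_function():
    time.sleep(0.2)
    return 42


# Serial: the two calls run one after the other.
start = time.perf_counter()
serial_results = [slow_function() for _ in range(2)]
serial_time = time.perf_counter() - start

# Parallel: both calls are submitted before either result is requested.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(slow_function) for _ in range(2)]  # returns immediately
    parallel_results = [f.result() for f in futures]          # blocks until done
parallel_time = time.perf_counter() - start

print(f"serial: {serial_time:.2f}s, parallel: {parallel_time:.2f}s")
```

Threads are enough here because `time.sleep` does not hold Python's global interpreter lock. CPU-bound functions generally need separate processes, which is what Ray's workers provide.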


Arguments to remote functions can either be regular Python objects or `ObjectID`s. Passing `ObjectID`s as arguments can be useful for calling remote functions on the results of other remote functions. This allows you to initiate tasks that depend on other tasks before any of the tasks complete.

Therefore, the following two calls are equivalent:

In [17]:
# Call square_ray on a python object
result1_id = square_ray.remote(42)

# Call square_ray on an ObjectID
arg_id = ray.put(42)
result2_id = square_ray.remote(arg_id)

result1, result2 = ray.get([result1_id, result2_id])

print(f"{Fore.BLUE} {result1} == {result2} {Fore.RESET}")

1764 == 1764


A [*task*](https://en.wikipedia.org/wiki/Task_%28computing%29) is a loosely-defined term referring to a unit of work. In the context of Ray, tasks correspond to code which Ray executes. Calling a remote function corresponds to creating a task.

In [18]:
result = 5
for _ in range(3):
    result = square(result)

print(f"5^8 = {Fore.BLUE} {result} {Fore.RESET}")

5^8 = 390625


In [19]:
x = 5
x_id = ray.put(x)
for _ in range(3):
    x_id = square_ray.remote(x_id)

result = ray.get(x_id)
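It is worth spelling out why three squarings give $5^8$: each squaring doubles the exponent, so the chain is $5 \to 5^2 \to 5^4 \to 5^8$. The Ray version chains three dependent tasks in the same way, so it should arrive at the same value. A quick plain-Python check of the arithmetic:

```python
def square(x):
    return x ** 2


result = 5
exponent = 1
for _ in range(3):
    result = square(result)
    exponent *= 2  # Each squaring doubles the exponent.

assert result == 5 ** exponent
print(f"5^{exponent} = {result}")
```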

## 2: Parallel Bootstrap


In [21]:
def simple_resample(n):
    """
    Args:
        n: an integer
        
    Returns:
        an array of length n of a random sample with replacement of
        the integers 0, 1, ..., n-1
    """
    return np.random.randint(low=0, high=n, size=n)

def bootstrap_serial(boot_pop, statistic, resample, replicates = 1000):
    """
    Args:
        boot_pop: an array of shape n x d.
        statistic: a function which takes boot_pop and returns a number.
        resample: a function which takes n and returns a random sample from the integers [0, n)
        replicates: the number of resamples
        
    Returns:
        an array of length replicates, each entry being the statistic computed on a bootstrap sample of the data.
    """
    n = len(boot_pop)
    resample_estimates = np.array([statistic(boot_pop[resample(n)]) for _ in range(replicates)])
    return resample_estimates


Use the `@ray.remote` decorator along with the body of the `bootstrap_serial` function defined above to write a `bootstrap_remote` function.


In [22]:
@ray.remote
def bootstrap_remote(boot_pop, statistic, resample, replicates = 1000):
    """Run bootstrap_serial remotely
    Args:
        boot_pop: an array of shape n x d.
        statistic: a function which takes boot_pop and returns a number.
        resample: a function which takes n and returns a random sample from the integers [0, n)
        replicates: the number of resamples
        
    Returns:
        an array of length replicates, each entry being the statistic computed on a bootstrap sample of the data.
    """
    n = len(boot_pop)
    resample_rep = np.array([statistic(boot_pop[resample(n)]) for _ in range(replicates)])
    return resample_rep

In [24]:
# Load data
data = pd.read_csv("grades_sample.csv")
boot_pop = np.array(data["Grade"])
num_bootstrap_resample = 500000

In [25]:
start_time = time.perf_counter()
boot_sample_means_serial = bootstrap_serial(boot_pop, np.mean, simple_resample, num_bootstrap_resample)
bootstrap_serial_time = time.perf_counter() - start_time
print(f"Bootstrap serial completed in {Fore.BLUE} {bootstrap_serial_time:.2f} seconds {Fore.RESET}")

Bootstrap serial completed in 27.55 seconds



That took a while to run! Now, let's implement bootstrap in parallel using your new `bootstrap_remote` function.

In [None]:
start_time = time.perf_counter()
NUM_TASKS = 10

# Store the data in the object store once so that every task can share it
boot_pop_id = ray.put(boot_pop)
replicates_per_task = num_bootstrap_resample // NUM_TASKS

results_id = [
    bootstrap_remote.remote(boot_pop_id, np.mean, simple_resample, replicates_per_task)
    for _ in range(NUM_TASKS)
]
boot_sample_means_parallel = ray.get(results_id)

# Reduce: flatten the per-task arrays into a single array of replicates
boot_sample_means_parallel = np.concatenate(boot_sample_means_parallel)


bootstrap_parallel_time = time.perf_counter() - start_time
print(f"Bootstrap parallel completed in {Fore.BLUE} {bootstrap_parallel_time:.2f} seconds {Fore.RESET}")
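The cell above gives every task the same number of replicates, which works out exactly because 500000 is a multiple of the number of tasks. When it is not, some tasks need to take one extra replicate. Here is a minimal sketch of that bookkeeping, assuming a hypothetical helper named `split_replicates` that is not part of the assignment:

```python
def split_replicates(total, num_tasks):
    """Return num_tasks chunk sizes that sum to total and differ by at most 1."""
    base, extra = divmod(total, num_tasks)
    # The first `extra` tasks each take one additional replicate.
    return [base + 1 if i < extra else base for i in range(num_tasks)]


print(split_replicates(500000, 10))
print(split_replicates(10, 3))
```

With uneven chunk sizes the per-task arrays no longer stack into a rectangle, so they would need to be joined with something like `np.concatenate` rather than reshaped.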

Now let's examine the resulting distributions. 

In [None]:
plt.figure(figsize=(8, 4))
plt.subplot(121)
plt.title("Serial Bootstrap Distribution")
plt.xlabel(r"$\bar{X}$")
plt.ylabel("Frequency");
sns.distplot(boot_sample_means_serial)

plt.subplot(122)
plt.title("Parallel Bootstrap Distribution")
plt.xlabel(r"$\bar{X}$")
plt.ylabel("Frequency");
sns.distplot(boot_sample_means_parallel);

At a high level, you parallelized a slow piece of code by splitting it into many tasks, and then *reduced* the results of the individual tasks to produce an output equivalent to that of the slow code.

In addition, you specifically parallelized the `for` loop in `bootstrap_serial` to speed up bootstrap. Parallelizing loops can often speed up code which allows data scientists to process data more quickly.

This idea of parallelizing certain code and reducing the results led to a more general programming model used to reason about and implement parallel programs called MapReduce. We'll introduce and implement MapReduce in Problem 3.

## 3: Implementing MapReduce

In [26]:
def map_serial(function, element_lst):
    """Apply a function to each element of a list
    Args:
        function: a function that takes in one argument as input and outputs a value
        element_lst: a list of elements that function will be applied to
    Returns:
        A list of all of the elements each transformed by the function
        (ie [function(elem_1), function(elem_2), ..., function(elem_n)])
    """
    return [function(elem) for elem in element_lst]

Here's an example of running `map_serial`:

In [27]:
map_serial(lambda x: x * x, [1, 2, 3, 4, 5])

[1, 4, 9, 16, 25]

Now, implement `map_parallel` in the cell below using the Ray API.

In [28]:
def map_parallel(function, arglist):
    """Apply a function to each element of a list in parallel.
    Args:
        function: a remote function that takes in one argument as input and outputs an ObjectID
        arglist: a list of arguments that the function will be applied to
    Returns:
        A list of ObjectIDs
    """
    if not isinstance(arglist, list):
        raise ValueError("The arglist argument must be a list.")

    if not hasattr(function, "remote"):
        raise ValueError("The function argument must be a remote function.")

    # Launch one task per argument; each call returns an ObjectID immediately.
    return [function.remote(arg) for arg in arglist]


# Please DO NOT EDIT the code below this comment
# The code below compares map_parallel and map_serial's performance
@ray.remote
def square_remote(x):
    return x * x

arglist = [1, 2, 3, 4, 5]

with timeit('parallel'):
    result_ids = map_parallel(square_remote, arglist)

assert isinstance(result_ids[0], ray.ObjectID), "map_parallel should return a list of ObjectIDs"

result_ray = ray.get(result_ids)

with timeit('serial'):
    result_serial = map_serial(lambda x: x * x, arglist)

parallel: 0.0008160080760717392 s elapsed
serial: 7.799826562404633e-06 s elapsed


You may notice that for a simple function like square operating on a small list of arguments, `map_parallel` is **slower** than `map_serial`. The difference in speed is due to _communication and data transfer_ overhead between the processes which Ray uses to run functions in parallel.
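One common way to soften this overhead is to hand each task a batch of arguments instead of a single one, so the fixed cost per task is paid less often. The sketch below shows only the batching logic in plain Python. `chunk` and `square_batch` are hypothetical helpers; with Ray, each batch would presumably be passed to one remote call.

```python
def chunk(items, chunk_size):
    """Split a list into consecutive pieces of at most chunk_size elements."""
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


def square_batch(batch):
    """Process a whole batch in one call."""
    return [x * x for x in batch]


batches = chunk(list(range(1, 11)), 4)
print(batches)

# Flatten the per-batch results back into one list, preserving order.
results = [y for batch in batches for y in square_batch(batch)]
print(results)
```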


In [30]:
def reduce_serial(function, items):
    """Apply a function repeatedly to pairs of items until only 1 remains
    
    Args:
        function: a function that takes 2 items as input and returns 1 new item.
        items: a list of items which are reduced to 1 output by repeatedly calling function.
    
    Example:
    ```
    >>> reduce_serial(lambda x, y: x + y, [1,2,3])
    6
    >>> reduce_serial(lambda x, y: x - y, [3,2,1])
    0
    ```
    
    Returns the resulting item.
    """
    if len(items) == 1:
        return items[0]
    
    result = items[0]
    for i in range(1, len(items)):
        result = function(result, items[i])

    return result
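`reduce_serial` is a left fold: it combines the running result with the next item, from left to right. That is the same contract as the standard library's `functools.reduce`, which the short check below confirms. The function body is restated in compact form so the snippet runs on its own.

```python
import operator
from functools import reduce


def reduce_serial(function, items):
    # Compact restatement of the cell above: fold the items from the left.
    result = items[0]
    for item in items[1:]:
        result = function(result, item)
    return result


# Subtraction is not associative, so it exposes the fold order: (3 - 2) - 1 == 0.
assert reduce_serial(operator.sub, [3, 2, 1]) == reduce(operator.sub, [3, 2, 1]) == 0
assert reduce_serial(operator.add, [1, 2, 3]) == reduce(operator.add, [1, 2, 3]) == 6
print("reduce_serial agrees with functools.reduce")
```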

Now implement `reduce_parallel` below.

In [32]:
def reduce_parallel(function, items):
    """Apply a function repeatedly to pairs of items until only 1 remains.
    
    Args:
        function: remote function that takes 2 items as input and returns 1 new item.
        items: a list of items which are reduced to 1 output by repeatedly calling function.
    
    Returns an ObjectID.
    
    Hint:
        1. Divide the list of items into pairs.
        2. Reduce each pair to generate a new list of items.
        3. If there was an unpaired item in (1), add it to the new list.
        The new list should be about 1/2 the size of the old list.
        4. If there is only 1 item in the new list, return that item. Otherwise, repeat steps 1-3.
        
        This algorithm is called a "tree-reduce", where the original items are the leaves
        and the final result is the root. The tree is balanced. Each non-leaf node has 2 child nodes.
        Each non-root node has 1 parent node.
    """
    if not isinstance(items, list):
        raise ValueError("The items argument must be a list.")

    if not hasattr(function, "remote"):
        raise ValueError("The function argument must be a remote function.")
    
    items_id = items.copy()  # Avoids mutating the items argument

    while len(items_id) > 1:
        new_items_id = []
        for i in range(0, len(items_id), 2):
            pair = items_id[i:i+2]
            if len(pair) == 2:
                # Reduce the pair remotely; this returns an ObjectID immediately
                new_items_id.append(function.remote(pair[0], pair[1]))
            else:
                # Carry the unpaired item over to the next round
                new_items_id.append(pair[0])
        items_id = new_items_id

    return items_id[0]


# Please DO NOT EDIT the code below this comment
# This code should help verify that reduce_parallel is working correctly
def add_normal(a, b):
    # Simulate a longer running function
    # Necessary to check the correct implementation for reduce_parallel
    time.sleep(0.3)
    return a + b

@ray.remote
def add_remote(a, b):
    # Simulate a longer running function
    # Necessary to check the correct implementation for reduce_parallel
    time.sleep(0.3)
    return a + b

items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

start_time = time.time()
result_serial = reduce_serial(add_normal, items)
time_serial = time.time() - start_time

start_time = time.time()
result_id = reduce_parallel(add_remote, items)
result_parallel = ray.get(result_id)
time_parallel = time.time() - start_time

print(f"Time serial: {time_serial}")
print(f"Time parallel: {time_parallel}")

Time serial: 2.703690767288208
Time parallel: 1.253077745437622
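These timings line up with a back-of-the-envelope count. The serial reduce makes $10 - 1 = 9$ calls of 0.3 seconds each, one after another, for about 2.7 seconds. A tree-reduce over 10 items shrinks the list as $10 \to 5 \to 3 \to 2 \to 1$, which is 4 rounds; if the calls within a round run in parallel, that is about $4 \times 0.3 = 1.2$ seconds. The sketch below counts the rounds with a serial tree-reduce written purely for illustration. It assumes a non-empty list.

```python
def tree_reduce(function, items):
    """Serial tree-reduce that also reports how many rounds were needed."""
    level = list(items)
    rounds = 0
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level), 2):
            pair = level[i:i + 2]
            # Combine full pairs; carry an unpaired trailing item forward unchanged.
            next_level.append(function(*pair) if len(pair) == 2 else pair[0])
        level = next_level
        rounds += 1
    return level[0], rounds


total, rounds = tree_reduce(lambda a, b: a + b, list(range(1, 11)))
serial_calls = 10 - 1
print(f"sum = {total}, rounds = {rounds}")
print(f"serial ~ {serial_calls * 0.3:.1f}s, tree ~ {rounds * 0.3:.1f}s")
```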


## 4: Analyzing Word Frequency using MapReduce
Let's analyze the frequency of words used in Shakespeare. For this question, we'll need to do the following:
1. Download the text of a few of Shakespeare's plays.
2. For each play, count the number of times each word occurs.
3. Merge the word frequencies across plays.


In [34]:
def download_text(url):
    """Returns the string of text on some webpage"""
    request = urllib.request.urlopen(url)
    return request.read().decode("utf-8")


@ray.remote
def download_text_remote(url):
    return download_text(url)

In [35]:
def count_words(text):
    """Finds the frequency of each word in a string"""
    assert isinstance(text, str), "text should be a string"
    
    frequency = dict()
    
    text = text.lower()
    matches = re.findall(r"\b\w+\b", text)
    
    for word in matches:
        count = frequency.get(word, 0)
        frequency[word] = count + 1
        
    return frequency


@ray.remote
def count_words_remote(text):
    return count_words(text)
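To see what the tokenizer does on a concrete input, here is a small example. It restates `count_words` in a compact form using `collections.Counter`, which should behave the same way for string inputs. The sample sentences are only illustrations.

```python
import re
from collections import Counter


def count_words(text):
    # Lowercase, split on word boundaries, and tally each word.
    return dict(Counter(re.findall(r"\b\w+\b", text.lower())))


sample = "To be, or not to be: that is the question."
print(count_words(sample))

# \w does not match apostrophes, so contractions are split in two.
print(count_words("O'er the land"))
```

The second call shows a quirk of the regular expression worth keeping in mind when reading the final counts: `o'er` is counted as the two words `o` and `er`.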

In [36]:
def merge_dicts(a, b):
    """Merges 2 dictionaries such that the result contains keys of both a and b.
    
    If a key k is in a and in b, result[k] = a[k] + b[k].
    """
    result = a.copy()  # Don't mutate the input dictionaries
    for key, value in b.items():
        result[key] = result.get(key, 0) + value
        
    return result


@ray.remote
def merge_dicts_remote(a, b):
    return merge_dicts(a, b)
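A small worked example of `merge_dicts` follows, using made-up counts. For positive counts the result matches adding two `collections.Counter` objects. Because the merge is just per-key addition, the order in which dictionaries are merged does not change the result, which is what makes it safe to use in a tree-reduce.

```python
from collections import Counter


def merge_dicts(a, b):
    result = a.copy()  # Don't mutate the input dictionaries
    for key, value in b.items():
        result[key] = result.get(key, 0) + value
    return result


# Made-up counts, for illustration only.
hamlet = {"the": 3, "king": 1}
macbeth = {"the": 2, "dagger": 1}

merged = merge_dicts(hamlet, macbeth)
print(merged)

assert merged == dict(Counter(hamlet) + Counter(macbeth))
assert merge_dicts(hamlet, macbeth) == merge_dicts(macbeth, hamlet)
assert hamlet == {"the": 3, "king": 1}  # The inputs are left untouched.
```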

In [37]:
urls = [
    "https://www.gutenberg.org/files/1524/1524-0.txt",       # Hamlet
    "https://www.gutenberg.org/cache/epub/2264/pg2264.txt",  # Macbeth
    "https://www.gutenberg.org/cache/epub/2267/pg2267.txt",  # Othello
    "https://www.gutenberg.org/cache/epub/1777/pg1777.txt",  # Romeo and Juliet
]

Let's now use the MapReduce API to speed up the code in the cell below. This code computes word frequencies across several of Shakespeare's works.

In [38]:
# Parallelize the following code using the MapReduce API
start_time = time.time()

total_frequencies_serial = {}
for url in urls:
    # Download the text of the play
    text = download_text(url)
    # Count the frequency of each word in the play
    frequencies = count_words(text)
    # Add the play's word frequencies to the global word frequencies
    total_frequencies_serial = merge_dicts(total_frequencies_serial, frequencies)
    
word_freq_time_serial = time.time() - start_time
print("Word frequency count serial completed in {} seconds".format(word_freq_time_serial))

Word frequency count serial completed in 3.5341620445251465 seconds


In [39]:
start_time = time.time()

# Try to do each of these in one line!
text_ids = map_parallel(download_text_remote, urls)
frequency_ids = [count_words_remote.remote(txt) for txt in text_ids]
total_frequency_id = reduce_parallel(merge_dicts_remote, frequency_ids)
total_frequencies_ray = ray.get(total_frequency_id)

word_freq_time_parallel = time.time() - start_time
print("Word frequency count parallel completed in {} seconds".format(word_freq_time_parallel))

Word frequency count parallel completed in 1.5353896617889404 seconds


Let's examine some of Shakespeare's most used words. 

In [41]:
def most_used_words(freq_dict, num_words=10):
    ordered_keys = sorted(freq_dict, key=freq_dict.get, reverse=True)
    return ordered_keys[:num_words]

most_used_words(total_frequencies_ray, 10)

['the', 'and', 'to', 'i', 'of', 'a', 'you', 'my', 'that', 'in']

## 5: Ray Actors
So far, we have explored Ray's *remote functions*. The tasks generated by invoking remote functions are stateless in the sense that they are intended to map inputs to outputs without side effects. But suppose we want state to be shared and mutated by multiple tasks. In this case, we can use [Ray's *actors*](https://ray.readthedocs.io/en/latest/actors.html) to encapsulate mutable state.


In [42]:
@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0
    
    def increment(self):
        self.value += 1
    
    def get_value(self):
        return self.value

We can create an actor instance by invoking `.remote()` on the actor class. This starts a new actor process, which holds a copy of the `Counter` object.

In [43]:
c = Counter.remote()

We can run tasks on the actor process by invoking the actor's methods. These methods can mutate the actor's internal state (in this case, the field `self.value`). The actor executes tasks serially.

In [44]:
x1_id = c.increment.remote()
print("The actor's value is {}.".format(ray.get(c.get_value.remote())))

x2_id = c.increment.remote()
print("The actor's value is {}.".format(ray.get(c.get_value.remote())))

The actor's value is 1.
The actor's value is 2.


Suppose we want multiple tasks, actors, or processes to invoke methods on a single actor. In this case, we can pass *actor handles* around between tasks. In the example below, we pass a handle to the counter actor to a handful of tasks executing in parallel.

In [45]:
@ray.remote
def increment_counter(c):
    for _ in range(10):
        x_id = c.increment.remote()

    # Wait for the last increment call to complete before returning.
    ray.get(x_id)


initial_value = ray.get(c.get_value.remote())

# Start 4 tasks that run in parallel and all increment the counter.
increment_results = [increment_counter.remote(c) for _ in range(4)]

# Wait for all tasks to finish
ray.get(increment_results)

new_value = ray.get(c.get_value.remote())

print("The actor's value was {} and now it is {}.".format(initial_value, new_value))
assert new_value - initial_value == 4 * 10

The actor's value was 2 and now it is 42.
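The reason all 40 increments survive, even though four tasks issue them concurrently, is that the actor processes its method calls one at a time. The sketch below imitates that behavior with the standard library: a hypothetical `ToyActor` wraps an object and funnels every method call through a single worker thread. This is an analogy for the serial execution described above, not Ray's implementation.

```python
from concurrent.futures import ThreadPoolExecutor


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

    def get_value(self):
        return self.value


class ToyActor:
    """Runs every method call on one dedicated worker thread, in arrival order."""

    def __init__(self, obj):
        self._obj = obj
        self._mailbox = ThreadPoolExecutor(max_workers=1)

    def call(self, method_name, *args):
        # Returns a Future immediately, similar in spirit to `.remote()`.
        return self._mailbox.submit(getattr(self._obj, method_name), *args)

    def stop(self):
        self._mailbox.shutdown()


def increment_counter(actor):
    futures = [actor.call("increment") for _ in range(10)]
    futures[-1].result()  # Wait for the last increment call to complete.


actor = ToyActor(Counter())

# Four callers submit increments concurrently; the single worker serializes them.
with ThreadPoolExecutor(max_workers=4) as callers:
    list(callers.map(increment_counter, [actor] * 4))

final_value = actor.call("get_value").result()
actor.stop()
print(final_value)
```

Since only the mailbox thread ever touches `self.value`, no lock is needed inside `Counter`.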



Instead of launching four tasks that compute word counts and then aggregating the results on the "driver" process that issued the tasks, we are going to create a separate `ResultAggregator` actor for doing the aggregation. We will start four tasks that each compute frequencies for a given URL and then push those frequencies to the aggregator. The main "driver" process will then fetch the aggregated results from the aggregator.

The code below implements a serial version of this.

In [46]:
class ResultAggregator:
    """Aggregates word frequencies"""
    def __init__(self):
        self.total_frequencies = {}
    
    def add_frequencies(self, frequencies):
        """Adds a new dictionary mapping words to word frequencies to the overall word frequencies"""
        self.total_frequencies = merge_dicts(self.total_frequencies, frequencies)
    
    def get_frequencies(self):
        """Returns a dictionary mapping each word to its frequency"""
        return self.total_frequencies


def add_results(url, result_aggregator):
    """Downloads text from the url, counts the word frequencies, and adds the result to result_aggregator"""
    # Download the text of the play
    text = download_text(url)
    # Count the frequency of each word in the play
    frequencies = count_words(text)
    # Add the results to the aggregator
    done = result_aggregator.add_frequencies(frequencies)

In [47]:
result_aggregator = ResultAggregator()

start_time = time.time()

get_and_add_frequencies = [add_results(url, result_aggregator) for url in urls]

# Get the results
total_frequencies = result_aggregator.get_frequencies()

end_time = time.time()
print("Counting the words took {} seconds.".format(end_time - start_time))

serial_words = most_used_words(total_frequencies, 10)

Counting the words took 3.416891574859619 seconds.



Now implement a parallel version in which `ResultAggregator` is an actor, and `add_results` is a remote function.


In [48]:
@ray.remote
class ResultAggregator(object):
    """Aggregates word frequencies"""
    def __init__(self):
        self.total_frequencies = {}
    
    def add_frequencies(self, frequencies):
        """Adds a new dictionary mapping words to word frequencies to the overall word frequencies"""
        self.total_frequencies = merge_dicts(self.total_frequencies, frequencies)
    
    def get_frequencies(self):
        """Returns a dictionary mapping each word to its frequency"""
        return self.total_frequencies

@ray.remote
def add_results(url, result_aggregator):
    """Downloads text from the url, counts the word frequencies, and adds the result to result_aggregator"""
    # Download the text of the play (this function already runs as a remote task)
    text = download_text(url)
    # Count the frequency of each word in the play
    frequencies = count_words(text)
    # Add the results to the aggregator actor and wait until they have been merged
    ray.get(result_aggregator.add_frequencies.remote(frequencies))


Now use the modified `ResultAggregator` and `add_results` created above to count word frequencies.

In [50]:
start_time = time.time()

# Create an instance of a ResultAggregator actor
result_aggregator = ResultAggregator.remote()

# Call the add_results remote function on each URL
get_and_add_frequencies = [add_results.remote(url, result_aggregator) for url in urls]

# Wait for all add_results remote functions to complete
ray.get(get_and_add_frequencies)

# Get the results from the actor
total_frequencies = ray.get(result_aggregator.get_frequencies.remote())

end_time = time.time()
print("Counting the words took {} seconds.".format(end_time - start_time))

parallel_words = most_used_words(total_frequencies, 10)