# Ray Crash Course - Exercise Solutions

© 2019-2020, Anyscale. All Rights Reserved

![Anyscale Academy](../../images/AnyscaleAcademyLogo.png)

This notebook discusses solutions for the exercises in the _crash course_.

## 01 Ray Crash Course - Tasks - Exercise 1

As currently written, the memory footprint of `estimate_pi` scales linearly with `N`, because it allocates two NumPy arrays of size `N`. This limits the size of `N` we can evaluate (as I confirmed by locking up my laptop...). However, this isn't actually necessary. We could do the same calculation in "blocks, for example `m` blocks of size `N/m` and then combine the results. Furthermore, there's no dependencies between the calculations with those blocks, giving us further potential speed-up by parellelizing them with Ray.

Adapt `ray_estimate_pi` to use this technique. Pick some `N` value above which the calculation is done in blocks. Compare the performance of the old vs. new implementation. 

As you do this exercise, you might ponder the fact that we often averaged multiple trials for a given `N` and then ask yourself, what's the difference between averaging `10` trials for `N = 1000` vs. `1` trial for `N = 10000`, for example?

First, import things we need and redefine functions and data we need from the notebook:

In [None]:
import numpy as np
import sys, time, statistics, math
import ray
sys.path.append('..')
from pi_calc import str_large_n

In [None]:
trials = 5

In [None]:
ray.init(ignore_reinit_error=True)

In [None]:
print(f'Dashboard URL: http://{ray.get_webui_url()}')

Here's `estimate_pi` again, but now we'll also return the counts, for reasons we'll discuss shortly.

In [None]:
def estimate_pi(num_samples):
    xs = np.random.uniform(low=-1.0, high=1.0, size=num_samples)   # Generate num_samples random samples for the x coordinate.
    ys = np.random.uniform(low=-1.0, high=1.0, size=num_samples)   # Generate num_samples random samples for the y coordinate.
    xys = np.stack((xs, ys), axis=-1)                              # Like Python's "zip(a,b)"; creates np.array([(x1,y1), (x2,y2), ...]).
    inside = xs*xs + ys*ys <= 1.0                                  # Creates a predicate over all the array elements.
    xys_inside = xys[inside]                                       # Selects only those "zipped" array elements inside the circle.
    in_circle = xys_inside.shape[0]                                # Return the number of elements inside the circle.
    approx_pi = 4.0*in_circle/num_samples                          # The Pi estimate.
    return approx_pi, in_circle, num_samples

Here's the original `ray_estimate_pi`, but now it will also return the counts, not just $\pi$. 

In [None]:
@ray.remote
def ray_estimate_pi(num_samples):
    return estimate_pi(num_samples)

In [None]:
fmt = '{:10.5f} seconds: pi ~ {:7.6f}, stddev = {:5.4f}, error = {:5.4f}%'

Here's `ray_try_it`, but now we handle the additional returned values from `ray_estimate_pi`:

In [None]:
def ray_try_it(n, trials):
    print('trials = {:5d}, N = {:s}: '.format(trials, str_large_n(n, padding=15)), end='')   # str_large_n imported above.
    start = time.time()
    refs = [ray_estimate_pi.remote(n) for _ in range(trials)]
    pis_counts = ray.get(refs)
    pis = list(map(lambda t: t[0], pis_counts))
    approx_pi = statistics.mean(pis)
    stdev = 0.0 if trials == 1 else statistics.stdev(pis)
    duration = time.time() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))   # str_large_n imported above.
    return trials, n, duration, approx_pi, stdev, error

First, let's look at the "ponder" question at the end, just using the original implementation. We'll do a few runs of the following cell. Note that we're using large maximum `n` values here. If you are working on a slow machine or VM, consider deleting the last value `10000000` here and below:

In [None]:
for n in [1000, 10000, 100000, 1000000, 10000000]:
    ray_try_it(n, round(10000000/n))

In [None]:
for n in [1000, 10000, 100000, 1000000, 10000000]:
    ray_try_it(n, round(10000000/n))

In [None]:
for n in [1000, 10000, 100000, 1000000, 10000000]:
    ray_try_it(n, round(10000000/n))

The standard deviation is misleading now, because the number of trials change. The errors are roughly within an order of magnitude, due in part to expected statistical variation. Generally speaking, larger `N` and lower `trials` had lower errors. This may be due to the other big source of variation, the inevitable rounding error computing $\pi$ (`4 * inside_count/N`), one time per trial (`1` to `10,000` times). Experiments are supposed to eliminate as many extraneous variables as possible, so I would argue that sticking to one value for `trials` and varying `N` is more meaningful. In fact, in the implementation that follows, we'll eliminate the potential rounding error variation by keep track of the inside and total counts, then computing $\pi$ once at the end.

First, a function to return sample sizes for a given `N` and `m`.

In [None]:
def sample_sizes(N, m):
    ranges = [(m*i, m*(i+1)) for i in range(math.ceil(N/m))]
    if ranges[-1][1] > N:
        ranges[-1] = (ranges[-1][0], N)
    return list(map(lambda x: x[1]-x[0], ranges))

In [None]:
@ray.remote
def ray_estimate_pi_blocks(num_samples, m):
    """
    Perform the estimate in blocks up to ``m`` samples in size. A more user-friendly solution would embed logic to
    determine an reasonably good ``m`` value, but for our purposes, passing in ``m`` is more convenient.
    """
    sizes = sample_sizes(num_samples, m)
    refs = [ray_estimate_pi.remote(size) for size in sizes]
    values = ray.get(refs)  # Not using ray.wait() is okay; the tasks are all roughly the same size
    inside_count = 0
    total_count = 0
    for _, icount, tcount in values:    # Toss the pi value returned
        inside_count += icount
        total_count += tcount
    return 4.0*inside_count/total_count, inside_count, total_count

Let's try it:

In [None]:
for m in [10000, 100000, 1000000]:
    print(f'm = {m}:')
    for n in [1000, 10000, 100000, 1000000, 10000000, 100000000]:
        start = time.time()
        approx_pi, inside_count, total_count = ray.get(ray_estimate_pi_blocks.remote(n, 100000))
        duration = time.time() - start
        print(f'{n:15}: duration = {duration:6.5} seconds, pi = {approx_pi:6.5}, # inside/outside = {inside_count:12}/{total_count}')

Let's compare to the original implementation:

In [None]:
for n in [1000, 10000, 100000, 1000000, 10000000, 100000000]:
    start = time.time()
    approx_pi, inside_count, total_count = ray.get(ray_estimate_pi.remote(n))
    duration = time.time() - start
    print(f'{n:15}: duration = {duration:6.5} seconds, pi = {approx_pi:6.5}, # inside/outside = {inside_count:12}/{total_count}')

Note that for larger `N`, `ray_estimate_pi_blocks` time scale noticeably slower than the original implementation, e.g., for the highest `N`, `100,000,000`, the durations are approximately `1.2` seconds vs. `9.6` seconds.

## 01 Ray Crash Course - Tasks - Exercise 2

What `N` value is needed to get a reliable estimate to five decimal places, `3.1415` (for some definition of "reliable")? If you have a powerful machine or a cluster, you could try a higher accuracy. You'll need to use the solution to Exercise 1 or you can make a guess based on the results we've already seen in this notebook.

To use the solution from Exercise 1, we'll need a modified `ray_try_it` to add the `m` blocks parameter:

In [None]:
def ray_try_it_blocks(n, m, trials):
    print('trials = {:5d}, N = {:s}: '.format(trials, str_large_n(n, padding=15)), end='')   # str_large_n imported above.
    start = time.time()
    refs = [ray_estimate_pi_blocks.remote(n, m) for _ in range(trials)]
    pis_counts = ray.get(refs)
    pis = list(map(lambda t: t[0], pis_counts))
    approx_pi = statistics.mean(pis)
    stdev = 0.0 if trials == 1 else statistics.stdev(pis)
    duration = time.time() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))   # str_large_n imported above.
    return trials, n, duration, approx_pi, stdev, error

Let's compute the error we would have to achieve for this accuracy. 

In [None]:
target_error = 100*abs(3.1415 - np.pi)/np.pi
target_error

Okay, let's keep trying bigger `N` until we get to this number, but now we need to pick a definition of "reliable", because the results will depend on the number of `trials` we do. Also, some experiments will get "lucky" for relatively low `N` values.

> **WARNING:** This could take a while. You could choose a less accurate error goal if you have limited compute resources.

In [None]:
N = 100
error = 10.0
while error > target_error:
    N *= 10
    _, _, duration, approx_pi, _, error = ray_try_it_blocks(N, 1000000, trials)
    if N > 100000000:
        print("Stopping so we don't crash the machine...")
        break
print(f'{N} samples is sufficient to get the error below {target_error}%')

You should run the previous cell several times. Some runs might succeed with `N = 100,000`, while more often it will be above 1M or 10M.

## 01 Ray Crash Course - Tasks - Exercise 3

For small computation problems, Ray adds enough overhead that its benefits are outweighed. You can see from the performance graphs in the lesson that smaller `N` or smaller trial values will likely cause the performance curves to cross. Try small values of `N` and small trial numbers. When do the lines cross? Try timing individual runs for small `N` around the crossing point. What can you infer from this "tipping point" about appropriate sizing of tasks, at least for your test environment?

First, here is more code from the notebook. Here is `try_it`, modified to handle the extra return values from the modified `estimate_pi`:

In [None]:
def try_it(n, trials):
    print('trials = {:3d}, N = {:s}: '.format(trials, str_large_n(n, padding=12)), end='')   # str_large_n imported above.
    start = time.time()
    pis_counts = [estimate_pi(n) for _ in range(trials)]
    pis = list(map(lambda t: t[0], pis_counts))
    approx_pi = statistics.mean(pis)
    stdev = statistics.stdev(pis)
    duration = time.time() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))   # str_large_n imported above.
    return trials, n, duration, approx_pi, stdev, error

In [None]:
small_ns = [1, 10, 100, 1000, 10000, 100000]

In [None]:
data_ns = [try_it(n, trials) for n in small_ns]

In [None]:
ray_data_ns = [ray_try_it(n, trials) for n in small_ns]

In [None]:
np_data_ns         = np.array(data_ns)
np_ray_data_ns     = np.array(ray_data_ns)

In [None]:
from bokeh_util import two_lines_plot, means_stddevs_plot  # Some plotting utilities in `./bokeh_util.py`.
from bokeh.plotting import show, figure
from bokeh.layouts import gridplot

In [None]:
two_lines = two_lines_plot(
    "N vs. Execution Times (Smaller Is Better)", 'N', 'Time', 'No Ray', 'Ray', 
    np_data_ns[:,1], np_data_ns[:,2], np_ray_data_ns[:,1], np_ray_data_ns[:,2],
    x_axis_type='log', y_axis_type='log')
show(two_lines, plot_width=800, plot_height=400)

(If you can't see it, click [here](../../images/Pi-small-Ns-vs-times.png).)

Let's calculate the `N` where they cross:

In [None]:
for i in range(len(small_ns)):
    if data_ns[i] >= ray_data_ns[i]:
        print(f'Crossing point: N = {small_ns[i]}')

## 02 Ray Crash Course - Actors - Exercise 1

You are asked these questions about the `Counter` vs. `RayCounter` performance:

> Ignoring pause = 0, can you explain why the Ray times are almost, but slightly larger than the non-ray times consistently? Study the implementations for `ray_counter_trial` and `RayCounter`. What code is synchronous and blocking vs. concurrent? In fact, is there _any_ code that is actually concurrent when you have just one instance of `Counter` or `RayCounter`?

Here is `ray_counter_trial` again, with comments about concurrency vs. synchronous blocking calls:

In [None]:
def ray_counter_trial(count_to, num_counters = 1, pause = 0.01):
    print('ray:     count_to = {:5d}, num counters = {:4d}, pause = {:5.3f}: '.format(count_to, num_counters, pause), end='')
    start = time.time()
    final_count_futures = []
    # Actor instantiation blocks, but returns almost immediately. The actor creation overhead is low. It is a little bit larger
    # than normal class instantiation, but insignificant for overall performance.
    counters = [RayCounter.remote(pause) for _ in range(num_counters)]
    for i in range(num_counters):
        for n in range(count_to):
            counters[i].next.remote()                                # Nonblocking, so will be faster for long pause scenarios...
        final_count_futures.append(counters[i].get_count.remote())  
    ray.get(final_count_futures)                                     # but block until all invocations are finished!
    duration = time.time() - start
    print('time = {:9.5f} seconds'.format(duration))
    return count_to, num_counters, pause, duration

Both `next` methods in `Counter` and `RayCounter`, call `time.sleep(pause)` before completing, but for `RayCounter` it runs asynchronously, while it blocks for `Counter`. You do have to block to get the current count and if lots of async invocations of `next` are being processed, a call to `ray.get(actor.get_counter())` will block until all of them are finished.

Hence, the reason a single `RayCounter` instance never outperforms a `Counter` instance is because _all_ the code in `ray_counter_trial` becomes effectively _synchronous_ because of the single line `ray.get(final_count_futures)`. Since the Ray implementation has extra overhead for Ray, it will always take a little longer.

The real benefit is running many counters concurrently. `ray_counter_trial` does this seamlessly, while `counter_trial` remains fully synchronous.

At the end of the exercise is this statement and question:

> Once past zero pauses, the Ray overhead is constant. It doesn't grow with the pause time. Can you explain why it doesn't grow?

The Ray overhead doesn't change because the number of Ray-related invocations don't change as the pause time grows. We still use one counter instance and ten invocations of it. Hence the overhead is a constant, even though the method invocations will take longer to complete, depending on the `pause` value. 

# 03 Ray Crash Course - Why Ray?

There were no exercises for this lesson.

# 04 Ray Crash Course - Python Multiprocessing with Ray

There were no exercises for this lesson.

# 05 Ray Crash Course - Ray Parallel Iterators - Exercises 1-3

Here we combine the solutions for the first three exercises. This code is also available as a complete, standalone Ray program in [word-count-exercises.py](word-count-exercises.py).

In [None]:
import glob, gzip, re, sys, os
import numpy as np

class WordCount:
    "Wraps a dictionary of words and counts."
    def __init__(self):
        self.counts = {}

    def __call__(self, word, increment):
        count = increment
        if word in self.counts:
            count = self.counts[word]+increment
        self.counts[word] = count
        return (word, count)

    def sort_counts(self, descending=True):
        "Returns a generator of word-count pairs sorted by count."
        return (wc for wc in sorted(self.counts.items(), key = lambda wc: wc[1], reverse=descending))

def unzip(f):
    if f.endswith(".gz"):
        return gzip.open(f)
    else:
        return open(f, 'r')

# Exercise 3: Remove stop words. Edit this set to taste!
stop_words1 = {
    'that', 'the', 'this', 'an',
    'and', 'or', 'but', 'of'
}
## All the single digits and ASCII letters:
l=[str(i) for i in range(10)]
l.extend([chr(i) for i in range(ord('a'), ord('z')+1)])

stop_words = stop_words1.union(set(l))

def is_stop_word(word):
    """
    Treat all single-character words, blanks, and integers as stop words.
    (Try adding floating point numbers.)
    Otherwise, check for membership in a set of words.
    We use a set because it provides O(1) lookup!
    """
    w = word.strip()
    if len(w) <= 1 or w.isdigit():
        return True
    return w in stop_words

def count_words(file_globs, top_n = 100, batch_window = 1024):
    # The working directory of this application may be _different_
    # than the Ray cluster's working directory. (In a real cluster,
    # the files available will be different, too, but we'll ignore
    # the problem here.) So, we need to pass absolute paths or our
    # ray.util.iter.from_items won't find the files!
    globs = [g for f in file_globs for g in glob.glob(f)]
    file_list = list(map(lambda f: os.path.abspath(f), globs))

    print(f'Processing {len(file_list)} files: {file_list}')
    # Exercise 1: use combine instead of for_each(...).flatten(...).
    # We replace two occurrences:
    word_count = (
        ray.util.iter.from_items(file_list, num_shards=4)
           .combine(lambda f: unzip(f).readlines())
           # Exercise 2: convert to lower case!
           .combine(lambda line: re.split('\W+', line.lower())) # split into words.
           # Exercise 3: remove stop words.
           .filter(lambda word: not is_stop_word(word))
           .for_each(lambda word: (word, 1))
           .batch(batch_window)
    )
    # Combine the dictionaries of counts across shards with a sliding window
    # of "batch_window" lines.
    wordCount = WordCount()
    for shard_counts in word_count.gather_async():
        for word, count in shard_counts:
            wordCount(word, count)
    sorted_list_iterator = wordCount.sort_counts()
    return [sorted_list_iterator.__next__() for i in range(top_n)]

In [None]:
%time word_counts = count_words(['../*.ipynb'], top_n=100)  # The notebooks are now in the parent directory.

In [None]:
word_counts

# 05 Ray Crash Course - Ray Parallel Iterators - Exercise 4

Now let's run `count_words` on the `README.md` for the tutorial repo:

In [None]:
%time word_counts_readme = count_words(['../../README.md'], top_n=100)  # The README is two directories up!

In [None]:
word_counts_readme

Now which words are most prominent?

In [None]:
ray.shutdown()  # "Undo ray.init()". Terminate all the processes started in this notebook.