In [1]:
# SOURCE: https://github.com/anyscale/academy/blob/5046fdb4d94fcd7b9be6fa44e877283d401345c7/ray-crash-course/01-Ray-Tasks.ipynb


import numpy as np
import ray

In [2]:
# Attach to an already-running Ray cluster (address='auto') rather than starting a new local one.
# NOTE(review): ignore_reinit_error=True presumably lets this cell be re-run without ray.init()
# raising on the second call -- confirm against the Ray documentation.
ray.init(address='auto', ignore_reinit_error=True)
# Print a link to the Ray dashboard for this session.
print(f'Dashboard URL: http://{ray.get_dashboard_url()}')

2021-05-24 21:56:53,095	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.178.38:6379


Dashboard URL: http://127.0.0.1:8265


In [3]:
# Experiment-wide settings.
trials = 20       # Default number of repeated pi estimates per measurement.
num_workers = 8   # NOTE(review): not referenced by the cells visible here; kept for later use.

In [4]:
def estimate_pi(num_samples):
    """Estimate pi by Monte Carlo sampling.

    Draws ``num_samples`` points uniformly from the square [-1, 1) x [-1, 1)
    and counts how many fall inside the unit circle. The ratio of the circle's
    area to the square's area is pi/4, so pi ~ 4 * inside / total.

    Args:
        num_samples: Number of random points to draw (a positive integer).

    Returns:
        The pi estimate as a Python float.

    Raises:
        ZeroDivisionError: If ``num_samples`` is 0.
    """
    xs = np.random.uniform(low=-1.0, high=1.0, size=num_samples)   # Random x coordinates.
    ys = np.random.uniform(low=-1.0, high=1.0, size=num_samples)   # Random y coordinates.
    # Count the points inside the circle straight from the boolean predicate;
    # there is no need to materialise the (x, y) pairs or a filtered copy of them.
    in_circle = np.count_nonzero(xs*xs + ys*ys <= 1.0)
    return 4.0*in_circle/num_samples

In [5]:
# Sample sizes to sweep, in ascending order.
Ns = [
    20_000,
    100_000,
    200_000,
    1_000_000,
    2_000_000,
    # 5_000_000, 10_000_000,  # Larger values take a long time on small VMs and machines!
]
maxN = Ns[-1]   # Last (largest) entry of the sweep.
maxN

2000000

In [6]:
# Template for one result line; fields are (duration, pi estimate, stddev, percent error).
fmt = (
    '{:10.5f} seconds: '
    'pi ~ {:7.6f}, '
    'stddev = {:5.4f}, '
    'error = {:5.4f}%'
)

In [7]:
import  statistics,  time,  locale

def str_large_n(n, padding=None):
    """Format the integer ``n`` as a right-aligned, locale-aware string.

    Args:
        n: Integer to format.
        padding: Minimum field width. When ``None``, the width is the number
            of characters in ``str(n)``, i.e. no extra padding is added.

    Returns:
        The formatted string. Thousands separators are inserted only if the
        active locale defines a grouping (the default "C" locale does not).
    """
    if padding is None:   # Identity test: the idiomatic and safe way to compare against None.
        padding = len(str(n))
    return locale.format_string(f'%{padding}d', n, grouping=True)

def try_it(n, trials):
    """Run ``trials`` serial pi estimates of ``n`` samples each and report the result.

    Prints one summary line (trial count, N, elapsed time, mean estimate,
    standard deviation and percent error) and returns the same values.

    Args:
        n: Number of samples per estimate, passed to ``estimate_pi``.
        trials: Number of estimates to run; must be at least 1.

    Returns:
        Tuple ``(trials, n, duration, approx_pi, stdev, error)`` where
        ``duration`` is in seconds and ``error`` is a percentage of pi.
        ``stdev`` is NaN when ``trials`` is 1.

    Raises:
        statistics.StatisticsError: If ``trials`` is 0 (no data to average).
    """
    print('trials = {:3d}, N = {:s}: '.format(trials, str_large_n(n, padding=12)), end='')   # str_large_n defined above.
    # perf_counter is monotonic and high-resolution, so short runs are timed
    # reliably and system clock adjustments cannot skew the measurement.
    start = time.perf_counter()
    pis = [estimate_pi(n) for _ in range(trials)]
    approx_pi = statistics.mean(pis)
    # The sample standard deviation is undefined for a single data point.
    stdev = statistics.stdev(pis) if len(pis) > 1 else float('nan')
    duration = time.perf_counter() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))
    return trials, n, duration, approx_pi, stdev, error



# Serial baseline: first sweep the sample size at the default trial count,
# then sweep the trial count at the largest sample size.
data_ns = [try_it(num_samples, trials) for num_samples in Ns]
data_trials = [try_it(maxN, num_trials) for num_trials in range(5, 20, 2)]

trials =  20, N =        20000:    0.03111 seconds: pi ~ 3.139670, stddev = 0.0100, error = 0.0612%
trials =  20, N =       100000:    0.16692 seconds: pi ~ 3.142198, stddev = 0.0046, error = 0.0193%
trials =  20, N =       200000:    0.28914 seconds: pi ~ 3.141158, stddev = 0.0036, error = 0.0138%
trials =  20, N =      1000000:    1.34030 seconds: pi ~ 3.141432, stddev = 0.0016, error = 0.0051%
trials =  20, N =      2000000:    2.49356 seconds: pi ~ 3.141977, stddev = 0.0011, error = 0.0122%
trials =   5, N =      2000000:    0.74663 seconds: pi ~ 3.140746, stddev = 0.0014, error = 0.0270%
trials =   7, N =      2000000:    1.01378 seconds: pi ~ 3.141008, stddev = 0.0014, error = 0.0186%
trials =   9, N =      2000000:    1.17821 seconds: pi ~ 3.141744, stddev = 0.0010, error = 0.0048%
trials =  11, N =      2000000:    1.40128 seconds: pi ~ 3.141825, stddev = 0.0013, error = 0.0074%
trials =  13, N =      2000000:    1.62633 seconds: pi ~ 3.141858, stddev = 0.0012, error = 0.0085%


# From Python Functions to Ray Tasks

In [8]:
# Wrap the pi estimator with the @ray.remote decorator, which turns it into a
# Ray task that is scheduled on the Ray cluster.
@ray.remote
def ray_estimate_pi(num_samples):
    """Ray task wrapper: run estimate_pi(num_samples) and return its result."""
    approx_pi = estimate_pi(num_samples)
    return approx_pi

In [9]:
# Invoke the task with function.remote(args).
ray_estimate_pi.remote(100)

# The returned ObjectRef is a future: it is used to fetch the result with
# ray.get(ObjectRef) once the computation has finished.

ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000001000000)

In [10]:
# One task: submit it, then block on its ObjectRef to obtain the value.
ref = ray_estimate_pi.remote(100)
print(ray.get(ref))

# Several tasks: submit them all first, then fetch every result with a single ray.get().
refs = list(map(ray_estimate_pi.remote, [100, 1000, 10000]))
print(ray.get(refs))

3.28
[3.28, 3.152, 3.1676]


In [11]:
def ray_try_it(n, trials):
    """Run ``trials`` pi estimates of ``n`` samples each as Ray tasks and report the result.

    All tasks are submitted before any result is fetched. The measured
    duration covers task submission, execution and result retrieval.

    Args:
        n: Number of samples per estimate, passed to ``ray_estimate_pi``.
        trials: Number of tasks to launch; must be at least 1.

    Returns:
        Tuple ``(trials, n, duration, approx_pi, stdev, error)`` where
        ``duration`` is in seconds and ``error`` is a percentage of pi.
        ``stdev`` is NaN when ``trials`` is 1.

    Raises:
        statistics.StatisticsError: If ``trials`` is 0 (no data to average).
    """
    print('trials = {:3d}, N = {:s}: '.format(trials, str_large_n(n, padding=12)), end='')   # str_large_n defined above.
    # perf_counter is monotonic and high-resolution, so short runs are timed
    # reliably and system clock adjustments cannot skew the measurement.
    start = time.perf_counter()
    refs = [ray_estimate_pi.remote(n) for _ in range(trials)]   # Submit every task up front.
    pis = ray.get(refs)                                         # Block until all results are available.
    approx_pi = statistics.mean(pis)
    # The sample standard deviation is undefined for a single data point.
    stdev = statistics.stdev(pis) if len(pis) > 1 else float('nan')
    duration = time.perf_counter() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))
    return trials, n, duration, approx_pi, stdev, error

In [12]:
# Ray version of the two sweeps: sample size at the default trial count,
# then trial count at the largest sample size.
ray_data_ns = [ray_try_it(num_samples, trials) for num_samples in Ns]
ray_data_trials = [ray_try_it(maxN, num_trials) for num_trials in range(5, 20, 2)]



trials =  20, N =        20000:    0.02364 seconds: pi ~ 3.143530, stddev = 0.0120, error = 0.0617%
trials =  20, N =       100000:    0.06281 seconds: pi ~ 3.140752, stddev = 0.0054, error = 0.0268%
trials =  20, N =       200000:    0.10304 seconds: pi ~ 3.141559, stddev = 0.0035, error = 0.0011%
trials =  20, N =      1000000:    0.58882 seconds: pi ~ 3.141402, stddev = 0.0013, error = 0.0061%
trials =  20, N =      2000000:    1.99470 seconds: pi ~ 3.141320, stddev = 0.0012, error = 0.0087%
trials =   5, N =      2000000:    0.35098 seconds: pi ~ 3.142328, stddev = 0.0005, error = 0.0234%
trials =   7, N =      2000000:    0.46663 seconds: pi ~ 3.141238, stddev = 0.0013, error = 0.0113%
trials =   9, N =      2000000:    0.45909 seconds: pi ~ 3.141181, stddev = 0.0018, error = 0.0131%
trials =  11, N =      2000000:    0.53310 seconds: pi ~ 3.140941, stddev = 0.0007, error = 0.0208%
trials =  13, N =      2000000:    0.52276 seconds: pi ~ 3.141909, stddev = 0.0008, error = 0.0101%


In [14]:
# Convert each list of result tuples into a 2-D array (one row per run) so the
# plotting cells can slice out individual columns.
np_data_ns, np_data_trials, np_ray_data_ns, np_ray_data_trials = (
    np.array(rows)
    for rows in (data_ns, data_trials, ray_data_ns, ray_data_trials)
)

In [15]:
from bokeh_util import two_lines_plot, means_stddevs_plot  # Some plotting utilities in `./bokeh_util.py`.
from bokeh.plotting import show, figure
from bokeh.layouts import gridplot

In [16]:

# Serial vs. Ray execution time as a function of the sample size N, on linear axes.
# Column 1 of each result row is N and column 2 is the duration in seconds
# (see the tuples returned by try_it / ray_try_it).
two_lines = two_lines_plot(
    "N vs. Execution Times (Smaller Is Better)", 'N', 'Time', 'No Ray', 'Ray',
    np_data_ns[:,1], np_data_ns[:,2], np_ray_data_ns[:,1], np_ray_data_ns[:,2],
    x_axis_type='linear', y_axis_type='linear')
# NOTE(review): confirm that show() honours plot_width/plot_height in the installed bokeh version.
show(two_lines, plot_width=800, plot_height=400)

In [17]:
# Same N-vs-time data, but with no explicit axis types, so two_lines_plot's own
# defaults apply. NOTE(review): the defaults live in bokeh_util.py and are not visible
# here (presumably logarithmic) -- confirm, and consider reflecting the scale in the title.
two_lines = two_lines_plot(
    "N vs. Execution Times (Smaller Is Better)", 'N', 'Time', 'No Ray', 'Ray',
    np_data_ns[:,1], np_data_ns[:,2], np_ray_data_ns[:,1], np_ray_data_ns[:,2])
show(two_lines, plot_width=800, plot_height=400)

In [18]:
# Serial vs. Ray execution time as a function of the number of trials, at fixed N = maxN.
# Column 0 of each result row is the trial count and column 2 is the duration in seconds
# (see the tuples returned by try_it / ray_try_it).
# The title is derived from maxN so it always states the sample size that was actually
# used for these runs instead of a hard-coded value.
two_lines = two_lines_plot(
    f"Trials (N={maxN:,}) vs. Execution Times (Smaller Is Better)", 'Trials', 'Time', 'No Ray', 'Ray',
    np_data_trials[:,0], np_data_trials[:,2], np_ray_data_trials[:,0], np_ray_data_trials[:,2],
    x_axis_type='linear', y_axis_type='linear')
show(two_lines, plot_width=800, plot_height=400)
