# Using Ray for Highly Parallelizable Tasks

While Ray can be used for very complex parallelization tasks,
often we just want to do something simple in parallel.
For example, we may have 100,000 time series to process with exactly the same algorithm,
and each one takes a minute of processing.

Running this on a single processor is clearly prohibitive: it would take about 70 days.
Even if we managed to use 8 processors on a single machine,
that would only bring it down to about 9 days. But if we can use 8 machines, each with 16 cores,
it can be done in about 13 hours.
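These estimates can be checked with a few lines of arithmetic. The sketch below assumes perfect scaling with no scheduling or data-transfer overhead, so real runs will be somewhat slower; the names `SERIES_COUNT`, `MINUTES_PER_SERIES`, and `wall_clock_minutes` are introduced here purely for illustration.

```python
# Back-of-the-envelope wall-clock estimates for the workload described above.
SERIES_COUNT = 100_000      # time series to process
MINUTES_PER_SERIES = 1      # processing time for each one


def wall_clock_minutes(total_cores):
    """Ideal run time, assuming perfect scaling and no overhead."""
    return SERIES_COUNT * MINUTES_PER_SERIES / total_cores


single_core_days = wall_clock_minutes(1) / (60 * 24)
eight_core_days = wall_clock_minutes(8) / (60 * 24)
cluster_hours = wall_clock_minutes(8 * 16) / 60   # 8 machines x 16 cores

print(f"1 core:    {single_core_days:.1f} days")
print(f"8 cores:   {eight_core_days:.1f} days")
print(f"128 cores: {cluster_hours:.1f} hours")
```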

How can we use Ray for this type of task?

We take the simple example of estimating pi by random sampling.
The algorithm is simple: generate random x and y between 0 and 1, and if ``x^2 + y^2 <= 1``,
the point lies inside the unit circle, so we count it as in. The fraction of points
counted as in turns out to approximate pi/4 (remembering your high school math).
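The pi/4 figure follows from comparing areas. Assuming x and y are drawn independently and uniformly from the unit square, the probability that a point lands inside the quarter disk of radius 1 is the ratio of the two areas:

```latex
P\left(x^2 + y^2 \le 1\right)
  = \frac{\text{area of the quarter disk}}{\text{area of the unit square}}
  = \frac{\pi \cdot 1^2 / 4}{1}
  = \frac{\pi}{4},
\qquad\text{so}\qquad
\pi \approx 4 \cdot \frac{\text{in\_count}}{\text{sample\_count}}.
```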

The following code (and this notebook) assumes you have already set up your Ray cluster and that you are running on the head node. For more details on how to set up a Ray cluster please see [Ray Clusters Getting Started](https://docs.ray.io/en/master/cluster/getting-started.html). 


In [2]:
import ray
import random
import time
import math
from fractions import Fraction

In [4]:
# Let's start Ray
ray.init(address='auto')


We use the ``@ray.remote`` decorator to create a Ray task.
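A task is like a function, except that the result is returned asynchronously:
calling it with ``.remote()`` immediately returns a future (an object reference) rather than the value.

A task also may not run on the local machine; it may run elsewhere in the cluster.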
This way you can run multiple tasks in parallel,
beyond the limit of the number of processors you can have in a single machine.

In [5]:
@ray.remote
def pi4_sample(sample_count):
    """Run sample_count random experiments and return the
    fraction of points that landed inside the circle.
    """
    in_count = 0
    for _ in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)


To get the result of a future, we use ``ray.get()``, which
blocks until the result is available.

In [12]:
SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

2024-10-20 18:49:02,717	INFO worker.py:1786 -- Started a local Ray instance.


Running 1000000 tests took 3.4979419708251953 seconds



Now let's see how good our approximation is.

In [13]:
pi = pi4 * 4

In [14]:
float(pi)

3.14148

In [15]:
abs(pi-math.pi)/pi

3.58600372413849e-05

Not bad at all: the relative error is about 0.004%, and the estimate matches pi to three decimal places.

Once we are done, let's disconnect from Ray! (``ray.shutdown()`` also stops the Ray processes if ``ray.init()`` started a local instance.)

In [16]:
ray.shutdown()