## Getting Started with Ray Tasks
Let's go ahead and dive in!

At a high level, there are two kinds of primitives for writing programs: those that maintain state and those that do not. For instance, a simple function that we write and call has *no state*: its result depends only on its arguments.
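Here is a minimal plain-Python sketch of the distinction. The names `average` and `RunningAverage` are hypothetical and not part of Ray. The function's result depends only on its arguments. The class keeps values between calls, so the same call can return different results.

```python
def average(some_value, another_value):
    # Stateless: the result depends only on the arguments passed in.
    return (some_value + another_value) / 2


class RunningAverage:
    # Stateful: each call updates values stored on the instance.
    def __init__(self):
        self.total = 0.0
        self.count = 0

    def add(self, value):
        self.total += value
        self.count += 1
        return self.total / self.count


print(average(15, 25))  # 20.0, every time for these arguments

tracker = RunningAverage()
print(tracker.add(15))  # 15.0
print(tracker.add(25))  # 20.0, because the earlier call was remembered
```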

In [None]:
import time

In [2]:
def a_simple_function(some_value, another_value):
    print("running a simple function")
    time.sleep(2)
    return (some_value + another_value) / 2

a_simple_function(15, 25)

running a simple function


20.0

This function runs *immediately* and returns the exact result. It runs within the Python process on the local machine. In this case, we're running inside a Jupyter Notebook, in a Python process managed by Jupyter. It's a bit contrived, but we make this function take longer than it should with `time.sleep(2)` to simulate a long-running function.

As expected, this takes about 2 seconds to run.

In [3]:
%timeit a_simple_function(15, 25)

running a simple function
running a simple function
running a simple function
running a simple function
running a simple function
running a simple function
running a simple function
running a simple function
2 s ± 71 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
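`%timeit` is an IPython/Jupyter magic, so it isn't available in a plain Python script. Below is a rough standard-library equivalent for a single call, using `time.perf_counter`. The `delay` parameter is a hypothetical addition that shortens the sleep to 0.2 seconds so the example runs quickly. Unlike `%timeit`, this measures only one run.

```python
import time


def a_simple_function(some_value, another_value, delay=0.2):
    # Same logic as the notebook's function; `delay` is a hypothetical
    # parameter used here only to shorten the simulated work.
    time.sleep(delay)
    return (some_value + another_value) / 2


start = time.perf_counter()
result = a_simple_function(15, 25)
elapsed = time.perf_counter() - start

print(result)              # 20.0
print(f"{elapsed:.2f} s")  # at least the 0.2 s delay
```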


Now, what makes Ray so great is that it can take this simple function that we just wrote and do two things to it.

1. Have it run as a background process.
2. Have it run on a different machine.

This effectively turns any function you write into a scalable function service. In Ray terminology, this is a **Ray Task**. Ray takes care of distributing the function, communicating with it, and scaling it out. Let's see that now.

To convert a function into a **Ray Task**, all we need to add is the `@ray.remote` decorator.
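To build some intuition for what the decorator changes, here is a toy sketch built on the standard library's `concurrent.futures`. It is not how Ray is implemented. `toy_remote`, `ToyRemoteFunction`, and the thread pool are hypothetical stand-ins. The decorator replaces the function with an object whose `.remote()` method submits the call to a pool of background workers. That method immediately returns a handle to the eventual result, which mirrors how a Ray-decorated function is called.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical stand-in for Ray's pool of worker processes.
_executor = ThreadPoolExecutor(max_workers=4)


class ToyRemoteFunction:
    """Hypothetical object returned by the toy decorator."""

    def __init__(self, func):
        self._func = func

    def remote(self, *args, **kwargs) -> Future:
        # Submit the call to the background pool and return right away
        # with a handle to the result that will be produced later.
        return _executor.submit(self._func, *args, **kwargs)


def toy_remote(func):
    # Decorator: swap the plain function for an object exposing .remote().
    return ToyRemoteFunction(func)


@toy_remote
def a_simple_function(some_value, another_value):
    time.sleep(0.2)  # shortened stand-in for the notebook's 2-second sleep
    return (some_value + another_value) / 2


handle = a_simple_function.remote(15, 25)  # returns a Future immediately
print(handle.result())                     # 20.0
```

One consequence of this design is that the decorated name is no longer a plain function. It has to be called through `.remote()`.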

In [4]:
import ray

In [5]:
@ray.remote
def a_simple_function(some_value, another_value):
    print("running a simple function")
    time.sleep(2)
    return (some_value + another_value) / 2

Now that we've imported Ray and decorated the function, we can submit calls to it. Doing so requires two small modifications.

1. We need to initialize Ray with `ray.init()`. This starts (or connects to) the Ray runtime that manages fault tolerance and communication for this process.
2. We need to slightly change the way we call this function. Instead of calling it like a normal function, we call its *remote* version: `a_simple_function.remote(...)`.

In [6]:
# We're running on an existing cluster, so we connect to it with address='auto'.
# On a local machine, call ray.init() with no address to start Ray locally.
ray.init(address='auto')



{'node_ip_address': '172.31.25.223',
 'redis_address': '172.31.25.223:6379',
 'object_store_address': '/tmp/ray/session_2020-03-31_16-08-57_185823_3200/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-03-31_16-08-57_185823_3200/sockets/raylet',
 'webui_url': 'localhost:8265',
 'session_dir': '/tmp/ray/session_2020-03-31_16-08-57_185823_3200'}

Now that Ray is initialized, we can call the remote version of our function.

In [7]:
res1 = a_simple_function.remote(15, 25)
res1

ObjectID(c1ea1f3f56dd5e74ffffffff030000c801000000)

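The call returns immediately with an `ObjectID` rather than the result itself. The `ObjectID` is a reference to a result that Ray will produce in the background. When the function runs, its `print` output is forwarded to the notebook, prefixed with the ID of the worker process that ran it (for example, `(pid=3243)` in the next cell's output). You might want this for debugging.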

Behind the scenes, Ray runs this function on our behalf and stores the result. We fetch the result from Ray by passing the object reference to `ray.get`, which waits until the result is ready.

In [8]:
ray.get(res1)

(pid=3243) running a simple function


20.0
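The `ObjectID`/`ray.get` pattern above resembles futures in Python's standard library. The sketch below is an analogy only: it uses a local thread pool, not Ray. `submit` plays the role of `.remote()` and returns at once. `result()` plays the role of `ray.get` and blocks until the value is ready. The 0.5-second sleep is a shortened stand-in for the notebook's 2 seconds.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def a_simple_function(some_value, another_value):
    time.sleep(0.5)  # shortened stand-in for the 2-second sleep
    return (some_value + another_value) / 2


with ThreadPoolExecutor() as pool:
    start = time.perf_counter()
    # Like .remote(): hands the work off and returns a handle right away.
    future = pool.submit(a_simple_function, 15, 25)
    submit_time = time.perf_counter() - start

    # Like ray.get(): blocks until the result has been computed.
    value = future.result()
    total_time = time.perf_counter() - start

print(value)  # 20.0
print(f"submit took {submit_time:.3f} s, result ready after {total_time:.2f} s")
```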

Now let's time the remote call. Each call takes about 2 seconds, on par with the local version we timed before.

In [9]:
%timeit ray.get(a_simple_function.remote(15, 25))

(pid=3243) running a simple function
(pid=3243) running a simple function
(pid=2699, ip=172.31.21.160) running a simple function
(pid=3243) running a simple function
(pid=3243) running a simple function
(pid=3243) running a simple function
(pid=2699, ip=172.31.21.160) running a simple function
(pid=3243) running a simple function
2 s ± 716 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


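However, the power of Ray doesn't lie in running a function serially. It lies in how easily we can run many calls of a function in parallel. Ray also lets us declare how many resources each call needs. By default, every call of a Ray task reserves one CPU, and the `num_cpus` argument to `@ray.remote` changes that. For instance, let's define a version of this function with an explicit number of cores. Here we set `num_cpus=2`, so that each call of `a_simple_function2` reserves two CPU cores when Ray schedules it. This is a logical reservation used for scheduling; Ray does not pin the task to particular cores.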

In [15]:
@ray.remote(num_cpus=2)
def a_simple_function2(some_value, another_value):
    print("running a simple function")
    time.sleep(2)
    return (some_value + another_value) / 2

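If we ran the following two calls without Ray, we would expect them to take about 4 seconds (2 function calls × 2 seconds each). With Ray, however, the results come back in approximately 2 seconds. Both calls are submitted before we wait on either result, so Ray runs them at the same time in separate worker processes (note the different `pid` values in the output), each reserving one CPU. Note that this cell uses the original `a_simple_function`, with the default of one CPU per call, rather than `a_simple_function2`.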

In [16]:
%%timeit

res1 = a_simple_function.remote(15, 25)
res2 = a_simple_function.remote(25, 35)
ray.get([res1, res2])

(pid=3242) running a simple function
(pid=3243) running a simple function
(pid=3242) running a simple function
(pid=3243) running a simple function
(pid=3242) running a simple function
(pid=3243) running a simple function
(pid=3242) running a simple function
(pid=3243) running a simple function
(pid=3242) running a simple function
(pid=3243) running a simple function
(pid=2699, ip=172.31.21.160) running a simple function
(pid=3242) running a simple function
(pid=2699, ip=172.31.21.160) running a simple function
(pid=3242) running a simple function
(pid=3242) running a simple function
(pid=3243) running a simple function
2 s ± 489 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
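The same effect can be reproduced on one machine with a thread pool. This is a rough analogy (threads in one process, not Ray workers), with 0.5-second sleeps standing in for the 2-second ones. It compares calling the function twice in a row with submitting both calls before waiting on either result.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def a_simple_function(some_value, another_value):
    time.sleep(0.5)  # shortened stand-in for the 2-second sleep
    return (some_value + another_value) / 2


# Serial: the second call starts only after the first one finishes.
start = time.perf_counter()
serial = [a_simple_function(15, 25), a_simple_function(25, 35)]
serial_time = time.perf_counter() - start

# Parallel: both calls are submitted before we wait on either result.
with ThreadPoolExecutor(max_workers=2) as pool:
    start = time.perf_counter()
    futures = [pool.submit(a_simple_function, 15, 25),
               pool.submit(a_simple_function, 25, 35)]
    parallel = [f.result() for f in futures]
    parallel_time = time.perf_counter() - start

print(serial, parallel)  # [20.0, 30.0] [20.0, 30.0]
print(f"serial ~{serial_time:.1f} s, parallel ~{parallel_time:.1f} s")
```

The ordering matters in Ray too. Calling `ray.get` on each result right after submitting it would block before the next call is submitted, turning the parallel version back into a serial one.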


This gets at the power of Ray Tasks. We took a function that ran in a single process on a single machine and made it trivial to scale out across workers, and even across machines: the calls tagged `ip=172.31.21.160` in the output above ran on a second node. Submitting more calls at once would let us use more resources, if the cluster had them available. To see the available resources, run the following command.

In [13]:
ray.cluster_resources()

{'object_store_memory': 60.0,
 'memory': 193.0,
 'CPU': 4.0,
 'node:172.31.25.223': 1.0,
 'node:172.31.21.160': 1.0}
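The dictionary returned by `ray.cluster_resources()` is plain Python data, so it can be inspected directly. The sketch below copies the output shown above. It counts the `node:<ip>` entries, which here correspond to the cluster's two nodes, and reads the total CPU count. Treat the key format as specific to the Ray version used in this notebook. On a live cluster you would replace the literal with a call to `ray.cluster_resources()`.

```python
# Copied from the cluster_resources() output above.
resources = {'object_store_memory': 60.0,
             'memory': 193.0,
             'CPU': 4.0,
             'node:172.31.25.223': 1.0,
             'node:172.31.21.160': 1.0}

# Each node shows up as a "node:<ip>" entry, so collect those IPs.
nodes = [key.split(":", 1)[1] for key in resources if key.startswith("node:")]
total_cpus = int(resources.get("CPU", 0))

print(len(nodes), total_cpus)  # 2 4
```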


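We can see there are a total of 4 CPUs across the two nodes. Because `num_cpus` is reserved per call, it limits how many calls run at once. With the default of one CPU per call, up to four calls of `a_simple_function` can run concurrently, while `a_simple_function2` (`num_cpus=2`) leaves room for only two. Setting `num_cpus=4` would not make a single call faster. Each call would need one node with 4 free CPUs, so calls would run one at a time, or could not be scheduled at all if no single node has 4 CPUs.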
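Here is a simplified model of that arithmetic. It assumes that a task must fit entirely on one node and it ignores every resource except CPUs, so it does not describe Ray's actual scheduler. The function name `max_concurrent_tasks` and the even 2 + 2 split of the 4 CPUs are hypothetical, because the output above shows only the cluster-wide total.

```python
def max_concurrent_tasks(node_cpus, num_cpus_per_task):
    """Hypothetical helper: how many tasks can run at once if each task must
    fit on a single node and reserves num_cpus_per_task CPUs there."""
    return sum(cpus // num_cpus_per_task for cpus in node_cpus)


# Hypothetical split of the cluster's 4 CPUs across its two nodes.
cluster = [2, 2]

for num_cpus in (1, 2, 4):
    print(num_cpus, max_concurrent_tasks(cluster, num_cpus))
# 1 4
# 2 2
# 4 0   (no single node has 4 CPUs, so such a task could not be placed)
```

If the same 4 CPUs sat on a single node (`[4]`), the `num_cpus=4` case would allow one call at a time rather than none. Two such calls would then take about twice as long as one.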