Dask delayed:
* delayed can save memory because nothing actually runs until you ask for a result
* since delayed is lazy, nothing runs until you call `.compute()`
* computation happens just in time at `.compute()`, so the full runtime is still incurred at that point
* a delayed function can take other delayed objects as arguments; at runtime they appear as the true values
* `dask.delayed` can be applied to a function or an object. Use `dask.compute(delayed_objs)` or `delayed_obj.compute()` to get results
* when a distributed client is running, delayed tasks spread across multiple processes (even if you emulate the actor model) unless there is a reason to stay on one process, such as data locality between dependent calculations
* delayed has an optimization via the argument `pure=True`. It is not a persistent cache: identical calls get the same key, so at `.compute()` time the task runs once and its result is shared by the other delayed objects with that key
* the `nout` argument, given when decorating a function, specifies the length of the result so the delayed return value can be unpacked
* `print()` output is not visible because it is printed on the worker process, unless no distributed client is running
* creating delayed objects is slightly faster than creating future objects
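
The notes above on laziness, `pure=True`, and `nout` can be sketched as follows. This is a minimal illustration that assumes only `dask` is installed and that the default local scheduler is used; the function names `add` and `split_pair` are made up for the example.

```python
import dask


@dask.delayed(pure=True)
def add(a, b):
    return a + b


@dask.delayed(nout=2)
def split_pair(pair):
    return pair[0], pair[1]


# Nothing runs here: each call only builds a lazy Delayed object.
x = add(1, 2)
y = add(1, 2)

# With pure=True, identical calls get the same key, so the task runs once.
same_key = x.key == y.key

# nout=2 lets the delayed result be unpacked into two Delayed objects.
first, second = split_pair((3, 4))

# Delayed objects can be passed to other delayed functions.
total = add(x, first)

result = total.compute()           # the work happens only here
both = dask.compute(x, y, second)  # compute several objects in one call
print(same_key, result, both)
```

Without `pure=True`, each call to a delayed function gets its own random key, so `x` and `y` would be treated as two separate tasks.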

Dask Future (mostly equivalent to Ray remote function [AKA task]):
* starts computing immediately, so it uses RAM immediately
* the full runtime is still incurred, but multiple things can process in the background
* a future can take other futures as arguments; at runtime they will be the true values
* you have to create them with `client.submit()`/`client.map()`/`client.persist()`, and you retrieve results with `future.result()` or `client.gather(futures)`
* Dask futures spread across multiple processes (even if you emulate the actor model) unless there is a reason to stay on one process, such as data locality between dependent calculations
* futures also have an optimization via `pure=True`: if a future with the same key has already run, the new future reuses that result. If all futures with that key are deleted, the task has to actually run again. This contrasts with Ray remote, which does not have this optimization, so all tasks have to run.
* however, futures are not like Ray's actor model, so futures can finish very quickly because they are not running sequentially on an actor object
* futures do not have an `nout` argument
* future object creation is a little slower than Ray remote object_id creation
* `print()` output is not visible because it is printed on the worker process, unless no distributed client is running
* chained futures are not guaranteed to run if no reference to the future is kept. You can force one to run with `fire_and_forget()`
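
A small sketch of the futures workflow described above. It assumes `distributed` is installed and, to keep the demo light, uses an in-process cluster (`processes=False`) with the dashboard disabled; those settings and the helper names `inc` and `add` are choices made for this example, not part of the original experiments.

```python
from distributed import Client


def inc(x):
    return x + 1


def add(a, b):
    return a + b


# In-process workers keep the sketch lightweight (an assumption for this demo).
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)

a = client.submit(inc, 1)      # starts running right away
b = client.submit(inc, a)      # a future can be passed as an argument
c = client.submit(add, a, b)   # runs once both inputs are finished

values = client.gather([a, b, c])  # block and collect the results
last = c.result()                  # or wait on a single future

client.close()
print(values, last)
```

Keeping the references `a`, `b`, and `c` alive is what keeps the results on the cluster; once every reference to a key is dropped, the scheduler is free to forget it.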

Ray remote function (task):
* starts computing immediately, so it uses RAM immediately
* the full runtime is still incurred, but multiple things can process in the background
* a remote function can take other object_ids as arguments; at runtime they appear as the true values
* you have to use ray for everything: `ray.remote()`/`ray.get()`/`ray.wait()`/`ray.put()`. All function calls and actor instantiations must go through `.remote()`
* calls to ray functions (tasks) can run on any of the processes; however, method calls on actors stay on the same process
* ray does not seem to have a cache optimization, so every remote call has to run
* ray remote object_id creation is a little faster than creating Dask delayed objects and futures
* `print()` output does get captured and sent to the master process. However, exceptions are also printed to the master
* a remote function is guaranteed to run even if no reference to its object_id is kept

In [None]:
# Dask delayed (and futures and Ray remote) can be used to implement map-reduce. Delayed probably best mimics
#    Spark's laziness. Spark's .persist() is more like Dask futures and Ray remote.
# Don't mutate object state within a dask delayed as it causes weird problems: when multiple dask delayed
#   are being executed, you cannot determine the order in which they mutate the same object,
#   essentially replicating a multithreading race condition.
# What happens when you put dask.delayed on a property? It appears to break the setter even if you only decorate the getter.
#    Don't put dask.delayed on a property.
# Was able to emulate the actor model with Dask delayed/futures, which offers resiliency, but everything goes through the central scheduler.
#    Dask's official actor model does not offer resiliency but does not go through the central scheduler, so it is faster.
# TODO: explore Dask's actor model
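
As a rough sketch of two of the points above, the block below builds a tiny map-reduce with delayed and then threads state through return values instead of mutating a shared object. It assumes only `dask` and the default local scheduler; `square` and `append_item` are hypothetical helpers.

```python
import dask


@dask.delayed
def square(x):
    return x * x


@dask.delayed
def append_item(items, item):
    # Return a new list instead of mutating the input, so task order cannot matter.
    return items + [item]


# Map: one lazy task per input. Reduce: a lazy sum over the list of Delayed objects.
squares = [square(i) for i in range(5)]
total = dask.delayed(sum)(squares)

# State is carried from task to task through return values, not shared mutation.
history = []
for i in range(3):
    history = append_item(history, i)

total_value, history_value = dask.compute(total, history)
print(total_value, history_value)
```

Because each `append_item` task depends on the previous one, the scheduler has to run them in order, which avoids the race condition described in the notes.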

# Exercise 4 - Introducing Actors

**Goal:** The goal of this exercise is to show how to create an actor and how to call actor methods.

See the documentation on actors at http://ray.readthedocs.io/en/latest/actors.html.

Sometimes you need a "worker" process to have "state". For example, that state might be a neural network, a simulator environment, a counter, or something else entirely. However, remote functions are side-effect free. That is, they operate on inputs and produce outputs, but they don't change the state of the worker they execute on.

Actors are different. When we instantiate an actor, a brand new worker is created, and all methods that are called on that actor are executed on the newly created worker.

This means that with a single actor, no parallelism can be achieved because calls to the actor's methods will be executed one at a time. However, multiple actors can be created and methods can be executed on them in parallel.
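
The scheduling behaviour described here (calls on one actor run one at a time, while different actors run in parallel) can be imitated in plain Python. This is only an analogy built on the standard library, not how Ray implements actors; `SerialActor` is a hypothetical helper that gives each object its own single worker thread.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        time.sleep(0.1)
        self.value += 1
        return self.value


class SerialActor:
    """Hypothetical helper: runs every call for one object on a single thread."""

    def __init__(self, obj):
        self._obj = obj
        self._pool = ThreadPoolExecutor(max_workers=1)

    def call(self, method_name, *args):
        # Returns a future immediately; calls queue up and run one at a time.
        return self._pool.submit(getattr(self._obj, method_name), *args)

    def shutdown(self):
        self._pool.shutdown()


a1, a2 = SerialActor(Counter()), SerialActor(Counter())

start = time.perf_counter()
futures = []
for _ in range(3):
    futures.append(a1.call("increment"))
    futures.append(a2.call("increment"))
values = [f.result() for f in futures]
elapsed = time.perf_counter() - start

a1.shutdown()
a2.shutdown()
print(values, round(elapsed, 2))
```

Each actor needs three sequential 0.1 s calls, so the total time is about 0.3 s rather than 0.6 s: the two actors overlap with each other, but calls on the same actor do not.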

### Concepts for this Exercise - Actors

To create an actor, decorate a Python class with the `@ray.remote` decorator.

```python
@ray.remote
class Example(object):
    def __init__(self, x):
        self.x = x
    
    def set(self, x):
        self.x = x
    
    def get(self):
        return self.x
```

Like regular Python classes, **actors encapsulate state that is shared across actor method invocations**.

Actor classes differ from regular Python classes in the following ways.
1. **Instantiation:** A regular class would be instantiated via `e = Example(1)`. Actors are instantiated via
    ```python
    e = Example.remote(1)
    ```
    When an actor is instantiated, a **new worker process** is created by a local scheduler somewhere in the cluster.
2. **Method Invocation:** Methods of a regular class would be invoked via `e.set(2)` or `e.get()`. Actor methods are invoked differently.
    ```python
    >>> e.set.remote(2)
    ObjectID(d966aa9b6486331dc2257522734a69ff603e5a1c)
    
    >>> e.get.remote()
    ObjectID(7c432c085864ed4c7c18cf112377a608676afbc3)
    ```
3. **Return Values:** Actor methods are non-blocking. They immediately return an object ID and **they create a task which is scheduled on the actor worker**. The result can be retrieved with `ray.get`.
    ```python
    >>> ray.get(e.set.remote(2))
    None
    
    >>> ray.get(e.get.remote())
    2
    ```

In [1]:
import ray

@ray.remote
def increment(counter):
    return counter + 1

ray.init(num_cpus=4, ignore_reinit_error=True) #  include_webui=False,

2021-02-01 13:26:27,619 INFO services.py:1173 -- View the Ray dashboard at http://127.0.0.1:8265


{'node_ip_address': '10.0.0.58',
 'raylet_ip_address': '10.0.0.58',
 'redis_address': '10.0.0.58:6379',
 'object_store_address': 'tcp://127.0.0.1:62541',
 'raylet_socket_name': 'tcp://127.0.0.1:64667',
 'webui_url': '127.0.0.1:8265',
 'session_dir': 'C:\\Users\\Eugene\\AppData\\Local\\Temp\\ray\\session_2021-02-01_13-26-26_258785_14608',
 'metrics_export_port': 60470,
 'node_id': '7817f5474b3a3f9f242bd621b813cae029f7b56e'}

In [4]:
%%time
results = [0]
for _ in range(1000):
    results.append(increment.remote(results[-1]))  # creating Ray object id is slightly faster than creating Dask futures and delayed

Wall time: 193 ms


In [5]:
print(len(results))
%time ray.get(results[-1])  # tasks started at submission; this run waited about 1.6 s for the last of 1000

1001
Wall time: 1.56 s


1000

In [1]:
# use Dask futures
import dask
from distributed import Client, LocalCluster

def increment(counter):
    return counter + 1

client = Client(LocalCluster(n_workers=4, threads_per_worker=1, processes=True))
client

Client: Scheduler: tcp://127.0.0.1:51766, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 4, Memory: 19.28 GB


In [4]:
%%time
results = [0]
for _ in range(1000):
    results.append(client.submit(increment, results[-1]))  # creating Dask futures is slightly slower than creating Ray object ids

Wall time: 876 ms


In [5]:
print(len(results))
%time results[-1].result() # roughly 300 futures executed per second in this run

1001
Wall time: 2.96 s


1000

In [1]:
# use Dask delayed
import dask

@dask.delayed
def increment(counter):
    return counter + 1

In [2]:
%%time
results = [0]
for _ in range(1000):
    results.append(increment(results[-1]))  # creating Dask delayed is slightly slower than creating Ray object ids

Wall time: 571 ms


In [3]:
print(len(results))
%time results[-1].compute()  # about 150 ms for 1000 chained delayed calls in this run

1001
Wall time: 152 ms


1000

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

ray.init(num_cpus=4, ignore_reinit_error=True) #  include_webui=False,

2021-02-01 13:34:26,310 INFO services.py:1173 -- View the Ray dashboard at http://127.0.0.1:8265


{'node_ip_address': '10.0.0.58',
 'raylet_ip_address': '10.0.0.58',
 'redis_address': '10.0.0.58:6379',
 'object_store_address': 'tcp://127.0.0.1:57817',
 'raylet_socket_name': 'tcp://127.0.0.1:52816',
 'webui_url': '127.0.0.1:8265',
 'session_dir': 'C:\\Users\\Eugene\\AppData\\Local\\Temp\\ray\\session_2021-02-01_13-34-24_877035_10900',
 'metrics_export_port': 62754,
 'node_id': 'c3a5e1503a0c343387c4631df2a962941b1995fa'}

**EXERCISE:** Change the `Foo` class to be an actor class by using the `@ray.remote` decorator.

In [2]:
@ray.remote
class Foo(object):
    def __init__(self):
        self.counter = 0

    def reset(self):
        self.counter = 0

    def increment(self):
        time.sleep(0.5)
        self.counter += 1
        return self.counter

assert hasattr(Foo, 'remote'), 'You need to turn "Foo" into an actor with @ray.remote.'

**EXERCISE:** Change the instantiations below to create two actors by calling `Foo.remote()`.

In [3]:
# Create two Foo objects.
f1 = Foo.remote()
f2 = Foo.remote()

**EXERCISE:** Parallelize the code below. The two actors can execute methods in parallel (though each actor can only execute one method at a time).

In [4]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# Reset the actor state so that we can run this cell multiple times without
# changing the results.
f1.reset.remote()
f2.reset.remote()

# We want to parallelize this code. However, it is not straightforward to
# make "increment" a remote function, because state is shared (the value of
# "self.counter") between subsequent calls to "increment". In this case, it
# makes sense to use actors.
results = []
for _ in range(5):
    results.append(f1.increment.remote())
    results.append(f2.increment.remote())

results = ray.get(results)

end_time = time.time()
duration = end_time - start_time

assert not any([isinstance(result, ray.ObjectID) for result in results]), 'Looks like "results" is {}. You may have forgotten to call ray.get.'.format(results)

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [5]:
assert results == [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]

assert duration < 3, ('The experiments ran in {} seconds. This is too '
                      'slow.'.format(duration))
assert duration > 2.5, ('The experiments ran in {} seconds. This is too '
                        'fast.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 2.5111820697784424 seconds.


In [1]:
# use Dask futures
import dask
from distributed import Client, LocalCluster

client = Client(LocalCluster(n_workers=4, threads_per_worker=1, processes=True))
client

Client: Scheduler: tcp://127.0.0.1:59104, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 4, Memory: 19.28 GB


In [2]:
import time

def increment(counter):
    time.sleep(0.5)
    return counter + 1

future = 0
results = []
for _ in range(5):
    future = client.submit(increment, future)
    results.append(future)

In [3]:
%time client.gather(results)  # the futures started running at submit time, so the measured wall time can come in under 2.5 seconds

Wall time: 2.27 s


[1, 2, 3, 4, 5]

In [1]:
# use Dask Futures with OOP
import dask
from distributed import Client, LocalCluster
import time


class Foo:
    def __init__(self):
        self.counter = 0

    def reset(self):
        self.counter = 0

    @staticmethod
    def _increment(counter):
        time.sleep(0.5)
        return counter + 1

    def increment(self, client):
        self.counter = client.submit(self._increment, self.counter)
        return self.counter

client = Client(LocalCluster(n_workers=4, threads_per_worker=1))
client

Client: Scheduler: tcp://127.0.0.1:59610, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 4, Memory: 19.28 GB


In [2]:
%%time
f1 = Foo()

results = []
for _ in range(5):
    results.append(f1.increment(client))

results

Wall time: 8 ms


[<Future: pending, key: _increment-a2707d0c09230e915b175fce41013b8e>,
 <Future: pending, key: _increment-8ce2408294a6d6ea0efd31e4a6666e09>,
 <Future: pending, key: _increment-4b392f973b2ae0653d43f14ed0424215>,
 <Future: pending, key: _increment-b845f9c27c1b70c533f3a60961d9842b>,
 <Future: pending, key: _increment-6f0714f9828c5576e873a14f2064a0ef>]

In [3]:
%%time
client.gather(results)

Wall time: 2.53 s


[1, 2, 3, 4, 5]

In [4]:
f1.counter.result()

5
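
One caveat when extending this emulation to two objects, as in the Ray exercise: with the default `pure=True`, two `Foo` instances that start from the same counter value would submit identical calls and end up sharing keys, so they would not run as independent chains. The sketch below passes `pure=False` so each call gets its own key. It assumes `distributed` is installed and uses an in-process cluster with a shorter sleep to keep the demo quick; the class layout is a variation on the one above, not the original code.

```python
import time
from distributed import Client


def increment(counter):
    time.sleep(0.1)
    return counter + 1


class Foo:
    def __init__(self, client):
        self.client = client
        self.counter = 0

    def increment(self):
        # pure=False gives every submission a unique key, so two Foo objects
        # with the same counter value do not collapse into one shared task.
        self.counter = self.client.submit(increment, self.counter, pure=False)
        return self.counter


# In-process workers keep the sketch lightweight (an assumption for this demo).
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)

f1, f2 = Foo(client), Foo(client)
futures = []
for _ in range(3):
    futures.append(f1.increment())
    futures.append(f2.increment())

values = client.gather(futures)
distinct_keys = len({f.key for f in futures})

client.close()
print(values, distinct_keys)
```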

In [1]:
# use Dask delayed
import dask
import time

@dask.delayed
def increment(counter):
    time.sleep(0.5)
    return counter + 1

delayed = 0
results = []
for _ in range(5):
    delayed = increment(delayed)
    results.append(delayed)

In [2]:
%time dask.compute(*results)  # will always take at least 2.5 seconds

Wall time: 2.53 s


(1, 2, 3, 4, 5)

In [1]:
# use Dask delayed with OOP
import dask
import time

class Foo:
    def __init__(self):
        self.counter = dask.delayed(0)

    def reset(self):
        self.counter = dask.delayed(0)

    @dask.delayed
    def increment(self, counter):
        time.sleep(0.5)
        return counter + 1

In [2]:
%%time
f1 = Foo()
results = []
for _ in range(5):
    f1.counter = f1.increment(f1.counter)
    results.append(f1.counter)

Wall time: 0 ns


In [3]:
%time dask.compute(*results)

Wall time: 2.53 s


(1, 2, 3, 4, 5)

In [4]:
%time f1.counter.compute()

Wall time: 2.52 s


5
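
The timings above suggest that every `.compute()` call re-runs the whole delayed chain. A minimal sketch of that behaviour, and of `dask.persist` as one way to hold on to a computed result, is shown below. It assumes only `dask` with the default local scheduler; the `calls` list is a demo-only side effect used to count executions, not something to do in real delayed code.

```python
import dask

calls = []  # demo-only bookkeeping to count how often the function really runs


@dask.delayed
def increment(counter):
    calls.append(counter)
    return counter + 1


chain = 0
for _ in range(5):
    chain = increment(chain)

first = chain.compute()   # runs all five tasks
second = chain.compute()  # runs all five tasks again
calls_after_two_computes = len(calls)

# persist computes the chain once more and returns a Delayed that holds the result.
(kept,) = dask.persist(chain)
third = kept.compute()    # no further task executions
fourth = kept.compute()
calls_total = len(calls)

print(first, second, third, fourth, calls_after_two_computes, calls_total)
```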

In [5]:
%%time
f1.reset()
f1.counter.compute()

Wall time: 4 ms


0

In [6]:
import dask
import time

class Foo:
    def __init__(self):
        self.counter = dask.delayed(0)

    def reset(self):
        self.counter = dask.delayed(0)

    @staticmethod
    @dask.delayed
    def _increment(counter):
        time.sleep(0.5)
        return counter + 1

    def increment(self):
        self.counter = self._increment(self.counter)
        return self.counter

In [7]:
f1 = Foo()

results = []
for _ in range(5):
    results.append(f1.increment())

In [8]:
%time dask.compute(*results)

Wall time: 2.52 s


(1, 2, 3, 4, 5)

In [9]:
%time f1.counter.compute()

Wall time: 2.53 s


5

In [10]:
%%time
f1.reset()
f1.counter.compute()

Wall time: 0 ns


0