# Custom Algorithms

In [None]:
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1, memory_limit='128MB')

client

We'll look at some minimal Monte Carlo algorithms and implement them using three Dask patterns for custom parallel programming:
* Dask Delayed
* `concurrent.Future`
* Dask Actors (experimental)

### Algorithm: Approximate π via Random Sampling in the Unit Square

π is approximately 4*(p/n) where n is a number of random samples in the unit square, and p is the number of those lying within 1 unit (Euclidean distance) of the origin.

This function generates a sample and returns 1 if the point is within the unit circle, otherwise it returns 0.

In [None]:
import random

def sample():
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)
    d = x*x + y*y
    return 1 if d < 1 else 0

### Implementation with `delayed`

Using `delayed` we can create lazy versions of the sample and sum functions

In [None]:
from dask import delayed

lazy_sample = delayed(sample)

lazy_sum = delayed(sum)

n = 1000

In [None]:
lazy_sample()

In [None]:
lazy_sample().compute()

In [None]:
s = lazy_sum([lazy_sample(), lazy_sample(), lazy_sample()])
s

In [None]:
s.compute()

So the overall approximation for n samples is something like:

In [None]:
pi_hat = (4/n) * lazy_sum([lazy_sample() for s in range(n)])

pi_hat

In [None]:
pi_hat.compute()

Obviously, this is neither a great way to compute pi, nor even a great way to implement this estimation, since NumPy (or CuPy) will let us draw and add many more samples, faster and easier. This is just an example of how the pieces snap together to implement work for Dask's scheduler.

Note also that instead of wrapping our `sample` function via `delayed(sample)`, we could have marked it with the decorator `@dask.delayed`

### Implementation with `Future`

In [None]:
futures = []

for i in range(10):
    futures.append(client.submit(sample))

In [None]:
futures

In [None]:
futures = []

for i in range(1000):
    futures.append(client.submit(sample, pure=False))

In [None]:
results = client.gather(futures)
results[:10]

In [None]:
4*sum(results)/len(results)

#### Lab Activity

Implement this approximation iterating 1000 samples at a time and running until the previous and current values differ by less than 0.01

### Random Walks with Future: Stateless Compute, Stateful Client Code

We'll run a set of 2-D random walks, starting at the origin, with (x,y) step size drawn from Uniform[-1,1], and terminating when a walker is more than 2 units from the origin.

By using Dask's `as_completed` we can schedule new steps as necessary across all of the walkers.

In [None]:
def step(pos):
    id = pos[0]
    x = pos[1] + random.uniform(-1, 1)
    y = pos[2] + random.uniform(-1, 1)
    return (id, x,y)

In [None]:
t = step((0,0,0))
t

In [None]:
from dask.distributed import as_completed

num_walks = 3

max_distance_squared = 2*2

ac = as_completed([client.submit(step, (i,0,0)) for i in range(num_walks)])

In [None]:
for i in range(num_walks):
    print(ac.next().result())

In [None]:
ac = as_completed([client.submit(step, (i,0,0)) for i in range(num_walks)])

for future in ac:
    r = future.result()
    print(r)
    if (r[1]**2 + r[2]**2) < max_distance_squared:
        ac.add(client.submit(step, r))
    else:
        print(f"Walk {r[0]} is done")

### Random Walks with Stateful Actors (Experimental)

Dask's Actor support is a new feature and allows for large-scale collections of stateful actors (or agents), useful for techniques such as simulation. Actors operations do not incur scheduler overhead, so they may also be useful if a large number of small operations is needed and would exceed ~4000/sec, a current rough limit on the scheduler.

*Note: Dask Actors do not (yet) have fault-tolerance or load-balancing guarantees, nor robust diagnostics.* 

In [None]:
class Walker:
    distance_limit = max_distance_squared
    x = 0 #NOTE: these class variables will become instance property accessors on the Actor
    y = 0
    id = 0

    def __init__(self, id):
        self.x = 0
        self.y = 0
        self.id = id

    def walk(self):
        steps = 0
        while (self.x**2 + self.y**2) < Walker.distance_limit:
            self.x += random.uniform(-1,1)
            self.y += random.uniform(-1,1)
            steps += 1
        return steps

In [None]:
w = Walker(42)

In [None]:
w.walk()

In [None]:
wf = client.submit(Walker, 13, actor=True)

In [None]:
wf

In [None]:
w2 = wf.result()

In [None]:
w2

In [None]:
sf = w2.walk()

In [None]:
sf

`ActorFuture` is currently limited and does not support the full Future interface. Only `.result()` is supported.

In [None]:
sf.result()

In [None]:
w2.x

In [None]:
w2.id

In [None]:
num_walkers = 10

walker_futures = [client.submit(Walker, i, actor=True) for i in range(num_walkers)]

walkers = client.gather(walker_futures)

In [None]:
walkers

In [None]:
steps_futures = [w.walk() for w in walkers]
steps_futures

In [None]:
[sf.result() for sf in steps_futures]

### Distributed Peer-to-Peer with Actors

Multiple actors of multiple types can exist acros a cluster and call methods directly on one another. This allows for a many-many, distributed system, suitable for some specialized algorithms.

There are a couple of (current) limitations to watch out for, though, including:
* Actor method invocations are by default processed on a single thread per worker
* Actors may be assigned to the same worker

Together, those constraints mean that, if we want Actors to invoke methods on each other, we need to move them to the Worker's event loop, which we can do via `async / await` (and also make sure we're not doing any long/blocking work)

To make it a little easier to see some of these elements, we'll shutdown out cluster and spin up a new one:

In [None]:
client.close()

In [None]:
client = Client(n_workers=4, threads_per_worker=1, memory_limit='128MB')

client

In [None]:
import logging

class Borrower:
    debt = 0
    bank = None
    
    def __init__(self):
        self.bank = None
        self.debt = 0
        
    def assign_bank(self, bank):
        self.bank = bank
        return True
    
    async def borrow(self, amount):
        logger = logging.getLogger("distributed.worker")
        logger.info("Borrower executing borrow")
        loan = await self.bank.borrow(amount)
        self.debt += loan
        return loan
        
class Bank:
    funds = 0
    
    def __init__(self, funds):
        self.funds = funds

    async def borrow(self, amount):
        logger = logging.getLogger("distributed.worker")
        logger.info("Bank executing borrow")
        loan = min(amount, self.funds)
        self.funds -= loan
        return loan

In [None]:
borrower = client.submit(Borrower, actor=True).result()

In [None]:
borrower

In [None]:
borrower.debt

In [None]:
bank = client.submit(Bank, 1000, actor=True).result()
bank.funds

In [None]:
borrower.assign_bank(bank).result()

In [None]:
borrower.bank

In [None]:
borrower.borrow(600).result()

In [None]:
borrower.debt

In [None]:
bank.funds

In [None]:
borrower.borrow(500).result()

In [None]:
borrower.debt

In [None]:
bank.funds

In [None]:
bank2 = client.submit(Bank, 100, actor=True).result()
bank2.funds

In [None]:
borrower.assign_bank(bank2).result()

In [None]:
borrower.borrow(500).result()

In [None]:
borrower.debt

Great! But let's take a look at the worker detail page: in the dashboard, pick the info tab.

The __in memory__ column is showing the actors on each worker ... and they're all on the same one.

Also, check the worker logs: the actor logging messages are all on that same worker's log as well.

If you want, you can spin up another 100 or 1000 actors and they still end up on one worker. This may be remedied in the future, but for now we can balance or place our actors.

First, obtain the worker addresses.

In [None]:
workers = client.get_worker_logs(n=0).keys()
addresses = [w for w in workers]
addresses

Now, launch an actor and specify one or more allowed workers to host that actor

In [None]:
client.submit(Bank, 100, actor=True, workers=[addresses[3]]).result()

Check the dashboard again: you should see an actor on worker #3 now.

In [None]:
client.close()