<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

# Futures - non-blocking distributed calculations

Submit arbitrary functions for computation in a parallelized, eager, and non-blocking way. 

The `futures` interface (derived from the built-in `concurrent.futures`) provides fine-grained, real-time execution for custom situations. We can submit a single function call with one set of inputs using `submit()`, or apply a function over a sequence of inputs using `map()`. Either call returns immediately, giving one or more *futures* whose status begins as "pending" and later becomes "finished". The local Python session is never blocked.
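
Because the interface is modelled on `concurrent.futures`, the submit/map pattern can be tried with the standard library alone. The sketch below is only an analogy that uses `ThreadPoolExecutor` rather than Dask, and `square` is a made-up function for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def square(x):
    """Hypothetical stand-in for real work."""
    time.sleep(0.2)
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns at once; the call itself runs in the background.
    future = pool.submit(square, 3)
    done_right_away = future.done()  # usually still False here, the task is sleeping

    # map() applies the function to each input in turn.
    squares = list(pool.map(square, [1, 2, 3]))

    # result() blocks until the value is available.
    single = future.result()
    done_after_result = future.done()
```

One difference to keep in mind: the standard library's `map()` yields results directly, whereas Dask's `client.map()` returns a list of futures.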

This eagerness is the important difference between futures and delayed. Both can be used to support arbitrary task scheduling, but delayed is lazy (it just constructs a graph) whereas futures are eager: as soon as the inputs are available and compute resources are free, the computation starts.
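
The contrast can be sketched side by side. This is a standalone illustration, assuming a small in-process cluster with the dashboard disabled and a version of `inc` without the sleep; inside this notebook you would reuse the existing `client` instead.

```python
import dask
from dask.distributed import Client


def inc(x):
    return x + 1


# Assumption: in-process cluster, dashboard disabled, to keep the sketch light.
client = Client(processes=False, dashboard_address=None)

lazy = dask.delayed(inc)(1)      # lazy: only builds a task graph, nothing runs yet
future = client.submit(inc, 1)   # eager: handed to the scheduler immediately

lazy_value = lazy.compute()      # the delayed work happens only now
eager_value = future.result()    # blocks only if the task is still running

client.close()
```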

**Related Documentation**

* [Futures documentation](https://docs.dask.org/en/latest/futures.html)
* [Futures screencast](https://www.youtube.com/watch?v=07EiCpdhtDE)
* [Futures examples](https://examples.dask.org/futures.html)

In [1]:
from dask.distributed import Client

client = Client(n_workers=4)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 44309 instead


* Connection method: Cluster object
* Cluster type: distributed.LocalCluster
* Dashboard: http://127.0.0.1:44309/status
* Workers: 4
* Total threads: 12
* Total memory: 30.99 GiB
* Status: running
* Using processes: True
* Scheduler comm: tcp://127.0.0.1:36201
* Started: Just now

| Worker comm | Worker dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:40757 | http://127.0.0.1:37601/status | tcp://127.0.0.1:41041 | 3 | 7.75 GiB | /tmp/dask-scratch-space/worker-_yeyuj35 |
| tcp://127.0.0.1:38085 | http://127.0.0.1:36483/status | tcp://127.0.0.1:43135 | 3 | 7.75 GiB | /tmp/dask-scratch-space/worker-kecdni2n |
| tcp://127.0.0.1:39427 | http://127.0.0.1:40869/status | tcp://127.0.0.1:42473 | 3 | 7.75 GiB | /tmp/dask-scratch-space/worker-hx_5j7uw |
| tcp://127.0.0.1:44157 | http://127.0.0.1:40067/status | tcp://127.0.0.1:43225 | 3 | 7.75 GiB | /tmp/dask-scratch-space/worker-7dlpb0n5 |


## A Typical Workflow

This is the same workflow that we saw in the delayed notebook. It is for-loopy and the data is not necessarily an array or a dataframe. The following example outlines a read-transform-write:

```python
def process_file(filename):
    data = read_a_file(filename)
    data = do_a_transformation(data)
    destination = f"results/{filename}"
    write_out_data(data, destination)
    return destination

futures = []
for filename in filenames:
    future = client.submit(process_file, filename)
    futures.append(future)
    
futures
```
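
The outline above leaves the helpers undefined. A runnable variant might look like the sketch below, where the helpers are replaced by plain text-file operations, the transformation is a placeholder (upper-casing), the input files are created in a temporary directory, and `process_file` takes an extra `out_dir` argument. All of these are assumptions made for illustration.

```python
import tempfile
from pathlib import Path

from dask.distributed import Client


def process_file(filename, out_dir):
    """Read a text file, transform it, write the result, return the new path."""
    data = Path(filename).read_text()
    data = data.upper()  # placeholder transformation
    destination = Path(out_dir) / Path(filename).name
    destination.write_text(data)
    return str(destination)


# Assumption: in-process cluster, dashboard disabled, purely for a quick demo.
client = Client(processes=False, dashboard_address=None)

with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp) / "results"
    out_dir.mkdir()

    # Create a few small input files to work on.
    filenames = []
    for i in range(3):
        path = Path(tmp) / f"file.{i}.txt"
        path.write_text(f"contents of file {i}")
        filenames.append(str(path))

    futures = [client.submit(process_file, fn, str(out_dir)) for fn in filenames]
    destinations = client.gather(futures)  # blocks until every file is written
    written = [Path(d).read_text() for d in destinations]

client.close()
```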

## Basics

Just like we did in the delayed notebook, let's make some toy functions, `inc`, `double`, and `add`, that sleep for a while to simulate work. We'll first run them normally.

In [2]:
from time import sleep


def inc(x):
    sleep(1)
    return x + 1


def double(x):
    sleep(2)
    return 2 * x


def add(x, y):
    sleep(1)
    return x + y

We can run these locally:

In [3]:
inc(1)

2

Or we can submit them to run remotely with Dask. This immediately returns a future that points to the ongoing computation, and eventually to the stored result.

In [4]:
future = client.submit(inc, 1)  # returns immediately with pending future
future

If you wait a second, and then check on the future again, you’ll see that it has finished.

In [5]:
future

You can block on the computation and gather the result with the `.result()` method.

In [6]:
future.result()

2

#### Other ways to wait for a future
```python
from dask.distributed import wait, progress
progress(future)
```

shows a progress bar in *this* notebook, so there is no need to go to the dashboard. The progress bar is also asynchronous and doesn't block the execution of other code in the meantime.

```python
wait(future)
```
blocks and forces the notebook to wait until the computation pointed to by `future` is done. Note, however, that if the result of `inc()` is already sitting in the cluster, the call returns in **no time**, because Dask notices that we are asking for the result of a computation it already knows about. More on this later.

#### Other ways to gather results
```python
client.gather(futures)
```

gathers results from more than one future.
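
Waiting and gathering can be combined on a list of futures. The sketch below is standalone and assumes an in-process cluster with the dashboard disabled and a version of `inc` without the sleep.

```python
from dask.distributed import Client, wait


def inc(x):
    return x + 1


# Assumption: in-process cluster, dashboard disabled, purely for a quick demo.
client = Client(processes=False, dashboard_address=None)

futures = client.map(inc, range(5))

done, not_done = wait(futures)    # blocks until every future has finished
values = client.gather(futures)   # one call fetches all results, in input order

client.close()
```

`wait` only blocks; it does not transfer any data. The results stay on the cluster until `gather` (or `.result()`) asks for them.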

## `client.compute`

Generally, any Dask operation that is executed using `.compute()` or `dask.compute()` can be submitted for asynchronous execution using `client.compute()` instead.

Here is an example from the delayed notebook:

In [7]:
import dask


@dask.delayed
def inc(x):
    sleep(1)
    return x + 1


@dask.delayed
def add(x, y):
    sleep(1)
    return x + y


x = inc(1)
y = inc(2)
z = add(x, y)

So far we have a regular `dask.delayed` output. When we pass `z` to `client.compute` we get a future back and Dask starts evaluating the task graph. 

In [8]:
# notice the difference from z.compute()
# notice that this cell completes immediately
future = client.compute(z)
future

In [9]:
future.result()  # waits until result is ready

5

In [10]:
future

When using futures, the *computation moves to the data* rather than the other way around, and the client, in the local Python session, need never see the intermediate values.

## `client.submit`

`client.submit` takes a function and its arguments, pushes them to the cluster, and returns a `Future` representing the result to be computed. The function is passed to a worker process for evaluation. This looks a lot like `client.compute()` above, except that now we pass the function and arguments directly to the cluster.

In [11]:
def inc(x):
    sleep(1)
    return x + 1


future_x = client.submit(inc, 1)
future_y = client.submit(inc, 2)
future_z = client.submit(sum, [future_x, future_y])
future_z

In [12]:
future_z

In [13]:
future_z.result()  # waits until result is ready

5

You can also get the result with `client.gather()`:

In [14]:
client.gather(future_z)

5

The arguments to `client.submit` can be regular Python functions and objects, futures from other submit operations, or `dask.delayed` objects.

### How does it work?

Each future represents a result held, or being evaluated, by the cluster. This lets us control the caching of intermediate values: when a future is no longer referenced, its value is forgotten. In the example above, a future is held for each function call, so those results would not need to be re-evaluated if we submitted more work that needed them.
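
One way to see this reuse is to compare task keys. By default `client.submit` treats a function as pure, so submitting the same function with the same arguments maps to the same key, and passing `pure=False` opts out. The sketch is standalone and assumes an in-process cluster with the dashboard disabled.

```python
from dask.distributed import Client


def inc(x):
    return x + 1


# Assumption: in-process cluster, dashboard disabled, purely for a quick demo.
client = Client(processes=False, dashboard_address=None)

first = client.submit(inc, 1)
second = client.submit(inc, 1)              # same function, same argument
same_key = first.key == second.key          # the second call points at the same task

third = client.submit(inc, 1, pure=False)   # opt out: always a fresh task
fresh_key = third.key != first.key

values = client.gather([first, second, third])
client.close()
```

This is why functions with randomness or side effects are usually submitted with `pure=False`.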

We can explicitly pass data from our local session into the cluster using `client.scatter()`, but usually it is better to construct functions that do the loading of data within the workers themselves, so that there is no need to serialize and communicate the data. Most of the loading functions within Dask, such as `dd.read_csv`, work this way. Similarly, we normally don't want to `gather()` results that are too big in memory.
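
When local data really does need to go to the cluster, a minimal `client.scatter()` sketch might look like this. It is standalone, assumes an in-process cluster with the dashboard disabled, and uses a small list of integers as a stand-in for real data.

```python
from dask.distributed import Client

# Assumption: in-process cluster, dashboard disabled, purely for a quick demo.
client = Client(processes=False, dashboard_address=None)

numbers = list(range(1_000))

# Scattering a list sends each element separately, so wrap the payload in a
# one-element list to ship it as a single object.
[remote_numbers] = client.scatter([numbers])

# Both tasks receive the scattered data on the cluster; it is not re-sent.
total = client.submit(sum, remote_numbers)
largest = client.submit(max, remote_numbers)

total_value, largest_value = client.gather([total, largest])
client.close()
```

Only the two small results travel back to the local session here.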

## Why use Futures?

The futures API offers a work submission style that can easily emulate the map/reduce paradigm. If that is familiar to you then futures might be the simplest entrypoint into Dask. 

The other big benefit of futures is that the intermediate results, represented by futures, can be passed to new tasks without having to pull data locally from the cluster. New operations can be set up to work on the output of previous jobs that haven't even begun yet.
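
A small map/reduce in this style could be sketched as follows. The example is standalone, assumes an in-process cluster with the dashboard disabled, and uses a made-up `square` function.

```python
from dask.distributed import Client


def square(x):
    return x * x


# Assumption: in-process cluster, dashboard disabled, purely for a quick demo.
client = Client(processes=False, dashboard_address=None)

# Map: one future per input; nothing is pulled back to the local session.
squares = client.map(square, range(10))

# Reduce: the list of futures is passed straight into another task,
# possibly before any of the squares has been computed.
total = client.submit(sum, squares)

total_value = total.result()  # only the final number travels back: 285
client.close()
```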


## Use Futures to parallelize tasks in a loop



This is an example of an embarrassingly parallel computation.  We want to run the same Python code on many pieces of data.  This is a very simple and also very common case that comes up all the time.


#### Sequential code

In [15]:
%%time 

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
results = []

for x in data:
    y = inc(x)
    z = double(y)
    results.append(z)
    
results

CPU times: user 1.65 s, sys: 334 ms, total: 1.99 s
Wall time: 30 s


[4, 6, 8, 10, 12, 14, 16, 18, 20, 22]

#### Parallel code

In [16]:
%%time 

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
results = []

for x in data:
    y = client.submit(inc, x)
    z = client.submit(double, y)
    results.append(z)
    
results = client.gather(results)
results

CPU times: user 279 ms, sys: 33.8 ms, total: 313 ms
Wall time: 3.14 s


[4, 6, 8, 10, 12, 14, 16, 18, 20, 22]

## Use futures for evolving computations

Futures also suit computations that evolve as they run. We'll get a taste of this with one Dask futures feature, [`as_completed`](https://docs.dask.org/en/stable/futures.html#distributed.as_completed), which lets us build up a computation dynamically as earlier pieces complete.

We will use this to parallelize code that processes a collection of files.

1.  First, we'll build this sequentially.
2.  Second, we'll learn how `as_completed` works in a simple example.
3.  Third, we'll convert the sequential code into parallel code.

### Sequential code to process files

In [17]:
filenames = ["file.{}.txt".format(i) for i in range(10)]
filenames[:3]

['file.0.txt', 'file.1.txt', 'file.2.txt']

In [18]:
import random, time

def parse_file(fn: str) -> list:
    """ Returns a list of work items of unknown length """
    time.sleep(random.random())
    return [random.random() for _ in range(random.randint(1, 10))]

def process_item(x: float):
    """ Process each work item """
    time.sleep(random.random() / 4)
    return x + 1

In [19]:
%%time

# This takes around 10-20s

results = []

for fn in filenames:
    L = parse_file(fn)
    for x in L:
        out = process_item(x)
        results.append(out)

results

CPU times: user 730 ms, sys: 136 ms, total: 866 ms
Wall time: 12.2 s


[1.4445711769963663,
 1.4140448121265952,
 1.3483974206662148,
 1.8933150982575802,
 1.0446602505860456,
 1.9268368577147403,
 1.7469870949452906,
 1.862198558670623,
 1.1380021714907471,
 1.3547806366875568,
 1.031980611283724,
 1.698293317805444,
 1.3090419714040478,
 1.4279604467009372,
 1.5869770088377637,
 1.741358463157305,
 1.242927639543801,
 1.9691087283861295,
 1.622125100543673,
 1.6878374706854695,
 1.6043911370954387,
 1.7468969652379287,
 1.871327888757449,
 1.9526547334137943,
 1.1788005788456832,
 1.1713065730708596,
 1.5642778100170158,
 1.1885722931125866,
 1.2732558802743172,
 1.9379865614772749,
 1.4104209721301941,
 1.5845681148813864,
 1.7439611692747583,
 1.822488631574517,
 1.8076441174890137,
 1.4664879492555785,
 1.617273968951367,
 1.6979228476480661,
 1.9675774108233168,
 1.75922922632086,
 1.1314358333220256,
 1.3265716384774617,
 1.3712833586008215,
 1.628347149823241,
 1.7691801995546113,
 1.2106951241115411,
 1.7668190043071026,
 1.2869338301175004,
 1.4

### Learn about `as_completed`

When we want to change our computation on the fly, the `as_completed` object becomes useful: it yields futures in the order they finish, and more futures can be added to it while we iterate.

In [20]:
from dask.distributed import as_completed
x, y, z = client.map(inc, [1, 2, 3])  
for future in as_completed([x, y, z]):  
    print(future.result())  

2
4
3


In [21]:
x, y, z = client.map(inc, [1, 2, 3])  
ac = as_completed([x, y, z])  

for future in ac:  
    print(future.result())  
    if random.random() < 0.5:  
        ac.add(client.submit(double, future))  

2
3
4
6
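
A deterministic variant of the cell above can be sketched with the `with_results=True` option, which makes `as_completed` yield `(future, result)` pairs. The rule used here (double only the even results of the first round) is an assumption chosen so the outcome is reproducible; the sketch is standalone and uses an in-process cluster with the dashboard disabled.

```python
from dask.distributed import Client, as_completed


def inc(x):
    return x + 1


def double(x):
    return 2 * x


# Assumption: in-process cluster, dashboard disabled, purely for a quick demo.
client = Client(processes=False, dashboard_address=None)

first_round = client.map(inc, [1, 2, 3])
ac = as_completed(first_round, with_results=True)

seen = []
for future, result in ac:
    seen.append(result)
    # Follow up only on first-round futures whose result is even.
    if future in first_round and result % 2 == 0:
        ac.add(client.submit(double, future))

client.close()
```

The order in which results arrive can vary between runs, but the set of values collected in `seen` does not.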


### Parallelize code to process files with `as_completed`

In [22]:
%%time

from dask.distributed import as_completed
import operator

lists = client.map(parse_file, filenames, pure=False)
lengths = client.map(len, lists)

mapping = dict(zip(lengths, lists))

futures = []

for future in as_completed(lengths):
    n = future.result()
    L = mapping[future]
    for i in range(n):
        new = client.submit(operator.getitem, L, i, priority=1)
        new = client.submit(process_item, new, priority=1)
        futures.append(new)

client.gather(futures)


CPU times: user 574 ms, sys: 54.8 ms, total: 629 ms
Wall time: 1.62 s


[1.516918189554453,
 1.331130901271693,
 1.3550902004934788,
 1.9118371647222192,
 1.0245595232901155,
 1.6992492642396293,
 1.308371603078526,
 1.2532233709949825,
 1.1642349049872345,
 1.5130151459801287,
 1.0513969863574908,
 1.4386469524404837,
 1.7327428376754406,
 1.8200916133048723,
 1.6632331667361273,
 1.038539033887397,
 1.614011197959774,
 1.8670292053315873,
 1.8710157142889527,
 1.4781147173140772,
 1.7554326950746213,
 1.1596568846338131,
 1.4954471106713907,
 1.9303035525849495,
 1.6499628016114587,
 1.1682276810091767,
 1.0001950716814796,
 1.2888742776517548,
 1.507227448446028,
 1.1247872943586994,
 1.6790680856707267,
 1.532592633495672,
 1.3333886360783147,
 1.773814957047351,
 1.587762236053707,
 1.416053199525821,
 1.7622077979657187,
 1.1062280478081639,
 1.2557728922946114,
 1.4275379294807449,
 1.9784753096808392,
 1.4458695197840563,
 1.4130502363475554,
 1.2735276154531925,
 1.9555759945709708,
 1.910649500968767,
 1.0978426960699015,
 1.4659224101755224,
 1.

## Clean up 

In [23]:
client.close()

## Useful links

- [Futures documentation](https://docs.dask.org/en/latest/futures.html)
- [Futures screencast](https://www.youtube.com/watch?v=07EiCpdhtDE)
- [Futures examples](https://examples.dask.org/futures.html)