## First, some sequential code

In [1]:
import time

In [2]:
def inc(x):
    time.sleep(1)
    return x + 1

In [3]:
%%time
inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
results = []

for x in inputs:
    result = inc(x)
    results.append(result)

CPU times: user 2.49 ms, sys: 3.29 ms, total: 5.78 ms
Wall time: 10 s


## concurrent.futures

In [4]:
from concurrent.futures import ThreadPoolExecutor

e = ThreadPoolExecutor(4)
e

<concurrent.futures.thread.ThreadPoolExecutor at 0x7fc9b93c4430>

In [5]:
%%time
future = e.submit(inc, 10)
future

CPU times: user 427 µs, sys: 4.03 ms, total: 4.45 ms
Wall time: 3 ms


<Future at 0x7fc9b93c4be0 state=running>

#### The future is still in the running state. Evaluate it again a moment later:

In [6]:
future

<Future at 0x7fc9b93c4be0 state=finished returned int>

#### The output above shows that the future is now in the finished state

#### Now we can get the result of the future:

In [7]:
future.result()

11
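
The state shown in the future's repr can also be checked from code. The sketch below uses only the standard library and calls `done()` before and after `result()`. The helper `slow_inc` and its shortened sleep are illustrative stand-ins for `inc`, not part of the original notebook.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_inc(x, delay=0.2):
    # Hypothetical stand-in for inc() with a shorter, configurable sleep
    time.sleep(delay)
    return x + 1

with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(slow_inc, 10)
    done_immediately = future.done()   # usually False: the task is still sleeping
    value = future.result()            # blocks until the task has finished
    done_after_result = future.done()  # True: result() returns only after completion

print(done_immediately, value, done_after_result)
```

Calling `done()` never blocks, so it is a cheap way to poll; `result()` is the call that waits.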

#### Now, let's modify the `for` loop code above to take advantage of concurrent.futures:

In [8]:
%%time
inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
futures = []

for x in inputs:
    future = e.submit(inc, x)
    futures.append(future)
    
# Now let's block until every future has produced its result:
result = [future.result() for future in futures]

CPU times: user 1.04 ms, sys: 8.34 ms, total: 9.38 ms
Wall time: 3.01 s


#### Instead of 10 seconds, the loop now takes about 3 seconds: four threads work through ten one-second tasks in three rounds (4 + 4 + 2)

In [9]:
result

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
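
The 3-second wall time follows from the pool size: ten one-second tasks on four threads need `ceil(10 / 4) = 3` rounds. The sketch below checks that estimate on a scaled-down version, assuming a hypothetical `slow_inc` with a 0.1 s sleep in place of `inc`. It also uses `Executor.map`, a standard-library shorthand for the submit-then-collect loop.

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor

def slow_inc(x, delay=0.1):
    # Hypothetical stand-in for inc() with a shorter sleep
    time.sleep(delay)
    return x + 1

n_tasks, n_threads, delay = 10, 4, 0.1
expected = math.ceil(n_tasks / n_threads) * delay  # three rounds of 0.1 s each

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=n_threads) as pool:
    # Executor.map submits every input and yields results in input order
    results = list(pool.map(slow_inc, range(1, n_tasks + 1)))
elapsed = time.perf_counter() - start

print(f"expected about {expected:.1f} s, measured {elapsed:.2f} s")
```

The measured time will sit slightly above the estimate because of thread start-up and scheduling overhead.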

## Using a Dask Client, which follows the concurrent.futures API

In [13]:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:37059")
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:37059 | Workers: 4 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 4 |
| | Memory: 8.13 GB |


#### Now, we can modify the for loop code by swapping out the concurrent.futures executor `e` with our Dask `client`:

In [14]:
%%time
inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
futures = []

for x in inputs:
    future = client.submit(inc, x)
    futures.append(future)
    
# Now let's block until every future has produced its result:
result = [future.result() for future in futures]

CPU times: user 40.7 ms, sys: 1.71 ms, total: 42.4 ms
Wall time: 3.1 s
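
Since the loop only relies on `submit` and `result`, it can be written once against any object that offers those two methods. The helper `run_all` below is a hypothetical name introduced for illustration. It is exercised here with a `ThreadPoolExecutor`; passing the Dask `client` instead is assumed to behave the same way, given that the cell above makes exactly that swap.

```python
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    return x + 1

def run_all(executor, func, inputs):
    """Submit func(x) for every x, then block until all results are in."""
    futures = [executor.submit(func, x) for x in inputs]
    return [f.result() for f in futures]

with ThreadPoolExecutor(max_workers=4) as pool:
    results = run_all(pool, inc, range(1, 11))

print(results)
```

Keeping the executor as a parameter means the choice between local threads and a cluster is made at the call site rather than inside the loop.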


## A more realistic workflow

In [15]:
import time
import numpy as np

def load(x):
    time.sleep(0.2)
    return np.arange(1000000) + x

def process(x):
    time.sleep(0.1)
    return x + 1

def save(x):
    time.sleep(0.4)
    return None

In [16]:
%%time

inputs = range(50)

for i in inputs:
    x = load(i)
    y = process(x)
    save(y)

CPU times: user 527 ms, sys: 56.5 ms, total: 584 ms
Wall time: 35.6 s


## Let's use the Dask `client` in our for loop instead:

In [17]:
%%time

inputs = range(50)
futures = []

for i in inputs:
    x = client.submit(load, i)
    y = client.submit(process, x)
    # Keep a reference to the final future (here, in the `futures` list);
    # otherwise Dask releases it as soon as the variable is reassigned
    z = client.submit(save, y)
    futures.append(z)
    
# Now let's block until every future has produced its result:
result = [future.result() for future in futures]

CPU times: user 203 ms, sys: 31.3 ms, total: 234 ms
Wall time: 9.45 s
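
The two wall times can be compared against a back-of-the-envelope estimate based only on the `time.sleep` calls. The sketch below is plain arithmetic under the assumption that the sleeps dominate the runtime and that the four workers in the client summary each run one task at a time.

```python
# Sleep durations taken from load(), process() and save() above
load_s, process_s, save_s = 0.2, 0.1, 0.4
n_items = 50
n_workers = 4  # from the client summary: 4 workers, 4 cores

sequential_s = n_items * (load_s + process_s + save_s)
ideal_parallel_s = sequential_s / n_workers  # lower bound, ignoring all overhead

measured_sequential_s = 35.6  # wall time reported for the sequential loop
measured_parallel_s = 9.45    # wall time reported for the Dask loop
speedup = measured_sequential_s / measured_parallel_s

print(f"sleep-only estimate: {sequential_s:.1f} s sequential, "
      f"{ideal_parallel_s:.2f} s on {n_workers} workers")
print(f"measured speedup: {speedup:.2f}x")
```

The gap between the ideal figure and the measured 9.45 s is what scheduling, serialization, and the NumPy work itself cost on top of the sleeps.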


## A future is a pointer to remote data

If you delete every future that points to a piece of data, Dask can release that data from the workers' memory.

In [18]:
L = [client.submit(load, i) for i in range(100)]  # With only 8 GB of RAM, do not go above 100!
# Equivalent to L2 = [client.submit(process, x) for x in L], written with map():
L2 = client.map(process, L)
L3 = client.map(load, L2)

#### These futures keep their results in memory on the workers, so we delete them to release that memory

In [19]:
del L3

In [20]:
del L2

In [21]:
del L

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
