# Table of Contents
- [1 The AsyncResult object](#The-AsyncResult-object)
    - [1.1 Beyond multiprocessing’s AsyncResult](#Beyond-multiprocessing’s-AsyncResult)
        - [1.1.1 get_dict](#get_dict)
    - [1.2 Metadata](#Metadata)
        - [1.2.1 Timing](#Timing)
    - [1.3 Map results are iterable!](#Map-results-are-iterable!)

In [12]:
from ipyparallel import Client
import ipyparallel as ipp
import os
import numpy as np
from math import sqrt
from functools import reduce
rc = Client(profile='asyncresult-object')

# The AsyncResult object


In non-blocking mode, apply() submits the command to be executed and then immediately returns an AsyncResult object. The AsyncResult object lets you retrieve the result later through its get() method, and it also collects metadata about the execution.

## Beyond multiprocessing’s AsyncResult

**Note:**

    The AsyncResult object provides a superset of the interface in multiprocessing.pool.AsyncResult. See the official Python documentation for more on the basics of this interface.
    
ipyparallel's AsyncResult objects add a number of convenient features for working with parallel results, beyond what multiprocessing's original AsyncResult provides.
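
As a point of reference, here is a minimal sketch of that base interface using only the standard library. It uses multiprocessing.pool.ThreadPool as a stand-in for an ipyparallel view (an assumption, made so the snippet runs without a cluster), and the helper `square` is hypothetical.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    # hypothetical example task
    return x * x


# ThreadPool offers the same apply_async/AsyncResult interface as Pool,
# and is used here only as a stand-in for a cluster of engines.
with ThreadPool(processes=2) as pool:
    res = pool.apply_async(square, (7,))  # returns immediately
    res.wait(timeout=5)                   # block until done (or timeout)
    is_ready = res.ready()                # True once the call has finished
    ok = res.successful()                 # True if it finished without raising
    value = res.get(timeout=5)            # fetch the result (re-raises errors)

print(is_ready, ok, value)
```

The ipyparallel AsyncResult should accept the same ready(), successful(), wait() and get() calls, with the extra features described in the rest of this section.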

### get_dict

The first is AsyncResult.get_dict(), which returns the results as a dictionary keyed by engine_id rather than as a flat list. This is useful for quickly coordinating or distributing information about all of the engines.

As an example, here is a quick call that gives every engine a dict mapping each engine ID to that engine's PID:

In [13]:
ar = rc[:].apply_async(os.getpid)
pids = ar.get_dict()
print(pids)
rc[:]['pid_map'] = pids

{0: 11700, 1: 11713, 2: 11714, 3: 11716}
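
Conceptually, the dictionary is the flat result list paired with the engine that produced each entry. The following is a plain-Python sketch of that pairing, not ipyparallel's implementation; the values are copied from the output above, and the names `engine_ids` and `results` are illustrative.

```python
# Values copied from the output above; in a real session they would come from
# the flat result list and the per-task engine IDs.
engine_ids = [0, 1, 2, 3]
results = [11700, 11713, 11714, 11716]

# A dict keyed by engine only makes sense with one result per engine.
if len(set(engine_ids)) != len(engine_ids):
    raise ValueError("more than one result per engine")

pids = dict(zip(engine_ids, results))
print(pids)
```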


## Metadata

ipyparallel tracks some metadata about each task, which is stored in the Client.metadata dict. The AsyncResult object also gives you an interface to this information, including timestamps, stdout/stderr, and engine IDs.

### Timing

ipyparallel tracks various timestamps as datetime objects, and the AsyncResult object has a few properties that turn these into useful times (in seconds, as floats).

For use while the tasks are still pending:

- ar.elapsed is the number of seconds since submission; it is meant for use before the AsyncResult is complete.
- ar.progress is the number of tasks that have completed. Fractional progress would be:

    1.0 * ar.progress / len(ar)
- AsyncResult.wait_interactive() will wait for the result to finish, but print out status updates on progress and elapsed time while it waits.
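
The polling pattern behind these properties can be sketched with the standard library. This is an analogy under stated assumptions: ThreadPool stands in for the engines, the count of ready results plays the role of ar.progress, a manual timer plays the role of ar.elapsed, and `work` is a hypothetical task.

```python
import time
from multiprocessing.pool import ThreadPool


def work(t):
    # hypothetical task: sleep for t seconds
    time.sleep(t)
    return t


with ThreadPool(processes=4) as pool:
    tic = time.time()
    pending = [pool.apply_async(work, (0.05 * i,)) for i in range(8)]
    fractions = []
    while True:
        done = sum(r.ready() for r in pending)  # plays the role of ar.progress
        elapsed = time.time() - tic             # plays the role of ar.elapsed
        fractions.append(done / len(pending))   # fractional progress
        print("%i/%i tasks done after %.2fs" % (done, len(pending), elapsed))
        if done == len(pending):
            break
        time.sleep(0.05)
```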

For use after the tasks are done:

- ar.serial_time is the sum of the computation time of all of the tasks done in parallel.
- ar.wall_time is the time between the first task submitted and last result received. This is the actual cost of computation, including IPython overhead.

**Note:**

    wall_time is only precise if the Client is waiting for results when the task finishes, because the received timestamp is set when the Client unpacks the result, which is triggered by the spin() call. If you are doing other work in the Client and not waiting or spinning, then received might be artificially late, which inflates wall_time.

A metric that is often of interest is how the time it actually took to do the work in parallel compares with the serial computation time, which is given simply by

    speedup = ar.serial_time / ar.wall_time
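
To make the arithmetic concrete, here is a small worked sketch on hypothetical timestamps. It assumes, following the definitions above, that serial_time is the sum of each task's completed minus started time and that wall_time runs from the first submission to the last result received. The `record` helper and all of the numbers are invented for illustration.

```python
from datetime import datetime, timedelta

t0 = datetime(2024, 1, 1, 12, 0, 0)  # arbitrary reference time


def record(submitted, started, completed, received):
    """Build one hypothetical metadata record from offsets in seconds."""
    return {
        "submitted": t0 + timedelta(seconds=submitted),
        "started": t0 + timedelta(seconds=started),
        "completed": t0 + timedelta(seconds=completed),
        "received": t0 + timedelta(seconds=received),
    }


# Four tasks of one second each, running on four engines at once.
records = [
    record(0.00, 0.02, 1.02, 1.05),
    record(0.01, 0.03, 1.03, 1.06),
    record(0.02, 0.04, 1.04, 1.07),
    record(0.03, 0.05, 1.05, 1.10),
]

# Sum of per-task compute time (completed - started).
serial_time = sum(
    (r["completed"] - r["started"]).total_seconds() for r in records
)
# Time from the first submission to the last result received.
wall_time = (
    max(r["received"] for r in records) - min(r["submitted"] for r in records)
).total_seconds()
speedup = serial_time / wall_time

print("serial_time=%.2fs wall_time=%.2fs speedup=%.2fx"
      % (serial_time, wall_time, speedup))
```

With four engines the ideal speedup would be 4; the gap between that and the computed value is the submission and transfer overhead included in wall_time.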

## Map results are iterable!



When an AsyncResult object has multiple results (e.g. the AsyncMapResult object), you can iterate through the results themselves and act on them as they arrive:

In [14]:
import time
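def sleep_here(t):
    # import inside the function so that `time` is available on the engine
    import time
    time.sleep(t)
    # `id` is the engine's global `id` (scattered in the next cell), not the builtin
    return id,t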

In [15]:
dv = rc[:]
v = rc.load_balanced_view()
# scatter 'id', so id=0,1,2,3 on engines 0,1,2,3
dv.scatter('id', rc.ids, flatten=True)
print("Engine IDs: ", dv['id'])

Engine IDs:  [0, 1, 2, 3]


In [16]:
# create a Reference to `id`. This will be a different value on each engine
ref = ipp.Reference('id')
print("sleeping for `id` seconds on each engine")
tic = time.time()
# apply_async always returns an AsyncResult, regardless of dv.block
ar = dv.apply_async(time.sleep, ref)
for i,r in enumerate(ar):
    print("%i: %.3f"%(i, time.time()-tic))
print(ar.get())

sleeping for `id` seconds on each engine
0: 0.024
1: 1.022
2: 2.022
3: 3.025
[None, None, None, None]


In [17]:
# one call per task
print("running with one call per task")
amr = v.map(sleep_here, [.1*t for t in range(10)])
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time()-tic))
print(amr.get())

running with one call per task
task 0 on engine 3: 0.000
task 1 on engine 0: 0.102
task 2 on engine 1: 0.223
task 3 on engine 2: 0.325
task 4 on engine 3: 0.427
task 5 on engine 0: 0.611
task 6 on engine 1: 0.834
task 7 on engine 2: 1.039
task 8 on engine 3: 1.238
task 9 on engine 0: 1.526
[(3, 0.0), (0, 0.1), (1, 0.2), (2, 0.30000000000000004), (3, 0.4), (0, 0.5), (1, 0.6000000000000001), (2, 0.7000000000000001), (3, 0.8), (0, 0.9)]


In [18]:
print("running with two calls per task")
# with chunksize=2, each task makes two calls
amr = v.map(sleep_here, [0.1*t for t in range(10)], chunksize=2)
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time()-tic))
print(amr.get())

running with two calls per task
task 0 on engine 1: 0.109
task 1 on engine 1: 0.109
task 2 on engine 2: 0.517
task 3 on engine 2: 0.518
task 4 on engine 3: 0.925
task 5 on engine 3: 0.925
task 6 on engine 0: 1.326
task 7 on engine 0: 1.326
task 8 on engine 1: 1.825
task 9 on engine 1: 1.826
[(1, 0.0), (1, 0.1), (2, 0.2), (2, 0.30000000000000004), (3, 0.4), (3, 0.5), (0, 0.6000000000000001), (0, 0.7000000000000001), (1, 0.8), (1, 0.9)]
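
The effect of chunksize=2 on these ten inputs can be sketched in plain Python. The `chunk` helper below is a simplified illustration using consecutive slices, not ipyparallel's actual partitioner, which may distribute uneven remainders differently.

```python
def chunk(seq, chunksize):
    """Split seq into consecutive pieces of at most chunksize items."""
    return [seq[i:i + chunksize] for i in range(0, len(seq), chunksize)]


inputs = [0.1 * t for t in range(10)]
tasks = chunk(inputs, 2)

# Each task runs its calls back to back, so it takes the sum of its sleeps.
durations = [round(sum(task), 1) for task in tasks]

print(len(tasks), "tasks of sizes", [len(task) for task in tasks])
print("seconds per task:", durations)
```

Both results of a chunk arrive together, which is consistent with the paired timestamps in the output above.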


In [19]:
print("running with two calls per task, with unordered results")
# We can even iterate through faster results first, with ordered=False
amr = v.map(sleep_here, [.1*t for t in range(10,0,-1)], ordered=False, chunksize=2)
tic = time.time()
for i,r in enumerate(amr):
    print("slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic))
print(amr.get())

running with two calls per task, with unordered results
slept 0.40s on engine 1: 0.711
slept 0.30s on engine 1: 0.714
slept 0.20s on engine 1: 1.022
slept 0.10s on engine 1: 1.023
slept 0.60s on engine 0: 1.112
slept 0.50s on engine 0: 1.112
slept 0.80s on engine 3: 1.509
slept 0.70s on engine 3: 1.509
slept 1.00s on engine 2: 1.885
slept 0.90s on engine 2: 1.885
[(2, 1.0), (2, 0.9), (3, 0.8), (3, 0.7000000000000001), (0, 0.6000000000000001), (0, 0.5), (1, 0.4), (1, 0.30000000000000004), (1, 0.2), (1, 0.1)]


That is to say, if you treat an AsyncMapResult as if it were a list of your actual results, it should behave as you would expect, with the only difference being that you can start iterating through the results before they have even been computed.
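
The standard library has a similar pair of behaviors, which may help build intuition. In this sketch, ThreadPool.imap stands in for an ordered map and ThreadPool.imap_unordered for ordered=False. The mapping to ipyparallel is an analogy only, and `sleep_and_return` is a hypothetical helper.

```python
import time
from multiprocessing.pool import ThreadPool


def sleep_and_return(t):
    # hypothetical task: sleep for t seconds, then hand t back
    time.sleep(t)
    return t


delays = [0.3, 0.1, 0.2]
with ThreadPool(processes=3) as pool:
    # Results are yielded in submission order, waiting on slow ones if needed.
    ordered = list(pool.imap(sleep_and_return, delays))
    # Results are yielded as they finish, typically the shortest sleep first.
    unordered = list(pool.imap_unordered(sleep_and_return, delays))

print("ordered:  ", ordered)
print("unordered:", unordered)
```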

Because an AsyncMapResult iterates like a list, you can do a simple version of map/reduce with Python's builtin map and functools.reduce, and the only difference between doing this locally and doing it remotely in parallel is using the asynchronous view.map instead of the builtin map.

Here is a simple one-line RMS (root-mean-square) implemented with Python’s builtin map and functools.reduce.

In [20]:
lview = rc.load_balanced_view()
X = np.linspace(0,100)
add = lambda a,b: a+b
sq = lambda x: x*x

In [21]:
sqrt(reduce(add, map(sq, X)) / len(X))

58.028845747399714

In [22]:
sqrt(reduce(add, lview.map(sq, X)) / len(X))

58.028845747399714

To break that down:

- map(sq, X) computes the square of each element in the list (locally, or in parallel)
- reduce(add, sqX) / len(X) computes the mean by summing over the list (or AsyncMapResult) and dividing by the size, where sqX is the result of the map step
- sqrt(...) takes the square root of the resulting number
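
The same three steps can be written out one at a time. This sketch uses only the standard library: the list `X` reproduces the 50 evenly spaced points that np.linspace(0, 100) returns by default, and ThreadPool.map stands in for lview.map (an assumption, made so the snippet runs without a cluster).

```python
from functools import reduce
from math import sqrt
from multiprocessing.pool import ThreadPool

# 50 evenly spaced points from 0 to 100, like np.linspace(0, 100)
X = [100 * i / 49 for i in range(50)]
add = lambda a, b: a + b
sq = lambda x: x * x

# Step 1: square each element (locally)
squares = list(map(sq, X))
# Step 2: sum the squares and divide by the number of elements
mean_square = reduce(add, squares) / len(X)
# Step 3: take the square root
rms_local = sqrt(mean_square)

# Same pipeline, with only the map step swapped for a pool-based map
with ThreadPool(processes=4) as pool:
    rms_pool = sqrt(reduce(add, pool.map(sq, X)) / len(X))

print(rms_local, rms_pool)
```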

**See also**

    When AsyncResult or the AsyncMapResult don’t provide what you need (for instance, handling individual results as they arrive, but with metadata), you can always take the individual message IDs from the original result’s msg_ids attribute and handle them as you like.