# Load-balancing with IPython.parallel

In [1]:
import os, sys, time
import numpy as np

from IPython.core.display import display
from IPython import parallel
rc = parallel.Client()
dview = rc[:]

Create a LoadBalancedView:

In [2]:
lview = rc.load_balanced_view()
lview

<LoadBalancedView None>

A LoadBalancedView behaves very much like a DirectView on a single engine:
each call to `apply()` results in a single remote computation,
and the result (or AsyncResult) of that call is returned directly,
rather than in a list as it would be with a multi-engine DirectView.

In [3]:
e0 = rc[0]

In [4]:
from numpy.linalg import norm
A = np.random.random(1024)

e0.apply_sync(norm, A, 2)

18.545900657405625

In [5]:
lview.apply_sync(norm, A, 2)

18.545900657405625

However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:

In [6]:
e0.apply_sync(os.getpid)

8191

In [7]:
for i in range(2*len(rc.ids)):
    pid = lview.apply_sync(os.getpid)
    print("task %i ran on: %i" % (i, pid))

task 0 ran on: 8196
task 1 ran on: 8197
task 2 ran on: 8198
task 3 ran on: 8191
task 4 ran on: 8196
task 5 ran on: 8197
task 6 ran on: 8198
task 7 ran on: 8191


# Map

The LoadBalancedView also has a load-balanced version of the builtin `map()`:

In [8]:
lview.block = True

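# On Python 3 the builtin map() returns a lazy iterator,
# so wrap it in list() before comparing it with the parallel result.
serial_result   = list(map(lambda x: x**10, range(32)))
parallel_result = lview.map(lambda x: x**10, range(32))

serial_result == parallel_result

True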

Just like `apply()`, `map()` can be made non-blocking, either with `block=False` or by calling `map_async`:

In [9]:
amr = lview.map_async(lambda x:x**10, range(32))
amr.msg_ids

['9ca1e2ac-8d2c-4ddf-bf23-1b548e808b7b',
 '4a7d5b12-648b-4b35-8655-b8c98b39c40b',
 '953c2c13-5614-4f97-8ecd-c23eba79f1e5',
 'befa90bb-a98a-48ab-aad9-125552f623fa',
 '5ce44ff5-c665-44ae-a4e4-23ae3a42a34c',
 'c337a2e0-1f9b-40d4-84fc-ddc38a9bf867',
 'b36c01ad-0518-4a1d-963b-ead3557f0403',
 'd6b26432-681a-459c-9691-441dbb1577c0',
 'ae98f719-13f6-4ad0-8bbf-1bff4832bc30',
 'e24e7467-456b-42a2-a972-bff5396d4edb',
 '061a57e1-c3da-49ec-a128-c7a7fb80a240',
 '85a3e251-bcc3-479a-9cd4-2caabd1f98fa',
 '4281000b-d0d7-48d3-be8c-32fe71c7d011',
 '008d4bc2-1c90-48d8-8eb8-fcb727a839f6',
 '8f1ab6ff-f298-4a8b-bc8e-99b5bedd7efb',
 '3c25be93-66c7-4805-b219-b93b20306446',
 '517a83a6-1a88-48c2-8e93-57b136b1d4d6',
 '8b30a252-ad30-4fbf-9c7e-c14051988de2',
 '47a22b56-d1fa-4a6b-aa19-9171ad8a7786',
 '85e9fadc-8e25-4f4b-91bf-3a1a119691fc',
 '2b45a804-09db-46fa-80e3-2427ab273bbd',
 '2c17117a-6782-4878-a551-442920b8133a',
 '52539e02-8bd3-40c1-9a68-355054d040fa',
 '3fc85e8f-cee4-4451-a070-44907a5b34fc',
 'bb0a8ead-80f2-

In [10]:
lview.map??

Signature: lview.map(f, *sequences, **kwargs)
Docstring:
``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult

Parallel version of builtin `map`, load-balanced by this View.

`block`, and `chunksize` can be specified by keyword only.

Each `chunksize` elements will be a separate task, and will be
load-balanced. This lets individual elements be available for iteration
as soon as they arrive.

Parameters
----------

f : callable
    function to be mapped
*sequences: one or more sequences of matching length
    the sequences to be distributed and passed to `f`
block : bool [default self.block]
    whether to wait for the result or not
track : bool
    whether to create a MessageTracker to allow the user to
    safely edit after arrays and buffers during non-copying
    sends.
chunksize : int [de

In [11]:
amr = lview.map_async(lambda x:x**10, range(32), chunksize=4)
amr.msg_ids

['68a25ebf-b153-4968-a232-6712b283db65',
 'e020bcac-1712-4a7c-892f-081c05818188',
 '27e59f98-0b56-4b40-9b87-d577f19e5009',
 'f16e8605-d817-400b-a725-f9125dbe7d24',
 '94ca805f-43b1-481f-8465-3256786cd977',
 '9af0ad93-7b83-4c6b-8e6f-fd6a354faed9',
 '6310cd88-36d0-4fa9-823a-4185b7e406c8',
 '6c6504fc-f5c7-495e-87ea-9480907d258a']
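
The eight message IDs above follow from the chunking arithmetic: 32 items split into chunks of 4 give 8 tasks, while the default `chunksize=1` gives one task per item. The sketch below only illustrates that arithmetic in plain Python; `chunk_sequence` is a hypothetical helper written for this example, not part of IPython.parallel, and the library's own partitioning code may differ in detail.

```python
def chunk_sequence(items, chunksize=1):
    """Split items into consecutive chunks of at most `chunksize` elements.

    Hypothetical helper for illustration only; not an IPython.parallel API.
    """
    items = list(items)
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]


one_per_item = chunk_sequence(range(32))                # default: one task per item
four_per_task = chunk_sequence(range(32), chunksize=4)  # four items per task

print(len(one_per_item), len(four_per_task))  # 32 8
print(four_per_task[0])                       # [0, 1, 2, 3]
```

Larger chunks mean fewer messages to schedule, at the cost of coarser load-balancing.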

## Map results are iterable!

AsyncResults with multiple results are actually iterable before their
results arrive.

This means that you can perform map/reduce operations on elements as
they come in:

In [12]:
lview.block = False

In [13]:
# scatter 'id', so id=0,1,2 on engines 0,1,2
dv = rc[:]
dv.scatter('id', rc.ids, flatten=True)
print(dv['id'])

# create a Reference to `id`. This will be a different value on each engine
ref = parallel.Reference('id')

tic = time.time()
ar = dv.apply(time.sleep, ref)
for i,r in enumerate(ar):
    print("%i: %.3f"%(i, time.time()-tic))

[0, 1, 2, 3]
0: 0.017
1: 1.013
2: 2.012
3: 3.022
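
The same consume-as-they-arrive pattern can be tried without a running cluster. The sketch below is a stand-in that uses the standard library's `concurrent.futures` rather than IPython.parallel: `pool.map` also returns an iterator that yields each result, in submission order, as soon as it is ready, so a reduction can start before all tasks have finished. The function `slow_square` and the worker count are assumptions made for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Sleep briefly (longer for larger x), then return x squared."""
    time.sleep(0.01 * x)
    return x * x


running_total = 0
with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.map returns a lazy iterator: results are yielded in submission
    # order, each one as soon as it is available.
    for value in pool.map(slow_square, range(8)):
        # The reduce step runs while later tasks are still executing.
        running_total += value

print(running_total)  # 140
```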


In [14]:
amr = lview.map_async(time.sleep, [1] * 12)

In [15]:
amr.wait_interactive()

  12/12 tasks finished after    3 s
done


In [16]:
amr.wall_time, amr.elapsed

(3.049178, 3.049178)

In [17]:
amr.serial_time

12.034514000000001

Now we submit a batch of tasks of increasing duration and
watch where they run, iterating through the results as they arrive.

In [21]:
def sleep_here(t):
    """sleep here for a time, return where it happened"""
    import time
    time.sleep(t)
    # `id` here is the per-engine value scattered earlier, not the builtin id()
    return id

amr = lview.map(sleep_here, [.01*t for t in range(100)])
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r, time.time()-tic))


task 0 on engine 2: 0.034
task 1 on engine 1: 0.035
task 2 on engine 0: 0.036
task 3 on engine 3: 0.036
task 4 on engine 2: 0.037
task 5 on engine 1: 0.038
task 6 on engine 0: 0.038
task 7 on engine 3: 0.039
task 8 on engine 2: 0.039
task 9 on engine 1: 0.039
task 10 on engine 0: 0.040
task 11 on engine 3: 0.040
task 12 on engine 2: 0.074
task 13 on engine 1: 0.110
task 14 on engine 0: 0.130
task 15 on engine 3: 0.161
task 16 on engine 2: 0.248
task 17 on engine 1: 0.292
task 18 on engine 0: 0.327
task 19 on engine 3: 0.351
task 20 on engine 2: 0.460
task 21 on engine 1: 0.522
task 22 on engine 0: 0.553
task 23 on engine 3: 0.593
task 24 on engine 2: 0.707
task 25 on engine 1: 0.787
task 26 on engine 0: 0.823
task 27 on engine 3: 0.870
task 28 on engine 2: 1.004
task 29 on engine 1: 1.079
task 30 on engine 0: 1.130
task 31 on engine 3: 1.193
task 32 on engine 2: 1.339
task 33 on engine 1: 1.427
task 34 on engine 0: 1.479
task 35 on engine 3: 1.547
task 36 on engine 2: 1.712
task 37 on 

In [22]:
amr.wall_time

13.051295

In [23]:
amr.serial_time

49.786598000000005

In [24]:
amr.serial_time / amr.wall_time

3.8146864353307475
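
The ratio above is the parallel speedup. Dividing it by the number of engines gives the parallel efficiency, a figure between 0 and 1. The sketch below reuses the timings printed above and assumes four engines, as suggested by the scattered ids `[0, 1, 2, 3]`; `speedup` and `efficiency` are hypothetical helpers written for this example.

```python
def speedup(serial_time, wall_time):
    """How many times faster the parallel run was than running serially."""
    return serial_time / wall_time


def efficiency(serial_time, wall_time, n_engines):
    """Speedup per engine; 1.0 would mean perfect scaling."""
    return speedup(serial_time, wall_time) / n_engines


# Timings copied from the outputs above; n_engines=4 is an assumption
# based on the four ids scattered earlier.
s = speedup(49.786598000000005, 13.051295)
e = efficiency(49.786598000000005, 13.051295, n_engines=4)
print("speedup %.2f, efficiency %.2f" % (s, e))  # speedup 3.81, efficiency 0.95
```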

Unlike `DirectView.map()`, which always results in one task per engine,
`LoadBalancedView.map()` defaults to one task per *item* in the sequence. This
can be changed by specifying the `chunksize` keyword argument.

In [25]:
amr = lview.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f"%(i, r, time.time()-tic))

task 0 on engine 2: 0.043
task 1 on engine 2: 0.043
task 2 on engine 2: 0.043
task 3 on engine 2: 0.043
task 4 on engine 1: 0.205
task 5 on engine 1: 0.206
task 6 on engine 1: 0.206
task 7 on engine 1: 0.206
task 8 on engine 0: 0.373
task 9 on engine 0: 0.373
task 10 on engine 0: 0.373
task 11 on engine 0: 0.373
task 12 on engine 3: 0.539
task 13 on engine 3: 0.539
task 14 on engine 3: 0.539
task 15 on engine 3: 0.539
task 16 on engine 2: 0.752
task 17 on engine 2: 0.753
task 18 on engine 2: 0.753
task 19 on engine 2: 0.753
task 20 on engine 1: 1.096
task 21 on engine 1: 1.096
task 22 on engine 1: 1.096
task 23 on engine 1: 1.096
task 24 on engine 0: 1.413
task 25 on engine 0: 1.413
task 26 on engine 0: 1.413
task 27 on engine 0: 1.413
task 28 on engine 3: 1.745
task 29 on engine 3: 1.746
task 30 on engine 3: 1.746
task 31 on engine 3: 1.746
task 32 on engine 2: 2.110
task 33 on engine 2: 2.110
task 34 on engine 2: 2.110
task 35 on engine 2: 2.110
task 36 on engine 1: 2.602
task 37 on 
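
To get a feel for how `chunksize` affects total run time, the sketch below simulates a simplified scheduler in plain Python. It is only a model under stated assumptions: each chunk goes to whichever worker becomes free first, and messaging overhead is ignored. The real IPython scheduler may assign tasks differently, and `simulate_makespan` is a hypothetical helper, not a library function.

```python
import heapq


def simulate_makespan(durations, n_workers, chunksize=1):
    """Greedy model: each chunk goes to whichever worker frees up first.

    Returns the simulated time at which the last worker finishes.
    """
    chunks = [durations[i:i + chunksize]
              for i in range(0, len(durations), chunksize)]
    free_at = [0.0] * n_workers  # min-heap of times when each worker is idle
    heapq.heapify(free_at)
    for chunk in chunks:
        start = heapq.heappop(free_at)               # earliest available worker
        heapq.heappush(free_at, start + sum(chunk))  # busy until the chunk ends
    return max(free_at)


durations = [0.01 * t for t in range(100)]  # same task lengths as above
per_item = simulate_makespan(durations, n_workers=4)
chunked = simulate_makespan(durations, n_workers=4, chunksize=4)
print("per-item: %.2f s, chunksize=4: %.2f s" % (per_item, chunked))
```

Neither result can beat the ideal of total work divided by the number of workers; comparing the two figures against that bound shows how much balance is lost as chunks get coarser.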

# Exercise

## Parallelize nested loops

Often we want to run a function with a variety of combinations of arguments.
A useful skill is the ability to express a nested loop in terms of a map.

In [29]:
def area(w,h):
    return w*h


widths = range(1,4)
heights = range(6,10)

areas = []
for w in widths:
    for h in heights:
        areas.append(area(w,h))
areas

[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]

In [26]:
%run ../hints
nesthint()

In [30]:
%load ../soln/nestedloop.py

 widths (1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3)
heights (6, 7, 8, 9, 6, 7, 8, 9, 6, 7, 8, 9)


Validate the result:

In [31]:
p_areas = ar.get()
p_areas

[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]

In [32]:
areas == p_areas

True
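
One way to flatten a nested loop into a map, consistent with the `widths` and `heights` tuples printed above, is to build the argument pairs with `itertools.product` and transpose them with `zip`. This is a minimal serial sketch that uses the builtin `map` as a stand-in; it is not necessarily what `../soln/nestedloop.py` contains, and the names `all_widths`, `all_heights`, and `flat_areas` are introduced here for illustration.

```python
from itertools import product


def area(w, h):
    return w * h


widths = range(1, 4)
heights = range(6, 10)

# product() yields (w, h) pairs in the same order as the nested loop;
# zip(*...) transposes those pairs into one sequence per argument.
all_widths, all_heights = zip(*product(widths, heights))

# Serial stand-in: on a cluster, the same two sequences could be handed
# to a view's map method instead of the builtin map.
flat_areas = list(map(area, all_widths, all_heights))
print(flat_areas)  # [6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]
```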

## Examples and Exercises

- [Counting Words!](../examples/Counting%20Words.ipynb)
- [Monte Carlo Options Pricing](../examples/MC%20Options.ipynb)

Now that we've seen multiplexing and load-balancing, let's see how they are [used together](All%20Together.ipynb).