In [1]:
from ipyparallel import Client

In [2]:
# Create the ipyparallel client object that every later cell uses to reach the engines.
# NOTE(review): Client() with no arguments presumably connects to an already-running
# cluster of the default profile — confirm the cluster is started before running this cell.
rc=Client()

![](res/architecture.png)

- **direct interface**: gives direct access to every engine
- **load-balanced interface**: submit jobs to a scheduler which dynamically assigns them to the engines.

### direct interface

In [3]:
# Show the ids of the engines the client can currently reach.
rc.ids

[0, 1, 2, 3, 4, 5, 6, 7]

In [4]:
%%px

# Run this cell on all engines: import os/time in each engine's namespace
# and print the process id of the engine executing the code.
import os, time
print(os.getpid())

[stdout:0] 5792
[stdout:1] 2872
[stdout:2] 7872
[stdout:3] 6896
[stdout:4] 5260
[stdout:5] 2880
[stdout:6] 7356
[stdout:7] 7964


In [5]:
%%px --targets 2:6
# Run this cell only on the selected engines. The slice end is exclusive:
# the recorded output shows engines 2, 3, 4 and 5.
# Relies on `os` having been imported on the engines by the previous %%px cell.
print(os.getpid())

[stdout:2] 7872
[stdout:3] 6896
[stdout:4] 5260
[stdout:5] 2880


In [6]:
# Create a DirectView over every engine except the last one
# (engines 0-6 of the 8 engines listed by rc.ids), then display it.
view=rc[:-1]
view

<DirectView [0, 1, 2, 3,...]>

In [7]:
# Create a load-balanced view: jobs submitted through it go to a scheduler
# that dynamically assigns them to the engines.
balance_view=rc.load_balanced_view()

### without parallel

In [8]:
def sample(n):
    """Count random points that land inside the quarter unit circle.

    Draws ``n`` points with both coordinates taken from ``np.random.rand``
    (uniform on [0, 1)) and counts those satisfying ``x**2 + y**2 <= 1``.

    Parameters
    ----------
    n : int
        Number of random points to draw.

    Returns
    -------
    numpy integer
        Number of the ``n`` points lying inside the quarter circle.
    """
    # numpy is imported inside the function body — presumably so the function
    # is self-contained when it is sent to a remote engine for execution.
    import numpy as np

    coords = np.random.rand(2, n)
    xs, ys = coords[0], coords[1]
    # A point is inside the quarter circle when its squared radius is <= 1.
    inside = xs**2 + ys**2 <= 1
    return inside.sum()

def get_pi(n_in, n):
    """Turn a quarter-circle hit count into an estimate of pi.

    The fraction ``n_in / n`` approximates the area of the quarter unit
    circle (pi / 4), so multiplying by 4 gives the estimate.

    Parameters
    ----------
    n_in : int
        Number of sampled points that fell inside the quarter circle.
    n : int
        Total number of sampled points.

    Returns
    -------
    float
        The value ``4 * n_in / n``.
    """
    scaled_hits = 4 * n_in
    return scaled_hits / n

In [9]:
# Total number of random points used for the estimate.
N=100000000

# Serial baseline: time the whole computation in this process,
# 2 runs (-r 2) with 1 loop per run (-n 1).
%timeit -n 1 -r 2 get_pi(sample(N), N)

2.95 s ± 42.5 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)


### with parallel

In [10]:
# Split the N samples into one sub-task per engine.
# Plain floor division (N // subtasks_num) would silently drop N % subtasks_num
# samples while the final estimate still divides by N, so the remainder is
# spread over the first sub-tasks and sum(args) == N always holds.
# When N is divisible by the engine count this is the same list as before.
subtasks_num=len(rc)
base, extra = divmod(N, subtasks_num)
args=[base + (1 if i < extra else 0) for i in range(subtasks_num)]

`map()` and `map_async()` are asynchronous: they return an `AsyncResult` object for checking task status and retrieving the results

In [11]:
# args must be a list; sample() is called len(args) times, once per element.
# The call is submitted through the load-balanced view and the returned object
# is used below to wait for and collect the results.
ar=balance_view.map(sample, args)

In [12]:
# Number of sub-tasks tracked by the result object (one per element of args).
len(ar)

8

In [13]:
# Block the interactive session until every sub-task has completed
# (wait() returned True in the recorded run).
ar.wait()

True

In [14]:
# Timing of the parallel run: `elapsed` is the time since submission, while
# `serial_time` is presumably the summed computation time of all sub-tasks
# (i.e. what the work would cost if run one after another) — see the
# ipyparallel AsyncResult documentation to confirm.
ar.elapsed, ar.serial_time

(1.073355, 8.196223000000002)

In [15]:
import numpy as np

In [16]:
# Add up the per-task hit counts returned by sample() and convert the
# total into the pi estimate over all N points.
get_pi(np.sum(ar.result()), N)

3.1415662

`map_sync()` blocks until the tasks have completed and returns the results directly

In [17]:
# Synchronous variant: the call blocks until all sub-tasks finish.
# NOTE(review): despite the name, ar2 holds the results themselves rather
# than an AsyncResult object.
ar2=balance_view.map_sync(sample, args)