# ipyparallel: Python's parallel computing module

ipyparallel is a subproject of the IPython project, aimed at parallel and distributed computing.

https://github.com/ipython/ipyparallel

It is a standalone project, separate from IPython itself, and has to be installed separately:

    $ pip install ipyparallel


## 单机并行计算


The simplest way to get started is to open a terminal and run

    $ ipcluster start

After that, parallel computing is available from both python and ipython.


## Creating a dedicated profile for the parallel environment

    ipython profile create --parallel --profile=myprofile

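The command above creates a general-purpose parallel profile. Afterwards we can configure it by editing the configuration files under `~/.ipython/` (the profile lives in `~/.ipython/profile_myprofile/`).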
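Once the profile exists, a cluster started with it can be reached from Python by passing the profile name to `Client`. The following is a minimal sketch, assuming the cluster was started with `ipcluster start --profile=myprofile`. `connect` is a hypothetical helper name, and the import sits inside the function so that defining it does not require ipyparallel or a running cluster.

```python
def connect(profile="myprofile"):
    """Return a Client connected to the cluster running under `profile`.

    Assumes `ipcluster start --profile=<profile>` is already running.
    """
    # Imported lazily: only needed when a connection is actually made.
    from ipyparallel import Client
    return Client(profile=profile)
```

With a cluster running, `rc = connect()` followed by `rc.ids` should list the ids of the engines that belong to that profile.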

### An example: word count

The data comes from

    $ wget http://www.gutenberg.org/files/27287/27287-0.txt

+ Serial version

In [1]:
import re
import io
non_word = re.compile(r'[\W\d]+', re.UNICODE)
common_words = {
    'the','of','and','in','to','a','is','it','that','which','as','on','by',
    'be','this','with','are','from','will','at','you','not','for','no','have',
    'i','or','if','his','its','they','but','their','one','all','he','when',
    'than','so','these','them','may','see','other','was','has','an','there',
    'more','we','footnote', 'who', 'had', 'been',  'she', 'do', 'what',
    'her', 'him', 'my', 'me', 'would', 'could', 'said', 'am', 'were', 'very',
    'your', 'did', 'not',
}

In [2]:
filename = 'source/README.md'

In [3]:
def yield_words(filename):
    import io
    with io.open(filename, encoding='utf-8') as f:
        for line in f:
            for word in line.split():
                word = non_word.sub('', word.lower())
                if word and word not in common_words:
                    yield word

In [4]:
def word_count(filename):
    # defaultdict is imported in the next cell (and on the engines via sync_imports)
    counts = defaultdict(int)
    for word in yield_words(filename):
        counts[word] += 1
    return counts

In [5]:
from collections import defaultdict

In [6]:
%time counts = word_count(filename)

CPU times: user 3.32 ms, sys: 1.4 ms, total: 4.72 ms
Wall time: 10.9 ms


+ Parallel version

In [7]:
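def split_text(filename, n=10):
    # Split the file into n chunks of lines and write each chunk to its own file.
    with io.open(filename, encoding='utf-8') as f:
        lines = f.read().splitlines()
    block = len(lines) // n
    cwd = os.path.abspath(os.getcwd())
    fnames = []
    for i in range(n):
        # The last chunk also takes the remainder, so no lines are dropped.
        end = (i + 1) * block if i < n - 1 else len(lines)
        # Build the names explicitly instead of using glob, so the list is exact.
        fname = os.path.join(cwd, 'count_file%i.txt' % i)
        with io.open(fname, 'w', encoding='utf-8') as f:
            f.write(u'\n'.join(lines[i * block:end]))
        fnames.append(fname)
    return fnames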

In [8]:
from ipyparallel import Client 

In [9]:
rc = Client()
view = rc.load_balanced_view()
v = rc[:]
v.push(dict(
     non_word=non_word,
     yield_words=yield_words,
     common_words=common_words
 ))

<AsyncResult: _push>

In [10]:
with rc[:].sync_imports():
    import  os
    from collections import defaultdict

importing os on engine(s)
importing defaultdict from collections on engine(s)


In [11]:
fnames = split_text(filename)

In [12]:
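def count_parallel():
    from collections import defaultdict
    # Count each chunk on an engine, then merge the per-chunk counts locally.
    pcounts = view.map(word_count, fnames)
    counts = defaultdict(int)
    for pcount in pcounts.get():
        for word, n in pcount.items():  # items() works on both Python 2 and 3
            counts[word] += n
    return counts, pcounts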

In [13]:
%time counts, pcounts = count_parallel()

CPU times: user 50.6 ms, sys: 8.82 ms, total: 59.4 ms
Wall time: 99.6 ms


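In the run shown above, the parallel version is slower on both measures: total CPU time rises from 4.72 ms to 59.4 ms and wall time from 10.9 ms to 99.6 ms. Note that `%time` only measures the client process, so CPU time spent inside the engines is not included. Repeating the measurement with `%timeit` likewise showed the parallel version taking about 20 ms longer.
This is because the tasks have to be sent to the engines and the partial results have to be collected and aggregated afterwards, all of which takes time. In other words, parallelism comes with overhead.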
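The aggregation step can be tried without a cluster. The following is a minimal sketch, assuming each engine returns a plain dict of word counts. `merge_counts` is a hypothetical helper name, and `collections.Counter` is swapped in here for the `defaultdict` loop used in `count_parallel`.

```python
from collections import Counter

def merge_counts(partial_counts):
    """Merge an iterable of per-chunk word-count dicts into one Counter."""
    total = Counter()
    for partial in partial_counts:
        # Counter.update adds counts together instead of replacing them.
        total.update(partial)
    return total

# Two made-up partial results, as they might come back from two engines.
partials = [{'whale': 2, 'sea': 1}, {'sea': 3, 'ship': 1}]
merged = merge_counts(partials)
print(merged.most_common(1))  # [('sea', 4)]
```

This merge runs serially on the client, so its cost grows with the number of chunks and the size of each partial result.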

## The simplest use: submitting a function to the engines

Parallelism means several cores executing tasks at the same time; the simplest case is running the same task on every engine.

In [14]:
c = Client()
a = lambda :"hi~"

In [15]:
%time c[:].apply_sync(a)

CPU times: user 22.6 ms, sys: 5.05 ms, total: 27.7 ms
Wall time: 35.4 ms


['hi~', 'hi~', 'hi~', 'hi~']

In [16]:
%time [a() for i in range(2)]

CPU times: user 10 µs, sys: 6 µs, total: 16 µs
Wall time: 17.9 µs


['hi~', 'hi~']

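As the timings show, plain CPython does very well here: for such a tiny computation, the parallel call (about 35 ms) is far slower than a list comprehension (about 18 µs).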
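A rough way to reason about this trade-off is a fixed-overhead model: assume a parallel call costs a constant overhead plus the serial work divided evenly among the engines. This is a deliberate simplification, not a description of how ipyparallel actually spends its time. `parallel_time` and `break_even_work` are hypothetical helpers, and the 35.4 ms overhead is just the wall time of the trivial `apply_sync` call above.

```python
def parallel_time(work, overhead, n_engines):
    """Estimated wall time under the model: overhead + work / n_engines."""
    return overhead + work / float(n_engines)

def break_even_work(overhead, n_engines):
    """Serial work at which the parallel and serial estimates are equal.

    Solves work = overhead + work / n for work.
    """
    if n_engines < 2:
        raise ValueError("need at least two engines to break even")
    return overhead * n_engines / float(n_engines - 1)

overhead_ms = 35.4  # wall time of the trivial parallel call, taken as pure overhead
w = break_even_work(overhead_ms, 4)
print(round(w, 1))  # 47.2
```

Under these assumptions, a job on four engines would need roughly 47 ms of serial work before parallel execution starts to pay off.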

## Calling ipyparallel directly

Through the Client object we can obtain a `DirectView` and operate on multiple engines directly from ipython.

In [17]:
from ipyparallel import Client
rc = Client()

In [18]:
rc.ids  # list the ids of the available engines

[0, 1, 2, 3]

In [19]:
dview = rc[:]  # use all engines

In [20]:
%time map(lambda x:x**2,range(32))

CPU times: user 21 µs, sys: 5 µs, total: 26 µs
Wall time: 26.9 µs


[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400,
 441,
 484,
 529,
 576,
 625,
 676,
 729,
 784,
 841,
 900,
 961]

In [21]:
%time dview.map_sync(lambda x:x**2,range(32))  # parallel version of map

CPU times: user 31.3 ms, sys: 5.12 ms, total: 36.4 ms
Wall time: 41.4 ms


[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400,
 441,
 484,
 529,
 576,
 625,
 676,
 729,
 784,
 841,
 900,
 961]

Once again, the single process wins on a workload this small.

### Load-balanced view

Load balancing is one of the main difficulties in parallel computing. A `DirectView` does nothing to balance the load; for that, use a `LoadBalancedView`.

In [22]:
lview = rc.load_balanced_view()

In [23]:
%time lview.map_sync(lambda x:x**2,range(32))

CPU times: user 230 ms, sys: 47.3 ms, total: 277 ms
Wall time: 305 ms


[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400,
 441,
 484,
 529,
 576,
 625,
 676,
 729,
 784,
 841,
 900,
 961]
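To illustrate why load balancing can matter when task durations are uneven, here is a toy simulation in pure Python. It is only a model: it assumes a direct view hands each engine one contiguous chunk up front, and that a load-balanced view gives each task to whichever engine becomes free first. It is not ipyparallel's actual scheduler, and `static_makespan` and `dynamic_makespan` are hypothetical names.

```python
import heapq

def static_makespan(durations, n_engines):
    """Finish time when tasks are split into contiguous chunks, one per engine."""
    base, extra = divmod(len(durations), n_engines)
    finish_times = []
    start = 0
    for i in range(n_engines):
        # The first `extra` engines get one additional task.
        size = base + (1 if i < extra else 0)
        finish_times.append(sum(durations[start:start + size]))
        start += size
    return max(finish_times)

def dynamic_makespan(durations, n_engines):
    """Finish time when each task goes to the engine that becomes free first."""
    free_at = [0] * n_engines  # min-heap of the times at which engines become idle
    heapq.heapify(free_at)
    for d in durations:
        t = heapq.heappop(free_at)
        heapq.heappush(free_at, t + d)
    return max(free_at)

durations = [8, 8, 1, 1, 1, 1, 1, 1]  # two slow tasks followed by six fast ones
print(static_makespan(durations, 4))   # 16
print(dynamic_makespan(durations, 4))  # 8
```

In this model the static split puts both slow tasks on the same engine, while dynamic assignment spreads them out. With uniform, tiny tasks such as `x**2` there is nothing to balance, so only the scheduling overhead remains visible.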