# Multiprocessing模块

multiprocessing属于Python标准模块，支持多线程和多进程编写。

## （一）为什么要使用多进程

> Python语言中的全局解释器锁GIL限制了1个进程中，所有线程序列化执行，即执行1个线程的同时，并不能执行另一个线程，但支持异步执行。
Multiprocessing可以充分调动CPU多核心优势，开启多进程处理。

## （二）multiprocessing常用函数

创建管理进程模块：

- Process（用于创建进程模块）
- Pool（用于创建管理进程池）
- Queue（用于进程通信，资源共享）
- Value，Array（用于进程通信，资源共享）
- Pipe（用于管道通信）
- Manager（用于资源共享）

同步子进程模块：

- Condition
- Event
- Lock
- RLock
- Semaphore



# Process
> 创建一个Process实例，创建子进程时，target指定一个执行函数，args指定函数的参数

> 用start()方法启动，join()方法可以等待子进程结束后再继续往下运行，通常用于进程间的同步。

In [3]:
from multiprocessing import Process
import os


def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))


if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p1 = Process(target=run_proc, args=('test', ))
    print('Child process will start.')
    p1.start()
    p1.join()
    print('Child process end.')

Parent process 10512.
Child process will start.
Child process end.


代码块中的内容需要保存为py文件后执行才能使`__main__`正确引用

In [2]:
from multiprocessing import Process
import os


def run_n(number):
    print("Now we run process{} with pid {}".format(number, os.getpid()))


if __name__ == '__main__':
    # 进程列表
    procs = []
    print("Run Init!", os.getpid())
    for number in range(10):
        proc = Process(target=run_n, args=(number, ))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()  # 等待进程结束
    print("Run End")

Run Init! 10384
Run End


如果未指定Process的target参数，则默认运行Process.run()

`if __name__ == '__main__':`是必要的


# 环境与启动方法

-  [`multiprocessing`](https://docs.python.org/release/3.7.2/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing) 支持三种方法启动，且依赖于平台。

  > - ***spawn***
  >
  >   父进程重启一个新的python进程, 子进程只继承运行进程对象所必要的资源[`run()`](https://docs.python.org/release/3.7.2/library/multiprocessing.html?highlight=multiprocessing#multiprocessing.Process.run) 方法.尤其是不必要的文件描述符和父进程的句柄不会被继承。比起使用*fork* or *forkserver*，使用这个方法会比较慢.兼容 Unix and Windows。 默认为Windows。
  >
  > - ***fork***
  >
  >   父进程通过 [`os.fork()`](https://docs.python.org/release/3.7.2/library/os.html#os.fork) 分叉一个Python解释器。 子进程完全和父进程相同。所有父进程资源都会继承到子进程。需要注意的是，想安全的分叉多线程的进程可能存在问题。只有Unix允许使用。
  >
  > - ***forkserver***
  >
  >   无论何时需要加入新进程，那么父进程会接入服务器请求分叉一个新进程。分叉服务器是单行线程，所以使用[`os.fork()`](https://docs.python.org/release/3.7.2/library/os.html#os.fork)会比较安全。非必要的资源不会继承，可以支持在Unix管道上传递文件描述符的Unix平台。



在 `if __name__ == '__main__'` 使用 [`set_start_method()`](https://docs.python.org/release/3.7.2/library/multiprocessing.html?highlight=multiprocessing#multiprocessing.set_start_method) 选择一个启动方法。在一个程序中，仅使用一次。


In [None]:
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

- 另外, 也可以选择 [`get_context()`](https://docs.python.org/release/3.7.2/library/multiprocessing.html?highlight=multiprocessing#multiprocessing.get_context) ，使用方法同上，但二者仅取一。

In [None]:
import multiprocessing as mp


def foo(q):
    q.put('hello')


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

# 进程中替换对象

multiprocessing支持两种类型间的进程通讯。

## Queues
Queue.qsize()    返回队列的大小

Queue.empty()    如果队列为空，返回True,反之False

Queue.full()     如果队列满了，返回True,反之False

Queue.full 与 maxsize 大小对应

Queue.get([block[, timeout]]) 获取队列，timeout是等待时间

Queue.get_nowait()            相当Queue.get(False)

Queue.put(item)               写入队列，timeout是等待时间

Queue.put_nowait(item)        相当Queue.put(item, False)

Queue.task_done()             在完成一项工作之后，Queue.task_done()函数向任务已经完成的队列发送一个信号

Queue.join()                  实际上意味着等到队列为空，再执行别的操作

In [None]:
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue，并传给各个子进程：
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw，写入:
    pw.start()
    # 启动子进程pr，读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环，无法等待其结束，只能强行终止:
    pr.terminate()

## Pipes

> Pipes()函数返回一对通过双向管道连接的对象。

> 通过Pipe()连接的两个对象表示管道的两端。每个连接对象都有send()和recv()方法。 

> 如果2个进程或线程同时对管道的尾端读取或写入可能导致管道的崩溃。 如果使用不同的管道尾端处理那就没有崩溃风险。

发送对象必须满足序列化。

In [None]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    #构造Pipe的两端。
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

# 进程间同步

## 进程锁

Lock():为了保证在同一时间，多个进程使用共享资源不会被冲突，可以使用进程锁解决。

acquire():锁定进程

release():释放进程


In [None]:
import multiprocessing as mp
import time
#加进程锁
def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 获取共享内存
        print(v.value)
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0) # 定义共享内存
    p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
    p2 = mp.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

In [None]:
#不加进程锁
import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.1) # 暂停0.1秒，让输出效果更明显
        v.value += num # v.value获取共享变量值
        print(v.value, end="")
        
def multicore():
    v = mp.Value('i', 0) # 定义共享变量
    p1 = mp.Process(target=job, args=(v,1))
    p2 = mp.Process(target=job, args=(v,3)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
if __name__ == '__main__':
    multicore()

如果不用lock进行不同进程间输出控制，会导致冲突。

# 进程间状态共享

如上所述，在同时执行程序时，需尽可能避免使用共享状态。

当然，如果在某些情况确实需要使用到，也可使用如下几个方法。

## 共享内存

数据可以存储在共享内存中，使用Value()或者Array()方法映射。

Array类，可以和共享内存交互，来实现在进程之间共享数据。它只能是一维的，不能是多维的。

In [None]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

>在创建 `num` 和 `arr` 时使用的`i`,`d`是[`array`](https://docs.python.org/3.7/library/array.html#module-array) 模块的代码类型: `'d'`双倍精度浮点 ，`'i'` 表示有符整数。
>
> 更多其他用法参考 [`multiprocessing.sharedctypes`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#module-multiprocessing.sharedctypes) 模块，支持任意ctypes对象分配共享内存。

| Type code | C Type             | Python Type       | Minimum size in bytes | Notes |
| --------- | ------------------ | ----------------- | --------------------- | ----- |
| `'b'`     | signed char        | int               | 1                     |       |
| `'B'`     | unsigned char      | int               | 1                     |       |
| `'u'`     | Py_UNICODE         | Unicode character | 2                     | (1)   |
| `'h'`     | signed short       | int               | 2                     |       |
| `'H'`     | unsigned short     | int               | 2                     |       |
| `'i'`     | signed int         | int               | 2                     |       |
| `'I'`     | unsigned int       | int               | 2                     |       |
| `'l'`     | signed long        | int               | 4                     |       |
| `'L'`     | unsigned long      | int               | 4                     |       |
| `'q'`     | signed long long   | int               | 8                     | (2)   |
| `'Q'`     | unsigned long long | int               | 8                     | (2)   |
| `'f'`     | float              | float             | 4                     |       |
| `'d'`     | double             | float             | 8                     |       |

Notes:

1. `'u'`类型和Python的弃用的unicode字符 ([`Py_UNICODE`](https://docs.python.org/3.5/c-api/unicode.html#c.Py_UNICODE) which is `wchar_t`). 通过平台区分16bits或32bits.

   `'u'`将在4.0版本从 [`Py_UNICODE`](https://docs.python.org/3.5/c-api/unicode.html#c.Py_UNICODE) API移除.

2. `'q'` 和`'Q'`只有在支持用C编译器编译Python的平台支持C `long long`, 如果是在Windows上，则为`__int64`.


# 服务端进程

> `Manager()`方法返回的一个管理对象，控制一个支持Python对象的服务端进程，使其他进程可以通过代理操作。
>
> 支持的类型： [`list`](https://docs.python.org/3.7/library/stdtypes.html#list), [`dict`](https://docs.python.org/3.7/library/stdtypes.html#dict), [`Namespace`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.managers.Namespace), [`Lock`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.Lock), [`RLock`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.RLock), [`Semaphore`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.Semaphore), [`BoundedSemaphore`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.BoundedSemaphore), [`Condition`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.Condition), [`Event`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.Event), [`Barrier`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.Barrier), [`Queue`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.Queue), [`Value`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.Value) , [`Array`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.Array). 
>


In [None]:
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

# 进程池

使用[`Pool`](https://docs.python.org/3.7/library/multiprocessing.html?highlight=process#multiprocessing.pool.Pool) 类表示一池的进程，提供方法允许多个任务从运行进程以不同方式做卸下处理。

In [None]:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

> 要注意，pool的方法只能被创建的进程使用。

>For循环中执行步骤：

>（1）循环遍历，将500个子进程添加到进程池（相对父进程会阻塞）

>（2）每次执行4个子进程，等一个子进程执行完后，立马启动新的子进程。（相对父进程不阻塞）

apply_async为异步进程池写法。即非阻塞异步指的是启动子进程的过程，与父进程本身的执行（print）是异步的，而For循环中往进程池添加子进程的过程，与父进程本身的执行却是同步的。

对Pool对象调用join()方法会等待所有子进程执行完毕，
调用join()之前必须先调用close()，
调用close()之后就不能继续添加新的Process了。

In [None]:
# 如果要启动大量的子进程，可以用进程池的方式批量创建子进程：
from multiprocessing import Pool
import os
import time
import random


def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))


if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    # 如何是基于CPU核数量
    p = Pool(4)
    for i in range(500):
        p.apply_async(long_time_task, args=(i, ))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

>实际测试发现，for循环内部执行步骤：

>（1）遍历500个可迭代对象，往进程池放一个子进程

>（2）执行这个子进程，等子进程执行完毕，再往进程池放一个子进程，再执行。（同时只执行一个子进程）

>for循环执行完毕，再执行print函数。

In [None]:
from multiprocessing import Pool
def test(p):
    print (p)
    time.sleep(3)
if __name__=="__main__":
    pool = Pool(processes=10)
    for i  in range(500):
        pool.apply(test, args=(i,))   #维持执行的进程总数为10，当一个进程执行完后启动一个新进程.
    print ("test")
    pool.close()
    pool.join()

# 分布式进程

参考廖雪峰python3 分布式进程

在Thread和Process中，应当优选Process，因为Process更稳定，而且，Process可以分布到多台机器上，而Thread最多只能分布到同一台机器的多个CPU上。

Python的`multiprocessing`模块不但支持多进程，其中`managers`子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者，将任务分布到其他多个进程中，依靠网络通信。由于`managers`模块封装很好，不必了解网络通信的细节，就可以很容易地编写分布式多进程程序。

举个例子：如果我们已经有一个通过`Queue`通信的多进程程序在同一台机器上运行，现在，由于处理任务的进程任务繁重，希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现？

原有的`Queue`可以继续使用，但是，通过`managers`模块把`Queue`通过网络暴露出去，就可以让其他机器的进程访问`Queue`了。

我们先看服务进程，服务进程负责启动`Queue`，把`Queue`注册到网络上，然后往`Queue`里面写入任务：

In [1]:
#task_master.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')

PicklingError: Can't pickle <function <lambda> at 0x000002026FF31B70>: attribute lookup <lambda> on __main__ failed

In [None]:
#task_worker.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time, sys
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue，所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器，也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 客户端从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')

而`Queue`之所以能通过网络访问，就是通过`QueueManager`实现的。由于`QueueManager`管理的不止一个`Queue`，所以，要给每个`Queue`的网络调用接口起个名字，比如`get_task_queue`。

`authkey`有什么用？这是为了保证两台机器正常通信，不被其他机器恶意干扰。如果`task_worker.py`的`authkey`和`task_master.py`的`authkey`不一致，肯定连接不上。