# Parallel Programming: Multiprocessing

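## Why use parallel programming?
**To improve performance and squeeze the most out of the hardware**

Types of parallelism
- Single-machine parallelism
    - Multiple processes
    - Multiple threads
    - Coroutines
    - GPU
- Multi-machine parallelism
    - Distributed computing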


Typical application areas
- Large-scale internet services
- Data analysis
- Large-scale algorithmic computation

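## A brief history of chip architecture
![](../asset/cpu-history.jpg)
CPUs have evolved in two directions:
- Making each core faster by raising the clock speed;
    - A higher clock speed produces more heat and lowers performance per watt
    - It may even require special cooling equipment, which raises cost
- Increasing the number of cores on the chip ✨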

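## Processes, threads, and how they relate
![](../asset/factory.jpg)

- CPU
    + Like a factory that runs a single production line: at any given moment it can carry out only one production step
- Process
    + Like one production step (one workshop) in the factory. The steps are independent of one another, and because the factory runs a single line, only one of them is active at any given moment
- Thread
    + Like the workers in the factory. One production step may involve many workers, just as one process can contain multiple threads
- Shared memory
    + The space of a workshop (process) is shared by all of its workers: every worker can access all the data and resources inside it
- Resource contention
    + Some rooms are small, a restroom for example. While someone is inside, nobody else can enter. Likewise, while one thread is using a piece of shared memory, other threads must wait until it finishes before they can use that memory.
- Lock
    + A simple way to keep others out is to put a lock on the door. Whoever arrives first locks the door; later arrivals see the lock, queue at the door, and enter once it is unlocked. This is a "mutual exclusion lock" (mutex), which prevents multiple threads from reading and writing the same memory region at the same time.
- Semaphore
    + Other rooms can hold n people at once, a kitchen for example. If more than n people show up, the extras wait outside. This is like a memory region that only a fixed number of threads may use. The solution is to hang n keys at the door: each person entering takes a key and hangs it back on the way out. A latecomer who finds the key rack empty knows to queue at the door. This is a "semaphore", which keeps multiple threads from conflicting with one another. A mutex is the special case of a semaphore with n = 1, so a semaphore could always replace a mutex. But because a mutex is simpler and more efficient, it remains the design of choice when a resource must be held exclusively.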
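
The key-rack picture can be sketched in a few lines. This is only an illustration using `threading` (the analogy above is about threads); the helper name `peak_occupancy` and the room sizes are made up for the example. It records the largest number of workers that were ever inside the "room" at the same time.

```python
import threading
import time


def peak_occupancy(n_keys, n_workers=6):
    """Return the largest number of workers inside the room at the same time."""
    room = threading.Semaphore(n_keys)   # n keys hanging at the door
    guard = threading.Lock()             # mutex protecting the counters below
    state = {"inside": 0, "peak": 0}

    def worker():
        with room:                       # take a key, or wait until one is free
            with guard:
                state["inside"] += 1
                state["peak"] = max(state["peak"], state["inside"])
            time.sleep(0.01)             # stay in the room for a moment
            with guard:
                state["inside"] -= 1
        # leaving the `with room` block hangs the key back

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


if __name__ == "__main__":
    print(peak_occupancy(1))   # one key: behaves like a mutex
    print(peak_occupancy(3))   # at most 3 workers inside at once
```

With one key the peak is always 1, which is the "mutex is a semaphore with n = 1" observation in code form.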
    
       
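**The design of an operating system comes down to three points**

(1) Multiple processes: several tasks can run at the same time;

(2) Multiple threads: a single task can be split into parts that run separately;

(3) Coordination mechanisms: on the one hand they prevent conflicts between processes and between threads, on the other they let processes and threads share resources.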

![](../asset/process-thread.jpeg)

## A single-process task

In [16]:
import time
import math


def task():
    # CPU-bound computation
    for _ in range(6):
        for _ in range(int(1e7)):
            math.sin(40) + math.cos(40)
    return
        
    
start_time = time.time()    
task()
print("cost time: {:.4f}s".format(time.time() - start_time))

cost time: 16.5727s


## Converting to multiple processes

In [21]:
# %load src/multiprocess-reconstruct.py
import multiprocessing
import time
import math


def task():
    """A long-running, CPU-bound computation."""
    for _ in range(int(1e7)):
        math.sin(40) + math.cos(40)
    return


if __name__ == '__main__':
    start_time = time.time()
    processes = []
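    # Note: only 2 units of work are started here, versus 6 in the
    # single-process version, so the two timings are not directly comparable.
    for _ in range(2):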
        p = multiprocessing.Process(target=task)
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
        
    print("cost time: {:.4f}s".format(time.time() - start_time))

cost time: 3.4562s


In [20]:
!python src/multiprocess-reconstruct.py

cost time: 3.0708s


## Converting to a process pool

![](../asset/process-pool.png)

In [None]:
# %load src/multiprocess-reconstruct-pool.py
from multiprocessing import Pool
import time
import math


def task(a):
    # CPU-bound work
    for _ in range(int(1e7)):
        math.sin(40) + math.cos(40)
    return


if __name__ == '__main__':
    start_time = time.time()

    with Pool(processes=8) as p:
        res = p.map(task, list(range(6)))
    print("cost time: {:.4f}s".format(time.time() - start_time))


In [None]:
!python src/multiprocess-reconstruct-pool.py

In [24]:
l = [1, 2, 3]

def printI(i):
    print(i)
    
list(map(printI, l))

1
2
3


[None, None, None]

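**Benefits of a process pool**
- Saves the time spent initializing processes, because workers are reused
- Keeps the number of processes under control, which helps maximize overall performance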
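
Both points can be observed directly. The sketch below is an illustration, not part of the original scripts: the helper names `worker_pid` and `distinct_workers` are invented here, and it assumes the `fork` start method is available (Linux or macOS), as the later scripts in this document do. It hands 20 tasks to a pool of 2 workers and counts how many different processes actually did the work.

```python
import multiprocessing
import os


def worker_pid(_):
    """Return the PID of the pool worker that handled this task."""
    return os.getpid()


def distinct_workers(n_workers, n_tasks):
    # Assumption: the 'fork' start method exists on this platform.
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes=n_workers) as pool:
        pids = pool.map(worker_pid, range(n_tasks))
    # Many tasks, but they were all served by at most n_workers processes.
    return len(set(pids))


if __name__ == "__main__":
    print(distinct_workers(n_workers=2, n_tasks=20))
```

The count never exceeds the pool size: the same few processes are reused for every task instead of one process being created per task.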

**The most common way to create a process pool**

In [25]:
import multiprocessing
cpu = multiprocessing.cpu_count()

print(cpu)
pool = multiprocessing.Pool(cpu)

64


<multiprocessing.pool.Pool at 0x7f2823f6ead0>

## More ways to use processes
https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocess#module-multiprocessing
### The basic multi-process pattern

In [None]:
# %load src/multiprocessing-case1.py
import multiprocessing


def foo(i):
    print('called function in process: %s' % i)
    return


if __name__ == '__main__':
    process_jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=foo, args=(i,))
        process_jobs.append(p)
        p.start()
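        # join() inside the loop waits for this process to finish before the
        # next one is started, so the five processes run one after another.
        p.join()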

In [73]:
!python src/multiprocessing-case1.py

called function in process: 0
called function in process: 1
called function in process: 2
called function in process: 3
called function in process: 4


### Process pool

In [2]:
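%time
# Time cost of creating a process.
# Note: `%time` on a line of its own times an empty statement; `%%time` times
# the whole cell. Also, Process() only builds the Python object: the OS
# process is not created until start() is called.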
import multiprocessing


p = multiprocessing.Process()

CPU times: user 5 µs, sys: 0 ns, total: 5 µs
Wall time: 12.2 µs


In [None]:
# %load src/multiprocessing-pool.py
import multiprocessing


def function_square(data):
    result = data * data
    return result


if __name__ == '__main__':
    inputs = list(range(100))
    pool = multiprocessing.Pool(processes=4)
    pool_outputs = pool.map(function_square, inputs)
    pool.close()
    pool.join()
    print('Pool    :', pool_outputs)

In [6]:
!python src/multiprocessing-pool.py

Pool    : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]


### Daemon processes

In [None]:
# %load multiprocessing-case2.py
import multiprocessing
import time


def foo():
    name = multiprocessing.current_process().name
    print("Starting %s " % name)
    time.sleep(3)
    print("Exiting %s " % name)


if __name__ == '__main__':
    background_process = multiprocessing.Process(name='background_process', target=foo)
    background_process.daemon = True
    NO_background_process = multiprocessing.Process(name='NO_background_process', target=foo)
    NO_background_process.daemon = False
    background_process.start()
    NO_background_process.start()

In [10]:
!python src/multiprocessing-case2.py

Starting NO_background_process 
Exiting NO_background_process 


### Terminating a process

In [26]:
# %load multiprocessing-case3.py
import multiprocessing
import time


def foo():
    print('Starting function')
    time.sleep(0.1)
    print('Finished function')


if __name__ == '__main__':
    p = multiprocessing.Process(target=foo)
    print('Process before execution:', p, p.is_alive())
    p.start()
    print('Process running:', p, p.is_alive())
    p.terminate()
    print('Process terminated:', p, p.is_alive())
    p.join()
    print('Process joined:', p, p.is_alive())
    print('Process exit code:', p.exitcode)


Process before execution: <Process(Process-154, initial)> False
Process running: <Process(Process-154, started)> True
Process terminated: <Process(Process-154, started)> True
Process joined: <Process(Process-154, stopped[SIGTERM])> False
Process exit code: -15


In [12]:
!python src/multiprocessing-case3.py

Process before execution: <Process name='Process-1' parent=44966 initial> False
Process running: <Process name='Process-1' pid=44968 parent=44966 started> True
Process terminated: <Process name='Process-1' pid=44968 parent=44966 started> True
Process joined: <Process name='Process-1' pid=44968 parent=44966 stopped exitcode=-SIGTERM> False
Process exit code: -15
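
The exit code `-15` in the output is the negated signal number: `terminate()` sends `SIGTERM` (signal 15 on POSIX), and `multiprocessing` reports a child killed by signal N as exit code `-N`. A minimal sketch of that check follows; the function name `terminate_and_report` is invented for the example, and it assumes a POSIX system with the `fork` start method.

```python
import multiprocessing
import signal
import time


def terminate_and_report():
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX with 'fork'
    p = ctx.Process(target=time.sleep, args=(30,))
    p.start()
    p.terminate()   # sends SIGTERM and returns without waiting for the child
    p.join()        # wait for the child to exit so its state is up to date
    return p.is_alive(), p.exitcode


if __name__ == "__main__":
    alive, code = terminate_and_report()
    print(alive, code == -signal.SIGTERM)
```

As in the output above, `is_alive()` may still report `True` right after `terminate()`; the process is only reliably seen as stopped once `join()` has returned.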


### Custom Process subclasses

In [27]:
# %load multiprocessing-case4.py
import multiprocessing


class MyProcess(multiprocessing.Process):
    def run(self):
        print('called run method in process: %s' % self.name)
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = MyProcess()
        jobs.append(p)
        p.start()
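        # join() inside the loop waits for this process to finish before the
        # next one is started, so the five processes run one after another.
        p.join()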


called run method in process: MyProcess-155
called run method in process: MyProcess-156
called run method in process: MyProcess-157
called run method in process: MyProcess-158
called run method in process: MyProcess-159


In [15]:
!python src/multiprocessing-case4.py

called run method in process: MyProcess-1
called run method in process: MyProcess-2
called run method in process: MyProcess-3
called run method in process: MyProcess-4
called run method in process: MyProcess-5


## Exchanging data between processes
### Exchanging data with Queue
![](../asset/Data_Queue.svg)
![](../asset/communication-channel.png)

In [None]:
# %load src/multiprocessing-case5.py
import multiprocessing
import random
import time


class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Process Producer : item %d appended to queue %s" % (item, self.name))
            time.sleep(1)
            print("The size of queue is %s" % self.queue.qsize())


class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            if self.queue.empty():
                print("the queue is empty")
                break
            else:
                time.sleep(2)
                item = self.queue.get()
                print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
                time.sleep(1)


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    process_producer = Producer(queue)
    process_consumer = Consumer(queue)
    process_producer.start()
    process_consumer.start()
    process_producer.join()
    process_consumer.join()


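A bug on the macOS platform (likely `Queue.qsize()`: the Python documentation notes that it may raise `NotImplementedError` on macOS, where `sem_getvalue()` is not implemented)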

In [28]:
!python src/multiprocessing-case5.py

Process Producer : item 39 appended to queue Producer-1
The size of queue is 1
Process Producer : item 89 appended to queue Producer-1
The size of queue is 2
Process Producer : item 3 appended to queue Producer-1
Process Consumer : item 39 popped from by Consumer-2 

The size of queue is 2
Process Producer : item 143 appended to queue Producer-1
The size of queue is 3
Process Producer : item 239 appended to queue Producer-1
The size of queue is 4
Process Producer : item 104 appended to queue Producer-1
Process Consumer : item 89 popped from by Consumer-2 

The size of queue is 4
Process Producer : item 58 appended to queue Producer-1
The size of queue is 5
Process Producer : item 129 appended to queue Producer-1
Process Consumer : item 3 popped from by Consumer-2 

The size of queue is 5
Process Producer : item 34 appended to queue Producer-1
The size of queue is 6
Process Producer : item 114 appended to queue Producer-1
The size of queue is 7
Process Consumer : item 143 popped from by
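
The consumer above stops as soon as `queue.empty()` returns true, which can happen before the producer has put anything on the queue. A common alternative is to let the producer send an explicit end-of-stream marker. The sketch below is one way to do that, not the original script: `SENTINEL`, `producer`, `consumer`, and `run_pipeline` are names chosen for this example, and it assumes the `fork` start method is available.

```python
import multiprocessing

SENTINEL = None  # hypothetical end-of-stream marker chosen for this sketch


def producer(queue, items):
    for item in items:
        queue.put(item)
    queue.put(SENTINEL)          # tell the consumer nothing more is coming


def consumer(queue, results):
    total = 0
    while True:
        item = queue.get()       # blocks until an item is available
        if item is SENTINEL:
            break
        total += item
    results.put(total)


def run_pipeline(items):
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX with 'fork'
    queue, results = ctx.Queue(), ctx.Queue()
    procs = [
        ctx.Process(target=producer, args=(queue, items)),
        ctx.Process(target=consumer, args=(queue, results)),
    ]
    for p in procs:
        p.start()
    total = results.get()        # read the result before joining
    for p in procs:
        p.join()
    return total


if __name__ == "__main__":
    print(run_pipeline([1, 2, 3]))
```

Because the consumer blocks in `get()` instead of polling `empty()`, it does not matter which of the two processes starts first, and `qsize()` is not needed at all.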

### Exchanging data with pipes
![](../asset/pipeline.jpg)

In [29]:
# %load src/multiprocessing-case6.py
import multiprocessing


def create_items(pipe):
    output_pipe, _ = pipe
    for item in range(10):
        output_pipe.send(item)
    output_pipe.close()


def multiply_items(pipe_1, pipe_2):
    close, input_pipe = pipe_1
    close.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item * item)
    except EOFError:
        output_pipe.close()


if __name__ == '__main__':
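    # The first process sends numbers into the first pipe
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()
    # The second process receives the numbers, squares them, and sends them into the second pipe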
    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    process_pipe_2.start()
    pipe_1[0].close()
    pipe_2[0].close()
    try:
        while True:
            print(pipe_2[1].recv())
    except EOFError:
        print("End")

(<multiprocessing.connection.Connection object at 0x7f2838486b10>, <multiprocessing.connection.Connection object at 0x7f2838486b50>)
0
1
4
9
16
25
36
49
64
81
End


In [30]:
!python src/multiprocessing-case6.py

0
1
4
9
16
25
36
49
64
81
End
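
`Pipe(True)` creates a duplex pipe, so a single pipe can carry traffic in both directions. The sketch below shows a request/reply exchange over one pipe. It is an illustration under assumptions: the names `square_worker` and `squares` and the use of `None` as a stop marker are choices made for this example, and it relies on the `fork` start method.

```python
import multiprocessing


def square_worker(conn):
    # Keep receiving until the parent sends None (a stop marker for this sketch)
    for item in iter(conn.recv, None):
        conn.send(item * item)   # reply on the same connection
    conn.close()


def squares(items):
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX with 'fork'
    parent_conn, child_conn = ctx.Pipe()       # duplex=True by default
    p = ctx.Process(target=square_worker, args=(child_conn,))
    p.start()
    child_conn.close()           # the parent only uses its own end
    results = []
    for item in items:
        parent_conn.send(item)
        results.append(parent_conn.recv())
    parent_conn.send(None)       # ask the worker to stop
    p.join()
    parent_conn.close()
    return results


if __name__ == "__main__":
    print(squares([1, 2, 3]))
```

Using an explicit stop marker avoids having to close every inherited copy of the other end just to trigger `EOFError`, which is what the `close.close()` call in the script above is for.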


### Shared memory

In [None]:
# %load multiprocessing-state-manager.py
import multiprocessing


def worker(dictionary, key, item):
    dictionary[key] = item
    print("key = %d value = %d" % (key, item))


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    dictionary = mgr.dict()
    jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i * 2)) for i in range(10)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', dictionary)

In [32]:
!python src/multiprocessing-state-manager.py

key = 0 value = 0
key = 1 value = 2
key = 2 value = 4
key = 3 value = 6
key = 5 value = 10
key = 4 value = 8
key = 6 value = 12
key = 7 value = 14
key = 8 value = 16
key = 9 value = 18
Results: {0: 0, 1: 2, 2: 4, 3: 6, 5: 10, 4: 8, 6: 12, 7: 14, 8: 16, 9: 18}


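The `Manager` approach above is not true shared memory: the data lives in a separate manager process, and the other processes reach it through proxies that pickle every access, which is far slower than real shared memory. The standard library gained a dedicated `multiprocessing.shared_memory` module in Python 3.8.
[Shared memory in Python 3.8+](https://www.osgeo.cn/cpython/library/multiprocessing.shared_memory.html)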
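
A minimal sketch of the Python 3.8+ API follows. For brevity it attaches to the block a second time from the same process; a different process would attach in exactly the same way, using the block's name. The function name `share_bytes` is invented for this example.

```python
from multiprocessing import shared_memory


def share_bytes(payload):
    """Write payload into a shared block and read it back through a second handle."""
    n = len(payload)
    owner = shared_memory.SharedMemory(create=True, size=n)
    try:
        owner.buf[:n] = payload                       # write into shared memory
        # Attach by name, as another process would do
        other = shared_memory.SharedMemory(name=owner.name)
        try:
            data = bytes(other.buf[:n])               # copy the bytes out
        finally:
            other.close()                             # detach this handle
    finally:
        owner.close()
        owner.unlink()                                # free the block itself
    return data


if __name__ == "__main__":
    print(share_bytes(b"hello"))
```

Every handle should be closed with `close()`, and exactly one party should call `unlink()` when the block is no longer needed; nothing is pickled on the way in or out.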

![](../asset/shared-memory.png)

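### How do they differ?

- Shared memory: the highest performance, but the hardest to synchronize.
- Pipe(): has exactly two endpoints, so it suits one producer and one consumer.
- Queue(): supports multiple producers and multiple consumers.


**If several parties need to communicate, use Queue. If raw performance matters most, Pipe is noticeably faster than Queue (about 1.6x for 10^6 messages in the measurements below); under the hood, Queue is itself built on top of Pipe.**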

In [None]:
# %load src/multiprocessing-pipe.py
from multiprocessing import Process, Pipe
import time


def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()  # We are only reading
    while True:
        msg = p_output.recv()  # Read from the output pipe and do nothing
        if msg == 'DONE':
            break


def writer(count, p_input):
    for ii in range(0, count):
        p_input.send(ii)  # Write 'count' numbers into the input pipe
    p_input.send('DONE')


if __name__ == '__main__':
    for count in [10 ** 4, 10 ** 5, 10 ** 6]:
        # Pipe() returns two connection ends (duplex by default); here data flows one way:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()  # Launch the reader process

        p_output.close()  # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input)  # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
                                                                      (time.time() - _start)))


In [20]:
!python src/multiprocessing-pipe.py

Sending 10000 numbers to Pipe() took 0.10325503349304199 seconds
Sending 100000 numbers to Pipe() took 0.5739719867706299 seconds
Sending 1000000 numbers to Pipe() took 4.997359037399292 seconds


In [None]:
# %load src/multiprocessing-queue.py
from multiprocessing import Process, Queue
import time
import sys


def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if (msg == 'DONE'):
            break


def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)  # Write 'count' numbers into the queue
    queue.put('DONE')


if __name__ == '__main__':
    pqueue = Queue()  # writer() writes to pqueue from _this_ process
    for count in [10 ** 4, 10 ** 5, 10 ** 6]:
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)  # Send a lot of stuff to reader()
        reader_p.join()  # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count,
                                                                       (time.time() - _start)))


In [1]:
!python src/multiprocessing-queue.py

Sending 10000 numbers to Queue() took 0.08758997917175293 seconds
Sending 100000 numbers to Queue() took 0.824897050857544 seconds
Sending 1000000 numbers to Queue() took 7.996196269989014 seconds


In [None]:
# %load src/multiprocessing-joinablequeue.py
from multiprocessing import Process, JoinableQueue
import time


def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        queue.task_done()


def writer(count, queue):
    for ii in range(0, count):
        queue.put(ii)  # Write 'count' numbers into the queue


if __name__ == '__main__':
    for count in [10 ** 4, 10 ** 5, 10 ** 6]:
        jqueue = JoinableQueue()  # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()  # Launch the reader process
        _start = time.time()
        writer(count, jqueue)  # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()  # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
                                                                               (time.time() - _start)))

In [24]:
!python src/multiprocessing-joinablequeue.py

Sending 10000 numbers to JoinableQueue() took 0.17133736610412598 seconds
Sending 100000 numbers to JoinableQueue() took 2.081264019012451 seconds
Sending 1000000 numbers to JoinableQueue() took 21.84877896308899 seconds


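## How to synchronize processes
- Lock: has two states, locked and unlocked. A Lock object has two methods, acquire() and release(), which control read and write access to shared data.
- Event: provides simple communication between processes: one process signals an event and other processes wait for it. An Event object manages its internal flag with set() and clear(); wait() blocks until the flag is set, and is_set() reads it.
- Condition: synchronizes parts of a workflow across parallel processes. It has two basic methods: wait() blocks until the condition is signalled, and notify_all() wakes every process waiting on the condition.
- Semaphore: controls access to a shared resource with limited capacity, for example a fixed number of shared connections.
- RLock: a reentrant (recursive) lock. Its purpose and methods are the same as in the `threading` module.
- Barrier: divides a program into phases, for cases where some processes must run only after certain other processes have reached a given point. Code after the barrier cannot run in parallel with code before the barrier.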

### Lock
![](../asset/lock.png)

In [None]:
# %load src/multiprocessing-unlock.py
from multiprocessing import Process
import os, time


def work():
    print('%s is running' % os.getpid())
    time.sleep(2)
    print('%s is done' % os.getpid())


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work)
        p.start()

In [34]:
!python src/multiprocessing-unlock.py

91364 is running
91365 is running
91366 is running
91364 is done
91365 is done
91366 is done


In [33]:
# %load src/multiprocessing-lock.py
import multiprocessing
from multiprocessing import Process, Lock
import os, time


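def work(lock):
    # Hold the lock for the whole block; `with` releases it even on error
    with lock:
        print('%s is running' % os.getpid())
        time.sleep(2)
        print('%s is done' % os.getpid())


if __name__ == '__main__':
    # Works around an issue with Python 3.8 on macOS (the default start method there became 'spawn' in 3.8)
    # https://docs.python.org/zh-cn/3/library/multiprocessing.html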
    multiprocessing.set_start_method('fork')
    lock = Lock()
    for i in range(3):
        p = Process(target=work, args=(lock,))
        p.start()


53367 is running
53367 is done
53370 is running
53370 is done
53371 is running
53371 is done


In [40]:
!python src/multiprocessing-lock.py

92142 is running
92142 is done
92143 is running
92143 is done
92144 is running
92144 is done
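
The scripts above only serialize printing. The more typical use of a Lock is to protect a read-modify-write on shared data. The sketch below is an illustration with invented names (`add_many`, `locked_count`); it assumes the `fork` start method and uses a raw shared integer created with `Value(..., lock=False)` so that the explicit Lock is the only protection.

```python
import multiprocessing


def add_many(counter, lock, n):
    for _ in range(n):
        with lock:                  # only one process may update at a time
            counter.value += 1      # read-modify-write on shared memory


def locked_count(n_procs=4, n_incr=1000):
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX with 'fork'
    counter = ctx.Value("i", 0, lock=False)    # raw shared int, no built-in lock
    lock = ctx.Lock()
    procs = [ctx.Process(target=add_many, args=(counter, lock, n_incr))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(locked_count())
```

With the lock in place the final value equals `n_procs * n_incr`; without it, increments from different processes can overwrite one another and the total may come out lower.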


### Condition
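Condition synchronization means that one process waits for a particular condition while another process signals that the condition has been met. A good illustration is the producer/consumer model: the producer appends a random integer to a list, and the consumer removes integers from the list.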

In [None]:
# %load src/multiprocessing-condition.py
import time
import random
import multiprocessing


class Producer(multiprocessing.Process):
    def __init__(self, productList, condition):
        multiprocessing.Process.__init__(self)
        self.productList = productList  # type: List
        self.condition = condition  # type: multiprocessing.Condition

    def run(self):
        while True:
            product = random.randint(0, 100)
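            with self.condition:
                print("Condition lock: acquired by producer")
                self.productList.append(product)
                print(f"Producer: produced {product}.")
                print("Producer: waking the consumer process")
                self.condition.notify()
                print("Condition lock: released by producer")
            time.sleep(1)


class Customer(multiprocessing.Process):

    def __init__(self, productList, condition):
        multiprocessing.Process.__init__(self)
        self.productList = productList  # type: List
        self.condition = condition  # type: multiprocessing.Condition

    def run(self):
        while True:
            with self.condition:
                print("Condition lock: acquired by consumer")
                while True:
                    if self.productList:
                        product = self.productList.pop()
                        print(f"Consumer: consumed {product}")
                        break
                    print("Consumer: waiting for the producer")
                    self.condition.wait()
                print("Condition lock: released by consumer")


def main():
    manager = multiprocessing.Manager()
    productList = manager.list()
    condition = multiprocessing.Condition()
    process_producer = Producer(productList, condition)
    process_customer = Customer(productList, condition)
    process_producer.start()
    process_customer.start()
    process_producer.join()
    process_customer.join()


if __name__ == '__main__':
    main()


In [44]:
!python src/multiprocessing-condition.py

Condition lock: acquired by consumer
Consumer: waiting for the producer
Condition lock: acquired by producer
Producer: produced 30.
Producer: waking the consumer process
Condition lock: released by producer
Consumer: consumed 30
Condition lock: released by consumer
Condition lock: acquired by consumer
Consumer: waiting for the producer
Condition lock: acquired by producer
Producer: produced 8.
Producer: waking the consumer process
Condition lock: released by producer
Consumer: consumed 8
Condition lock: released by consumer
Condition lock: acquired by consumer
Consumer: waiting for the producer
Condition lock: acquired by producer
Producer: produced 65.
Producer: waking the consumer process
Condition lock: released by producer
Consumer: consumed 65
Condition lock: released by consumer
Condition lock: acquired by consumer
Consumer: waiting for the producer
Condition lock: acquired by producer
Producer: produced 39.
Producer: waking the consumer process
Condition lock: released by producer
Consumer: consumed 39
Condition lock: released by consumer
Condition lock: acquired by consumer
Consumer: waiting for the producer
Condition lock: acquired by producer
Producer: produced 44.
Producer: waking the consumer process
Condition lock: released by producer
Consumer: consumed 44
Condition lock: released by consumer
Condition lock: acquired by consumer
Consumer: waiting for the producer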
^C
Traceback (most recent call last):
  File "src/multiprocessing-condition.py", line 59, in <module>
    main()
  File "src/multiprocessing-condition.py", line 54, in main
    process_producer.join()
  File "/Users/zhangchunyang/opt/anaconda3/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/Users/zhangchunyang/opt/anaconda3/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
Process Customer-3:
Process Producer-2:
    ret

### Semaphore

In [None]:
# %load src/multiprocessing-semaphore.py
import multiprocessing
from multiprocessing import Process, current_process, Semaphore
import time


def worker(s, i):
    # `with` acquires the semaphore and releases it on exit, even on error
    with s:
        print(current_process().name + "acquire")
        time.sleep(i)
        print(current_process().name + "release\n")


if __name__ == "__main__":
    multiprocessing.set_start_method('fork')
    s = Semaphore(2)
    for i in range(5):
        p = Process(target=worker, args=(s, i * 2))
        p.start()

In [48]:
!python src/multiprocessing-semaphore.py

Process-1acquire
Process-1release

Process-2acquire
Process-3acquire
Process-2release

Process-4acquire
Process-3release

Process-5acquire
Process-4release

Process-5release



### Event
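An Event is a synchronization mechanism, somewhat similar to a condition lock. It only signals that something has happened; it cannot carry any data.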

In [None]:
# %load src/multiprocessing-event.py
import multiprocessing
import time


def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wait_for_event: e.is_set()->" + str(e.is_set()))


def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))


if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(target=wait_for_event, args=(e,))
    w2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 6))
    w1.start()
    w2.start()
    time.sleep(10)
    print("main: event setting")
    e.set()
    print("main: event is set")


In [51]:
!python src/multiprocessing-event.py

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event setting
main: event is set
wait_for_event: e.is_set()->True
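
The timeout case in the output can also be detected from the return value of `wait()`, without a separate `is_set()` call. The short sketch below runs in a single process to keep it easy to follow; the function name `event_flag_demo` is invented for the example.

```python
import multiprocessing


def event_flag_demo(timeout=0.05):
    e = multiprocessing.Event()
    timed_out = e.wait(timeout)   # flag not set: returns False after the timeout
    e.set()
    after_set = e.wait(timeout)   # flag set: returns True without waiting
    e.clear()
    after_clear = e.is_set()      # clear() resets the flag to False
    return timed_out, after_set, after_clear


if __name__ == "__main__":
    print(event_flag_demo())
```

In the script above, the waiter with the 6-second timeout is in the first situation: its `wait(t)` gives up before the main process calls `set()` after 10 seconds.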


### Barrier

![](../asset/barrier.png)

In [None]:
# %load src/multiprocessing-barrier.py
import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime


def test_with_barrier(synchronizer, serializer):
    name = multiprocessing.current_process().name
    synchronizer.wait()
    now = time()
    # lock
    with serializer:
        print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))


def test_without_barrier():
    name = multiprocessing.current_process().name
    now = time()
    print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))


if __name__ == '__main__':
    # Works around an issue with Python 3.8 on macOS (the default start method there became 'spawn' in 3.8)
    # https://docs.python.org/zh-cn/3/library/multiprocessing.html
    multiprocessing.set_start_method('fork')
    synchronizer = Barrier(2)
    serializer = Lock()
    Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
    Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
    Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
    Process(name='p4 - test_without_barrier', target=test_without_barrier).start()


In [34]:
!python src/multiprocessing-barrier.py

process p2 - test_with_barrier ----> 2020-09-17 13:25:21.759643
process p1 - test_with_barrier ----> 2020-09-17 13:25:21.759702
process p3 - test_without_barrier ----> 2020-09-17 13:25:21.761097
process p4 - test_without_barrier ----> 2020-09-17 13:25:21.762514
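
The phase-splitting effect of a barrier is easier to see when each worker logs what it does before and after the barrier. The sketch below uses `threading.Barrier` so that the log can be a plain list; `multiprocessing.Barrier` is documented as a clone of it with the same interface. The function name `run_phases` and the phase labels are invented for the example.

```python
import threading


def run_phases(n_workers=4):
    barrier = threading.Barrier(n_workers)
    log = []
    log_lock = threading.Lock()

    def worker():
        with log_lock:
            log.append("phase 1")
        barrier.wait()            # nobody continues until all workers arrive
        with log_lock:
            log.append("phase 2")

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == "__main__":
    print(run_phases())
```

Every "phase 1" entry is logged before any "phase 2" entry, regardless of how the threads are scheduled, because no worker can pass the barrier until all of them have finished phase 1.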


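### RLock (reentrant lock)
**Once a process has acquired a reentrant lock, that same process can acquire it again without blocking, and every acquire must be matched by a release; to other processes it behaves like an ordinary mutex**

One difference between Lock and RLock in Python is that a regular lock can be released by a process/thread other than the one that acquired it, whereas a reentrant lock must be released by the same process/thread that acquired it, and it must be released as many times as it was acquired before another process/thread can take it. Also, avoid splitting acquire and release across different threads: if a thread tries to release a lock that has not been acquired, Python raises an exception, which crashes the program unless it is handled.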
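
The difference can be checked with a non-blocking second acquire while the lock is already held. This is a small single-process sketch; the helper name `can_reacquire` is invented for the example.

```python
import multiprocessing


def can_reacquire(lock):
    """Return True if the current holder can take `lock` again without blocking."""
    with lock:                          # first acquire
        got_it = lock.acquire(False)    # second acquire, non-blocking
        if got_it:
            lock.release()              # match the second acquire
    return got_it                       # the `with` block matched the first one


if __name__ == "__main__":
    print(can_reacquire(multiprocessing.RLock()))
    print(can_reacquire(multiprocessing.Lock()))
```

The reentrant lock lets its owner in a second time, while the plain Lock refuses. Had the second acquire on the plain Lock been blocking, the process would have deadlocked on itself.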

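## Common misconceptions
- A single core can use the CPU for only one operation at any given moment, so on one core there is no real "parallelism"
- More processes does not mean faster processing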

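## Multiprocessing interview questions

### What are the ways processes can communicate, and what are their pros and cons?

**1) Pipes**
Pipes come in two kinds: named and anonymous.

Anonymous pipes. Pros: simple, convenient, and efficient. Cons: 1) limited to one-way communication; 2) usable only between the creating process and its relatives (for example parent and child); 3) limited buffer size.

Named pipes. Pros: work between any processes, related or not. Cons: 1) they persist in the system, so careless use easily leads to errors; 2) limited buffer size.


**2) Message queues**

Pros: work between arbitrary processes; sending and receiving are synchronized through system calls, so the application does not have to handle synchronization itself, which is convenient. Cons: copying messages costs extra CPU time, so they are a poor fit for large volumes of data or very frequent operations.


**3) Shared memory**

Pros: no copying, fast, handles large amounts of data.

Cons: 1) synchronization is error-prone and the programming is difficult; 2) information is exchanged directly through a memory buffer that physically lives in one computer, so it can only be shared by processes on the same machine and does not lend itself to network communication; 3) poor fault tolerance.


**4) Sockets: can be used for communication between processes on different machines**

Pros: 1) data is transmitted at the byte level and the format can be customized, so small payloads transfer efficiently; 2) short transfer time, high performance; 3) well suited to real-time interaction between client and server; 4) traffic can be encrypted for strong data security.

Cons: 1) the transmitted data must be parsed and converted into application-level data.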
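
The parsing cost mentioned for sockets comes from the fact that a stream socket delivers raw bytes with no message boundaries. One common approach, sketched below, is to prefix each message with its length. Everything here is an assumption made for illustration: the 4-byte big-endian length header, JSON as the payload format, and the helper names `send_msg`, `recv_exact`, `recv_msg`, and `roundtrip`. A local `socket.socketpair()` stands in for a real network connection.

```python
import json
import socket
import struct


def send_msg(sock, obj):
    payload = json.dumps(obj).encode("utf-8")
    # 4-byte big-endian length header, then the payload itself
    sock.sendall(struct.pack("!I", len(payload)) + payload)


def recv_exact(sock, n):
    """Read exactly n bytes; recv() may return fewer than requested."""
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise ConnectionError("socket closed before the message was complete")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def recv_msg(sock):
    (length,) = struct.unpack("!I", recv_exact(sock, 4))
    return json.loads(recv_exact(sock, length).decode("utf-8"))


def roundtrip(obj):
    a, b = socket.socketpair()    # two connected sockets in one process
    try:
        send_msg(a, obj)
        return recv_msg(b)
    finally:
        a.close()
        b.close()


if __name__ == "__main__":
    print(roundtrip({"op": "add", "args": [1, 2]}))
```

The same two helpers work unchanged over a TCP connection between machines, which is the case the section describes.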