# The Python multiprocessing package

Based on the official documentation: [https://docs.python.org/zh-cn/3.12/library/multiprocessing.html](https://docs.python.org/zh-cn/3.12/library/multiprocessing.html)

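## Pool objects

A `Pool` lets a function process a series of input values in parallel by distributing the input data across processes (data parallelism).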

In [1]:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

[1, 4, 9]
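
To make the distribution of inputs across processes visible, here is a minimal sketch that also returns the PID of the worker that handled each input. The helper name `square_with_pid` and the pool size of 3 are illustrative assumptions, not part of the official example.

```python
import os
from multiprocessing import Pool


def square_with_pid(x):
    """Return the input, its square, and the PID of the process that ran it."""
    return x, x * x, os.getpid()


def main():
    # Three worker processes share the six inputs between them.
    with Pool(processes=3) as pool:
        for x, squared, pid in pool.map(square_with_pid, range(6)):
            print(f"{x}^2 = {squared} (computed in process {pid})")


if __name__ == "__main__":
    main()
```

The PIDs printed will differ from run to run, and one worker may end up handling several inputs.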


## The Process class

A process is spawned by creating a `Process` object and then calling its `start()` method.

In [6]:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

main line
module name: __main__
parent process: 286500
process id: 287079
function f
module name: __main__
parent process: 287079
process id: 287642
hello bob


For an explanation of why the `if __name__ == '__main__'` part is necessary, see [Programming guidelines](https://docs.python.org/zh-cn/3.12/library/multiprocessing.html#multiprocessing-programming).

## Queue

`multiprocessing.Queue` lets processes exchange information safely.

In [1]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

[42, None, 'hello']
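
A queue can also collect results from several processes. The sketch below assumes a simple convention that is not part of the official example: each producer puts a `None` sentinel on the queue when it is done, and the parent keeps reading until it has seen one sentinel per producer. The names `producer` and `collect` are hypothetical.

```python
from multiprocessing import Process, Queue


def producer(q, worker_id, count):
    """Put `count` (worker_id, n) tuples on the queue, then a None sentinel."""
    for n in range(count):
        q.put((worker_id, n))
    q.put(None)  # sentinel: this producer has finished


def collect(q, n_producers):
    """Read from the queue until every producer has sent its sentinel."""
    items, finished = [], 0
    while finished < n_producers:
        item = q.get()
        if item is None:
            finished += 1
        else:
            items.append(item)
    return items


def main():
    q = Queue()
    procs = [Process(target=producer, args=(q, i, 3)) for i in range(2)]
    for p in procs:
        p.start()
    results = collect(q, len(procs))  # drain the queue before joining
    for p in procs:
        p.join()
    print(sorted(results))


if __name__ == "__main__":
    main()
```

The queue is drained before `join()` is called, because a process that has put items on a queue may not exit until those items have been consumed.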


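An ordinary list does not work: the child process appends to its own copy, so the list in the parent stays empty.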

In [2]:
from multiprocessing import Process

def f(q: list):
    q.append([42, None, 'hello'])

if __name__ == '__main__':
    q = []
    p = Process(target=f, args=(q,))
    p.start()
    p.join()
    print(q)    # prints "[]": the child appended to its own copy of the list

[]


## Pipes

In [3]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

[42, None, 'hello']
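
`Pipe()` returns a duplex connection by default, so both ends can send and receive. The following sketch shows a request/reply exchange over a single pipe; the function `upper_service` and the use of `None` as a stop message are assumptions made for illustration.

```python
from multiprocessing import Pipe, Process


def upper_service(conn):
    """Reply to each received string with its upper-case form; stop on None."""
    while True:
        msg = conn.recv()
        if msg is None:
            break
        conn.send(msg.upper())
    conn.close()


def main():
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=upper_service, args=(child_conn,))
    p.start()
    for word in ["hello", "pipe"]:
        parent_conn.send(word)
        print(parent_conn.recv())
    parent_conn.send(None)  # ask the child to stop
    p.join()


if __name__ == "__main__":
    main()
```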


Side note: `try` and `finally` in Python

Reference: [https://blog.csdn.net/leviopku/article/details/104265284](https://blog.csdn.net/leviopku/article/details/104265284)

In [4]:
file_path = 't1.txt'
try:
    f = open(file_path, 'r')
    str_content = f.read()
    a = int(str_content)
    print(a)
except Exception:
    print('Unknown error!')
finally:
    # the finally block runs whether or not an exception was raised
    print(file_path)

Unknown error!
t1.txt
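
The `finally` clause also runs when the `try` or `except` branch returns. The sketch below records which branches ran in a list; the function name `read_int` and the file name are hypothetical, and a `with` block is used so the file is closed automatically.

```python
def read_int(path):
    """Parse an integer from a file and record which branches ran."""
    events = []
    try:
        with open(path) as fh:  # the file is closed when the block exits
            value = int(fh.read())
        events.append("parsed")
        return value, events
    except (OSError, ValueError) as exc:
        events.append(type(exc).__name__)
        return None, events
    finally:
        # Runs after the return value is built but before the function exits.
        events.append("finally")


if __name__ == "__main__":
    print(read_int("no_such_file_for_demo.txt"))
```

Because the returned tuple holds a reference to the same `events` list, the `"finally"` entry appended afterwards is visible to the caller.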


## Locks

A lock can be used to enforce mutual exclusion between processes.

In [10]:
from multiprocessing import Process, Lock
# import sys

def f(l, i):
    l.acquire()
    try:
        print(f'hello world {i}\n', end='')
        # sys.stdout.flush()
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9

The next cell repeats the example with a plain `print('hello world', i)`. In this notebook run, its output was occasionally interleaved even though the lock was held, whereas the single-string `print` above stayed clean.


In [11]:
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

hello world 0
hello world 1
 2hello world
hello world3 
hello world 4
hello world 5
hello world 6
hello world 7


hello world 8
hello world 9
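
A `Lock` also works as a context manager, which replaces the explicit `acquire()` / `try` / `finally` / `release()` pattern. The sketch below builds the whole line first and writes it in a single call while holding the lock, on the assumption that fewer separate writes leave less room for interleaved output. The function name `say_hello` is hypothetical.

```python
import sys
from multiprocessing import Lock, Process


def say_hello(lock, i):
    """Write one complete line to stdout while holding the lock."""
    line = f"hello world {i}\n"
    with lock:  # acquired here, released on exit even if an error occurs
        sys.stdout.write(line)
        sys.stdout.flush()
    return line


def main():
    lock = Lock()
    procs = [Process(target=say_hello, args=(lock, n)) for n in range(5)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == "__main__":
    main()
```

The order of the lines still depends on process scheduling; the lock only ensures that one process writes at a time.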


## Shared memory

Data can be stored in a shared memory map using `Value` or `Array`.

In [12]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
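
An update such as `counter.value += 1` is a read followed by a write, so concurrent increments can be lost unless they are protected. A `Value` created with the default arguments carries its own lock, available through `get_lock()`. The sketch below uses it to protect a shared counter; the function name `add_many` and the process and iteration counts are illustrative assumptions.

```python
from multiprocessing import Process, Value


def add_many(counter, times):
    """Increment the shared counter `times` times under its own lock."""
    for _ in range(times):
        with counter.get_lock():
            counter.value += 1


def main():
    counter = Value('i', 0)  # shared C int, created with an associated lock
    procs = [Process(target=add_many, args=(counter, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4000


if __name__ == "__main__":
    main()
```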


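A server process created by `Manager()` can also hold shared objects, which other processes manipulate through proxies.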

In [13]:
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


## Using a pool of workers

In [14]:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
16
25
36
49
64
81
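
A pool can also run a single call asynchronously and wait for its result with a timeout. The sketch below uses `apply_async` and `AsyncResult.get(timeout=...)`; the helper `slow_square` and the chosen delays are assumptions made for illustration.

```python
import time
from multiprocessing import Pool, TimeoutError


def square(x):
    return x * x


def slow_square(x, delay):
    """Square x after sleeping for `delay` seconds."""
    time.sleep(delay)
    return x * x


def main():
    with Pool(processes=2) as pool:
        # Evaluate square(20) in a worker and wait up to 5 seconds for it.
        res = pool.apply_async(square, (20,))
        print(res.get(timeout=5))

        # This call takes longer than the timeout, so get() raises.
        slow = pool.apply_async(slow_square, (3, 2.0))
        try:
            slow.get(timeout=0.2)
        except TimeoutError:
            print("timed out waiting for slow_square")


if __name__ == "__main__":
    main()
```

Leaving the `with` block terminates the pool, so the worker that is still sleeping is stopped rather than awaited.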


## Sharing lists and dictionaries

Reference: https://blog.csdn.net/qq_41375318/article/details/131508962

In [1]:
import multiprocessing
def worker(mgr_dict, mgr_list, key, value):
    mgr_dict[key] = value
    mgr_list.append(key)

In [2]:
mgr_dict = multiprocessing.Manager().dict()  # dict shared between the main process and the child processes
mgr_list = multiprocessing.Manager().list()  # list shared between the main process and the child processes
jobs = [multiprocessing.Process(target=worker, args=(mgr_dict, mgr_list, i, i * 2)) for i in range(10)]
for j in jobs:
    j.start()
for j in jobs:
    j.join()
print('Results:')
print('dict', mgr_dict)
print('list', mgr_list)

Results:
dict {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}
list [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
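
When several processes update the same key, a read-modify-write such as `counts[k] = counts.get(k, 0) + 1` involves two separate proxy calls and can lose updates. The sketch below guards the update with a `Lock`; the function name `tally` and the sample data are hypothetical.

```python
from multiprocessing import Lock, Manager, Process


def tally(words, counts, lock):
    """Count words into a shared mapping, one locked update at a time."""
    for word in words:
        with lock:
            counts[word] = counts.get(word, 0) + 1


def main():
    chunks = [["a", "b", "a"], ["b", "c"], ["a"]]
    lock = Lock()
    with Manager() as manager:
        counts = manager.dict()
        procs = [Process(target=tally, args=(chunk, counts, lock))
                 for chunk in chunks]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(sorted(counts.items()))  # [('a', 3), ('b', 2), ('c', 1)]


if __name__ == "__main__":
    main()
```

Since `tally` only needs `get` and item assignment, it can be checked with a plain `dict` in a single process before being handed a managed dictionary.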
