# 使用multiprocessing实现多核CPU并行处理  
@Author: Ray  
@Build time: 2022.08.23  
@Cite: Bilibili -> 莫烦Python  
@Note:  
    1. py文件中使用多进程, 需要使用if语句规避子进程循环调用问题  
    2. jupyter中使用多进程, 需要将多进程语句另存为其他py文件中的函数, 然后调用使用  

In [1]:
import time
import multiprocessing

# ^ 禁用同一单元格内的输出覆盖
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

## 创建进程 & 使用队列收集返回值
* 多进程文件应使用命令行执行
* 用法和treading非常类似

In [2]:
!cat code1.py

import multiprocessing

def job(Q):
    print("new process is start\n")
    res = 0
    for i in range(10000000):
        res += i+i**2+i**3
    Q.put(res)  # * 将返回值放进队列'Q'
    print("new process is finished\n")

if __name__=='__main__':  # 多进程不加这句不行，多线程可以不加这句
    Q = multiprocessing.Queue()  # 创建队列
    process1 = multiprocessing.Process(target=job, args=(Q,))  # ! 函数有一个参数的时候，必须有逗号
    process2 = multiprocessing.Process(target=job, args=(Q,))

    process1.start()  # 进程1开始
    process2.start()  # 进程2开始

    process1.join()   # 进程1加入主进程
    process2.join()   # 进程2加入主进程

    res1 = Q.get()
    res2 = Q.get()
    print(res1, res2)
    print("主进程结束")

In [3]:
!python code1.py

new process is start
new process is start


new process is finished

new process is finished

2499999833333358333330000000 2499999833333358333330000000
主进程结束


## 时间对比：多进程能快多少？
* 对于计算密集型程序，多进程提成显著；
* 多线程实际上还是单核运算，由于IO读取限制，速度反而更慢

In [4]:
# 不使用并行

start_time = time.time()

def job():
    res = 0
    for i in range(10000000):
        res += i+i**2+i**3
    return res

res1 = job()
res2 = job()
print(res1, res2)

end_time = time.time()
print("cost time: ", end_time-start_time)

2499999833333358333330000000 2499999833333358333330000000
cost time:  9.656765222549438


In [5]:
# 使用多进程

start_time = time.time()

!python code1.py

end_time = time.time()
print("cost time: ", end_time-start_time)

new process is start

new process is start

new process is finished

new process is finished

2499999833333358333330000000 2499999833333358333330000000
主进程结束
cost time:  5.174680948257446


In [6]:
# 使用多线程

import threading
from queue import Queue

start_time = time.time()

def job(Q):
    print("new thread is start\n")
    res = 0
    for i in range(10000000):
        res += i+i**2+i**3
    Q.put(res)  # * 将返回值放进队列'Q'
    print("new thread is finished\n")

if __name__=='__main__':  # 多进程不加这句不行，多线程可以不加这句
    Q = Queue()  # 创建队列
    thread1 = threading.Thread(target=job, args=(Q,))  # ! 函数有一个参数的时候，必须有逗号
    thread2 = threading.Thread(target=job, args=(Q,))

    thread1.start()  # 进程1开始
    thread2.start()  # 进程2开始

    thread1.join()   # 进程1加入主进程
    thread2.join()   # 进程2加入主进程

    res1 = Q.get()
    res2 = Q.get()
    print(res1, res2)
    print("主线程结束")

end_time = time.time()
print("cost time: ", end_time-start_time)

new thread is start
new thread is start


new thread is finished

new thread is finished

2499999833333358333330000000 2499999833333358333330000000
主线程结束
cost time:  9.16272521018982


## 进程池Pool
* multiprocessing.Process()创建单一进程，返回值需要用Queue来承接
* multiprocessing.Pool()可创建多个进程，自动分配核心，任务函数可以有返回值
* 执行进程池里的任务有两种方法
    1. `map`方法自动分配进程执行多个任务
    2. `apply_async`方法使用1个核心，执行一个任务

### `pool.map(<函数名>, <迭代器>)`

In [7]:
!cat code2.py

import multiprocessing

def job(x):
    for i in range(10):
        x = x+x**i
    return len(str(x))

if __name__=='__main__':

    # ^ 创建进程池
    pool = multiprocessing.Pool()  # 默认使用全部CPU
    # pool = multiprocessing.Pool(processes=5)  # 指定使用CPU的核心数

    # ^ 执行运算
    # 使用刚创建的进程池pool，执行job函数的运算；
    # 函数的输入参数是列表中的元素，多核心CPU一起处理全部运算，并将结果放到results变量里
    results = pool.map(job, (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) # 一共是10个不同初始值的函数运算
    print("-- job(1)的结果是：")
    print(results[0])
    print("-- job(1-10)的结果是：")
    print(results)

In [8]:
!python code2.py

-- job(1)的结果是：
236125
-- job(1-10)的结果是：
[236125, 294538, 337000, 370393, 397922, 421345, 441729, 459774, 475963, 490642]


### `pool.apply_async(<函数名>, <一个任务的参数组成的迭代器>)`

In [9]:
!cat code3.py

import multiprocessing

def job(x):
    for i in range(10):
        x = x+x**i
    return len(str(x))

if __name__=='__main__':

    # ^ 创建进程池
    pool = multiprocessing.Pool()  # 默认使用全部CPU
    # pool = multiprocessing.Pool(processes=5)  # 指定使用CPU的核心数
    pool.close()  # 执行完得关闭，不然会报错

    # ^ 执行运算
    # 使用刚创建的进程池pool，执行job函数的运算；
    # 只能输入一个任务的参数，返回一个任务的结果
    result = pool.apply_async(job, (1,))
    res = result.get()  # 使用get方法获得返回值
    print("job(1)的结果是: ")
    print(res)

    # ^ 如果想使用apply_async实现map的效果，需要对此方法迭代
    results = [pool.apply_async(job, (i,)) for i in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
    list_res = [res.get() for res in results]
    print("job(1-10)的结果是: ")
    print(list_res)


In [10]:
!python code3.py

Traceback (most recent call last):
  File "/Users/rui/Code/1_Astronote/16_multiprocessing/code3.py", line 18, in <module>
    result = pool.apply_async(job, (1,))
  File "/Users/rui/miniconda3/envs/astro/lib/python3.10/multiprocessing/pool.py", line 458, in apply_async
    self._check_running()
  File "/Users/rui/miniconda3/envs/astro/lib/python3.10/multiprocessing/pool.py", line 353, in _check_running
    raise ValueError("Pool not running")
ValueError: Pool not running


## 共享内存
* threading中可以通过全局变量实现多线程之间的参数传递
* 但是multiprocessing不可以，核之间是独立的
* 想要实现参数传递，只能用专门的方法

In [11]:
value = multiprocessing.Value('f', 0.313)  # (<数据类型>, <数据值>)
value.value

0.31299999356269836

In [12]:
array = multiprocessing.Array('i', [1, 2, 3])  # 数组只能是一维的

## 进程锁Lock

演示多进程在争抢共享内存里的变量

In [13]:
!cat code4.py

import multiprocessing
import time

def job(v, num, process_name):
    for _ in range(10):
        time.sleep(0.5)
        v.value += num
        print("{}: {}".format(process_name, v.value))

if __name__=='__main__':
    print("--- 演示多进程在争抢共享内存里的变量v")
    v = multiprocessing.Value('i', 0)
    process1 = multiprocessing.Process(target=job, args=(v, 1, 'process 1'))
    process2 = multiprocessing.Process(target=job, args=(v, 100, 'process 2'))
    process1.start()
    process2.start()
    process1.join()
    process2.join()

In [14]:
!python code4.py

--- 演示多进程在争抢共享内存里的变量v
process 2: 100
process 1: 101
process 2: 201
process 1: 202
process 1: 203
process 2: 303
process 1: 304
process 2: 404
process 1: 405
process 2: 505
process 1: 506
process 2: 606
process 1: 607
process 2: 707
process 1: 708
process 2: 808
process 1: 809
process 2: 909
process 1: 910
process 2: 1010


使用`Lock`锁住进程，防止像上面那样相互干扰

In [15]:
!cat code5.py

import multiprocessing
import time

lock = multiprocessing.Lock()  # ! 必须写在主函数中

def job(v, num, process_name, lock):   # ! 注意这里添加个lock
    lock.acquire()  # * 获取进程锁
    for _ in range(10):
        time.sleep(0.5)
        v.value += num
        print("{}: {}".format(process_name, v.value))
    lock.release()  # * 释放进程锁

if __name__=='__main__':

    print("--- 演示使用进程锁, 防止多进程争抢共享内存里的变量v")
    v = multiprocessing.Value('i', 0) # 创建共享内存里的变量

    process1 = multiprocessing.Process(target=job, args=(v, 1, 'process 1', lock))
    process2 = multiprocessing.Process(target=job, args=(v, 100, 'process 2', lock))
    process1.start()
    process2.start()
    process1.join()
    process2.join()

In [16]:
!python code5.py

--- 演示使用进程锁, 防止多进程争抢共享内存里的变量v
process 2: 100
process 2: 200
process 2: 300
process 2: 400
process 2: 500
process 2: 600
process 2: 700
process 2: 800
process 2: 900
process 2: 1000
process 1: 1001
process 1: 1002
process 1: 1003
process 1: 1004
process 1: 1005
process 1: 1006
process 1: 1007
process 1: 1008
process 1: 1009
process 1: 1010


## `if __name__ == '__main__':` 的作用  
* 这个if语句保证了子进程不执行if语句后面的语句, 防止程序无限循环
* 原理是: 
  * 每个程序都有一个`__name__`变量, 程序运行时, `__name__`为`__main__`; 而该程序被别的进程调用时, 则为文件名(multiprocessing分发程序后, 每个程序名都是`__mp_main__`)
  * 分发的程序在被子进程调用时, 由于if语句的存在(判断当前执行的程序并不是主进程), if语句后面的程序就不会被执行
  * 因此, 避免了子进程创建‘孙子进程’的无限循环
  * 注意: map把整个代码文件都分配给子进程执行

实例: 有if语句的情况
* 程序开始执行, 先顺序执行, 因此会输出一句"if语句之前"
* 因为直接运行该程序, if语句为True, 走到了创建进程池这一步
* 进程池将该程序复制分发给指定数量的cpu核心, 即使用的cpu核心数为子进程的数量
* 每个子进程都运行同样的代码, 只是输入参数分配的不一样
* 待全部任务执行完毕, 主进程结束

In [17]:
!python code6.py  # 使用3核心

if语句之前, pid=68036, name=__main__
进入if语句, pid=68036
if语句之前, pid=68038, name=__mp_main__
if语句之前, pid=68040, name=__mp_main__
if语句之前, pid=68039, name=__mp_main__
x=1, pid=68038
x=2, pid=68040
x=3, pid=68039
x=4, pid=68038
x=5, pid=68040
程序结束, pid=68036


实例2: 把if语句去掉的情况
* 当进程池分配好各个核心同样的代码后, 即父进程创建了子进程
* 由于每个子进程还是相同的代码, 没有if语句

In [18]:
# !python code7.py

## 在jupyter里使用多进程

错误示范  
* 以下两个cell实际上是等价的, 均不能正常运行(注意: 输出内容显示并不是因为子进程循环创建)
* 原因: jupyter里map方法无法调用待执行函数
  
解决方法
* 换用pathos库
* https://github.com/uqfoundation/pathos

In [19]:
# import multiprocessing

# def job(x):
#     for i in range(10):
#         x = x+x**i
#     return len(str(x))

# if __name__=='__main__':

#     # ^ 创建进程池
#     pool = multiprocessing.Pool()  # 默认使用全部CPU

#     # ^ 执行运算
#     results = pool.map(job, (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) # 一共是10个不同初始值的函数运算
#     print("-- job(1)的结果是：")
#     print(results[0])
#     print("-- job(1-10)的结果是：")
#     print(results)

In [20]:
# import multiprocessing

# def job(x):
#     for i in range(10):
#         x = x+x**i
#     return len(str(x))


# # ^ 创建进程池
# pool = multiprocessing.Pool()  # 默认使用全部CPU

# # ^ 执行运算
# results = pool.map(job, (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) # 一共是10个不同初始值的函数运算
# print("-- job(1)的结果是：")
# print(results[0])
# print("-- job(1-10)的结果是：")
# print(results)

解决方法  
* 将多进程写进py文件的一个函数中

In [21]:
import code8
print("说明这句没有被分到子进程中")
code8.run()

位置1, pid=66814
说明这句没有被分到子进程中
位置2, pid=66814
位置1, pid=68123
x=1, pid=68123
位置1, pid=68124
x=2, pid=68124
位置1, pid=68125
x=3, pid=68125
x=4, pid=68123
x=5, pid=68124
位置3, pid=66814


但是同样的代码以py文件执行, 则会出现循环调用(可用if语句解决问题)

In [22]:
# !python code9.py