Which kind of `multiprocessing` do you actually need?

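- `multiprocessing.Process()`: simple but rigid; suited to running a few identical or different functions at the same time
- `multiprocessing.Pool()`: suited to creating many child processes while choosing the number of cores flexibly; it pairs especially well with `map`
    - `pool.map()`: supports a single input argument
    - [`pool.starmap()`](https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments): supports multiple input arguments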

In [1]:
import multiprocessing as mp
from tqdm.notebook import tqdm
import random
import string

random.seed(123)

# `multiprocessing.Process()`

In [2]:
%%time

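import multiprocessing as mp

def func1(q, a, d, n):
    res = []
    for i in tqdm(range(n)):
        res.append(i + i**a + i**d)
    q.put(sum(res))    # the return value of an mp.Process target is discarded, so send the result through a container instead

def func2(q, a, d, n):
    res = []
    for i in tqdm(range(n)):
        res.append(i + i*a + i*d)
    q.put(sum(res))    # likewise, send the result back through the Queue


q = mp.Queue()    # the container chosen here is a Queue, which is first-in, first-out
p1 = mp.Process(target=func1, args=(q, 1, 2, 100000000))  # args: the arguments passed to the target function
p2 = mp.Process(target=func2, args=(q, 1, 2, 100000000))

p1.start()
p2.start()

# Read the results before join(): a child that has put data on a Queue may not
# terminate until that data has been consumed, so calling join() first can deadlock.
res1 = q.get()
res2 = q.get()

p1.join()
p2.join()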

CPU times: user 20 ms, sys: 8 ms, total: 28 ms
Wall time: 1min 29s


In [3]:
res1+res2

333333358333333050000000
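`q.get()` hands back results in the order the children finish, not the order they were started, so `res1` above may in fact hold `func2`'s sum; that is harmless here because only the total is used. When results must be told apart, one option is to send a `(name, value)` pair. A minimal sketch, assuming a POSIX system where the `fork` start method is available (it lets functions defined in a notebook or script be used without an `if __name__ == "__main__":` guard); `worker` and the `"small"`/`"large"` labels are hypothetical:

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX system with the 'fork' start method

def worker(q, name, n):
    # Each child sends (name, result) so the parent can tell the results apart
    q.put((name, sum(i * i for i in range(n))))

q = ctx.Queue()
procs = [ctx.Process(target=worker, args=(q, name, n))
         for name, n in [("small", 10), ("large", 1000)]]
for p in procs:
    p.start()

# Drain the queue before join(), then wait for the children to exit
results = dict(q.get() for _ in procs)
for p in procs:
    p.join()

print(results)  # key order follows completion order
```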

In [4]:
%%time

res1 = []
res2 = []

for i in tqdm(range(100000000)):
    res1.append(i + i**1 + i**2)
    res2.append(i + i*1 + i*2)

res1 = sum(res1)
res2 = sum(res2)


CPU times: user 2min 8s, sys: 7.61 s, total: 2min 16s
Wall time: 2min 14s


In [5]:
res1+res2

333333358333333050000000

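For this loop with two `append`s per iteration, the two-process version is clearly faster: 1 min 29 s of wall time versus 2 min 14 s, roughly 1.5×. (The tiny CPU time reported for the `Process` cell covers only the notebook process; the actual work ran in the child processes.)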
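`%%time` appears to count CPU time only for the notebook's own process, which is why the `Process` cell reports 28 ms of CPU against 1 min 29 s of wall time. A minimal sketch of reading the children's CPU time as well, assuming a POSIX system (the `resource` module and the `fork` start method are not available on Windows); `busy` is a hypothetical stand-in workload:

```python
import multiprocessing as mp
import resource  # POSIX-only module
import time

ctx = mp.get_context("fork")  # assumption: POSIX 'fork' start method

def busy(n):
    # Pure-Python loop that keeps one core busy for a moment
    total = 0
    for i in range(n):
        total += i * i
    return total

start = time.perf_counter()
procs = [ctx.Process(target=busy, args=(2_000_000,)) for _ in range(2)]
for p in procs:
    p.start()
for p in procs:
    p.join()  # join() reaps the child, so its usage is added to RUSAGE_CHILDREN
wall = time.perf_counter() - start

parent = resource.getrusage(resource.RUSAGE_SELF)        # cumulative since this process started
children = resource.getrusage(resource.RUSAGE_CHILDREN)  # all reaped children
parent_cpu = parent.ru_utime + parent.ru_stime
children_cpu = children.ru_utime + children.ru_stime
print(f"wall {wall:.2f}s | parent CPU {parent_cpu:.2f}s | children CPU {children_cpu:.2f}s")
```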

# `multiprocessing.Pool()`


If you need to launch a large number of child processes, a process pool can create them in bulk.
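A pool starts a fixed number of worker processes once and reuses them for every task submitted to it. A minimal sketch of that lifecycle with `apply_async`, `close()` and `join()`, assuming the POSIX `fork` start method; `whoami` and the cap of 4 workers are hypothetical choices:

```python
import multiprocessing as mp
import os

ctx = mp.get_context("fork")  # assumption: POSIX 'fork' start method

def whoami(task_id):
    # Return the task id together with the PID of the worker that ran it
    return task_id, os.getpid()

cores = os.cpu_count() or 1
pool = ctx.Pool(processes=min(4, cores))
# Submit 20 small tasks; the pool spreads them over its fixed set of workers
async_results = [pool.apply_async(whoami, (i,)) for i in range(20)]
pairs = [r.get() for r in async_results]  # get() blocks until each result is ready
pool.close()  # no more tasks will be submitted
pool.join()   # wait for the worker processes to exit

worker_pids = {pid for _, pid in pairs}
print(f"20 tasks ran on {len(worker_pids)} worker process(es)")
```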


## `pool.map`: accepts only one argument

In [9]:
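import multiprocessing as mp

def job(x):
    return x**2

cores = mp.cpu_count()    # number of CPU cores; this cell requests 2 worker processes explicitly

# pool.map blocks until every result is ready and then returns a plain list,
# so wrapping it in tqdm cannot show live progress.
with mp.Pool(processes=2) as pool:
    res = pool.map(job, range(100000000))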


In [10]:
sum(res)

333333328333333350000000
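Because `pool.map` only returns once everything is finished, per-item progress needs `pool.imap`, which yields results lazily and in input order. A minimal sketch, assuming the POSIX `fork` start method, with a smaller input than the cell above and a plain `print` where `tqdm(..., total=n)` could wrap the loop instead:

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX 'fork' start method

def job(x):
    return x**2

n = 100_000
done = 0
results = []
with ctx.Pool(processes=2) as pool:
    # imap yields each result as soon as its chunk has been processed
    for r in pool.imap(job, range(n), chunksize=1_000):
        results.append(r)
        done += 1
        if done % 25_000 == 0:
            print(f"{done}/{n}")  # progress report

print(sum(results))  # 333328333350000
```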

In [11]:
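import multiprocessing as mp

cores = mp.cpu_count()

# The third argument of pool.map is chunksize: items are sent to the workers 3 at a time.
# Very small chunks add inter-process communication overhead.
with mp.Pool(processes=8) as pool:
    res = pool.map(job, range(100000000), chunksize=3)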


In [12]:
sum(res)

333333328333333350000000

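Using more cores appears to be somewhat faster, but these cells were not timed, so this is only an informal impression.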
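How much work each round trip to a worker carries depends on `chunksize`. A minimal worked sketch of the arithmetic, assuming the heuristic in CPython's `Pool.map` implementation (roughly 4 chunks per worker, rounded up, when `chunksize` is not given); this is an internal detail that could change between versions, and `default_chunksize`/`n_chunks` are hypothetical helpers:

```python
def default_chunksize(n_items, n_processes):
    # Hypothetical helper mirroring CPython's Pool.map heuristic:
    # about 4 chunks per worker, rounded up
    chunksize, extra = divmod(n_items, n_processes * 4)
    if extra:
        chunksize += 1
    return chunksize

def n_chunks(n_items, chunksize):
    # Number of tasks the pool has to ship to its workers (ceiling division)
    return -(-n_items // chunksize)

N = 100_000_000
auto = default_chunksize(N, 8)
print(auto, n_chunks(N, auto))  # 3125000 32
print(3, n_chunks(N, 3))        # 3 33333334
```

With `chunksize=3`, the 8-process cell ships about a million times more tasks than the default would, so more cores do not automatically mean less total overhead.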

## `pool.starmap`: accepts multiple arguments

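`starmap` saves the day: it takes an iterable of argument tuples and unpacks each one into the function call, so the arguments still have to be bundled together, for example with `zip`.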

In [13]:
import multiprocessing as mp

def job(x, var):
    return x**2 + var

cores = mp.cpu_count()

with mp.Pool(processes=2) as pool:
    # zip stops at the shortest iterable, so only 10 (x, var) pairs are produced
    res = pool.starmap(job, zip(range(10), range(100)))

res


[0, 2, 6, 12, 20, 30, 42, 56, 72, 90]

In [14]:
# The built-in `map`, by contrast, accepts several iterables directly (and also stops at the shortest one)

list(tqdm(map(job, range(10), range(100))))  


[0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
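Besides `zip`, a constant argument can be supplied with `itertools.repeat`, or fixed up front with `functools.partial` so that plain `pool.map` suffices. A minimal sketch comparing the three forms, assuming the POSIX `fork` start method:

```python
import multiprocessing as mp
from functools import partial
from itertools import repeat

ctx = mp.get_context("fork")  # assumption: POSIX 'fork' start method

def job(x, var):
    return x**2 + var

with ctx.Pool(processes=2) as pool:
    # starmap unpacks each tuple into job(*args)
    a = pool.starmap(job, [(0, 10), (1, 10), (2, 10)])
    # zip + repeat supplies the same second argument to every call
    b = pool.starmap(job, zip(range(3), repeat(10)))
    # partial fixes `var`, so map over a single iterable is enough
    c = pool.map(partial(job, var=10), range(3))

print(a, b, c)  # [10, 11, 14] [10, 11, 14] [10, 11, 14]
```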

# `multiprocessing.dummy`

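There is also `multiprocessing.dummy`, which provides a `Pool` with the same API. However, it is backed by threads rather than processes, so because of the GIL, CPU-bound pure-Python work such as `x**2` gets no parallel speedup from it; it is mainly useful for I/O-bound tasks.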

In [15]:
from multiprocessing.dummy import Pool as ThreadPool

In [16]:
def job(x):
    return x**2


with ThreadPool(2) as pool:    # 2 worker threads, not processes
    res = pool.map(job, range(100000000))


In [17]:
sum(res)

333333328333333350000000

In [18]:
def job(x):
    return x**2


with ThreadPool(8) as pool:    # 8 worker threads; the GIL still lets only one run Python bytecode at a time
    res = pool.map(job, range(100000000))


In [19]:
sum(res)

333333328333333350000000
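Where a thread pool does help is waiting: `time.sleep`, like blocking network or disk I/O, releases the GIL, so the waits overlap. A minimal sketch in which `fake_io` is a hypothetical stand-in for an I/O call:

```python
import time
from multiprocessing.dummy import Pool as ThreadPool

def fake_io(i):
    # time.sleep releases the GIL, standing in for a network or disk wait
    time.sleep(0.2)
    return i

start = time.perf_counter()
with ThreadPool(8) as pool:
    out = pool.map(fake_io, range(8))
elapsed = time.perf_counter() - start

# The 8 sleeps of 0.2 s overlap instead of adding up to 1.6 s
print(f"{elapsed:.2f}s")
```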

# Shared Memory

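Values that different processes can share: `multiprocessing.Value` (and `multiprocessing.Array`) allocate data in shared memory, and a `Lock` keeps concurrent updates from interleaving.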

In [20]:
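import multiprocessing as mp
import time

def job(v, num, l):
    # Hold the lock for the whole loop so the two processes do not interleave;
    # `v.value += num` is a read-modify-write and is not atomic on its own.
    with l:
        for _ in range(10):
            time.sleep(0.1)
            v.value += num
            print(v.value)


l = mp.Lock()
v = mp.Value('i', 0)    # 'i' = signed int, initial value 0, stored in shared memory
# array = mp.Array('i', [1, 2, 3])    # a shared array is created the same way

p1 = mp.Process(target=job, args=(v, 1, l))  # args: the arguments passed to the target function
p2 = mp.Process(target=job, args=(v, 1, l))

p1.start()
p2.start()
p1.join()
p2.join()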

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
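The cell above serialises the two processes with an explicit `mp.Lock`. A `Value` created with the default `lock=True` also carries its own lock, available through `get_lock()`, and `Array` allocates a fixed-size shared array in the same way. A minimal sketch, assuming the POSIX `fork` start method; `add_many` is a hypothetical worker:

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX 'fork' start method

def add_many(counter, arr, idx, n):
    for _ in range(n):
        # The Value's built-in lock makes each += safe across processes
        with counter.get_lock():
            counter.value += 1
    arr[idx] = n  # each process writes only its own slot

counter = ctx.Value('i', 0)
arr = ctx.Array('i', 4)  # four ints, zero-initialised, in shared memory
procs = [ctx.Process(target=add_many, args=(counter, arr, k, 1000)) for k in range(4)]
for p in procs:
    p.start()
for p in procs:
    p.join()

print(counter.value, arr[:])  # 4000 [1000, 1000, 1000, 1000]
```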
