multiprocessing.Pool     
Pool类提供指定数量供用户使用，若有新的请求提交到Pool中时，若还未满，则创建一个新的进程来执行请求，若池满，则告知先等待，直到有进程借宿，才会创建新的集成执行请求    
1. apply(func[, args=()[, kwds={}]]) python3.x后不再出现
2. apply_async(func,[, args=()[, kwds={}[, callback=None]]], 为非阻塞的且支持结果返回后回调  
3. map(func, iterable[, chunksize=None])使进程阻塞直到结果返回
4. map_async(func, iterable[, chunksize[, callback]])为非阻塞的方式
5. close():关闭进程池，不接受新的任务
6. terminal(): 结束工作任务，不再处理未完成任务
7. join():主线程阻塞等待子线程退出， join方法要在close和terminate之后调用

In [48]:
import os
import time
from multiprocessing import Pool
import multiprocessing
# ref
# https://www.cnblogs.com/ailiailan/p/11850710.html

In [4]:
def run(fn):
    time.sleep(1)
    print(fn*fn)

In [5]:
if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]
    print("sequential")
    s = time.time()
    for fn in testFL:
        run(fn)
    t1 = time.time()
    print("sequential execute", int(t1-s))

sequential
1
4
9
16
25
36
sequential execute 6


In [18]:
# 基于线程池运行,这中放好似下，输入的参数列表像队列一样排列，并不是顺序执行，谁等到进程，就给谁执行,这样时间会短一点
if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]
    t1 = time.time()
    print("concurrent")
    pool = Pool(2) #当进程数为3时候，会一次性打印三个结果
    pool.map(run, testFL) # 这种方式比较简单，直接将参数序列映射到函数的参数列表中
    pool.close() # 关闭进程池，不再接受新的进程
    pool.join() # 主进程阻塞等待子线程推出
    s = time.time()
    t2 = time.time()
    print("sequential execute", int(t2-t1))

concurrent
1
4
9
16
25
36
sequential execute 3


In [19]:
# 使用map_async的方法执行
# 基于线程池运行,这中放好似下，输入的参数列表像队列一样排列，并不是顺序执行，谁等到进程，就给谁执行,这样时间会短一点
if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]
    t1 = time.time()
    print("concurrent")
    pool = Pool(2) #当进程数为3时候，会一次性打印三个结果
    pool.map_async(run, testFL) # 这种方式比较简单，直接将参数序列映射到函数的参数列表中
    pool.close() # 关闭进程池，不再接受新的进程
    pool.join() # 主进程阻塞等待子线程推出
    s = time.time()
    t2 = time.time()
    print("sequential execute", int(t2-t1))

concurrent
1
4
16
9
36
25
sequential execute 3


In [21]:
# 使用apply_async()非阻塞的方式
# 结果类似于map/map_async的方法
if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]
    t1 = time.time()
    print("concurrent")
    pool = Pool(2) #当进程数为3时候，会一次性打印三个结果
    for fn in testFL:
        pool.apply_async(run, (fn,))
    pool.close() # 关闭进程池，不再接受新的进程
    pool.join() # 主进程阻塞等待子线程推出
    s = time.time()
    t2 = time.time()
    print("sequential execute", int(t2-t1))

concurrent
4
1
9
16
25
36
sequential execute 3


In [22]:
# 基于apply_async()方法callback参数的方法

In [23]:
def fun_01(i):
    time.sleep(2)
    print("start_time", time.ctime())
    return (i+100)

In [24]:
def fun_02(arg):
    print("end_time", arg, time.ctime())

In [26]:
# 如果进程池里面的进程为１,则为顺序模型
if __name__ == "__main__":
    pool = Pool(1)
    for i in range(4):
        pool.apply_async(func=fun_01, args=(i,), callback=fun_02) #call_back作为一个参数传入，传入的值为地一个函数传递进去的值
    pool.close()
    pool.join()
    print("done")

start_time Fri Jun  5 17:18:16 2020
end_time 100 Fri Jun  5 17:18:16 2020
start_time Fri Jun  5 17:18:18 2020
end_time 101 Fri Jun  5 17:18:18 2020
start_time Fri Jun  5 17:18:20 2020
end_time 102 Fri Jun  5 17:18:20 2020
start_time Fri Jun  5 17:18:22 2020
end_time 103 Fri Jun  5 17:18:22 2020
done


In [27]:
def func(msg):
    print("hello:", msg, time.ctime())
    time.sleep(2)
    print("end", time.ctime())
    return "down" + msg

In [29]:
#　可以实现带参数的返回
if __name__ == "__main__":
    pool = Pool(3)
    result = []
    for i in range(3):
        msg = "hello %s" %i
        result.append(pool.apply_async(func=func, args=(msg,))) # 返回的是一个多线程的函数式列表，需要调用get的方法获取
    pool.close()
    pool.join()
    for res in result:
        print("***", res.get())
    print("All end--")

hello: hello 0 Fri Jun  5 17:24:41 2020
hello: hello 1 Fri Jun  5 17:24:41 2020
hello: hello 2 Fri Jun  5 17:24:41 2020
end Fri Jun  5 17:24:43 2020
end Fri Jun  5 17:24:43 2020
end Fri Jun  5 17:24:43 2020
*** downhello 0
*** downhello 1
*** downhello 2
All end--


In [30]:
result

[<multiprocessing.pool.ApplyResult at 0x7f40c286e4e0>,
 <multiprocessing.pool.ApplyResult at 0x7f40c286ea58>,
 <multiprocessing.pool.ApplyResult at 0x7f40c286e780>]

In [31]:
# 多进程执行多个函数

In [42]:
def Lee():
    print('\nRun task Lee--%s******ppid:%s' % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(5)
    end = time.time()
    print('Task Lee,runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())

In [43]:
def Marlon():
    print("\nRun task Marlon-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(10)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())

In [44]:
def Allen():
    print("\nRun task Allen-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(15)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())

In [45]:
def Frank():
    print("\nRun task Frank-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())

In [46]:
if __name__ == "__main__":
    func_list = [Lee, Marlon, Allen, Frank]
    print('parent process id %s' % os.getpid())
    pool = Pool(4)
    for func in func_list:
        pool.apply_async(func) # 直接把函数加进来运行，　这个是在同一时间段跑的，在sleep的时候，会同时跑其他的代码，因为是并行的4
    
    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()
    print('All subprocesses done.')

parent process id 21185

Run task Allen-8655******ppid:21185
Run task Marlon-8654******ppid:21185
Run task Frank-8656******ppid:21185
Run task Lee--8653******ppid:21185 ~~~~ Fri Jun  5 18:15:44 2020
 ~~~~ Fri Jun  5 18:15:44 2020
 ~~~~ Fri Jun  5 18:15:44 2020
 ~~~~ Fri Jun  5 18:15:44 2020
Waiting for all subprocesses done...
Task Lee,runs 5.01 seconds. ~~~~ Fri Jun  5 18:15:49 2020
Task Marlon runs 10.01 seconds. ~~~~ Fri Jun  5 18:15:54 2020
Task Allen runs 15.00 seconds. ~~~~ Fri Jun  5 18:15:59 2020
Task Frank runs 20.02 seconds. ~~~~ Fri Jun  5 18:16:04 2020
All subprocesses done.


In [49]:
# 获取cpu数
multiprocessing.cpu_count()

8