## 存储返回值，最后一次性取出

In [None]:
from multiprocessing.pool import Pool
import traceback
import time

def test(p):
    time.sleep(1)
#     print(p)
    return p

try:
    pool = Pool(processes=10)
    result=[]
    for i  in range(50):
       '''
       for循环执行流程：
       （1）添加子进程到pool，并将这个对象（子进程）添加到result这个列表中。（此时子进程并没有运行）
       （2）执行子进程（同时执行10个）
       '''
       result.append(pool.apply_async(test, args=(i,)))#维持执行的进程总数为10，当一个进程执行完后添加新进程.  
    '''
    遍历result列表，取出子进程对象，访问get()方法，获取返回值。（此时所有子进程已执行完毕）
    '''
    print(len(result))
    pool.close()
    pool.join()
    for i in result:
        print(i.get())
except:
    print(traceback.format_exc())

## 父进程实时获取子进程返回值，条件符合，结束所有进程
### 方法一
- 当需要执行的子进程非常大时，不适用，因为for循环在添加子进程时，要花费很长的时间，虽然是异步，但是也需要等待for循环添加子进程操作结束才能执行while代码，因此会比较慢

In [None]:
from multiprocessing.pool import Pool
# from multiprocessing.dummy import Pool
import queue
import time

def test(p):
    time.sleep(1)
    return p

pool = Pool(processes=10)
q=queue.Queue()

for i  in range(50):  # 这里如果非常大，会堵塞在这里，等所有进程加完
    '''
    将子进程对象存入队列中。
    '''
    q.put(pool.apply_async(test, args=(i,)))# 维持执行的进程总数为10，当一个进程执行完后添加新进程. 只是添加并没有执行完所有的子进程      

'''
因为这里使用的为pool.apply_async异步方法，因此子进程执行的过程中，父进程会执行while，获取返回值并校验。
'''
print(q.qsize())
while 1:
    a=q.get().get() # 获取子进（线）程返回值
    print(a)
    if a==25: # 执行到 a=25中断所有任务
        pool.terminate() # 结束所有子进（线）程
        break

pool.close()
pool.join()

### 方法二
- 多线程+多进程，添加执行子进程的过程中，不断获取返回值并校验，如果返回值为True则结果所有进程
- 弥补了方法一的不足，即使for循环的子进程数量很多，也能提高性能，因为for循环与判断子进程返回值同时进行
- 只能按照调用顺序进行检查，并不能按照结束顺序返回

In [None]:
import queue
import threading
import time
from multiprocessing.dummy import Pool # 多线程
# from multiprocessing.pool import Pool # 多进程 ，可以完全替换

def test(p):
    time.sleep(1)
    return p

result=queue.Queue() # 队列
pool = Pool(10)

def pool_th():
    for i  in range(50): # 这里需要创建执行的任务非常多
        try:
            result.put(pool.apply_async(test, args=(i,)))
        except:
            break

def result_th():
    while 1:
        a=result.get().get() # 获取子进（线）程返回值
        print(a)
        if a==25: # 执行到 a=25中断所有任务
            pool.terminate() # 结束所有子进（线）程
            break

'''
利用多线程，同时运行Pool函数执行任务，result函数获取返回值，检查是否结束工作
'''
# 开启多线程，启动任务
t1=threading.Thread(target=pool_th)
t2=threading.Thread(target=result_th)
t1.start()
t2.start()
t1.join()
t2.join()

# 关闭进（线）程池，释放资源
pool.close()
pool.join()

### 方法三
- 在方法二的基础上，改为按照结束顺序检查

In [None]:
import queue
import threading
import time
import random
from multiprocessing.dummy import Pool # 多线程
# from multiprocessing.pool import Pool # 多进程 ，可以完全替换

def test(p):
    time.sleep(1+random.random())
    return p

nums = 50
result=queue.Queue() # 队列
pool = Pool(nums)

def store_result(item):
    result.put(item)

def pool_th():
    for i  in range(nums): # 这里需要创建执行的任务非常多
        try:
            pool.apply_async(test, args=(i,), callback=store_result)
        except:
            break

def result_th():
    while 1:
        a=result.get() # 获取子进（线）程返回值，没有的话堵塞等待
        print(a)
        if a==25: # 执行到 a=25中断所有任务
            pool.terminate() # 结束所有子进（线）程
            break

'''
利用多线程，同时运行Pool函数执行任务，result函数获取返回值，检查是否结束工作
'''
try:
    # 开启多线程，启动任务
    t1=threading.Thread(target=pool_th)
    t2=threading.Thread(target=result_th)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    # 关闭进（线）程池，释放资源
    pool.close()
    pool.join()
except:
    print(traceback.format_exc())

### 实际使用demo

In [26]:
import queue
import threading
import time
import random
import traceback
# from multiprocessing.dummy import Pool # 多线程
from multiprocessing.pool import Pool # 多进程 ，可以完全替换

def test(p):
    time.sleep(1+random.random())
    return p

nums = 50
tel_tmp_result=queue.Queue() # 号查名队列
tel_result=queue.Queue() # 号查名队列
tel_pool = Pool(nums)  # 号查名进（线）程池
name_result=queue.Queue() # 名查号队列
name_pool = Pool(nums)  # 名查号进（线）程池

def tel_store_result(item):
    tel_tmp_result.put(item)

def tel_pool_th():
    for i  in range(nums): # 这里需要创建执行的任务非常多
        try:
            tel_pool.apply_async(test, args=(i,), callback=tel_store_result)
        except:
            break

def tel_result_th():
    while 1:
        a=tel_tmp_result.get() # 获取子进（线）程返回值，没有的话堵塞等待
        tel_result.put(a)
        if a==25: # 执行到 a=25中断所有任务
            # 结束所有子进（线）程
            tel_pool.terminate() 
            # 关闭进（线）程池，释放资源
            tel_pool.close()
            tel_pool.join()
            break

def name_result_th():
    for i in range(nums):
        name_result.put(name_pool.apply_async(test, args=(i,)))  # 维持执行的进程总数为10，当一个进程执行完后添加新进程.  
#     print(name_result.qsize())
    name_pool.close()
    name_pool.join()

try:
    # 开启多线程，启动任务
    t1=threading.Thread(target=tel_pool_th)
    t2=threading.Thread(target=tel_result_th)
    t3=threading.Thread(target=name_result_th)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print([i.get() for i in name_result.queue], '\n', list(tel_result.queue))
except:
    print(traceback.format_exc())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49] 
 [36, 22, 18, 2, 48, 6, 40, 12, 29, 10, 46, 9, 38, 23, 33, 19, 16, 42, 13, 41, 26, 27, 45, 7, 8, 5, 43, 28, 39, 44, 30, 20, 1, 32, 49, 37, 31, 4, 14, 11, 15, 47, 35, 17, 24, 21, 0, 34, 25]


### 线程池方案

In [34]:
import queue
import threading
import time
import random
import traceback
from multiprocessing.dummy import Pool # 多线程
# from multiprocessing.pool import Pool # 多进程 ，可以完全替换

def test(item):
    try:
        sleeps = 1+random.random()
        time.sleep(sleeps)
    except:
        print(traceback.format_exc())
    return item

def tel_store_result(item):
    item[0].put(item[1])

def tel_pool_th(tasks, tmp_result, pool):
    for func, items in tasks:
        try:
#             print('tel', func, items)
            pool.apply_async(func, args=((tmp_result, items),), callback=tel_store_result)
        except:
            print(traceback.format_exc())

def tel_result_th(result, tmp_result, pool):
    while 1:
        try:
#             print('bingo')
            a=tmp_result.get() # 获取子进（线）程返回值，没有的话堵塞等待
#             print('tel', a)
            result.put(a)
            if a[0]=='es' and a[3] is True:
                # 结束所有子进（线）程
                pool.terminate() 
                # 关闭进（线）程池，释放资源
                pool.close()
                pool.join()
                break
        except:
            print(traceback.format_exc())

def name_result_th(tasks, result, pool):
    try:
        for funcs, items in tasks:
#             print('name', funcs, items, result, pool)
            result.put(pool.apply_async(funcs, args=(items,)))  # 维持执行的进程总数为10，当一个进程执行完后添加新进程.  
        pool.close()
        pool.join()
    except:
        print(traceback.format_exc()) 

def get_result(tel_tasks, name_tasks):
    """并发执行
    
    """
    tel_tmp_result=queue.Queue() # 号查名队列
    tel_result=queue.Queue() # 号查名队列
    tel_pool = Pool(len(tel_tasks))  # 号查名进（线）程池
    name_result=queue.Queue() # 名查号队列
    name_pool = Pool(len(name_tasks))  # 名查号进（线）程池
    try:
        # 开启多线程，启动任务
        t1=threading.Thread(target=tel_pool_th, args=(tel_tasks, tel_tmp_result, tel_pool))
        t2=threading.Thread(target=tel_result_th, args=(tel_result, tel_tmp_result, tel_pool))
        t3=threading.Thread(target=name_result_th, args=(name_tasks, name_result, name_pool))
        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()
        print([i.get() for i in name_result.queue], list(tel_result.queue))
        return ([i.get() for i in name_result.queue], list(tel_result.queue))
    except:
        print(traceback.format_exc())

name_tasks = tel_tasks = [(test, ('es', 'data', 'sid', True)),
                          (test, ('tyc', 'data', 'sid', True)),
                          (test, ('map', 'data', 'sid', True))]

%time ret = get_result(tel_tasks, name_tasks)
print('name:', ret[0])
print('tel:', ret[1])

[('es', 'data', 'sid', True), ('tyc', 'data', 'sid', True), ('map', 'data', 'sid', True)] [('es', 'data', 'sid', True)]
CPU times: user 13.6 ms, sys: 2.77 ms, total: 16.3 ms
Wall time: 1.92 s
name: [('es', 'data', 'sid', True), ('tyc', 'data', 'sid', True), ('map', 'data', 'sid', True)]
tel: [('es', 'data', 'sid', True)]


In [54]:
import queue
import threading
import time
import random
import traceback
from multiprocessing.dummy import Pool # 多线程
# from multiprocessing.pool import Pool # 多进程 ，可以完全替换

tel_tmp_result=queue.Queue() # 号查名队列
tel_result=queue.Queue() # 号查名队列
tel_pool = Pool(len(tel_tasks))  # 号查名进（线）程池
name_result=queue.Queue() # 名查号队列
name_pool = Pool(len(name_tasks))  # 名查号进（线）程池

def test(item):
    try:
        sleeps = 2+random.random()
        if item[0] == 'es':
            print(sleeps)
        time.sleep(sleeps)
    except:
        print(traceback.format_exc())
    return item

def tel_store_result(item):
    tel_tmp_result.put(item)

def tel_pool_th(tasks):
    for func, items in tasks:
        try:
            tel_pool.apply_async(func, args=(items,), callback=tel_store_result)
        except:
            print(traceback.format_exc())

def tel_result_th():
    while 1:
        try:
#             print('bingo')
            a=tel_tmp_result.get() # 获取子进（线）程返回值，没有的话堵塞等待
#             print('tel', a)
            tel_result.put(a)
            if a[0]=='es' and a[3] is True:
                # 结束所有子进（线）程, 关闭进（线）程池，释放资源
#                 tel_pool.close()
                tel_pool.terminate() 
                tel_pool.join()
                break
        except:
            print(traceback.format_exc())

def name_result_th(tasks):
    try:
        for funcs, items in tasks:
#             print('name', funcs, items, result, pool)
            name_result.put(name_pool.apply_async(funcs, args=(items,)))  # 维持执行的进程总数为10，当一个进程执行完后添加新进程.  
        name_pool.close()
        name_pool.join()
    except:
        print(traceback.format_exc()) 

def get_result(tel_tasks, name_tasks):
    """并发执行
    
    """
    try:
        # 开启多线程，启动任务
        t1=threading.Thread(target=tel_pool_th, args=(tel_tasks, ))
        t2=threading.Thread(target=tel_result_th)
        t3=threading.Thread(target=name_result_th, args=(name_tasks,))
        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()
        print([i.get() for i in name_result.queue], list(tel_result.queue))
        return ([i.get() for i in name_result.queue], list(tel_result.queue))
    except:
        print(traceback.format_exc())

name_tasks = tel_tasks = [(test, ('es', 'data', 'sid', True)),
                          (test, ('tyc', 'data', 'sid', True)),
                          (test, ('map', 'data', 'sid', True))]

%time ret = get_result(tel_tasks, name_tasks)
print('name:', ret[0])
print('tel:', ret[1])

2.988902494628972
2.400641040419831
[('es', 'data', 'sid', True), ('tyc', 'data', 'sid', True), ('map', 'data', 'sid', True)] [('tyc', 'data', 'sid', True), ('map', 'data', 'sid', True), ('es', 'data', 'sid', True)]
CPU times: user 19.3 ms, sys: 4.68 ms, total: 23.9 ms
Wall time: 3.01 s
name: [('es', 'data', 'sid', True), ('tyc', 'data', 'sid', True), ('map', 'data', 'sid', True)]
tel: [('tyc', 'data', 'sid', True), ('map', 'data', 'sid', True), ('es', 'data', 'sid', True)]


In [56]:
import queue
import threading
import time
import random
import traceback
import multiprocessing
# from multiprocessing.dummy import Pool  # 多线程
from multiprocessing.pool import Pool  # 多进程 ，可以完全替换


def test(item):
    try:
        sleeps = 1 + random.random()
        time.sleep(sleeps)
    except:
        print(traceback.format_exc())
    return item


def tel_store_result(item):
    item[0].put(item[1])


def tel_pool_th(tasks, tmp_result, tel_pool):
    try:
        for func, items in tasks:
            tel_pool.apply_async(func, args=((tmp_result, items),), callback=tel_store_result)
    except:
        print(traceback.format_exc())


def tel_result_th(result, tmp_result, tel_pool):
    try:
        while 1:
            a = tmp_result.get()  # 获取子进（线）程返回值，没有的话堵塞等待
            result.put(a)
            if a[0] == 'es' and a[3] is True:
                # 结束所有子进（线）程
                tel_pool.terminate()
                break
    except:
        print(traceback.format_exc())


def name_result_th(tasks, result, pool):
    try:
        for funcs, items in tasks:
            result.put(pool.apply_async(funcs, args=(items,)))  # 维持执行的进程总数为10，当一个进程执行完后添加新进程.
    except:
        print(traceback.format_exc())


def get_result(tel_tasks, name_tasks):
    """并发执行

    """
    manager = multiprocessing.Manager()
    tel_tmp_result = manager.Queue()  # 号查名队列
    tel_result = queue.Queue()  # 号查名队列
    name_result = queue.Queue()  # 名查号队列
    tel_pool = Pool(len(tel_tasks))  # 号查名进（线）程池
    name_pool = Pool(len(name_tasks))  # 名查号进（线）程池
    try:
        # 开启多线程，启动任务
        t1 = threading.Thread(target=tel_pool_th, args=(tel_tasks, tel_tmp_result, tel_pool))
        t2 = threading.Thread(target=tel_result_th, args=(tel_result, tel_tmp_result, tel_pool))
        t3 = threading.Thread(target=name_result_th, args=(name_tasks, name_result, name_pool))
        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()

        # 关闭进（线）程池，释放资源
        tel_pool.close()
        tel_pool.join()
        name_pool.close()
        name_pool.join()
 
        print([i.get() for i in name_result.queue], list(tel_result.queue))
        return ([i.get() for i in name_result.queue], list(tel_result.queue))
    except:
        print(traceback.format_exc())


name_tasks = tel_tasks = [(test, ('es', 'data', 'sid', True)),
                          (test, ('tyc', 'data', 'sid', True)),
                          (test, ('map', 'data', 'sid', True))]

%time ret = get_result(tel_tasks, name_tasks)
print('name:', ret[0])
print('tel:', ret[1])

[('es', 'data', 'sid', True), ('tyc', 'data', 'sid', True), ('map', 'data', 'sid', True)] [('map', 'data', 'sid', True), ('es', 'data', 'sid', True)]
CPU times: user 21.1 ms, sys: 33.3 ms, total: 54.4 ms
Wall time: 2.09 s
name: [('es', 'data', 'sid', True), ('tyc', 'data', 'sid', True), ('map', 'data', 'sid', True)]
tel: [('map', 'data', 'sid', True), ('es', 'data', 'sid', True)]


In [69]:
22/68

0.3235294117647059