In [7]:
from time import sleep
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import Manager, Queue


N_PARALLEL = 5


def worker(i: int, q: Queue) -> None:
    print(f'worker {i} start')
    while 1:
        data = q.get()
        if data is None:  # 采用毒丸（poison pill）方式来结束进程池
            q.put(data)
            print(f'worker {i} exit')
            return
        
        print(f'dealing with data {data}...')
        sleep(1)
        

        
        
def main():
    executor = ProcessPoolExecutor(max_workers=N_PARALLEL)  # 控制并发量
    with Manager() as manager:
        queue = manager.Queue(maxsize=50)  # 控制缓存量

        workers = [executor.submit(worker, i, queue) for i in range(N_PARALLEL)]
        for i in range(50):
            queue.put(i)
            
        print('all task data submitted')

        queue.put(None)
        wait(workers)
        print('all done')
        

main()


worker 1 start
worker 0 start
worker 2 start
worker 3 start
worker 4 start
dealing with data 0...
dealing with data 1...
dealing with data 2...
dealing with data 3...
dealing with data 4...
all tas data submitted
dealing with data 5...
dealing with data 6...
dealing with data 7...
dealing with data 8...
dealing with data 9...
dealing with data 10...
dealing with data 11...
dealing with data 12...
dealing with data 13...
dealing with data 14...
dealing with data 15...
dealing with data 16...
dealing with data 17...
dealing with data 18...
dealing with data 19...
dealing with data 21...
dealing with data 20...
dealing with data 22...
dealing with data 23...
dealing with data 24...
dealing with data 26...
dealing with data 25...
dealing with data 27...
dealing with data 28...
dealing with data 29...
dealing with data 30...
dealing with data 31...
dealing with data 32...
dealing with data 33...
dealing with data 34...
dealing with data 35...
dealing with data 36...
dealing with data 37...


In [12]:
## shared value

from time import sleep
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import Manager, Queue
from ctypes import c_int64


def worker(i, normal_v, shared_v):
    normal_v += 1            # 因为进程间内存隔离，所以每个进程都会得到 1
    shared_v.value += 1      # 因为使用了共享内存，所以会分别得到 1 和 2
    
    print(f'worker[{i}] got normal_v {normal_v}, shared_v {shared_v.value}')
        
        
def main():
    executor = ProcessPoolExecutor(max_workers=2)
    with Manager() as manager:
        lock = manager.Lock()
        shared_v = manager.Value(c_int64, 0, lock=lock)
        normal_v = 0

        workers = [executor.submit(worker, i, normal_v, shared_v) for i in range(2)]
        wait(workers)
        print('all done')
        

main()


worker[0] got normal_v 1, shared_v 1
worker[1] got normal_v 1, shared_v 2
all done


In [8]:
"""
shared memory
=============

Output:
::
    worker[0] got normal_v 1, shared_v 1
    worker[2] got normal_v 1, shared_v 2
    worker[3] got normal_v 1, shared_v 3
    worker[1] got normal_v 1, shared_v 4
    worker[4] got normal_v 1, shared_v 5
    worker[5] got normal_v 1, shared_v 6
    worker[6] got normal_v 1, shared_v 7
    worker[8] got normal_v 1, shared_v 8
    worker[7] got normal_v 1, shared_v 9
    worker[9] got normal_v 1, shared_v 10
    all done
"""

from traceback import print_exc
from time import sleep
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import Event, RLock
from multiprocessing.shared_memory import ShareableList
from multiprocessing.managers import SharedMemoryManager, SyncManager
from ctypes import c_int64


def worker(l: RLock, evt: Event, i: int, normal_v: int, shared_v: ShareableList):
    try:
        evt.wait()      # 确保任务同时开始
        normal_v += 1   # 因为进程间内存隔离，所以每个进程都会得到 1
        with RLock():   # 需要自行处理锁
            shared_v[0] += 1  # 因为使用了共享内存，所以会得到连续累加的值

        print(f"worker[{i}] got normal_v {normal_v}, shared_v {shared_v[0]}")
    except Exception:
        print_exc()
        raise


def main():
    executor = ProcessPoolExecutor(max_workers=10)
    with SharedMemoryManager() as smm, SyncManager() as sm:
        evt = sm.Event()
        shared_v = smm.ShareableList([0])
        normal_v = 0
        workers = [
            executor.submit(worker, sm.RLock(), evt, i, normal_v, shared_v)
            for i in range(10)
        ]

        evt.set()
        wait(workers)
        [f.result() for f in workers]
        print("all done")


if __name__ == "__main__":
    main()


BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

In [16]:
from time import sleep
from asyncio import get_event_loop, sleep as asleep, gather, ensure_future
from concurrent.futures import ThreadPoolExecutor, wait, Future
from functools import wraps


executor = ThreadPoolExecutor(max_workers=10)
ioloop = get_event_loop()


def nonblocking(func) -> Future:
    @wraps(func)
    def wrapper(*args):
        return ioloop.run_in_executor(executor, func, *args)
    return wrapper


@nonblocking  # 用线程池封装没法协程化的普通阻塞程序
def foo(n: int):
    """假装我是个很耗时的阻塞调用"""
    print('start blocking task...')
    sleep(n)
    print('end blocking task')


async def coroutine_demo(n: int):
    """我就是个普通的协程"""

    # 协程内不能出现任何的阻塞调用，所谓一朝协程，永世协程
    # 那我偏要调一个普通的阻塞函数怎么办？
    # 最简单的办法，套一个线程池…
    await foo(n)


async def coroutine_demo_2():
    print('start coroutine task...')
    await asleep(1)
    print('end coroutine task')


async def coroutine_main():
    """一般我们会写一个 coroutine 的 main 函数，专门负责管理协程"""
    await gather(
        coroutine_demo(1),
        coroutine_demo_2()
    )


def main():
    ioloop.run_until_complete(coroutine_main())
    print('all done')


main()

RuntimeError: This event loop is already running

In [None]:
# kipp

from kipp.aio import run_until_complete, 