A Python 3.4+ library that integrates the multiprocessing module with asyncio

aioprocessing

aioprocessing provides asynchronous, asyncio-compatible coroutine versions of many blocking instance methods on objects in the multiprocessing library. Here's an example demonstrating the aioprocessing versions of Event, Queue, and Lock:

    import time
    import asyncio
    import aioprocessing
    import multiprocessing


    def func(queue, event, lock, items):
        """ Demo worker function.

        This worker function runs in its own process, and uses
        normal blocking calls to aioprocessing objects, exactly
        the way you would use ordinary multiprocessing objects.

        """
        with lock:
            event.set()
            for item in items:
                time.sleep(3)
                queue.put(item+5)
        queue.close()

    @asyncio.coroutine
    def example(queue, event, lock):
        items = [1, 2, 3, 4, 5]
        p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, items))
        p.start()
        while True:
            result = yield from queue.coro_get()
            if result is None:
                break
            print("Got result {}".format(result))
        yield from p.coro_join()

    @asyncio.coroutine
    def example2(queue, event, lock):
        yield from event.coro_wait()
        with (yield from lock):
            yield from queue.coro_put(78)
            yield from queue.coro_put(None)  # Sentinel: tells example() to stop reading

    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        queue = aioprocessing.AioQueue()
        lock = aioprocessing.AioLock()
        event = aioprocessing.AioEvent()
        tasks = [
            asyncio.ensure_future(example(queue, event, lock)), 
            asyncio.ensure_future(example2(queue, event, lock)),
        ]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

The async/await syntax introduced in Python 3.5 is supported, too. This means the example2 function above could look like this:

    async def example2(queue, event, lock):
        await event.coro_wait()
        async with lock:
            await queue.coro_put(78)
        await queue.coro_put(None)  # Sentinel: tells example() to stop reading

The aioprocessing objects can be used just like their multiprocessing equivalents (as they are in func above), but they can also be used inside asyncio coroutines without ever blocking the event loop.

How does it work?

In most cases, this library makes blocking calls to multiprocessing methods asynchronous by executing the call in a ThreadPoolExecutor, using the event loop's run_in_executor() method. It does not re-implement multiprocessing using asynchronous I/O. This means aioprocessing objects carry extra overhead compared with plain multiprocessing objects, because each one generally introduces a ThreadPoolExecutor containing at least one threading.Thread. It also means that all the normal risks of mixing threads with fork apply here, too (see http://bugs.python.org/issue6721 for more info).
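
The executor approach described above can be sketched with the standard library alone. This is a minimal illustration, not the library's actual implementation: the helper name coro_get, the timeout value, and the single-worker executor are assumptions made for the example, and it assumes Python 3.7+ for asyncio.run().

```python
import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor


async def coro_get(queue, executor, timeout=5):
    """Hypothetical helper: await a blocking Queue.get() off the event loop."""
    loop = asyncio.get_running_loop()
    # The blocking call runs in a worker thread; this coroutine is suspended
    # until the thread finishes, so other tasks keep running in the meantime.
    return await loop.run_in_executor(executor, queue.get, True, timeout)


async def main():
    queue = multiprocessing.Queue()
    with ThreadPoolExecutor(max_workers=1) as executor:
        getter = asyncio.create_task(coro_get(queue, executor))
        await asyncio.sleep(0.1)  # the event loop stays responsive here
        queue.put("hello")
        return await getter


result = asyncio.run(main())
print(result)
```

The cost mentioned above is visible here: every awaited call occupies a thread for as long as the underlying blocking call takes.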

The one exception to this is aioprocessing.AioPool, which makes use of the existing callback and error_callback keyword arguments in the various Pool.*_async methods to run them as asyncio coroutines. Note that multiprocessing.Pool itself uses threads internally, so the thread/fork mixing caveat still applies.
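
A rough sketch of the callback approach, under stated assumptions: coro_apply is a hypothetical helper, not AioPool's real code, and the example uses multiprocessing.pool.ThreadPool, which shares the Pool.apply_async signature, so that the snippet runs without spawning worker processes. It assumes Python 3.7+.

```python
import asyncio
from multiprocessing.pool import ThreadPool


def coro_apply(pool, func, args=()):
    """Hypothetical helper: return a future resolved by apply_async callbacks."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # The pool invokes these callbacks from one of its internal threads, so
    # the outcome is handed back to the loop with call_soon_threadsafe.
    def on_result(value):
        loop.call_soon_threadsafe(future.set_result, value)

    def on_error(exc):
        loop.call_soon_threadsafe(future.set_exception, exc)

    pool.apply_async(func, args, callback=on_result, error_callback=on_error)
    return future


async def main():
    with ThreadPool(2) as pool:
        squared = await coro_apply(pool, pow, (7, 2))
        try:
            await coro_apply(pool, divmod, (1, 0))
        except ZeroDivisionError:
            failed = True
        else:
            failed = False
    return squared, failed


squared, failed = asyncio.run(main())
print(squared, failed)
```

No extra executor is needed in this pattern, because the pool's own callback machinery signals completion.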

Each multiprocessing class is replaced by an equivalent aioprocessing class, distinguished by the Aio prefix. So, Pool becomes AioPool, etc. All methods that could block on I/O also have a coroutine version that can be used with asyncio. For example, multiprocessing.Lock.acquire() can be replaced with aioprocessing.AioLock.coro_acquire(). You can pass an asyncio event loop object to any coro_* method using the loop keyword argument. For example, lock.coro_acquire(loop=my_loop).
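
To illustrate the naming convention, here is a hypothetical stand-in class that pairs a blocking acquire() with a coro_acquire() twin accepting a loop keyword. SketchLock is invented for this example and wraps a threading.Lock; it uses the loop's default executor, which may differ from how aioprocessing manages its executors. It assumes Python 3.7+.

```python
import asyncio
import threading


class SketchLock:
    """Hypothetical stand-in for an Aio* class: blocking API plus coro_* twin."""

    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self, blocking=True, timeout=-1):
        return self._lock.acquire(blocking, timeout)

    def release(self):
        self._lock.release()

    async def coro_acquire(self, *args, loop=None):
        # Mirror the documented loop keyword: use the caller's loop if given.
        loop = loop or asyncio.get_running_loop()
        # Passing None selects the loop's default ThreadPoolExecutor.
        return await loop.run_in_executor(None, self.acquire, *args)


async def main():
    lock = SketchLock()
    got_it = await lock.coro_acquire()
    # A second, non-blocking attempt fails because the lock is already held.
    second = await lock.coro_acquire(False)
    lock.release()
    return got_it, second


got_it, second = asyncio.run(main())
print(got_it, second)
```

The blocking method stays available unchanged, so the same object works in ordinary synchronous code and inside coroutines.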

Note that you can also use the aioprocessing synchronization primitives as replacements for their equivalent threading primitives, in single-process, multi-threaded programs that use asyncio.

What parts of multiprocessing are supported?

Most of them! Each of the following objects has an aioprocessing equivalent that extends the multiprocessing version by adding coroutine versions of every method that could do blocking I/O.

  • Pool
  • Process
  • Pipe
  • Lock
  • RLock
  • Semaphore
  • BoundedSemaphore
  • Event
  • Condition
  • Barrier
  • connection.Connection
  • connection.Listener
  • connection.Client
  • Queue
  • JoinableQueue
  • SimpleQueue
  • All managers.SyncManager Proxy versions of the items above (SyncManager.Queue, SyncManager.Lock, etc.).

What versions of Python are compatible?

aioprocessing will work out of the box on Python 3.4+.
