Skip to content

Commit

Permalink
Use standard library functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed Jul 6, 2019
1 parent 222c703 commit 2932e14
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 89 deletions.
18 changes: 9 additions & 9 deletions README.rst
Expand Up @@ -23,12 +23,12 @@ Installation
Examples
========

There are plenty of examples in the `examples folder`_.
There are more examples in the `examples folder`_.

Default worker
--------------
Call
----

Call ``work()`` in the default worker thread.
Call ``work()`` in a worker thread.

.. code-block:: python
Expand All @@ -40,11 +40,11 @@ Call ``work()`` in the default worker thread.
asyncio.run(asyncbg.call(work))
Worker pool
-----------
Thread pool executor
--------------------

Create a worker pool with two worker threads, and call ``work()``
three times in it (up to two callbacks called in parallel).
Create a thread pool executor with two worker threads, and call
``work()`` three times in it (up to two callbacks called in parallel).

.. code-block:: python
Expand All @@ -55,7 +55,7 @@ three times in it (up to two callbacks called in parallel).
pass
async def main():
pool = asyncbg.WorkerPool(2)
pool = asyncbg.ThreadPoolExecutor(max_workers=2)
await asyncio.gather(pool.call(work),
pool.call(work),
pool.call(work))
Expand Down
87 changes: 16 additions & 71 deletions asyncbg/__init__.py
@@ -1,106 +1,51 @@
import threading
from queue import Queue
import asyncio
import concurrent.futures

from .version import __version__


class Worker(threading.Thread):

def __init__(self):
super().__init__()
self.queue = Queue()

def run(self):
while True:
callback, loop, queue = self.queue.get()

try:
result = callback()
exception = None
except Exception as e:
result = None
exception = e

asyncio.run_coroutine_threadsafe(queue.put((result, exception)),
loop)


def create_worker():
worker = Worker()
worker.daemon = True
worker.start()

return worker


class WorkerPool:
"""The worker pool may only be used from a single asyncio loop. It is
*not* thread safe.
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"""Same as ``concurrent.futures.ThreadPoolExecutor``, but with the
``call()`` method added.
"""

def __init__(self, number_of_workers=4):
self._workers = asyncio.Queue()

for _ in range(number_of_workers):
self._workers.put_nowait(create_worker())

async def call(self, callback):
"""Call given callback in the worker pool when a worker is available.
async def call(self, callback, *args, **kwargs):
"""Call given callback with given arguments in the worker pool when a
worker is available.
Returns the value returned by the callback, or raises the
exceptions raised by the coroutine.
exceptions raised by the callback.
Call ``work()`` in a worker pool:
>>> def work():
>>> pass
>>>
>>> pool = asyncbg.WorkerPool()
>>> pool = asyncbg.ThreadPoolExecutor()
>>> asyncio.run(pool.call(work))
"""

worker = await self._workers.get()

try:
result = await call(callback, worker)
finally:
await self._workers.put(worker)

return result
return await asyncio.wrap_future(self.submit(callback, *args, **kwargs))


_DEFAULT_WORKER = create_worker()
_DEFAULT_POOL = ThreadPoolExecutor()


async def call(callback, worker=None):
"""Call given callback in given worker thread, or the default worker
thread if no worker thread is given.
async def call(callback, *args, **kwargs):
"""Call given callback with given arguments in a worker thread.
Returns the value returned by the callback, or raises the
exceptions raised by the callback.
This functions is thread safe.
Call ``work()`` in the default worker thread:
Call ``work()`` in a worker thread:
>>> def work():
>>> pass
>>>
>>> asyncio.run(asyncbg.call(work()))
>>> asyncio.run(asyncbg.call(work))
"""

if worker is None:
worker = _DEFAULT_WORKER

queue = asyncio.Queue()
worker.queue.put((callback, asyncio.get_event_loop(), queue))
result, exception = await queue.get()

if exception is not None:
raise exception

return result
return await _DEFAULT_POOL.call(callback, *args, **kwargs)
2 changes: 1 addition & 1 deletion asyncbg/version.py
@@ -1 +1 @@
__version__ = '0.3.0'
__version__ = '0.4.0'
2 changes: 1 addition & 1 deletion docs/index.rst
Expand Up @@ -16,4 +16,4 @@ Functions and classes

.. autofunction:: asyncbg.call

.. autoclass:: asyncbg.WorkerPool
.. autoclass:: asyncbg.ThreadPoolExecutor
2 changes: 1 addition & 1 deletion examples/pool.py
Expand Up @@ -17,7 +17,7 @@ async def foreground_work():


async def main():
pool = asyncbg.WorkerPool()
pool = asyncbg.ThreadPoolExecutor(max_workers=4)
await asyncio.gather(pool.call(background_work),
pool.call(background_work),
pool.call(background_work),
Expand Down
11 changes: 5 additions & 6 deletions tests/test_asyncbg.py
@@ -1,6 +1,5 @@
import asyncio
import unittest
from functools import partial

import asyncbg

Expand All @@ -26,17 +25,17 @@ def work():
with self.assertRaises(Exception):
await asyncbg.call(work)

def test_worker_pool(self):
asyncio.run(self.worker_pool())
def test_thread_pool_executor(self):
asyncio.run(self.thread_pool_executor())

async def worker_pool(self):
pool = asyncbg.WorkerPool()
async def thread_pool_executor(self):
pool = asyncbg.ThreadPoolExecutor()

def work(value):
return value

for i in range(10):
self.assertEqual(await pool.call(partial(work, i)), i)
self.assertEqual(await pool.call(work, i), i)


if __name__ == '__main__':
Expand Down

0 comments on commit 2932e14

Please sign in to comment.