Skip to content

Commit

Permalink
Merge pull request #36 from h2non/feat-v2
Browse files Browse the repository at this point in the history
feat(v0.2): new minor version + error behavioral changes
  • Loading branch information
h2non committed Oct 22, 2017
2 parents d66b173 + 4b8ce74 commit d2f7897
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 80 deletions.
14 changes: 14 additions & 0 deletions History.rst
Expand Up @@ -2,6 +2,20 @@
History
=======

v0.2.0 / 2017-10-21
-------------------

* refactor(api): API breaking change that modifies behavior by raising any legit exception generated by a coroutine.
* feat(examples): add examples file
* feat(v2): v2 pre-release, propagate raise exception if return_exceptions is False
* refactor(tests): add map error exception assertion test
* Merge branch 'master' of https://github.com/h2non/paco
* refactor(tests): add map error exception assertion test
* feat(docs): add sponsor banner
* feat(docs): add sponsor banner
* feat(LICENSE): update copyright year
* Update setup.py

v0.1.11 / 2017-01-28
--------------------

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -16,7 +16,7 @@ htmldocs:

lint:
@echo "$(OK_COLOR)==> Linting code...$(NO_COLOR)"
@flake8 .
@flake8 --exclude examples .

test: clean lint
@echo "$(OK_COLOR)==> Runnings tests...$(NO_COLOR)"
Expand Down
4 changes: 2 additions & 2 deletions README.rst
Expand Up @@ -3,11 +3,11 @@ paco |Build Status| |PyPI| |Coverage Status| |Documentation Status| |Stability|

Small and idiomatic utility library for coroutine-driven asynchronous generic programming in Python +3.4.

Built on top of `asyncio`_, paco provides missing capabilities from Python `stdlib`
Built on top of `asyncio`_, ``paco`` provides missing capabilities from Python `stdlib`
in order to write asynchronous cooperative multitasking in a nice-ish way.
Also, paco aims to port some of `functools`_ and `itertools`_ standard functions to the asynchronous world.

paco can be your utility belt to deal with asynchronous, I/O-bound, non-blocking concurrent code in a cleaner and idiomatic way.
``paco`` can be your utility belt to deal with asynchronous, I/O-bound, non-blocking concurrent code in a cleaner and idiomatic way.

.. raw:: html

Expand Down
56 changes: 2 additions & 54 deletions docs/examples.rst
Expand Up @@ -5,62 +5,10 @@ Examples
Asynchronously and concurrently execute multiple HTTP requests.
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python
import paco
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
return res
async def fetch_urls():
urls = [
'https://www.google.com',
'https://www.yahoo.com',
'https://www.bing.com',
'https://www.baidu.com',
'https://duckduckgo.com',
]
# Map concurrent executor with concurrent limit of 3
responses = await paco.map(fetch, urls, limit=3)
for res in responses:
print('Status:', res.status)
# Run in event loop
paco.run(fetch_urls())
.. literalinclude:: ../examples/http_requests.py


Concurrent pipeline-style chain composition of functors over any iterable object.
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python
import paco
async def filterer(x):
return x < 8
async def mapper(x):
return x * 2
async def drop(x):
return x < 10
async def reducer(acc, x):
return acc + x
async def task(numbers):
return await (numbers
| paco.filter(filterer)
| paco.map(mapper)
| paco.dropwhile(drop)
| paco.reduce(reducer, initializer=0))
# Run in event loop
number = paco.run(task((1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
print('Number:', number) # => 36
.. literalinclude:: ../examples/pipeline.py
28 changes: 28 additions & 0 deletions examples/http_requests.py
@@ -0,0 +1,28 @@
import paco
import aiohttp


async def fetch(url):
    """Issue a GET request against *url* and return the aiohttp response.

    NOTE(review): the response object is returned after both context
    managers exit, so the session is already closed — mirrors the
    original example's behavior.
    """
    # One throwaway session per request; both managers in a single
    # ``async with`` keeps the acquire/release pairing obvious.
    async with aiohttp.ClientSession() as http_session, \
            http_session.get(url) as response:
        return response


async def fetch_urls():
    """Fetch a handful of sites concurrently and print each status code."""
    targets = [
        'https://www.google.com',
        'https://www.yahoo.com',
        'https://www.bing.com',
        'https://www.baidu.com',
        'https://duckduckgo.com',
    ]

    # paco.map schedules fetch() for every URL, running at most 3 at a time.
    results = await paco.map(fetch, targets, limit=3)

    for reply in results:
        print('Status:', reply.status)


# Run in event loop: paco.run() drives the coroutine to completion.
paco.run(fetch_urls())
30 changes: 30 additions & 0 deletions examples/pipeline.py
@@ -0,0 +1,30 @@
import paco


async def filterer(x):
    """Predicate for paco.filter: keep values strictly below 8."""
    threshold = 8
    return x < threshold


async def mapper(x):
    """Transform step for paco.map: double the incoming value."""
    doubled = x * 2
    return doubled


async def drop(x):
    """Predicate for paco.dropwhile: true while values stay under 10."""
    cutoff = 10
    return x < cutoff


async def reducer(acc, x):
    """Fold step for paco.reduce: add each element to the running total."""
    total = acc + x
    return total


async def task(numbers):
    """Push *numbers* through a filter -> map -> dropwhile -> reduce chain.

    The ``|`` operator is paco's pipeline overload; the composed pipeline
    is awaited as a whole to produce the final reduced value.
    """
    pipeline = (numbers
                | paco.filter(filterer)
                | paco.map(mapper)
                | paco.dropwhile(drop)
                | paco.reduce(reducer, initializer=0))  # noqa
    return await pipeline


# Run in event loop and collect the pipeline's reduced result.
number = paco.run(task((1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
print('Number:', number)  # => 36
2 changes: 1 addition & 1 deletion paco/__init__.py
Expand Up @@ -34,7 +34,7 @@
__license__ = 'MIT'

# Current package version
__version__ = '0.1.11'
__version__ = '0.2.0'

# Explicit symbols to export
__all__ = (
Expand Down
45 changes: 32 additions & 13 deletions paco/concurrent.py
Expand Up @@ -64,7 +64,7 @@ class ConcurrentExecutor(object):
subscribe normal functions or coroutines to certain events that happen
internally.
ConcurrentExecutor is a low-level implementation that powers most of the
ConcurrentExecutor is a low-level implementation that powers most of the
utility functions provided in `paco`.
For most cases you won't need to rely on it, instead you can
Expand All @@ -78,6 +78,8 @@ class ConcurrentExecutor(object):
- finish (executor): triggered when all the coroutine finished.
- task.start (task): triggered before coroutine starts.
- task.finish (task, result): triggered when the coroutine finished.
- task.error (task, error): triggered when a coroutined task
raised an exception.
Arguments:
limit (int): concurrency limit. Defaults to 10.
Expand Down Expand Up @@ -105,6 +107,7 @@ async def sum(x, y):
"""

def __init__(self, limit=10, loop=None, coros=None, ignore_empty=False):
self.errors = []
self.running = False
self.return_exceptions = False
self.limit = max(int(limit), 0)
Expand Down Expand Up @@ -135,7 +138,7 @@ def reset(self):
RuntimeError: is the executor is still running.
"""
if self.running:
raise RuntimeError('executor is still running')
raise RuntimeError('paco: executor is still running')

self.pool.clear()
self.observer.clear()
Expand Down Expand Up @@ -198,7 +201,7 @@ def add(self, coro, *args, **kw):

# Verify coroutine
if not asyncio.iscoroutine(coro):
raise TypeError('coro must be a coroutine object')
raise TypeError('paco: coro must be a coroutine object')

# Store coroutine with arguments for deferred execution
index = max(len(self.pool), 0)
Expand Down Expand Up @@ -234,14 +237,13 @@ def _run_sequentially(self):
future.set_result(result)

# Swap future between queues
future = pending.pop()
done.append(future)
done.append(pending.pop())

# Build futures tuple to be compatible with asyncio.wait() interface
return set(done), set(pending)

@asyncio.coroutine
def _run_concurrently(self, timeout=None, return_when=None):
def _run_concurrently(self, timeout=None, return_when='ALL_COMPLETED'):
coros = []
limit = self.limit

Expand Down Expand Up @@ -274,8 +276,13 @@ def _run_coro(self, task):
index, coro = task

# Safe coroutine execution
result = yield from safe_run(coro,
return_exceptions=self.return_exceptions)
try:
result = yield from safe_run(
coro, return_exceptions=self.return_exceptions)
except Exception as err:
self.errors.append(err)
yield from self.observer.trigger('task.error', task, err)
raise err # important: re-raise exception for asyncio propagation

# Trigger task post-execution event
yield from self.observer.trigger('task.finish', task, result)
Expand All @@ -298,7 +305,7 @@ def _schedule_coro(self, task):
@asyncio.coroutine
def run(self,
timeout=None,
return_when='ALL_COMPLETED',
return_when=None,
return_exceptions=None,
ignore_empty=None):
"""
Expand All @@ -324,7 +331,7 @@ def run(self,
"""
# Only allow 1 concurrent execution
if self.running:
raise RuntimeError('executor is already running')
raise RuntimeError('paco: executor is already running')

# Overwrite ignore empty behaviour, if explicitly defined
ignore_empty = (self.ignore_empty if ignore_empty is None
Expand All @@ -336,7 +343,7 @@ def run(self,
if ignore_empty:
return (tuple(), tuple())
# Otherwise raise an exception
raise ValueError('Set of coroutines is empty')
raise ValueError('paco: pool of coroutines is empty')

# Set executor state to running
self.running = True
Expand All @@ -345,8 +352,14 @@ def run(self,
if return_exceptions is not None:
self.return_exceptions = return_exceptions

if return_exceptions is False and return_when is None:
return_when = 'FIRST_EXCEPTION'

if return_when is None:
return_when = 'ALL_COMPLETED'

# Trigger pre-execution event
self.observer.trigger('start', self)
yield from self.observer.trigger('start', self)

# Sequential coroutines execution
if self.limit == 1:
Expand All @@ -361,8 +374,14 @@ def run(self,
# Reset internal state and queue
self.running = False

# Raise exception, if needed
if self.return_exceptions is False and self.errors:
err = self.errors[0]
err.errors = self.errors[1:]
raise err

# Trigger pre-execution event
self.observer.trigger('finish', self)
yield from self.observer.trigger('finish', self)

# Reset executor state to defaults after each execution
self.reset()
Expand Down
45 changes: 45 additions & 0 deletions tests/concurrent_test.py
Expand Up @@ -132,3 +132,48 @@ def on_finish(task, result):

finish.sort()
assert finish == [4, 8, 12]


def test_concurrent_observe_exception():
    """A failing task must fire ``task.error`` and re-raise from run()
    when ``return_exceptions=False``, while successful tasks still
    trigger ``task.finish``."""
    started = []
    failed = []
    completed = []

    @asyncio.coroutine
    def coro(num):
        # Anything above 4 fails, so exactly one of the queued tasks errors.
        if num > 4:
            raise ValueError('invalid number')
        return num * 2

    @asyncio.coroutine
    def on_start(task):
        started.append(task)

    @asyncio.coroutine
    def on_error(task, err):
        failed.append(err)

    @asyncio.coroutine
    def on_finish(task, result):
        completed.append(result)

    executor = concurrent(1)
    for event, handler in (('task.start', on_start),
                           ('task.error', on_error),
                           ('task.finish', on_finish)):
        executor.on(event, handler)

    for num in (2, 4, 6):
        executor.add(coro, num)
    assert len(executor.pool) == 3

    # The single failure must propagate out of run().
    with pytest.raises(ValueError):
        run_in_loop(executor.run(return_exceptions=False))

    # Event bookkeeping: all three started, one errored, two finished.
    assert len(started) == 3
    assert len(failed) == 1
    assert len(completed) == 2

    assert sorted(completed) == [4, 8]
16 changes: 7 additions & 9 deletions tests/map_test.py
Expand Up @@ -42,7 +42,7 @@ def test_map_invalid_coro():
run_in_loop(map(None))


def test_map_exceptions():
def test_map_return_exceptions():
@asyncio.coroutine
def coro(num):
raise ValueError('foo')
Expand All @@ -52,14 +52,12 @@ def coro(num):
assert isinstance(exp, ValueError)


def test_map_return_exceptions():
def test_map_raise_exceptions():
@asyncio.coroutine
def coro(num):
raise ValueError('foo')

task = map(coro, [1, 2, 3, 4, 5], return_exceptions=True)
results = run_in_loop(task)
assert len(results) == 5
if num > 3:
raise ValueError('foo')
return num * 2

for err in results:
assert str(err) == 'foo'
with pytest.raises(ValueError):
run_in_loop(map(coro, [1, 2, 3, 4, 5], return_exceptions=False))

0 comments on commit d2f7897

Please sign in to comment.