Skip to content

Commit

Permalink
Use a simple Event wrapper instead of a full Event class implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
pfreixes committed Jul 19, 2017
1 parent 5586325 commit 11afb32
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 111 deletions.
9 changes: 6 additions & 3 deletions aiohttp/connector.py
Expand Up @@ -17,7 +17,7 @@
from .client_proto import ResponseHandler
from .client_reqrep import ClientRequest
from .helpers import SimpleCookie, is_ip_address, noop, sentinel
from .locks import Event
from .locks import ErrorfulOneShotEvent
from .resolver import DefaultResolver


Expand Down Expand Up @@ -675,10 +675,13 @@ def _resolve_host(self, host, port):
if key in self._throttle_dns_events:
yield from self._throttle_dns_events[key].wait()
else:
self._throttle_dns_events[key] = Event(loop=self._loop)
self._throttle_dns_events[key] = \
ErrorfulOneShotEvent(loop=self._loop)
try:
addrs = yield from \
self._resolver.resolve(host, port, family=self._family)
asyncio.shield(self._resolver.resolve(host,
port,
family=self._family))
self._cached_hosts.add(key, addrs)
self._throttle_dns_events[key].set()
except Exception as e:
Expand Down
76 changes: 12 additions & 64 deletions aiohttp/locks.py
@@ -1,77 +1,25 @@
import asyncio
import collections

from .helpers import create_future


class Event:
class ErrorfulOneShotEvent:
"""
Adhoc Event class mainly copied from the official asyncio.locks.Event, but
modifying the `set` method. It allows to pass an exception to wake
the waiters with an exception.
This class wrappers the Event asyncio lock allowing either awake the
locked Tasks without any error or raising an exception.
This is used when the event creator cant accommplish the requirements
due to an exception, instead of try to built a sophisticated solution
the same exeption is passed to the waiters.
thanks to @vorpalsmith for the simple design.
"""

def __init__(self, *, loop=None):
self._waiters = collections.deque()
self._value = False
if loop is not None:
self._loop = loop
else:
self._loop = asyncio.get_event_loop()

def __repr__(self):
res = super().__repr__()
extra = 'set' if self._value else 'unset'
if self._waiters:
extra = '{},waiters:{}'.format(extra, len(self._waiters))
return '<{} [{}]>'.format(res[1:-1], extra)

def is_set(self):
"""Return True if and only if the internal flag is true."""
return self._value
self._event = asyncio.Event(loop=loop)
self._exc = None

def set(self, exc=None):
"""Set the internal flag to true. All coroutines waiting for it to
become true are awakened. Coroutine that call wait() once the flag is
true will not block at all.
If `exc` is different than None the `future.set_exception` is called
"""
if not self._value:
self._value = True

for fut in self._waiters:
if not fut.done():
if not exc:
fut.set_result(True)
else:
fut.set_exception(exc)

def clear(self):
"""Reset the internal flag to false. Subsequently, coroutines calling
wait() will block until set() is called to set the internal flag
to true again."""
self._value = False
self._exc = exc
self._event.set()

@asyncio.coroutine
def wait(self):
"""Block until the internal flag is true.
If the internal flag is true on entry, return True
immediately. Otherwise, block until another coroutine calls
set() to set the flag to true, then return True.
"""
if self._value:
return True
val = yield from self._event.wait()
if self._exc is not None:
raise self._exc

fut = create_future(self._loop)
self._waiters.append(fut)
try:
yield from fut
return True
finally:
self._waiters.remove(fut)
return val
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Expand Up @@ -7,6 +7,7 @@ aiohttpdemo
aiopg
alives
api
api’s
app
app’s
arg
Expand Down
48 changes: 4 additions & 44 deletions tests/test_locks.py
Expand Up @@ -2,14 +2,14 @@
import asyncio

from aiohttp import helpers
from aiohttp.locks import Event
from aiohttp.locks import ErrorfulOneShotEvent


class TestEvent:
class TestErrorfulOneShotEvent:

@asyncio.coroutine
def test_set_exception(self, loop):
ev = Event(loop=loop)
ev = ErrorfulOneShotEvent(loop=loop)

@asyncio.coroutine
def c():
Expand All @@ -28,7 +28,7 @@ def c():

@asyncio.coroutine
def test_set(self, loop):
ev = Event(loop=loop)
ev = ErrorfulOneShotEvent(loop=loop)

@asyncio.coroutine
def c():
Expand All @@ -40,43 +40,3 @@ def c():
ev.set()
yield from asyncio.sleep(0, loop=loop)
assert t.result() == 1

# next lines help to get the 100% coverage.
ev.set()
ev.clear()
t = helpers.ensure_future(c(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
t.cancel()
ev.set()

@asyncio.coroutine
def test_set_no_blocking(self, loop):
ev = Event(loop=loop)
ev.set()

@asyncio.coroutine
def c():
yield from ev.wait()
return 1

t = helpers.ensure_future(c(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
assert t.result() == 1

@asyncio.coroutine
def test_repr(self, loop):
ev = Event(loop=loop)
assert "waiters" not in repr(ev)

@asyncio.coroutine
def c():
yield from ev.wait()

helpers.ensure_future(c(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
assert "waiters" in repr(ev)

@asyncio.coroutine
def test_is_set(self, loop):
ev = Event(loop=loop)
assert not ev.is_set()

0 comments on commit 11afb32

Please sign in to comment.