Skip to content

Commit

Permalink
Refactored request_resource() for better readability and efficiency
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Sep 28, 2015
1 parent a1dbc55 commit ff236d0
Showing 1 changed file with 35 additions and 37 deletions.
72 changes: 35 additions & 37 deletions asphalt/core/context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from asyncio.futures import Future

from typing import Optional, Callable, Any, Union, Iterable, Sequence
from asyncio import get_event_loop, coroutine, iscoroutinefunction
from collections import defaultdict
import asyncio
import time

from .util import qualified_name, asynchronous
from .event import EventSource, Event
Expand Down Expand Up @@ -129,11 +130,6 @@ def __init__(self, parent: 'Context'=None, default_timeout: int=10):
self._resource_creators = {} # type: Dict[str, Callable[[Context], Any]
self.default_timeout = default_timeout

# Forward resource events from the parent(s)
if parent is not None:
parent.add_listener('resource_published', self.dispatch)
parent.add_listener('resource_removed', self.dispatch)

def __getattr__(self, name):
creator = self._resource_creators.get(name)
if creator is not None:
Expand Down Expand Up @@ -280,13 +276,6 @@ def remove_resource(self, resource: Resource):

yield from self.dispatch('resource_removed', resource)

def _get_resource(self, resource_type: str, alias: str) -> Optional[Resource]:
resource = self._resources.get(resource_type, {}).get(alias)
if resource is None and self._parent is not None:
resource = self._parent._get_resource(resource_type, alias)

return resource

@asynchronous
def request_resource(self, type: Union[str, type], alias: str='default', *,
timeout: Union[int, float, None]=None, optional: bool=False):
Expand All @@ -311,30 +300,39 @@ def request_resource(self, type: Union[str, type], alias: str='default', *,
if not alias:
raise ValueError('alias must be a nonempty string')

resource_type = qualified_name(type) if not isinstance(type, str) else type
timeout = timeout if timeout is not None else self.default_timeout
assert timeout >= 0, 'timeout must be a positive integer'

resource_type = qualified_name(type) if not isinstance(type, str) else type
handle = event = start_time = None
resource = self._get_resource(resource_type, alias)
while resource is None:
if not handle:
event = asyncio.Event()
start_time = time.monotonic()
handle = self.add_listener('resource_published', lambda e: event.set())
try:
delay = timeout - (time.monotonic() - start_time) if timeout is not None else None
yield from asyncio.wait_for(event.wait(), delay)
except asyncio.TimeoutError:
self.remove_listener(handle)
if optional:
return None
else:
raise ResourceNotFound(resource_type, alias)

resource = self._get_resource(resource_type, alias)

if handle:
self.remove_listener(handle)

return resource.get_value(self)
# Build a context chain from this context and its parents
context_chain = [self]
while context_chain[-1]._parent:
context_chain.append(context_chain[-1]._parent)

# First try to look up the resource in the context chain
for ctx in context_chain:
resource = ctx._resources.get(resource_type, {}).get(alias)
if resource is not None:
return resource.get_value(self)

# Listen to resource publish events in the whole chain and wait for the right kind of
# resource to be published
def resource_listener(event: ResourceEvent):
if event.resource.alias == alias and resource_type in event.resource.types:
future.set_result(event.resource)

future = Future()
listeners = [ctx.add_listener('resource_published', resource_listener) for
ctx in context_chain]
try:
resource = yield from asyncio.wait_for(future, timeout)
except asyncio.TimeoutError:
if optional:
return None
else:
raise ResourceNotFound(resource_type, alias)
else:
return resource.get_value(self)
finally:
for listener in listeners:
listener.remove()

0 comments on commit ff236d0

Please sign in to comment.