Skip to content

Commit

Permalink
asyncio: Proper idle implementation
Browse files Browse the repository at this point in the history
This is Python 3.6 again and will be fixed similar to the desugaring of
_parse_objects_direct
  • Loading branch information
chrysn committed Apr 21, 2017
1 parent 4eee007 commit 1efb003
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 17 deletions.
8 changes: 7 additions & 1 deletion examples/asyncio_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ async def main():
except Exception as e:
print("An erroneous asynchronously looped command, as expected, raised:", e)

print("Idle result", await client.idle().get())
i = 0
async for subsystem in client.idle():
print("Idle change in", subsystem)
i += 1
if i > 5:
print("Enough changes, quitting")
break

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
67 changes: 51 additions & 16 deletions mpd/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
(aiter, `async for`) interface; using it allows the library user to obtain
items of result as soon as they arrive.
The .idle() method works as expected, but there .noidle() method is not
implemented pending a notifying (and automatically idling on demand) interface.
The asynchronous .idle() method is thus only suitable for clients which only
want to send commands after an idle returned (eg. current song notification
pushers).
The .idle() method works differently here: It is an asynchronous iterator that
produces a list of changed subsystems whenever a new one is available. The
MPDClient object automatically switches in and out of idle mode depending on
which subsystems there is currently interest in.
Command lists are currently not supported.
Expand Down Expand Up @@ -117,7 +116,7 @@ async def connect(self, host, port=6600, loop=None):

self.__commandqueue = asyncio.Queue(loop=loop)
self.__idle_results = asyncio.Queue(loop=loop) #: a queue of CommandResult("idle") futures
self.__idle_events = asyncio.Queue(loop=loop) #: a temporary dispatch mechanism, fed with subsystem strings
self.__idle_consumers = [] #: list of (subsystem-list, callbacks) tuples

try:
helloline = await asyncio.wait_for(self.__readline(), timeout=5)
Expand All @@ -138,7 +137,26 @@ def disconnect(self):
self.__rfile = self.__wfile = None
self.__run_task = self.__idle_task = None
self.__commandqueue = self.__command_enqueued = None
self.__idle_results = self.__idle_events = None
self.__idle_results = self.__idle_consumers = None

def _get_idle_interests(self):
"""Accumulate a set of interests from the current __idle_consumers.
Returns the union of their subscribed subjects, [] if at least one of
them is the empty catch-all set, or None if there are no interests at
all."""

if not self.__idle_consumers:
return None
if any(len(s) == 0 for (s, c) in self.__idle_consumers):
return []
return set.union(*(set(s) for (s, c) in self.__idle_consumers))

def _nudge_idle(self):
"""If the main task is currently idling, make it leave idle and process
the next command (if one is present) or just restart idle"""

if self.__command_enqueued is not None and not self.__command_enqueued.done():
self.__command_enqueued.set_result(None)

async def __run(self):
result = None
Expand All @@ -156,7 +174,13 @@ async def __run(self):
# in this case is intended, and is just what asyncio.Queue
# suggests for "get with timeout".

result = CommandResult("idle", [], self._parse_list)
subsystems = self._get_idle_interests()
if subsystems is None:
# the presumably most quiet subsystem -- in this case,
# idle is only used to keep the connection alive
subsystems = ["database"]

result = CommandResult("idle", subsystems, self._parse_list)
self.__idle_results.put_nowait(result)

self.__command_enqueued = asyncio.Future()
Expand Down Expand Up @@ -203,9 +227,12 @@ async def __distribute_idle_results(self):
# unhandled task exception and that's probably the best we can do
while True:
result = await self.__idle_results.get()
idle_changes = await result
for change in idle_changes:
self.__idle_events.put_nowait(change)
idle_changes = list(await result)
if not idle_changes:
continue
for subsystems, callback in self.__idle_consumers:
if not subsystems or any(s in subsystems for s in idle_changes):
callback(idle_changes)

# helper methods

Expand Down Expand Up @@ -326,18 +353,26 @@ def f(self, *args):
if self.__run_task is None:
raise ConnectionError("Can not send command to disconnected client")
self.__commandqueue.put_nowait(result)
if self.__command_enqueued is not None and not self.__command_enqueued.done():
self.__command_enqueued.set_result(None)
self._nudge_idle()
return result
escaped_name = name.replace(" ", "_")
f.__name__ = escaped_name
setattr(cls, escaped_name, f)

# commands that just work differently

def idle(self, subsystems=[]):
# FIXME this is not the final interface
return self.__idle_events
async def idle(self, subsystems=()):
interests_before = self._get_idle_interests()
changes = asyncio.Queue()
try:
entry = (subsystems, changes.put_nowait)
self.__idle_consumers.append(entry)
if self._get_idle_interests != interests_before:
self._nudge_idle()
while True:
yield await changes.get()
finally:
self.__idle_consumers.remove(entry)

def noidle(self):
raise AttributeError("noidle is not supported / required in mpd.asyncio")

0 comments on commit 1efb003

Please sign in to comment.