Skip to content

Commit

Permalink
Add events task (#76)
Browse files Browse the repository at this point in the history
* Automatically create task for events

* Fix: ClientResponse.close() is not a coroutine
  • Loading branch information
cecton authored and Christian Barra committed Aug 1, 2017
1 parent 5defebb commit db74893
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
28 changes: 18 additions & 10 deletions aiodocker/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,16 @@ def __init__(self, docker):
self.docker = docker
self.channel = Channel()
self.json_stream = None
self.task = None

def listen(self):
warnings.warn("use subscribe() method instead",
DeprecationWarning, stacklevel=2)
return self.channel.subscribe()

def subscribe(self):
def subscribe(self, create_task=True):
if create_task:
self.task = asyncio.ensure_future(self.run())
return self.channel.subscribe()

def _transform_event(self, data):
Expand Down Expand Up @@ -500,20 +503,25 @@ async def run(self, **params):
self._transform_event,
human_bool(params['stream']),
)
async for data in self.json_stream:
await self.channel.publish(data)
try:
async for data in self.json_stream:
await self.channel.publish(data)
finally:
await self.json_stream._close()
self.json_stream = None
finally:
# signal termination to subscribers
await self.channel.publish(None)
try:
await self.json_stream.close()
except:
pass
self.json_stream = None

async def stop(self):
if self.json_stream:
await self.json_stream.close()
if self.json_stream is not None:
await self.json_stream._close()
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass


class DockerLog:
Expand Down
2 changes: 1 addition & 1 deletion aiodocker/jsonstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def _close(self):
# (see https://github.com/KeepSafe/aiohttp/issues/739)

# response error , it has been closed
await self._response.close()
self._response.close()


async def json_stream_result(response, transform=None, stream=True):
Expand Down
24 changes: 24 additions & 0 deletions tests/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio

import pytest


def test_events_default_task(docker):
loop = asyncio.get_event_loop()
docker.events.subscribe()
assert docker.events.task is not None
loop.run_until_complete(docker.events.stop())
assert docker.events.task.done()
assert docker.events.json_stream is None


def test_events_provided_task(docker):
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(docker.events.run())
docker.events.subscribe(create_task=False)
assert docker.events.task is None
loop.run_until_complete(docker.events.stop())
assert docker.events.json_stream is None
task.cancel()
with pytest.raises(asyncio.CancelledError):
loop.run_until_complete(task)
4 changes: 2 additions & 2 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ async def test_port(docker, testing_images, redis_container):

@pytest.mark.asyncio
async def test_events(docker, testing_images, event_loop):
monitor_task = event_loop.create_task(docker.events.run())
subscriber = docker.events.subscribe()

# Do some stuffs to generate events.
Expand Down Expand Up @@ -231,4 +230,5 @@ async def test_events(docker, testing_images, event_loop):
break

assert events_occurred == ['create', 'start', 'kill', 'die', 'destroy']
monitor_task.cancel()

await docker.events.stop()

0 comments on commit db74893

Please sign in to comment.