refactor code for Python >= 3.5, add service logs support (#71)
Christian Barra committed Jul 29, 2017
1 parent 225903b commit 426436c
Showing 7 changed files with 199 additions and 63 deletions.
19 changes: 9 additions & 10 deletions aiodocker/jsonstream.py
@@ -1,4 +1,5 @@
import sys
import types
import asyncio
import json
import logging
@@ -18,25 +19,21 @@ def __aiter__(self):
if sys.version_info <= (3, 5, 2):
__aiter__ = asyncio.coroutine(__aiter__)

async def __anext__(self):
response = await self.fetch()
if response:
return response
else:
raise StopAsyncIteration

async def fetch(self):
@types.coroutine
def __anext__(self):
while True:
try:
data = await self._response.content.readline()
data = yield from self._response.content.readline()
if not data:
break
except (aiohttp.ClientConnectionError,
aiohttp.ServerDisconnectedError):
break
return self._transform(json.loads(data.decode('utf8')))

async def close(self):
raise StopAsyncIteration

async def _close(self):
# response.release() hangs indefinitely because the server is sending
# an infinite stream of messages.
# (see https://github.com/KeepSafe/aiohttp/issues/739)
@@ -47,8 +44,10 @@ async def close(self):

async def json_stream_result(response, transform=None, stream=True):
json_stream = _JsonStreamResult(response, transform)

if stream:
return json_stream

data = []
async for obj in json_stream:
data.append(obj)
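
The refactored __anext__ above is a plain generator decorated with types.coroutine: it drives the aiohttp reader with `yield from`, can be awaited by `async for`, and signals exhaustion by raising StopAsyncIteration. Below is a minimal self-contained sketch of the same shape, with an illustrative class and an asyncio StreamReader standing in for the aiohttp response (neither is part of this commit; the version shim for Python <= 3.5.2 shown above is omitted here).

import types
import asyncio


class LineIterator:
    """Async-iterate over the lines of an asyncio StreamReader."""

    def __init__(self, reader: asyncio.StreamReader):
        self._reader = reader

    def __aiter__(self):
        return self

    @types.coroutine
    def __anext__(self):
        # the decorated generator is awaitable, so `async for` can drive it
        data = yield from self._reader.readline()
        if not data:
            raise StopAsyncIteration
        return data


# usage (inside a coroutine): async for line in LineIterator(reader): ...
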
87 changes: 45 additions & 42 deletions aiodocker/multiplexed.py
@@ -1,4 +1,5 @@
import sys
import types
import asyncio
import struct
import aiohttp
@@ -11,67 +12,69 @@ class MultiplexedResult:
def __init__(self, response, raw):
self._response = response

self._gen = self.fetch
if raw:
self._iter = self.fetch_raw
else:
self._iter = self.fetch
self._gen = self.fetch_raw

def __aiter__(self):
return self

if sys.version_info <= (3, 5, 2):
__aiter__ = asyncio.coroutine(__aiter__)

async def __anext__(self):
response = await self._iter()
response = await self._gen()

if not response:
await self._close()
raise StopAsyncIteration

return response

async def fetch(self):
try:
while True:
try:
hdrlen = constants.STREAM_HEADER_SIZE_BYTES
header = await self._response.content.readexactly(hdrlen)
_, length = struct.unpack('>BxxxL', header)
if not length:
continue
data = await self._response.content.readexactly(length)
except (aiohttp.ClientConnectionError,
aiohttp.ServerDisconnectedError):
break
except asyncio.IncompleteReadError:
break
return data
finally:
await self.close()

async def fetch_raw(self):
try:
async for data in self._response.content.iter_chunked(1024):
return data
except aiohttp.ClientConnectionError:
pass
finally:
await self.close()

async def close(self):
if sys.version_info <= (3, 5, 2):
__aiter__ = asyncio.coroutine(__aiter__)

@types.coroutine
def fetch(self):
while True:

try:
hdrlen = constants.STREAM_HEADER_SIZE_BYTES
header = yield from self._response.content.readexactly(hdrlen)

_, length = struct.unpack('>BxxxL', header)
if not length:
continue

data = yield from self._response.content.readexactly(length)

except (aiohttp.ClientConnectionError,
aiohttp.ServerDisconnectedError,
asyncio.IncompleteReadError):
break
return data

@types.coroutine
def fetch_raw(self):
chunk = self._response.content.iter_chunked(1024).__aiter__()
while True:
try:
data = yield from chunk.__anext__()
except StopAsyncIteration:
break
return data

async def _close(self):
await self._response.release()


async def multiplexed_result(response, follow=False, is_tty=False,
encoding='utf-8'):

log_stream = MultiplexedResult(response, raw=False)
if is_tty:
log_stream = MultiplexedResult(response, raw=True)
# when is_tty is True the output is raw (not multiplexed)
log_stream = MultiplexedResult(response, raw=is_tty)

if follow:
return _DecodeHelper(log_stream, encoding=encoding)
else:
d = []
async for piece in _DecodeHelper(log_stream, encoding=encoding):
if isinstance(piece, str):
d.append(piece)
return ''.join(d)
d.append(piece.strip())
return d
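
The fetch() coroutine above parses Docker's stream-multiplexing frame format: an 8-byte header holding one stream-type byte, three padding bytes and a big-endian 32-bit payload length (the '>BxxxL' format), followed by the payload itself. Here is a standalone sketch of that framing, assuming constants.STREAM_HEADER_SIZE_BYTES is 8; the function and variable names are illustrative, not part of this commit.

import struct

STREAM_HEADER_SIZE_BYTES = 8  # assumed value of aiodocker's constant


def demux(buf: bytes):
    """Yield (stream_type, payload) pairs from an in-memory multiplexed buffer."""
    offset = 0
    while offset + STREAM_HEADER_SIZE_BYTES <= len(buf):
        stream_type, length = struct.unpack_from('>BxxxL', buf, offset)
        offset += STREAM_HEADER_SIZE_BYTES
        payload = buf[offset:offset + length]
        offset += length
        yield stream_type, payload


# example: a single stdout (type 1) frame carrying b"hello\n"
frame = struct.pack('>BxxxL', 1, 6) + b"hello\n"
assert list(demux(frame)) == [(1, b"hello\n")]
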
50 changes: 49 additions & 1 deletion aiodocker/services.py
@@ -1,5 +1,6 @@
import json
from typing import Optional, Dict, List, Any
from typing import Optional, Dict, List, Any, Union, AsyncIterator
from .multiplexed import multiplexed_result
from .utils import clean_config, clean_networks, clean_filters, format_env


@@ -124,3 +125,50 @@ async def inspect(self, service_id: str) -> Dict[str, Any]:
method="GET",
)
return response

async def logs(self, service_id: str, *,
details: bool=False,
follow: bool=False,
stdout: bool=False,
stderr: bool=False,
since: int=0,
timestamps: bool=False,
is_tty: bool=False,
tail: str="all"
) -> Union[str, AsyncIterator[str]]:
"""
Retrieve the logs of the given service.
Args:
details: show service context and extra details provided to logs
follow: return the logs as a stream
stdout: return logs from stdout
stderr: return logs from stderr
since: return logs since this time, as a UNIX timestamp
timestamps: add timestamps to every log line
is_tty: the service has a pseudo-TTY allocated
tail: number of log lines to return from the end of the logs;
pass an integer or `all` to return all lines
"""
if stdout is False and stderr is False:
raise TypeError("Need one of stdout or stderr")

params = {
"details": details,
"follow": follow,
"stdout": stdout,
"stderr": stderr,
"since": since,
"timestamps": timestamps,
"tail": tail
}

response = await self.docker._query(
"services/{service_id}/logs".format(service_id=service_id),
method="GET",
params=params
)
return (await multiplexed_result(response, follow, is_tty=is_tty))
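
A hedged usage sketch of the new Service.logs() API added above. The docker argument is assumed to be an already connected aiodocker client (as provided by the test fixtures below), and the service id and timeout value are illustrative, not part of this commit.

import asyncio
import aiohttp


async def show_service_logs(docker, service_id: str) -> None:
    # without follow: the multiplexed response is drained and returned
    # as a list of decoded log lines
    lines = await docker.services.logs(service_id, stdout=True, stderr=True)
    for line in lines:
        print(line)

    # with follow=True: an async iterator over decoded log chunks is returned
    stream = await docker.services.logs(service_id, stdout=True, follow=True)
    try:
        with aiohttp.Timeout(5):  # stop following after 5 seconds (illustrative)
            async for chunk in stream:
                print(chunk, end="")
    except asyncio.TimeoutError:
        pass
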
1 change: 1 addition & 0 deletions aiodocker/utils.py
@@ -89,6 +89,7 @@ async def __anext__(self):
stream_decoded = self._decoder.decode(b'', final=True)
if stream_decoded:
return stream_decoded
raise StopAsyncIteration
else:
return self._decoder.decode(stream)

2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -14,7 +14,7 @@ def testing_images():
async def _pull():
docker = Docker()
required_images = [
'alpine:latest', 'redis:latest',
'alpine:latest', 'redis:latest', 'python:3.6.1-alpine'
]
for img in required_images:
try:
7 changes: 3 additions & 4 deletions tests/test_integration.py
@@ -127,13 +127,13 @@ async def test_stdio_stdin(docker, testing_images, shell_container):
# cross-check with container logs.
log = []
found = False

try:
# collect the logs for at most 2 seconds, until we see the output.
stream = await shell_container.log(stdout=True, follow=True)
with aiohttp.Timeout(2):
async for s in stream:
if isinstance(s, str):
log.append(s)
log.append(s)
if "hello world\r\n" in s:
found = True
break
@@ -190,8 +190,7 @@ async def test_put_archive(docker, testing_images):
await container.wait(timeout=5)

output = await container.log(stdout=True, stderr=True)
output.strip()
assert output == "hello world"
assert output[0] == "hello world"

await container.delete(force=True)

96 changes: 91 additions & 5 deletions tests/test_services.py
@@ -1,5 +1,7 @@
import asyncio
import pytest
from aiodocker.exceptions import DockerError
import aiohttp


@pytest.mark.asyncio
@@ -69,9 +71,96 @@ async def test_service_tasks(docker):


@pytest.mark.asyncio
async def test_delete_services(docker):
services = await docker.services.list()
async def test_logs_services(docker, testing_images):
TaskTemplate = {
"ContainerSpec": {
"Image": "python:3.6.1-alpine",
"Args": [
"python", "-c",
"for _ in range(10): print('Hello Python')"
]
},
"RestartPolicy": {
"Condition": "none"
}
}
service = await docker.services.create(
task_template=TaskTemplate,
)
service_id = service['ID']

filters = {"service": service_id}

# wait until the task status is `complete`
with aiohttp.Timeout(60):
while True:
await asyncio.sleep(2)
task = await docker.tasks.list(filters=filters)
if task:
status = task[0]['Status']['State']
if status == 'complete':
break

logs = await docker.services.logs(
service_id, stdout=True)

assert len(logs) == 10
assert logs[0] == "Hello Python"


@pytest.mark.asyncio
async def test_logs_services_stream(docker, testing_images):
TaskTemplate = {
"ContainerSpec": {
"Image": "python:3.6.1-alpine",
"Args": [
"python", "-c",
"for _ in range(10): print('Hello Python')"
]
},
"RestartPolicy": {
"Condition": "none"
}
}
service = await docker.services.create(
task_template=TaskTemplate,
)
service_id = service['ID']

filters = {"service": service_id}

# wait until the task status is `complete`
with aiohttp.Timeout(60):
while True:
await asyncio.sleep(2)
task = await docker.tasks.list(filters=filters)
if task:
status = task[0]['Status']['State']
if status == 'complete':
break

stream = await docker.services.logs(
service_id, stdout=True, follow=True
)

# the service printed `Hello Python` 10 times;
# check that we see all of them
count = 0
try:
with aiohttp.Timeout(2):
while True:
async for log in stream:
if "Hello Python\n" in log:
count += 1
except asyncio.TimeoutError:
pass

assert count == 10


@pytest.mark.asyncio
async def test_service_delete(docker):
services = await docker.services.list()
for service in services:
await docker.services.delete(service_id=service['ID'])

@@ -80,7 +169,4 @@ async def test_delete_services(docker):
@pytest.mark.xfail(raises=DockerError, reason="bug inside Docker")
@pytest.mark.asyncio
async def test_swarm_remove(docker):
services = await docker.services.list()
for service in services:
await docker.services.delete(service_id=service['ID'])
await docker.swarm.leave(force=True)
