Commit
add support for python 3.5.x (#68)
Christian Barra committed Jul 19, 2017
1 parent efe04db commit 0be85e6
Showing 6 changed files with 104 additions and 52 deletions.
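
The recurring change in this commit is the `__aiter__` shim. Under the original PEP 492 protocol used by Python 3.5.0 and 3.5.1, `async for` awaits the object returned by `__aiter__`; from 3.5.2 onward, `__aiter__` is expected to return the iterator itself. Wrapping the method with `asyncio.coroutine` only on old interpreters lets one class satisfy both protocols. A rough standalone sketch of the pattern, assuming nothing from aiodocker (the `LineStream` class and its data are invented for illustration):

import asyncio
import sys


class LineStream:
    """Minimal async iterator that runs on Python 3.5.0 through 3.6+."""

    def __init__(self, lines):
        self._lines = iter(lines)

    def __aiter__(self):
        # 3.5.2+ expects the iterator itself to be returned here ...
        return self

    # ... while 3.5.0/3.5.1 await the result of __aiter__, so wrap it in a
    # coroutine on those interpreters only, mirroring the diff below.
    if sys.version_info <= (3, 5, 2):
        __aiter__ = asyncio.coroutine(__aiter__)

    async def __anext__(self):
        try:
            return next(self._lines)
        except StopIteration:
            raise StopAsyncIteration


async def main():
    async for line in LineStream(['a', 'b']):
        print(line)


loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()

The same shim appears below in `_JsonStreamResult`, `MultiplexedResult`, and `_DecodeHelper`.
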
4 changes: 2 additions & 2 deletions aiodocker/docker.py
@@ -87,7 +87,7 @@ def __init__(self,
self.volumes = DockerVolumes(self)

async def close(self):
self.session.close() # no longer coroutine since aiohttp 2.x
await self.session.close()

async def auth(self, **credentials):
response = await self._query_json(
@@ -500,7 +500,7 @@ async def run(self, **params):
self._transform_event,
human_bool(params['stream']),
)
async for data in self.json_stream.fetch():
async for data in self.json_stream:
await self.channel.publish(data)
finally:
# signal termination to subscribers
31 changes: 23 additions & 8 deletions aiodocker/jsonstream.py
@@ -1,40 +1,55 @@
import sys
import asyncio
import json
import logging
import aiohttp

log = logging.getLogger(__name__)


class JsonStreamResult:
class _JsonStreamResult:
def __init__(self, response, transform=None):
self.response = response
self.transform = transform or (lambda x: x)
self._response = response
self._transform = transform or (lambda x: x)

def __aiter__(self):
return self

if sys.version_info <= (3, 5, 2):
__aiter__ = asyncio.coroutine(__aiter__)

async def __anext__(self):
response = await self.fetch()
if response:
return response
else:
raise StopAsyncIteration

async def fetch(self):
while True:
try:
data = await self.response.content.readline()
data = await self._response.content.readline()
if not data:
break
except (aiohttp.ClientConnectionError,
aiohttp.ServerDisconnectedError):
break
yield self.transform(json.loads(data.decode('utf8')))
return self._transform(json.loads(data.decode('utf8')))

async def close(self):
# response.release() indefinitely hangs because the server is sending
# an infinite stream of messages.
# (see https://github.com/KeepSafe/aiohttp/issues/739)

# response error, it has been closed
self.response.close()
await self._response.close()


async def json_stream_result(response, transform=None, stream=True):
json_stream = JsonStreamResult(response, transform)
json_stream = _JsonStreamResult(response, transform)
if stream:
return json_stream
data = []
async for obj in json_stream.fetch():
async for obj in json_stream:
data.append(obj)
return data
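
`_JsonStreamResult` drops the async-generator style `fetch()` loop in favour of the explicit `__aiter__`/`__anext__` protocol, since native async generators (`yield` inside `async def`) only arrived in Python 3.6. For context, a self-contained sketch of the same readline-and-decode loop driven over a plain `asyncio.StreamReader`; the `JsonLines` name and the sample payload are invented:

import asyncio
import json


class JsonLines:
    """Yield one parsed JSON object per line, mimicking _JsonStreamResult."""

    def __init__(self, reader, transform=None):
        self._reader = reader
        self._transform = transform or (lambda x: x)

    def __aiter__(self):
        # (on 3.5.0/3.5.1 this would get the asyncio.coroutine wrap shown above)
        return self

    async def __anext__(self):
        line = await self._reader.readline()
        if not line:
            # an empty read means EOF: the daemon closed the stream
            raise StopAsyncIteration
        return self._transform(json.loads(line.decode('utf-8')))


async def main():
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"status": "pulling"}\n{"status": "done"}\n')
    reader.feed_eof()
    async for event in JsonLines(reader):
        print(event['status'])


loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
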
63 changes: 38 additions & 25 deletions aiodocker/multiplexed.py
@@ -1,64 +1,77 @@
import sys
import asyncio
import struct
import aiohttp

from . import constants
from aiodocker.utils import decoded
from aiodocker.utils import _DecodeHelper


class MultiplexedResult:
def __init__(self, response):
self.response = response
def __init__(self, response, raw):
self._response = response

if raw:
self._iter = self.fetch_raw
else:
self._iter = self.fetch

def __aiter__(self):
return self

if sys.version_info <= (3, 5, 2):
__aiter__ = asyncio.coroutine(__aiter__)

async def __anext__(self):
response = await self._iter()
if not response:
raise StopAsyncIteration
return response

async def fetch(self):
try:
while True:
try:
hdrlen = constants.STREAM_HEADER_SIZE_BYTES
header = await self.response.content.readexactly(hdrlen)
header = await self._response.content.readexactly(hdrlen)
_, length = struct.unpack('>BxxxL', header)
if not length:
continue
data = await self.response.content.readexactly(length)
data = await self._response.content.readexactly(length)
except (aiohttp.ClientConnectionError,
aiohttp.ServerDisconnectedError):
break
except asyncio.IncompleteReadError:
break
yield data
return data
finally:
await self.close()

async def fetch_raw(self):
try:
async for data in self.response.content.iter_chunked(1024):
yield data
async for data in self._response.content.iter_chunked(1024):
return data
except aiohttp.ClientConnectionError:
pass
finally:
await self.close()

async def close(self):
await self.response.release()
await self._response.release()


async def multiplexed_result(response, follow=False, is_tty=False,
encoding='utf-8'):
log_stream = MultiplexedResult(response)

log_stream = MultiplexedResult(response, raw=False)
if is_tty:
if follow:
return decoded(log_stream.fetch_raw(), encoding=encoding)
else:
d = []
async for piece in decoded(log_stream.fetch_raw(),
encoding=encoding):
d.append(piece)
return ''.join(d)
log_stream = MultiplexedResult(response, raw=True)

if follow:
return _DecodeHelper(log_stream, encoding=encoding)
else:
if follow:
return decoded(log_stream.fetch(), encoding=encoding)
else:
d = []
async for piece in decoded(log_stream.fetch(),
encoding=encoding):
d = []
async for piece in _DecodeHelper(log_stream, encoding=encoding):
if isinstance(piece, str):
d.append(piece)
return ''.join(d)
return ''.join(d)
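
A note on what `fetch()` is parsing: when the container has no TTY, Docker multiplexes stdout and stderr over one connection, framing each payload with an 8-byte header: one byte of stream type (1 = stdout, 2 = stderr), three padding bytes, and a big-endian 32-bit payload length, which is what `struct.unpack('>BxxxL', header)` decodes. A small standalone sketch of that framing (the `parse_frames` helper and demo bytes are invented):

import struct

HEADER_LEN = 8  # the Docker attach/logs frame header is 8 bytes


def parse_frames(buf):
    """Split a multiplexed byte buffer into (stream_type, payload) pairs."""
    frames = []
    offset = 0
    while offset + HEADER_LEN <= len(buf):
        # one byte stream type, three pad bytes, four-byte big-endian length
        stream_type, length = struct.unpack_from('>BxxxL', buf, offset)
        offset += HEADER_LEN
        frames.append((stream_type, buf[offset:offset + length]))
        offset += length
    return frames


demo = (struct.pack('>BxxxL', 1, 6) + b'hello\n' +
        struct.pack('>BxxxL', 2, 5) + b'oops\n')
print(parse_frames(demo))  # [(1, b'hello\n'), (2, b'oops\n')]

With a TTY attached there is no framing at all, which is why `multiplexed_result` now constructs `MultiplexedResult(response, raw=True)` in that case and reads plain 1024-byte chunks instead.
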
47 changes: 35 additions & 12 deletions aiodocker/utils.py
@@ -1,3 +1,5 @@
import asyncio
import sys
from typing import Optional, Dict, List, Union, Any, BinaryIO, IO
from io import BytesIO
import tempfile
@@ -55,20 +57,45 @@ def httpize(d: Optional[Dict]) -> Dict[str, Any]:
return converted


async def decoded(generator, encoding='utf-8'):
decoder = codecs.getincrementaldecoder(encoding)(errors='ignore')
async for d in generator:
yield decoder.decode(d)
class _DecodeHelper:
"""
Decode logs from the Docker Engine
"""

d = decoder.decode(b'', final=True)
if d:
yield d
def __init__(self, generator, encoding):
self._gen = generator.__aiter__()
self._decoder = codecs.getincrementaldecoder(encoding)(errors='ignore')
self._flag = False

def __aiter__(self):
return self

# to make it compatible with Python 3.5.0 through 3.5.2
# https://www.python.org/dev/peps/pep-0492/#api-design-and-implementation-revisions
if sys.version_info <= (3, 5, 2):
__aiter__ = asyncio.coroutine(__aiter__)

async def __anext__(self):
if self._flag:
raise StopAsyncIteration

# we catch StopAsyncIteration from self._gen
# because we still need to flush the decoder (final=True);
# the next call then raises StopAsyncIteration via self._flag
try:
stream = await self._gen.__anext__()
except StopAsyncIteration:
self._flag = True
stream_decoded = self._decoder.decode(b'', final=True)
if stream_decoded:
return stream_decoded
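# if the flush produced nothing we fall through and return None;
# callers filter those items out with isinstance(..., str)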
else:
return self._decoder.decode(stream)


def clean_config(config: Optional[dict]) -> dict:
"""
Checks the values inside `config`
Returns a new dictionary with only NOT `None` values
"""
data = {}
@@ -95,7 +122,6 @@ def format_env(key, value: Union[None, bytes, str]) -> str:
def clean_networks(networks: Optional[List]=None) -> List:
"""
Cleans the values inside `networks`
Returns a new list
"""
if not networks:
@@ -114,7 +140,6 @@ def clean_networks(networks: Optional[List]=None) -> List:
def clean_filters(filters: Optional[dict]=None) -> str:
"""
Checks the values inside `filters`
https://docs.docker.com/engine/api/v1.29/#operation/ServiceList
Returns a new dictionary in the format `map[string][]string` jsonized
"""
@@ -132,10 +157,8 @@ def mktar_from_dockerfile(fileobject: BinaryIO) -> IO:
"""
Create a zipped tar archive from a Dockerfile
**Remember to close the file object**
Args:
fileobj: a Dockerfile
Returns:
a NamedTemporaryFile() object
"""
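
`_DecodeHelper` replaces the old `decoded()` async generator but keeps its core idea: an incremental decoder survives multi-byte UTF-8 sequences that arrive split across chunks, and `decode(b'', final=True)` flushes whatever is still buffered once the stream ends. A quick illustration of why the incremental decoder matters (the sample bytes are invented):

import codecs

decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')

# 'café au lait' encoded as UTF-8 but split mid-character across two
# chunks, the way a streaming HTTP response might deliver it.
chunks = [b'caf\xc3', b'\xa9 au lait']

pieces = [decoder.decode(chunk) for chunk in chunks]
pieces.append(decoder.decode(b'', final=True))  # flush any buffered bytes

print(''.join(pieces))  # café au lait
# A one-shot b'caf\xc3'.decode('utf-8') on the first chunk alone would
# raise UnicodeDecodeError (or drop the dangling byte with errors='ignore').
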
2 changes: 1 addition & 1 deletion tests/test_images.py
@@ -14,7 +14,7 @@ async def test_build_from_remote_file(docker):
params = {'tag': tag, 'remote': remote, 'stream': True}
stream = await docker.images.build(**params)

async for output in stream.fetch():
async for output in stream:
if "Successfully tagged image:1.0\n" in output:
break

9 changes: 5 additions & 4 deletions tests/test_integration.py
@@ -125,15 +125,16 @@ async def test_stdio_stdin(docker, testing_images, shell_container):
assert found

# cross-check with container logs.
stream_output = await shell_container.log(stdout=True, follow=True)
log = []
found = False
try:
# collect the logs for at most 2 secs until we see the output.
stream = await shell_container.log(stdout=True, follow=True)
with aiohttp.Timeout(2):
async for d in stream_output:
log.append(d)
if "hello world\r\n" == d:
async for s in stream:
if isinstance(s, str):
log.append(s)
if "hello world\r\n" in s:
found = True
break
except asyncio.TimeoutError:
