Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions docker/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from ..tls import TLSConfig
from ..transport import SSLAdapter, UnixAdapter
from ..utils import utils, check_resource, update_headers, config
from ..utils.socket import frames_iter, socket_raw_iter
from ..utils.socket import frames_iter, consume_socket_output, demux_adaptor
from ..utils.json_stream import json_stream
try:
from ..transport import NpipeAdapter
Expand Down Expand Up @@ -381,19 +381,23 @@ def _stream_raw_result(self, response, chunk_size=1, decode=True):
for out in response.iter_content(chunk_size, decode):
yield out

def _read_from_socket(self, response, stream, tty=False):
def _read_from_socket(self, response, stream, tty=True, demux=False):
socket = self._get_raw_response_socket(response)

gen = None
if tty is False:
gen = frames_iter(socket)
gen = frames_iter(socket, tty)

if demux:
# The generator will output tuples (stdout, stderr)
gen = (demux_adaptor(*frame) for frame in gen)
else:
gen = socket_raw_iter(socket)
# The generator will output strings
gen = (data for (_, data) in gen)

if stream:
return gen
else:
return six.binary_type().join(gen)
# Wait for all the frames, concatenate them, and return the result
return consume_socket_output(gen, demux=demux)

def _disable_socket_timeout(self, socket):
""" Depending on the combination of python version and whether we're
Expand Down
13 changes: 8 additions & 5 deletions docker/api/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
class ContainerApiMixin(object):
@utils.check_resource('container')
def attach(self, container, stdout=True, stderr=True,
stream=False, logs=False):
stream=False, logs=False, demux=False):
"""
Attach to a container.

Expand All @@ -28,11 +28,15 @@ def attach(self, container, stdout=True, stderr=True,
stream (bool): Return container output progressively as an iterator
of strings, rather than a single string.
logs (bool): Include the container's previous output.
demux (bool): Keep stdout and stderr separate.

Returns:
By default, the container's output as a single string.
By default, the container's output as a single string (two if
``demux=True``: one for stdout and one for stderr).

If ``stream=True``, an iterator of output strings.
If ``stream=True``, an iterator of output strings. If
``demux=True``, two iterators are returned: one for stdout and one
for stderr.

Raises:
:py:class:`docker.errors.APIError`
Expand All @@ -54,8 +58,7 @@ def attach(self, container, stdout=True, stderr=True,
response = self._post(u, headers=headers, params=params, stream=True)

output = self._read_from_socket(
response, stream, self._check_is_tty(container)
)
response, stream, self._check_is_tty(container), demux=demux)

if stream:
return CancellableStream(output, response)
Expand Down
13 changes: 8 additions & 5 deletions docker/api/exec_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def exec_resize(self, exec_id, height=None, width=None):

@utils.check_resource('exec_id')
def exec_start(self, exec_id, detach=False, tty=False, stream=False,
socket=False):
socket=False, demux=False):
"""
Start a previously set up exec instance.

Expand All @@ -130,11 +130,14 @@ def exec_start(self, exec_id, detach=False, tty=False, stream=False,
stream (bool): Stream response data. Default: False
socket (bool): Return the connection socket to allow custom
read/write operations.
demux (bool): Return stdout and stderr separately

Returns:
(generator or str): If ``stream=True``, a generator yielding
response chunks. If ``socket=True``, a socket object for the
connection. A string containing response data otherwise.

(generator or str or tuple): If ``stream=True``, a generator
yielding response chunks. If ``socket=True``, a socket object for
the connection. A string containing response data otherwise. If
``demux=True``, stdout and stderr are separated.

Raises:
:py:class:`docker.errors.APIError`
Expand Down Expand Up @@ -162,4 +165,4 @@ def exec_start(self, exec_id, detach=False, tty=False, stream=False,
return self._result(res)
if socket:
return self._get_raw_response_socket(res)
return self._read_from_socket(res, stream, tty)
return self._read_from_socket(res, stream, tty=tty, demux=demux)
70 changes: 68 additions & 2 deletions docker/models/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def diff(self):

def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
privileged=False, user='', detach=False, stream=False,
socket=False, environment=None, workdir=None):
socket=False, environment=None, workdir=None, demux=False):
"""
Run a command inside this container. Similar to
``docker exec``.
Expand All @@ -166,6 +166,7 @@ def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
the following format ``["PASSWORD=xxx"]`` or
``{"PASSWORD": "xxx"}``.
workdir (str): Path to working directory for this exec session
demux (bool): Return stdout and stderr separately

Returns:
(ExecResult): A tuple of (exit_code, output)
Expand All @@ -180,14 +181,79 @@ def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.

Example:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this is the best place for such a long example, especially since it applies to attach() as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure where to put it though. I can just copy paste if for attach(), or add a reference to it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe as a separate docs page. We can figure that out later though, it's not worth blocking the PR.


Create a container that runs in the background

>>> client = docker.from_env()
>>> container = client.containers.run(
... 'bfirsh/reticulate-splines', detach=True)

Prepare the command we are going to use. It prints "hello stdout"
in `stdout`, followed by "hello stderr" in `stderr`:

>>> cmd = '/bin/sh -c "echo hello stdout ; echo hello stderr >&2"'

We'll run this command with all four the combinations of ``stream``
and ``demux``.

With ``stream=False`` and ``demux=False``, the output is a string
that contains both the `stdout` and the `stderr` output:

>>> res = container.exec_run(cmd, stream=False, demux=False)
>>> res.output
b'hello stderr\nhello stdout\n'

With ``stream=True``, and ``demux=False``, the output is a
generator that yields strings containing the output of both
`stdout` and `stderr`:

>>> res = container.exec_run(cmd, stream=True, demux=False)
>>> next(res.output)
b'hello stdout\n'
>>> next(res.output)
b'hello stderr\n'
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

With ``stream=True`` and ``demux=True``, the generator now
separates the streams, and yield tuples
``(stdout, stderr)``:

>>> res = container.exec_run(cmd, stream=True, demux=True)
>>> next(res.output)
(b'hello stdout\n', None)
>>> next(res.output)
(None, b'hello stderr\n')
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

Finally, with ``stream=False`` and ``demux=True``, the whole output
is returned, but the streams are still separated:

>>> res = container.exec_run(cmd, stream=True, demux=True)
>>> next(res.output)
(b'hello stdout\n', None)
>>> next(res.output)
(None, b'hello stderr\n')
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
"""
resp = self.client.api.exec_create(
self.id, cmd, stdout=stdout, stderr=stderr, stdin=stdin, tty=tty,
privileged=privileged, user=user, environment=environment,
workdir=workdir
)
exec_output = self.client.api.exec_start(
resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket
resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket,
demux=demux
)
if socket or stream:
return ExecResult(None, exec_output)
Expand Down
97 changes: 83 additions & 14 deletions docker/utils/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
NpipeSocket = type(None)


STDOUT = 1
STDERR = 2


class SocketError(Exception):
pass

Expand Down Expand Up @@ -51,28 +55,43 @@ def read_exactly(socket, n):
return data


def next_frame_size(socket):
def next_frame_header(socket):
"""
Returns the size of the next frame of data waiting to be read from socket,
according to the protocol defined here:
Returns the stream and size of the next frame of data waiting to be read
from socket, according to the protocol defined here:

https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
"""
try:
data = read_exactly(socket, 8)
except SocketError:
return -1
return (-1, -1)

stream, actual = struct.unpack('>BxxxL', data)
return (stream, actual)


_, actual = struct.unpack('>BxxxL', data)
return actual
def frames_iter(socket, tty):
"""
Return a generator of frames read from socket. A frame is a tuple where
the first item is the stream number and the second item is a chunk of data.

If the tty setting is enabled, the streams are multiplexed into the stdout
stream.
"""
if tty:
return ((STDOUT, frame) for frame in frames_iter_tty(socket))
else:
return frames_iter_no_tty(socket)


def frames_iter(socket):
def frames_iter_no_tty(socket):
"""
Returns a generator of frames read from socket
Returns a generator of data read from the socket when the tty setting is
not enabled.
"""
while True:
n = next_frame_size(socket)
(stream, n) = next_frame_header(socket)
if n < 0:
break
while n > 0:
Expand All @@ -84,17 +103,67 @@ def frames_iter(socket):
# We have reached EOF
return
n -= data_length
yield result
yield (stream, result)


def socket_raw_iter(socket):
def frames_iter_tty(socket):
"""
Returns a generator of data read from the socket.
This is used for non-multiplexed streams.
Return a generator of data read from the socket when the tty setting is
enabled.
"""
while True:
result = read(socket)
if len(result) == 0:
# We have reached EOF
return
yield result


def consume_socket_output(frames, demux=False):
"""
Iterate through frames read from the socket and return the result.

Args:

demux (bool):
If False, stdout and stderr are multiplexed, and the result is the
concatenation of all the frames. If True, the streams are
demultiplexed, and the result is a 2-tuple where each item is the
concatenation of frames belonging to the same stream.
"""
if demux is False:
# If the streams are multiplexed, the generator returns strings, that
# we just need to concatenate.
return six.binary_type().join(frames)

# If the streams are demultiplexed, the generator yields tuples
# (stdout, stderr)
out = [None, None]
for frame in frames:
# It is guaranteed that for each frame, one and only one stream
# is not None.
assert frame != (None, None)
if frame[0] is not None:
if out[0] is None:
out[0] = frame[0]
else:
out[0] += frame[0]
else:
if out[1] is None:
out[1] = frame[1]
else:
out[1] += frame[1]
return tuple(out)


def demux_adaptor(stream_id, data):
"""
Utility to demultiplex stdout and stderr when reading frames from the
socket.
"""
if stream_id == STDOUT:
return (data, None)
elif stream_id == STDERR:
return (None, data)
else:
raise ValueError('{0} is not a valid stream'.format(stream_id))
5 changes: 3 additions & 2 deletions tests/integration/api_container_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import docker
from docker.constants import IS_WINDOWS_PLATFORM
from docker.utils.socket import next_frame_size
from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly

import pytest
Expand Down Expand Up @@ -1242,7 +1242,8 @@ def test_run_container_reading_socket(self):

self.client.start(container)

next_size = next_frame_size(pty_stdout)
(stream, next_size) = next_frame_header(pty_stdout)
assert stream == 1 # correspond to stdout
assert next_size == len(line)
data = read_exactly(pty_stdout, next_size)
assert data.decode('utf-8') == line
Expand Down
Loading