From 9676febae9ec2ae1a0c72c416660ef1766f97f5f Mon Sep 17 00:00:00 2001
From: Davanum Srinivas
Date: Mon, 22 Dec 2014 08:17:36 -0500
Subject: [PATCH] Fix to enable streaming container logs reliably

Started an Ubuntu container that just runs "ping 8.8.8.8" and used the
sample code in https://gist.github.com/dims/c3327f633c526847c8e5 to
recreate the problem reported in:
https://github.com/docker/docker-py/issues/300

To debug the problem I printed the byte array read in recvall when
reading STREAM_HEADER_SIZE_BYTES, and realized that the data being read
was already well past the actual start of the header documented in the
vnd.docker.raw-stream section of the Docker remote API. This is likely
because requests/urllib3 reads ahead and buffers internally, and we
should not be reaching into the internals of those projects. Simply
using the documented file-like response.raw gives us the behavior we
need: reading from exactly where the stream header starts.

With this change I can reliably stream the logs just like
"docker logs --follow". Note that we still need to access the
underlying socket to disable its timeout, which prevents read timeouts
for long-running processes.

The original fix was for client.logs() only, but on further review it
made sense to replace all occurrences of
_multiplexed_socket_stream_helper with the new method.
---
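The vnd.docker.raw-stream framing this patch relies on is an 8-byte
header per frame: one byte for the stream type (stdin/stdout/stderr),
three padding bytes, then a big-endian 32-bit payload length; that is
what struct.unpack('>BxxxL', header) decodes in the hunk below. For
anyone who wants to poke at the framing directly, here is a minimal
standalone sketch of the same parsing, assuming resp is a streaming
requests response for a logs/attach endpoint (the names demux_frames
and resp are illustrative only, not part of this patch):

    import struct

    STREAM_HEADER_SIZE_BYTES = 8

    def demux_frames(resp):
        # Yield (stream_type, payload) frames from a multiplexed
        # vnd.docker.raw-stream response.
        while True:
            # 8-byte header: type (1 byte), padding (3 bytes),
            # payload length (4 bytes, big-endian)
            header = resp.raw.read(STREAM_HEADER_SIZE_BYTES)
            if len(header) < STREAM_HEADER_SIZE_BYTES:
                break
            stream_type, length = struct.unpack('>BxxxL', header)
            if not length:
                break
            payload = resp.raw.read(length)
            if not payload:
                break
            yield stream_type, payload
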
 docker/client.py | 42 ++++++++++++++----------------------------
 1 file changed, 14 insertions(+), 28 deletions(-)

diff --git a/docker/client.py b/docker/client.py
index 7aad1d2cdf..7b436cd4b9 100644
--- a/docker/client.py
+++ b/docker/client.py
@@ -320,40 +320,26 @@ def _multiplexed_buffer_helper(self, response):
             walker = end
             yield buf[start:end]
 
-    def _multiplexed_socket_stream_helper(self, response):
+    def _multiplexed_response_stream_helper(self, response):
         """A generator of multiplexed data blocks coming from a response
-        socket."""
-        socket = self._get_raw_response_socket(response)
-
-        def recvall(socket, size):
-            blocks = []
-            while size > 0:
-                if six.PY3:
-                    block = socket._sock.recv(size)
-                else:
-                    block = socket.recv(size)
-                if not block:
-                    return None
-
-                blocks.append(block)
-                size -= len(block)
+        stream."""
 
-            sep = bytes() if six.PY3 else str()
-            data = sep.join(blocks)
-            return data
+        # Disable timeout on the underlying socket to prevent
+        # Read timed out(s) for long running processes
+        socket = self._get_raw_response_socket(response)
+        if six.PY3:
+            socket._sock.settimeout(None)
+        else:
+            socket.settimeout(None)
 
         while True:
-            if six.PY3:
-                socket._sock.settimeout(None)
-            else:
-                socket.settimeout(None)
-            header = recvall(socket, STREAM_HEADER_SIZE_BYTES)
+            header = response.raw.read(STREAM_HEADER_SIZE_BYTES)
             if not header:
                 break
             _, length = struct.unpack('>BxxxL', header)
             if not length:
                 break
-            data = recvall(socket, length)
+            data = response.raw.read(length)
             if not data:
                 break
             yield data
@@ -387,7 +373,7 @@ def stream_result():
 
         sep = bytes() if six.PY3 else str()
 
-        return stream and self._multiplexed_socket_stream_helper(response) or \
+        return stream and self._multiplexed_response_stream_helper(response) or \
             sep.join([x for x in self._multiplexed_buffer_helper(response)])
 
     def attach_socket(self, container, params=None, ws=False):
@@ -604,7 +590,7 @@ def execute(self, container, cmd, detach=False, stdout=True, stderr=True,
                               data=data, stream=stream)
         self._raise_for_status(res)
         if stream:
-            return self._multiplexed_socket_stream_helper(res)
+            return self._multiplexed_response_stream_helper(res)
         elif six.PY3:
             return bytes().join(
                 [x for x in self._multiplexed_buffer_helper(res)]
@@ -774,7 +760,7 @@ def logs(self, container, stdout=True, stderr=True, stream=False,
         url = self._url("/containers/{0}/logs".format(container))
         res = self._get(url, params=params, stream=stream)
         if stream:
-            return self._multiplexed_socket_stream_helper(res)
+            return self._multiplexed_response_stream_helper(res)
         elif six.PY3:
             return bytes().join(
                 [x for x in self._multiplexed_buffer_helper(res)]
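
With the change applied, following a container's logs is just a matter
of consuming the generator that logs(stream=True) now returns. A short
usage sketch, assuming a daemon on the default unix socket and a
running container named "ping-test" (both assumptions, for
illustration only):

    from docker import Client

    cli = Client(base_url='unix://var/run/docker.sock')
    # stream=True returns the generator backed by
    # _multiplexed_response_stream_helper, so chunks arrive as the
    # container produces them, like `docker logs --follow`.
    for chunk in cli.logs('ping-test', stdout=True, stderr=True,
                          stream=True):
        print(chunk)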