Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 5 additions & 227 deletions docker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,246 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import re
import shlex
import struct
import warnings
from datetime import datetime

import requests
import requests.exceptions
import six
import websocket


from . import clientbase
from . import constants
from . import errors
from .auth import auth
from .unixconn import unixconn
from .ssladapter import ssladapter
from .utils import utils, check_resource
from .tls import TLSConfig


class Client(requests.Session):
def __init__(self, base_url=None, version=None,
timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False):
super(Client, self).__init__()

if tls and not base_url.startswith('https://'):
raise errors.TLSParameterError(
'If using TLS, the base_url argument must begin with '
'"https://".')

self.base_url = base_url
self.timeout = timeout

self._auth_configs = auth.load_config()

base_url = utils.parse_host(base_url)
if base_url.startswith('http+unix://'):
self._adapter = unixconn.UnixAdapter(base_url, timeout)
self.mount('http+docker://', self._adapter)
self.base_url = 'http+docker://localunixsocket'
else:
# Use SSLAdapter for the ability to specify SSL version
if isinstance(tls, TLSConfig):
tls.configure_client(self)
elif tls:
self._adapter = ssladapter.SSLAdapter()
self.mount('https://', self._adapter)
self.base_url = base_url

# version detection needs to be after unix adapter mounting
if version is None:
self._version = constants.DEFAULT_DOCKER_API_VERSION
elif isinstance(version, six.string_types):
if version.lower() == 'auto':
self._version = self._retrieve_server_version()
else:
self._version = version
else:
raise errors.DockerException(
'Version parameter must be a string or None. Found {0}'.format(
type(version).__name__
)
)

def _retrieve_server_version(self):
try:
return self.version(api_version=False)["ApiVersion"]
except KeyError:
raise errors.DockerException(
'Invalid response from docker daemon: key "ApiVersion"'
' is missing.'
)
except Exception as e:
raise errors.DockerException(
'Error while fetching server API version: {0}'.format(e)
)

def _set_request_timeout(self, kwargs):
"""Prepare the kwargs for an HTTP request by inserting the timeout
parameter, if not already present."""
kwargs.setdefault('timeout', self.timeout)
return kwargs

def _post(self, url, **kwargs):
return self.post(url, **self._set_request_timeout(kwargs))

def _get(self, url, **kwargs):
return self.get(url, **self._set_request_timeout(kwargs))

def _delete(self, url, **kwargs):
return self.delete(url, **self._set_request_timeout(kwargs))

def _url(self, path, versioned_api=True):
if versioned_api:
return '{0}/v{1}{2}'.format(self.base_url, self._version, path)
else:
return '{0}{1}'.format(self.base_url, path)

def _raise_for_status(self, response, explanation=None):
"""Raises stored :class:`APIError`, if one occurred."""
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise errors.APIError(e, response, explanation=explanation)

def _result(self, response, json=False, binary=False):
assert not (json and binary)
self._raise_for_status(response)

if json:
return response.json()
if binary:
return response.content
return response.text

def _post_json(self, url, data, **kwargs):
# Go <1.1 can't unserialize null to a string
# so we do this disgusting thing here.
data2 = {}
if data is not None:
for k, v in six.iteritems(data):
if v is not None:
data2[k] = v

if 'headers' not in kwargs:
kwargs['headers'] = {}
kwargs['headers']['Content-Type'] = 'application/json'
return self._post(url, data=json.dumps(data2), **kwargs)

def _attach_params(self, override=None):
return override or {
'stdout': 1,
'stderr': 1,
'stream': 1
}

@check_resource
def _attach_websocket(self, container, params=None):
url = self._url("/containers/{0}/attach/ws".format(container))
req = requests.Request("POST", url, params=self._attach_params(params))
full_url = req.prepare().url
full_url = full_url.replace("http://", "ws://", 1)
full_url = full_url.replace("https://", "wss://", 1)
return self._create_websocket_connection(full_url)

def _create_websocket_connection(self, url):
return websocket.create_connection(url)

def _get_raw_response_socket(self, response):
self._raise_for_status(response)
if six.PY3:
sock = response.raw._fp.fp.raw
else:
sock = response.raw._fp.fp._sock
try:
# Keep a reference to the response to stop it being garbage
# collected. If the response is garbage collected, it will
# close TLS sockets.
sock._response = response
except AttributeError:
# UNIX sockets can't have attributes set on them, but that's
# fine because we won't be doing TLS over them
pass

return sock

def _stream_helper(self, response, decode=False):
"""Generator for data coming from a chunked-encoded HTTP response."""
if response.raw._fp.chunked:
reader = response.raw
while not reader.closed:
# this read call will block until we get a chunk
data = reader.read(1)
if not data:
break
if reader._fp.chunk_left:
data += reader.read(reader._fp.chunk_left)
if decode:
if six.PY3:
data = data.decode('utf-8')
data = json.loads(data)
yield data
else:
# Response isn't chunked, meaning we probably
# encountered an error immediately
yield self._result(response)

def _multiplexed_buffer_helper(self, response):
"""A generator of multiplexed data blocks read from a buffered
response."""
buf = self._result(response, binary=True)
walker = 0
while True:
if len(buf[walker:]) < 8:
break
_, length = struct.unpack_from('>BxxxL', buf[walker:])
start = walker + constants.STREAM_HEADER_SIZE_BYTES
end = start + length
walker = end
yield buf[start:end]

def _multiplexed_response_stream_helper(self, response):
"""A generator of multiplexed data blocks coming from a response
stream."""

# Disable timeout on the underlying socket to prevent
# Read timed out(s) for long running processes
socket = self._get_raw_response_socket(response)
if six.PY3:
socket._sock.settimeout(None)
else:
socket.settimeout(None)

while True:
header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES)
if not header:
break
_, length = struct.unpack('>BxxxL', header)
if not length:
break
data = response.raw.read(length)
if not data:
break
yield data

@property
def api_version(self):
return self._version

def get_adapter(self, url):
try:
return super(Client, self).get_adapter(url)
except requests.exceptions.InvalidSchema as e:
if self._adapter:
return self._adapter
raise e

class Client(clientbase.ClientBase):
@check_resource
def attach(self, container, stdout=True, stderr=True,
stream=False, logs=False):
Expand Down Expand Up @@ -745,7 +521,9 @@ def inspect_container(self, container):
@check_resource
def inspect_image(self, image):
return self._result(
self._get(self._url("/images/{0}/json".format(image))),
self._get(
self._url("/images/{0}/json".format(image.replace('/', '%2F')))
),
True
)

Expand Down
Loading