From 45a59ae52acd32c6b6f8ca0da5b07004203b09b9 Mon Sep 17 00:00:00 2001 From: Dmitry Unkovsky Date: Mon, 27 Oct 2014 19:22:23 +0300 Subject: [PATCH] add support for docker {pull,tag}, and, finally, for app import command --- cocaine/tools/actions/app.py | 74 +++++++++++++++++++ cocaine/tools/actions/docker.py | 125 ++++++++++++++++++++++++++++++++ cocaine/tools/cli.py | 1 + cocaine/tools/dispatcher.py | 35 +++++++++ 4 files changed, 235 insertions(+) diff --git a/cocaine/tools/actions/app.py b/cocaine/tools/actions/app.py index 1fad2b3..0791bfa 100644 --- a/cocaine/tools/actions/app.py +++ b/cocaine/tools/actions/app.py @@ -274,6 +274,80 @@ def _on_read(self, value): print(value) +class DockerImport(actions.Storage): + def __init__(self, storage, path, name, manifest, address, container, registry='', on_read=None): + + print "__init", storage, path, name, manifest, address, container + + super(DockerImport, self).__init__(storage) + self.path = path or os.path.curdir + self.name = name or os.path.basename(os.path.abspath(self.path)) + self.container_url = container + if registry: + self.fullname = '{0}/{1}'.format(registry, self.name) + else: + self.fullname = self.name + + self.manifest = manifest + + self.client = docker.Client(address) + + log.debug('checking Dockerfile') + if not address: + raise ValueError('Docker address is not specified') + + if on_read is not None: + if not callable(on_read): + raise ValueError("on_read must ne callable") + self._on_read = on_read + + self._last_message = '' + + @engine.asynchronous + def execute(self): + log.debug('application name will be: %s', self.fullname) + + if self.manifest: + manifestPath = self.manifest + else: + try: + manifestPath = _locateFile(self.path, 'manifest.json') + except IOError: + log.error("unable to locate manifest.json") + raise ToolsError("unable to locate manifest.json") + + with printer('Loading manifest'): + manifest = CocaineConfigReader.load(manifestPath) + + with printer('Uploading manifest'): + yield self.storage.write('manifests', self.name, manifest, APPS_TAGS) + + try: + response = yield self.client.pull(self.container_url, {}, streaming=self._on_read) + if response.code != 200: + raise ToolsError('building failed with error code {0} {1}'.format(response.code, + response.body)) + + response = yield self.client.tag(self.container_url, {}, self.fullname, streaming=self._on_read) + if response.code != 200 and response.code != 201: + raise ToolsError('building failed with error code {0} {1}'.format(response.code, + response.body)) + + response = yield self.client.push(self.fullname, {}, streaming=self._on_read) + if response.code != 200: + raise ToolsError('pushing failed with error code {0} {1}'.format(response.code, + response.body)) + except Exception as err: + log.error("Error occurred. Erase manifest") + yield self.storage.remove('manifests', self.name) + raise err + + def _on_read(self, value): + if self._last_message != value: + self._last_message = value + print(value) + + class LocalUpload(actions.Storage): def __init__(self, storage, path, name, manifest): super(LocalUpload, self).__init__(storage) diff --git a/cocaine/tools/actions/docker.py b/cocaine/tools/actions/docker.py index d02047b..8ac5085 100644 --- a/cocaine/tools/actions/docker.py +++ b/cocaine/tools/actions/docker.py @@ -93,6 +93,12 @@ def containers(self): def build(self, path, tag=None, quiet=False, streaming=None): return Build(path, tag, quiet, streaming, **self.config).execute() + def pull(self, name, auth, streaming=None): + return Pull(name, auth, streaming, **self.config).execute() + + def tag(self, name, auth, tag, streaming=None): + return Tag(name, auth, tag, streaming, **self.config).execute() + def push(self, name, auth, streaming=None): return Push(name, auth, streaming, **self.config).execute() @@ -274,3 +280,122 @@ def _match_first(self, dict_, keys, default): if value is not None: return value return default + + +class Pull(Action): + def __init__(self, name, auth, streaming=None, + url=DEFAULT_URL, version=DEFAULT_VERSION, timeout=DEFAULT_TIMEOUT, io_loop=None): + self.name = name + self.auth = auth + self._streaming = streaming + super(Pull, self).__init__(url, version, timeout, io_loop) + + @chain.source + def execute(self): + url = self._make_url('/images/create', query={"fromImage":self.name}) + + print "url", url + registry, name = resolve_repository_name(self.name) + + headers = HTTPHeaders() + headers.add('X-Registry-Auth', self._prepare_auth_header_value()) + body = '' + log.info('Pulling "%s" ... ', name) + request = HTTPRequest(url, method='POST', + headers=headers, + body=body, + allow_ipv6=True, + request_timeout=self.timeout, + streaming_callback=self._on_body) + try: + yield self._http_client.fetch(request) + log.info('OK') + except Exception as err: + log.error('FAIL - %s', err) + raise err + + def _prepare_auth_header_value(self): + username = self.auth.get('username', 'username') + password = self.auth.get('password', 'password') + return base64.b64encode('{0}:{1}'.format(username, password)) + + def _on_body(self, data): + parsed = '' + try: + response = json.loads(data) + except ValueError: + parsed = data + except Exception as err: + parsed = 'Unknown error: {0}'.format(err) + else: + parsed = self._match_first(response, ['status', 'error'], data) + finally: + self._streaming(parsed) + + def _match_first(self, dict_, keys, default): + for key in keys: + value = dict_.get(key) + if value is not None: + return value + return default + + +class Tag(Action): + def __init__(self, name, auth, tag, streaming=None, + url=DEFAULT_URL, version=DEFAULT_VERSION, timeout=DEFAULT_TIMEOUT, io_loop=None): + self.name = name + self.auth = auth + self.tag = tag + self._streaming = streaming + super(Tag, self).__init__(url, version, timeout, io_loop) + + @chain.source + def execute(self): + url = self._make_url('/images/{0}/tag'.format(self.name), query={"repo":self.tag}) + + print "url", url + registry, name = resolve_repository_name(self.name) + + headers = HTTPHeaders() + headers.add('X-Registry-Auth', self._prepare_auth_header_value()) + body = '' + log.info('Tagging "%s" with "%s"" ... ', name, self.tag) + request = HTTPRequest(url, method='POST', + headers=headers, + body=body, + allow_ipv6=True, + request_timeout=self.timeout, + streaming_callback=self._on_body) + try: + yield self._http_client.fetch(request) + log.info('OK') + except Exception as err: + log.error('FAIL - %s', err) + raise err + + def _prepare_auth_header_value(self): + username = self.auth.get('username', 'username') + password = self.auth.get('password', 'password') + return base64.b64encode('{0}:{1}'.format(username, password)) + + def _on_body(self, data): + parsed = '' + try: + response = json.loads(data) + except ValueError: + parsed = data + except Exception as err: + parsed = 'Unknown error: {0}'.format(err) + else: + parsed = self._match_first(response, ['status', 'error'], data) + finally: + self._streaming(parsed) + + def _match_first(self, dict_, keys, default): + for key in keys: + value = dict_.get(key) + if value is not None: + return value + return default + + diff --git a/cocaine/tools/cli.py b/cocaine/tools/cli.py index 13beb03..a70e54f 100644 --- a/cocaine/tools/cli.py +++ b/cocaine/tools/cli.py @@ -125,6 +125,7 @@ def _processResult(self, result): 'app:remove': ToolHandler(app.Remove), 'app:upload': ToolHandler(app.LocalUpload), 'app:upload-docker': ToolHandler(app.DockerUpload), + 'app:import-docker': ToolHandler(app.DockerImport), 'app:upload-manual': ToolHandler(app.Upload), 'app:start': JsonToolHandler(app.Start), 'app:pause': JsonToolHandler(app.Stop), diff --git a/cocaine/tools/dispatcher.py b/cocaine/tools/dispatcher.py index ad244ca..cd78e37 100644 --- a/cocaine/tools/dispatcher.py +++ b/cocaine/tools/dispatcher.py @@ -267,6 +267,41 @@ def app_upload(options, }) +@appDispatcher.command(name='import', usage='[PATH] [--name=NAME] [--manifest=MANIFEST] [--package=PACKAGE]') +def app_import(options, + path=None, + name=('n', '', 'application name'), + manifest=('', '', 'manifest file name'), + container_url=('', '', 'docker container url'), + docker_address=('', '', 'docker address'), + registry=('', '', 'registry address'), + recipe=('', '', 'path to the recipe file'), + manifest_only=('', False, 'upload manifest only')): + """Import application's docker container + + You can control process of creating and uploading application by specifying `--debug=tools` option. This is helpful + when some errors occurred. + """ + TIMEOUT_THRESHOLD = 120.0 + if options.executor.timeout < TIMEOUT_THRESHOLD: + logging.getLogger('cocaine.tools').info('Setting timeout to the %fs', TIMEOUT_THRESHOLD) + options.executor.timeout = TIMEOUT_THRESHOLD + + if container_url and docker_address: + options.executor.executeAction('app:import-docker', **{ + 'storage': options.getService('storage'), + 'path': path, + 'name': name, + 'manifest': manifest, + 'container': container_url, + 'address': docker_address, + 'registry': registry + }) + else: + print "wrong usage" + exit(os.EX_USAGE) + + @appDispatcher.command(name='remove') def app_remove(options, name=('n', '', 'application name')):