From ab8738bd943a2cfece97e6fcd4649de3a3d502ec Mon Sep 17 00:00:00 2001 From: Jiri Kuncar Date: Thu, 23 Nov 2017 11:35:10 +0100 Subject: [PATCH 1/2] models: re-run execution with cloned file --- renga/cli/notebooks.py | 16 ++---------- renga/models/deployer.py | 53 ++++++++++++++++++++++++++++++++++++++-- renga/models/storage.py | 24 ++++++++++++++---- tests/conftest.py | 20 ++++++++++----- tests/test_client.py | 18 ++++++++++++++ 5 files changed, 104 insertions(+), 27 deletions(-) diff --git a/renga/cli/notebooks.py b/renga/cli/notebooks.py index decc4b6819..3647a1abc0 100644 --- a/renga/cli/notebooks.py +++ b/renga/cli/notebooks.py @@ -126,24 +126,12 @@ def launch(ctx, config, context, engine, image, input, output, endpoint): proj_notebooks[image] = context.id cfg['notebooks'] = proj_notebooks - environment = {} - context._client._environment = {} - - for name, value in context.inputs._names.items(): - if name in inputs and value != inputs[name]: - environment[context.inputs._env_tpl.format(name.upper())] = inputs[ - name] - - for name, value in context.outputs._names.items(): - if name in outputs and value != outputs[name]: - environment[context.outputs._env_tpl.format( - name.upper())] = outputs[name] - if 'notebook' not in context.inputs._names: click.echo('Option "--input notebook[=ID]" is missing. ' 'The new notebook will not be tracked.') execution = context.run( engine=engine, - environment=environment, ) + inputs=inputs, + outputs=outputs, ) click.echo(execution.url) diff --git a/renga/models/deployer.py b/renga/models/deployer.py index a219b7d8ae..104df4cba8 100644 --- a/renga/models/deployer.py +++ b/renga/models/deployer.py @@ -131,8 +131,57 @@ def vertex_id(self): labels = self.spec.get('labels', {}) return self.labels.get('renga.execution_context.vertex_id') - def run(self, **kwargs): - """Execute the context.""" + def run(self, inputs=None, outputs=None, **kwargs): + """Execute the context. + + Optionally provide new values for input and output slots. Following + example shows how to create new execution from the current context with + different files attached to input and output slots. + + .. code-block:: python + + execution = client.current_context.run( + engine='docker', + inputs={ + 'notebook': client.buckets[1234].file[9876].clone(), + }, + outputs={ + 'plot': client.buckets[1234].create('plot.png'), + }, + ) + print(execution.url) + + """ + inputs = inputs or {} + outputs = outputs or {} + + # Make sure that the given environment is updated. + kwargs.setdefault('environment', {}) + environment = kwargs['environment'] + + client_environment = getattr(self._client, '_environment', os.environ) + + def update_env(environment, slots, values): + """Update environment with values not used in slots.""" + for name, value in slots._names.items(): + new_value = values.get(name, value) + + # Support identifier or a File instance. + if isinstance(new_value, File): + new_value = new_value.id + + # Update only if they are different. + if new_value != value: + environment[self.inputs._env_tpl.format( + name.upper())] = new_value + + try: + self._client._environment = {} + update_env(environment, self.inputs, inputs) + update_env(environment, self.outputs, outputs) + finally: + self._client._environment = client_environment + execution = self._client.api.create_execution(self.id, **kwargs) execution['context_id'] = self.id return Execution(execution, client=self._client, collection=self) diff --git a/renga/models/storage.py b/renga/models/storage.py index 8e4154a6e0..1f55f4d822 100644 --- a/renga/models/storage.py +++ b/renga/models/storage.py @@ -139,6 +139,19 @@ def filename(self, value): # Update if the service replace works self._properties['resource:file_name'] = value + def clone(self, file_name=None): + """Create an instance of the file for independent version tracking.""" + if not hasattr(self._collection, 'create'): # pragma: no cover + raise NotImplemented( + 'Only files accessed via bucket can be cloned.') + + with self.open('r') as source: + file_name = file_name or 'clone_' + self.filename + cloned_file = self._collection.create(file_name) + with cloned_file.open('w') as dest: + dest.write(source.read()) + return cloned_file + @property def versions(self): """An object for managing file versions. @@ -331,8 +344,9 @@ def __getitem__(self, file_id): def __iter__(self): """Return all versions of this file.""" - return iter(sorted( - (self.Meta.model(data, client=self._client, collection=self) - for data in self._client.api.get_file_versions(self.file.id)), - key=lambda file_: file_.created, - reverse=True)) + return iter( + sorted( + (self.Meta.model(data, client=self._client, collection=self) + for data in self._client.api.get_file_versions(self.file.id)), + key=lambda file_: file_.created, + reverse=True)) diff --git a/tests/conftest.py b/tests/conftest.py index 47050bad7e..48e14d0e27 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -313,14 +313,22 @@ def storage_responses(auth_responses, renga_client): 'backend': 'local', }) - rsps.add( + file_ = {'id': 9876} + + def create_file(request): + """Create new file.""" + file_id = file_['id'] + file_['id'] -= 1 + return (201, {}, json.dumps({ + 'id': file_id, + 'access_token': 'accessfile_{0}'.format(file_id), + })) + + rsps.add_callback( responses.POST, renga_client.api._url('/api/storage/authorize/create_file'), - status=201, - json={ - 'id': 9876, - 'access_token': 'accessfile_9876', - }) + callback=create_file, + ) def authorize_io(request): """Generate access token.""" diff --git a/tests/test_client.py b/tests/test_client.py index cf309584fd..45df0d3339 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -172,6 +172,24 @@ def test_file_renaming(renga_client, storage_responses): assert file_.filename == 'hello-2' +def test_file_cloning(renga_client, storage_responses): + """Test file cloning.""" + bucket = renga_client.buckets.create(name='world', backend='local') + assert bucket.id == 1234 + + file_ = bucket.files.create(file_name='hello') + + with file_.open('w') as fp: + fp.write(b'hello world') + + cloned_file = file_.clone() + + assert file_.id != cloned_file.id + + with cloned_file.open('r') as fp: + assert fp.read() == b'hello world' + + def test_file_versioning(renga_client, storage_responses, explorer_responses): """Test shortcut for creating file on a bucket.""" bucket = renga_client.buckets.create(name='world', backend='local') From 770d498883d3b59ac486be51bbe28996c6730840 Mon Sep 17 00:00:00 2001 From: Jiri Kuncar Date: Thu, 23 Nov 2017 12:46:30 +0100 Subject: [PATCH 2/2] storage: unified argument naming --- renga/models/storage.py | 20 ++++++++++---------- tests/test_client.py | 6 +++--- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/renga/models/storage.py b/renga/models/storage.py index 1f55f4d822..932c92cbdf 100644 --- a/renga/models/storage.py +++ b/renga/models/storage.py @@ -139,15 +139,15 @@ def filename(self, value): # Update if the service replace works self._properties['resource:file_name'] = value - def clone(self, file_name=None): + def clone(self, filename=None): """Create an instance of the file for independent version tracking.""" if not hasattr(self._collection, 'create'): # pragma: no cover raise NotImplemented( 'Only files accessed via bucket can be cloned.') with self.open('r') as source: - file_name = file_name or 'clone_' + self.filename - cloned_file = self._collection.create(file_name) + filename = filename or 'clone_' + self.filename + cloned_file = self._collection.create(filename) with cloned_file.open('w') as dest: dest.write(source.read()) return cloned_file @@ -189,14 +189,14 @@ def __iter__(self): return (self.Meta.model(f, client=self._client, collection=self) for f in self._client.api.get_bucket_files(self.bucket.id)) - def open(self, file_name=None, mode='w'): + def open(self, filename=None, mode='w'): """Create an empty file in this bucket.""" if mode != 'w': raise NotImplemented('Only mode "w" is currently supported') resp = self._client.api.create_file( bucket_id=self.bucket.id, - file_name=file_name, + file_name=filename, request_type='create_file', ) access_token = resp.pop('access_token') @@ -214,31 +214,31 @@ def open(self, file_name=None, mode='w'): } return FileHandle(file_handle, client=client) - def create(self, file_name=None): + def create(self, filename=None): """Create an empty file in this bucket.""" resp = self._client.api.create_file( bucket_id=self.bucket.id, - file_name=file_name, + file_name=filename, request_type='create_file', ) return self.Meta.model( LazyResponse(lambda: self._client.api.get_file(resp['id']), resp), client=self._client, collection=self) - def from_url(self, url, file_name=None): + def from_url(self, url, filename=None): """Create a file with data from the streamed GET response. **Example** >>> file_ = client.buckets[1234].files.from_url( - ... 'https://example.com/tests/data', file_name='hello') + ... 'https://example.com/tests/data', filename='hello') >>> file_.id 9876 >>> client.buckets[1234].files[9876].open('r').read() b'hello world' """ - with self.open(file_name=file_name or url, mode='w') as fp: + with self.open(filename=filename or url, mode='w') as fp: fp.from_url(url) return self.__getitem__(fp.id) diff --git a/tests/test_client.py b/tests/test_client.py index 45df0d3339..ffc13d2de2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -122,7 +122,7 @@ def test_client_buckets(renga_client, storage_responses): bucket = renga_client.buckets.create(name='world', backend='local') assert bucket.id == 1234 - file_ = bucket.files.create(file_name='hello') + file_ = bucket.files.create(filename='hello') assert file_.id == 9876 assert file_.filename == 'hello' @@ -161,7 +161,7 @@ def test_file_renaming(renga_client, storage_responses): bucket = renga_client.buckets.create(name='world', backend='local') assert bucket.id == 1234 - file_ = bucket.files.create(file_name='hello') + file_ = bucket.files.create(filename='hello') assert file_.id == 9876 assert file_.filename == 'hello' @@ -177,7 +177,7 @@ def test_file_cloning(renga_client, storage_responses): bucket = renga_client.buckets.create(name='world', backend='local') assert bucket.id == 1234 - file_ = bucket.files.create(file_name='hello') + file_ = bucket.files.create(filename='hello') with file_.open('w') as fp: fp.write(b'hello world')