Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions renga/cli/notebooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,24 +126,12 @@ def launch(ctx, config, context, engine, image, input, output, endpoint):
proj_notebooks[image] = context.id
cfg['notebooks'] = proj_notebooks

environment = {}
context._client._environment = {}

for name, value in context.inputs._names.items():
if name in inputs and value != inputs[name]:
environment[context.inputs._env_tpl.format(name.upper())] = inputs[
name]

for name, value in context.outputs._names.items():
if name in outputs and value != outputs[name]:
environment[context.outputs._env_tpl.format(
name.upper())] = outputs[name]

if 'notebook' not in context.inputs._names:
click.echo('Option "--input notebook[=ID]" is missing. '
'The new notebook will not be tracked.')

execution = context.run(
engine=engine,
environment=environment, )
inputs=inputs,
outputs=outputs, )
click.echo(execution.url)
53 changes: 51 additions & 2 deletions renga/models/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,57 @@ def vertex_id(self):
labels = self.spec.get('labels', {})
return self.labels.get('renga.execution_context.vertex_id')

def run(self, **kwargs):
"""Execute the context."""
def run(self, inputs=None, outputs=None, **kwargs):
"""Execute the context.

Optionally provide new values for input and output slots. Following
example shows how to create new execution from the current context with
different files attached to input and output slots.

.. code-block:: python

execution = client.current_context.run(
engine='docker',
inputs={
'notebook': client.buckets[1234].file[9876].clone(),
},
outputs={
'plot': client.buckets[1234].create('plot.png'),
},
)
print(execution.url)

"""
inputs = inputs or {}
outputs = outputs or {}

# Make sure that the given environment is updated.
kwargs.setdefault('environment', {})
environment = kwargs['environment']

client_environment = getattr(self._client, '_environment', os.environ)

def update_env(environment, slots, values):
"""Update environment with values not used in slots."""
for name, value in slots._names.items():
new_value = values.get(name, value)

# Support identifier or a File instance.
if isinstance(new_value, File):
new_value = new_value.id

# Update only if they are different.
if new_value != value:
environment[self.inputs._env_tpl.format(
name.upper())] = new_value

try:
self._client._environment = {}
update_env(environment, self.inputs, inputs)
update_env(environment, self.outputs, outputs)
finally:
self._client._environment = client_environment

execution = self._client.api.create_execution(self.id, **kwargs)
execution['context_id'] = self.id
return Execution(execution, client=self._client, collection=self)
Expand Down
38 changes: 26 additions & 12 deletions renga/models/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,19 @@ def filename(self, value):
# Update if the service replace works
self._properties['resource:file_name'] = value

def clone(self, filename=None):
"""Create an instance of the file for independent version tracking."""
if not hasattr(self._collection, 'create'): # pragma: no cover
raise NotImplemented(
'Only files accessed via bucket can be cloned.')

with self.open('r') as source:
filename = filename or 'clone_' + self.filename
cloned_file = self._collection.create(filename)
with cloned_file.open('w') as dest:
dest.write(source.read())
return cloned_file

@property
def versions(self):
"""An object for managing file versions.
Expand Down Expand Up @@ -176,14 +189,14 @@ def __iter__(self):
return (self.Meta.model(f, client=self._client, collection=self)
for f in self._client.api.get_bucket_files(self.bucket.id))

def open(self, file_name=None, mode='w'):
def open(self, filename=None, mode='w'):
"""Create an empty file in this bucket."""
if mode != 'w':
raise NotImplemented('Only mode "w" is currently supported')

resp = self._client.api.create_file(
bucket_id=self.bucket.id,
file_name=file_name,
file_name=filename,
request_type='create_file', )

access_token = resp.pop('access_token')
Expand All @@ -201,31 +214,31 @@ def open(self, file_name=None, mode='w'):
}
return FileHandle(file_handle, client=client)

def create(self, file_name=None):
def create(self, filename=None):
"""Create an empty file in this bucket."""
resp = self._client.api.create_file(
bucket_id=self.bucket.id,
file_name=file_name,
file_name=filename,
request_type='create_file', )
return self.Meta.model(
LazyResponse(lambda: self._client.api.get_file(resp['id']), resp),
client=self._client,
collection=self)

def from_url(self, url, file_name=None):
def from_url(self, url, filename=None):
"""Create a file with data from the streamed GET response.

**Example**

>>> file_ = client.buckets[1234].files.from_url(
... 'https://example.com/tests/data', file_name='hello')
... 'https://example.com/tests/data', filename='hello')
>>> file_.id
9876
>>> client.buckets[1234].files[9876].open('r').read()
b'hello world'

"""
with self.open(file_name=file_name or url, mode='w') as fp:
with self.open(filename=filename or url, mode='w') as fp:
fp.from_url(url)
return self.__getitem__(fp.id)

Expand Down Expand Up @@ -331,8 +344,9 @@ def __getitem__(self, file_id):

def __iter__(self):
"""Return all versions of this file."""
return iter(sorted(
(self.Meta.model(data, client=self._client, collection=self)
for data in self._client.api.get_file_versions(self.file.id)),
key=lambda file_: file_.created,
reverse=True))
return iter(
sorted(
(self.Meta.model(data, client=self._client, collection=self)
for data in self._client.api.get_file_versions(self.file.id)),
key=lambda file_: file_.created,
reverse=True))
20 changes: 14 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,22 @@ def storage_responses(auth_responses, renga_client):
'backend': 'local',
})

rsps.add(
file_ = {'id': 9876}

def create_file(request):
"""Create new file."""
file_id = file_['id']
file_['id'] -= 1
return (201, {}, json.dumps({
'id': file_id,
'access_token': 'accessfile_{0}'.format(file_id),
}))

rsps.add_callback(
responses.POST,
renga_client.api._url('/api/storage/authorize/create_file'),
status=201,
json={
'id': 9876,
'access_token': 'accessfile_9876',
})
callback=create_file,
)

def authorize_io(request):
"""Generate access token."""
Expand Down
22 changes: 20 additions & 2 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def test_client_buckets(renga_client, storage_responses):
bucket = renga_client.buckets.create(name='world', backend='local')
assert bucket.id == 1234

file_ = bucket.files.create(file_name='hello')
file_ = bucket.files.create(filename='hello')
assert file_.id == 9876
assert file_.filename == 'hello'

Expand Down Expand Up @@ -161,7 +161,7 @@ def test_file_renaming(renga_client, storage_responses):
bucket = renga_client.buckets.create(name='world', backend='local')
assert bucket.id == 1234

file_ = bucket.files.create(file_name='hello')
file_ = bucket.files.create(filename='hello')
assert file_.id == 9876
assert file_.filename == 'hello'

Expand All @@ -172,6 +172,24 @@ def test_file_renaming(renga_client, storage_responses):
assert file_.filename == 'hello-2'


def test_file_cloning(renga_client, storage_responses):
"""Test file cloning."""
bucket = renga_client.buckets.create(name='world', backend='local')
assert bucket.id == 1234

file_ = bucket.files.create(filename='hello')

with file_.open('w') as fp:
fp.write(b'hello world')

cloned_file = file_.clone()

assert file_.id != cloned_file.id

with cloned_file.open('r') as fp:
assert fp.read() == b'hello world'


def test_file_versioning(renga_client, storage_responses, explorer_responses):
"""Test shortcut for creating file on a bucket."""
bucket = renga_client.buckets.create(name='world', backend='local')
Expand Down