Skip to content

Commit

Permalink
fix(api): make methods lock free
Browse files Browse the repository at this point in the history
* Moves locking to CLI from API.  (closes #486)
  • Loading branch information
jirikuncar committed Mar 26, 2019
1 parent cf0f502 commit 31f733b
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 94 deletions.
48 changes: 24 additions & 24 deletions renku/api/datasets.py
Expand Up @@ -106,37 +106,37 @@ def store_dataset(self, dataset):
def with_dataset(self, name=None):
"""Yield an editable metadata object for a dataset."""
from renku.models.refs import LinkReference
with self.lock:
dataset = self.load_dataset(name=name)

if dataset is None:
dataset = Dataset(name=name)
setattr(dataset, '__source__', {})
dataset = self.load_dataset(name=name)

path = (
self.renku_datasets_path / dataset.identifier.hex /
self.METADATA
)
path.parent.mkdir(parents=True, exist_ok=True)
setattr(dataset, '__reference__', path)
if dataset is None:
dataset = Dataset(name=name)
setattr(dataset, '__source__', {})

if name:
LinkReference.create(
client=self, name='datasets/' + name
).set_reference(path)
path = (
self.renku_datasets_path / dataset.identifier.hex /
self.METADATA
)
path.parent.mkdir(parents=True, exist_ok=True)
setattr(dataset, '__reference__', path)

dataset_path = self.path / self.datadir / dataset.name
dataset_path.mkdir(parents=True, exist_ok=True)
if name:
LinkReference.create(
client=self, name='datasets/' + name
).set_reference(path)

dataset_path = self.path / self.datadir / dataset.name
dataset_path.mkdir(parents=True, exist_ok=True)

yield dataset
yield dataset

# TODO
# if path is None:
# path = dataset_path / self.METADATA
# if path.exists():
# raise ValueError('Dataset already exists')
# TODO
# if path is None:
# path = dataset_path / self.METADATA
# if path.exists():
# raise ValueError('Dataset already exists')

self.store_dataset(dataset)
self.store_dataset(dataset)

def add_data_to_dataset(
self, dataset, url, git=False, force=False, **kwargs
Expand Down
81 changes: 40 additions & 41 deletions renku/api/repository.py
Expand Up @@ -108,7 +108,8 @@ def __attrs_post_init__(self):
def lock(self):
"""Create a Renku config lock."""
return filelock.FileLock(
str(self.renku_path.with_suffix(self.LOCK_SUFFIX))
str(self.renku_path.with_suffix(self.LOCK_SUFFIX)),
timeout=0,
)

@property
Expand Down Expand Up @@ -286,58 +287,56 @@ def resolve_in_submodules(self, commit, path):
@contextmanager
def with_metadata(self):
"""Yield an editable metadata object."""
with self.lock:
from renku.models._jsonld import asjsonld
from renku.models.projects import Project
from renku.models._jsonld import asjsonld
from renku.models.projects import Project

metadata_path = self.renku_metadata_path
metadata_path = self.renku_metadata_path

if self.renku_metadata_path.exists():
with metadata_path.open('r') as f:
source = yaml.safe_load(f) or {}
else:
source = {}
if self.renku_metadata_path.exists():
with metadata_path.open('r') as f:
source = yaml.safe_load(f) or {}
else:
source = {}

metadata = Project.from_jsonld(source, __reference__=metadata_path)
metadata = Project.from_jsonld(source, __reference__=metadata_path)

yield metadata
yield metadata

source.update(**asjsonld(metadata))
with metadata_path.open('w') as f:
yaml.dump(source, f, default_flow_style=False)
source.update(**asjsonld(metadata))
with metadata_path.open('w') as f:
yaml.dump(source, f, default_flow_style=False)

@contextmanager
def with_workflow_storage(self):
"""Yield a workflow storage."""
with self.lock:
from renku.models.cwl._ascwl import ascwl
from renku.models.cwl.workflow import Workflow
from renku.models.cwl._ascwl import ascwl
from renku.models.cwl.workflow import Workflow

workflow = Workflow()
yield workflow
workflow = Workflow()
yield workflow

for step in workflow.steps:
step_name = '{0}_{1}.cwl'.format(
uuid.uuid4().hex,
secure_filename('_'.join(step.run.baseCommand)),
)
for step in workflow.steps:
step_name = '{0}_{1}.cwl'.format(
uuid.uuid4().hex,
secure_filename('_'.join(step.run.baseCommand)),
)

workflow_path = self.workflow_path
if not workflow_path.exists():
workflow_path.mkdir()

step_path = workflow_path / step_name
with step_path.open('w') as step_file:
yaml.dump(
ascwl(
# filter=lambda _, x: not (x is False or bool(x)
step.run,
filter=lambda _, x: x is not None,
basedir=workflow_path,
),
stream=step_file,
default_flow_style=False
)
workflow_path = self.workflow_path
if not workflow_path.exists():
workflow_path.mkdir()

step_path = workflow_path / step_name
with step_path.open('w') as step_file:
yaml.dump(
ascwl(
# filter=lambda _, x: not (x is False or bool(x)
step.run,
filter=lambda _, x: x is not None,
basedir=workflow_path,
),
stream=step_file,
default_flow_style=False
)

def init_repository(self, name=None, force=False):
"""Initialize a local Renku repository."""
Expand Down
7 changes: 6 additions & 1 deletion renku/cli/_client.py
Expand Up @@ -42,7 +42,8 @@ def pass_local_client(
clean=None,
up_to_date=None,
commit=None,
ignore_std_streams=True
ignore_std_streams=True,
lock=None,
):
"""Pass client from the current context to the decorated command."""
if method is None:
Expand All @@ -52,6 +53,7 @@ def pass_local_client(
up_to_date=up_to_date,
commit=commit,
ignore_std_streams=ignore_std_streams,
lock=lock,
)

def new_func(*args, **kwargs):
Expand All @@ -71,6 +73,9 @@ def new_func(*args, **kwargs):
)
stack.enter_context(transaction)

if lock or (lock is None and commit):
stack.enter_context(client.lock)

with stack:
result = ctx.invoke(method, client, *args, **kwargs)
return result
Expand Down
3 changes: 2 additions & 1 deletion renku/cli/init.py
Expand Up @@ -143,7 +143,8 @@ def init(ctx, client, directory, name, force, use_external_storage):
)

try:
path = client.init_repository(name=name, force=force)
with client.lock:
path = client.init_repository(name=name, force=force)
except FileExistsError:
raise click.UsageError(
'Renku repository is not empty. '
Expand Down
50 changes: 24 additions & 26 deletions renku/cli/migrate.py
Expand Up @@ -50,29 +50,27 @@ def datasets(ctx, client):

from ._checks.location_datasets import _dataset_metadata_pre_0_3_4

with client.lock:
for old_path in _dataset_metadata_pre_0_3_4(client):
with old_path.open('r') as fp:
dataset = Dataset.from_jsonld(yaml.safe_load(fp))

name = str(old_path.parent.relative_to(client.path / 'data'))
new_path = (
client.renku_datasets_path / dataset.identifier.hex /
client.METADATA
)
new_path.parent.mkdir(parents=True, exist_ok=True)

dataset = dataset.rename_files(
lambda key: os.path.relpath(
str(old_path.parent / key), start=str(new_path.parent)
)
)

with new_path.open('w') as fp:
yaml.dump(asjsonld(dataset), fp, default_flow_style=False)

old_path.unlink()

LinkReference.create(
client=client, name='datasets/' + name
).set_reference(new_path)
for old_path in _dataset_metadata_pre_0_3_4(client):
with old_path.open('r') as fp:
dataset = Dataset.from_jsonld(yaml.safe_load(fp))

name = str(old_path.parent.relative_to(client.path / 'data'))
new_path = (
client.renku_datasets_path / dataset.identifier.hex /
client.METADATA
)
new_path.parent.mkdir(parents=True, exist_ok=True)

dataset = dataset.rename_files(
lambda key: os.path.
relpath(str(old_path.parent / key), start=str(new_path.parent))
)

with new_path.open('w') as fp:
yaml.dump(asjsonld(dataset), fp, default_flow_style=False)

old_path.unlink()

LinkReference.create(
client=client, name='datasets/' + name
).set_reference(new_path)
5 changes: 4 additions & 1 deletion renku/cli/run.py
Expand Up @@ -156,7 +156,10 @@
@option_isolation
@click.argument('command_line', nargs=-1, type=click.UNPROCESSED)
@pass_local_client(
clean=True, up_to_date=True, commit=True, ignore_std_streams=True
clean=True,
up_to_date=True,
commit=True,
ignore_std_streams=True,
)
def run(client, outputs, no_output, success_codes, isolation, command_line):
"""Tracking work on a specific problem."""
Expand Down

0 comments on commit 31f733b

Please sign in to comment.