Skip to content

Commit

Permalink
fix: ensure external storage is handled correctly (#592)
Browse files Browse the repository at this point in the history
* fix: ensure external storage is handled correctly

* fix: yapf pinned to 0.27

* fix: fixed wording

* fix: missing tests
  • Loading branch information
jsam committed Jul 12, 2019
1 parent 80b9839 commit 7938ac4
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 120 deletions.
114 changes: 62 additions & 52 deletions renku/api/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,43 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client for handling a data storage."""

import functools
import shlex
from collections import defaultdict
from shutil import which
from subprocess import PIPE, STDOUT, call, run

import attr
from werkzeug.utils import cached_property

from renku import errors
from renku._compat import Path

from ._git import _expand_directories
from .repository import RepositoryApiMixin

# Evaluated once, when this module is imported: True when running
# ``git lfs`` exits with status 0. The command's stdout/stderr are sent
# to a pipe that is never read, so nothing is printed.
HAS_LFS = call(['git', 'lfs'], stdout=PIPE, stderr=STDOUT) == 0

# Batch size for when renku is expanding a large list
# of files into an argument string.
ARGUMENT_BATCH_SIZE = 100


def ensure_external_storage(fn):
    """Run the decorated method only when external storage is available.

    The wrapper reads ``self.has_external_storage`` on every call. When
    that value is truthy the call is forwarded to ``fn`` and its result is
    returned; otherwise ``fn`` is skipped and ``None`` is returned.

    Any exception raised while reading ``has_external_storage`` propagates
    to the caller unchanged:

    :raises: ``errors.ExternalStorageNotInstalled``
    :raises: ``errors.ExternalStorageDisabled``
    """
    # noqa
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        if self.has_external_storage:
            return fn(self, *args, **kwargs)
        # Storage is not available: silently skip the wrapped method.
        return None

    return wrapper


@attr.s
class StorageApiMixin(RepositoryApiMixin):
"""Client for handling a data storage."""
Expand All @@ -53,6 +70,30 @@ class StorageApiMixin(RepositoryApiMixin):

_CMD_STORAGE_PULL = ['git', 'lfs', 'pull', '-I']

@cached_property
def storage_installed(self):
    """Tell whether a ``git-lfs`` executable can be found.

    The lookup is delegated to ``shutil.which`` and the outcome is
    reduced to a plain boolean.
    """
    lfs_executable = which('git-lfs')
    return bool(lfs_executable)

@cached_property
def has_external_storage(self):
    """Tell whether external storage is both configured and installed.

    "Configured" means the repository-level git config contains a
    ``filter "lfs"`` section; "installed" is ``self.storage_installed``.

    :raises: ``errors.ExternalStorageDisabled`` when
        ``self.use_external_storage`` is truthy but storage is not both
        configured and installed.
    :raises: ``errors.ExternalStorageNotInstalled`` when the repository
        is configured for LFS but storage is not installed (only reached
        when ``self.use_external_storage`` is falsy, because the check
        above fires first otherwise).
    """
    config = self.repo.config_reader(config_level='repository')
    configured = config.has_section('filter "lfs"')

    # ``storage_installed`` is only consulted when LFS is configured.
    usable = configured and self.storage_installed

    if self.use_external_storage and not usable:
        raise errors.ExternalStorageDisabled(self.repo)

    missing_binary = configured and not self.storage_installed
    if missing_binary:
        raise errors.ExternalStorageNotInstalled(self.repo)

    return usable

def init_external_storage(self, force=False):
"""Initialize the external storage for data."""
call(
Expand All @@ -62,19 +103,20 @@ def init_external_storage(self, force=False):
cwd=str(self.path.absolute()),
)

@property
def external_storage_installed(self):
"""Check that Large File Storage is installed."""
return HAS_LFS
def init_repository(self, name=None, force=False):
"""Initialize a local Renku repository."""
result = super().init_repository(name=name, force=force)

def track_paths_in_storage(self, *paths):
"""Track paths in the external storage."""
if not self._use_lfs():
return
# initialize LFS if it is requested and installed
if self.use_external_storage and self.storage_installed:
self.init_external_storage(force=force)

if not self.external_storage_installed:
raise errors.ExternalStorageNotInstalled(self.repo)
return result

@ensure_external_storage
def track_paths_in_storage(self, *paths):
"""Track paths in the external storage."""
# Calculate which paths can be tracked in lfs
track_paths = []
attrs = self.find_attr(*paths)

Expand All @@ -97,31 +139,20 @@ def track_paths_in_storage(self, *paths):
cwd=str(self.path),
)

@ensure_external_storage
def untrack_paths_from_storage(self, *paths):
"""Untrack paths from the external storage."""
if not self._use_lfs():
return

if not self.external_storage_installed:
raise errors.ExternalStorageNotInstalled(self.repo)

call(
self._CMD_STORAGE_UNTRACK + list(paths),
stdout=PIPE,
stderr=STDOUT,
cwd=str(self.path),
)

@ensure_external_storage
def pull_paths_from_storage(self, *paths):
"""Pull paths from LFS."""
import math

if not self._use_lfs():
return

if not self.external_storage_installed:
raise errors.ExternalStorageNotInstalled(self.repo)

client_dict = defaultdict(list)

for path in _expand_directories(paths):
Expand All @@ -131,13 +162,14 @@ def pull_paths_from_storage(self, *paths):
client_dict[client.path].append(str(path))

for client_path, paths in client_dict.items():
for ibatch in range(math.ceil(len(paths) / ARGUMENT_BATCH_SIZE)):
batch_size = math.ceil(len(paths) / ARGUMENT_BATCH_SIZE)
for index in range(batch_size):
run(
self._CMD_STORAGE_PULL + [
shlex.quote(
','.join(
paths[ibatch * ARGUMENT_BATCH_SIZE:
(ibatch + 1) * ARGUMENT_BATCH_SIZE]
paths[index * ARGUMENT_BATCH_SIZE:(index + 1) *
ARGUMENT_BATCH_SIZE]
)
)
],
Expand All @@ -146,35 +178,13 @@ def pull_paths_from_storage(self, *paths):
stderr=STDOUT,
)

@ensure_external_storage
def checkout_paths_from_storage(self, *paths):
"""Checkout a paths from LFS."""
if not self._use_lfs():
return

if not self.external_storage_installed:
raise errors.ExternalStorageNotInstalled(self.repo)

run(
self._CMD_STORAGE_CHECKOUT + list(paths),
cwd=str(self.path.absolute()),
stdout=PIPE,
stderr=STDOUT,
check=True,
)

def init_repository(self, name=None, force=False):
"""Initialize a local Renku repository."""
result = super().init_repository(name=name, force=force)

# initialize LFS if it is requested and installed
if self.use_external_storage and self.external_storage_installed:
self.init_external_storage(force=force)

return result

def _use_lfs(self):
renku_initialized_to_use_lfs = self.repo.config_reader(
config_level='repository'
).has_section('filter "lfs"')

return renku_initialized_to_use_lfs and self.use_external_storage
33 changes: 19 additions & 14 deletions renku/cli/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,25 @@ def fmt_dst(path):
dataset.to_yaml()

# 3. Manage .gitattributes for external storage.
tracked = tuple(
path for path, attr in client.find_attr(*files).items()
if attr.get('filter') == 'lfs'
)
client.untrack_paths_from_storage(*tracked)
existing = client.find_attr(*tracked)
if existing:
click.echo(WARNING + 'There are custom .gitattributes.\n')
if click.confirm(
'Do you want to edit ".gitattributes" now?', default=False
):
click.edit(filename=str(client.path / '.gitattributes'))

client.track_paths_in_storage(*(destinations[path] for path in tracked))
tracked = tuple()
if client.has_external_storage:
tracked = tuple(
path for path, attr in client.find_attr(*files).items()
if attr.get('filter') == 'lfs'
)
client.untrack_paths_from_storage(*tracked)

if client.find_attr(*tracked):
click.echo(WARNING + 'There are custom .gitattributes.\n')
if click.confirm(
'Do you want to edit ".gitattributes" now?', default=False
):
click.edit(filename=str(client.path / '.gitattributes'))

if tracked and client.has_external_storage:
client.track_paths_in_storage(
*(destinations[path] for path in tracked)
)

# 4. Handle symlinks.
dst.parent.mkdir(parents=True, exist_ok=True)
Expand Down
25 changes: 13 additions & 12 deletions renku/cli/remove.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,19 @@ def fmt_path(path):
dataset.to_yaml()

# 2. Manage .gitattributes for external storage.
tracked = tuple(
path for path, attr in client.find_attr(*files).items()
if attr.get('filter') == 'lfs'
)
client.untrack_paths_from_storage(*tracked)
existing = client.find_attr(*tracked)
if existing:
click.echo(WARNING + 'There are custom .gitattributes.\n')
if click.confirm(
'Do you want to edit ".gitattributes" now?', default=False
):
click.edit(filename=str(client.path / '.gitattributes'))
if client.has_external_storage:
tracked = tuple(
path for path, attr in client.find_attr(*files).items()
if attr.get('filter') == 'lfs'
)
client.untrack_paths_from_storage(*tracked)
existing = client.find_attr(*tracked)
if existing:
click.echo(WARNING + 'There are custom .gitattributes.\n')
if click.confirm(
'Do you want to edit ".gitattributes" now?', default=False
):
click.edit(filename=str(client.path / '.gitattributes'))

# Finally remove the files.
final_sources = list(set(files.values()))
Expand Down
12 changes: 8 additions & 4 deletions renku/cli/rerun.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,14 @@ def rerun(client, revision, roots, siblings, inputs, paths):
)
)

# Make sure all inputs are pulled from a storage.
client.pull_paths_from_storage(
*(path for _, path in workflow.iter_input_files(client.workflow_path))
)
# Don't compute paths if storage is disabled.
if client.has_external_storage:
# Make sure all inputs are pulled from a storage.
paths_ = (
path
for _, path in workflow.iter_input_files(client.workflow_path)
)
client.pull_paths_from_storage(*paths_)

# Store the generated workflow used for updating paths.
import yaml
Expand Down
14 changes: 8 additions & 6 deletions renku/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@
output.
Because the output path detection is based on the Git repository state after
the execution of ``renku run`` command, it is good to have a basic understading
of the underlying principles and limitations of tracking files in Git.
the execution of ``renku run`` command, it is good to have a basic
understanding of the underlying principles and limitations of tracking
files in Git.
Git tracks not only the paths in a repository, but also the content stored in
those paths. Therefore:
Expand Down Expand Up @@ -180,13 +181,14 @@ def run(client, outputs, no_output, success_codes, isolation, command_line):
with factory.watch(
client, no_output=no_output, outputs=outputs
) as tool:
# Make sure all inputs are pulled from a storage.
client.pull_paths_from_storage(
*(
# Don't compute paths if storage is disabled.
if client.has_external_storage:
# Make sure all inputs are pulled from a storage.
paths_ = (
path
for _, path in tool.iter_input_files(client.workflow_path)
)
)
client.pull_paths_from_storage(*paths_)

returncode = call(
factory.command_line,
Expand Down
12 changes: 8 additions & 4 deletions renku/cli/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,14 @@ def update(client, revision, no_output, siblings, paths):
outputs=outputs,
)

# Make sure all inputs are pulled from a storage.
client.pull_paths_from_storage(
*(path for _, path in workflow.iter_input_files(client.workflow_path))
)
# Don't compute paths if storage is disabled.
if client.has_external_storage:
# Make sure all inputs are pulled from a storage.
paths_ = (
path
for _, path in workflow.iter_input_files(client.workflow_path)
)
client.pull_paths_from_storage(*paths_)

with output_file.open('w') as f:
f.write(
Expand Down
26 changes: 23 additions & 3 deletions renku/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ class ExternalStorageNotInstalled(RenkuException, click.ClickException):
def __init__(self, repo):
"""Build a custom message."""
msg = (
'Git-LFS is either not installed or not configured '
'for this repo.\n'
'By running this command without LFS you could be committing\n'
'External storage is not installed, '
'but this repository depends on it. \n'
'By running this command without storage installed '
'you could be committing\n'
'large files directly to the git repository.\n\n'
'If this is your intention, please repeat the command with '
'the -S flag (e.g. renku -S run <cmd>), \n'
Expand All @@ -263,6 +264,25 @@ def __init__(self, repo):
super(ExternalStorageNotInstalled, self).__init__(msg)


class ExternalStorageDisabled(RenkuException, click.ClickException):
    """Raise when external storage is used while it is not configured."""

    def __init__(self, repo):
        """Assemble the fixed, user-facing explanation.

        The ``repo`` argument is accepted but does not influence the
        message.
        """
        message = ''.join((
            'External storage is not configured, ',
            'but this action is trying to use it.\n',
            'By running this command without storage enabled ',
            'you could be committing\n',
            'large files directly to the git repository.\n\n',
            'If this is your intention, please repeat the command with ',
            'the -S flag (e.g. renku -S run <cmd>), \n',
            'otherwise install e.g. git-LFS with "git lfs install --local".',
        ))

        super().__init__(message)


class UninitializedProject(RenkuException, click.ClickException):
"""Raise when a project does not seem to have been initialized yet."""

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
'pytest>=4.0.0',
'responses>=0.7.0',
'unify>=0.4',
'yapf>=0.27.0',
'yapf==0.27.0',
]

extras_require = {
Expand Down
Loading

0 comments on commit 7938ac4

Please sign in to comment.