Skip to content

Commit

Permalink
fix rclone and local test parameters; add size_only parameter to rclone
Browse files Browse the repository at this point in the history
  • Loading branch information
jfischer committed May 4, 2020
1 parent d10759b commit f84f627
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 45 deletions.
10 changes: 10 additions & 0 deletions dataworkspaces/dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,14 @@ def local_files(ctx, role, name, compute_hash: bool, export: bool, imported: boo
+ "the target are removed if they are not present at the source. The default is 'copy'. If master is 'none', "
+ "this option has no effect.",
)
@click.option(
"--size-only",
is_flag=True,
default=False,
help="If specified, use only the file size (rather than also modification time and checksum) to "
+ "determine if a file has been changed. If your resource has a lot of files and access to the remote "
+ "is over a WAN, you probably want to set this. Otherwise, syncs/copies can be VERY slow.",
)
@click.argument("remote", type=str)
@click.argument(
"local_path", type=str
Expand All @@ -462,6 +470,7 @@ def rclone(
imported: bool,
master: str,
sync_mode: str,
size_only: bool,
remote: str,
local_path: str,
):
Expand Down Expand Up @@ -514,6 +523,7 @@ def rclone(
imported,
master,
sync_mode,
size_only,
)


Expand Down
2 changes: 2 additions & 0 deletions dataworkspaces/resources/local_file_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def __init__(
help="True if metadata for export should be added each snapshot",
is_global=True,
ptype=BoolType(),
allow_missing=True,
)
self.export = self.param_defs.get("export", export) # type: bool
self.param_defs.define(
Expand All @@ -114,6 +115,7 @@ def __init__(
help="If True, then this resource has lineage imported from another workspace",
is_global=True,
ptype=BoolType(),
allow_missing=True,
)
self.imported = self.param_defs.get("imported", imported) # type: bool
self.ignore = ignore if (ignore is not None) else [] # TODO: should this be a parameter?
Expand Down
106 changes: 63 additions & 43 deletions dataworkspaces/resources/rclone_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
ParamValidationError,
StringType,
EnumType,
BoolType,
)

from dataworkspaces.resources.local_file_resource import LocalFileResource, setup_path_for_hashes
Expand All @@ -42,6 +43,7 @@ class RemoteOriginType(ParamType):
"""Custom param type for maintaining the remote origin in the form
remote_name:path. Path does not need to be absolute
"""

def validate(self, value: Any) -> None:
if not isinstance(value, str):
raise ParamValidationError("Remote origin '%s' is not a string" % repr(value))
Expand Down Expand Up @@ -71,6 +73,7 @@ def __init__(
imported: Optional[bool] = None,
master: Optional[str] = None,
sync_mode: Optional[str] = None,
size_only: Optional[bool] = None,
ignore: List[str] = [],
):
super().__init__(
Expand Down Expand Up @@ -129,41 +132,43 @@ def __init__(
ptype=EnumType("sync", "copy"),
)
self.sync_mode = self.param_defs.get("sync_mode", sync_mode) # type: str
self.param_defs.define(
"size_only",
default_value=False,
optional=False,
help="If specified, use only the file size (rather than also modification time and checksum) to "
+ "determine if a file has been changed. If your resource has a lot of files and access to the remote "
+ "is over a WAN, you probably want to set this. Otherwise, syncs/copies can be VERY slow.",
is_global=True,
ptype=BoolType(),
allow_missing=True,
)
self.size_only = self.param_defs.get("size_only", size_only) # type: bool

if config:
self.rclone = RClone(cfgfile=self.config)
else:
self.rclone = RClone()

def pull_precheck(self) -> None:
if self.master == "remote":
if self.sync_mode == "sync":
ret = self.rclone.sync(
self.remote_origin, self.local_path, flags=["--dry-run", "--quiet"]
)
if ret["code"] != 0:
raise ConfigurationError(
"rclone sync precheck raised error %d: %s" % (ret["code"], ret["error"])
)
else: # copy mode
ret = self.rclone.copy(
self.remote_origin, self.local_path, flags=["--dry-run", "--quiet"]
)
if ret["code"] != 0:
raise ConfigurationError(
"rclone copy precheck raised error %d: %s" % (ret["code"], ret["error"])
)
pass # a dry run can be very expensive, so skipping for now

def pull(self) -> None:
if self.size_only:
flags = ["--size-only"]
else:
flags = []
if self.workspace.verbose:
flags.append("--verbose")
if self.master == "remote":
if self.sync_mode == "sync":
ret = self.rclone.sync(self.remote_origin, self.local_path)
ret = self.rclone.sync(self.remote_origin, self.local_path, flags=flags)
if ret["code"] != 0:
raise ConfigurationError(
"rclone sync raised error %d: %s" % (ret["code"], ret["error"])
)
elif self.sync_mode == "copy":
ret = self.rclone.copy(self.remote_origin, self.local_path)
ret = self.rclone.copy(self.remote_origin, self.local_path, flags=flags)
if ret["code"] != 0:
raise ConfigurationError(
"rclone copy raised error %d: %s" % (ret["code"], ret["error"])
Expand All @@ -172,34 +177,24 @@ def pull(self) -> None:
click.echo("Skipping pull of resource %s, master is %s" % (self.name, self.master))

def push_precheck(self) -> None:
if self.master == "local":
if self.sync_mode == "copy":
ret = self.rclone.copy(
self.local_path, self.remote_origin, flags=["--dry-run", "--quiet"]
)
if ret["code"] != 0:
raise ConfigurationError(
"rclone copy precheck raised error %d: %s" % (ret["code"], ret["error"])
)
else:
ret = self.rclone.sync(
self.local_path, self.remote_origin, flags=["--dry-run", "--quiet"]
)
if ret["code"] != 0:
raise ConfigurationError(
"rclone sync precheck raised error %d: %s" % (ret["code"], ret["error"])
)
pass # a dry run can be very expensive, so skipping for now

def push(self) -> None:
if self.size_only:
flags = ["--size-only"]
else:
flags = []
if self.workspace.verbose:
flags.append("--verbose")
if self.master == "local":
if self.sync_mode == "copy":
ret = self.rclone.copy(self.local_path, self.remote_origin)
ret = self.rclone.copy(self.local_path, self.remote_origin, flags=flags)
if ret["code"] != 0:
raise ConfigurationError(
"rclone copy raised error %d: %s" % (ret["code"], ret["error"])
)
else:
ret = self.rclone.sync(self.local_path, self.remote_origin)
ret = self.rclone.sync(self.local_path, self.remote_origin, flags=flags)
if ret["code"] != 0:
raise ConfigurationError(
"rclone sync raised error %d: %s" % (ret["code"], ret["error"])
Expand Down Expand Up @@ -230,7 +225,15 @@ def _add_prechecks(self, local_path, remote_origin, config) -> RClone:
return rclone

def _copy_from_remote(
self, name, local_path, remote_origin, rclone, master="none", sync_mode="copy"
self,
name,
local_path,
remote_origin,
rclone,
master="none",
sync_mode="copy",
size_only=False,
verbose=False,
):
if master == "remote":
click.echo("%s: performing initial %s from remote" % (name, sync_mode))
Expand All @@ -251,10 +254,17 @@ def _copy_from_remote(
)
return

if size_only:
flags = ["--size-only"]
else:
flags = []
if verbose:
flags.append("--verbose")

if sync_mode == "copy":
ret = rclone.copy(remote_origin, local_path)
ret = rclone.copy(remote_origin, local_path, flags=flags)
else:
ret = rclone.sync(remote_origin, local_path)
ret = rclone.sync(remote_origin, local_path, flags=flags)
if ret["code"] != 0:
raise ConfigurationError(
"rclone %s raised error %d: %s" % (sync_mode, ret["code"], ret["error"])
Expand All @@ -281,9 +291,12 @@ def from_command_line(
imported,
master,
sync_mode,
size_only,
):
rclone = self._add_prechecks(local_path, remote_origin, config)
self._copy_from_remote(name, local_path, remote_origin, rclone, master, sync_mode)
self._copy_from_remote(
name, local_path, remote_origin, rclone, master, sync_mode, size_only, workspace.verbose
)
setup_path_for_hashes(role, name, workspace, local_path)
if imported:
lineage_path = os.path.join(local_path, "lineage.json")
Expand Down Expand Up @@ -322,6 +335,7 @@ def from_command_line(
imported=imported,
master=master,
sync_mode=sync_mode,
size_only=size_only,
)

def from_json(
Expand All @@ -347,6 +361,7 @@ def from_json(
imported=params.get("imported", False),
master=params.get("master", None),
sync_mode=params.get("sync_mode", None),
size_only=params.get("size_only", None),
)

def has_local_state(self) -> bool:
Expand All @@ -361,6 +376,7 @@ def clone(self, params: JSONDict, workspace: Workspace) -> LocalStateResourceMix
name = params["name"]
master = params.get("master", None)
sync_mode = params.get("sync_mode", None)
size_only = params.get("size_only", None)

# check local_path, too for backward compatibility
global_local_path = (
Expand Down Expand Up @@ -388,7 +404,9 @@ def clone(self, params: JSONDict, workspace: Workspace) -> LocalStateResourceMix
)

rclone = self._add_prechecks(local_path, remote_origin, config)
self._copy_from_remote(name, local_path, remote_origin, rclone, master, sync_mode)
self._copy_from_remote(
name, local_path, remote_origin, rclone, master, sync_mode, size_only, workspace.verbose
)
return RcloneResource(
name,
params["role"],
Expand All @@ -402,6 +420,7 @@ def clone(self, params: JSONDict, workspace: Workspace) -> LocalStateResourceMix
imported=params.get("imported", False),
master=master,
sync_mode=sync_mode,
size_only=size_only,
)

def suggest_name(
Expand All @@ -416,5 +435,6 @@ def suggest_name(
imported,
master,
sync_mode,
size_only,
):
return os.path.basename(local_path)
28 changes: 26 additions & 2 deletions tests/test_rclone.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from os.path import join, exists
import unittest
import shutil
import subprocess

from utils_for_tests import BaseCase, WS_DIR, TEMPDIR
from dataworkspaces.lineage import LineageBuilder
Expand Down Expand Up @@ -74,6 +75,10 @@ def _assert_final_state_sync(self, basedir):
self._assert_file(basedir, 'subdir/file3.txt', 16)
self._assert_file(basedir, 'subdir/file4.txt', 16)

def _touch_file(self, path, cwd=TEMPDIR):
assert exists(path)
subprocess.run(['touch', path], cwd=cwd)

def _init_files(self, basedir=MASTER_DIR):
with open(join(basedir, 'file1.txt'), 'w') as f:
f.write("this is a test\n")
Expand Down Expand Up @@ -119,10 +124,10 @@ def test_copy_remote_is_master(self):
self.assertEqual(len(history), 2)
snap1 = history[0]
self.assertEqual(['tag1'], snap1.tags)
self.assertEqual('9162a3c2825841ee9426c28089da4901986bb226', snap1.hashval)
self.assertEqual('830910b0c7e74e62d1ea57a403980640a71471c0', snap1.hashval)
snap2 = history[1]
self.assertEqual(['tag2'], snap2.tags)
self.assertEqual('e8ea515f179a08479caafb7c7da05ce18525cbd0', snap2.hashval)
self.assertEqual('4991d9620dbe557c18df9f7301f278c6267c13a3', snap2.hashval)

def test_sync_remote_is_master(self):
"""Will pull changes down from master in sync mode.
Expand Down Expand Up @@ -239,6 +244,25 @@ def test_real_remote(self):
self._assert_final_state_sync(RESOURCE_DIR)
self._run_dws(['snapshot', 'tag2'])

def test_size_only(self):
"""Will pull changes down from master in copy mode.
"""
self._setup_initial_repo()
os.mkdir(MASTER_DIR)
self._init_files()
self._run_dws(['add', 'rclone','--role', 'source-data', '--sync-mode=copy', '--master=remote',
'--size-only', 'localfs:'+MASTER_DIR,
RESOURCE_DIR])
self._assert_initial_state(RESOURCE_DIR)
self._run_dws(['snapshot', 'tag1'])

self._update_files()
FILE3_MASTER=join(MASTER_DIR, 'subdir/file3.txt')
self._touch_file(FILE3_MASTER)
self._run_dws(['pull'])
self._assert_final_state_copy(RESOURCE_DIR)
self._run_dws(['snapshot', 'tag2'])


if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--keep-outputs":
Expand Down

0 comments on commit f84f627

Please sign in to comment.