Skip to content

Commit

Permalink
Change the snapshot to differentiate between comparison and restore h…
Browse files Browse the repository at this point in the history
…ashes. Fixes #23, Fixes #5, Fixes #6.
  • Loading branch information
jfischer committed Mar 30, 2019
1 parent 5ea8b42 commit ef7ede4
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 89 deletions.
2 changes: 2 additions & 0 deletions .travis.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
language: python
script: cd tests; make test
2 changes: 1 addition & 1 deletion dataworkspaces/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def restore(tag_or_hash, workspace_dir=None, only=None, leave=None):
"""
workspace_dir = _get_workspace(workspace_dir)
restore_command(workspace_dir, batch=True, verbose=False, tag_or_hash=tag_or_hash,
only=only, leave=leave, no_new_snapshot=True)
only=only, leave=leave)



8 changes: 4 additions & 4 deletions dataworkspaces/commands/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ def compare_lineage_files(f1, f2):


def diff_command(workspace_dir, snapshot_or_tag1, snapshot_or_tag2, batch, verbose):
(snapshot1, tag1) = find_snapshot(snapshot_or_tag1, workspace_dir)
snstr1 = "%s, tag %s" % (snapshot1, tag1) if tag1 else snapshot1
(snapshot1, tags1, _) = find_snapshot(snapshot_or_tag1, workspace_dir)
snstr1 = "%s, tags %s" % (snapshot1, ','.join(tags1)) if tags1 else snapshot1
sn1_resources = SnapshotResources.read_shapshot_manifest(snapshot1,
workspace_dir,
batch, verbose)
sn1_names = sn1_resources.get_names()
(snapshot2, tag2) = find_snapshot(snapshot_or_tag2, workspace_dir)
snstr2 = "%s, tag %s" % (snapshot2, tag2) if tag2 else snapshot2
(snapshot2, tags2, _) = find_snapshot(snapshot_or_tag2, workspace_dir)
snstr2 = "%s, tags %s" % (snapshot2, ','.join(tags2)) if tags2 else snapshot2
sn2_resources = SnapshotResources.read_shapshot_manifest(snapshot2,
workspace_dir,
batch, verbose)
Expand Down
1 change: 0 additions & 1 deletion dataworkspaces/commands/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"""
Definition of configuration parameters
"""
import re
import json
import socket
from os.path import join
Expand Down
81 changes: 25 additions & 56 deletions dataworkspaces/commands/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,34 @@
import json
import os
from os.path import join, isdir
import datetime
import click

from dataworkspaces.utils.git_utils import \
is_a_git_hash, is_a_shortened_git_hash, is_a_git_fat_repo, \
validate_git_fat_in_path_if_needed
import dataworkspaces.commands.actions as actions
from dataworkspaces.errors import ConfigurationError, UserAbort
from dataworkspaces.errors import ConfigurationError, UserAbort, \
InternalError
from dataworkspaces.resources.resource import \
CurrentResources, SnapshotResources
from .init import get_snapshot_metadata_dir_path
from .snapshot import TakeResourceSnapshot, WriteSnapshotMetadata,\
get_snapshot_lineage_dir
from .snapshot import get_snapshot_lineage_dir
from dataworkspaces.utils.lineage_utils import \
get_current_lineage_dir, LineageStoreCurrent
from .params import get_local_param_from_file, HOSTNAME

class RestoreResource(actions.Action):
def __init__(self, ns, verbose, resource, snapshot_resources):
def __init__(self, ns, verbose, resource, restore_hashes):
super().__init__(ns, verbose)
self.resource = resource
self.hashval = snapshot_resources.name_to_hashval[resource.name]
#self.hashval = snapshot_resources.name_to_hashval[resource.name]
if (resource.name in restore_hashes):
self.hashval = restore_hashes[resource.name]
if self.hashval is None:
raise ConfigurationError("Resource '%s' cannot be restored. To run the restore of the workspace, skip this resource using the --leave option"%
resource.name)
else:
raise InternalError("Unable to restore resource '%s': no restore hash"
% resource.name)
self.resource.restore_prechecks(self.hashval)

def run(self):
Expand Down Expand Up @@ -165,7 +171,7 @@ def process_names(current_names, snapshot_names, only=None, leave=None):


def find_snapshot(tag_or_hash, workspace_dir):
"""Return a (hash, tag) pair for the tag or hash. Throws
"""Return a (hash, tags, result_hashes) tuple for the tag or hash. Throws
a configuration error if not found.
"""
if is_a_git_hash(tag_or_hash):
Expand Down Expand Up @@ -193,7 +199,7 @@ def process_dir(dirpath):
(is_short_hash and data['hash'].endswith(tag_or_hash)) or\
((not (is_hash or is_short_hash)) and
tag_or_hash in data['tags']):
return (data['hash'], data['tags'])
return (data['hash'], data['tags'], data['restore_hashes'])
result = process_dir(md_dir)
if result is not None:
return result
Expand All @@ -203,18 +209,16 @@ def process_dir(dirpath):
raise ConfigurationError("Did not find a snapshot corresponding to tag '%s' in history" % tag_or_hash)


def restore_command(workspace_dir, batch, verbose, tag_or_hash,
only=None, leave=None, no_new_snapshot=False):
def restore_command(workspace_dir, batch, verbose, tag_or_hash, only=None, leave=None):
validate_git_fat_in_path_if_needed(workspace_dir)
# First, find the history entry
(snapshot_hash, snapshot_tags) = find_snapshot(tag_or_hash, workspace_dir)
(snapshot_hash, snapshot_tags, restore_hashes) = find_snapshot(tag_or_hash, workspace_dir)
snapshot_resources = SnapshotResources.read_shapshot_manifest(snapshot_hash, workspace_dir, batch, verbose)
current_resources = CurrentResources.read_current_resources(workspace_dir, batch, verbose)
original_current_resource_names = current_resources.get_names()
(names_to_restore, names_to_add, names_to_leave) = \
process_names(original_current_resource_names, snapshot_resources.get_names(), only, leave)
plan = []
creating_new_snapshot = False # True if we are creating a new snapshot and hash
ns = actions.Namespace()
ns.map_of_hashes = {}
names_to_restore_lineage = []
Expand All @@ -224,7 +228,7 @@ def restore_command(workspace_dir, batch, verbose, tag_or_hash,
r = current_resources.by_name[name]
if not r.has_results_role():
# just need to call restore
plan.append(RestoreResource(ns, verbose, r, snapshot_resources))
plan.append(RestoreResource(ns, verbose, r, restore_hashes))
names_to_restore_lineage.append(r.name) # only non-results restored
else:
# This is a results resource, we'll add it to the leave set
Expand Down Expand Up @@ -253,37 +257,9 @@ def restore_command(workspace_dir, batch, verbose, tag_or_hash,
if not snapshot_resources.is_a_current_name(name):
# we are just leaving a resource added since snapshot was taken
plan.append(AddResourceToSnapshotResourceList(ns, verbose, r, snapshot_resources))
if (name not in results_resources) and (not no_new_snapshot):
plan.append(TakeResourceSnapshot(ns, verbose, r))
creating_new_snapshot = True
need_to_write_resources_file = \
original_current_resource_names!=snapshot_resources.get_names()
tagstr = ', tags=%s' % ', '.join(snapshot_tags) if snapshot_tags else ''
if creating_new_snapshot:
new_snapshot_desc = \
"Partial restore of snapshot %s%s, resulting in a new snapshot"% \
(snapshot_hash, tagstr)
write_revised = WriteRevisedSnapshotFile(ns, verbose, workspace_dir,
snapshot_resources)
plan.append(write_revised)
hostname = get_local_param_from_file(workspace_dir, HOSTNAME)
metadata_action = \
WriteSnapshotMetadata(ns, verbose, batch, workspace_dir,
None, new_snapshot_desc,
datetime.datetime.now(),
rel_dest_root=None, hostname=hostname)
plan.append(metadata_action)
if need_to_write_resources_file:
plan.append(WriteRevisedResourceFile(ns, verbose, snapshot_resources))
plan.append(actions.GitAdd(ns, verbose, workspace_dir,
lambda:[ns.snapshot_filename,
ns.snapshot_metadata_file,
snapshot_resources.resource_file]))
else:
plan.append(actions.GitAdd(ns, verbose, workspace_dir,
lambda:[ns.snapshot_filename,
ns.snapshot_metadata_file]))
elif need_to_write_resources_file:
if need_to_write_resources_file:
plan.append(actions.GitAdd(ns, verbose, workspace_dir,
[snapshot_resources.resource_file]))

Expand All @@ -298,17 +274,12 @@ def restore_command(workspace_dir, batch, verbose, tag_or_hash,
if is_a_git_fat_repo(workspace_dir):
plan.append(GitFatPull(ns, verbose, workspace_dir))

if creating_new_snapshot:
commit_msg_fn = lambda: new_snapshot_desc + " " + ns.snapshot_hash
desc = new_snapshot_desc
else:
desc = "Restore snapshot %s%s" % (snapshot_hash, tagstr)
commit_msg_fn = lambda: desc

if need_to_write_resources_file or creating_new_snapshot:
if need_to_write_resources_file:
plan.append(actions.GitCommit(ns, verbose, workspace_dir,
commit_message=commit_msg_fn))
click.echo(desc)
commit_message="Updating resources after restore to %s"%
tag_or_hash))
tagstr = ' (%s)'%','.join(snapshot_tags) if len(snapshot_tags)>0 else ''
click.echo("Restore snapshot %s%s" % (snapshot_hash, tagstr))
def fmt_rlist(rnames):
if len(rnames)>0:
return ', '.join(rnames)
Expand All @@ -325,6 +296,4 @@ def fmt_rlist(rnames):
if resp.lower()!='y' and resp!='':
raise UserAbort()
actions.run_plan(plan, 'run this restore', 'run restore', batch, verbose)
if creating_new_snapshot:
click.echo("New snapshot is %s." % ns.snapshot_hash)


10 changes: 8 additions & 2 deletions dataworkspaces/commands/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def __init__(self, ns, verbose, resource):
self.resource.snapshot_prechecks()

def run(self):
self.ns.map_of_hashes[self.resource.name] = self.resource.snapshot()
(compare_hash, restore_hash) = self.resource.snapshot()
self.ns.map_of_hashes[self.resource.name] = compare_hash
self.ns.map_of_restore_hashes[self.resource.name] = restore_hash

def __str__(self):
return "Run snapshot actions for %s" % str(self.resource)
Expand Down Expand Up @@ -166,6 +168,7 @@ def merge_snapshot_metadata(old, new, batch):

class WriteSnapshotMetadata(actions.Action):
@actions.requires_from_ns('snapshot_hash', str)
@actions.requires_from_ns('map_of_restore_hashes', dict)
@actions.provides_to_ns('snapshot_metadata_file', str)
def __init__(self, ns, verbose, batch, workspace_dir,
tag, message, timestamp, rel_dest_root,
Expand All @@ -186,7 +189,8 @@ def run(self):
'message':self.message,
'relative_destination_path':self.rel_dest_root,
'hostname':self.hostname,
'timestamp':self.timestamp.isoformat()
'timestamp':self.timestamp.isoformat(),
'restore_hashes':self.ns.map_of_restore_hashes
}
if not isdir(self.snapshot_metadata_dir):
os.mkdir(self.shapshot_metadata_dir) # just in case
Expand Down Expand Up @@ -316,6 +320,7 @@ def snapshot_command(workspace_dir, batch, verbose, tag=None, message=''):
plan = []
ns = actions.Namespace()
ns.map_of_hashes = actions.Promise(dict, "TakeResourceSnapshot")
ns.map_of_restore_hashes = actions.Promise(dict, "TakeResourceSnapshot")

snapshot_number = get_next_snapshot_number(workspace_dir)
with open(get_config_file_path(workspace_dir), 'r') as f:
Expand Down Expand Up @@ -362,6 +367,7 @@ def snapshot_command(workspace_dir, batch, verbose, tag=None, message=''):
plan.append(actions.GitCommit(ns, verbose, workspace_dir,
commit_message=lambda:"Snapshot "+ns.snapshot_hash))
ns.map_of_hashes = {}
ns.map_of_restore_hashes = {}
actions.run_plan(plan, "take snapshot of workspace",
"taken snapshot of workspace", batch=batch, verbose=verbose)
return ns.snapshot_hash
Expand Down
10 changes: 2 additions & 8 deletions dataworkspaces/dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,9 @@ def snapshot(ctx, workspace_dir, message, tag):
@click.option('--leave', type=str, default=None,
metavar="RESOURCE1[,RESOURCE2,...]",
help="Comma-separated list of resource names that you wish to leave in their current state. The rest will be restored to the specified snapshot.")
@click.option('--no-new-snapshot', is_flag=True, default=False,
metavar="RESOURCE1[,RESOURCE2,...]",
help="By default, a new snapshot will be taken if the restore leaves the "+
"workspace in a different state than the requested shapshot (e.g. due "+
"to --only or --leave or added resources). If --no-new-snapshot is "+
"specified, we adjust the individual resource states without taking a new snapshot.")
@click.argument('tag_or_hash', type=str, default=None, required=True)
@click.pass_context
def restore(ctx, workspace_dir, only, leave, no_new_snapshot, tag_or_hash):
def restore(ctx, workspace_dir, only, leave, tag_or_hash):
"""Restore the workspace to a prior state"""
ns = ctx.obj
if (only is not None) and (leave is not None):
Expand All @@ -332,7 +326,7 @@ def restore(ctx, workspace_dir, only, leave, no_new_snapshot, tag_or_hash):
workspace_dir = click.prompt("Please enter the workspace root dir",
type=WORKSPACE_PARAM)
restore_command(workspace_dir, ns.batch, ns.verbose, tag_or_hash,
only=only, leave=leave, no_new_snapshot=no_new_snapshot)
only=only, leave=leave)

cli.add_command(restore)

Expand Down
17 changes: 11 additions & 6 deletions dataworkspaces/resources/git_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
is_git_repo, commit_changes_in_repo_subdir,\
checkout_subdir_and_apply_commit, is_a_git_fat_repo,\
has_git_fat_been_initialized, validate_git_fat_in_path,\
validate_git_fat_in_path_if_needed
validate_git_fat_in_path_if_needed, get_subdirectory_hash
from .resource import Resource, ResourceFactory, LocalPathType, ResourceRoles,\
RESOURCE_ROLE_PURPOSES
from .snapshot_utils import move_current_files_local_fs
Expand Down Expand Up @@ -161,7 +161,8 @@ def snapshot(self):
commit_changes_in_repo(self.local_path, 'autocommit ahead of snapshot',
verbose=self.verbose)
switch_git_branch_if_needed(self.local_path, self.branch, self.verbose)
return get_local_head_hash(self.local_path, self.verbose)
hashval = get_local_head_hash(self.local_path, self.verbose)
return (hashval, hashval)

def restore_prechecks(self, hashval):
rc = call_subprocess_for_rc([GIT_EXE_PATH, 'cat-file', '-e',
Expand Down Expand Up @@ -443,8 +444,11 @@ def snapshot_prechecks(self):
validate_git_fat_in_path_if_needed(self.workspace_dir)

def snapshot(self):
# Todo: handle tags
return get_local_head_hash(self.workspace_dir, verbose=self.verbose)
# The subdirectory hash is used for comparison and the head
# hash used for restoring
return (get_subdirectory_hash(self.workspace_dir, self.relative_path,
verbose=self.verbose),
get_local_head_hash(self.workspace_dir, verbose=self.verbose))


def restore_prechecks(self, hashval):
Expand Down Expand Up @@ -537,8 +541,9 @@ def snapshot(self):
# Todo: handle tags
commit_changes_in_repo_subdir(self.workspace_dir, self.relative_path, 'autocommit ahead of snapshot',
verbose=self.verbose)
return get_local_head_hash(self.workspace_dir, verbose=self.verbose)

return (get_subdirectory_hash(self.workspace_dir, self.relative_path,
verbose=self.verbose),
get_local_head_hash(self.workspace_dir, verbose=self.verbose))

def restore_prechecks(self, hashval):
validate_git_fat_in_path_if_needed(self.workspace_dir)
Expand Down
2 changes: 1 addition & 1 deletion dataworkspaces/resources/local_file_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def snapshot(self):
call_subprocess([GIT_EXE_PATH, 'commit', '-m',
"Add snapshot hash files for resource %s" % self.name],
cwd=self.workspace_dir, verbose=False)
return h
return (h, None)

def add_results_file(self, temp_path, rel_dest_path):
"""Move a results file from the temporary location to
Expand Down
2 changes: 1 addition & 1 deletion dataworkspaces/resources/rclone_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def snapshot(self):
else:
(ret, out) = self.rclone.check(self.remote_origin, self.local_path, flags=['--one-way', '--size-only'])
print('Snapshot returns ', ret, out)
return ret
return (ret, None) # None for the restore hash since we cannot restore

def add_results_file(self, temp_path, rel_dest_path):
"""Move a results file from the temporary location to
Expand Down
18 changes: 14 additions & 4 deletions dataworkspaces/resources/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
from os.path import join, exists, abspath, expanduser, dirname, isdir, realpath
from tempfile import NamedTemporaryFile
from typing import Union, Tuple

import click

Expand Down Expand Up @@ -135,13 +136,22 @@ def add_results_file_from_buffer(self, str_buffer, rel_dest_path):
if tfname is not None and exists(tfname):
os.remove(tfname)

def snapshot(self):
pass
def snapshot(self) -> Tuple[str, str]:
"""Take the actual snapshot of the resource and return a tuple
of two hash values, the first for comparison, and the second for restoring.
The comparison hash value is the one we save in the snapshot file. The
restore hash value is saved in the snapshot metadata.
In many cases both hashes are the same. If the resource does not support
restores, it can return None for the second hash. This will cause
attempted restores involving this resource to error out, unless it
is explicitly left out with the --leave option.
"""
raise NotImplementedError(self.__class__.__name__)

def restore_prechecks(self, hashval):
def restore_prechecks(self, restore_hashval):
pass

def restore(self, hashval):
def restore(self, restore_hashval):
pass

def push_prechecks(self):
Expand Down

0 comments on commit ef7ede4

Please sign in to comment.