Skip to content

Commit

Permalink
add api resource; also support for local scratch space
Browse files Browse the repository at this point in the history
  • Loading branch information
jfischer committed Sep 21, 2019
1 parent a377edd commit e25aaeb
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 30 deletions.
45 changes: 24 additions & 21 deletions dataworkspaces/backends/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"""

import os
from os.path import exists, join, isdir, basename, isabs, abspath, expanduser, dirname, curdir
from os.path import exists, join, isdir, basename, isabs, abspath, expanduser,\
dirname, curdir, commonpath
import shutil
import json
import re
Expand All @@ -23,7 +24,7 @@
is_git_dirty,\
is_pull_needed_from_remote, GIT_EXE_PATH,\
set_remote_origin, \
git_remove_file, git_remove_subtree
git_remove_file, git_remove_subtree, ensure_entry_in_gitignore
from dataworkspaces.utils.git_fat_utils import \
validate_git_fat_in_path_if_needed, \
run_git_fat_pull_if_needed, validate_git_fat_in_path, run_git_fat_push_if_needed,\
Expand Down Expand Up @@ -208,30 +209,17 @@ def _add_local_dir_to_gitignore_if_needed(self, resource):
if local_path is None:
return
assert isabs(local_path), "Resource local path should be absolute"
if not local_path.startswith(self.workspace_dir):
if commonpath([local_path, self.workspace_dir])!=self.workspace_dir:
return None
local_relpath = local_path[len(self.workspace_dir)+1:]
if not local_relpath.endswith('/'):
local_relpath_noslash = local_relpath
local_relpath = local_relpath + '/'
else:
local_relpath_noslash = local_relpath[:-1]
local_relpath = local_relpath + '/' # matches only directories
# Add a / as the start to indicate that the path starts at the root of the repo.
# Otherwise, we'll hit cases where the path could match other directories (e.g. issue #11)
local_relpath = '/'+local_relpath if not local_relpath.startswith('/') else local_relpath
local_relpath_noslash = '/'+local_relpath_noslash \
if not local_relpath_noslash.startswith('/') \
else local_relpath_noslash
gitignore_path = join(self.workspace_dir, '.gitignore')
# read the gitignore file to see if relpath is already there
if exists(gitignore_path):
with open(gitignore_path, 'r') as f:
for line in f:
line = line.rstrip()
if line==local_relpath or line==local_relpath_noslash:
return # no need to add
with open(gitignore_path, 'a') as f:
f.write(local_relpath+ '\n')
ensure_entry_in_gitignore(self.workspace_dir, '.gitignore',
local_relpath, match_independent_of_slashes=True,
verbose=self.verbose)

def add_resource(self, name:str, resource_type:str, role:str, *args, **kwargs)\
-> ws.Resource:
Expand All @@ -245,7 +233,22 @@ def clone_resource(self, name:str) -> ws.LocalStateResourceMixin:
r = super().clone_resource(name)
self._add_local_dir_to_gitignore_if_needed(r)
return r


def _get_local_scratch_space_for_resource(self, resource_name:str,
create_if_not_present:bool=False) \
-> str:
scratch_path = join(self.workspace_dir,
'.dataworkspace/scratch/%s'%resource_name)
if not isdir(scratch_path):
if create_if_not_present is False:
raise InternalError("Scratch path '%s' for resource %s is missing"%
(scratch_path, resource_name))
os.makedirs(scratch_path)
ensure_entry_in_gitignore(self.workspace_dir, '.dataworkspace/.gitignore',
'/scratch/%s/'%resource_name,
commit=True)
return scratch_path

def save(self, message:str) -> None:
"""Save the current state of the workspace"""
commit_changes_in_repo(self.workspace_dir, message, verbose=self.verbose)
Expand Down
1 change: 1 addition & 0 deletions dataworkspaces/commands/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ def add_command(scheme:str, role:str, name:str, workspace:Workspace, *args):

workspace.add_resource(name, scheme, role, *args)
workspace.save("add of %s" % name)
click.echo("Successful added resource '%s' to workspace."% name)
37 changes: 37 additions & 0 deletions dataworkspaces/dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ def convert(self, value, param, ctx):
self.fail("Invalid resource role. Must be one of: %s" %
', '.join(RESOURCE_ROLE_CHOICES))
ROLE_PARAM = RoleParamType()
ROLE_DATA_CHOICES=[ResourceRoles.SOURCE_DATA_SET,
ResourceRoles.INTERMEDIATE_DATA]
class DataRoleParamType(click.ParamType):
"""A role parameter limited to source and intermediate data."""
name = 'role (one of %s)' % ', '.join(ROLE_DATA_CHOICES)

def convert(self, value, param, ctx):
value = value.lower()
if value in (ResourceRoles.SOURCE_DATA_SET, 's'):
return ResourceRoles.SOURCE_DATA_SET
elif value in (ResourceRoles.INTERMEDIATE_DATA, 'i'):
return ResourceRoles.INTERMEDIATE_DATA
else:
self.fail("Invalid resource role. Must be one of: %s" %
', '.join(ROLE_DATA_CHOICES))
DATA_ROLE_PARAM = DataRoleParamType()


@click.group()
@click.option('-b', '--batch', default=False, is_flag=True,
Expand Down Expand Up @@ -290,6 +307,26 @@ def git(ctx, role, name, branch, read_only, path):

add.add_command(git)

@click.command(name='api-resource')
@click.option('--role', type=DATA_ROLE_PARAM)
@click.option('--name', type=str, default=None,
help="Short name for this resource")
@click.pass_context
def api_resource(ctx, role, name):
"""Resource to represent data obtained via an API. Use this when there is
no file-based representation of your data that can be versioned and captured
more directly. Subcommand of ``add``"""
ns = ctx.obj
if role is None:
if ns.batch:
raise BatchModeError("--role")
else:
role = click.prompt("Please enter a role for this resource, either [s]ource-data or [i]ntermediate-data", type=DATA_ROLE_PARAM)
workspace = find_and_load_workspace(ns.batch, ns.verbose, ns.workspace_dir)
add_command('api-resource', role, name, workspace)

add.add_command(api_resource)


@click.command()
@click.option('--workspace-dir', type=WORKSPACE_PARAM, default=DWS_PATHDIR)
Expand Down
149 changes: 149 additions & 0 deletions dataworkspaces/resources/api_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@

import os
from os.path import join, exists
from typing import Tuple, Optional

import click

from dataworkspaces.errors import ConfigurationError, InternalError
from dataworkspaces.workspace import \
Workspace, Resource, ResourceRoles, SnapshotResourceMixin, \
LocalStateResourceMixin, ResourceFactory, JSONDict

API_RESOURCE_TYPE='api-resource'

class ApiResource(Resource, LocalStateResourceMixin, SnapshotResourceMixin):
"""This is a resource type for an API that has to be called to get data.
It is only valid for source data and intermediate resources.
To get the hash for a snapshot, a caller needs to go through the data in memory
and compute the hash. This is usually done by "monkey-patching" a framework's
API to get the data. The hash is stored in a local scratch directory which
is read when the snapshot is taken.
This resource inherits from LocalStateResourceMixin so that we can get
a clone call to initialze the scratch directory when the workspace or
individual resource is cloned.
"""
def __init__(self, name:str, role:str, workspace:Workspace):
super().__init__(API_RESOURCE_TYPE, name, role, workspace)

def get_params(self) -> JSONDict:
return {
'resource_type':self.resource_type,
'name':self.name,
'role':self.role
}

def validate_subpath_exists(self, subpath:str) -> None:
raise ConfigurationError("Subpath %s is not valid for resource %s: API resources do not support subpaths"%
(subpath, self.name))

def get_local_params(self) -> JSONDict:
return {}

def get_local_path_if_any(self) -> Optional[str]:
return None

def pull_precheck(self):
pass

def pull(self):
pass

def push_precheck(self):
pass

def push(self):
pass

def snapshot_precheck(self) -> None:
pass

def snapshot(self) -> Tuple[Optional[str], Optional[str]]:
scratch = self.workspace._get_local_scratch_space_for_resource(self.name)
hashfile = join(scratch, 'hashval.txt')
if exists(hashfile):
with open(hashfile, 'r') as f:
data = f.read().rstrip()
return (data, None)
else:
click.echo("WARNING: no hash available for resource %s" % self.name)
return (None, None)

def restore_precheck(self, restore_hashval:str) -> None:
raise InternalError("Attempt to restore resource %s, which is not restoreable"%
self.name)

def restore(self, restore_hashval:str) -> None:
raise InternalError("Attempt to restore resource %s, which is not restoreable"%
self.name)

def delete_snapshot(self, workspace_snapshot_hash:str, resource_restore_hash:str,
relative_path:str) -> None:
pass

def save_current_hash(self, hashval:str, comment:Optional[str]=None) -> None:
"""Save the specified hash value to the scratch space. If a
comment is provided, it is written to a separate file.
"""
scratch = self.workspace._get_local_scratch_space_for_resource(self.name)
hashfile = join(scratch, 'hashval.txt')
with open(hashfile, 'w') as f:
f.write(hashval)
commentfile = join(scratch, 'comment.txt')
if comment is not None:
with open(commentfile, 'w') as f:
f.write(comment+'\n')
else:
if exists(commentfile):
os.remove(commentfile)


class ApiResourceFactory(ResourceFactory):
def from_command_line(self, role:str, name:str, workspace:Workspace,
*args, **kwargs) -> Resource:
"""Instantiate a resource object from the add command's
arguments"""
if role not in (ResourceRoles.SOURCE_DATA_SET,
ResourceRoles.INTERMEDIATE_DATA):
raise ConfigurationError("API resources only supported for %s and %s roles"%
(ResourceRoles.SOURCE_DATA_SET,
ResourceRoles.INTERMEDIATE_DATA))
workspace._get_local_scratch_space_for_resource(name,
create_if_not_present=True)
return ApiResource(name, role, workspace)

def from_json(self, params:JSONDict, local_params:JSONDict,
workspace:Workspace) -> Resource:
"""Instantiate a resource object from saved params and local params"""
return ApiResource(params['name'], params['role'], workspace)

def has_local_state(self) -> bool:
"""Return true if this resource has local state and needs
a clone step the first time it is used.
We return True because we have the local scratch space for the hashes
that needs to be set up during a clone.
"""
return True

def clone(self, params:JSONDict, workspace:Workspace) -> LocalStateResourceMixin:
"""Instantiate a local copy of the resource
that came from the remote origin. We don't yet have local params,
since this resource is not yet on the local machine. If not in batch
mode, this method can ask the user for any additional information needed
(e.g. a local path). In batch mode, should either come up with a reasonable
default or error out if not enough information is available."""
workspace._get_local_scratch_space_for_resource(params['name'],
create_if_not_present=True)
return ApiResource(params['name'], params['role'], workspace)


def suggest_name(self, workspace:Workspace, role:str, *args) -> str:
"""Given the arguments passed in to create a resource,
suggest a name for the case where the user did not provide one
via --name. This will be used by suggest_resource_name() to
find a short, but unique name for the resource.
"""
return role+'-api'
3 changes: 2 additions & 1 deletion dataworkspaces/resources/git_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ def clone(self, params, workspace):
relative_local_path, local_path,
branch, read_only)

def suggest_name(self, workspace, local_path, branch, read_only):
def suggest_name(self, workspace, role, local_path, branch,
read_only):
return basename(local_path)


Expand Down
2 changes: 1 addition & 1 deletion dataworkspaces/resources/local_file_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,6 @@ def clone(self, params:JSONDict, workspace:Workspace) -> LocalStateResourceMixin
os.mkdir(non_git_hashes)
return self.from_json(params, local_params, workspace)

def suggest_name(self, workspace, local_path, compute_hash):
def suggest_name(self, workspace, role, local_path, compute_hash):
return os.path.basename(local_path)

2 changes: 1 addition & 1 deletion dataworkspaces/resources/rclone_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,6 @@ def clone(self, params:JSONDict, workspace:Workspace) -> LocalStateResourceMixin



def suggest_name(self, local_path, compute_hash):
def suggest_name(self, role, local_path, compute_hash):
return os.path.basename(local_path)

4 changes: 4 additions & 0 deletions dataworkspaces/resources/resource_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ def register_resource_type(scheme, factory):

from dataworkspaces.resources.rclone_resource import RcloneFactory
register_resource_type('rclone', RcloneFactory)

from dataworkspaces.resources.api_resource import ApiResourceFactory,\
API_RESOURCE_TYPE
register_resource_type(API_RESOURCE_TYPE, ApiResourceFactory)
58 changes: 57 additions & 1 deletion dataworkspaces/utils/git_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Utility functions related to interacting with git
"""
from os.path import isdir, join, dirname
from os.path import isdir, join, dirname, exists
from subprocess import run, PIPE
import shutil
import re
Expand Down Expand Up @@ -398,3 +398,59 @@ def get_git_config_param(repo_dir, param_name, verbose):
param_val = call_subprocess([GIT_EXE_PATH, 'config', param_name],
cwd=repo_dir, verbose=verbose)
return param_val.strip()

def ensure_entry_in_gitignore(repo_dir:str, gitignore_rel_path:str, entry:str,
match_independent_of_slashes=False,
commit:bool=False, verbose=False) -> bool:
"""Ensure that the specified entry is in the specified .gitignore file.
Entries can have a leading slash (refers to an absolute path within the repo)
and a trailing slash (matches only a directory, not a file).
If match_independent_of_slashes is True, we match an existing
entry, even if it differs on leading and/or trailing slashes. Otherwise,
it must be an exact match.
If a change was made, and commit is specified, commit the change. Otherwise,
just add the file to the staging.
Returns True if a change was made, False otherwise.
"""
def strip_slashes(e):
if e.startswith('/'):
e = e[1:]
if e.endswith('/'):
e = e[:-1]
assert len(e)>0
return e

entry_wo_slashes = strip_slashes(entry)
abs_file_path = join(repo_dir, gitignore_rel_path)
if exists(abs_file_path):
last_has_newline = True
with open(abs_file_path, 'r') as f:
for line in f:
if line.endswith('\n'):
last_has_newline = True
else:
last_has_newline = False
line = line.rstrip()
if line==entry or \
(match_independent_of_slashes and
strip_slashes(line)==entry_wo_slashes):
return False # entry already present, nothing to do
with open(abs_file_path, 'a') as f:
if not last_has_newline:
f.write('\n')
f.write(entry+'\n')
else:
with open(abs_file_path, 'a') as f:
f.write(entry+'\n')
call_subprocess([GIT_EXE_PATH, 'add', gitignore_rel_path],
cwd=repo_dir, verbose=verbose)
if commit:
call_subprocess([GIT_EXE_PATH, 'commit', '-m',
'Add .gitignore entry for %s' % entry],
cwd=repo_dir, verbose=verbose)
return True


0 comments on commit e25aaeb

Please sign in to comment.