Skip to content

Commit

Permalink
Reformatted the entire source tree with Black (except for 3rd party)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfischer committed Mar 28, 2020
1 parent 681fce1 commit cd43b02
Show file tree
Hide file tree
Showing 45 changed files with 5,747 additions and 3,927 deletions.
2 changes: 1 addition & 1 deletion dataworkspaces/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Copyright 2018,2019 by MPI-SWS and Data-ken Research. Licensed under Apache 2.0. See LICENSE.txt.

__version__ = '1.2.4'
__version__ = "1.2.4"
8 changes: 3 additions & 5 deletions dataworkspaces/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

import click

from .errors import CalledProcessError, UserAbort, ConfigurationError,\
InternalError, BatchModeError
from .errors import CalledProcessError, UserAbort, ConfigurationError, InternalError, BatchModeError

from .dws import cli, is_verbose_mode


def main():
try:
cli()
Expand All @@ -34,8 +34,7 @@ def main():
if is_verbose_mode():
tb = traceback.format_exc()
click.echo(tb, err=True)
click.echo("Running in --batch mode, but user input required for %s"%
str(e), err=True)
click.echo("Running in --batch mode, but user input required for %s" % str(e), err=True)
sys.exit(1)
except InternalError as e:
tb = traceback.format_exc()
Expand All @@ -47,4 +46,3 @@ def main():
click.echo(tb, err=True)
click.echo("Aborting due to unexpected exception: %s" % repr(e), err=True)
sys.exit(1)

145 changes: 92 additions & 53 deletions dataworkspaces/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,60 @@
from typing import Optional, NamedTuple, List, Iterable, cast, Tuple

from dataworkspaces import __version__
from dataworkspaces.workspace import find_and_load_workspace,\
LocalStateResourceMixin, SnapshotWorkspaceMixin, JSONDict
from dataworkspaces.workspace import (
find_and_load_workspace,
LocalStateResourceMixin,
SnapshotWorkspaceMixin,
JSONDict,
)
from dataworkspaces.commands.snapshot import snapshot_command
from dataworkspaces.commands.restore import restore_command
from dataworkspaces.commands.lineage import lineage_graph_command
from dataworkspaces.commands.report import _get_results
from dataworkspaces.commands.report import _get_results
from dataworkspaces.errors import ConfigurationError
import dataworkspaces.utils.lineage_utils as lu

__api_version__ = '0.2'
__api_version__ = "0.2"


def get_version():
    """Return the version string of the installed Data Workspaces package."""
    installed_version = __version__
    return installed_version


def get_api_version():
    """Return the API version string.

    The API version is tracked separately from the overall DWS version
    and should be the more stable of the two.
    """
    api_version = __api_version__
    return api_version



class ResourceInfo(NamedTuple):
"""Named tuple representing the
results from a call to :func:`~get_resource_info`.
"""
name : str
role : str
resource_type : str
local_path : Optional[str]

name: str
role: str
resource_type: str
local_path: Optional[str]


def get_resource_info(workspace_uri_or_path:Optional[str]=None, verbose:bool=False):
def get_resource_info(workspace_uri_or_path: Optional[str] = None, verbose: bool = False):
"""Returns a list of ResourceInfo instances, describing the resources
defined for this workspace.
"""
workspace = find_and_load_workspace(True, verbose, workspace_uri_or_path)

return [
ResourceInfo(r.name, r.role, r.resource_type,
cast(LocalStateResourceMixin, r).get_local_path_if_any()
if isinstance(r, LocalStateResourceMixin) else None)
ResourceInfo(
r.name,
r.role,
r.resource_type,
cast(LocalStateResourceMixin, r).get_local_path_if_any()
if isinstance(r, LocalStateResourceMixin)
else None,
)
for r in workspace.get_resources()
]

Expand All @@ -57,16 +67,21 @@ class SnapshotInfo(NamedTuple):
"""Named tuple representing the results from a call
to :func:`~get_snapshot_history`
"""

snapshot_number: int
hashval : str
tags : List[str]
hashval: str
tags: List[str]
timestamp: str
message: str
metrics: Optional[JSONDict]


def take_snapshot(workspace_uri_or_path:Optional[str]=None, tag:Optional[str]=None, message:str='',
verbose:bool=False) -> str:
def take_snapshot(
workspace_uri_or_path: Optional[str] = None,
tag: Optional[str] = None,
message: str = "",
verbose: bool = False,
) -> str:
"""Take a snapshot of the workspace, using the tag and message,
if provided. Returns the snapshot hash (which can be used to restore to
this point).
Expand All @@ -75,9 +90,12 @@ def take_snapshot(workspace_uri_or_path:Optional[str]=None, tag:Optional[str]=No
return snapshot_command(workspace, tag=tag, message=message)


def get_snapshot_history(workspace_uri_or_path:Optional[str]=None,
reverse:bool=False, max_count:Optional[int]=None,
verbose:bool=False) -> Iterable[SnapshotInfo]:
def get_snapshot_history(
workspace_uri_or_path: Optional[str] = None,
reverse: bool = False,
max_count: Optional[int] = None,
verbose: bool = False,
) -> Iterable[SnapshotInfo]:
"""Get the history of snapshots, starting with the oldest first (unless :reverse: is True).
Returns a list of SnapshotInfo instances, containing the snapshot number,
hash, tag, timestamp, and message. If :max_count: is specified, returns at most that many snapshots.
Expand All @@ -87,23 +105,30 @@ def get_snapshot_history(workspace_uri_or_path:Optional[str]=None,
assert isinstance(workspace, SnapshotWorkspaceMixin)
if not reverse:
return [
SnapshotInfo(snapshot_idx+1, md.hashval, md.tags, md.timestamp,
md.message, md.metrics) for (snapshot_idx, md) in
enumerate(workspace.list_snapshots(reverse=False, max_count=max_count))
SnapshotInfo(
snapshot_idx + 1, md.hashval, md.tags, md.timestamp, md.message, md.metrics
)
for (snapshot_idx, md) in enumerate(
workspace.list_snapshots(reverse=False, max_count=max_count)
)
]
else:
last_snapshot_no = workspace.get_next_snapshot_number() - 1
return [
SnapshotInfo(last_snapshot_no-i, md.hashval, md.tags, md.timestamp,
md.message, md.metrics) for (i, md) in
enumerate(workspace.list_snapshots(reverse=True, max_count=max_count))
SnapshotInfo(
last_snapshot_no - i, md.hashval, md.tags, md.timestamp, md.message, md.metrics
)
for (i, md) in enumerate(workspace.list_snapshots(reverse=True, max_count=max_count))
]



def restore(tag_or_hash:str, workspace_uri_or_path:Optional[str]=None,
only:Optional[List[str]]=None, leave:Optional[List[str]]=None,
verbose:bool=False) -> int:
def restore(
tag_or_hash: str,
workspace_uri_or_path: Optional[str] = None,
only: Optional[List[str]] = None,
leave: Optional[List[str]] = None,
verbose: bool = False,
) -> int:
"""Restore to a previous snapshot, identified by either its hash
or its tag (if one was specified). Parameters:
Expand All @@ -115,14 +140,14 @@ def restore(tag_or_hash:str, workspace_uri_or_path:Optional[str]=None,
Returns the number of resources changed.
"""
workspace = find_and_load_workspace(True, verbose, workspace_uri_or_path)
return restore_command(workspace, tag_or_hash=tag_or_hash,
only=only, leave=leave)
return restore_command(workspace, tag_or_hash=tag_or_hash, only=only, leave=leave)



def make_lineage_table(workspace_uri_or_path:Optional[str]=None,
tag_or_hash:Optional[str]=None, verbose:bool=False) \
-> Iterable[Tuple[str, str, str, Optional[List[str]]]]:
def make_lineage_table(
workspace_uri_or_path: Optional[str] = None,
tag_or_hash: Optional[str] = None,
verbose: bool = False,
) -> Iterable[Tuple[str, str, str, Optional[List[str]]]]:
"""Make a table of the lineage for each resource.
The columns are: ref, lineage type, details, inputs
"""
Expand All @@ -131,29 +156,43 @@ def make_lineage_table(workspace_uri_or_path:Optional[str]=None,
raise ConfigurationError("Workspace %s does not support lineage" % workspace.name)
if not workspace.supports_lineage():
raise ConfigurationError("Workspace %s does not support lineage" % workspace.name)
snapshot_hash = None # type: Optional[str]
snapshot_hash = None # type: Optional[str]
if tag_or_hash is not None:
md = workspace.get_snapshot_by_tag_or_hash(tag_or_hash)
snapshot_hash = md.hashval
return lu.make_lineage_table(workspace.get_instance(), workspace.get_lineage_store(), snapshot_hash)


def make_lineage_graph(output_file:str,
workspace_uri_or_path:Optional[str]=None,
resource_name:Optional[str]=None,
tag_or_hash:Optional[str]=None,
width:int=1024, height:int=800,
verbose:bool=False) -> None:
return lu.make_lineage_table(
workspace.get_instance(), workspace.get_lineage_store(), snapshot_hash
)


def make_lineage_graph(
output_file: str,
workspace_uri_or_path: Optional[str] = None,
resource_name: Optional[str] = None,
tag_or_hash: Optional[str] = None,
width: int = 1024,
height: int = 800,
verbose: bool = False,
) -> None:
"""Write a lineage graph as an html/javascript page to the specified file.
"""
workspace = find_and_load_workspace(True, verbose, workspace_uri_or_path)
lineage_graph_command(workspace, output_file, resource_name=resource_name,
snapshot=tag_or_hash, width=width, height=height)

def get_results(workspace_uri_or_path:Optional[str]=None,
tag_or_hash:Optional[str]=None,
resource_name:Optional[str]=None,
verbose:bool=False) -> Optional[Tuple[JSONDict, str]]:
lineage_graph_command(
workspace,
output_file,
resource_name=resource_name,
snapshot=tag_or_hash,
width=width,
height=height,
)


def get_results(
workspace_uri_or_path: Optional[str] = None,
tag_or_hash: Optional[str] = None,
resource_name: Optional[str] = None,
verbose: bool = False,
) -> Optional[Tuple[JSONDict, str]]:
"""Get a results file as a parsed json dict. If no resource or snapshot
is specified, searches all the results resources for a file. If a snapshot
is specified, we look in the subdirectory where the results have been moved.
Expand Down

0 comments on commit cd43b02

Please sign in to comment.