Skip to content

Commit

Permalink
improved version of lineage graph
Browse files Browse the repository at this point in the history
  • Loading branch information
jfischer committed Sep 16, 2019
1 parent 8be8bdf commit 56313fb
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 57 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
include dataworkspaces/third_party/git-fat
dataworkspaces/third_party/lineage_graph_template.html
13 changes: 8 additions & 5 deletions dataworkspaces/commands/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

from dataworkspaces.workspace import Workspace, SnapshotWorkspaceMixin, ResourceRoles
from dataworkspaces.errors import ConfigurationError
from dataworkspaces.utils.lineage_utils import make_lineage_graph_for_resource
from dataworkspaces.utils.lineage_utils import make_simplified_lineage_graph_for_resource


def lineage_graph_command(workspace:Workspace,
output_file:str,
resource_name:Optional[str],
snapshot:Optional[str],
output_file:str) -> None:
width:int=1024,
height:int=800) -> None:
if not isinstance(workspace, SnapshotWorkspaceMixin):
raise ConfigurationError("Workspace %s does not support snapshots and lineage"%
workspace.name)
Expand All @@ -33,9 +35,10 @@ def lineage_graph_command(workspace:Workspace,
break
if resource_name is None:
raise ConfigurationError("Did not find a results resource in workspace. If you want to graph the lineage of a non-results resource, use the --resource option.")
make_lineage_graph_for_resource(workspace.get_instance(),
store, resource_name, output_file,
snapshot_hash=snapshot_hash)
make_simplified_lineage_graph_for_resource(workspace.get_instance(),
store, resource_name, output_file,
snapshot_hash=snapshot_hash,
width=width, height=height)
if snapshot is None:
click.echo("Wrote lineage for %s to %s" % (resource_name, output_file))
else:
Expand Down
9 changes: 7 additions & 2 deletions dataworkspaces/dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,15 +583,20 @@ def lineage(ctx, workspace_dir):
help="name of the resource to graph the lineage for (default to the first results resource)")
@click.option('--snapshot', type=str, default=None,
help="Snapshot hash or tag to use for lineage. If not specified, use current lineage.")
@click.option('--width', type=int, default=1024,
help="Width of graph in pixels (defaults to 1024)")
@click.option('--height', type=int, default=800,
help="Height of graph in pixels (defaults to 800)")
@click.argument('output_file',
type=click.Path(exists=False, file_okay=True, dir_okay=False,
readable=True, writable=True, resolve_path=True))
@click.pass_context
def graph(ctx, resource:Optional[str], snapshot:Optional[str], output_file:str):
def graph(ctx, resource:Optional[str], snapshot:Optional[str], width:int,
height:int, output_file:str):
"""Graph the lineage of a resource, writing the graph to an HTML file. Subcommand of ``lineage``"""
ns = ctx.obj
workspace = find_and_load_workspace(ns.batch, ns.verbose, ns.workspace_dir)
lineage_graph_command(workspace, resource, snapshot, output_file)
lineage_graph_command(workspace, output_file, resource, snapshot, width, height)

lineage.add_command(graph)

Expand Down
104 changes: 54 additions & 50 deletions dataworkspaces/utils/lineage_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1476,58 +1476,62 @@ def lineage_to_names(lineage):
WIDTH=str(width),
HEIGHT=str(height)))

def make_lineage_graph_for_resource(instance:str, store:LineageStore,
resource_name:str, output_file:str,
snapshot_hash:Optional[str],
width=1024, height=800) -> None:
def make_simplified_lineage_graph_for_resource(instance:str, store:LineageStore,
resource_name:str, output_file:str,
snapshot_hash:Optional[str],
width=1024, height=800) -> None:
nodes = [] # type: List[Dict[str, Any]]
links = [] # type: List[Dict[str, Any]]
def ref_name(ref):
    """Return the display name for a resource ref.

    The result is ``ref.name`` alone when ``ref.subpath`` is None; otherwise
    the name and subpath are joined as ``<name>:/<subpath>``.
    """
    if ref.subpath is None:
        return ref.name
    return ref.name + ":/" + ref.subpath
def cert_name(cert):
    """Return the full name of a certificate: ``<ref name>:<version tag>``.

    The tag is ``cert.hashval`` for a HashCertificate. Any other certificate
    is treated as a PlaceholderCertificate and tagged ``version=<n>``.
    """
    prefix = ref_name(cert.ref)
    if isinstance(cert, HashCertificate):
        tag = cert.hashval
    else:
        # cast() only informs the type checker; nothing is verified at runtime.
        tag = "version=%d" % cast(PlaceholderCertificate, cert).version
    return prefix + ":" + tag
def cert_short_name(cert):
return cert.hashval if isinstance(cert, HashCertificate) \
else "placeholder version=%d" % cast(PlaceholderCertificate, cert).version
def lineage_to_names(lineage):
if isinstance(lineage, StepLineage):
sname = "%s at %s" % (lineage.step_name, lineage.start_time)
return (sname, sname)
elif isinstance(lineage, SourceDataLineage):
return (cert_short_name(lineage.cert), cert_name(lineage.cert))
elif isinstance(lineage, CodeLineage):
return (cert_short_name(lineage.cert), cert_name(lineage.cert))
class LineageNodes:
return cert.hashval[0:7] if isinstance(cert, HashCertificate) \
else "placeholder=%d" % cast(PlaceholderCertificate, cert).version
def step_lineage_to_name(lineage):
    """Return a label for a step lineage: ``"<step_name> at <start_time>"``.

    Only the first 16 characters of ``str(lineage.start_time)`` are kept.
    NOTE(review): presumably this trims the timestamp to minute precision --
    confirm against the type of ``start_time``.
    """
    assert isinstance(lineage, StepLineage)
    started = str(lineage.start_time)[0:16]
    return "%s at %s" % (lineage.step_name, started)
class CertNodes:
def __init__(self):
self.next_node_id = 1
self.lineage_nodes = {} # type: Dict[str, int]
def get_lineage_node(self, ref:ResourceRef) -> \
Tuple[bool, int, ResourceLineage]:
if snapshot_hash is not None:
lineage = store.retrieve_entry_as_of_snapshot(instance, ref,
snapshot_hash)
else:
lineage = store.retrieve_entry(instance, ref)
(short_name, long_name) = lineage_to_names(lineage)
if long_name in self.lineage_nodes:
return (False, self.lineage_nodes[long_name], lineage)
self.cert_nodes = {} # type: Dict[Certificate, int]
def get_cert_node(self, cert:Certificate) -> Tuple[int, bool]:
if cert in self.cert_nodes:
return (self.cert_nodes[cert], False)
else:
node_id = self.next_node_id
l_node = {
"name":short_name,
"label":'Step' if isinstance(lineage, StepLineage)
else ('SourceData' if isinstance(lineage, SourceDataLineage)
else 'Code'),
"id":node_id
}
nodes.append(l_node)
self.lineage_nodes[long_name] = node_id
nodes.append({
'name':ref_name(cert.ref),
'label':cert_short_name(cert),
'id':node_id
})
self.cert_nodes[cert] = node_id
self.next_node_id += 1
return (True, node_id, lineage)

ln = LineageNodes()
return (node_id, True)
def get_cert_and_lineage(ref:ResourceRef) -> \
        Tuple[Certificate, ResourceLineage]:
    """Fetch the lineage entry for ``ref`` and the certificate it holds for it.

    ``snapshot_hash``, ``store`` and ``instance`` are free variables taken
    from the surrounding scope. When ``snapshot_hash`` is set, the entry is
    read as of that snapshot; otherwise the store's current entry is used.

    Returns a ``(cert, lineage)`` tuple. Asserts that the lineage entry
    yields a certificate for ``ref``.
    """
    if snapshot_hash is not None:
        lineage = store.retrieve_entry_as_of_snapshot(instance, ref,
                                                      snapshot_hash)
    else:
        lineage = store.retrieve_entry(instance, ref)
    cert = lineage.get_cert_for_ref(ref)
    assert cert is not None
    return (cert, lineage)
def cert_in_lineage(cert:Certificate) -> bool:
    """Report whether ``cert`` is still the certificate stored for its ref.

    ``snapshot_hash``, ``store`` and ``instance`` are free variables taken
    from the surrounding scope. The lineage entry for ``cert.ref`` is read
    as of the snapshot when ``snapshot_hash`` is set, otherwise from the
    store's current entries. The certificate that entry holds for
    ``cert.ref`` is then compared with ``cert``.

    Returns True when they are equal. Otherwise prints a warning naming both
    certificates and returns False.
    """
    if snapshot_hash is not None:
        lineage = store.retrieve_entry_as_of_snapshot(instance, cert.ref,
                                                      snapshot_hash)
    else:
        lineage = store.retrieve_entry(instance, cert.ref)
    other_cert = lineage.get_cert_for_ref(cert.ref)
    if other_cert == cert:
        return True
    print("Warning: Certificate %s not found in store, was overwritten by %s"%
          (cert, other_cert))
    return False

cn = CertNodes()
if snapshot_hash is not None:
worklist = [ref for ref in
store.get_refs_for_resource_as_of_snapshot(instance,
Expand All @@ -1541,16 +1545,16 @@ def get_lineage_node(self, ref:ResourceRef) -> \
while len(worklist) > 0:
next_worklist = []
for ref in worklist:
(is_new, node_id, lineage) = ln.get_lineage_node(ref)
if isinstance(lineage, StepLineage):
for cert in lineage.get_input_certs():
(input_is_new, input_node_id, input_lineage) = \
ln.get_lineage_node(cert.ref)
(cert, lineage) = get_cert_and_lineage(ref)
(node_id, is_new) = cn.get_cert_node(cert)
if lineage is not None and isinstance(lineage, StepLineage):
for input_cert in lineage.get_input_certs():
(input_node_id, input_is_new) = cn.get_cert_node(input_cert)
links.append({'source':input_node_id,
'target':node_id,
'type':ref_name(ref)})
if input_is_new:
next_worklist.append(cert.ref)
'type':step_lineage_to_name(lineage)})
if input_is_new and cert_in_lineage(input_cert):
next_worklist.append(input_cert.ref)
worklist = next_worklist
if not exists(GRAPH_TEMPLATE_FILE):
raise InternalError("Could not find lineage graph template")
Expand Down

0 comments on commit 56313fb

Please sign in to comment.