Skip to content

Commit

Permalink
updates for cleaned up tensorflow API
Browse files Browse the repository at this point in the history
  • Loading branch information
jfischer committed Oct 13, 2019
1 parent 632de8b commit c81909e
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 107 deletions.
136 changes: 34 additions & 102 deletions dataworkspaces/kits/tensorflow1.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,66 +97,21 @@ def call(self, inputs):
**API**
"""
import hashlib
from typing import Optional, Union
import numpy as np
import datetime
from typing import Optional, Union, List
assert List

import tensorflow.keras.optimizers as optimizers
import tensorflow.losses as losses

from dataworkspaces.workspace import find_and_load_workspace, ResourceRef, \
ResourceRoles, JSONDict, Workspace
assert JSONDict # make pyflakes happy
from dataworkspaces.lineage import ResultsLineage
from dataworkspaces.utils.lineage_utils import LineageError, infer_step_name
from dataworkspaces.resources.api_resource import API_RESOURCE_TYPE, ApiResource
assert ApiResource # make pyflakes happy
from dataworkspaces.kits.jupyter import get_step_name_for_notebook
from dataworkspaces.workspace import find_and_load_workspace, ResourceRef
from dataworkspaces.kits.wrapper_utils import _DwsModelState, _add_to_hash


def _find_resource(workspace:Workspace, role:str,
                   name_or_ref:Optional[Union[str, ResourceRef]]=None) -> ResourceRef:
    """Resolve ``name_or_ref`` to a resource reference in ``workspace``.

    * A string is handed to ``workspace.map_local_path_to_resource()``.
    * A ``ResourceRef`` is validated against the workspace and returned as-is.
    * Anything else (including ``None``) selects the first resource in the
      workspace whose role equals ``role``.

    Raises ``LineageError`` when no resource has the requested role.
    """
    if isinstance(name_or_ref, str):
        return workspace.map_local_path_to_resource(name_or_ref,
                                                    expecting_a_code_resource=False)
    if isinstance(name_or_ref, ResourceRef):
        workspace.validate_resource_name(name_or_ref.name, name_or_ref.subpath)
        return name_or_ref
    # Nothing explicit was given: fall back to the first resource with this role.
    for candidate in workspace.get_resource_names():
        if workspace.get_resource_role(candidate) == role:
            return ResourceRef(candidate, subpath=None)
    raise LineageError("Could not find a %s resource in your workspace" % role)


def _infer_step_name() -> str:
    """Come up with a step name by looking at whether this is a notebook
    and then the command line arguments.

    The notebook lookup is best-effort: if it fails or returns ``None``,
    the name is taken from ``infer_step_name()`` instead.
    """
    # TODO: this should be moved to a utility module (e.g. lineage_utils)
    try:
        notebook_name = get_step_name_for_notebook()
        if notebook_name is not None:
            return notebook_name
    except Exception:
        # Not a notebook. Catch Exception rather than using a bare except so
        # that KeyboardInterrupt and SystemExit still propagate.
        pass
    return infer_step_name()


def _metric_val_to_json(v):
    """Convert a single metric value to something the json module can encode.

    Python ints and strings are returned unchanged. NumPy integer, floating
    point and boolean scalars of any width are converted to the matching
    Python builtin. Every other value is returned as-is.
    """
    if isinstance(v, (int, str)):
        return v
    if isinstance(v, np.integer):
        # covers int8..int64 and the unsigned variants, not just int32/int64
        return int(v)
    if isinstance(v, np.floating):
        # covers float16/float32/float64
        return float(v)
    if isinstance(v, np.bool_):
        return bool(v)
    return v


def add_lineage_to_keras_model_class(Cls:type,
input_resource:Optional[Union[str, ResourceRef]]=None,
results_resource:Optional[Union[str, ResourceRef]]=None):
results_resource:Optional[Union[str, ResourceRef]]=None,
verbose=False):
"""This function wraps a Keras model class with a subclass that overwrites
key methods to make calls to the data lineage API.
Expand All @@ -175,20 +130,14 @@ def add_lineage_to_keras_model_class(Cls:type,
if hasattr(Cls, '_dws_model_wrap') and Cls._dws_model_wrap is True: # type: ignore
print("%s or a superclass is already wrapped" % Cls.__name__)
return Cls # already wrapped
workspace = find_and_load_workspace(batch=True, verbose=False)
results_ref = _find_resource(workspace, ResourceRoles.RESULTS, results_resource)
workspace = find_and_load_workspace(batch=True, verbose=verbose)

class WrappedModel(Cls): # type: ignore
_dws_model_wrap = True
def __init__(self,*args,**kwargs):
super().__init__(*args, **kwargs)
print("In wrapped init")
self._dws_workspace = workspace
self._dws_results_ref = results_ref
self._dws_input_resource = input_resource
self._dws_hash_state = hashlib.sha1()
self._dws_api_resource = None # type: Optional[ApiResource]
self._dws_params = {} # type: JSONDict
print("In wrapped init") # XXX
self._dws_state = _DwsModelState(workspace, input_resource, results_resource)
def compile(self, optimizer,
loss=None,
metrics=None,
Expand All @@ -199,64 +148,47 @@ def compile(self, optimizer,
distribute=None,
**kwargs):
if isinstance(optimizer, str):
self._dws_params['optimizer'] = optimizer
self._dws_state.lineage.add_param('optimizer', optimizer)
elif isinstance(optimizer, optimizers.Optimizer):
self._dws_params['optimizer'] = optimizer.__class__.__name__
self._dws_state.lineage.add_param('optimizer', optimizer.__class__.__name__)
if isinstance(loss, str):
self._dws_params['loss_function'] = loss
self._dws_state.lineage.add_param('loss_function', loss)
elif isinstance(loss, losses.Loss):
self._dws_params['loss_function'] = loss.__class__.__name__
self._dws_state.lineage.add_param('loss_function', loss.__class__.__name__)
return super().compile(optimizer, loss, metrics, loss_weights,
sample_weight_mode, weighted_metrics,
target_tensors, distribute, **kwargs)
def fit(self, x, y, **kwargs):
print("fit: in wrap of %s" % Cls.__name__)
print("fit: in wrap of %s" % Cls.__name__) # XXX
if 'epochs' in kwargs:
self._dws_params['epochs'] = kwargs['epochs']
self._dws_state.lineage.add_param('epochs', kwargs['epochs'])
else:
self._dws_params['epochs'] = 1
self._dws_state.lineage.add_param('epochs', 1)
if 'batch_size' in kwargs:
self._dws_params['fit_batch_size'] = kwargs['batch_size']
else:
self._dws_params['fit_batch_size'] = None
if isinstance(x, np.ndarray):
input_ref = _find_resource(self._dws_workspace, ResourceRoles.SOURCE_DATA_SET,
self._dws_input_resource)
if self._dws_workspace.get_resource_type(input_ref.name)==API_RESOURCE_TYPE:
# capture the hash of the data coming in...
self._dws_api_resource = self._dws_workspace.get_resource(input_ref.name)
self._dws_hash_state.update(x.data.tobytes())
self._dws_hash_state.update(y.data.tobytes())
hashval = self._dws_hash_state.hexdigest()
self._dws_api_resource.save_current_hash(hashval)
print("captured hash of training data: %s" % hashval)
elif hasattr(x, 'resource'):
input_ref = x.resource
if self._dws_workspace.get_resource_type(input_ref.name)==API_RESOURCE_TYPE:
assert 0, "Need to implement obtaining of hash from dataset"
self._dws_state.lineage.add_param('fit_batch_size', kwargs['batch_size'])
else:
raise LineageError("No way to determine resource associated with model input. Please specify in model wrapping function or use a wapped data set.")
self._dws_lineage = ResultsLineage(_infer_step_name(), datetime.datetime.now(),
self._dws_params, [input_ref], [], self._dws_results_ref,
self._dws_workspace)
self._dws_state.lineage.add_param('fit_batch_size', None)
api_resource = self._dws_state.find_input_resources_and_return_if_api(x, y)
if api_resource is not None:
api_resource.init_hash_state()
_add_to_hash(x, api_resource.get_hash_state())
_add_to_hash(y, api_resource.get_hash_state())
return super().fit(x, y, **kwargs)
def evaluate(self, x, y, **kwargs):
if 'batch_size' in kwargs:
self._dws_params['evaluate_batch_size'] = kwargs['batch_size']
self._dws_state.lineage.add_param('evaluate_batch_size', kwargs['batch_size'])
else:
self._dws_params['evaluate_batch_size'] = None
if self._dws_api_resource is not None:
h = self._dws_hash_state.copy()
h.update(x.data.tobytes())
h.update(y.data.tobytes())
hashval = h.hexdigest()
print("hash of input data is %s" % hashval)
self._dws_api_resource.save_current_hash(hashval)
self._dws_state.lineage.add_param('evaluate_batch_size', None)
api_resource = self._dws_state.find_input_resources_and_return_if_api(x, y)
if api_resource is not None:
api_resource.dup_hash_state()
_add_to_hash(x, api_resource.get_hash_state())
_add_to_hash(y, api_resource.get_hash_state())
api_resource.save_current_hash()
api_resource.pop_hash_state()
results = super().evaluate(x, y, **kwargs)
assert len(results)==len(self.metrics_names)
metrics = {n:_metric_val_to_json(v) for (n, v) in zip(self.metrics_names, results)}
print("Metrics: %s" % repr(metrics))
self._dws_lineage.write_results(metrics)
self._dws_lineage.complete()
self._dws_state.write_metrics_and_complete({n:v for (n, v) in
zip(self.metrics_names, results)})
return results
return WrappedModel
149 changes: 149 additions & 0 deletions dataworkspaces/kits/wrapper_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
Common utils for wrapping objects with the Lineage API.
"""
import datetime
from os.path import exists
from typing import Dict, Optional, Union, cast
assert Dict # make pyflakes happy (only used in type comments)

import numpy as np

from dataworkspaces.workspace import Workspace, ResourceRoles, ResourceRef
from dataworkspaces.utils.lineage_utils import LineageError, infer_step_name
from dataworkspaces.kits.jupyter import get_step_name_for_notebook
from dataworkspaces.lineage import ResultsLineage
from dataworkspaces.resources.api_resource import API_RESOURCE_TYPE, ApiResource

try:
import pandas
except ImportError:
pandas = None


def _infer_step_name() -> str:
    """Come up with a step name by looking at whether this is a notebook
    and then the command line arguments.

    The notebook lookup is best-effort: if it fails or returns ``None``,
    the name is taken from ``infer_step_name()`` instead.
    """
    try:
        notebook_name = get_step_name_for_notebook()
        if notebook_name is not None:
            return notebook_name
    except Exception:
        # Not a notebook. Catch Exception rather than using a bare except so
        # that KeyboardInterrupt and SystemExit still propagate.
        pass
    return infer_step_name()


def _metric_scalar_to_json(v):
    """Convert a single (non-container) metric value to a JSON-friendly one.

    Python ints and strings are returned unchanged. NumPy integer, floating
    point and boolean scalars of any width are converted to the matching
    Python builtin, and ``datetime.datetime`` values become ISO-8601
    strings. Every other value is returned as-is.
    """
    if isinstance(v, (int, str)):
        return v
    if isinstance(v, np.integer):
        # covers int8..int64 and the unsigned variants, not just int32/int64
        return int(v)
    if isinstance(v, np.floating):
        # covers float16/float32/float64
        return float(v)
    if isinstance(v, np.bool_):
        return bool(v)
    if isinstance(v, datetime.datetime):
        return v.isoformat()
    return v


def _metric_obj_to_json(v):
    """Recursively convert a metrics object into JSON-friendly values.

    Dicts keep their keys and have each value converted. Lists and tuples
    both come back as lists of converted items. Anything else is treated
    as a scalar and passed to ``_metric_scalar_to_json``.
    """
    if isinstance(v, dict):
        converted = {}
        for key, item in v.items():
            converted[key] = _metric_obj_to_json(item)
        return converted
    if isinstance(v, (list, tuple)):
        return [_metric_obj_to_json(item) for item in v]
    return _metric_scalar_to_json(v)


def _add_to_hash(array_data, hash_state):
    """Feed the raw bytes of ``array_data`` into ``hash_state``.

    ``array_data`` may be a numpy array or, when pandas is installed, a
    pandas DataFrame (hashed column by column, in column order) or Series.
    ``hash_state`` is any object with an ``update(bytes)`` method, such as
    a hashlib hash object.

    Raises ``Exception`` for any other data type.
    """
    if isinstance(array_data, np.ndarray):
        hash_state.update(array_data.data.tobytes())
    elif (pandas is not None) and isinstance(array_data, pandas.DataFrame):
        for c in array_data.columns:
            # memoryview exposes tobytes(); there is no to_bytes() method
            hash_state.update(array_data[c].to_numpy(copy=False).data.tobytes())
    elif (pandas is not None) and isinstance(array_data, pandas.Series):
        hash_state.update(array_data.to_numpy(copy=False).data.tobytes())
    else:
        raise Exception("Unable to hash data type %s, data was: %s"%
                        (type(array_data), array_data))


def _find_resource(workspace:Workspace, role:str,
                   name_or_ref:Optional[Union[str, ResourceRef]]=None) -> ResourceRef:
    """Resolve ``name_or_ref`` to a resource reference in ``workspace``.

    * A string that does not start with ``./`` or ``/`` and matches a
      resource name is used as that resource's name. Otherwise, if it is an
      existing local path, it is mapped to its containing resource.
    * A ``ResourceRef`` is validated against the workspace and returned.
    * If nothing is specified, the workspace must have exactly one resource
      with role ``role``, and that one is returned.

    Raises ``LineageError`` if the string matches neither a resource name
    nor an existing path, or if zero or several resources have the role.
    """
    if isinstance(name_or_ref, str):
        if (not name_or_ref.startswith(('./', '/'))) and \
           (name_or_ref in workspace.get_resource_names()):
            return ResourceRef(name_or_ref)
        if exists(name_or_ref):
            return workspace.map_local_path_to_resource(name_or_ref,
                                                        expecting_a_code_resource=False)
        # the closing quote after the name was missing in the original message
        raise LineageError("Could not find a resource for '" + name_or_ref +
                           "' in your workspace. Please create a resource"+
                           " using the dws add command or correct the name.")
    if isinstance(name_or_ref, ResourceRef):
        workspace.validate_resource_name(name_or_ref.name, name_or_ref.subpath)
        return name_or_ref

    # no resource specified. If we have exactly one for that role,
    # we will use it
    names_for_role = [rname for rname in workspace.get_resource_names()
                      if workspace.get_resource_role(rname)==role]
    if len(names_for_role) > 1:
        raise LineageError(
            "There is more than one resource for role " + role +
            " in your workspace. Please specify the resource you want"+
            " in model wrapping function or use a wrapped data set")
    if not names_for_role:
        raise LineageError("Could not find a " + role +
                           " resource in your workspace. Please create a resource"+
                           " using the dws add command.")
    return ResourceRef(names_for_role[0], subpath=None)

class _DwsModelState:
    """Lineage bookkeeping held by a wrapped model.

    On construction it resolves the results resource and creates a
    ``ResultsLineage`` with no parameters and no inputs; inputs and
    parameters are added later, as the model's methods are called.
    """
    def __init__(self, workspace:Workspace,
                 input_resource:Optional[Union[str, ResourceRef]]=None,
                 results_resource:Optional[Union[str, ResourceRef]]=None):
        """Resolve the results resource and start the lineage.

        ``input_resource`` is not resolved here. It is kept as the default
        to use when input data carries no ``resource`` attribute.
        """
        self.workspace = workspace
        self.results_ref = _find_resource(workspace, ResourceRoles.RESULTS, results_resource)
        self.default_input_resource = input_resource
        # ApiResource instances already fetched, keyed by resource name
        self.api_resource_cache = {} # type: Dict[str,ApiResource]
        self.lineage = ResultsLineage(_infer_step_name(), datetime.datetime.now(),
                                      {}, [], [],
                                      self.results_ref, workspace)

    def find_input_resources_and_return_if_api(self, data, target_data=None) \
        -> Optional[ApiResource]:
        """Register the resource(s) behind ``data`` / ``target_data`` as
        lineage inputs.

        The resource for ``data`` is its ``resource`` attribute if it has
        one; otherwise it is looked up from the default input resource and
        the SOURCE_DATA_SET role. If ``target_data`` has a ``resource``
        attribute naming a different resource, that is added as an input too.

        Returns the (cached) ``ApiResource`` when the data's resource is an
        API resource, else ``None``.

        Raises ``LineageError`` if feature and target data come from
        different resources and either of them is an API resource.
        """
        print("default_input_resource: %s, input_resources=%s" % (self.default_input_resource,
                                                                  self.lineage.step.input_resources)) # XXX
        if hasattr(data, 'resource'):
            ref = data.resource
        else:
            ref = _find_resource(self.workspace, ResourceRoles.SOURCE_DATA_SET,
                                 self.default_input_resource)
        self.lineage.add_input_ref(ref)
        data_resource_type = self.workspace.get_resource_type(ref.name)
        if target_data is not None and hasattr(target_data, 'resource'):
            # Read the target's own resource. The original read data.resource
            # here, which made the comparison below always false and failed
            # with AttributeError when only the target was a wrapped data set.
            target_ref = target_data.resource
            if target_ref!=ref: # only can happen if resource is specified on data
                if data_resource_type==API_RESOURCE_TYPE or \
                   self.workspace.get_resource_type(target_ref.name)==API_RESOURCE_TYPE:
                    raise LineageError("Currently, we do not support API Resources where the feature and target data are from different resources (%s and %s)."%
                                       (ref, target_ref))
                self.lineage.add_input_ref(target_ref)
        if data_resource_type==API_RESOURCE_TYPE:
            if ref.name not in self.api_resource_cache:
                self.api_resource_cache[ref.name] = cast(ApiResource,
                                                         self.workspace.get_resource(ref.name))
            return self.api_resource_cache[ref.name]
        else:
            return None

    def write_metrics_and_complete(self, metrics):
        """Convert ``metrics`` to JSON-friendly values, print them, write
        them as the lineage results and mark the lineage complete.
        """
        metrics = _metric_obj_to_json(metrics)
        print("Metrics: %s" % repr(metrics))
        self.lineage.write_results(metrics)
        self.lineage.complete()
20 changes: 18 additions & 2 deletions dataworkspaces/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def main():
from collections import OrderedDict
import datetime
from typing import List, Union, Any, Type, Iterable, Dict, Optional, cast
from os.path import curdir, join, isabs, abspath, expanduser
from os.path import curdir, join, isabs, abspath, expanduser, exists
from argparse import ArgumentParser, Namespace
from copy import copy

Expand All @@ -122,7 +122,7 @@ def main():
PathNotAResourceError, SnapshotWorkspaceMixin,\
ResourceRoles, _find_containing_workspace
from dataworkspaces.utils.lineage_utils import \
ResourceRef, StepLineage, infer_step_name, infer_script_path
ResourceRef, StepLineage, infer_step_name, infer_script_path, LineageError



Expand Down Expand Up @@ -192,6 +192,15 @@ def __init__(self, step_name:str, start_time:datetime.datetime,
run_from_directory=run_from_directory)
self.in_progress = True

def add_input_path(self, path:str) -> None:
    """Resolve the local ``path`` to a workspace resource and add it to the
    step as an input.

    Raises ``LineageError`` if ``path`` does not exist.
    """
    # NOTE(review): the original lines ended in "# mypy: ignore", which is
    # not the per-line suppression syntax ("# type: ignore" is) - confirm
    # whether a suppression is actually needed here.
    if exists(path):
        resource_ref = self.workspace.map_local_path_to_resource(path)
        self.step.add_input(self.workspace.get_instance(), self.store, resource_ref)
    else:
        raise LineageError("Path %s does not exist" % path)

def add_input_ref(self, ref:ResourceRef) -> None:
    """Add the resource reference ``ref`` to the step as an input."""
    workspace_instance = self.workspace.get_instance()
    self.step.add_input(workspace_instance, self.store, ref)

def add_output_path(self, path:str) -> None:
"""Resolve the path to a resource name and subpath. Add
that to the lineage as an output of the step. From this point on,
Expand All @@ -209,6 +218,13 @@ def add_output_ref(self, ref:ResourceRef):
"""
self.step.add_output(self.workspace.get_instance(), self.store, ref)

def add_param(self, name:str, value) -> None:
    """Set the step parameter ``name`` to ``value``, adding it if it is not
    present yet and overwriting the previous value otherwise.
    """
    # parameters may only change while the step is still running
    assert self.in_progress
    step_parameters = self.step.parameters
    step_parameters[name] = value


def abort(self):
"""The step has failed, so we mark its outputs in an unknown state.
If you create the lineage via a "with" statement, then this will be
Expand Down

0 comments on commit c81909e

Please sign in to comment.