Skip to content


Add prototype of tensorflow api; some type fixes as the type checker …
Browse files Browse the repository at this point in the history
…got stricter (and some of the typeshed annotations are wrong)
  • Loading branch information
jfischer committed Sep 27, 2019
1 parent 5158961 commit 57d1464
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 12 deletions.
9 changes: 3 additions & 6 deletions dataworkspaces/
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,7 @@ def restore(ctx, workspace_dir:str, only:Optional[str], leave:Optional[str], str
"""Restore the workspace to a prior state"""
ns = ctx.obj
if (only is not None) and (leave is not None):
raise click.BadOptionUsage(message="Please specify either --only or --leave, but not both",
raise click.BadOptionUsage(option_name='--only', message="Please specify either --only or --leave, but not both") # type: ignore
if workspace_dir is None:
if ns.batch:
raise BatchModeError("--workspace-dir")
Expand Down Expand Up @@ -448,8 +447,7 @@ def push(ctx, workspace_dir:str, only:Optional[str], skip:Optional[str], only_wo
option_cnt = (1 if only is not None else 0) + (1 if skip is not None else 0) + \
(1 if only_workspace else 0)
if option_cnt>1:
raise click.BadOptionUsage(message="Please specify at most one of --only, --skip, or --only-workspace",
raise click.BadOptionUsage(message="Please specify at most one of --only, --skip, or --only-workspace", option_name='--only') # type: ignore
if workspace_dir is None:
if ns.batch:
raise BatchModeError("--workspace-dir")
Expand Down Expand Up @@ -482,8 +480,7 @@ def pull(ctx, workspace_dir:str, only:Optional[str], skip:Optional[str], only_wo
option_cnt = (1 if only is not None else 0) + (1 if skip is not None else 0) + \
(1 if only_workspace else 0)
if option_cnt>1:
raise click.BadOptionUsage(message="Please specify at most one of --only, --skip, or --only-workspace",
raise click.BadOptionUsage(message="Please specify at most one of --only, --skip, or --only-workspace", option_name='--only') # type: ignore
if workspace_dir is None:
if ns.batch:
raise BatchModeError("--workspace-dir")
Expand Down
282 changes: 282 additions & 0 deletions dataworkspaces/kits/
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
"""Integration with Tensorflow 1.x
This is an experimental API and subject to change.
**Wrapping a Karas Model**
Below is an example of wrapping one of the standard tf.keras model classes,
based on
Assume we have a workspace already set up, with two resources: a *Source Data*
resource of type `api-resource`, which is used to capture the hash of
input data as it is passed to the model, and a *Results* resource to
keep the metrics. The only change we need to do to capture the lineage from
the model is to wrap the model's class, using
Here is the code::
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras
from dataworkspaces.kits.tensorflow1 import add_lineage_to_keras_model_class
# Wrap our model class. This is the only DWS-specific change needed.
keras.Sequential = add_lineage_to_keras_model_class(keras.Sequential)
fashion_mnist = keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
model = keras.Sequential([
keras.layers.Flatten(input_shape=(28, 28)),
keras.layers.Dense(128, activation=tf.nn.relu),
keras.layers.Dense(10, activation=tf.nn.softmax)
metrics=['accuracy']), train_labels, epochs=5)
test_loss, test_acc = model.evaluate(test_images, test_labels)
print('Test accuracy:', test_acc)
This will create a ``results.json`` file in the results resource. It will
look like this::
"step": "test",
"start_time": "2019-09-26T11:33:22.100584",
"execution_time_seconds": 26.991521,
"parameters": {
"optimizer": "adam",
"loss_function": "sparse_categorical_crossentropy",
"epochs": 5,
"fit_batch_size": null,
"evaluate_batch_size": null
"run_description": null,
"metrics": {
"loss": 0.3657455060243607,
"acc": 0.8727999925613403
If you subclass from a Keras Model class, you can just use
:func:`~add_lineage_to-keras_model_class` as a decorator. Here is an example::
class MyModel(keras.Model):
def __init__(self):
print("In MyModel init")
#super(MyModel, self).__init__()
self.dense1 = tf.keras.layers.Dense(4, activation=tf.nn.relu)
self.dense2 = tf.keras.layers.Dense(5, activation=tf.nn.softmax)
def call(self, inputs):
print("Inputs: %s" % repr(inputs))
x1 = self.dense1(inputs)
return self.dense2(x1)
model = MyModel()
import numpy as np
metrics=['accuracy']),4)), np.ones(5), epochs=5)
test_loss, test_acc = model.evaluate(np.zeros(16).reshape(4,4), np.ones(4))
print('Test accuracy:', test_acc)
import hashlib
from typing import Optional, Union
import numpy as np
import datetime

import tensorflow.keras.optimizers as optimizers
import tensorflow.losses as losses

from dataworkspaces.workspace import find_and_load_workspace, ResourceRef, \
ResourceRoles, JSONDict, Workspace
assert JSONDict # make pyflakes happy
from dataworkspaces.lineage import ResultsLineage
from dataworkspaces.utils.lineage_utils import LineageError, infer_step_name
from dataworkspaces.resources.api_resource import API_RESOURCE_TYPE, ApiResource
assert ApiResource # make pyflakes happy
from dataworkspaces.kits.jupyter import get_step_name_for_notebook

#from dataworkspaces.utils.patch_utils import patch_method
#def add_lineage_to_model_class(Cls):
# if hasattr(Cls, '_dws_model_wrap') and Cls._dws_model_wrap is True:
# print("%s is already wrapped" % Cls.__name__)
# return Cls # already wrapped
#def make_model__init__(original_method):
# def init(self, **kwargs):
# self.workspace = find_and_load_workspace(batch=True, verbose=False)
# self.hash_state = hashlib.sha1()
# original_method(self, **kwargs)
# return init
#patch_method(keras.Model, '__init__', make_model__init__)
#def make_model_fit(original_method):
# def fit(self, x, y, **kwargs):
# self.hash_state.update(
# self.hash_state.update(
# print("captured hash of training data: %s" % self.hash_state.hexdigest())
# return original_method(self, x, y, **kwargs)
# return fit
#patch_method(keras.Model, 'fit', make_model_fit)
#def make_model_evaluate(original_method):
# def evaluate(self, x, y, **kwargs):
# h = self.hash_state.copy()
# h.update(
# h.update(
# print("hash of input data is %s" % h.hexdigest())
# results = original_method(self, x, y, **kwargs)
# assert len(results)==len(self.metrics_names)
# metrics = {n:v for (n, v) in zip(self.metrics_names, results)}
# print("Metrics: %s" % metrics)
# return results
# return evaluate
#patch_method(keras.Model, 'evaluate', make_model_evaluate)

def _find_resource(workspace:Workspace, role:str,
name_or_ref:Optional[Union[str, ResourceRef]]=None) -> ResourceRef:
if isinstance(name_or_ref, str):
return workspace.map_local_path_to_resource(name_or_ref, expecting_a_code_resource=False)
elif isinstance(name_or_ref, ResourceRef):
workspace.validate_resource_name(, name_or_ref.subpath)
return name_or_ref
for rname in workspace.get_resource_names():
if workspace.get_resource_role(rname)==role:
return ResourceRef(rname, subpath=None)
raise LineageError("Could not find a %s resource in your workspace" % role)

def _infer_step_name() -> str:
"""Come up with a step name by looking at whether this is a notebook
and then the command line arguments.
# TODO: this should be moved to a utility module (e.g. lineage_utils)
notebook_name = get_step_name_for_notebook()
if notebook_name is not None:
return notebook_name
pass # not a notebook
return infer_step_name()

def _metric_val_to_json(v):
if isinstance(v, int) or isinstance(v, str):
return v
elif isinstance(v, np.int64) or isinstance(v, np.int32):
return int(v)
elif isinstance(v, np.float64) or isinstance(v, np.float32):
return float(v)
return v

def add_lineage_to_keras_model_class(Cls:type,
input_resource:Optional[Union[str, ResourceRef]]=None,
results_resource:Optional[Union[str, ResourceRef]]=None):
"""This function wraps a Keras model class with a subclass that overwrites
key methods to make calls to the data lineage API.
if hasattr(Cls, '_dws_model_wrap') and Cls._dws_model_wrap is True: # type: ignore
print("%s or a superclass is already wrapped" % Cls.__name__)
return Cls # already wrapped
workspace = find_and_load_workspace(batch=True, verbose=False)
results_ref = _find_resource(workspace, ResourceRoles.RESULTS, results_resource)

class WrappedModel(Cls): # type: ignore
_dws_model_wrap = True
def __init__(self,*args,**kwargs):
super().__init__(*args, **kwargs)
print("In wrapped init")
self._dws_workspace = workspace
self._dws_results_ref = results_ref
self._dws_input_resource = input_resource
self._dws_hash_state = hashlib.sha1()
self._dws_api_resource = None # type: Optional[ApiResource]
self._dws_params = {} # type: JSONDict
def compile(self, optimizer,
if isinstance(optimizer, str):
self._dws_params['optimizer'] = optimizer
elif isinstance(optimizer, optimizers.Optimizer):
self._dws_params['optimizer'] = optimizer.__class__.__name__
if isinstance(loss, str):
self._dws_params['loss_function'] = loss
elif isinstance(loss, losses.Loss):
self._dws_params['loss_function'] = loss.__class__.__name__
return super().compile(optimizer, loss, metrics, loss_weights,
sample_weight_mode, weighted_metrics,
target_tensors, distribute, **kwargs)
def fit(self, x, y, **kwargs):
print("fit: in wrap of %s" % Cls.__name__)
if 'epochs' in kwargs:
self._dws_params['epochs'] = kwargs['epochs']
self._dws_params['epochs'] = 1
if 'batch_size' in kwargs:
self._dws_params['fit_batch_size'] = kwargs['batch_size']
self._dws_params['fit_batch_size'] = None
if isinstance(x, np.ndarray):
input_ref = _find_resource(self._dws_workspace, ResourceRoles.SOURCE_DATA_SET,
if self._dws_workspace.get_resource_type(
# capture the hash of the data coming in...
self._dws_api_resource = self._dws_workspace.get_resource(
hashval = self._dws_hash_state.hexdigest()
print("captured hash of training data: %s" % hashval)
elif hasattr(x, 'resource'):
input_ref = x.resource
if self._dws_workspace.get_resource_type(
assert 0, "Need to implement obtaining of hash from dataset"
raise LineageError("No way to determine resource associated with model input. Please specify in model wrapping function or use a wapped data set.")
self._dws_lineage = ResultsLineage(_infer_step_name(),,
self._dws_params, [input_ref], [], self._dws_results_ref,
return super().fit(x, y, **kwargs)
def evaluate(self, x, y, **kwargs):
if 'batch_size' in kwargs:
self._dws_params['evaluate_batch_size'] = kwargs['batch_size']
self._dws_params['evaluate_batch_size'] = None
if self._dws_api_resource is not None:
h = self._dws_hash_state.copy()
hashval = h.hexdigest()
print("hash of input data is %s" % hashval)
results = super().evaluate(x, y, **kwargs)
assert len(results)==len(self.metrics_names)
metrics = {n:_metric_val_to_json(v) for (n, v) in zip(self.metrics_names, results)}
print("Metrics: %s" % repr(metrics))
return results
return WrappedModel
7 changes: 5 additions & 2 deletions dataworkspaces/
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,17 @@ def __init__(self, step_name:str, start_time:datetime.datetime,
inputs:List[Union[str, ResourceRef]],
code:List[Union[str, ResourceRef]],
results_dir_or_ref:Union[str, ResourceRef],
super().__init__(step_name, start_time, parameters,
inputs, code, workspace, command_line, current_directory)
self.results_ref = self.workspace.map_local_path_to_resource(results_dir)
if isinstance(results_dir_or_ref, str):
self.results_ref = self.workspace.map_local_path_to_resource(results_dir_or_ref)
self.results_ref = cast(ResourceRef, results_dir_or_ref)
self.results_resource = self.workspace.get_resource(
self.run_description = run_description
Expand Down
1 change: 1 addition & 0 deletions dataworkspaces/resources/
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import hashlib

from typing import Dict, Optional, List, Tuple, Iterable, Callable
assert Dict

from dataworkspaces.utils.subprocess_utils import call_subprocess
from dataworkspaces.utils.git_utils import GIT_EXE_PATH
Expand Down
2 changes: 2 additions & 0 deletions dataworkspaces/utils/
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import socket
from typing import Dict, Callable, Any, Optional
assert Dict
assert Callable

from dataworkspaces.utils.snapshot_utils import \
Expand Down
2 changes: 1 addition & 1 deletion dataworkspaces/utils/
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def repl(tvar_mo):

def move_file_and_set_readonly(src:str, dest:str)->None:
os.rename(src, dest)
mode = os.stat(dest)[stat.ST_MODE]
mode = os.stat(dest).st_mode
os.chmod(dest, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)

DOT_GIT_RE = re.compile('^'+re.escape('.git')+'$')
Expand Down
2 changes: 1 addition & 1 deletion docs/
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

autodoc_mock_imports=['click', 'tensorflow']

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
Expand Down
6 changes: 5 additions & 1 deletion docs/kits.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@ Scikit-learn
:members: load_dataset_from_resource,train_and_predict_with_cv,Metrics,BinaryClassificationMetrics,MulticlassClassificationMetrics

TensorFlow 1.x
.. automodule:: dataworkspaces.kits.tensorflow1
:members: add_lineage_to_keras_model_class

2 changes: 1 addition & 1 deletion tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ help:

UNIT_TESTS=test_git_utils test_file_utils test_move_results test_snapshots test_push_pull test_local_files_resource test_hashtree test_lineage_utils test_git_fat_integration test_lineage test_jupyter_kit test_api

test: clean mypy pyflakes
./ --batch
Expand Down

0 comments on commit 57d1464

Please sign in to comment.