[DET-2252, DET-2858, DET-2861] Support Data Layer #6
Conversation
- What if a user defined their own DataRef? We seem to be missing the abstractions we would need to support that case.
- We have local caches with GCS and S3 storages that seem super opaque to me, and there is no mechanism for garbage collecting them. I'm OK with not GC'ing the S3 bucket, or even the LFS directory, since those are very clearly passed in by the user, but I think this intermediate cache might be a real problem.
- This is a lot of new code, it looks pretty good, and it comes with tests and examples and everything. Good work.
harness/determined/_train_context.py
Outdated
@@ -121,6 +134,12 @@ def get_hparam(self, name: str) -> Any:
            )
        return self.env.hparams[name]

    def get_train_cacheable(self) -> data_layer.CacheableDecorator:
I think we should get a second opinion on these names. I know why they are named the way they are, but even so the name is not particularly clear to me.
But I'm not exactly overwhelmed by great ideas for alternatives...
How about `self.context.data_layer.train_dataset_decorator`?
Adding this to the agenda for Monday's meeting.
if isinstance(ds, tf.data.Dataset):
    ds = ds.repeat()
Does estimator support non-tf.data datasets?
It supports tuples. We do currently support it, and one of our unit tests actually used to do exactly this.
Note: in our current setup we only support creating an iterator from a `tf.data.Dataset` object, since we require `wrap_dataset()`.
I don't understand, it seems like these two statements are in conflict:

> We do currently support it

> Note: in our current setup we only support creating an iterator from a `tf.data.Dataset` object, since we require `wrap_dataset()`.
Our current `wrap_dataset` expects to receive a `tf.data.Dataset` and errors out otherwise; beyond that, we support any input. This means that users can pass a `tf.data.Dataset` into `wrap_dataset` and then pass a `tf.data.Dataset` or `tf.data.Iterator` as their output. Theoretically they could also pass a `tf.data.Dataset` into the wrapper and pass any estimator-supported input to us.
OK, I guess this is fine. It seems like we are setting ourselves up for a very confused user by silently allowing iterators to be passed here, since we can't `repeat()` an iterator.
Agreed but will punt on this issue for now.
harness/determined/data_layer.py
Outdated
if configured_storage_path:
    storage_path = pathlib.Path(cast(str, configured_storage_path))
else:
    storage_path = pathlib.Path("/data/determined/")
Better to use `tempfile` here, because sometimes we don't have access to `/data`.
Yep, switched over to using the home directory.
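For illustration, a minimal sketch of the fallback being discussed here, reusing the `configured_storage_path` name from the snippet above; the helper name and the cache directory name under the home directory are assumptions, not the actual change:

```python
import pathlib
from typing import Optional, cast


def resolve_storage_path(configured_storage_path: Optional[str]) -> pathlib.Path:
    # Prefer an explicitly configured path; otherwise fall back to a directory
    # under the user's home, which is writable even when /data is not available.
    if configured_storage_path:
        return pathlib.Path(cast(str, configured_storage_path))
    return pathlib.Path.home().joinpath("determined_data_layer_cache")  # assumed name
```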
def wrap(make_dataset_fn: Callable) -> Callable:
    def _decorated_fn(*args: Any, **kwargs: Any) -> Any:
        @self._storage.cacheable(  # type: ignore
It seems unnecessary to create a function `make_dataset` just to use this decorator. Can we add a function that accepts a dataset as an argument, rather than only supporting the decorator?
You mean for the user-facing call of `@cacheable`?
The problem with accepting a dataset as a parameter is that the dataset will already have been built, which means you are potentially throwing away a lot of time savings. If you accept a function that creates a dataset, then you don't have to do anything inside of the function when you know you have already cached the output once.
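To make that trade-off concrete, here is a toy sketch of the two API shapes; this is not the real storage/`cacheable` implementation, and the in-memory `_cache` and function names are stand-ins:

```python
from typing import Any, Callable, Dict

_cache: Dict[str, Any] = {}  # stand-in for the real on-disk/S3 cache


def cacheable(name: str) -> Callable[[Callable[[], Any]], Callable[[], Any]]:
    """Wrap a dataset-building function so the build is skipped on a cache hit."""

    def decorator(make_dataset: Callable[[], Any]) -> Callable[[], Any]:
        def wrapped() -> Any:
            if name in _cache:
                # Cache hit: make_dataset() is never called, so the expensive
                # dataset construction is skipped entirely.
                return _cache[name]
            _cache[name] = make_dataset()
            return _cache[name]

        return wrapped

    return decorator


# If the API instead accepted an already-built dataset, e.g. cache(name, dataset),
# the caller would have to construct the dataset up front even on a cache hit,
# losing most of the time savings described above.
@cacheable("mnist-train")
def make_dataset() -> list:
    return list(range(10))  # placeholder for expensive dataset construction
```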
@rb-determined-ai I see. We should probably document in `cacheable` why we only support decorators and what users should put in `make_dataset()` to make the most of the data layer.
Documentation is coming!
from determined_common import check


class _InputManager(metaclass=abc.ABCMeta):
Can we add more context to the docstring to explain when this class is used and why we need it?
Yep good call.
Probably the first thing we should do is reconsider our @Cacheable API based on the feedback we got in the sync yesterday.
)


@pytest.mark.integ3  # type: ignore
Should this be `integ4`? It seems like the rest of the `parameterize("tf2",...)` tests are `integ4`, so maybe there's some efficiency in putting them on the same machine?
Didn't want to overload a branch, but sure.
lol it was not a rhetorical question, I really don't know. So sure.
"github.com/determined-ai/determined/master/pkg/union" | ||
) | ||
|
||
// DataLayerConfig configures data layer storage. |
I don't remember if we have discussed this. Can we merge this configuration with the checkpoint storage configuration?
IMO it's better to leave them separate for now, since `data_layer` is still experimental and we are not sure the final version of this will be compatible with the `checkpoint_storage` config.
        return [TFKerasTensorBoard(update_freq="batch", profile_batch=0, histogram_freq=1)]

    def build_training_data_loader(self) -> tf.data.Dataset:
        @self.context.experimental.cache_train_dataset("mnist-tf-keras", "v1", shuffle=True)
Is it recommended that this decorated function be put specifically inside a trial class?
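For reference, roughly how the decorated builder from the diff above appears intended to be used inside a trial class. This is a sketch, not the actual example code: the inner function name, the placeholder data, and calling the wrapped function directly are my assumptions.

```python
import tensorflow as tf


class MNistTrial:
    # In the real example this class subclasses det.keras.TFKerasTrial, which is
    # what provides self.context; it is shown bare here only to keep the sketch short.
    def build_training_data_loader(self) -> tf.data.Dataset:
        # cache_train_dataset caches the output of the wrapped builder under the
        # ("mnist-tf-keras", "v1") name/version pair, so later runs can skip the build.
        @self.context.experimental.cache_train_dataset("mnist-tf-keras", "v1", shuffle=True)
        def make_train_dataset() -> tf.data.Dataset:
            # Placeholder data; the real example builds the full MNIST pipeline here.
            return tf.data.Dataset.from_tensor_slices(list(range(10)))

        return make_train_dataset()
```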
@@ -39,3 +39,6 @@ def slots_per_trial(self) -> int:

    def experiment_seed(self) -> int:
        return int(self.get("reproducibility", {}).get("experiment_seed", 0))

    def get_data_layer_type(self) -> str:
        return cast(str, self["data_layer"]["type"])
Would it be better to use `str(self.get("data_layer", {}).get("type", ""))`?
This will always be set.
harness/determined/data_layer.py
Outdated
from determined import horovod, workload
from determined_common import check

tensorflow_dataset_type = "tf.data.Dataset"
Are these strings of type names used somewhere?
Ah, good catch. They were being used, but not anymore.
harness/determined/data_layer.py
Outdated
            storage_config = storage.LFSConfigurations(storage_dir_path=str(local_cache_path))
            self._storage = storage.LFSStorage(storage_config, tensorflow_config=session_config)

        elif data_layer_type == StorageTypes.SHARED_FS.value:
This should be S3
YESSS!
    config = conf.set_max_steps(config, 2)
    config = conf.set_slots_per_trial(config, 8)
    config = conf.set_tf2_image(config) if tf2 else conf.set_tf1_image(config)
    if storage_type == "lfs":
Why does this test cover `lfs` and `s3` while the test below only covers `lfs`?
Just to avoid running too many CI tests.
) -> Callable[[], Tuple[tf.Tensor, tf.Tensor]]:
    def _input_fn() -> Tuple[tf.Tensor, tf.Tensor]:
        data, labels = xor_data()
        dataset = tf.data.Dataset.from_tensor_slices((data, labels))
        dataset = context.wrap_dataset(dataset)
        if shuffle:
            dataset = dataset.shuffle(1000)

        def map_dataset(x, y):
It seems this function could be replaced with a `lambda`.
I'm lazy :)
It makes our codebase 10 lines cleaner!
Ran some tests, found some bugs, they got fixed, and I think this is good to go! Nice work!
        self,
        env: det.EnvContext,
        hvd_config: horovod.HorovodContext,
        train_context: Union[NativeContext, TrialContext],
This makes `_DataLayerContext` depend on a `TrainContext`. If it is part of `TFKerasContext`, it shouldn't depend on a `TrainContext`. I think we should move the per_slot_batch_size calculation to `EnvContext` and access that directly. I made the above change in #167. @rb-determined-ai what do you think?
@shiyuann To get the fixes in this PR into this release, I want to land this as is. Happy to discuss this further outside the scope of this PR; I am not dead set on this being the right way to do it. I think once we have a base `Experimental` class, it could clear some of this up.
I don't think it's outside the scope of this PR. Introducing `_DataLayerContext` along with a weird workaround, rather than a simple clean refactor, seems wrong to me and confusing to other developers. And given this refactor is very simple (just a few lines of change), I don't see any reason it should postpone landing this PR. Basically, what you need to do is move `TrainContext._calculate_batch_sizes` to `EnvContext` and use `env.per_slot_batch_size` in `_DataLayerContext`. This won't even take more than 10 minutes to write.
I'm not saying it is outside the scope of this PR; I'm saying I want to postpone fixing this until after this PR, because this needs to land ASAP (without having to re-run CI).
I do disagree that it is a simple refactor, because I am not convinced that passing in the batch size is the best way forward. I need to think about the best way to do this and don't have time to do that today. I filed https://determinedai.atlassian.net/browse/DET-2884 to track this.
Sorry for the confusion. It's not passing in the batch size; it's making `per_slot_batch_size` part of `EnvContext` and passing in `EnvContext`. See code here: https://github.com/determined-ai/determined/blob/69c79eb8468b9ba2b3c3c279d98e4337f5308237/harness/determined/_env_context.py
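A minimal sketch of the refactor being proposed, as I read this thread; constructor signatures and field names other than `per_slot_batch_size` are assumptions, not the actual Determined code:

```python
class EnvContext:
    """Sketch: experiment-wide settings, now including per_slot_batch_size."""

    def __init__(self, global_batch_size: int, slots_per_trial: int) -> None:
        self.global_batch_size = global_batch_size
        # Computed once here, so downstream contexts no longer need a TrainContext.
        self.per_slot_batch_size = global_batch_size // max(slots_per_trial, 1)


class _DataLayerContext:
    """Sketch: depends only on EnvContext under the proposed refactor."""

    def __init__(self, env: EnvContext) -> None:
        self.per_slot_batch_size = env.per_slot_batch_size


env = EnvContext(global_batch_size=64, slots_per_trial=8)
print(_DataLayerContext(env).per_slot_batch_size)  # 8
```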
Yes, I think you are right... this is pretty messy.
Do you think that this is functionally broken? If it is only messy then we can fix it later.
Talked to @shiyuann offline; I will clean this up after his PR with the above changes lands.
        self,
        env: det.EnvContext,
        hvd_config: horovod.HorovodContext,
        train_context: Union[det.NativeContext, det.TrialContext],
ditto
class _TrainingDataLayerTFDatasetManager(_TrainingInputManager):
    def __init__(
        self,
        context: Union[keras.TFKerasTrialContext, keras.TFKerasNativeContext],
keras.TFKerasContext is fine.
Yes, good call
        return None


class _TrainingSequenceAdapterManager(_TrainingInputManager):
`SequenceAdapter` might be merged with this class. Nothing else uses `SequenceAdapter`.
I am hesitant to do this since they serve quite different purposes, and `SequenceAdapter` is a user-facing class, while this is not.
        tf.compat.v1.summary.tensor_summary("features", features)
        tf.compat.v1.summary.tensor_summary("labels", labels)

        def map_dataset(x, y):
ditto.
?
I mean replace it with `lambda x, y: ({"input": x}, y)`. This won't even take more than 1 minute.
This PR is blocked by: determined-ai/yogadl#14