
add customized data loader #2923

Merged
irasit merged 5 commits into master from pl_model_checkpoint on May 30, 2021

Conversation

@irasit irasit (Collaborator) commented May 20, 2021

Signed-off-by: Peng Zhang <pengz@uber.com>

Checklist before submitting

  • Did you read the contributor guide?
  • Did you update the docs?
  • Did you write any tests to validate this change?
  • Did you update the CHANGELOG, if this change affects users?

Description

To fix the deadlock caused by uneven data in the model checkpointing and early stopping callbacks:

  • Make the data loader's num_epoch configurable, defaulting to an infinite loop.
  • Make train_steps_per_epoch configurable, defaulting to row_count / batch_size / hvd.size() (see the sketch after this list).
  • Make the data loader configurable in the estimator, defaulting to the new async data loader.
  • Add a common data loader interface and a mixin class that loads and produces batches asynchronously.
    This mainly decouples the data loader from the trainer; a Ray data loader could reuse it in the future.
  • Enable the model checkpoint callback and early stopping callback test cases.
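
For illustration, a minimal sketch of the train_steps_per_epoch default named above; the helper function and its rounding are assumptions for this description, not the PR's actual implementation:

import horovod.torch as hvd  # assumes hvd.init() has already been called in the training process

def default_train_steps_per_epoch(row_count, batch_size):
    # Each of the hvd.size() workers consumes an equal shard of the rows, so one
    # "epoch" on a single worker is roughly row_count / batch_size / hvd.size() steps.
    return max(1, row_count // batch_size // hvd.size())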

Review process to land

  1. All tests and other checks must succeed.
  2. At least one member of the technical steering committee must review and approve.
  3. If any member of the technical steering committee requests changes, they must be addressed.

github-actions bot commented May 20, 2021

Unit Test Results

792 files ±0    792 suites ±0    6h 3m 41s ⏱️ ±0s
600 tests ±0    564 ✔️ ±0    35 💤 ±0    1 ❌ ±0
16 473 runs ±0    12 430 ✔️ ±0    4 042 💤 ±0    1 ❌ ±0

For more details on these failures, see this check.

Results for commit 52d0b27. ± Comparison against base commit 52d0b27.

♻️ This comment has been updated with latest results.

@irasit irasit force-pushed the pl_model_checkpoint branch 4 times, most recently from 6f0ba55 to 36350e2 on May 24, 2021 17:08
@chongxiaoc chongxiaoc (Collaborator) left a comment

Overall it looks good to me. I'll leave a few comments open for discussion.

horovod/spark/common/data_loader.py (outdated, resolved)
horovod/spark/common/data_loader.py (outdated, resolved)
horovod/spark/lightning/estimator.py (outdated, resolved)
horovod/spark/lightning/remote.py (outdated, resolved)
horovod/spark/common/data_loader.py (outdated, resolved)
horovod/spark/common/data_loader.py (outdated, resolved)
@chongxiaoc (Collaborator) commented:
Also, I think we should add a section in the README or elsewhere in the docs introducing the base PyTorch data loader class, so users know how to inherit from it and implement a data loader of their own.
For example,

# The exact import path is illustrative; the base class lives in horovod/spark/common/data_loader.py in this PR.
from horovod.spark.common.data_loader import BaseDataLoader

class MyDataLoader(BaseDataLoader):
    # customize batch loading below
    ...

Signed-off-by: Peng Zhang <pengz@uber.com>
Signed-off-by: Peng Zhang <pengz@uber.com>
Signed-off-by: Peng Zhang <pengz@uber.com>
Signed-off-by: Peng Zhang <pengz@uber.com>
self.reader.reset()

# Re-create the data loader for each iterate. There maybe some left over data
# from last epoch which will cause petastorm's BatchedDataLoader fail to reset.
Collaborator comment:

The comment means this dataloader is expected to fail in some corner cases?

Collaborator reply:

Your change guarantees that reset() is called once the last row is consumed, so leaving this comment here is confusing. How about just saying "need to reset reader() once last row is consumed"?

self.reader.reset()

# Re-create the data loader for each iterate. There maybe some left over data
# from last epoch which will cause petastorm's BatchedDataLoader fail to reset.
Collaborator comment:

Same as the discussion above for this comment.

make_petastorm_reader = _make_petastorm_reader_fn(transformation, schema_fields,
                                                  batch_size, calculate_shuffle_buffer_size,
-                                                 dataloader_cls)
+                                                 data_loader_cls, loader_num_epochs)
Collaborator comment:

I did a quick search here; make_petastorm_reader is not used anymore?
This function actually sets the data loader as an attribute of the lightning model.
If I understand correctly, we should rename this function.

@chongxiaoc chongxiaoc (Collaborator) left a comment

Leaving a few comments for the 2nd round.
Thanks for refactoring the code; it looks better.

and add mixin?

@irasit irasit force-pushed the pl_model_checkpoint branch 5 times, most recently from 6d2503c to e3986c3 on May 28, 2021 05:47
@thuningxu (Collaborator) commented:
Like it! Looks good to me but also want @tgaddair to take a look.

@irasit irasit force-pushed the pl_model_checkpoint branch 2 times, most recently from 981a777 to d3164ad on May 28, 2021 08:05
class PytorchAsyncDataLoader(AsyncDataLoaderMixin, PytorchDataLoader):
    """

    def __init__(self, async_loader_queue_size=64, *args, **kwargs):
Collaborator comment:

Question: I remember we discussed this offline but I forgot: why are *args and **kwargs needed in the constructor?

Collaborator reply:

In some cases you may have multiple inheritance, in which case one of the other superclasses could need initialization as well, so I think this is good practice (see https://www.educative.io/edpresso/what-is-mro-in-python).
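
To illustrate that point, here is a minimal sketch of cooperative __init__ chaining under Python's MRO; the class names are made up for the example and are not the PR's:

class AsyncMixin:
    def __init__(self, queue_size=64, **kwargs):
        self.queue_size = queue_size
        super().__init__(**kwargs)  # forward the remaining kwargs to the next class in the MRO

class Loader:
    def __init__(self, batch_size=32, **kwargs):
        self.batch_size = batch_size
        super().__init__(**kwargs)

class AsyncLoader(AsyncMixin, Loader):
    pass

loader = AsyncLoader(queue_size=16, batch_size=8)  # each __init__ picks up its own kwargs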


if self.async_loader_queue_size > 0:
    self.finished_event = Event()
    self.queue = Queue(self.async_loader_queue_size)
Collaborator comment:

Should the option be provided to use a Process instead of a Thread here? Did you try both in your tests? I suppose if most of the work is on I/O it should be okay to use a thread.
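
As a rough sketch of the thread-plus-queue pattern being discussed here (names are illustrative, not the PR's implementation), a background thread fills a bounded queue while the training loop consumes from it; an I/O-bound producer usually works fine in a thread despite the GIL:

from queue import Queue
from threading import Event, Thread

class AsyncBatchProducer:
    def __init__(self, batch_iter, queue_size=64):
        self.queue = Queue(queue_size)
        self.finished_event = Event()
        self.thread = Thread(target=self._produce, args=(batch_iter,), daemon=True)
        self.thread.start()

    def _produce(self, batch_iter):
        for batch in batch_iter:
            if self.finished_event.is_set():
                break
            self.queue.put(batch)
        self.queue.put(None)  # sentinel: no more batches

    def __iter__(self):
        # consumer side: yields batches as the background thread produces them
        while True:
            batch = self.queue.get()
            if batch is None:
                break
            yield batch

# usage: for batch in AsyncBatchProducer(iter(my_batches)): ...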

from threading import Thread, Event


class BaseDataLoader(object):
Collaborator comment:

Is this stuff really Spark specific? Maybe we can move this into a new horovod.data module. What do you think?

docs/spark.rst Outdated
@@ -96,6 +96,8 @@ logging (for Tensorboard) using the Estimator ``Store`` abstraction. Stores are
artifacts including intermediate representations of the training data. Horovod natively supports stores for HDFS
and local filesystems.

A Petastorm-based data loader is used by default, but users can define a custom data loader by overriding the `base_data_loader` interface.
Collaborator comment:

Nit: link Petastorm to the GitHub page.

docs/pytorch.rst Outdated
@@ -139,3 +139,5 @@ Start the training job and specify the number of workers on the command line as
You can find an example of use pytorch lightning trainer with horovod backend in `pytorch_lightning_mnist.py script <../examples/pytorch/pytorch_lightning_mnist.py>`__

See the PyTorch Lightning `docs <https://pytorch-lightning.readthedocs.io/en/stable/multi_gpu.html#horovod>`_ for more details.

A pytorch-lightning based spark estimator trainer is also added; an example is in `pytorch_lightning_spark_mnist.py <../examples/spark/pytorch/pytorch_lightning_spark_mnist.py>`__
Collaborator comment:

Nit: capitalize PyTorch Lightning.

data_loader_class = Param(Params._dummy(), 'data_loader_class',
                          'Name of the dataloader class.')

loader_num_epochs = Param(Params._dummy(), 'loader_num_epochs',
Collaborator comment:

When would the user want to set this? It seems we would always want to set it to None and let the trainer decide when to stop reading.

                          typeConverter=TypeConverters.toInt)
                          typeConverter=TypeConverters.toInt)

data_loader_class = Param(Params._dummy(), 'data_loader_class',
Collaborator comment:

Is this a name (string) or is it a class / function? Seems it would be easier to pass around a class directly.
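
A hypothetical sketch contrasting the two options raised here, resolving a loader from a string name versus receiving the class object directly (the helper below is illustrative, not part of the PR):

import importlib

def resolve_loader(data_loader_class):
    # string form: 'package.module.ClassName' has to be imported and looked up
    if isinstance(data_loader_class, str):
        module_name, _, class_name = data_loader_class.rpartition('.')
        return getattr(importlib.import_module(module_name), class_name)
    # class form: nothing to resolve, just use it
    return data_loader_class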

@tgaddair tgaddair (Collaborator) left a comment

Looks good, just a few minor things.

Signed-off-by: Peng Zhang <pengz@uber.com>
@irasit irasit merged commit 52d0b27 into master May 30, 2021