From 210fbfd58019e4dddbea84fed090102fad6faeb4 Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Sun, 24 Oct 2021 01:28:03 -0700
Subject: [PATCH 01/10] data_loader

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index 6fd0f5f255..2887eb6882 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -92,12 +92,12 @@ def _async_worker(self):
         Users need to implement self._iterate() to read the data.
         """
         try:
-            while not self.finished_event.is_set():
-                for batch in self._iterate():
-                    if self.finished_event.is_set():
-                        break
-                    self.queue.put(batch)
-                self.queue.put(None)
+            # Only need to iterate once because the data loader will be re-created in each epoch.
+            for batch in self._iterate():
+                if self.finished_event.is_set():
+                    break
+                self.queue.put(batch)
+            self.queue.put(None)
         except Exception as ex:
             self.queue.put(ex)
             self.queue.put(None)

From 9eb489b03ca488611659dee4b26f68826e739561 Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Sun, 24 Oct 2021 13:33:59 -0700
Subject: [PATCH 02/10] limit data queue size

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py      | 17 ++++++++++-------
 horovod/spark/lightning/datamodule.py |  6 ++++++
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index 2887eb6882..669fa38e31 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -55,7 +55,7 @@ class AsyncDataLoaderMixin(object):
         class PytorchAsyncDataLoader(AsyncDataLoaderMixin, PytorchDataLoader):
     """

-    def __init__(self, async_loader_queue_size=64, *args, **kwargs):
+    def __init__(self, async_loader_queue_size=1, *args, **kwargs):
         """
         initialize the async data loader. Need to add this in the __init__() of the implementation
         """
@@ -92,12 +92,12 @@ def _async_worker(self):
         Users need to implement self._iterate() to read the data.
         """
         try:
-            # Only need to iterate once because the data loader will be re-created in each epoch.
-            for batch in self._iterate():
-                if self.finished_event.is_set():
-                    break
-                self.queue.put(batch)
-            self.queue.put(None)
+            while not self.finished_event.is_set():
+                for batch in self._iterate():
+                    if self.finished_event.is_set():
+                        break
+                    self.queue.put(batch)
+                self.queue.put(None)
         except Exception as ex:
             self.queue.put(ex)
             self.queue.put(None)
@@ -125,3 +125,6 @@ def __iter__(self):
         else:
             for batch in self._iterate():
                 yield self._process_batch(batch)
+
+    def __del__(self):
+        self.close_async_loader()
diff --git a/horovod/spark/lightning/datamodule.py b/horovod/spark/lightning/datamodule.py
index 534361469a..a87069f87e 100644
--- a/horovod/spark/lightning/datamodule.py
+++ b/horovod/spark/lightning/datamodule.py
@@ -94,6 +94,9 @@ def train_dataloader(self):
         else:
             dataloader_class = PytorchInfiniteAsyncDataLoader
             kwargs['shuffling_queue_capacity'] = self.shuffle_size
+            # To avoid loading too much data in memory, need to limit the queue size so it will
+            # only load 1/4 of the data or less than 10000 rows. (batch_size * queue_size)
+            kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4))

         self.train_dl = dataloader_class(**kwargs)
         return self.train_dl
@@ -115,6 +118,9 @@ def val_dataloader(self):
         else:
             dataloader_class = PytorchInfiniteAsyncDataLoader
             kwargs['shuffling_queue_capacity'] = 0
+            # To avoid loading too much data in memory, need to limit the queue size so it will
+            # only load 1/4 of the data or less than 10000 rows. (batch_size * queue_size)
+            kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4))

         self.val_dl = dataloader_class(**kwargs)
         return self.val_dl

From ba915f55d7da6a822d7549083d2c83a648990dc4 Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Sun, 24 Oct 2021 23:42:02 -0700
Subject: [PATCH 03/10] remove __del__

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index 669fa38e31..32ec60adbc 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -125,6 +125,3 @@ def __iter__(self):
         else:
             for batch in self._iterate():
                 yield self._process_batch(batch)
-
-    def __del__(self):
-        self.close_async_loader()

From dac81efb50502882883ef900df0fe985f085a35d Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Mon, 25 Oct 2021 10:55:47 -0700
Subject: [PATCH 04/10] add log

Signed-off-by: Peng Zhang
---
 horovod/spark/lightning/datamodule.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/horovod/spark/lightning/datamodule.py b/horovod/spark/lightning/datamodule.py
index a87069f87e..beb70758bc 100644
--- a/horovod/spark/lightning/datamodule.py
+++ b/horovod/spark/lightning/datamodule.py
@@ -78,6 +78,7 @@ def teardown(self, stage=None):
         if self.has_val:
             self.val_reader.stop()
             self.val_reader.join()
+        print("Tear down: async dataloaders closed.")
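A note on the pattern patches 01 and 02 are toggling: AsyncDataLoaderMixin is a bounded producer/consumer. A background thread fills a fixed-size queue with batches, a None sentinel marks the end of an epoch, and shutdown works by setting an event and draining the queue so a blocked put() can observe it. The sketch below restates that pattern in a minimal, self-contained form; the class name AsyncBatchProducer, the iterate_fn parameter, and the close() helper are illustrative stand-ins, not Horovod's API.

```python
import threading
from queue import Queue, Empty


class AsyncBatchProducer:
    """Minimal sketch of the worker/queue pattern in AsyncDataLoaderMixin."""

    def __init__(self, iterate_fn, queue_size=2):
        # Bounded queue: once queue_size batches are waiting, the producer
        # blocks on put(), which is what caps the batches held in memory.
        self.queue = Queue(queue_size)
        self.finished_event = threading.Event()
        self._iterate = iterate_fn
        self.thread = threading.Thread(target=self._worker, daemon=True)
        self.started = False

    def _worker(self):
        try:
            # Loop over epochs forever, like the post-patch-02 worker.
            while not self.finished_event.is_set():
                for batch in self._iterate():
                    if self.finished_event.is_set():
                        break
                    self.queue.put(batch)
                self.queue.put(None)  # end-of-epoch sentinel
        except Exception as ex:
            self.queue.put(ex)   # surface worker errors to the consumer
            self.queue.put(None)

    def __iter__(self):
        if not self.started:
            self.started = True
            self.thread.start()
        while True:
            batch = self.queue.get()
            if batch is None:    # one full epoch consumed
                break
            if isinstance(batch, Exception):
                raise batch
            yield batch

    def close(self):
        # Set the event, then keep draining so a producer blocked on put()
        # can observe the event and exit; join until the thread is gone.
        self.finished_event.set()
        while self.thread.is_alive():
            try:
                self.queue.get_nowait()
            except Empty:
                pass
            self.thread.join(timeout=0.1)
```

The bounded queue is the lever patch 02's async_loader_queue_size turns: the producer can never run more than queue_size batches ahead of training, so the queue size directly bounds memory, at the cost of stalls when it is too small.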
From 50fa3f5b781f5b97b0d147a12753b4048f7ffd73 Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Mon, 25 Oct 2021 17:26:41 -0700
Subject: [PATCH 05/10] increase_default

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py      | 5 ++++-
 horovod/spark/lightning/datamodule.py | 4 ++--
 horovod/spark/lightning/remote.py     | 2 --
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index 32ec60adbc..fbb985deff 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -55,7 +55,7 @@ class AsyncDataLoaderMixin(object):
         class PytorchAsyncDataLoader(AsyncDataLoaderMixin, PytorchDataLoader):
     """

-    def __init__(self, async_loader_queue_size=1, *args, **kwargs):
+    def __init__(self, async_loader_queue_size=2, *args, **kwargs):
         """
         initialize the async data loader. Need to add this in the __init__() of the implementation
         """
@@ -115,6 +115,9 @@ def __iter__(self):
             if not self.started:
                 self.started = True
                 self.thread.start()
+            # remove the leftover None from the last run, if there is one.
+            if not self.queue.empty() and self.queue[0] is None:
+                self.queue.get()
             while True:
                 batch = self.queue.get()
                 if batch is None:
                     break
diff --git a/horovod/spark/lightning/datamodule.py b/horovod/spark/lightning/datamodule.py
index beb70758bc..03b56426ab 100644
--- a/horovod/spark/lightning/datamodule.py
+++ b/horovod/spark/lightning/datamodule.py
@@ -97,7 +97,7 @@ def train_dataloader(self):
             kwargs['shuffling_queue_capacity'] = self.shuffle_size
             # To avoid loading too much data in memory, need to limit the queue size so it will
             # only load 1/4 of the data or less than 10000 rows. (batch_size * queue_size)
-            kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4))
+            kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1

         self.train_dl = dataloader_class(**kwargs)
         return self.train_dl
@@ -121,7 +121,7 @@ def val_dataloader(self):
             kwargs['shuffling_queue_capacity'] = 0
             # To avoid loading too much data in memory, need to limit the queue size so it will
             # only load 1/4 of the data or less than 10000 rows. (batch_size * queue_size)
-            kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4))
+            kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1

         self.val_dl = dataloader_class(**kwargs)
         return self.val_dl
diff --git a/horovod/spark/lightning/remote.py b/horovod/spark/lightning/remote.py
index 2c083810c8..f4f180955c 100644
--- a/horovod/spark/lightning/remote.py
+++ b/horovod/spark/lightning/remote.py
@@ -193,8 +193,6 @@ def on_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -
             'gpus': _num_gpus,
             'callbacks': callbacks,
             'max_epochs': epochs,
-            'limit_train_batches': _train_steps_per_epoch,
-            'limit_val_batches': _val_steps_per_epoch,
             'logger': train_logger,
             'log_every_n_steps': log_every_n_steps,
             'num_sanity_val_steps': 0,

From 3b3a19812fe2cdc485df8cabde73dfe035c73374 Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Mon, 25 Oct 2021 17:32:00 -0700
Subject: [PATCH 06/10] increase_default

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index fbb985deff..23f212c882 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -84,7 +84,9 @@ def close_async_loader(self):
                     self.queue.get_nowait()
                 except Empty:
                     break
+            print("PENG==> 1")
             self.thread.join()
+        print("Finished closing the AsyncDataLoaderMixin.")

     def _async_worker(self):
         """
@@ -93,6 +95,7 @@ def _async_worker(self):
         """
         try:
             while not self.finished_event.is_set():
+                print("PENG==> 2")
                 for batch in self._iterate():
                     if self.finished_event.is_set():
                         break
@@ -119,6 +122,7 @@ def __iter__(self):
             if not self.queue.empty() and self.queue[0] is None:
                 self.queue.get()
             while True:
+                print("PENG==> 3")
                 batch = self.queue.get()
                 if batch is None:
                     break

From 52ae2bbc67f76f57bdd882b043dd5b933a7983d6 Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Mon, 25 Oct 2021 20:17:49 -0700
Subject: [PATCH 07/10] increase_default

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py      | 10 +++++++---
 horovod/spark/lightning/datamodule.py |  2 +-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index 23f212c882..84b14b7d30 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -55,7 +55,7 @@ class AsyncDataLoaderMixin(object):
         class PytorchAsyncDataLoader(AsyncDataLoaderMixin, PytorchDataLoader):
     """

-    def __init__(self, async_loader_queue_size=2, *args, **kwargs):
+    def __init__(self, async_loader_queue_size=5, *args, **kwargs):
         """
         initialize the async data loader. Need to add this in the __init__() of the implementation
         """
@@ -119,15 +119,19 @@ def __iter__(self):
                 self.started = True
                 self.thread.start()
             # remove the leftover None from the last run, if there is one.
-            if not self.queue.empty() and self.queue[0] is None:
-                self.queue.get()
+            first = True
+
             while True:
                 print("PENG==> 3")
                 batch = self.queue.get()
+                if batch is None and first:
+                    print('PENG==> first in batch')
+                    continue
                 if batch is None:
                     break
                 if isinstance(batch, Exception):
                     raise batch
+                first = False
                 yield self._process_batch(batch)
         else:
             for batch in self._iterate():
diff --git a/horovod/spark/lightning/datamodule.py b/horovod/spark/lightning/datamodule.py
index 03b56426ab..93fa5473f2 100644
--- a/horovod/spark/lightning/datamodule.py
+++ b/horovod/spark/lightning/datamodule.py
@@ -97,7 +97,7 @@ def train_dataloader(self):
             kwargs['shuffling_queue_capacity'] = self.shuffle_size
             # To avoid loading too much data in memory, need to limit the queue size so it will
             # only load 1/4 of the data or less than 10000 rows. (batch_size * queue_size)
-            kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1
+            kwargs['async_loader_queue_size'] = max(4, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1

         self.train_dl = dataloader_class(**kwargs)
         return self.train_dl
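Patches 05 and 07 keep adjusting the sizing heuristic in datamodule.py, so it may help to see it with numbers. The function below restates the rule as patch 07 leaves it on the validation path (the function name and parameters are ours, for illustration only): hold at most roughly 10000 rows, at most a quarter of an epoch, never less than one batch, plus one slot for the end-of-epoch None.

```python
def async_loader_queue_size(batch_size, limit_step_per_epoch,
                            max_rows=10000, epoch_fraction=4):
    """Illustrative restatement of the queue-size heuristic in datamodule.py."""
    return max(1, min(max_rows // batch_size,
                      limit_step_per_epoch // epoch_fraction)) + 1


# batch_size=64 with 200 steps per epoch:
#   min(10000 // 64, 200 // 4) = min(156, 50) = 50  ->  queue size 51
print(async_loader_queue_size(64, 200))  # 51

# Tiny epochs still get one batch slot plus the sentinel slot:
#   max(1, min(156, 2 // 4)) + 1 = max(1, 0) + 1 = 2
print(async_loader_queue_size(64, 2))    # 2
```

Patch 07's train path additionally floors the result at 4 batches (max(4, ...)), and patch 09 below raises the train-side row cap to 100000 while leaving validation at 10000.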
From c49fd30bb32c25a4f9a44c638d8e80476f759834 Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Tue, 26 Oct 2021 10:55:02 -0700
Subject: [PATCH 08/10] clean up

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py      | 8 --------
 horovod/spark/lightning/datamodule.py | 6 ++++--
 2 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index 84b14b7d30..edf9c6a7cd 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -95,7 +95,6 @@ def _async_worker(self):
         """
         try:
             while not self.finished_event.is_set():
-                print("PENG==> 2")
                 for batch in self._iterate():
                     if self.finished_event.is_set():
                         break
@@ -118,20 +117,13 @@ def __iter__(self):
             if not self.started:
                 self.started = True
                 self.thread.start()
-            # remove the leftover None from the last run, if there is one.
-            first = True

             while True:
-                print("PENG==> 3")
                 batch = self.queue.get()
-                if batch is None and first:
-                    print('PENG==> first in batch')
-                    continue
                 if batch is None:
                     break
                 if isinstance(batch, Exception):
                     raise batch
-                first = False
                 yield self._process_batch(batch)
         else:
             for batch in self._iterate():
diff --git a/horovod/spark/lightning/datamodule.py b/horovod/spark/lightning/datamodule.py
index 93fa5473f2..7badea3a0e 100644
--- a/horovod/spark/lightning/datamodule.py
+++ b/horovod/spark/lightning/datamodule.py
@@ -96,7 +96,8 @@ def train_dataloader(self):
             dataloader_class = PytorchInfiniteAsyncDataLoader
             kwargs['shuffling_queue_capacity'] = self.shuffle_size
             # To avoid loading too much data in memory, need to limit the queue size so it will
-            # only load 1/4 of the data or less than 10000 rows. (batch_size * queue_size)
+            # only load 1/4 of the data or less than 10000 rows (batch_size * queue_size).
+            # Add 1 for the None at the end of each epoch
             kwargs['async_loader_queue_size'] = max(4, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1

         self.train_dl = dataloader_class(**kwargs)
         return self.train_dl
@@ -120,7 +121,8 @@ def val_dataloader(self):
             dataloader_class = PytorchInfiniteAsyncDataLoader
             kwargs['shuffling_queue_capacity'] = 0
             # To avoid loading too much data in memory, need to limit the queue size so it will
-            # only load 1/4 of the data or less than 10000 rows. (batch_size * queue_size)
+            # only load 1/4 of the data or less than 10000 rows (batch_size * queue_size).
+            # Add 1 for the None at the end of each epoch
             kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1

         self.val_dl = dataloader_class(**kwargs)
         return self.val_dl

From e3c810dbd05ed1921b685d9db6187bcc270b840a Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Tue, 26 Oct 2021 11:59:05 -0700
Subject: [PATCH 09/10] increase_default

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py      |  1 -
 horovod/spark/lightning/datamodule.py | 14 +++++++-------
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index edf9c6a7cd..8ae2bedaa7 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -84,7 +84,6 @@ def close_async_loader(self):
                     self.queue.get_nowait()
                 except Empty:
                     break
-            print("PENG==> 1")
             self.thread.join()
         print("Finished closing the AsyncDataLoaderMixin.")
diff --git a/horovod/spark/lightning/datamodule.py b/horovod/spark/lightning/datamodule.py
index 7badea3a0e..1b6c6fde1d 100644
--- a/horovod/spark/lightning/datamodule.py
+++ b/horovod/spark/lightning/datamodule.py
@@ -95,10 +95,10 @@ def train_dataloader(self):
         else:
             dataloader_class = PytorchInfiniteAsyncDataLoader
             kwargs['shuffling_queue_capacity'] = self.shuffle_size
-            # To avoid loading too much data in memory, need to limit the queue size so it will
-            # only load 1/4 of the data or less than 10000 rows (batch_size * queue_size).
-            # Add 1 for the None at the end of each epoch
-            kwargs['async_loader_queue_size'] = max(4, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1
+            # To avoid loading too much data in memory, need to calculate the queue size
+            # dynamically, and limit the data loaded in the queue.
+            # Add 1 to the size for storing the None at the end of each epoch.
+            kwargs['async_loader_queue_size'] = max(1, min(100000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1

         self.train_dl = dataloader_class(**kwargs)
         return self.train_dl
@@ -120,9 +120,9 @@ def val_dataloader(self):
         else:
             dataloader_class = PytorchInfiniteAsyncDataLoader
             kwargs['shuffling_queue_capacity'] = 0
-            # To avoid loading too much data in memory, need to limit the queue size so it will
-            # only load 1/4 of the data or less than 10000 rows (batch_size * queue_size).
-            # Add 1 for the None at the end of each epoch
+            # To avoid loading too much data in memory, need to calculate the queue size
+            # dynamically, and limit the data loaded in the queue.
+            # Add 1 to the size for storing the None at the end of each epoch.
             kwargs['async_loader_queue_size'] = max(1, min(10000 // kwargs['batch_size'], kwargs['limit_step_per_epoch'] // 4)) + 1

         self.val_dl = dataloader_class(**kwargs)
         return self.val_dl

From 1e18fdbd4b2320fc2fbdb102c7100ff6e345a079 Mon Sep 17 00:00:00 2001
From: Peng Zhang
Date: Tue, 26 Oct 2021 16:42:19 -0700
Subject: [PATCH 10/10] verbose

Signed-off-by: Peng Zhang
---
 horovod/data/data_loader_base.py      | 1 -
 horovod/spark/lightning/datamodule.py | 3 ++-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/horovod/data/data_loader_base.py b/horovod/data/data_loader_base.py
index 8ae2bedaa7..70e5d8955c 100644
--- a/horovod/data/data_loader_base.py
+++ b/horovod/data/data_loader_base.py
@@ -85,7 +85,6 @@ def close_async_loader(self):
                     self.queue.get_nowait()
                 except Empty:
                     break
             self.thread.join()
-        print("Finished closing the AsyncDataLoaderMixin.")

     def _async_worker(self):
         """
diff --git a/horovod/spark/lightning/datamodule.py b/horovod/spark/lightning/datamodule.py
index 1b6c6fde1d..ebee791aad 100644
--- a/horovod/spark/lightning/datamodule.py
+++ b/horovod/spark/lightning/datamodule.py
@@ -78,7 +78,8 @@ def teardown(self, stage=None):
         if self.has_val:
             self.val_reader.stop()
             self.val_reader.join()
-        print("Tear down: async dataloaders closed.")
+        if self.verbose:
+            print("Tear down: async dataloaders closed.")
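To see the series end to end, here is a short demo wiring the two sketches above together; AsyncBatchProducer and async_loader_queue_size are the illustrative helpers defined earlier, not Horovod's public API. Each pass over the producer consumes exactly one epoch, stopping at the None sentinel that the +1 queue slot reserves room for, and close() performs the drain-and-join shutdown that teardown() triggers via close_async_loader() in the real code.

```python
batch_size = 32
steps_per_epoch = 40


def make_epoch():
    # One epoch of dummy batches; a real loader would pull from a reader here.
    return ([0] * batch_size for _ in range(steps_per_epoch))


# min(10000 // 32, 40 // 4) + 1 = min(312, 10) + 1 = 11 slots
size = async_loader_queue_size(batch_size, steps_per_epoch)

producer = AsyncBatchProducer(make_epoch, queue_size=size)
for epoch in range(2):
    batches = sum(1 for _ in producer)  # stops at the end-of-epoch sentinel
    print(f"epoch {epoch}: {batches} batches")

producer.close()  # set the event, drain the queue, join the worker thread
```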