From c532e6d6200feb48b745af6d3666f4302e7ac374 Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 18:21:18 +0000
Subject: [PATCH 01/13] update

---
 .../data/streaming/data_processor.py         | 58 +++++++++++--------
 1 file changed, 35 insertions(+), 23 deletions(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index ad9ec4793bc47..4c3dd2380eb18 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -7,6 +7,7 @@
 import types
 from abc import abstractmethod
 from dataclasses import dataclass
+from datetime import datetime
 from multiprocessing import Process, Queue
 from queue import Empty
 from shutil import copyfile, rmtree
@@ -15,7 +16,7 @@
 from urllib import parse
 
 import torch
-from tqdm.auto import tqdm
+from tqdm.auto import tqdm as _tqdm
 
 from lightning import seed_everything
 from lightning.data.streaming import Cache
@@ -816,30 +817,41 @@ def run(self, data_recipe: DataRecipe) -> None:
         current_total = 0
         has_failed = False
 
-        with tqdm(total=num_items, smoothing=0, position=-1, mininterval=1) as pbar:
-            while True:
+        tq = _tqdm(
+            desc="Progress",
+            total=num_items,
+            smoothing=0,
+            position=-1,
+            mininterval=1,
+            leave=True,
+            dynamic_ncols=True,
+        )
+
+        while True:
+            try:
+                error = self.error_queue.get(timeout=0.001)
+                self._exit_on_error(error)
+            except Empty:
+                assert self.progress_queue
                 try:
-                    error = self.error_queue.get(timeout=0.001)
-                    self._exit_on_error(error)
+                    index, counter = self.progress_queue.get(timeout=0.001)
                 except Empty:
-                    assert self.progress_queue
-                    try:
-                        index, counter = self.progress_queue.get(timeout=0.001)
-                    except Empty:
-                        continue
-                    self.workers_tracker[index] = counter
-                    new_total = sum(self.workers_tracker.values())
-
-                    pbar.update(new_total - current_total)
-                    current_total = new_total
-                    if current_total == num_items:
-                        break
-
-                    # Exit early if all the workers are done.
-                    # This means there were some kinda of errors.
-                    if all(not w.is_alive() for w in self.workers):
-                        has_failed = True
-                        break
+                    continue
+            self.workers_tracker[index] = counter
+            new_total = sum(self.workers_tracker.values())
+
+            tq.set_postfix({"time": datetime.now().strftime("%H:%M:%S.%f")})
+            tq.update(new_total - current_total)
+
+            current_total = new_total
+            if current_total == num_items:
+                break
+
+            # Exit early if all the workers are done.
+            # This means there were some kind of errors.
+            if all(not w.is_alive() for w in self.workers):
+                has_failed = True
+                break
 
         num_nodes = _get_num_nodes()
         node_rank = _get_node_rank()

From 18c3ec6df06608764c1b7cdd1a8a5af5bf934f3c Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 18:32:26 +0000
Subject: [PATCH 02/13] update

---
 src/lightning/data/streaming/data_processor.py    | 3 +++
 tests/tests_data/streaming/test_data_processor.py | 1 -
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index 4c3dd2380eb18..a7714a60d298c 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -195,6 +195,7 @@ def _upload_fn(upload_queue: Queue, remove_queue: Queue, cache_dir: str, output_
             s3.client.upload_file(
                 local_filepath, obj.netloc, os.path.join(obj.path.lstrip("/"), os.path.basename(local_filepath))
             )
+            print(os.path.basename(local_filepath))
         except Exception as e:
             print(e)
     elif os.path.isdir(output_dir.path):
@@ -840,6 +841,8 @@ def run(self, data_recipe: DataRecipe) -> None:
             self.workers_tracker[index] = counter
             new_total = sum(self.workers_tracker.values())
 
+            breakpoint()
+
             tq.set_postfix({"time": datetime.now().strftime("%H:%M:%S.%f")})
             tq.update(new_total - current_total)
 
diff --git a/tests/tests_data/streaming/test_data_processor.py b/tests/tests_data/streaming/test_data_processor.py
index df0bfe0116789..afd264f58cffb 100644
--- a/tests/tests_data/streaming/test_data_processor.py
+++ b/tests/tests_data/streaming/test_data_processor.py
@@ -668,7 +668,6 @@ def test_data_processing_map(monkeypatch, tmpdir):
 
 
 def optimize_fn(filepath):
-    print(filepath)
     from PIL import Image
 
     return [Image.open(filepath), os.path.basename(filepath)]

From a69775d0864de58ffdb969182b2589065333df5a Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 18:34:06 +0000
Subject: [PATCH 03/13] update

---
 src/lightning/data/streaming/data_processor.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index a7714a60d298c..01367db59e60f 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -318,7 +318,7 @@ def run(self) -> None:
             traceback_format = traceback.format_exc()
             print(traceback_format)
             self.error_queue.put(traceback_format)
-        print(f"Worker {self.worker_index} is done.")
+        print(f"Worker {str(_get_node_rank() * self.num_workers + self.worker_index)} is done.")
 
     def _setup(self) -> None:
         self._set_environ_variables()
@@ -337,6 +337,8 @@ def _loop(self) -> None:
             if index is None:
                 num_downloader_finished += 1
                 if num_downloader_finished == self.num_downloaders:
+                    print(f"Worker {str(_get_node_rank() * self.num_workers + self.worker_index)} is terminating.")
+
                     if isinstance(self.data_recipe, DataChunkRecipe):
                         self._handle_data_chunk_recipe_end()

From d34df532a3ff11a297727a5c9367c8e23abef8d6 Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 18:40:04 +0000
Subject: [PATCH 04/13] update

---
 src/lightning/data/streaming/data_processor.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index 01367db59e60f..009dec89f2fed 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -843,8 +843,6 @@ def run(self, data_recipe: DataRecipe) -> None:
             self.workers_tracker[index] = counter
             new_total = sum(self.workers_tracker.values())
 
-            breakpoint()
-
             tq.set_postfix({"time": datetime.now().strftime("%H:%M:%S.%f")})
             tq.update(new_total - current_total)
 

From 4eaad5cd5f94b5630a5de93ee79096a2b8a07fd2 Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 18:47:58 +0000
Subject: [PATCH 05/13] update

---
 src/lightning/data/streaming/data_processor.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index 009dec89f2fed..a4b332e33535a 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -195,7 +195,6 @@ def _upload_fn(upload_queue: Queue, remove_queue: Queue, cache_dir: str, output_
             s3.client.upload_file(
                 local_filepath, obj.netloc, os.path.join(obj.path.lstrip("/"), os.path.basename(local_filepath))
             )
-            print(os.path.basename(local_filepath))
         except Exception as e:
             print(e)
     elif os.path.isdir(output_dir.path):

From 09c73da3eed85c6fc4f70e44b7ec88df087b0991 Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 19:00:46 +0000
Subject: [PATCH 06/13] update

---
 .../data/streaming/data_processor.py         | 44 ++++++++++++-------
 1 file changed, 27 insertions(+), 17 deletions(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index a4b332e33535a..65af2095d1abd 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -291,18 +291,19 @@ def __init__(
         self.items = items
         self.num_items = len(self.items)
         self.num_downloaders = num_downloaders
+        self.num_uploaders = 5
         self.remove = remove
         self.paths: List[List[str]] = []
         self.remover: Optional[Process] = None
         self.downloaders: List[Process] = []
+        self.uploaders: List[Process] = []
         self.to_download_queues: List[Queue] = []
+        self.to_upload_queues: List[Queue] = []
         self.stop_queue = stop_queue
         self.ready_to_process_queue: Queue = Queue()
         self.remove_queue: Queue = Queue()
-        self.upload_queue: Queue = Queue()
         self.progress_queue: Queue = progress_queue
         self.error_queue: Queue = error_queue
-        self.uploader: Optional[Process] = None
         self._collected_items = 0
         self._counter = 0
         self._last_time = time()
@@ -324,7 +325,7 @@ def _setup(self) -> None:
         self._create_cache()
         self._collect_paths()
         self._start_downloaders()
-        self._start_uploader()
+        self._start_uploaders()
         self._start_remover()
 
     def _loop(self) -> None:
@@ -342,9 +343,13 @@ def _loop(self) -> None:
                         self._handle_data_chunk_recipe_end()
 
                     if self.output_dir.url if self.output_dir.url else self.output_dir.path:
-                        assert self.uploader
-                        self.upload_queue.put(None)
-                        self.uploader.join()
+                        # Inform the uploaders they are done working
+                        for i in range(self.num_uploaders):
+                            self.to_upload_queues[i].put(None)
+
+                        # Wait for them all to be finished
+                        for _ in self.uploaders:
+                            self.uploader.join()
 
                     if self.remove:
                         assert self.remover
@@ -478,19 +483,24 @@ def _start_remover(self) -> None:
         )
         self.remover.start()
 
-    def _start_uploader(self) -> None:
+    def _start_uploaders(self) -> None:
         if self.output_dir.path is None and self.output_dir.url is None:
             return
-        self.uploader = Process(
-            target=_upload_fn,
-            args=(
-                self.upload_queue,
-                self.remove_queue,
-                self.cache_chunks_dir,
-                self.output_dir,
-            ),
-        )
-        self.uploader.start()
+
+        for _ in range(self.num_uploaders):
+            to_upload_queue: Queue = Queue()
+            p = Process(
+                target=_download_data_target,
+                args=(
+                    to_upload_queue,
+                    self.remove_queue,
+                    self.cache_chunks_dir,
+                    self.output_dir,
+                ),
+            )
+            p.start()
+            self.uploaders.append(p)
+            self.to_upload_queues.append(to_upload_queue)
 
     def _handle_data_chunk_recipe(self, index: int) -> None:
         try:

From bf806c72c7a69661a99d11bff300b292278866b4 Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 19:00:58 +0000
Subject: [PATCH 07/13] update

---
 src/lightning/data/streaming/data_processor.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index 65af2095d1abd..6cf19a6c2ff1c 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -348,8 +348,8 @@ def _loop(self) -> None:
                             self.to_upload_queues[i].put(None)
 
                         # Wait for them all to be finished
-                        for _ in self.uploaders:
-                            self.uploader.join()
+                        for uploader in self.uploaders:
+                            uploader.join()
 
                     if self.remove:
                         assert self.remover

From 3363574a1f6fcf1cf20da608b5c680e1aae3d2f4 Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 19:04:07 +0000
Subject: [PATCH 08/13] update

---
 src/lightning/data/streaming/data_processor.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index 6cf19a6c2ff1c..bd27c84585618 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -410,7 +410,7 @@ def _try_upload(self, filepath: Optional[str]) -> None:
             return
 
         assert os.path.exists(filepath), filepath
-        self.upload_queue.put(filepath)
+        self.to_upload_queues[self._counter % self.num_uploaders].put(filepath)
 
     def _collect_paths(self) -> None:
         items = []
@@ -490,7 +490,7 @@ def _start_uploaders(self) -> None:
         for _ in range(self.num_uploaders):
             to_upload_queue: Queue = Queue()
             p = Process(
-                target=_download_data_target,
+                target=_upload_fn,
                 args=(
                     to_upload_queue,
                     self.remove_queue,
@@ -523,9 +523,9 @@ def _handle_data_chunk_recipe(self, index: int) -> None:
     def _handle_data_chunk_recipe_end(self) -> None:
         chunks_filepaths = self.cache.done()
 
         if chunks_filepaths:
-            for chunk_filepath in chunks_filepaths:
+            for i, chunk_filepath in enumerate(chunks_filepaths):
                 if isinstance(chunk_filepath, str) and os.path.exists(chunk_filepath):
-                    self.upload_queue.put(chunk_filepath)
+                    self.to_upload_queues[i % self.num_uploaders].put(chunk_filepath)
 
     def _handle_data_transform_recipe(self, index: int) -> None:
         # Don't use a context manager to avoid deleting files that are being uploaded.
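
The net effect of patches 06 through 08 is a fan-out upload pattern: a pool of uploader processes, each fed by its own queue, with work spread by indexing the queue list with a counter modulo the number of uploaders. The standalone sketch below isolates that pattern under simplified assumptions; uploader_fn, the queue names, and the chunk filenames are illustrative stand-ins for the module's _upload_fn and its S3 arguments, not the actual Lightning internals.

import os
from multiprocessing import Process, Queue
from typing import List, Optional


def uploader_fn(upload_queue: Queue) -> None:
    # Drain the queue until the None sentinel arrives, then exit.
    while True:
        filepath: Optional[str] = upload_queue.get()
        if filepath is None:
            return
        # Stand-in for the real upload call (e.g. s3.client.upload_file).
        print(f"pid {os.getpid()} uploading {filepath}")


if __name__ == "__main__":
    num_uploaders = 3
    to_upload_queues: List[Queue] = [Queue() for _ in range(num_uploaders)]
    uploaders = [Process(target=uploader_fn, args=(q,)) for q in to_upload_queues]
    for p in uploaders:
        p.start()

    # Round-robin dispatch: item i lands on queue i % num_uploaders, which is
    # the same scheme as self.to_upload_queues[self._counter % self.num_uploaders].
    for i in range(10):
        to_upload_queues[i % num_uploaders].put(f"chunk-{i}.bin")

    # One sentinel per queue, then wait for every uploader to finish.
    for q in to_upload_queues:
        q.put(None)
    for p in uploaders:
        p.join()

Giving each uploader its own queue avoids contention on a single shared queue and makes shutdown deterministic: one None sentinel per queue guarantees each process exits exactly once.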
From 10c2f87f230db87055dd4ddd2a0f5657192d630e Mon Sep 17 00:00:00 2001
From: thomas
Date: Fri, 10 Nov 2023 19:11:56 +0000
Subject: [PATCH 09/13] update

---
 src/lightning/data/streaming/data_processor.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index bd27c84585618..04b8676f281a1 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -279,6 +279,7 @@ def __init__(
         error_queue: Queue,
         stop_queue: Queue,
         num_downloaders: int,
+        num_uploaders: int,
         remove: bool,
     ) -> None:
         """The BaseWorker is responsible to process the user data."""
@@ -291,7 +292,7 @@ def __init__(
         self.items = items
         self.num_items = len(self.items)
         self.num_downloaders = num_downloaders
-        self.num_uploaders = 5
+        self.num_uploaders = num_uploaders
         self.remove = remove
         self.paths: List[List[str]] = []
         self.remover: Optional[Process] = None
@@ -522,7 +523,7 @@ def _handle_data_chunk_recipe(self, index: int) -> None:
     def _handle_data_chunk_recipe_end(self) -> None:
         chunks_filepaths = self.cache.done()
 
-        if chunks_filepaths:
+        if chunks_filepaths and len(self.to_upload_queues):
             for i, chunk_filepath in enumerate(chunks_filepaths):
                 if isinstance(chunk_filepath, str) and os.path.exists(chunk_filepath):
                     self.to_upload_queues[i % self.num_uploaders].put(chunk_filepath)
@@ -734,6 +735,7 @@ def __init__(
         output_dir: Optional[Union[str, Dir]] = None,
         num_workers: Optional[int] = None,
         num_downloaders: Optional[int] = None,
+        num_uploaders: Optional[int] = None,
         delete_cached_files: bool = True,
         fast_dev_run: Optional[Union[bool, int]] = None,
         random_seed: Optional[int] = 42,
@@ -747,6 +749,7 @@ def __init__(
             output_dir: The path to where the output data are stored.
            num_workers: The number of worker threads to use.
            num_downloaders: The number of file downloaders to use.
+            num_uploaders: The number of file uploaders to use.
            delete_cached_files: Whether to delete the cached files.
            fast_dev_run: Whether to run a quick dev run.
            random_seed: The random seed to be set before shuffling the data.
@@ -757,7 +760,8 @@ def __init__(
         self.input_dir = _resolve_dir(input_dir)
         self.output_dir = _resolve_dir(output_dir)
         self.num_workers = num_workers or (1 if fast_dev_run else (os.cpu_count() or 1) * 4)
-        self.num_downloaders = num_downloaders or 1
+        self.num_downloaders = num_downloaders or 2
+        self.num_uploaders = num_uploaders or 5
         self.delete_cached_files = delete_cached_files
         self.fast_dev_run = _get_fast_dev_run() if fast_dev_run is None else fast_dev_run
         self.workers: Any = []
@@ -920,6 +924,7 @@ def _create_process_workers(self, data_recipe: DataRecipe, workers_user_items: L
                 self.error_queue,
                 stop_queues[-1],
                 self.num_downloaders,
+                self.num_uploaders,
                 self.delete_cached_files,
             )
             worker.start()

From dc8292a6bb340813807a06405cedf895f6df1b27 Mon Sep 17 00:00:00 2001
From: thomas
Date: Sat, 11 Nov 2023 03:54:50 +0000
Subject: [PATCH 10/13] update

---
 tests/tests_data/streaming/test_data_processor.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/tests_data/streaming/test_data_processor.py b/tests/tests_data/streaming/test_data_processor.py
index afd264f58cffb..45316e2dbb46f 100644
--- a/tests/tests_data/streaming/test_data_processor.py
+++ b/tests/tests_data/streaming/test_data_processor.py
@@ -484,6 +484,8 @@ def test_data_processsor_distributed(fast_dev_run, delete_cached_files, tmpdir,
         delete_cached_files=delete_cached_files,
         fast_dev_run=fast_dev_run,
         output_dir=remote_output_dir,
+        num_uploaders=1,
+        num_downloaders=1,
     )
 
     data_processor.run(CustomDataChunkRecipe(chunk_size=2))
@@ -508,6 +510,7 @@ def test_data_processsor_distributed(fast_dev_run, delete_cached_files, tmpdir,
     data_processor = TestDataProcessor(
         input_dir=input_dir,
         num_workers=2,
+        num_uploaders=1,
         num_downloaders=1,
         delete_cached_files=delete_cached_files,
         fast_dev_run=fast_dev_run,

From 735acb688ceb5bff8f8f7b79652c5fea2b6c32e7 Mon Sep 17 00:00:00 2001
From: thomas chaton
Date: Mon, 13 Nov 2023 12:54:24 -0500
Subject: [PATCH 11/13] Update src/lightning/data/streaming/data_processor.py
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Adrian Wälchli

---
 src/lightning/data/streaming/data_processor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index 04b8676f281a1..9c7401ef2834f 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -833,7 +833,7 @@ def run(self, data_recipe: DataRecipe) -> None:
         current_total = 0
         has_failed = False
 
-        tq = _tqdm(
+        pbar = _tqdm(
             desc="Progress",
             total=num_items,
             smoothing=0,

From c0fb97f62e047e6f6cfc01e03be589cdb2b65901 Mon Sep 17 00:00:00 2001
From: thomas
Date: Mon, 13 Nov 2023 13:03:42 -0500
Subject: [PATCH 12/13] update

---
 src/lightning/data/streaming/data_processor.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index 9c7401ef2834f..836c9bbd453b3 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -856,8 +856,8 @@ def run(self, data_recipe: DataRecipe) -> None:
             self.workers_tracker[index] = counter
             new_total = sum(self.workers_tracker.values())
 
-            tq.set_postfix({"time": datetime.now().strftime("%H:%M:%S.%f")})
-            tq.update(new_total - current_total)
+            pbar.set_postfix({"time": datetime.now().strftime("%H:%M:%S.%f")})
+            pbar.update(new_total - current_total)
 
             current_total = new_total
             if current_total == num_items:
                 break
@@ -869,6 +869,8 @@ def run(self, data_recipe: DataRecipe) -> None:
                 has_failed = True
                 break
 
+        pbar.close(leave=True)
+
         num_nodes = _get_num_nodes()
         node_rank = _get_node_rank()
         # TODO: Understand why it hangs.

From 96c7f1be348c26e2228e78c2312c354480d6a9f3 Mon Sep 17 00:00:00 2001
From: thomas
Date: Mon, 13 Nov 2023 13:31:57 -0500
Subject: [PATCH 13/13] update

---
 src/lightning/data/streaming/data_processor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/lightning/data/streaming/data_processor.py b/src/lightning/data/streaming/data_processor.py
index 836c9bbd453b3..f2f79f73463ed 100644
--- a/src/lightning/data/streaming/data_processor.py
+++ b/src/lightning/data/streaming/data_processor.py
@@ -869,7 +869,7 @@ def run(self, data_recipe: DataRecipe) -> None:
                 has_failed = True
                 break
 
-        pbar.close(leave=True)
+        pbar.close()
 
         num_nodes = _get_num_nodes()
         node_rank = _get_node_rank()
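
Across patches 01, 11, 12, and 13, the progress reporting converges on a single pattern: each worker pushes its absolute (index, counter) pair onto a shared multiprocessing queue, and the parent sums the per-worker counters and feeds tqdm the delta between successive totals. Below is a minimal, self-contained sketch of that pattern; the worker function and item counts are made up for illustration, and the error queue, postfix timestamp, and node-rank handling from the real code are omitted.

from multiprocessing import Process, Queue
from queue import Empty

from tqdm.auto import tqdm


def worker(index: int, num_items: int, progress_queue: Queue) -> None:
    for counter in range(1, num_items + 1):
        # Report the worker's absolute counter, not a per-item delta.
        progress_queue.put((index, counter))


if __name__ == "__main__":
    num_workers, items_per_worker = 4, 25
    total = num_workers * items_per_worker
    progress_queue: Queue = Queue()
    workers = [
        Process(target=worker, args=(i, items_per_worker, progress_queue))
        for i in range(num_workers)
    ]
    for w in workers:
        w.start()

    workers_tracker = {}
    current_total = 0
    with tqdm(total=total, smoothing=0) as pbar:
        while current_total < total:
            try:
                index, counter = progress_queue.get(timeout=0.001)
            except Empty:
                # Mirror the patch's early exit: if every worker already
                # died, there is nothing left to wait for.
                if all(not w.is_alive() for w in workers):
                    break
                continue
            workers_tracker[index] = counter
            # tqdm.update takes a delta, so convert absolute totals into one.
            new_total = sum(workers_tracker.values())
            pbar.update(new_total - current_total)
            current_total = new_total

    for w in workers:
        w.join()

Reporting absolute counters makes the updates robust: a message that arrives late from a worker is simply superseded by the next one from the same worker, so the bar can lag but never over- or under-counts the final total.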