From cc6ee9907103cfccbbf1cbdb28c0969ec3b5abad Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Mon, 8 Jul 2024 17:06:22 +0000 Subject: [PATCH 1/5] fix local path issue in distributed optimize method --- src/litdata/processing/data_processor.py | 30 ++++++++++++------------ src/litdata/processing/functions.py | 7 ++++++ 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index d2220a00c..fea6e200a 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -1097,21 +1097,21 @@ def run(self, data_recipe: DataRecipe) -> None: result = data_recipe._done(len(user_items), self.delete_cached_files, self.output_dir) if num_nodes == node_rank + 1 and self.output_dir.url and _IS_IN_STUDIO: - assert self.output_dir.path - _create_dataset( - input_dir=self.input_dir.path, - storage_dir=self.output_dir.path, - dataset_type=V1DatasetType.CHUNKED - if isinstance(data_recipe, DataChunkRecipe) - else V1DatasetType.TRANSFORMED, - empty=False, - size=result.size, - num_bytes=result.num_bytes, - data_format=result.data_format, - compression=result.compression, - num_chunks=result.num_chunks, - num_bytes_per_chunk=result.num_bytes_per_chunk, - ) + if self.output_dir.path is not None: + _create_dataset( + input_dir=self.input_dir.path, + storage_dir=self.output_dir.path, + dataset_type=V1DatasetType.CHUNKED + if isinstance(data_recipe, DataChunkRecipe) + else V1DatasetType.TRANSFORMED, + empty=False, + size=result.size, + num_bytes=result.num_bytes, + data_format=result.data_format, + compression=result.compression, + num_chunks=result.num_chunks, + num_bytes_per_chunk=result.num_bytes_per_chunk, + ) print("Finished data processing!") if self.use_checkpoint and isinstance(data_recipe, DataChunkRecipe): diff --git a/src/litdata/processing/functions.py b/src/litdata/processing/functions.py index 631a8176b..9d0ec4df0 100644 --- a/src/litdata/processing/functions.py +++ b/src/litdata/processing/functions.py @@ -32,6 +32,7 @@ from litdata.processing.readers import BaseReader from litdata.processing.utilities import ( extract_rank_and_index_from_filename, + _get_work_dir, optimize_dns_context, read_index_file_content, ) @@ -372,8 +373,14 @@ def optimize( ) if num_nodes is None or int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) > 0: + DATA_OPTIMIZER_NUM_NODES = int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) _output_dir: Dir = _resolve_dir(output_dir) + if _output_dir.url is None and _output_dir.path.startswith("/teamspace/studios/this_studio") and DATA_OPTIMIZER_NUM_NODES > 0: + _output_dir = _output_dir.path.replace("/teamspace/studios/this_studio", "") + _output_dir = _get_work_dir().lstrip("/").rstrip("/") + "/" + _output_dir.lstrip("/").rstrip("/") + _output_dir = _resolve_dir(_output_dir) + if _output_dir.url is not None and "cloudspaces" in _output_dir.url: raise ValueError( f"The provided `output_dir` isn't valid. Found {_output_dir.path}." From 1afe5314c41c2a452e35ccb129952551bd0b92d5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Jul 2024 17:21:15 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/litdata/processing/functions.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/litdata/processing/functions.py b/src/litdata/processing/functions.py index 9d0ec4df0..310303773 100644 --- a/src/litdata/processing/functions.py +++ b/src/litdata/processing/functions.py @@ -31,8 +31,8 @@ from litdata.processing.data_processor import DataChunkRecipe, DataProcessor, DataTransformRecipe from litdata.processing.readers import BaseReader from litdata.processing.utilities import ( - extract_rank_and_index_from_filename, _get_work_dir, + extract_rank_and_index_from_filename, optimize_dns_context, read_index_file_content, ) @@ -376,7 +376,11 @@ def optimize( DATA_OPTIMIZER_NUM_NODES = int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) _output_dir: Dir = _resolve_dir(output_dir) - if _output_dir.url is None and _output_dir.path.startswith("/teamspace/studios/this_studio") and DATA_OPTIMIZER_NUM_NODES > 0: + if ( + _output_dir.url is None + and _output_dir.path.startswith("/teamspace/studios/this_studio") + and DATA_OPTIMIZER_NUM_NODES > 0 + ): _output_dir = _output_dir.path.replace("/teamspace/studios/this_studio", "") _output_dir = _get_work_dir().lstrip("/").rstrip("/") + "/" + _output_dir.lstrip("/").rstrip("/") _output_dir = _resolve_dir(_output_dir) From 4c5c203b2425f5632baabf2c2304d0513915ced7 Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Mon, 8 Jul 2024 17:23:38 +0000 Subject: [PATCH 3/5] fix pre-commit-ci error --- src/litdata/processing/data_processor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index fea6e200a..112349923 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -1096,8 +1096,7 @@ def run(self, data_recipe: DataRecipe) -> None: print("Workers are finished.") result = data_recipe._done(len(user_items), self.delete_cached_files, self.output_dir) - if num_nodes == node_rank + 1 and self.output_dir.url and _IS_IN_STUDIO: - if self.output_dir.path is not None: + if num_nodes == node_rank + 1 and self.output_dir.url and self.output_dir.path is not None and _IS_IN_STUDIO: _create_dataset( input_dir=self.input_dir.path, storage_dir=self.output_dir.path, From b0b7856047ac52e872a9ace8c3490ca2f8ec0e66 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Jul 2024 17:24:05 +0000 Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/litdata/processing/data_processor.py | 28 ++++++++++++------------ 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index 112349923..53cf33502 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -1097,20 +1097,20 @@ def run(self, data_recipe: DataRecipe) -> None: result = data_recipe._done(len(user_items), self.delete_cached_files, self.output_dir) if num_nodes == node_rank + 1 and self.output_dir.url and self.output_dir.path is not None and _IS_IN_STUDIO: - _create_dataset( - input_dir=self.input_dir.path, - storage_dir=self.output_dir.path, - dataset_type=V1DatasetType.CHUNKED - if isinstance(data_recipe, DataChunkRecipe) - else V1DatasetType.TRANSFORMED, - empty=False, - size=result.size, - num_bytes=result.num_bytes, - data_format=result.data_format, - compression=result.compression, - num_chunks=result.num_chunks, - num_bytes_per_chunk=result.num_bytes_per_chunk, - ) + _create_dataset( + input_dir=self.input_dir.path, + storage_dir=self.output_dir.path, + dataset_type=V1DatasetType.CHUNKED + if isinstance(data_recipe, DataChunkRecipe) + else V1DatasetType.TRANSFORMED, + empty=False, + size=result.size, + num_bytes=result.num_bytes, + data_format=result.data_format, + compression=result.compression, + num_chunks=result.num_chunks, + num_bytes_per_chunk=result.num_bytes_per_chunk, + ) print("Finished data processing!") if self.use_checkpoint and isinstance(data_recipe, DataChunkRecipe): From c9f6000185d10b3561c8e9cab7e4e15b6a28786f Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Mon, 8 Jul 2024 17:32:25 +0000 Subject: [PATCH 5/5] fix mypy error --- src/litdata/processing/functions.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/litdata/processing/functions.py b/src/litdata/processing/functions.py index 310303773..adb5cdeb4 100644 --- a/src/litdata/processing/functions.py +++ b/src/litdata/processing/functions.py @@ -378,12 +378,14 @@ def optimize( if ( _output_dir.url is None + and _output_dir.path and _output_dir.path.startswith("/teamspace/studios/this_studio") and DATA_OPTIMIZER_NUM_NODES > 0 ): - _output_dir = _output_dir.path.replace("/teamspace/studios/this_studio", "") - _output_dir = _get_work_dir().lstrip("/").rstrip("/") + "/" + _output_dir.lstrip("/").rstrip("/") - _output_dir = _resolve_dir(_output_dir) + assert _output_dir.path + output_dir = _output_dir.path.replace("/teamspace/studios/this_studio", "") + output_dir = _get_work_dir().lstrip("/").rstrip("/") + "/" + output_dir.lstrip("/").rstrip("/") + _output_dir = _resolve_dir(output_dir) if _output_dir.url is not None and "cloudspaces" in _output_dir.url: raise ValueError(