From 50d0cfd013a6eb5e93907e4cea99e06963aac5c2 Mon Sep 17 00:00:00 2001 From: thomas chaton Date: Tue, 14 Jan 2025 22:14:43 +0000 Subject: [PATCH 1/9] update --- src/litdata/processing/data_processor.py | 2 +- src/litdata/streaming/resolver.py | 2 +- src/litdata/utilities/dataset_utilities.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index beb6b3d4f..52d5b767d 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -194,7 +194,7 @@ def _remove_target(input_dir: Dir, cache_dir: str, queue_in: Queue) -> None: if os.path.exists(path): os.remove(path) - elif os.path.exists(path) and "s3_connections" not in path: + elif "_connections" not in path and "_folders" not in path and os.path.exists(path): os.remove(path) diff --git a/src/litdata/streaming/resolver.py b/src/litdata/streaming/resolver.py index e7767f8a5..b1a968840 100644 --- a/src/litdata/streaming/resolver.py +++ b/src/litdata/streaming/resolver.py @@ -180,7 +180,7 @@ def _resolve_s3_folders(dir_path: str) -> Dir: if not data_connection: raise ValueError(f"We didn't find any matching data connection with the provided name `{target_name}`.") - return Dir(path=dir_path, url=data_connection[0].s3_folder.source) + return Dir(path=dir_path, url=os.path.join(data_connection[0].s3_folder.source, *dir_path.split("/")[4:])) def _resolve_datasets(dir_path: str) -> Dir: diff --git a/src/litdata/utilities/dataset_utilities.py b/src/litdata/utilities/dataset_utilities.py index 55d72260d..be7a8e596 100644 --- a/src/litdata/utilities/dataset_utilities.py +++ b/src/litdata/utilities/dataset_utilities.py @@ -98,7 +98,7 @@ def _should_replace_path(path: Optional[str]) -> bool: if path is None or path == "": return True - return path.startswith("/teamspace/datasets/") or path.startswith("/teamspace/s3_connections/") + return path.startswith("/teamspace/datasets/") or path.startswith("/teamspace/s3_connections/") or path.startswith("/teamspace/s3_folders/") def _read_updated_at(input_dir: Optional[Dir], storage_options: Optional[Dict] = {}) -> str: From e17e6034c8e01b15424a9df64c18717d39abb014 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 Jan 2025 22:15:58 +0000 Subject: [PATCH 2/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/litdata/utilities/dataset_utilities.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/litdata/utilities/dataset_utilities.py b/src/litdata/utilities/dataset_utilities.py index be7a8e596..9cc8f1894 100644 --- a/src/litdata/utilities/dataset_utilities.py +++ b/src/litdata/utilities/dataset_utilities.py @@ -98,7 +98,11 @@ def _should_replace_path(path: Optional[str]) -> bool: if path is None or path == "": return True - return path.startswith("/teamspace/datasets/") or path.startswith("/teamspace/s3_connections/") or path.startswith("/teamspace/s3_folders/") + return ( + path.startswith("/teamspace/datasets/") + or path.startswith("/teamspace/s3_connections/") + or path.startswith("/teamspace/s3_folders/") + ) def _read_updated_at(input_dir: Optional[Dir], storage_options: Optional[Dict] = {}) -> str: From 9ccc66212188bee411209c2f0861018713c37449 Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 14 Jan 2025 22:17:55 +0000 Subject: [PATCH 3/9] update --- tests/streaming/test_resolver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/streaming/test_resolver.py b/tests/streaming/test_resolver.py index d049e45c2..3b04d1a5a 100644 --- a/tests/streaming/test_resolver.py +++ b/tests/streaming/test_resolver.py @@ -155,7 +155,7 @@ def test_src_resolver_s3_folders(monkeypatch, lightning_cloud_mock): expected = "s3://imagenet-bucket" assert resolver._resolve_dir("/teamspace/s3_folders/debug_folder").url == expected - + assert resolver._resolve_dir("/teamspace/s3_folders/debug_folder/a/b/c").url == expected + "/a/b/c" auth.clear() From b4c6e5485d824183220a3fbe0ad5761f5fcc769b Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 14 Jan 2025 22:20:38 +0000 Subject: [PATCH 4/9] update --- src/litdata/processing/data_processor.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index 52d5b767d..23221bf1f 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -194,10 +194,22 @@ def _remove_target(input_dir: Dir, cache_dir: str, queue_in: Queue) -> None: if os.path.exists(path): os.remove(path) - elif "_connections" not in path and "_folders" not in path and os.path.exists(path): + elif keep_path(path) and os.path.exists(path): os.remove(path) +def keep_path(path: str) -> bool: + paths = [ + "efs_connections", + "efs_folders", + "gcs_connections", + "s3_connections", + "s3_folders", + "snowflake_connections", + ] + return all(p not in path for p in paths) + + def _upload_fn(upload_queue: Queue, remove_queue: Queue, cache_dir: str, output_dir: Dir) -> None: """Upload optimised chunks from a local to remote dataset directory.""" obj = parse.urlparse(output_dir.url if output_dir.url else output_dir.path) From ee435990dbaa991ff92001aad2507fa2ab1b186f Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 14 Jan 2025 22:21:20 +0000 Subject: [PATCH 5/9] update --- src/litdata/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/__about__.py b/src/litdata/__about__.py index 031d9f4de..2591bd7d8 100644 --- a/src/litdata/__about__.py +++ b/src/litdata/__about__.py @@ -14,7 +14,7 @@ import time -__version__ = "0.2.35" +__version__ = "0.2.36" __author__ = "Lightning AI et al." __author_email__ = "pytorch@lightning.ai" __license__ = "Apache-2.0" From 90263c7259747dc6db134bf306f41b73ba65c09f Mon Sep 17 00:00:00 2001 From: thomas chaton Date: Tue, 14 Jan 2025 22:26:44 +0000 Subject: [PATCH 6/9] update --- src/litdata/streaming/reader.py | 9 ++++++--- src/litdata/utilities/dataset_utilities.py | 4 ++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 078c9fa2b..e6f2fc662 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -86,9 +86,12 @@ def _apply_delete(self, chunk_index: int) -> None: chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)] self._item_loader.delete(chunk_index, chunk_filepath) - locak_chunk_path = chunk_filepath + ".lock" - if os.path.exists(locak_chunk_path): - os.remove(locak_chunk_path) + try: + locak_chunk_path = chunk_filepath + ".lock" + if os.path.exists(locak_chunk_path): + os.remove(locak_chunk_path) + except: + pass def stop(self) -> None: """Receive the list of the chunk indices to download for the current epoch.""" diff --git a/src/litdata/utilities/dataset_utilities.py b/src/litdata/utilities/dataset_utilities.py index 9cc8f1894..9efa7c4c8 100644 --- a/src/litdata/utilities/dataset_utilities.py +++ b/src/litdata/utilities/dataset_utilities.py @@ -47,6 +47,8 @@ def subsample_streaming_dataset( if cache_path is not None: input_dir.path = cache_path + print(input_dir) + assert input_dir.path is not None cache_index_filepath = os.path.join(input_dir.path, _INDEX_FILENAME) @@ -102,6 +104,8 @@ def _should_replace_path(path: Optional[str]) -> bool: path.startswith("/teamspace/datasets/") or path.startswith("/teamspace/s3_connections/") or path.startswith("/teamspace/s3_folders/") + or path.startswith("/teamspace/gcs_folders/") + or path.startswith("/teamspace/gcs_connections/") ) From a5c357ed7111102c029db74b167ede55d0959882 Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 14 Jan 2025 22:27:59 +0000 Subject: [PATCH 7/9] update --- src/litdata/utilities/dataset_utilities.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/litdata/utilities/dataset_utilities.py b/src/litdata/utilities/dataset_utilities.py index 9efa7c4c8..901642623 100644 --- a/src/litdata/utilities/dataset_utilities.py +++ b/src/litdata/utilities/dataset_utilities.py @@ -47,8 +47,6 @@ def subsample_streaming_dataset( if cache_path is not None: input_dir.path = cache_path - print(input_dir) - assert input_dir.path is not None cache_index_filepath = os.path.join(input_dir.path, _INDEX_FILENAME) From 019729ec1383123d6c593c2aea9b28db71c39b95 Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 14 Jan 2025 22:29:03 +0000 Subject: [PATCH 8/9] update --- tests/utilities/test_dataset_utilities.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/utilities/test_dataset_utilities.py b/tests/utilities/test_dataset_utilities.py index 03d8d905c..31dc0b38c 100644 --- a/tests/utilities/test_dataset_utilities.py +++ b/tests/utilities/test_dataset_utilities.py @@ -19,6 +19,9 @@ def test_should_replace_path(): assert not _should_replace_path(".../s3__connections/...") assert _should_replace_path("/teamspace/datasets/...") assert _should_replace_path("/teamspace/s3_connections/...") + assert _should_replace_path("/teamspace/s3_folders/...") + assert _should_replace_path("/teamspace/gcs_folders/...") + assert _should_replace_path("/teamspace/gcs_connections/...") assert not _should_replace_path("something_else") From c513f0aaf939aec46cbd7141dc3f6c434d9fd4d2 Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 14 Jan 2025 22:34:04 +0000 Subject: [PATCH 9/9] update --- src/litdata/streaming/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index e6f2fc662..1e4cd71a1 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -90,7 +90,7 @@ def _apply_delete(self, chunk_index: int) -> None: locak_chunk_path = chunk_filepath + ".lock" if os.path.exists(locak_chunk_path): os.remove(locak_chunk_path) - except: + except FileNotFoundError: pass def stop(self) -> None: