From b9f8b4f59bfbf78096aea646a5c5bed5b3af28ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Thu, 11 Apr 2024 12:38:55 +0000 Subject: [PATCH 1/8] wip --- src/litdata/processing/data_processor.py | 2 +- tests/processing/test_data_processor.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index c2ed1b265..23b7f2901 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -1019,7 +1019,7 @@ def run(self, data_recipe: DataRecipe) -> None: print("Workers are finished.") result = data_recipe._done(len(user_items), self.delete_cached_files, self.output_dir) - if num_nodes == node_rank + 1 and self.output_dir.url and _IS_IN_STUDIO and self.input_dir.path: + if num_nodes == node_rank + 1 and self.output_dir.url and _IS_IN_STUDIO: # and self.input_dir.path: assert self.output_dir.path _create_dataset( input_dir=self.input_dir.path, diff --git a/tests/processing/test_data_processor.py b/tests/processing/test_data_processor.py index 0e4e5372e..da9937f5d 100644 --- a/tests/processing/test_data_processor.py +++ b/tests/processing/test_data_processor.py @@ -900,11 +900,13 @@ def map_fn_index(index, output_dir): f.write("Hello") +@pytest.mark.parametrize("local", [True, False]) @pytest.mark.skipif(condition=not _PIL_AVAILABLE or sys.platform == "win32", reason="Requires: ['pil']") -def test_data_processing_map_without_input_dir(monkeypatch, tmpdir): +def test_data_processing_map_without_input_dir(local, monkeypatch, tmpdir): cache_dir = os.path.join(tmpdir, "cache") - output_dir = os.path.join(tmpdir, "target_dir") - os.makedirs(output_dir, exist_ok=True) + output_dir = os.path.join(tmpdir, "target_dir") if local else os.path.join("/teamspace", "datasets", "target_dir") + if local: + os.makedirs(output_dir, exist_ok=True) monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", cache_dir) monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", cache_dir) From 608a115b06111bd60fb8eeda3362bbf37442303e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 11 Apr 2024 12:46:47 +0000 Subject: [PATCH 2/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/litdata/processing/data_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index 23b7f2901..194f142ff 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -1019,7 +1019,7 @@ def run(self, data_recipe: DataRecipe) -> None: print("Workers are finished.") result = data_recipe._done(len(user_items), self.delete_cached_files, self.output_dir) - if num_nodes == node_rank + 1 and self.output_dir.url and _IS_IN_STUDIO: # and self.input_dir.path: + if num_nodes == node_rank + 1 and self.output_dir.url and _IS_IN_STUDIO: # and self.input_dir.path: assert self.output_dir.path _create_dataset( input_dir=self.input_dir.path, From 2f685c26437708d2103b4d3681e0f16d10a9f65c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Thu, 11 Apr 2024 16:27:00 +0100 Subject: [PATCH 3/8] test --- tests/processing/test_data_processor.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/tests/processing/test_data_processor.py b/tests/processing/test_data_processor.py index da9937f5d..53f998093 100644 --- a/tests/processing/test_data_processor.py +++ b/tests/processing/test_data_processor.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Any, List from unittest import mock +from unittest.mock import Mock import numpy as np import pytest @@ -900,7 +901,10 @@ def map_fn_index(index, output_dir): f.write("Hello") -@pytest.mark.parametrize("local", [True, False]) +@pytest.mark.parametrize("local", [ + # True, + False +]) @pytest.mark.skipif(condition=not _PIL_AVAILABLE or sys.platform == "win32", reason="Requires: ['pil']") def test_data_processing_map_without_input_dir(local, monkeypatch, tmpdir): cache_dir = os.path.join(tmpdir, "cache") @@ -910,6 +914,15 @@ def test_data_processing_map_without_input_dir(local, monkeypatch, tmpdir): monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", cache_dir) monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", cache_dir) + create_dataset_mock = Mock() + if not local: + monkeypatch.setenv("LIGHTNING_CLUSTER_ID", "1") + monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "2") + monkeypatch.setenv("LIGHTNING_CLOUD_SPACE_ID", "3") + monkeypatch.setattr("litdata.processing.data_processor._IS_IN_STUDIO", True) + monkeypatch.setattr("litdata.streaming.resolver._resolve_datasets", Mock(return_value=Dir(path=tmpdir / "output", url="url"))) + monkeypatch.setattr("litdata.processing.data_processor._create_dataset", create_dataset_mock) + map( map_fn_index, list(range(5)), @@ -919,7 +932,10 @@ def test_data_processing_map_without_input_dir(local, monkeypatch, tmpdir): weights=[1 for _ in range(5)], ) - assert sorted(os.listdir(output_dir)) == ["0.JPEG", "1.JPEG", "2.JPEG", "3.JPEG", "4.JPEG"] + if local: + assert sorted(os.listdir(output_dir)) == ["0.JPEG", "1.JPEG", "2.JPEG", "3.JPEG", "4.JPEG"] + else: + create_dataset_mock.assert_called_once() @pytest.mark.skipif(condition=not _PIL_AVAILABLE or sys.platform == "win32", reason="Requires: ['pil']") From d3efb2980789d30bd902139f62d65efa5be62017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Thu, 11 Apr 2024 16:38:56 +0100 Subject: [PATCH 4/8] update --- tests/processing/status.json | 1 + tests/processing/test_data_processor.py | 59 ++++++++++++++++--------- 2 files changed, 38 insertions(+), 22 deletions(-) create mode 100644 tests/processing/status.json diff --git a/tests/processing/status.json b/tests/processing/status.json new file mode 100644 index 000000000..ff59137fd --- /dev/null +++ b/tests/processing/status.json @@ -0,0 +1 @@ +{"progress": "20.0%"} \ No newline at end of file diff --git a/tests/processing/test_data_processor.py b/tests/processing/test_data_processor.py index 53f998093..73ffa2a9c 100644 --- a/tests/processing/test_data_processor.py +++ b/tests/processing/test_data_processor.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Any, List from unittest import mock -from unittest.mock import Mock +from unittest.mock import Mock, ANY import numpy as np import pytest @@ -901,28 +901,14 @@ def map_fn_index(index, output_dir): f.write("Hello") -@pytest.mark.parametrize("local", [ - # True, - False -]) @pytest.mark.skipif(condition=not _PIL_AVAILABLE or sys.platform == "win32", reason="Requires: ['pil']") -def test_data_processing_map_without_input_dir(local, monkeypatch, tmpdir): +def test_data_processing_map_without_input_dir_local(monkeypatch, tmpdir): cache_dir = os.path.join(tmpdir, "cache") - output_dir = os.path.join(tmpdir, "target_dir") if local else os.path.join("/teamspace", "datasets", "target_dir") - if local: - os.makedirs(output_dir, exist_ok=True) + output_dir = os.path.join(tmpdir, "target_dir") + os.makedirs(output_dir, exist_ok=True) monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", cache_dir) monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", cache_dir) - create_dataset_mock = Mock() - if not local: - monkeypatch.setenv("LIGHTNING_CLUSTER_ID", "1") - monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "2") - monkeypatch.setenv("LIGHTNING_CLOUD_SPACE_ID", "3") - monkeypatch.setattr("litdata.processing.data_processor._IS_IN_STUDIO", True) - monkeypatch.setattr("litdata.streaming.resolver._resolve_datasets", Mock(return_value=Dir(path=tmpdir / "output", url="url"))) - monkeypatch.setattr("litdata.processing.data_processor._create_dataset", create_dataset_mock) - map( map_fn_index, list(range(5)), @@ -932,10 +918,39 @@ def test_data_processing_map_without_input_dir(local, monkeypatch, tmpdir): weights=[1 for _ in range(5)], ) - if local: - assert sorted(os.listdir(output_dir)) == ["0.JPEG", "1.JPEG", "2.JPEG", "3.JPEG", "4.JPEG"] - else: - create_dataset_mock.assert_called_once() + assert sorted(os.listdir(output_dir)) == ["0.JPEG", "1.JPEG", "2.JPEG", "3.JPEG", "4.JPEG"] + + +def test_data_processing_map_without_input_dir_remote(monkeypatch, tmpdir): + cache_dir = os.path.join(tmpdir, "cache") + output_dir = os.path.join("/teamspace", "datasets", "target_dir") + + monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", cache_dir) + monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", cache_dir) + + create_dataset_mock = Mock() + monkeypatch.setenv("LIGHTNING_CLUSTER_ID", "1") + monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "2") + monkeypatch.setenv("LIGHTNING_CLOUD_SPACE_ID", "3") + monkeypatch.setattr("litdata.processing.data_processor._IS_IN_STUDIO", True) + monkeypatch.setattr( + "litdata.streaming.resolver._resolve_datasets", + Mock(return_value=Dir(path=tmpdir / "output", url="url")), + ) + monkeypatch.setattr("litdata.processing.data_processor._create_dataset", create_dataset_mock) + + map( + map_fn_index, + list(range(5)), + output_dir=output_dir, + num_workers=1, + ) + + create_dataset_mock.assert_called_with( + input_dir=None, storage_dir=str(tmpdir / "output"), + dataset_type=ANY, empty=ANY, size=ANY, num_bytes=ANY, data_format=ANY, compression=ANY, + num_chunks=ANY, num_bytes_per_chunk=ANY + ) @pytest.mark.skipif(condition=not _PIL_AVAILABLE or sys.platform == "win32", reason="Requires: ['pil']") From a50f4f2ded94701ad4554dd92c2b151cb1905d90 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 11 Apr 2024 15:39:37 +0000 Subject: [PATCH 5/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/processing/status.json | 2 +- tests/processing/test_data_processor.py | 15 +++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/processing/status.json b/tests/processing/status.json index ff59137fd..72aa54955 100644 --- a/tests/processing/status.json +++ b/tests/processing/status.json @@ -1 +1 @@ -{"progress": "20.0%"} \ No newline at end of file +{ "progress": "20.0%" } diff --git a/tests/processing/test_data_processor.py b/tests/processing/test_data_processor.py index 73ffa2a9c..96e95df65 100644 --- a/tests/processing/test_data_processor.py +++ b/tests/processing/test_data_processor.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Any, List from unittest import mock -from unittest.mock import Mock, ANY +from unittest.mock import ANY, Mock import numpy as np import pytest @@ -947,9 +947,16 @@ def test_data_processing_map_without_input_dir_remote(monkeypatch, tmpdir): ) create_dataset_mock.assert_called_with( - input_dir=None, storage_dir=str(tmpdir / "output"), - dataset_type=ANY, empty=ANY, size=ANY, num_bytes=ANY, data_format=ANY, compression=ANY, - num_chunks=ANY, num_bytes_per_chunk=ANY + input_dir=None, + storage_dir=str(tmpdir / "output"), + dataset_type=ANY, + empty=ANY, + size=ANY, + num_bytes=ANY, + data_format=ANY, + compression=ANY, + num_chunks=ANY, + num_bytes_per_chunk=ANY, ) From 3a9e940f10301a0d55ac3e73a862c5521c6962c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Thu, 11 Apr 2024 16:41:50 +0100 Subject: [PATCH 6/8] remove comment --- src/litdata/processing/data_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index 194f142ff..26d38ad0a 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -1019,7 +1019,7 @@ def run(self, data_recipe: DataRecipe) -> None: print("Workers are finished.") result = data_recipe._done(len(user_items), self.delete_cached_files, self.output_dir) - if num_nodes == node_rank + 1 and self.output_dir.url and _IS_IN_STUDIO: # and self.input_dir.path: + if num_nodes == node_rank + 1 and self.output_dir.url and _IS_IN_STUDIO: assert self.output_dir.path _create_dataset( input_dir=self.input_dir.path, From 0f6b8692adb79768183e033af38ce4672691c4ff Mon Sep 17 00:00:00 2001 From: awaelchli Date: Thu, 11 Apr 2024 11:43:33 -0400 Subject: [PATCH 7/8] Delete tests/processing/status.json --- tests/processing/status.json | 1 - 1 file changed, 1 deletion(-) delete mode 100644 tests/processing/status.json diff --git a/tests/processing/status.json b/tests/processing/status.json deleted file mode 100644 index 72aa54955..000000000 --- a/tests/processing/status.json +++ /dev/null @@ -1 +0,0 @@ -{ "progress": "20.0%" } From 0b8cc1740efb3beb0f1af2f50dc01fe95a2d1aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Thu, 11 Apr 2024 16:51:46 +0100 Subject: [PATCH 8/8] win --- tests/processing/test_data_processor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/processing/test_data_processor.py b/tests/processing/test_data_processor.py index 96e95df65..f1b18b1aa 100644 --- a/tests/processing/test_data_processor.py +++ b/tests/processing/test_data_processor.py @@ -921,6 +921,7 @@ def test_data_processing_map_without_input_dir_local(monkeypatch, tmpdir): assert sorted(os.listdir(output_dir)) == ["0.JPEG", "1.JPEG", "2.JPEG", "3.JPEG", "4.JPEG"] +@pytest.mark.skipif(sys.platform == "win32", reason="Windows not supported") def test_data_processing_map_without_input_dir_remote(monkeypatch, tmpdir): cache_dir = os.path.join(tmpdir, "cache") output_dir = os.path.join("/teamspace", "datasets", "target_dir")