From 87c7037e087637384d145c4a3c10a098b996a95c Mon Sep 17 00:00:00 2001 From: deependujha Date: Thu, 4 Jul 2024 17:25:08 +0530 Subject: [PATCH 1/4] fix magic serializer issue --- src/litdata/streaming/serializers.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/litdata/streaming/serializers.py b/src/litdata/streaming/serializers.py index 3114c5427..51471d81d 100644 --- a/src/litdata/streaming/serializers.py +++ b/src/litdata/streaming/serializers.py @@ -306,7 +306,9 @@ def deserialize(self, data: bytes) -> Any: return data def can_serialize(self, data: Any) -> bool: - return isinstance(data, str) and os.path.isfile(data) + # return isinstance(data, str) and os.path.isfile(data) + print("FileSerializer will be removed in the future.") + return False class VideoSerializer(Serializer): From 8b60455eb51787769f42658d713d6156629bd42b Mon Sep 17 00:00:00 2001 From: deependujha Date: Fri, 5 Jul 2024 15:19:10 +0530 Subject: [PATCH 2/4] failing tests fixed --- src/litdata/streaming/serializers.py | 3 ++- tests/streaming/test_cache.py | 6 ++++-- tests/streaming/test_writer.py | 18 ++++++++++-------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/litdata/streaming/serializers.py b/src/litdata/streaming/serializers.py index 51471d81d..87928fa51 100644 --- a/src/litdata/streaming/serializers.py +++ b/src/litdata/streaming/serializers.py @@ -297,6 +297,7 @@ def can_serialize(self, _: Any) -> bool: class FileSerializer(Serializer): def serialize(self, filepath: str) -> Tuple[bytes, Optional[str]]: + print("FileSerializer will be removed in the future.") _, file_extension = os.path.splitext(filepath) with open(filepath, "rb") as f: file_extension = file_extension.replace(".", "").lower() @@ -307,7 +308,7 @@ def deserialize(self, data: bytes) -> Any: def can_serialize(self, data: Any) -> bool: # return isinstance(data, str) and os.path.isfile(data) - print("FileSerializer will be removed in the future.") + # FileSerializer will be removed in the future. return False diff --git a/tests/streaming/test_cache.py b/tests/streaming/test_cache.py index 9bb97e65c..7e23fd01d 100644 --- a/tests/streaming/test_cache.py +++ b/tests/streaming/test_cache.py @@ -95,7 +95,8 @@ def _cache_for_image_dataset(num_workers, tmpdir, fabric=None): original_data = dataset.data[i] assert cached_data["class"] == original_data["class"] original_array = PILToTensor()(Image.open(original_data["image"])) - assert torch.equal(original_array, cached_data["image"]) + cached_array = PILToTensor()(Image.open(cached_data["image"])) + assert torch.equal(original_array, cached_array) if distributed_env.world_size == 1: indexes = [] @@ -129,7 +130,8 @@ def _cache_for_image_dataset(num_workers, tmpdir, fabric=None): original_data = dataset.data[i] assert cached_data["class"] == original_data["class"] original_array = PILToTensor()(Image.open(original_data["image"])) - assert torch.equal(original_array, cached_data["image"]) + cached_array = PILToTensor()(Image.open(cached_data["image"])) + assert torch.equal(original_array, cached_array) streaming_dataset_iter = iter(streaming_dataset) for _ in streaming_dataset_iter: diff --git a/tests/streaming/test_writer.py b/tests/streaming/test_writer.py index 7d377ae91..e8f6528b7 100644 --- a/tests/streaming/test_writer.py +++ b/tests/streaming/test_writer.py @@ -165,7 +165,7 @@ def test_binary_writer_with_jpeg_filepath_and_int(tmpdir): cache_dir = os.path.join(tmpdir, "chunks") os.makedirs(cache_dir, exist_ok=True) - binary_writer = BinaryWriter(cache_dir, chunk_bytes=2 << 12) + binary_writer = BinaryWriter(cache_dir, chunk_size=7) # each chunk will have 7 items imgs = [] @@ -178,23 +178,25 @@ def test_binary_writer_with_jpeg_filepath_and_int(tmpdir): imgs.append(img) binary_writer[i] = {"x": path, "y": i} - assert len(os.listdir(cache_dir)) == 24 + assert len(os.listdir(cache_dir)) == 14 # 100 items / 7 items per chunk = 14 chunks binary_writer.done() binary_writer.merge() - assert len(os.listdir(cache_dir)) == 26 + assert len(os.listdir(cache_dir)) == 16 # 2 items in last chunk and index.json file with open(os.path.join(cache_dir, "index.json")) as f: data = json.load(f) - assert data["chunks"][0]["chunk_size"] == 4 - assert data["chunks"][1]["chunk_size"] == 4 - assert data["chunks"][-1]["chunk_size"] == 4 + assert data["chunks"][0]["chunk_size"] == 7 + assert data["chunks"][1]["chunk_size"] == 7 + assert data["chunks"][-1]["chunk_size"] == 2 assert sum([chunk["chunk_size"] for chunk in data["chunks"]]) == 100 reader = BinaryReader(cache_dir, max_cache_size=10 ^ 9) for i in range(100): - data = reader.read(ChunkedIndex(i, chunk_index=i // 4)) - np.testing.assert_array_equal(np.asarray(data["x"]).squeeze(0), imgs[i]) + data = reader.read(ChunkedIndex(i, chunk_index=i // 7)) + img_read = Image.open(data["x"]) + print(f"{img_read.size=}") + np.testing.assert_array_equal(img_read, imgs[i]) assert data["y"] == i From a231a9a0340ff9b8c3a938fc64609cb1b887b3e7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 5 Jul 2024 09:49:36 +0000 Subject: [PATCH 3/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/streaming/test_writer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/streaming/test_writer.py b/tests/streaming/test_writer.py index e8f6528b7..403a0133b 100644 --- a/tests/streaming/test_writer.py +++ b/tests/streaming/test_writer.py @@ -165,7 +165,7 @@ def test_binary_writer_with_jpeg_filepath_and_int(tmpdir): cache_dir = os.path.join(tmpdir, "chunks") os.makedirs(cache_dir, exist_ok=True) - binary_writer = BinaryWriter(cache_dir, chunk_size=7) # each chunk will have 7 items + binary_writer = BinaryWriter(cache_dir, chunk_size=7) # each chunk will have 7 items imgs = [] @@ -178,10 +178,10 @@ def test_binary_writer_with_jpeg_filepath_and_int(tmpdir): imgs.append(img) binary_writer[i] = {"x": path, "y": i} - assert len(os.listdir(cache_dir)) == 14 # 100 items / 7 items per chunk = 14 chunks + assert len(os.listdir(cache_dir)) == 14 # 100 items / 7 items per chunk = 14 chunks binary_writer.done() binary_writer.merge() - assert len(os.listdir(cache_dir)) == 16 # 2 items in last chunk and index.json file + assert len(os.listdir(cache_dir)) == 16 # 2 items in last chunk and index.json file with open(os.path.join(cache_dir, "index.json")) as f: data = json.load(f) From 21aa0086c649a7f51eb47066b8d893471f420bc2 Mon Sep 17 00:00:00 2001 From: deependujha Date: Fri, 5 Jul 2024 15:50:31 +0530 Subject: [PATCH 4/4] conditionally make dirs in writer --- src/litdata/streaming/writer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/litdata/streaming/writer.py b/src/litdata/streaming/writer.py index dec72d0f1..568910472 100644 --- a/src/litdata/streaming/writer.py +++ b/src/litdata/streaming/writer.py @@ -65,7 +65,9 @@ def __init__( """ self._cache_dir = cache_dir - os.makedirs(self._cache_dir, exist_ok=True) + if not os.path.exists(self._cache_dir): + os.makedirs(self._cache_dir, exist_ok=True) + if (isinstance(self._cache_dir, str) and not os.path.exists(self._cache_dir)) or self._cache_dir is None: raise FileNotFoundError(f"The provided cache directory `{self._cache_dir}` doesn't exist.")