From 6c9f6a225d57cad3050006c59d3853bab79e4034 Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Mon, 23 Sep 2024 00:39:20 +0545
Subject: [PATCH 01/12] adds close method for tokens loader

---
 src/litdata/streaming/item_loader.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py
index acc50d2b6..55cd746a7 100644
--- a/src/litdata/streaming/item_loader.py
+++ b/src/litdata/streaming/item_loader.py
@@ -360,6 +360,13 @@ def delete(self, chunk_index: int, chunk_filepath: str) -> None:
                 del self._mmaps[chunk_index]
             os.remove(chunk_filepath)
 
+    def close(self, chunk_index: int) -> None:
+        """Release the memory-mapped file for a specific chunk index."""
+        if chunk_index in self._mmaps:
+            del self._mmaps[chunk_index]  # Remove the memmap object
+        if chunk_index in self._buffers:
+            del self._buffers[chunk_index]  # Remove the associated buffer
+
     @classmethod
     def encode_data(cls, data: List[bytes], _: List[int], flattened: List[Any]) -> Tuple[bytes, Optional[int]]:
         return data[0], flattened[0].shape[0]

From 483750fb2aa927291af424fe2bc726662652c0d6 Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Mon, 23 Sep 2024 00:40:49 +0545
Subject: [PATCH 02/12] add close memmap for given index

---
 src/litdata/streaming/reader.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py
index 60cd29658..2004f3179 100644
--- a/src/litdata/streaming/reader.py
+++ b/src/litdata/streaming/reader.py
@@ -20,7 +20,7 @@
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 from litdata.streaming.config import ChunksConfig, Interval
-from litdata.streaming.item_loader import BaseItemLoader, PyTreeLoader
+from litdata.streaming.item_loader import BaseItemLoader, PyTreeLoader, TokensLoader
 from litdata.streaming.sampler import ChunkedIndex
 from litdata.streaming.serializers import Serializer, _get_serializers
 from litdata.utilities.encryption import Encryption
@@ -288,7 +288,6 @@ def read(self, index: ChunkedIndex) -> Any:
         item = self._item_loader.load_item_from_chunk(
             index.index, index.chunk_index, chunk_filepath, begin, chunk_bytes
         )
-
         # We need to request deletion after the latest element has been loaded.
         # Otherwise, this could trigger segmentation fault error depending on the item loader used.
         if (
@@ -305,6 +304,9 @@ def read(self, index: ChunkedIndex) -> Any:
             # track the new chunk index as the latest one
             self._last_chunk_index = index.chunk_index
 
+        if isinstance(self._item_loader, TokensLoader):
+            self._item_loader.close(index.chunk_index)
+
         if index.is_last_index and self._prepare_thread:
             # inform the thread it is time to stop
             self._prepare_thread.stop()
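
[Editor's note — background illustration, not part of the patch series]
Patches 01-02 exist because every np.memmap that TokensLoader caches in
self._mmaps holds an open file descriptor (Python's mmap object duplicates
the descriptor internally) until the mapping is released, so streaming
across many chunks accumulates descriptors and can abort with
"OSError: [Errno 24] Too many open files". A minimal, Linux-only sketch of
the effect; the file name and helper below are invented for the demo:

    import os
    import numpy as np

    def open_fd_count() -> int:
        # Linux-specific: each entry in /proc/self/fd is one open descriptor.
        return len(os.listdir("/proc/self/fd"))

    # A tiny stand-in for a chunk file.
    np.arange(10, dtype=np.uint16).tofile("chunk-0.bin")

    before = open_fd_count()
    mmaps = [np.memmap("chunk-0.bin", dtype=np.uint16, mode="r") for _ in range(100)]
    print(open_fd_count() - before)  # ~100: one descriptor per live mapping

    del mmaps  # dropping the arrays releases the mappings and their descriptors
    print(open_fd_count() - before)  # back to ~0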
From 46461ff0f38074f97fd5359f8eb273eb387362c2 Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Mon, 23 Sep 2024 01:25:24 +0545
Subject: [PATCH 03/12] Add test for dataset with large number of chunks

---
 tests/streaming/test_dataset.py | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py
index ef93021c9..a7e33bbff 100644
--- a/tests/streaming/test_dataset.py
+++ b/tests/streaming/test_dataset.py
@@ -507,6 +507,29 @@ def test_dataset_for_text_tokens(tmpdir):
             break
 
 
+def test_dataset_for_text_tokens_with_large_num_chunks(tmpdir):
+    import resource
+
+    resource.setrlimit(resource.RLIMIT_NOFILE, (1024, 1024))
+
+    seed_everything(42)
+
+    block_size = 1024
+    cache = Cache(input_dir=str(tmpdir), chunk_bytes="10KB", item_loader=TokensLoader(block_size))
+
+    for i in range(10000):
+        text_ids = np.array([random.randint(0, 10000) for _ in range(random.randint(100, 1000))])
+        cache._add_item(i, text_ids)
+
+    cache.done()
+    cache.merge()
+
+    dataset = StreamingDataset(input_dir=str(tmpdir), item_loader=TokensLoader(block_size), shuffle=True)
+
+    for _ in dataset:
+        pass
+
+
 def test_dataset_with_1d_array(tmpdir):
     seed_everything(42)

From c9fddd5563e7f68d22b4d1bb4d00862bca8cbe5d Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Mon, 23 Sep 2024 01:30:34 +0545
Subject: [PATCH 04/12] Add test for generating dataset with large number of
 chunks

---
 tests/streaming/test_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py
index a7e33bbff..057fb4499 100644
--- a/tests/streaming/test_dataset.py
+++ b/tests/streaming/test_dataset.py
@@ -518,7 +518,7 @@ def test_dataset_for_text_tokens_with_large_num_chunks(tmpdir):
     cache = Cache(input_dir=str(tmpdir), chunk_bytes="10KB", item_loader=TokensLoader(block_size))
 
     for i in range(10000):
-        text_ids = np.array([random.randint(0, 10000) for _ in range(random.randint(100, 1000))])
+        text_ids = torch.randint(0, 10001, (torch.randint(100, 1001, (1,)).item(),)).numpy()
         cache._add_item(i, text_ids)
 
     cache.done()

From 6332a2e56d69df82589aaaf8a42af412c9526be6 Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Mon, 23 Sep 2024 01:49:26 +0545
Subject: [PATCH 05/12] skip for windows

---
 tests/streaming/test_dataset.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py
index 057fb4499..3b0010996 100644
--- a/tests/streaming/test_dataset.py
+++ b/tests/streaming/test_dataset.py
@@ -507,13 +507,12 @@ def test_dataset_for_text_tokens(tmpdir):
             break
 
 
+@pytest.mark.skipif(sys.platform == "win32", reason="windows isn't supported")
 def test_dataset_for_text_tokens_with_large_num_chunks(tmpdir):
     import resource
 
     resource.setrlimit(resource.RLIMIT_NOFILE, (1024, 1024))
 
-    seed_everything(42)
-
     block_size = 1024
     cache = Cache(input_dir=str(tmpdir), chunk_bytes="10KB", item_loader=TokensLoader(block_size))
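
[Editor's note — illustration, not part of the patch series]
The test's key trick is that it does not create enough chunks to exhaust the
default descriptor limit; it lowers RLIMIT_NOFILE to 1024 so a reintroduced
leak fails fast with "too many open files". The resource module is
POSIX-only, which is why patch 05 adds the Windows skip. A sketch of the
mechanism, with illustrative values:

    import resource

    # Returns (soft, hard): the soft limit is enforced, and an unprivileged
    # process may raise its soft limit back up to the hard limit.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

    # Lowering only the soft limit keeps the change reversible; the test pins
    # both values to 1024, which also lowers the hard limit for the process.
    resource.setrlimit(resource.RLIMIT_NOFILE, (1024, hard))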
 src/litdata/streaming/item_loader.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py
index 55cd746a7..17a2d5342 100644
--- a/src/litdata/streaming/item_loader.py
+++ b/src/litdata/streaming/item_loader.py
@@ -363,9 +363,10 @@ def delete(self, chunk_index: int, chunk_filepath: str) -> None:
     def close(self, chunk_index: int) -> None:
         """Release the memory-mapped file for a specific chunk index."""
         if chunk_index in self._mmaps:
-            del self._mmaps[chunk_index]  # Remove the memmap object
+            self._mmaps[chunk_index]._mmap.close()
+            del self._mmaps[chunk_index]
         if chunk_index in self._buffers:
-            del self._buffers[chunk_index]  # Remove the associated buffer
+            del self._buffers[chunk_index]
 
     @classmethod
     def encode_data(cls, data: List[bytes], _: List[int], flattened: List[Any]) -> Tuple[bytes, Optional[int]]:
         return data[0], flattened[0].shape[0]

From 5e8793b06c4f9f9d82cffdde84550de9f1a3746e Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Tue, 24 Sep 2024 15:50:22 +0545
Subject: [PATCH 07/12] revert close change: causing segmentation error

---
 src/litdata/streaming/item_loader.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py
index 17a2d5342..46a2f789c 100644
--- a/src/litdata/streaming/item_loader.py
+++ b/src/litdata/streaming/item_loader.py
@@ -363,7 +363,6 @@ def delete(self, chunk_index: int, chunk_filepath: str) -> None:
     def close(self, chunk_index: int) -> None:
         """Release the memory-mapped file for a specific chunk index."""
         if chunk_index in self._mmaps:
-            self._mmaps[chunk_index]._mmap.close()
             del self._mmaps[chunk_index]
         if chunk_index in self._buffers:
             del self._buffers[chunk_index]

From a42993d6cb1c18b9b691662fe51db0a4d55ab78b Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Wed, 9 Oct 2024 12:26:45 +0545
Subject: [PATCH 08/12] moved close function to execute only if the current
 chunk is fully consumed

---
 src/litdata/streaming/item_loader.py | 1 +
 src/litdata/streaming/reader.py      | 8 +++++---
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py
index 46a2f789c..17a2d5342 100644
--- a/src/litdata/streaming/item_loader.py
+++ b/src/litdata/streaming/item_loader.py
@@ -363,6 +363,7 @@ def delete(self, chunk_index: int, chunk_filepath: str) -> None:
     def close(self, chunk_index: int) -> None:
         """Release the memory-mapped file for a specific chunk index."""
         if chunk_index in self._mmaps:
+            self._mmaps[chunk_index]._mmap.close()
             del self._mmaps[chunk_index]
         if chunk_index in self._buffers:
             del self._buffers[chunk_index]

diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py
index 2004f3179..5ac3d7a39 100644
--- a/src/litdata/streaming/reader.py
+++ b/src/litdata/streaming/reader.py
@@ -301,12 +301,14 @@ def read(self, index: ChunkedIndex) -> Any:
             # inform the chunk has been completely consumed
             self._prepare_thread.delete([self._last_chunk_index])
 
+            if index.chunk_index != self._last_chunk_index:
+                # Close the memory-mapped file for the last chunk index
+                if isinstance(self._item_loader, TokensLoader):
+                    self._item_loader.close(self._last_chunk_index)
+
             # track the new chunk index as the latest one
             self._last_chunk_index = index.chunk_index
 
-        if isinstance(self._item_loader, TokensLoader):
-            self._item_loader.close(index.chunk_index)
-
         if index.is_last_index and self._prepare_thread:
             # inform the thread it is time to stop
             self._prepare_thread.stop()
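
[Editor's note — illustration, not part of the patch series]
The detour in patches 06-08 is a lifetime problem. numpy exposes no public
close() for memmaps, so the code reaches for the private _mmap handle, and
closing it unmaps the pages while anything still referencing them (the
memoryview kept in self._buffers, or an item already handed to the caller)
points at a stale address range; the next access is a use-after-unmap, hence
the segmentation fault seen when closing on every read (patches 02 + 06).
Deferring the close until the reader has moved past the chunk (patch 08)
avoids this. A sketch of the safe teardown order, using the same private
attribute the patches use; the file name is invented:

    import numpy as np

    np.arange(4, dtype=np.uint16).tofile("chunk-0.bin")  # stand-in chunk file
    mm = np.memmap("chunk-0.bin", dtype=np.uint16, mode="r")
    buf = memoryview(mm)  # what TokensLoader keeps in self._buffers

    # Safe order: views first, then the mapping, then the array.
    del buf           # nothing may read through the buffer after this point
    mm._mmap.close()  # private numpy attribute; closes the fd, unmaps the pages
    del mm            # the array must not be touched once the mapping is gone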
From feab6943888f0104997ca58ed33174368af60949 Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Wed, 9 Oct 2024 12:38:10 +0545
Subject: [PATCH 09/12] updated the condition

---
 src/litdata/streaming/reader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py
index 5ac3d7a39..8cb28ef74 100644
--- a/src/litdata/streaming/reader.py
+++ b/src/litdata/streaming/reader.py
@@ -303,7 +303,7 @@ def read(self, index: ChunkedIndex) -> Any:
             if index.chunk_index != self._last_chunk_index:
                 # Close the memory-mapped file for the last chunk index
-                if isinstance(self._item_loader, TokensLoader):
+                if isinstance(self._item_loader, TokensLoader) and self._last_chunk_index is not None:
                     self._item_loader.close(self._last_chunk_index)
 
             # track the new chunk index as the latest one
             self._last_chunk_index = index.chunk_index

From bddfcdf12b523697ede0efcd6eb5f0b11d7ab249 Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Wed, 9 Oct 2024 12:38:30 +0545
Subject: [PATCH 10/12] removed unused type:ignore

---
 src/litdata/streaming/item_loader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py
index 17a2d5342..4adc4d6fa 100644
--- a/src/litdata/streaming/item_loader.py
+++ b/src/litdata/streaming/item_loader.py
@@ -313,7 +313,7 @@ def _load_chunk(self, chunk_index: int, chunk_filepath: str) -> None:
         offset = (1 + chunk["chunk_size"] + 1) * 4
         mmap = np.memmap(chunk_filepath, mode="r", order="C", offset=offset)
         self._mmaps[chunk_index] = mmap
-        self._buffers[chunk_index] = memoryview(mmap)  # type: ignore
+        self._buffers[chunk_index] = memoryview(mmap) 
 
     def pre_load_chunk(self, chunk_index: int, chunk_filepath: str) -> None:
         # This is called within the prepare chunks thread, so we overlap data loading with data reading.

From 4f3f39484410c562f5a042392897cb452b612c68 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Wed, 9 Oct 2024 06:54:40 +0000
Subject: [PATCH 11/12] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 src/litdata/streaming/item_loader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py
index 4adc4d6fa..7c8b5a64a 100644
--- a/src/litdata/streaming/item_loader.py
+++ b/src/litdata/streaming/item_loader.py
@@ -313,7 +313,7 @@ def _load_chunk(self, chunk_index: int, chunk_filepath: str) -> None:
         offset = (1 + chunk["chunk_size"] + 1) * 4
         mmap = np.memmap(chunk_filepath, mode="r", order="C", offset=offset)
         self._mmaps[chunk_index] = mmap
-        self._buffers[chunk_index] = memoryview(mmap) 
+        self._buffers[chunk_index] = memoryview(mmap)
 
     def pre_load_chunk(self, chunk_index: int, chunk_filepath: str) -> None:
         # This is called within the prepare chunks thread, so we overlap data loading with data reading.
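
[Editor's note — illustration, not part of the patch series]
After patch 09, the previous chunk's mapping is closed only when the reader
actually switches chunks, and the "is not None" guard covers the very first
read, where no previous chunk exists. A self-contained toy model of that
bookkeeping (none of these names are litdata's):

    from typing import List, Optional

    class ChunkTracker:
        """Toy model of reader.py's chunk-switch bookkeeping after patch 09."""

        def __init__(self) -> None:
            self.last_chunk_index: Optional[int] = None
            self.closed: List[int] = []

        def on_read(self, chunk_index: int) -> None:
            if chunk_index != self.last_chunk_index:
                if self.last_chunk_index is not None:
                    # This is where TokensLoader.close(last_chunk_index) runs.
                    self.closed.append(self.last_chunk_index)
                self.last_chunk_index = chunk_index

    tracker = ChunkTracker()
    for idx in [0, 0, 0, 1, 1, 2]:
        tracker.on_read(idx)
    print(tracker.closed)  # [0, 1] -- chunk 2 stays mapped until teardown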
From 9e8d3ebe23821809bff6d7e59578820e78a89146 Mon Sep 17 00:00:00 2001
From: bhimrazy
Date: Wed, 9 Oct 2024 12:44:22 +0545
Subject: [PATCH 12/12] revert ignore for memoryview

---
 src/litdata/streaming/item_loader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py
index 4adc4d6fa..17a2d5342 100644
--- a/src/litdata/streaming/item_loader.py
+++ b/src/litdata/streaming/item_loader.py
@@ -313,7 +313,7 @@ def _load_chunk(self, chunk_index: int, chunk_filepath: str) -> None:
         offset = (1 + chunk["chunk_size"] + 1) * 4
         mmap = np.memmap(chunk_filepath, mode="r", order="C", offset=offset)
         self._mmaps[chunk_index] = mmap
-        self._buffers[chunk_index] = memoryview(mmap)
+        self._buffers[chunk_index] = memoryview(mmap)  # type: ignore
 
     def pre_load_chunk(self, chunk_index: int, chunk_filepath: str) -> None:
         # This is called within the prepare chunks thread, so we overlap data loading with data reading.