From 1772f4774230e19c990dca7abf4adc90d95aecb6 Mon Sep 17 00:00:00 2001 From: Max Luebbering Date: Tue, 25 Jun 2024 14:58:53 +0200 Subject: [PATCH 1/9] fix: we use the correct byte-based indexation now --- .../dataloader/create_packed_data.py | 12 +++-- src/modalities/dataloader/dataset.py | 44 +++++++++++++------ 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/modalities/dataloader/create_packed_data.py b/src/modalities/dataloader/create_packed_data.py index e31572034..80c9dfc47 100644 --- a/src/modalities/dataloader/create_packed_data.py +++ b/src/modalities/dataloader/create_packed_data.py @@ -161,7 +161,10 @@ def _write_batch( EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES, byteorder="little" ) ) - curr_offset = EmbeddedStreamData.HEADER_SIZE_IN_BYTES + # The offset only applies to the data section, not the header + # When we load the file, we addtionally add the header size + # to the offset + curr_offset = 0 # write data section (tokens) pbar = tqdm(total=len(self._reader), desc="Processed batches") @@ -229,8 +232,7 @@ def _process_thread(self, process_id: int): ) def _update_data_length_in_pre_allocated_header(self, dst_path: Path, index_list: List[Tuple[int, int]]): - start_of_index_in_bytes = index_list[-1][0] + index_list[-1][1] - length_of_byte_encoded_data_section = start_of_index_in_bytes - EmbeddedStreamData.HEADER_SIZE_IN_BYTES + length_of_byte_encoded_data_section = index_list[-1][0] + index_list[-1][1] data_section_length_in_bytes = length_of_byte_encoded_data_section.to_bytes( EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES, byteorder="little" ) @@ -277,7 +279,9 @@ def __init__(self, data_path: Path): # get index f.seek(self.HEADER_SIZE_IN_BYTES + self.data_len) pkl_encoded_index = f.read() - self.index_base = pickle.loads(pkl_encoded_index) + # contains the start offset and length of each segment + # as byte positions in the data section + self.index_base: List[Tuple[int, int]] = pickle.loads(pkl_encoded_index) # initialize memmapped data section self.data = np.memmap(self._data_path, mode="r", offset=self.HEADER_SIZE_IN_BYTES, shape=(self.data_len,)) diff --git a/src/modalities/dataloader/dataset.py b/src/modalities/dataloader/dataset.py index 90de616cd..53f0b2617 100644 --- a/src/modalities/dataloader/dataset.py +++ b/src/modalities/dataloader/dataset.py @@ -18,9 +18,8 @@ class Dataset(TorchdataSet): - def __init__(self, raw_data_path: Path, block_size: int, sample_key: str): + def __init__(self, raw_data_path: Path, sample_key: str): self.raw_data_path = raw_data_path - self.block_size = block_size self.sample_key = sample_key def _check_if_inbounds(self, idx: int): @@ -51,7 +50,7 @@ def __init__(self, num_samples: int, sample_definition: Tuple[DummySampleConfig] :param sample_definition: A list of tuples defining the dataset output. Each touple contains the sample key, shape and data type. """ - super().__init__(raw_data_path=None, block_size=None, sample_key=None) + super().__init__(raw_data_path=None, sample_key=None) self.num_samples = num_samples self.sample_definition = sample_definition @@ -78,7 +77,6 @@ class MemMapDataset(Dataset): def __init__( self, raw_data_path: Path, - block_size: int, tokenizer: TokenizerWrapper, sample_key: str, index_path: Optional[Path] = None, @@ -88,7 +86,6 @@ def __init__( Pytorch Dataset with mmap support. :param raw_data_path: Path to a jsonl file, which holds text data - :param block_size: alias for max sequence length. The amount of tokens the model can handle. :param tokenizer: PretrainedTokenizer required to tokenize text data on the fly. :param jq_pattern: jq-pattern applied on every jsonl-entry. Results are afterwards tokenized and packed :param index_path: Path to an index file, which indicates the start character/byte position @@ -99,7 +96,7 @@ def __init__( TODO: If this setting should support multi-modal features using separately encoded inputs, this needs to get replaced with a list of sample keys! """ - super().__init__(raw_data_path=raw_data_path, block_size=block_size, sample_key=sample_key) + super().__init__(raw_data_path=raw_data_path, sample_key=sample_key) self.reader = LargeFileLinesReader(self.raw_data_path, index_path=index_path) self.jq_filter = jq.compile(jq_pattern) @@ -124,7 +121,7 @@ class PackedMemMapDatasetBase(Dataset): } type_converter_for_torch = {1: np.uint8, 2: np.int32, 4: np.int64} - def __init__(self, raw_data_path: Path, block_size: int, sample_key: str): + def __init__(self, raw_data_path: Path, sample_key: str): """ Base class for packed memmapped datasets. The underlying dataset file has the structure: | header | data | index | @@ -134,12 +131,11 @@ def __init__(self, raw_data_path: Path, block_size: int, sample_key: str): :param raw_data_path: Path to a packed binary file (*.pbin). Use `modalities data pack_encoded_data` to create one based on a jsonl-file. - :param block_size: alias for max sequence length. The amount of tokens the model can handle. :param sample_key: model-specific parameter to indicate where in the BatchEncoding the input_token_ids are. TODO: If this setting should support multi-modal features using separately encoded inputs, this needs to get replaced with a list of sample keys! """ - super().__init__(raw_data_path=raw_data_path, block_size=block_size, sample_key=sample_key) + super().__init__(raw_data_path=raw_data_path, sample_key=sample_key) self._embedded_stream_data = EmbeddedStreamData(raw_data_path) self._token_size_in_bytes = self._embedded_stream_data.token_size_in_bytes try: @@ -153,16 +149,27 @@ def __init__(self, raw_data_path: Path, block_size: int, sample_key: str): self._index = self._generate_packing_index() def _generate_packing_index(self) -> List[Tuple[int, int]]: - raise NotImplementedError + # index is a tuple of offset and length in bytes + return self._embedded_stream_data.index_base def __len__(self) -> int: return len(self._index) def __getitem__(self, idx: int) -> BatchEncoding: self._check_if_inbounds(idx) - offset, length = self._index[idx] + # offset and length in bytes + offset, length_in_bytes = self._index[idx] + if length_in_bytes % self._token_size_in_bytes != 0: + raise ValueError( + f"Length of the sample in bytes is not a multiple of {self._token_size_in_bytes}." + f"Offset in bytes: {offset}, Length in bytes: {length_in_bytes}" + ) + # numpy frombuffer takes the memmap object as the buffer + # and indices the data section with the given offset (in bytes) + # and length in indices of type self._token_dtype_on_disk + num_tokens = length_in_bytes // self._token_size_in_bytes tokens = np.frombuffer( - self._embedded_stream_data.data, dtype=self._token_dtype_on_disk, count=length, offset=offset + buffer=self._embedded_stream_data.data, dtype=self._token_dtype_on_disk, count=num_tokens, offset=offset ) # torch can't convert most uint-formats, therefore we infer regular int types tokens = tokens.astype(self._token_dtype_in_ram) @@ -170,6 +177,10 @@ def __getitem__(self, idx: int) -> BatchEncoding: class PackedMemMapDatasetContinuous(PackedMemMapDatasetBase): + def __init__(self, raw_data_path: Path, sample_key: str, block_size: int): + self.block_size = block_size + super().__init__(raw_data_path=raw_data_path, sample_key=sample_key) + def _generate_packing_index(self) -> List[Tuple[int, int]]: # get number of total tokens in file total_tokens = self._embedded_stream_data.data_len // self._token_size_in_bytes @@ -187,10 +198,17 @@ def _generate_packing_index(self) -> List[Tuple[int, int]]: # of the subsequent sample). num_samples = (total_tokens - self.block_size) // (self.block_size - 1) + 1 # given num_samples we calculate the starting index and length of each sample as tuple. - return [((i * self.block_size - i) * self._token_size_in_bytes, self.block_size) for i in range(num_samples)] + return [ + ((i * self.block_size - i) * self._token_size_in_bytes, self.block_size * self._token_size_in_bytes) + for i in range(num_samples) + ] class PackedMemMapDatasetMegatron(PackedMemMapDatasetBase): + def __init__(self, raw_data_path: Path, sample_key: str, block_size: int): + self.block_size = block_size + super().__init__(raw_data_path=raw_data_path, sample_key=sample_key) + def _generate_packing_index(self) -> List[Tuple[int, int]]: index = [] curr_offset = self.HEADER_SIZE_IN_BYTES From 3c6bfbc01506126a1170d537de9a062c8e5eb3cf Mon Sep 17 00:00:00 2001 From: Max Luebbering Date: Tue, 25 Jun 2024 14:59:35 +0200 Subject: [PATCH 2/9] test: added test test_original_samples_in_packed_dataset for testing the indexation of the original samples --- tests/dataloader/test_packed_dataset.py | 45 +++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/tests/dataloader/test_packed_dataset.py b/tests/dataloader/test_packed_dataset.py index ffc26f7a5..9c9882027 100644 --- a/tests/dataloader/test_packed_dataset.py +++ b/tests/dataloader/test_packed_dataset.py @@ -4,13 +4,19 @@ import pytest from modalities.dataloader.create_packed_data import EmbeddedStreamData, PackedDataGenerator, join_embedded_stream_data -from modalities.dataloader.dataset import PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron +from modalities.dataloader.dataset import ( + PackedMemMapDatasetBase, + PackedMemMapDatasetContinuous, + PackedMemMapDatasetMegatron, +) from modalities.models.gpt2.collator import GPT2LLMCollateFn @pytest.mark.parametrize("block_size, expected_length", [(1, 4), (2, 3), (3, 3), (10, 2), (6, 2), (20, 1), (25, 0)]) def test_packed_megatron_dataset_loading(dummy_packed_data_path, block_size, expected_length): - ds = PackedMemMapDatasetMegatron(dummy_packed_data_path, block_size, sample_key="input_ids") + ds = PackedMemMapDatasetMegatron( + raw_data_path=dummy_packed_data_path, block_size=block_size, sample_key="input_ids" + ) assert len(ds) == expected_length @@ -66,7 +72,9 @@ def test_packed_megatron_dataset_loading(dummy_packed_data_path, block_size, exp ) def test_packed_continuous_dataset_loading(dummy_packed_data_path, block_size, expected_length, expected_output): try: - ds = PackedMemMapDatasetContinuous(dummy_packed_data_path, block_size, sample_key="input_ids") + ds = PackedMemMapDatasetContinuous( + raw_data_path=dummy_packed_data_path, block_size=block_size, sample_key="input_ids" + ) except ValueError: assert expected_output == ValueError return @@ -202,3 +210,34 @@ def test_conversion_tokens_represented_as_unsigned_ints(tmpdir, token_size_in_by collator = GPT2LLMCollateFn(sample_key=sample_key, target_key="abc") for batch in zip(ds, ds): collator(list(batch)) + + +def test_original_samples_in_packed_dataset(indexed_dummy_data_path_long, wrapped_gpt2_tokenizer): + # In this test, we create a packed dataset from a long jsonl file + # and iterate over the packed dataset to check if the tokenization is correct. + # We do so by manually tokenizing the jsonl file and comparing the tokenized + # output with the packed dataset + packed_generator = PackedDataGenerator( + src_path=indexed_dummy_data_path_long.raw_data_path, + tokenizer=wrapped_gpt2_tokenizer, + number_of_processes=5, + eod_token="<|endoftext|>", + index_path=indexed_dummy_data_path_long.index_path, + jq_pattern=".text", + processing_batch_size=5, + raw_samples_queue_size=3, + processed_samples_queue_size=3, + ) + default_packed_dataset_path = packed_generator._default_destination_path() + assert not default_packed_dataset_path.is_file() + packed_generator.run() + packed_dataset = PackedMemMapDatasetBase(default_packed_dataset_path, sample_key="input_ids") + # read in the raw jsonl files for manual tokenization + with open(indexed_dummy_data_path_long.raw_data_path) as f: + jsonl_list = [json.loads(line)["text"] for line in f] + + eod_token_id = wrapped_gpt2_tokenizer.get_token_id("<|endoftext|>") + jsonl_tokenized = [wrapped_gpt2_tokenizer.tokenize(v) + [eod_token_id] for v in jsonl_list] + + for sample, original_sample in zip(packed_dataset, jsonl_tokenized): + assert sample["input_ids"].tolist() == original_sample From 3a45e52e7cd2955c4b1f3de46795cf9ab3b3b527 Mon Sep 17 00:00:00 2001 From: Max Luebbering Date: Tue, 25 Jun 2024 15:00:34 +0200 Subject: [PATCH 3/9] chore: updated getting started documentation regarding the byte-based indexation --- examples/getting_started/README.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/getting_started/README.md b/examples/getting_started/README.md index 04bfee821..8b74dcd8d 100644 --- a/examples/getting_started/README.md +++ b/examples/getting_started/README.md @@ -130,10 +130,11 @@ contains the concatenated token ids for all documents. Index segment: ============== -The index contains a tuple for each document with the format (byte_offset, segment_length), -where the byte_offset specifies the byte position in the data segment for the start of the document and segment_length. -Therfore, the index segment would look like [(8, 100), (108, 302), (410, 803), ...]. The first sample starts at byte position 8 and -has a length of 100 bytes. The second sample therefore starts at byte position 108 and has a length of 284 bytes and so on. +The index contains a tuple for each document with the format (byte_offset, segment_byte_length), +where the byte_offset specifies the byte position in the data segment for the start of the document and segment_length +specifies the byte length of the document. +Therfore, the index segment would look like [(0, 100), (100, 302), (410, 803), ...]. The first sample starts at byte position 0 and +has a length of 100 bytes. The second sample therefore starts at byte position 100 and has a length of 202 bytes and so on. ``` We have implemented different packing strategies on top of the file format, each making sure that a batch is completely filled up with documents without any trailing padding in the sequences. From 0f284927004124ad2af6068949dafe7ef1079a88 Mon Sep 17 00:00:00 2001 From: Max Luebbering Date: Tue, 25 Jun 2024 15:04:15 +0200 Subject: [PATCH 4/9] fix: fixed index in dummy_packed_data_path of conftest --- tests/conftest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 8dfc430f9..5c8ba9e5c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -37,7 +37,8 @@ def dummy_packed_data_path(tmpdir) -> Path: data += (len(tokens) * token_size_in_bytes).to_bytes(header_size_in_bytes, byteorder="little") data += token_size_in_bytes.to_bytes(4, byteorder="little") data += b"".join([t.to_bytes(token_size_in_bytes, byteorder="little") for t in tokens]) - index = [(4, 24), (28, 40), (68, 12), (80, 4)] # [(index,len), ...] -> in 4 bytes #lengths: 6,10,3,1 + # NOTE: so far none of the implemented pytests use this index though! + index = [(0, 24), (24, 40), (64, 12), (76, 4)] # [(index,len), ...] -> in 4 bytes #lengths: 6,10,3,1 data += pickle.dumps(index) dummy_packed_data_path = Path(tmpdir, "dummy.pbin") dummy_packed_data_path.write_bytes(data) From 378c59c4f68c5b4355763ef893c75a57e460f45f Mon Sep 17 00:00:00 2001 From: Max Luebbering Date: Tue, 25 Jun 2024 15:14:10 +0200 Subject: [PATCH 5/9] chore: updated readme inaccuracy --- examples/getting_started/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/getting_started/README.md b/examples/getting_started/README.md index 8b74dcd8d..a91e79c48 100644 --- a/examples/getting_started/README.md +++ b/examples/getting_started/README.md @@ -133,8 +133,8 @@ Index segment: The index contains a tuple for each document with the format (byte_offset, segment_byte_length), where the byte_offset specifies the byte position in the data segment for the start of the document and segment_length specifies the byte length of the document. -Therfore, the index segment would look like [(0, 100), (100, 302), (410, 803), ...]. The first sample starts at byte position 0 and -has a length of 100 bytes. The second sample therefore starts at byte position 100 and has a length of 202 bytes and so on. +Therfore, the index segment would look like [(0, 100), (100, 302), (402, 803), ...]. The first sample starts at byte position 0 and +has a length of 100 bytes. The second sample therefore starts at byte position 100 and has a length of 302 bytes and so on. ``` We have implemented different packing strategies on top of the file format, each making sure that a batch is completely filled up with documents without any trailing padding in the sequences. From 138fa85d63ad30cb242335ebc0d4d73bdb4b2ff1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20L=C3=BCbbering?= <2804731+le1nux@users.noreply.github.com> Date: Fri, 28 Jun 2024 10:24:03 +0200 Subject: [PATCH 6/9] Update src/modalities/dataloader/create_packed_data.py Co-authored-by: Felix Stollenwerk --- src/modalities/dataloader/create_packed_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modalities/dataloader/create_packed_data.py b/src/modalities/dataloader/create_packed_data.py index 80c9dfc47..0687c88f1 100644 --- a/src/modalities/dataloader/create_packed_data.py +++ b/src/modalities/dataloader/create_packed_data.py @@ -162,7 +162,7 @@ def _write_batch( ) ) # The offset only applies to the data section, not the header - # When we load the file, we addtionally add the header size + # When we load the file, we add the header size to the offset # to the offset curr_offset = 0 From 455c26a96e1b19973d2398619519e8398d98fe3c Mon Sep 17 00:00:00 2001 From: Max Luebbering Date: Fri, 28 Jun 2024 10:30:31 +0200 Subject: [PATCH 7/9] chore: renamed offset to offset_in_bytes for consistency --- src/modalities/dataloader/dataset.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/modalities/dataloader/dataset.py b/src/modalities/dataloader/dataset.py index 53f0b2617..1730971f4 100644 --- a/src/modalities/dataloader/dataset.py +++ b/src/modalities/dataloader/dataset.py @@ -158,18 +158,21 @@ def __len__(self) -> int: def __getitem__(self, idx: int) -> BatchEncoding: self._check_if_inbounds(idx) # offset and length in bytes - offset, length_in_bytes = self._index[idx] + offset_in_bytes, length_in_bytes = self._index[idx] if length_in_bytes % self._token_size_in_bytes != 0: raise ValueError( f"Length of the sample in bytes is not a multiple of {self._token_size_in_bytes}." - f"Offset in bytes: {offset}, Length in bytes: {length_in_bytes}" + f"Offset in bytes: {offset_in_bytes}, Length in bytes: {length_in_bytes}" ) # numpy frombuffer takes the memmap object as the buffer # and indices the data section with the given offset (in bytes) # and length in indices of type self._token_dtype_on_disk num_tokens = length_in_bytes // self._token_size_in_bytes tokens = np.frombuffer( - buffer=self._embedded_stream_data.data, dtype=self._token_dtype_on_disk, count=num_tokens, offset=offset + buffer=self._embedded_stream_data.data, + dtype=self._token_dtype_on_disk, + count=num_tokens, + offset=offset_in_bytes, ) # torch can't convert most uint-formats, therefore we infer regular int types tokens = tokens.astype(self._token_dtype_in_ram) From 969f11ab8f033dcfc8aa3d4993910cb5e3793c44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20L=C3=BCbbering?= <2804731+le1nux@users.noreply.github.com> Date: Fri, 28 Jun 2024 10:32:32 +0200 Subject: [PATCH 8/9] Update src/modalities/dataloader/create_packed_data.py Co-authored-by: Felix Stollenwerk --- src/modalities/dataloader/create_packed_data.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/modalities/dataloader/create_packed_data.py b/src/modalities/dataloader/create_packed_data.py index 0687c88f1..d71c5a3b3 100644 --- a/src/modalities/dataloader/create_packed_data.py +++ b/src/modalities/dataloader/create_packed_data.py @@ -163,7 +163,6 @@ def _write_batch( ) # The offset only applies to the data section, not the header # When we load the file, we add the header size to the offset - # to the offset curr_offset = 0 # write data section (tokens) From a8a0a1dbc6b1794f3e87cf6de3128b2b705b6610 Mon Sep 17 00:00:00 2001 From: mali-git Date: Sat, 29 Jun 2024 16:37:51 +0200 Subject: [PATCH 9/9] chore: add comments --- src/modalities/dataloader/create_index.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/modalities/dataloader/create_index.py b/src/modalities/dataloader/create_index.py index 1fc0d4d9b..656b69416 100644 --- a/src/modalities/dataloader/create_index.py +++ b/src/modalities/dataloader/create_index.py @@ -25,7 +25,9 @@ def __init__(self, src_file: Path, chunksize: int = 4096, drop_faulty_entries: b self.chunksize = chunksize self.drop_faulty_entries = drop_faulty_entries with self.src_file.open(mode="r") as fin: + # Move the cursor to the end of the file fin.seek(0, os.SEEK_END) + # Get number of characters in the file self._total_num_chars = fin.tell() self.num_chunks = self._total_num_chars // self.chunksize self._queue_of_raw_lines = queue.Queue()