Skip to content
Merged
9 changes: 5 additions & 4 deletions examples/getting_started/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ contains the concatenated token ids for all documents.

Index segment:
==============
The index contains a tuple for each document with the format (byte_offset, segment_length),
where the byte_offset specifies the byte position in the data segment for the start of the document and segment_length.
Therfore, the index segment would look like [(8, 100), (108, 302), (410, 803), ...]. The first sample starts at byte position 8 and
has a length of 100 bytes. The second sample therefore starts at byte position 108 and has a length of 284 bytes and so on.
The index contains a tuple for each document with the format (byte_offset, segment_byte_length),
where the byte_offset specifies the byte position in the data segment for the start of the document and segment_byte_length
specifies the byte length of the document.
Therefore, the index segment would look like [(0, 100), (100, 302), (402, 803), ...]. The first sample starts at byte position 0 and
Comment thread
mali-git marked this conversation as resolved.
has a length of 100 bytes. The second sample therefore starts at byte position 100 and has a length of 302 bytes and so on.
```

We have implemented different packing strategies on top of the file format, each making sure that a batch is completely filled up with documents without any trailing padding in the sequences.
Expand Down
2 changes: 2 additions & 0 deletions src/modalities/dataloader/create_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ def __init__(self, src_file: Path, chunksize: int = 4096, drop_faulty_entries: b
self.chunksize = chunksize
self.drop_faulty_entries = drop_faulty_entries
with self.src_file.open(mode="r") as fin:
# Move the cursor to the end of the file
fin.seek(0, os.SEEK_END)
# Get number of characters in the file
self._total_num_chars = fin.tell()
self.num_chunks = self._total_num_chars // self.chunksize
self._queue_of_raw_lines = queue.Queue()
Expand Down
11 changes: 7 additions & 4 deletions src/modalities/dataloader/create_packed_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ def _write_batch(
EmbeddedStreamData.TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES, byteorder="little"
)
)
curr_offset = EmbeddedStreamData.HEADER_SIZE_IN_BYTES
# The offset only applies to the data section, not the header
# When we load the file, we add the header size to the offset
curr_offset = 0

# write data section (tokens)
pbar = tqdm(total=len(self._reader), desc="Processed batches")
Expand Down Expand Up @@ -229,8 +231,7 @@ def _process_thread(self, process_id: int):
)

def _update_data_length_in_pre_allocated_header(self, dst_path: Path, index_list: List[Tuple[int, int]]):
start_of_index_in_bytes = index_list[-1][0] + index_list[-1][1]
length_of_byte_encoded_data_section = start_of_index_in_bytes - EmbeddedStreamData.HEADER_SIZE_IN_BYTES
length_of_byte_encoded_data_section = index_list[-1][0] + index_list[-1][1]
data_section_length_in_bytes = length_of_byte_encoded_data_section.to_bytes(
EmbeddedStreamData.DATA_SECTION_LENGTH_IN_BYTES, byteorder="little"
)
Expand Down Expand Up @@ -277,7 +278,9 @@ def __init__(self, data_path: Path):
# get index
f.seek(self.HEADER_SIZE_IN_BYTES + self.data_len)
pkl_encoded_index = f.read()
self.index_base = pickle.loads(pkl_encoded_index)
# contains the start offset and length of each segment
# as byte positions in the data section
self.index_base: List[Tuple[int, int]] = pickle.loads(pkl_encoded_index)

# initialize memmapped data section
self.data = np.memmap(self._data_path, mode="r", offset=self.HEADER_SIZE_IN_BYTES, shape=(self.data_len,))
Expand Down
47 changes: 34 additions & 13 deletions src/modalities/dataloader/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@


class Dataset(TorchdataSet):
def __init__(self, raw_data_path: Path, block_size: int, sample_key: str):
def __init__(self, raw_data_path: Path, sample_key: str):
self.raw_data_path = raw_data_path
self.block_size = block_size
self.sample_key = sample_key

def _check_if_inbounds(self, idx: int):
Expand Down Expand Up @@ -51,7 +50,7 @@ def __init__(self, num_samples: int, sample_definition: Tuple[DummySampleConfig]
:param sample_definition: A list of tuples defining the dataset output.
Each tuple contains the sample key, shape and data type.
"""
super().__init__(raw_data_path=None, block_size=None, sample_key=None)
super().__init__(raw_data_path=None, sample_key=None)
self.num_samples = num_samples
self.sample_definition = sample_definition

Expand All @@ -78,7 +77,6 @@ class MemMapDataset(Dataset):
def __init__(
self,
raw_data_path: Path,
block_size: int,
tokenizer: TokenizerWrapper,
sample_key: str,
index_path: Optional[Path] = None,
Expand All @@ -88,7 +86,6 @@ def __init__(
Pytorch Dataset with mmap support.

:param raw_data_path: Path to a jsonl file, which holds text data
:param block_size: alias for max sequence length. The amount of tokens the model can handle.
:param tokenizer: PretrainedTokenizer required to tokenize text data on the fly.
:param jq_pattern: jq-pattern applied on every jsonl-entry. Results are afterwards tokenized and packed
:param index_path: Path to an index file, which indicates the start character/byte position
Expand All @@ -99,7 +96,7 @@ def __init__(
TODO: If this setting should support multi-modal features using separately encoded inputs,
this needs to get replaced with a list of sample keys!
"""
super().__init__(raw_data_path=raw_data_path, block_size=block_size, sample_key=sample_key)
super().__init__(raw_data_path=raw_data_path, sample_key=sample_key)

self.reader = LargeFileLinesReader(self.raw_data_path, index_path=index_path)
self.jq_filter = jq.compile(jq_pattern)
Expand All @@ -124,7 +121,7 @@ class PackedMemMapDatasetBase(Dataset):
}
type_converter_for_torch = {1: np.uint8, 2: np.int32, 4: np.int64}

def __init__(self, raw_data_path: Path, block_size: int, sample_key: str):
def __init__(self, raw_data_path: Path, sample_key: str):
"""
Base class for packed memmapped datasets. The underlying dataset file has the structure:
| header | data | index |
Expand All @@ -134,12 +131,11 @@ def __init__(self, raw_data_path: Path, block_size: int, sample_key: str):

:param raw_data_path: Path to a packed binary file (*.pbin).
Use `modalities data pack_encoded_data` to create one based on a jsonl-file.
:param block_size: alias for max sequence length. The amount of tokens the model can handle.
:param sample_key: model-specific parameter to indicate where in the BatchEncoding the input_token_ids are.
TODO: If this setting should support multi-modal features using separately encoded inputs,
this needs to get replaced with a list of sample keys!
"""
super().__init__(raw_data_path=raw_data_path, block_size=block_size, sample_key=sample_key)
super().__init__(raw_data_path=raw_data_path, sample_key=sample_key)
self._embedded_stream_data = EmbeddedStreamData(raw_data_path)
self._token_size_in_bytes = self._embedded_stream_data.token_size_in_bytes
try:
Expand All @@ -153,23 +149,41 @@ def __init__(self, raw_data_path: Path, block_size: int, sample_key: str):
self._index = self._generate_packing_index()

def _generate_packing_index(self) -> List[Tuple[int, int]]:
raise NotImplementedError
# index is a tuple of offset and length in bytes
return self._embedded_stream_data.index_base
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not like that this method is overwritten in the inherited classes PackedMemMapDatasetContinuous and PackedMemMapDatasetMegatron, see my general comment.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be addressed as part of a new PR and issue #167


def __len__(self) -> int:
return len(self._index)

def __getitem__(self, idx: int) -> BatchEncoding:
self._check_if_inbounds(idx)
offset, length = self._index[idx]
# offset and length in bytes
offset_in_bytes, length_in_bytes = self._index[idx]
if length_in_bytes % self._token_size_in_bytes != 0:
raise ValueError(
f"Length of the sample in bytes is not a multiple of {self._token_size_in_bytes}."
f"Offset in bytes: {offset_in_bytes}, Length in bytes: {length_in_bytes}"
)
# numpy frombuffer takes the memmap object as the buffer
# and indexes into the data section with the given offset (in bytes)
# and length in indices of type self._token_dtype_on_disk
num_tokens = length_in_bytes // self._token_size_in_bytes
tokens = np.frombuffer(
self._embedded_stream_data.data, dtype=self._token_dtype_on_disk, count=length, offset=offset
buffer=self._embedded_stream_data.data,
dtype=self._token_dtype_on_disk,
count=num_tokens,
offset=offset_in_bytes,
)
# torch can't convert most uint-formats, therefore we infer regular int types
tokens = tokens.astype(self._token_dtype_in_ram)
return BatchEncoding(data={self.sample_key: tokens})


class PackedMemMapDatasetContinuous(PackedMemMapDatasetBase):
def __init__(self, raw_data_path: Path, sample_key: str, block_size: int):
self.block_size = block_size
super().__init__(raw_data_path=raw_data_path, sample_key=sample_key)

def _generate_packing_index(self) -> List[Tuple[int, int]]:
# get number of total tokens in file
total_tokens = self._embedded_stream_data.data_len // self._token_size_in_bytes
Expand All @@ -187,10 +201,17 @@ def _generate_packing_index(self) -> List[Tuple[int, int]]:
# of the subsequent sample).
num_samples = (total_tokens - self.block_size) // (self.block_size - 1) + 1
# given num_samples we calculate the starting index and length of each sample as tuple.
return [((i * self.block_size - i) * self._token_size_in_bytes, self.block_size) for i in range(num_samples)]
return [
((i * self.block_size - i) * self._token_size_in_bytes, self.block_size * self._token_size_in_bytes)
for i in range(num_samples)
]


class PackedMemMapDatasetMegatron(PackedMemMapDatasetBase):
def __init__(self, raw_data_path: Path, sample_key: str, block_size: int):
self.block_size = block_size
super().__init__(raw_data_path=raw_data_path, sample_key=sample_key)

def _generate_packing_index(self) -> List[Tuple[int, int]]:
index = []
curr_offset = self.HEADER_SIZE_IN_BYTES
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def dummy_packed_data_path(tmpdir) -> Path:
data += (len(tokens) * token_size_in_bytes).to_bytes(header_size_in_bytes, byteorder="little")
data += token_size_in_bytes.to_bytes(4, byteorder="little")
data += b"".join([t.to_bytes(token_size_in_bytes, byteorder="little") for t in tokens])
index = [(4, 24), (28, 40), (68, 12), (80, 4)] # [(index,len), ...] -> in 4 bytes #lengths: 6,10,3,1
# NOTE: so far none of the implemented pytests use this index though!
index = [(0, 24), (24, 40), (64, 12), (76, 4)] # [(index,len), ...] -> in 4 bytes #lengths: 6,10,3,1
data += pickle.dumps(index)
dummy_packed_data_path = Path(tmpdir, "dummy.pbin")
dummy_packed_data_path.write_bytes(data)
Expand Down
45 changes: 42 additions & 3 deletions tests/dataloader/test_packed_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
import pytest

from modalities.dataloader.create_packed_data import EmbeddedStreamData, PackedDataGenerator, join_embedded_stream_data
from modalities.dataloader.dataset import PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron
from modalities.dataloader.dataset import (
PackedMemMapDatasetBase,
PackedMemMapDatasetContinuous,
PackedMemMapDatasetMegatron,
)
from modalities.models.gpt2.collator import GPT2LLMCollateFn


@pytest.mark.parametrize("block_size, expected_length", [(1, 4), (2, 3), (3, 3), (10, 2), (6, 2), (20, 1), (25, 0)])
def test_packed_megatron_dataset_loading(dummy_packed_data_path, block_size, expected_length):
ds = PackedMemMapDatasetMegatron(dummy_packed_data_path, block_size, sample_key="input_ids")
ds = PackedMemMapDatasetMegatron(
raw_data_path=dummy_packed_data_path, block_size=block_size, sample_key="input_ids"
)
assert len(ds) == expected_length


Expand Down Expand Up @@ -66,7 +72,9 @@ def test_packed_megatron_dataset_loading(dummy_packed_data_path, block_size, exp
)
def test_packed_continuous_dataset_loading(dummy_packed_data_path, block_size, expected_length, expected_output):
try:
ds = PackedMemMapDatasetContinuous(dummy_packed_data_path, block_size, sample_key="input_ids")
ds = PackedMemMapDatasetContinuous(
raw_data_path=dummy_packed_data_path, block_size=block_size, sample_key="input_ids"
)
except ValueError:
assert expected_output == ValueError
return
Expand Down Expand Up @@ -202,3 +210,34 @@ def test_conversion_tokens_represented_as_unsigned_ints(tmpdir, token_size_in_by
collator = GPT2LLMCollateFn(sample_key=sample_key, target_key="abc")
for batch in zip(ds, ds):
collator(list(batch))


def test_original_samples_in_packed_dataset(indexed_dummy_data_path_long, wrapped_gpt2_tokenizer):
    # In this test, we create a packed dataset from a long jsonl file
    # and iterate over the packed dataset to check if the tokenization is correct.
    # We do so by manually tokenizing the jsonl file and comparing the tokenized
    # output with the packed dataset
    # NOTE(review): indexed_dummy_data_path_long and wrapped_gpt2_tokenizer are
    # pytest fixtures defined elsewhere (presumably in conftest.py) — the fixture
    # object is assumed to expose raw_data_path and index_path attributes.
    packed_generator = PackedDataGenerator(
        src_path=indexed_dummy_data_path_long.raw_data_path,
        tokenizer=wrapped_gpt2_tokenizer,
        number_of_processes=5,
        eod_token="<|endoftext|>",
        index_path=indexed_dummy_data_path_long.index_path,
        jq_pattern=".text",
        processing_batch_size=5,
        raw_samples_queue_size=3,
        processed_samples_queue_size=3,
    )
    default_packed_dataset_path = packed_generator._default_destination_path()
    # guard against reading a stale artifact from a previous run: the packed
    # file must not exist before run() creates it
    assert not default_packed_dataset_path.is_file()
    packed_generator.run()
    packed_dataset = PackedMemMapDatasetBase(default_packed_dataset_path, sample_key="input_ids")
    # read in the raw jsonl files for manual tokenization
    with open(indexed_dummy_data_path_long.raw_data_path) as f:
        jsonl_list = [json.loads(line)["text"] for line in f]

    # each packed document is expected to be terminated with the EOD token,
    # so append it to every manually tokenized document before comparing
    eod_token_id = wrapped_gpt2_tokenizer.get_token_id("<|endoftext|>")
    jsonl_tokenized = [wrapped_gpt2_tokenizer.tokenize(v) + [eod_token_id] for v in jsonl_list]

    # dataset samples are token id arrays; compare them element-wise against
    # the reference tokenization, document by document
    for sample, original_sample in zip(packed_dataset, jsonl_tokenized):
        assert sample["input_ids"].tolist() == original_sample