Support S3 data loading #8447

Closed
wants to merge 3 commits into from

Conversation

@jrocmar commented Feb 16, 2024

What does this PR do?

Support S3 data loading for the GPTDataset

This PR introduces the S3IndexedDataset, which supports loading a dataset stored in S3 in the same format as the MMapIndexedDataset. In particular, the .idx file is downloaded to a local directory at initialization so that we can memory-map it, and the .bin file is streamed into memory block by block.
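
For illustration, here is a minimal sketch of the .idx download step, assuming boto3 and hypothetical helper and path names (this is not the PR's actual code):

import os
import boto3

def download_idx_locally(s3_path, local_dir):
    """Copy an index file like s3://bucket/dataset/shard0.idx into local_dir and return the local path."""
    bucket, key = s3_path[len("s3://"):].split("/", 1)
    local_path = os.path.join(local_dir, os.path.basename(key))
    boto3.client("s3").download_file(bucket, key, local_path)
    return local_path

The local copy can then be memory-mapped exactly as the MMapIndexedDataset does today, while the much larger .bin file never touches local disk.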

In order for S3 data loading to be performant, we introduce block shuffling to the GPTDataset: we divide the samples into blocks, shuffle within each block, and then shuffle the order of the blocks. This introduces only a minimal amount of correlation into the shuffle, which can be exploited to download larger, contiguous chunks of data from S3. We set the default block size for shuffling to 1, which recovers the shuffling behavior before this commit.

Example

I now describe the S3 data loading strategy in more detail by walking through a small example. In NeMo, a sample consists of seq_length tokens. For simplicity, suppose each token is 1 byte and seq_length is 100.

Each sample then takes 100 bytes.

Suppose we have a dataset with 12 samples.

Sample index 0 is stored in bytes [0, 100), sample index 1 is stored in bytes [100, 200), ..., and sample index 11 is stored in bytes [1100, 1200).
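
As a quick illustration of that layout (toy numbers from the example above, not NeMo code):

seq_length = 100      # tokens per sample
bytes_per_token = 1
sample_nbytes = seq_length * bytes_per_token  # 100 bytes per sample

for i in (0, 1, 11):
    start = i * sample_nbytes
    end = start + sample_nbytes
    print(f"sample {i}: bytes [{start}, {end})")
# sample 0: bytes [0, 100)
# sample 1: bytes [100, 200)
# sample 11: bytes [1100, 1200)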

Currently, NeMo takes the list of sample indices:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

and produces a shuffle_idx, which is a permutation of those sample indices, for example:

[11, 3, 0, 6, 1, 9, 10, 2, 5, 7, 4, 8]

The shuffle_idx determines the order in which NeMo processes samples.

We could have the S3IndexedDataset just grab the bytes for one sample at a time. The first request would be for the bytes [1100, 1200), the second request for the bytes [300, 400), the third request for the bytes [0, 100), and so on, in the order determined by shuffle_idx. That works, but it's slow, because we make one request for each sample.
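
Sketched with boto3 (hypothetical bucket/key names, not the PR's code), that naive strategy issues one ranged GET per sample:

import boto3

s3 = boto3.client("s3")

def read_sample_bytes(bucket, key, start, end):
    """Fetch the half-open byte range [start, end) of the .bin object in one request."""
    # S3 Range headers are inclusive on both ends, so [start, end) maps to start..end-1.
    response = s3.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end - 1}")
    return response["Body"].read()

# In shuffle_idx order: [1100, 1200), then [300, 400), then [0, 100), ... one request each.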

Let's try to introduce an in-memory cache. In particular, suppose the S3IndexedDataset does this:

  • If the requested bytes range [start, end) is in the cache, then extract the requested bytes range from the cache.
  • Otherwise, first refresh the cache by downloading the bytes range [start, start + cache_nbytes) and then extract the requested bytes range from the cache.

Suppose the cache_nbytes is 400. The first request would be for the bytes [1100, 1200). The cache is initially empty, so we refresh the cache by downloading the bytes [1100, 1500) and then extract the requested bytes range from the cache. The second request would be for the bytes [300, 400). Those bytes are not in the cache, so we refresh the cache by downloading the bytes [300, 700) and then extract the requested bytes range from that cache. And so on.
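
In code, the naive cache described above might look roughly like this (hypothetical names; fetch stands in for an S3 ranged GET such as read_sample_bytes):

class NaiveByteCache:
    """Cache that refreshes starting at whatever offset was requested (the scheme above)."""

    def __init__(self, fetch, cache_nbytes):
        self.fetch = fetch              # fetch(start, end) -> bytes, e.g. an S3 ranged GET
        self.cache_nbytes = cache_nbytes
        self.cache_start = None
        self.cache = b""

    def read(self, start, end):
        hit = (
            self.cache_start is not None
            and start >= self.cache_start
            and end <= self.cache_start + len(self.cache)
        )
        if not hit:
            # Miss: refresh the cache beginning at the requested offset.
            self.cache_start = start
            self.cache = self.fetch(start, start + self.cache_nbytes)
        offset = start - self.cache_start
        return self.cache[offset : offset + (end - start)]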

That actually makes the problem worse. For most samples we still have to refresh the cache, so we have not reduced the number of requests much; we have only made each request download more bytes. The issue is that the bytes needed for a sample index are usually not adjacent to the bytes needed for the previous sample index.

To use the cache effectively, we have to introduce some correlation in the shuffle. In particular, we divide the original list of sample indices into blocks like:

  • [0, 1, 2, 3]
  • [4, 5, 6, 7]
  • [8, 9, 10, 11]

We then shuffle within the blocks like:

  • [3, 0, 2, 1]
  • [4, 6, 5, 7]
  • [11, 10, 8, 9]

We then shuffle the order of the blocks like:

  • [11, 10, 8, 9]
  • [4, 6, 5, 7]
  • [3, 0, 2, 1]

And we construct the block-shuffled shuffle_idx like:

[11, 10, 8, 9, 4, 6, 5, 7, 3, 0, 2, 1]
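
A minimal NumPy sketch of building such a block-shuffled shuffle_idx (illustrative only; the function name and signature are hypothetical):

import numpy as np

def build_block_shuffle_idx(num_samples, block_size, rng):
    """Shuffle within fixed-size blocks of sample indices, then shuffle the block order."""
    blocks = [
        np.arange(start, min(start + block_size, num_samples))
        for start in range(0, num_samples, block_size)
    ]
    for block in blocks:
        rng.shuffle(block)        # shuffle within each block
    rng.shuffle(blocks)           # shuffle the order of the blocks
    return np.concatenate(blocks)

# build_block_shuffle_idx(12, 4, np.random.default_rng()) yields permutations such as
# [11, 10, 8, 9, 4, 6, 5, 7, 3, 0, 2, 1]; with block_size=1 it reduces to a plain full shuffle.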

We also have to change which bytes we download on a cache miss. In particular, we download the bytes [cache_start, cache_start + cache_nbytes), where cache_start is (start//cache_nbytes) * cache_nbytes.

The first request would be for the bytes [1100, 1200). The cache is initially empty, so we refresh the cache by downloading the bytes [800, 1200) and then extract the requested bytes range from the cache. The second request would be for the bytes [1000, 1100). We extract those bytes from the cache. The third request would be for the bytes [800, 900). We extract those bytes from the cache. And so on. In this way, we only have to refresh the cache at the start of each new block.
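
Continuing the earlier cache sketch, the only change is to align refreshes to cache_nbytes boundaries (again with hypothetical names, and assuming a sample's byte range never straddles two aligned chunks):

class AlignedByteCache:
    """Cache that refreshes on cache_nbytes-aligned boundaries, matching the block shuffle."""

    def __init__(self, fetch, cache_nbytes):
        self.fetch = fetch              # fetch(start, end) -> bytes, e.g. an S3 ranged GET
        self.cache_nbytes = cache_nbytes
        self.cache_start = None
        self.cache = b""

    def read(self, start, end):
        cache_start = (start // self.cache_nbytes) * self.cache_nbytes
        if cache_start != self.cache_start:
            # With block shuffling, every sample in a block lives in the same aligned
            # chunk, so this refresh happens roughly once per block.
            self.cache_start = cache_start
            self.cache = self.fetch(cache_start, cache_start + self.cache_nbytes)
        offset = start - cache_start
        return self.cache[offset : offset + (end - start)]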

Collection: nlp

Known issues

Each data worker will download the same cache (though they still prepare different batches). It does not look like there is a straightforward, performant way to avoid that.

Alternatives

We could remove block shuffling and add an option to disable shuffling under the assumption that the dataset is preshuffled.

Changelog

  • Add specific line by line info of high level changes in this PR.

Usage

Here's an example data config:

data:
    data_impl: mmap
    seq_length: 2048
    # 128 * 1024 * 1024, i.e., 128 MiB
    data_cache_nbytes: 134217728
    # data_cache_nbytes / (bytes per token * seq_length)
    # = (128 * 1024 * 1024) / (2 * 2048) = 32768
    shuffle_block_size: 32768
    shuffle_documents: false
    no_seqlen_plus_one_input_tokens: true
    data_prefix:
    - 0.1
    - s3://path/to/shard0
    - 0.1
    - s3://path/to/shard1
    ...
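
The comments in the config come from a simple relationship between the cache size and the shuffle block size; as a sanity check (plain Python, variable names are just for illustration):

data_cache_nbytes = 128 * 1024 * 1024   # 134217728 bytes, as in the config above
bytes_per_token = 2                      # e.g. 2-byte token IDs
seq_length = 2048

# One cache refresh should cover exactly one shuffle block worth of samples.
shuffle_block_size = data_cache_nbytes // (bytes_per_token * seq_length)
assert shuffle_block_size == 32768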

Jenkins CI

To run Jenkins, a NeMo User with write access must comment jenkins on the PR.

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you add or update any necessary documentation?
  • Does the PR affect components that are optional to install? (Ex: Numba, Pynini, Apex etc)
    • Reviewer: Does the PR have correct import guards for all optional libraries?

PR Type:

  • New Feature
  • Bugfix
  • Documentation

If you haven't finished some of the above items, you can still open a "Draft" PR.

Who can review?

Anyone in the community is free to review the PR once the checks have passed.
The Contributor guidelines contain specific people who can review PRs to various areas.

Additional Information

  • Related to # (issue)

This commit introduces the S3IndexedDataset, which supports
loading a dataset stored in S3 in the same format as the
MMapIndexedDataset. In particular, the .idx file is downloaded
to a local directory at initialization so that we can memory map
it and the .bin file is streamed into memory block-by-block.

In order for S3 data loading to be performant, we introduce block
shuffling to GPTDataset, where we divide samples into blocks,
shuffle the blocks and then shuffle within the blocks. In this
way, we introduce a minimal amount of correlation into the
shuffle, which can be exploited to download larger,
contiguous chunks of data from S3. We set the default block
size for shuffling to 1, which recovers the shuffling behavior
before this commit.
github-actions bot added the NLP label Feb 16, 2024

github-actions bot commented Mar 2, 2024

This PR is stale because it has been open for 14 days with no activity. Remove stale label or comment or update or this will be closed in 7 days.

github-actions bot added the stale label Mar 2, 2024

This PR was closed because it has been inactive for 7 days since being marked as stale.
