Conversation

@awaelchli (Contributor) commented Jul 16, 2024

Fixes #233

This PR changes/fixes the implementation of how items are assigned to workers.

  • Before: chunks were first assigned to ranks, then each rank's samples were assigned to its workers.
  • Now: samples are assigned directly across the combined world size of all workers and ranks (see the sketch below).

This allows us to correctly apply drop_last and ensure that each rank returns the same amount of data. However, it also makes this PR a breaking change.
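
For illustration, here is a minimal sketch contrasting the two schemes. It is not litdata's actual code: the function names and the round-robin striding are assumptions, and the old scheme is abstracted to a per-rank split of samples (it really assigned whole chunks to ranks first).

```python
# Illustrative sketch only -- not litdata's implementation.

def assign_before(samples, num_ranks, workers_per_rank):
    """Old scheme: split samples across ranks, then split each rank's share
    across its workers. Ranks can end up with uneven counts, so the number
    of batches per rank may differ."""
    per_rank = [samples[r::num_ranks] for r in range(num_ranks)]
    return [
        [per_rank[r][w::workers_per_rank] for w in range(workers_per_rank)]
        for r in range(num_ranks)
    ]


def assign_now(samples, num_ranks, workers_per_rank, drop_last=True):
    """New scheme: assign samples directly across the combined world size of
    all workers on all ranks. With drop_last, the tail is trimmed to a
    multiple of the world size, so every rank yields the same count."""
    world = num_ranks * workers_per_rank
    if drop_last:
        samples = samples[: len(samples) - len(samples) % world]
    return [
        [samples[r * workers_per_rank + w :: world] for w in range(workers_per_rank)]
        for r in range(num_ranks)
    ]


# 9 samples, 2 ranks x 2 workers: the old scheme gives rank 0 five samples
# and rank 1 four; the new scheme trims to 8 so each rank gets exactly four.
old = assign_before(list(range(9)), num_ranks=2, workers_per_rank=2)
new = assign_now(list(range(9)), num_ranks=2, workers_per_rank=2)
assert [sum(map(len, rank)) for rank in old] == [5, 4]
assert [sum(map(len, rank)) for rank in new] == [4, 4]
```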

IMPORTANT:
This changes the order in which samples are batched and returned. As a consequence, checkpoints created before this PR will not resume correctly.

TODOs

  • Resuming logic
  • Chunk deletion logic
  • Apply fix for no-shuffle
  • Tests

@awaelchli awaelchli force-pushed the fix_uneven_number_of_batches2 branch from a7e425c to 265c4e9 on July 16, 2024 14:55
@awaelchli awaelchli force-pushed the fix_uneven_number_of_batches2 branch from ebc7e3b to b0096c5 on July 16, 2024 16:27
@awaelchli awaelchli marked this pull request as ready for review July 19, 2024 15:49
  output_dir=data_dir,
  chunk_size=190,
- num_workers=4,
+ num_workers=1,  # TODO: Want 4 here, but optimize() has deletion race condition
@awaelchli (Contributor, Author) commented:

It looks like we use num_workers=1 everywhere in the tests. Here I wanted 4, but there seem to be race conditions (?) in the copying/deletion of chunks, causing this test to fail because of missing chunks. See the sketch below for the kind of race I suspect.
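
For context, here is a minimal, self-contained sketch of the kind of copy/delete race suspected here. The file names and the two thread roles are hypothetical stand-ins, not litdata's actual internals; depending on scheduling, the copy can lose the race and raise the same FileNotFoundError that shows up in the log below.

```python
import os
import shutil
import tempfile
import threading

# Hypothetical reproduction of the suspected race: one thread plays the
# upload worker copying a finished chunk to the output directory, another
# plays the chunk-deletion logic removing it from the cache. Whichever
# thread runs second decides whether the copy succeeds or raises.
cache_dir = tempfile.mkdtemp()
output_dir = tempfile.mkdtemp()
chunk = os.path.join(cache_dir, "chunk-0-0.bin")
open(chunk, "wb").close()  # create an empty stand-in chunk file


def upload():
    try:
        shutil.copy(chunk, os.path.join(output_dir, "chunk-0-0.bin"))
    except FileNotFoundError as err:
        print("copy lost the race:", err)


def delete():
    os.remove(chunk)


threads = [threading.Thread(target=upload), threading.Thread(target=delete)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```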

@awaelchli (Contributor, Author) commented:

__________________ test_dataset_resume_on_future_chunks[True] __________________

shuffle = True
tmpdir = local('/tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0')
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f6a4124f460>

    @pytest.mark.skipif(sys.platform == "win32", reason="Not tested on windows and MacOs")
    @mock.patch.dict(os.environ, {}, clear=True)
    @pytest.mark.timeout(60)
    @pytest.mark.parametrize("shuffle", [True, False])
    def test_dataset_resume_on_future_chunks(shuffle, tmpdir, monkeypatch):
        """This test is constructed to test resuming from a chunk past the first chunk, when subsequent chunks don't have
        the same size."""
        s3_cache_dir = str(tmpdir / "s3cache")
        optimize_data_cache_dir = str(tmpdir / "optimize_data_cache")
        optimize_cache_dir = str(tmpdir / "optimize_cache")
        data_dir = str(tmpdir / "optimized")
        monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", optimize_data_cache_dir)
        monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", optimize_cache_dir)
    
>       optimize(
            fn=_simple_preprocess,
            inputs=list(range(8)),
            output_dir=data_dir,
            chunk_size=190,
            num_workers=4,
            num_uploaders=1,
copying /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-3-1.bin to /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimized/chunk-3-1.bin
putting /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-3-1.bin on the remove queue
Worker 1 is done.
Worker 2 is done.
Worker 3 is done.
Worker 0 is done.
Workers are finished.
----------------------------- Captured stderr call -----------------------------


Progress:   0%|          | 0/8 [00:00<?, ?it/s]Process Process-85:1:
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 259, in _upload_fn
    shutil.copy(local_filepath, output_filepath)
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/shutil.py", line 427, in copy
    copyfile(src, dst, follow_symlinks=follow_symlinks)
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/shutil.py", line 264, in copyfile
    with open(src, 'rb') as fsrc:
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-0-0.bin'

Progress: 100%|██████████| 8/8 [00:00<00:00, 122.77it/s]
=========================== short test summary info ============================
FAILED tests/streaming/test_dataset.py::test_dataset_resume_on_future_chunks[True] - RuntimeError: All the chunks should have been deleted. Found ['chunk-0-1.bin']
====== 1 failed, 191 passed, 8 skipped, 11 warnings in 247.94s (0:04:07) =======

@awaelchli awaelchli requested a review from tchaton July 19, 2024 15:50
@awaelchli awaelchli force-pushed the fix_uneven_number_of_batches2 branch from 8350fc2 to bc64b77 on July 19, 2024 16:28
awaelchli and others added 2 commits July 19, 2024 12:28
Co-authored-by: thomas chaton <thomas.chaton.ai@gmail.com>
@awaelchli awaelchli force-pushed the fix_uneven_number_of_batches2 branch from 6fc442e to 66017e8 on July 19, 2024 17:36
@awaelchli awaelchli enabled auto-merge (squash) July 19, 2024 17:44
@awaelchli awaelchli merged commit c58b673 into main Jul 19, 2024
@awaelchli awaelchli deleted the fix_uneven_number_of_batches2 branch July 19, 2024 17:57
@tchaton (Collaborator) commented Jul 19, 2024

Awesome work @awaelchli!


Labels: bug (Something isn't working)

Linked issue this PR may close: Uneven number of batches returned across ranks in StreamingDataset/DataLoader