Pandas-only P2P shuffling #8635

Closed · hendrikmakait wants to merge 27 commits

Conversation

hendrikmakait
Member

Supersedes #8606

  • Tests added / passed
  • Passes pre-commit run --all-files

Comment on lines +48 to +49
- git+https://github.com/hendrikmakait/dask@p2p-pandas
- git+https://github.com/hendrikmakait/dask-expr@p2p-pandas
Member Author

Reminder to revert 712e211

Contributor

github-actions bot commented May 6, 2024

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    29 files  +2        29 suites  +2       10h 21m 53s ⏱️ +1h 22m 10s
 4 053 tests  −3      3 784 ✅  −159       168 💤 +62       100 ❌ +97     1 🔥 −3
51 387 runs  +7 177  48 343 ✅ +5 922   2 857 💤 +1 088    185 ❌ +182    2 🔥 −15

For more details on these failures and errors, see this check.

Results for commit 98b1c32. ± Comparison against base commit 1ec61a1.

This pull request removes 141 tests and adds 138. Note that renamed tests count towards both.
distributed.shuffle.tests.test_graph ‑ test_does_not_raise_on_stringified_numeric_column_name
distributed.shuffle.tests.test_graph ‑ test_raise_on_complex_numbers[cdouble]
distributed.shuffle.tests.test_graph ‑ test_raise_on_complex_numbers[clongdouble]
distributed.shuffle.tests.test_graph ‑ test_raise_on_complex_numbers[csingle]
distributed.shuffle.tests.test_graph ‑ test_raise_on_custom_objects
distributed.shuffle.tests.test_graph ‑ test_raise_on_non_string_column_name
distributed.shuffle.tests.test_graph ‑ test_raise_on_sparse_data
distributed.shuffle.tests.test_merge ‑ test_merge[False-inner]
distributed.shuffle.tests.test_merge ‑ test_merge[False-left]
distributed.shuffle.tests.test_merge ‑ test_merge[False-outer]
…
distributed.diagnostics.tests.test_memray ‑ test_basic_integration_scheduler
distributed.diagnostics.tests.test_memray ‑ test_basic_integration_scheduler_report_args[False]
distributed.diagnostics.tests.test_memray ‑ test_basic_integration_scheduler_report_args[report_args0]
distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers[1]
distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers[False]
distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers[True]
distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers_report_args[False]
distributed.diagnostics.tests.test_memray ‑ test_basic_integration_workers_report_args[report_args0]
distributed.shuffle.tests.test_merge ‑ test_merge[disk-inner]
distributed.shuffle.tests.test_merge ‑ test_merge[disk-left]
…
This pull request removes 1 skipped test and adds 59 skipped tests. Note that renamed tests count towards both.
distributed.shuffle.tests.test_graph ‑ test_raise_on_custom_objects
distributed.shuffle.tests.test_merge ‑ test_merge[memory-inner]
distributed.shuffle.tests.test_merge ‑ test_merge[memory-left]
distributed.shuffle.tests.test_merge ‑ test_merge[memory-outer]
distributed.shuffle.tests.test_merge ‑ test_merge[memory-right]
distributed.shuffle.tests.test_shuffle ‑ test_basic_integration[memory-1]
distributed.shuffle.tests.test_shuffle ‑ test_basic_integration[memory-20]
distributed.shuffle.tests.test_shuffle ‑ test_basic_integration[memory-None]
distributed.shuffle.tests.test_shuffle ‑ test_basic_lowlevel_shuffle[False-memory-False-1-1-10]
distributed.shuffle.tests.test_shuffle ‑ test_basic_lowlevel_shuffle[False-memory-False-1-1-1]
distributed.shuffle.tests.test_shuffle ‑ test_basic_lowlevel_shuffle[False-memory-False-1-10-10]
…
This pull request skips 5 tests and un-skips 1.
distributed.shuffle.tests.test_merge
distributed.shuffle.tests.test_metrics
distributed.shuffle.tests.test_shuffle
distributed.shuffle.tests.test_shuffle ‑ test_processing_chain[False]
distributed.shuffle.tests.test_shuffle ‑ test_processing_chain[True]
distributed.tests.test_nanny ‑ test_no_unnecessary_imports_on_worker[pandas]

♻️ This comment has been updated with the latest results.

@rjzamora
Member

rjzamora commented May 6, 2024

@hendrikmakait - Is there any background or motivation you can share about this work? Seems very pandas-specific at the moment.

index: pd.Index, blocks: Sequence[Block], meta: pd.DataFrame
) -> pd.DataFrame:
import pandas as pd
from pandas.core.internals import BlockManager, make_block
Member

Would be nice in the long run if any logic using pandas internals could be dispatched.

@@ -22,7 +22,7 @@ dependencies:
   # Distributed depends on the latest version of Dask
   - pip
   - pip:
-    - git+https://github.com/dask/dask
+    - git+https://github.com/hendrikmakait/dask@p2p-pandas
Member Author

Reminder to revert 814c73a

@mrocklin
Member

mrocklin commented May 6, 2024

> Is there any background or motivation you can share about this work? Seems very pandas-specific at the moment.

I'll let @hendrikmakait answer fully, but my sense is that early experiments showed substantial speedups (like 30% on full TPC-H benchmarks, not just on the shuffling portion).

> Would be nice in the long run if any logic using pandas internals could be dispatched.

I can see how this would be convenient for RAPIDS folks (although probably a little inconvenient for others due to complexity). If we go this direction then maybe helping address/reduce this complexity is something that RAPIDS people could help design/execute/maintain?

@rjzamora
Member

rjzamora commented May 6, 2024

> my sense is that early experiments showed substantial speedups (like 30% on full TPC-H benchmarks, not just on the shuffling portion).

Very nice!

> I can see how this would be convenient for RAPIDS folks (although probably a little inconvenient for others due to complexity).

This is a breaking change for rapids "as is". I suppose it would be "convenient" for rapids to continue working with p2p shuffling :)

> If we go this direction then maybe helping address/reduce this complexity is something that RAPIDS people could help design/execute/maintain?

I will certainly help with all of the above if doing so is both welcome and feasible. Feasibility probably depends on it being possible to isolate the logic that interacts with pandas internals. If organizing the code in this way really is "too complex", then cudf may need to rely on a completely distinct version of "p2p" to avoid slowing down Coiled engineers. However, my hope is that this is a natural way to organize things even if cudf weren't in the picture. (fingers crossed)

For this PR, it seems fine to focus on pandas-only details (especially for now). @hendrikmakait - Perhaps we can meet offline so I can figure out whether rapids needs to prioritize (1) rolling-back p2p support, or (2) enabling the pyarrow-free approach with cudf-backed data.

@hendrikmakait
Member Author

> @hendrikmakait - Is there any background or motivation you can share about this work? Seems very pandas-specific at the moment.

There are three main benefits I see here:

  1. As @mrocklin mentioned, we have seen significant performance improvements with this. I plan to share some benchmarks later. (The Arrow-based implementation has never been optimized for performance and has some shortcomings like excessive data copying.)
  2. It removes the Arrow dependency, a consistent source of pain because Arrow's semantics differ from pandas and its typing is strict.
  3. It removes complexity that never worked as intended, e.g., disk-write buffering.

> For this PR, it seems fine to focus on pandas-only details (especially for now). @hendrikmakait - Perhaps we can meet offline so I can figure out whether rapids needs to prioritize (1) rolling-back p2p support, or (2) enabling the pyarrow-free approach with cudf-backed data.

Sounds good! I'll ping you offline. Looking at the code we currently have in place, I expect it to be fairly straightforward to dispatch the library-specific pieces of code or to dispatch instantiation of shuffle runs and implement a cuDF-specific version.

    concatenate3.
    """
    with path.open(mode="r+b") as fh:
        buffer = memoryview(mmap.mmap(fh.fileno(), 0))
Member Author

The mmap causes us to lose strict control over the file descriptor lifecycle. This creates problems with cleanup on Windows because we can't remove files if they still have an open file descriptor.
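
For context, a minimal sketch of the lifecycle issue (standard-library behavior; the undeletable-file failure mode is Windows-specific):

    import mmap
    import os
    import tempfile

    fd, path = tempfile.mkstemp()
    os.write(fd, b"0123456789")
    os.close(fd)

    with open(path, "r+b") as fh:
        mm = mmap.mmap(fh.fileno(), 0)
    buffer = memoryview(mm)

    # The file object is closed, but the mapping keeps an OS-level handle
    # alive. On Windows, the file cannot be removed until the mapping is gone:
    buffer.release()  # mm.close() raises BufferError while views are exported
    mm.close()
    os.remove(path)   # on Windows, safe only after the mapping is closed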

t = t.drop([column])
splits = np.where(partition[1:] != partition[:-1])[0] + 1
splits = np.concatenate([[0], splits])
base = df[column].values.base # type: ignore[union-attr]
Member

Can you explain the base logic a bit?
In cudf/cupy, df[column].values.base produces None.

Does it make sense to protect against the case where base is None here? (Otherwise, len(base) will raise an error.)
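
One possible guard along the lines suggested here (hypothetical; whether falling back to the array itself is correct depends on what base is used for downstream):

    values = df[column].values
    # cupy-backed columns report `.base` as None; fall back to the array itself
    base = values.base if values.base is not None else values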

Comment on lines +60 to +62
return pickle_bytelist(
(input_part_id, shard.index, *shard._mgr.blocks), prelude=False
)
@rjzamora (Member) commented May 8, 2024

Fun fact: This PR works fine with cudf-backed data when I do something like the following:

    shard = shard.to_pandas() if hasattr(shard, "to_pandas") else shard
    return pickle_bytelist(
        (input_part_id, shard.index, *shard._mgr.blocks), prelude=False
    )

as long as I also modify the last line of unpickle_and_concat_dataframe_shards to be:

    result = dd.methods.concat(shards, copy=True)
    if hasattr(type(meta), "from_pandas"):
        return type(meta).from_pandas(result)
    return result

I'm sure there are cases where the cudf->pandas->cudf round trip isn't perfect, but the small amount of logic I needed to work around the changes is promising. (The performance is certainly not good, but that's a separate concern.)

@rjzamora
Member

I've been experimenting with this PR using cudf-backed data. I think it will take some work on the rapids side to bring performance in line with main (TPC-H queries take a ~2x perf hit with dask-cudf), but I wouldn't consider cudf + p2p performance a big priority yet. My near-term priority is to keep cudf + p2p "functional". This can be achieved in a fairly simple way by defining and using the following dispatch functions:

def deconstruct_dataframe_shard(shard: pd.DataFrame) -> tuple[Any, ...]:
    """Deconstruct a DataFrame shard into the information needed to reconstruct it later.

    Dispatches on the type of `shard`.
    The elements of the result should be whatever is needed by `restore_dataframe_shard`.
    """
    ...


def restore_dataframe_shard(meta: pd.DataFrame, unpickled_shard: tuple[Any, ...]) -> pd.DataFrame:
    """Reconstruct an un-pickled DataFrame shard.

    Dispatches on the type of `meta`.
    `unpickled_shard` corresponds to the round-tripped output of `deconstruct_dataframe_shard`.
    Converts a tuple of un-pickled data (e.g. index and blocks) back to a DataFrame.
    """
    ...

For the sake of generality, pickle_dataframe_shard would look something like:

def pickle_dataframe_shard(input_part_id: int, shard: pd.DataFrame) -> list[pickle.PickleBuffer]:
    return pickle_bytelist(
        (input_part_id,) + deconstruct_dataframe_shard(shard),
        prelude=False,
    )

and unpickle_and_concat_dataframe_shards would then look something like:

def unpickle_and_concat_dataframe_shards(
    b: bytes | bytearray | memoryview, meta: pd.DataFrame
) -> pd.DataFrame:
    import dask.dataframe as dd
    from toolz import first

    unpickled_shards = list(unpickle_bytestream(b))
    unpickled_shards.sort(key=first)
    shards = []

    # each element is the flat tuple (input_part_id, *shard_data)
    for _, *unpickled_shard in unpickled_shards:
        shards.append(restore_dataframe_shard(meta, tuple(unpickled_shard)))

    return dd.methods.concat(shards, copy=True)

I suppose the only "complex" part of this general suggestion is that the dispatch functions themselves would need to be exposed/defined in dask.dataframe.
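
For illustration, the registration side could plausibly reuse the existing dask.utils.Dispatch mechanism (a sketch under that assumption; the function names mirror the proposal above and are not part of any current API):

    from typing import Any

    import pandas as pd
    from dask.utils import Dispatch

    deconstruct_dataframe_shard = Dispatch("deconstruct_dataframe_shard")
    restore_dataframe_shard = Dispatch("restore_dataframe_shard")

    @deconstruct_dataframe_shard.register(pd.DataFrame)
    def _deconstruct_pandas(shard: pd.DataFrame) -> tuple[Any, ...]:
        # pandas backend: ship the index plus the raw BlockManager blocks
        return (shard.index, *shard._mgr.blocks)

    # A cudf backend could then register its own pair without touching this module:
    # @deconstruct_dataframe_shard.register(cudf.DataFrame)
    # def _deconstruct_cudf(shard): ...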

@hendrikmakait
Member Author

hendrikmakait commented May 22, 2024

TL;DR: After running several A/B tests and profiling the code, I have decided not to move forward with this PR because it is not a universal improvement.

Analysis

A/B tests have shown improvements ranging from 5%-30% in end-to-end runtime for almost all TPC-H queries:
[Screenshot: A/B test results for TPC-H queries, 2024-05-14]

However, they have also shown disastrous results for test_set_index and, to a lesser extent, test_set_index_uber_lyft.

[Screenshot: A/B test results for test_set_index, 2024-05-14]

It looks like this PR overfits to OLAP-style queries, which involve narrow dataframes with few columns and heavily reduce data, at the cost of ETL-style transformations, which involve wide dataframes with many columns.

Profiling the code before and after, I saw that both the transfer and the unpack phases became significantly more expensive in test_set_index (by factors of 3.5x and 2.4x, respectively), and that we lose much time to what looks like GIL contention. The Coiled clusters I ran these on also report GIL contention very close to 1 (and higher than on main).

This is likely due to the many small shards with very few rows. I tried several possible quick wins, such as removing mmap to avoid the impact of page loads during GIL-restricted code paths, or using df.drop() instead of del, but none of these had any positive impact. It is currently unclear how to improve this implementation in a way that reduces GIL contention enough that ETL-style workloads are not hit as harshly. There may be upstream improvements that would reduce GIL contention, but those would also take a while to get released. I also suspect that this is yet another example of the convoy effect, so reducing GIL contention in some places could have non-trivial effects elsewhere, as it did in previous attempts at improving P2P performance.

@hendrikmakait
Member Author

Example py-spy profiles for future reference:

profiles.zip
