
Revise divisions logic in from_pandas #9221

Merged
ian-r-rose merged 33 commits into dask:main from rjzamora:rewrite-sorted_division_locations on Aug 3, 2022

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Jun 28, 2022

This PR was originally meant to be a small bug-fix for #9218 - However, I struggled to work with the existing sorted_division_locations function, and eventually decided to rewrite it in a slightly different way.

Side Notes:

  • Most of the complexity in this code comes from the fact that sorted_division_locations is expected to prioritize the uniqueness of divisions elements over the output satisfying npartitions or chunksize. Although I can understand the motivation for having unique divisions, it is not clear to me that uniqueness is always more important than the partition size and/or count. Perhaps it makes sense to add something like a unique_partitions argument to from_pandas?
  • Even when there are no duplicate values in the index column, the npartitions= argument is not always satisfied. This result seems a bit surprising to me.

@ian-r-rose ian-r-rose self-requested a review June 28, 2022 15:29
@rjzamora
Member Author

Yikes - It looks like a lot of tests start to fail when from_pandas partitioning changes. As far as I can tell, there are many places where tests actually rely on from_pandas(<data>, npartitions=n) returning fewer than n partitions.

@rjzamora
Member Author

rjzamora commented Jun 28, 2022

It looks like the next steps are the following:

  • (Step 1) Confirm that the new from_pandas partitioning logic is what we want
  • (Step 2) Fix failing tests that are assuming bad/old from_pandas partitioning
  • (Step 3) File/fix real bugs in dask.dataframe that are exposed by this change

More about Step 1

The current version of from_pandas uses sorted_division_locations to calculate the divisions for the output DataFrame collection. Although this function accepts an npartitions or chunksize argument, from_pandas exclusively uses the chunksize argument (converting npartitions to chunksize outside of sorted_division_locations).
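As I understand it, the npartitions-to-chunksize translation amounts to taking the ceiling of the row count divided by the requested partition count (an illustrative reading of the description above, not a quote of the dask source):

```python
import math

# Hypothetical sketch: from_pandas derives a chunksize from npartitions
# before handing off to sorted_division_locations.
nrows, npartitions = 8, 3
chunksize = int(math.ceil(nrows / npartitions))
print(chunksize)  # 3
```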

The docstring for sorted_division_locations includes the following examples:

    """
    Examples
    --------

    >>> L = ['A', 'B', 'C', 'D', 'E', 'F']
    >>> sorted_division_locations(L, chunksize=2)
    (['A', 'C', 'E', 'F'], [0, 2, 4, 6])

    >>> sorted_division_locations(L, chunksize=3)
    (['A', 'D', 'F'], [0, 3, 6])

    >>> L = ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'C']
    >>> sorted_division_locations(L, chunksize=3)
    (['A', 'B', 'C'], [0, 4, 8])

    >>> sorted_division_locations(L, chunksize=2)
    (['A', 'B', 'C'], [0, 4, 8])

    >>> sorted_division_locations(['A'], chunksize=2)
    (['A', 'A'], [0, 1])
    """

While the first two examples seem correct to me, the second two do not. For example, the existing logic will break L=['A', 'A', 'A', 'A', 'B', 'B', 'B', 'C'] into two partitions (['A', 'A', 'A', 'A'] and ['B', 'B', 'B', 'C']) for sorted_division_locations(L, chunksize=3) (or sorted_division_locations(L, npartitions=3)). However, the logic in this PR will break L into ['A', 'A', 'A', 'A'], ['B', 'B', 'B'], and ['C']. Therefore, the new logic will satisfy an npartitions=3 argument, and will avoid returning partitions larger than chunksize=3 whenever possible.

This PR does not change the fact that an npartitions argument will always be translated into a chunksize argument outside of sorted_division_locations, but it does make the output partitioning much more likely to satisfy the npartitions argument when a "small" DataFrame collection is generated by from_pandas (which is very common in the test suite). EDIT: The npartitions argument will now always be satisfied when there are no duplicate values in the index (and the total row count is sufficient).
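The partitioning behavior described above can be sketched with a toy function (my own simplification for illustration only; the real sorted_division_locations also handles npartitions directly and several other cases). It never splits a run of equal values, but otherwise avoids producing partitions larger than chunksize:

```python
def sorted_division_locations_sketch(seq, chunksize):
    """Toy sketch: divisions/locations for a sorted sequence.

    Runs of equal values are never split, but partitions are otherwise
    kept at or under `chunksize` rows.
    """
    # positions where a new (unique) value begins, plus a sentinel end
    starts = [i for i, v in enumerate(seq) if i == 0 or v != seq[i - 1]]
    starts.append(len(seq))

    locations = [0]
    i = 0  # index into `starts` of the current partition's first run
    while starts[i] < len(seq):
        j = i + 1  # always advance at least one run (a run can't be split)
        # greedily absorb further runs while staying within chunksize
        while j + 1 < len(starts) and starts[j + 1] - starts[i] <= chunksize:
            j += 1
        locations.append(starts[j])
        i = j
    divisions = [seq[loc] for loc in locations[:-1]] + [seq[-1]]
    return divisions, locations

print(sorted_division_locations_sketch(list("AAAABBBC"), 3))
# (['A', 'B', 'C', 'C'], [0, 4, 7, 8])
```

Note that for `list("AAAABBBC")` with chunksize=3 this yields three partitions ([0, 4, 7, 8]) rather than the two ([0, 4, 8]) shown in the old docstring, matching the behavior this PR proposes.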

More about Step 2 & 3

This PR changes the behavior of many tests so that DataFrame collections being generated with from_pandas(..., npartitions=2) are now much more likely to have multiple partitions. In some cases, these tests fail because they are explicitly testing that from_pandas(..., npartitions=2) is producing a single partition (not sure why...). In many other cases, the assertions used in the test happen to be fragile enough to break for multiple partitions. In a few cases, the new behavior even seems to be exposing existing bugs. For example, merge_asof does not always produce the same result as Pandas when the left DataFrame has multiple partitions.

@github-actions github-actions bot added the array label Jun 30, 2022
@ian-r-rose
Collaborator

I played around with this and was able to get better duplicate-value handling by keeping track of (and subtracting) the "drift" between the real and ideal chunksize as the divisions are built.

This is looking great! I haven't been able to make it do anything that seems wrong in playing around.
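The "drift" idea mentioned above can be illustrated with a toy sketch (my own construction under stated assumptions, not this PR's implementation): each boundary aims at `chunksize` rows past the previous one, minus the overshoot accumulated when a duplicate run forced an earlier boundary past its ideal position.

```python
import bisect

def drift_boundaries(run_starts, length, chunksize):
    """Toy sketch of drift-corrected division boundaries.

    run_starts: sorted positions where a run of equal values begins.
    """
    boundaries, drift = [0], 0
    while boundaries[-1] < length:
        # ideal next boundary, corrected by accumulated overshoot
        ideal = boundaries[-1] + chunksize - drift
        # snap forward to the next run start (never split a run),
        # searching strictly past the current boundary
        k = bisect.bisect_left(run_starts, max(ideal, boundaries[-1] + 1))
        nxt = run_starts[k] if k < len(run_starts) else length
        drift = nxt - ideal  # overshoot carried into the next step
        boundaries.append(nxt)
    return boundaries

# ['A','A','A','A','B','B','B','C'] -> runs start at 0, 4, 7
print(drift_boundaries([0, 4, 7], 8, 3))  # [0, 4, 7, 8]
```

This toy version still over-steps in some cases (e.g. a long run right after a few singletons), which is the separate issue addressed in the later commit.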

@rjzamora
Member Author

I haven't been able to make it do anything that seems wrong in playing around.

I noticed that sorted_division_locations(["A", "B", "C", "C", "C", "C", "C", "C"], npartitions=3) wasn't working, but the latest commit should handle that case as well (avoiding "over-stepping").

@rjzamora
Member Author

@jrbourbeau @jsignell @ian-r-rose - I think this PR is ready for a final review. The takeaway is that dd.from_pandas(..., npartitions=n) will almost always give you n partitions after this change (even when there are duplicate values in the index). In order to achieve stricter partitioning behavior, this PR adds the overhead of an index.unique() call (and some other related logic when duplicate values exist). Performance should not change for the common (RangeIndex) case, but the runtime will increase a bit when there are both duplicate values and a large number of unique values (performance should still be quite good when the number of uniques is small).

My personal opinion is that a minor performance sacrifice is worth having "correct" behavior.
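A hedged sketch of the extra work being described (my own guess at the general shape, not the PR's actual code): when duplicates exist, candidate divisions can be chosen among the unique index values and mapped back to row offsets with a searchsorted call.

```python
import numpy as np
import pandas as pd

idx = pd.Index(["a", "a", "a", "a", "b", "b", "b", "c"])
uniques = idx.unique()  # the extra cost paid when duplicates exist
# pick division values from the uniques (here: all of them)
divisions = list(uniques) + [idx[-1]]
# map division values back to row offsets in the sorted index
locations = np.searchsorted(idx, uniques, side="left").tolist() + [len(idx)]
print(divisions, locations)  # ['a', 'b', 'c', 'c'] [0, 4, 7, 8]
```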

Collaborator

@ian-r-rose ian-r-rose left a comment

Thank you for your patience on this @rjzamora!

I took a bit of a stochastic approach to trying this out, generating 10,000 random indexes with varying levels of duplication, and I wasn't able to make anything break:

import string
import random

import dask.dataframe as dd
import pandas as pd
from dask.dataframe.core import check_divisions


def check_from_pandas(chunk: bool):
    idx = []
    for _ in range(100):
        idx.extend(random.randint(0, 10) * [random.choice(string.ascii_uppercase)])
    df = pd.DataFrame({"value": range(len(idx))}, index=idx)

    if chunk:
        ddf = dd.from_pandas(df, chunksize=random.randint(1, 100))
    else:
        npartitions = random.randint(1, len(df.index.unique()))
        ddf = dd.from_pandas(df, npartitions=npartitions)
        if ddf.npartitions != npartitions:
            print(f"Failed to match partition count {npartitions}, {ddf.npartitions}")

    try:
        check_divisions(ddf.divisions)
    except ValueError as e:
        print(e)


for i in range(10_000):
    check_from_pandas(False)
for i in range(100_000):
    check_from_pandas(True)

The implementation seems sound, if a bit complex (but it's trying to do a complex thing, so).

@ian-r-rose ian-r-rose merged commit c971931 into dask:main Aug 3, 2022
@cpcloud
Contributor

cpcloud commented Aug 7, 2022

This PR seems to have broken a large number of tests in ibis: https://github.com/ibis-project/ibis/runs/7707205507?check_suite_focus=true

@cpcloud
Contributor

cpcloud commented Aug 7, 2022

The behavior of reset_index in particular seems a bit surprising after this PR:

In [1]: import dask.dataframe as dd, pandas as pd

In [2]: df = dd.from_pandas(pd.DataFrame({'key': list("aabbc"), 'value': list(range(1, 6))}), npartitions=2)

In [3]: df
Out[3]:
Dask DataFrame Structure:
                  key  value
npartitions=2
0              object  int64
3                 ...    ...
4                 ...    ...
Dask Name: from_pandas, 2 tasks

In [4]: df.index.compute()
Out[4]: RangeIndex(start=0, stop=5, step=1)

In [5]: df.reset_index().compute()
Out[5]:
   index key  value
0      0   a      1
1      1   a      2
2      2   b      3
0      3   b      4
1      4   c      5

I'm probably just a naive Dask user, but I would expect that

df.index.compute() == df.reset_index().index.compute()

to be an array of all True values, but instead it's:

In [6]: df.index.compute() == df.reset_index().index.compute()
Out[6]: array([ True,  True,  True, False, False])

@cpcloud
Contributor

cpcloud commented Aug 7, 2022

After looking at the docstring, it could also be that we're depending on broken behavior in our test suite/dask backend implementation.

@jrbourbeau
Member

Thanks for reporting @cpcloud. Yeah, as you mentioned, reset_index has different behavior in pandas and dask. I've not looked deeply yet -- are all the ibis test failures related to reset_index, or are there other possible issues?

@ian-r-rose
Collaborator

Thank you for the report @cpcloud. I think most of what is going on here is that, before this PR, dd.from_pandas() was actually not very good about giving you the number of partitions you asked for, especially for small dataframes as you are likely to see in unit tests. This was a bug, or at least highly undesirable behavior (and there were a number of places in the Dask codebase that were tripped up by this, as this PR attests!).

In the ibis test suite, I think most of your Dask dataframe test fixtures wound up having npartitions=1, even though you requested 2. I verified this by checking out ibis and throwing an assert ddf.npartitions == npartitions into your dataframe fixtures. So a lot of the test suite wasn't really testing what it was intended to (with apologies, it wasn't ibis' fault).

To elaborate a bit more on @jrbourbeau's comment -- yes, reset_index() is called per-partition in the dask dataframe, so each partition starts over from zero (documented here, but still a gotcha). So after calling reset_index, the contents of the index are likely to be not very interesting, and should probably be thrown away. If I'm reading the ibis dask executor correctly, it always goes through a reset_index call at the end. You may want to either (a) not call reset_index, (b) ensure that the index is thrown away before returning the pandas dataframe to the user, or (c) use check_index=False when comparing dataframes in your unit test suite.
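The per-partition reset_index behavior can be reproduced with plain pandas (an illustrative emulation, not dask internals):

```python
import pandas as pd

# Two "partitions" of one frame; dask applies reset_index to each
# partition independently, so the new RangeIndex restarts at 0 per part.
parts = [
    pd.DataFrame({"key": ["a", "a", "b"], "value": [1, 2, 3]}, index=[0, 1, 2]),
    pd.DataFrame({"key": ["b", "c"], "value": [4, 5]}, index=[3, 4]),
]
out = pd.concat([p.reset_index() for p in parts])
print(list(out.index))  # [0, 1, 2, 0, 1] -- restarts per partition
```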

It looks like the vast majority of the failures are due to the above, but perhaps not all? I particularly see things like ibis/backends/dask/tests/execution/test_functions.py::test_math_functions_decimal, which suggest there may be some index-setting on differently-partitioned datasets.

df = pd.DataFrame(
    {"x": [1, 2, 3, 4, 5, 6] * 10, "y": list("abdabd") * 10},
-   index=pd.Series([1, 2, 3, 4, 5, 6] * 10, dtype=dtype),
+   index=pd.Series([10, 20, 30, 40, 50, 60] * 10, dtype=dtype),
)
Contributor

This broke tests on aarch64, ppc64le, s390x again, cf #4561

Collaborator

Thanks for the report @QuLogic, is the problem the same? It seems pretty unsatisfying to have to change the specific numbers here in order to support those architectures.

Member Author

Thanks for pointing this out @QuLogic - I didn't realize that this was changed from [10, 20, 30, 40, 50, 60] to [1, 2, 3, 4, 5, 6] in the past. Unfortunately, using [1, 2, 3, 4, 5, 6] no longer works when from_pandas returns the correct number of partitions, so I cannot simply revert this change.

Can you share what parameters/assertions are failing? Does it help if we remove the assert all(map(len, parts)) (since it's technically okay to have empty partitions if we are increasing the number of partitions)?

Contributor

@QuLogic QuLogic Aug 21, 2022

The same ones as before: test_repartition_npartitions[<lambda>0-float-5-1-True] and test_repartition_npartitions[<lambda>1-float-5-1-True]. See build.log on the other arches here.

I assume it would be okay to remove that line as it's the last line of the test and the one that's failing.

Contributor

Ping? This is still broken in the latest version. Should I open a new issue?

Member

Thanks for the ping @QuLogic. A new issue would be good. Do you have a traceback?

Contributor

Should be the same as before #9582.

@rjzamora rjzamora deleted the rewrite-sorted_division_locations branch August 16, 2022 14:12


Development

Successfully merging this pull request may close these issues.

Using ddf.from_pandas with chunksize=1 results in one partition having two rows

5 participants