
[Python] Guarantee that group_by has stable ordering. #36709

Closed
coady opened this issue Jul 16, 2023 · 9 comments · Fixed by #36768


@coady

coady commented Jul 16, 2023

Describe the enhancement requested

As far as I can tell, it's not officially documented that grouping maintains the order of its keys, but it has seemed that way until recently. Starting with 13.0.0.dev, I've noticed the ordering changing intermittently. I think stable ordering is an important property of grouping, just as it is for sorting.

This example outputs differences reliably on 13.0.0.dev516.

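```python
import pyarrow as pa

# Two chunks, so the grouping sees more than one input batch.
array = pa.chunked_array([list('aba'), list('bcb')])
table = pa.table({'key': array})
for i in range(100):
    # No aggregations: the result holds only the distinct keys.
    groups = table.group_by(['key']).aggregate([])
    # Report every run whose key order differs from first-appearance order.
    if groups['key'].to_pylist() != list('abc'):
        print(i, groups['key'])
```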

Component(s)

C++, Python

coady added a commit to coady/graphique that referenced this issue Jul 18, 2023
Test fix to not rely on ordering: apache/arrow#36709. Distinct aggregation supports aliasing.
@coady
Author

coady commented Jul 19, 2023

Another effect of keys/batches being unordered is that distinct and list aggregations will return their values in an unpredictable order. That is more likely to be perceived as a bug. For example, if a user sorts and then groups, intending each resulting list to be sorted internally, that no longer works, and there is no real workaround.

@jorisvandenbossche
Member

The groupby feature is part of the Acero query execution engine (https://arrow.apache.org/docs/dev/cpp/streaming_execution.html), and in general Acero doesn't guarantee a stable ordering of the batches it executes.
It does support explicit ordering (when the input is declared as ordered, or after sorting it), and some nodes, such as filter, honor and preserve that batch ordering, so when the plan is executed and the results are gathered into a Table, that order is preserved (see https://arrow.apache.org/docs/dev/cpp/api/acero.html#_CPPv4NK5arrow5acero8ExecNode8orderingEv for details). However, I assume a (hash) aggregation doesn't use this ordering during execution (and will discard any ordering of its input).

I can reproduce the difference: the ordering was indeed stable in 12.0, but it is not always stable in 13.0. I am not sure whether something changed in the 13.0 release cycle to cause this, but in general I think the new behaviour is what one should expect. cc @westonpace

@jorisvandenbossche
Member

Actually, looking at what changed in the groupby implementation over the last few months, I suppose my clean-up PR #34769 caused this. Before that, pyarrow's group_by used an arrow::compute::TableGroupBy helper under the hood, and that small helper was removed in favour of directly using the Acero declaration it was wrapping.

That should have been fully equivalent, but now I see that the TableGroupBy helper had a default of bool use_threads = false in its header file, while the new Python code calls decl.to_table(use_threads=True).

So that will probably explain the difference in behaviour: in 12.0, the group_by method was not yet running in parallel, while now it is.
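A toy model of why parallel execution can reorder the keys, assuming (hypothetically) that each worker builds a partial hash table for its own batch and the partial tables are then merged in whatever order the workers finish. This is a plain-Python illustration, not Acero's actual implementation; `partial_counts` and `merge` are hypothetical names.

```python
def partial_counts(batch):
    # Each (hypothetical) worker builds its own hash table for one batch.
    counts = {}
    for key in batch:
        counts[key] = counts.get(key, 0) + 1
    return counts


def merge(partials):
    # Combine the partial tables in the order the workers happened to finish.
    merged = {}
    for part in partials:
        for key, n in part.items():
            merged[key] = merged.get(key, 0) + n
    return merged


batches = [list("aba"), list("bcb")]
partials = [partial_counts(batch) for batch in batches]

serial = merge(partials)                # batch 0 finishes first
racy = merge(list(reversed(partials)))  # batch 1 finishes first

print(list(serial))    # ['a', 'b', 'c']
print(list(racy))      # ['b', 'c', 'a']
print(serial == racy)  # True: same counts, different key order
```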

The question is still whether we are fine with this change. We actually do have some (hash) aggregations that do depend on the input being ordered (e.g. first/last), but I don't think there is a way to "force" doing the calculation ordered for other aggregations (like hash_list), except for specifying to not run in parallel.

We should probably at least expose use_threads in group_by, so you can still set that to False to keep the old behaviour.

@jorisvandenbossche jorisvandenbossche changed the title Guarantee that group_by has stable ordering. [Python] Guarantee that group_by has stable ordering. Jul 19, 2023
jorisvandenbossche added a commit to jorisvandenbossche/arrow that referenced this issue Jul 19, 2023
@westonpace
Member

+1 for exposing use_threads. However, even with use_threads=False there is generally no guarantee of stable ordering if you are reading from disk. Even with use_threads=False we still read from I/O in parallel (in an async fashion). This can be disabled by setting the I/O thread pool size to 1, but that sometimes has unpleasant side effects. If your data source is in memory and you disable threads, then you should get stable results.

There are two different things here and I'm not sure which we are discussing:

  • The order of the keys

Stable ordering of the keys is tricky since we are essentially using an "unordered hashmap" for our grouping. Changes in the order in which data arrives, or even just in the amount of data, could in theory change the order of the resulting keys. Perhaps the easiest thing to do for now is to just sort the results by key. At some point we could investigate an ordered hashmap if desired. In classic SQL, one generally needs to add an ORDER BY clause at the end of the query to order by the keys (and the underlying implementation may just add a sort node, or it may do something more clever with the group-by).

  • The order of the grouping values

Stable ordering of the values is a bit trickier, especially in parallel. There are only a few aggregate functions which depend on this. In postgres there is special syntax for dealing with this case. Note, this is also something that can be expressed in Substrait.

```sql
SELECT xmlagg(x) FROM (SELECT x FROM test ORDER BY y DESC) AS tab;
```

Acero doesn't yet have the components that would be needed to do this. I think, at a minimum, you would want a way to force the aggregate "consume" operation to be serialized (with some kind of sequencing queue). Then you could use a regular order-by followed by the group-by node and get predictable results. Since you're paying the cost of ordering you could probably order first by the grouping keys, and then by the measure column. Then you could use a streaming group-by operator.
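The sort-then-stream approach described above can be sketched in plain Python with `itertools.groupby`, which only groups adjacent equal keys and therefore relies on the input already being ordered by key. This illustrates the idea only; it is not an Acero operator, and the rows are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical (key, measure) rows in arrival order.
rows = [("b", 3), ("a", 2), ("b", 1), ("a", 5), ("c", 4)]

# Step 1: order by the grouping key, then by the measure column.
ordered = sorted(rows, key=itemgetter(0, 1))

# Step 2: a streaming group-by only compares adjacent rows, since equal keys
# are now contiguous; each group's values arrive already sorted.
result = [
    (key, [value for _, value in group])
    for key, group in groupby(ordered, key=itemgetter(0))
]
print(result)  # [('a', [2, 5]), ('b', [1, 3]), ('c', [4])]
```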

Then, of course, there is a whole different can of worms about how to effectively wrap this all up in pyarrow :)

@alippai
Contributor

alippai commented Jul 19, 2023

With the current unordered hashmap implementation, would the same keys from the same group end up next to each other? E.g.:

```
select col, sum(x) from tbl group by 1
C 1
C 3
C 2
A 3
A 2
A 1
B 1
B 2
B 3
```

This sounds pretty easy to fix by just slicing / memcpy-ing the columns.
Using run-end encoding (REE) for the grouping column is also a possibility (along with implementing an REE-optimized take_ranges or similar).

@westonpace
Member

> With the current unordered hashmap implementation, would the same keys from the same group end up next to each other? E.g.:

No. If the columns are keys then there will only be one output row for each combination of keys. So there is no concept of "keys from the same group".

> select col, sum(x) from tbl group by 1

This is odd syntax to me. However, it looks to be the same as select col, sum(x) from tbl group by col. Either way I would expect the result would be:

```
C 6
A 6
B 6
```
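The same query modelled with a plain Python dict as a hypothetical stand-in for the hash table, taking the rows listed above as input: each distinct key gets exactly one entry, and therefore exactly one output row.

```python
# The (col, x) rows from the example above, treated as the input table.
rows = [("C", 1), ("C", 3), ("C", 2), ("A", 3), ("A", 2), ("A", 1),
        ("B", 1), ("B", 2), ("B", 3)]

# One hash-table entry per distinct key: one output row per key.
sums = {}
for col, x in rows:
    sums[col] = sums.get(col, 0) + x

for col, total in sums.items():
    print(col, total)  # C 6, then A 6, then B 6
```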

@coady
Author

coady commented Jul 20, 2023

> The question is still whether we are fine with this change. We actually do have some (hash) aggregations that do depend on the input being ordered (e.g. first/last), but I don't think there is a way to "force" doing the calculation ordered for other aggregations (like hash_list), except for specifying to not run in parallel.
>
> We should probably at least expose use_threads in group_by, so you can still set that to False to keep the old behaviour.

👍 first and last aren't usable from Python as is:

```python
>>> table.group_by('key').aggregate([('value', 'first')])
...
ArrowNotImplementedError: Using ordered aggregator in multiple threaded execution is not supported
```

@coady
Author

coady commented Jul 20, 2023

Some more data points:

  • pandas: sorts keys by default. Without sorting, appears to maintain key order. Guarantees value order.
  • unique and value_counts maintain key order (apparently, but undocumented).
  • group_by on a single batch maintains key and value order (apparently, but undocumented).
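The single-batch behaviour is consistent with one serial pass over an insertion-ordered hash map. A plain-Python model using a hypothetical helper `group_in_order` (not Arrow code): keys come out in first-appearance order and each group's values keep their input order.

```python
def group_in_order(keys, values):
    """Group values by key in one serial pass (hypothetical helper)."""
    groups = {}
    for key, value in zip(keys, values):
        # dict preserves insertion order (Python 3.7+), so keys appear in
        # first-appearance order and values in input order.
        groups.setdefault(key, []).append(value)
    return groups


print(group_in_order(list("abacb"), [1, 2, 3, 4, 5]))
# {'a': [1, 3], 'b': [2, 5], 'c': [4]}
```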

The latter implies the issue is parallel execution, not an unordered hash map. I'm not clear on why grouping can't use ordered execution and have both features.

Even if a node does not care about order it should try and maintain the batch index if it can. The project and filter nodes do not care about order but they ensure that output batches keep the same index as their input batches.
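A plain-Python sketch of the batch-index idea, with a thread pool standing in for parallel execution: the filter ignores order but tags each output with its input batch index, so a sink can restore the original sequence. `filter_batch` and the sample batches are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def filter_batch(indexed_batch):
    # The filter does not care about order, but it keeps the input batch index.
    index, batch = indexed_batch
    return index, [x for x in batch if x % 2 == 0]


batches = list(enumerate([[1, 2, 3, 4], [5, 6], [7, 8, 9, 10]]))

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(filter_batch, b) for b in batches]
    # Results are collected in completion order, which can vary between runs.
    arrived = [f.result() for f in as_completed(futures)]

# A sink that knows the indices can restore the original batch order.
ordered = [batch for _, batch in sorted(arrived, key=lambda item: item[0])]
print(ordered)  # [[2, 4], [6], [8, 10]]
```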

@westonpace
Member

To be clear, I fully support allowing for both ordered keys and ordered values within keys. I'm just explaining why the behavior is what it is today.

I didn't expect things to be quite as predictable as you found them, but I can confirm that I get the same results you do. I wouldn't recommend putting too much faith in the way things happen to work now. The general expectation for group_by, in C++, is that it does not maintain order. There is no regression test in place to verify whatever ordering behavior you are currently seeing, and it could very well change. If we want to explicitly maintain key and/or value order, then I think those need to be features in their own right.

coady added a commit to coady/graphique that referenced this issue Aug 27, 2023
Replaces the explicit batch iteration with support for all aggregates. The fragment optimization is still significant enough to retain. Natively supports aliases and `first_last`, and `use_threads=False` fixes the batch ordering regression with no noticeable performance difference: apache/arrow#36709.
jorisvandenbossche added a commit that referenced this issue Oct 5, 2023
…by to have stable ordering (#36768)

### Rationale for this change

Adds a `use_threads` keyword to the `group_by` method on Table and passes it through to the `Declaration.to_table` call. This allows specifying `use_threads=False` to get stable ordering of the output, which is also required for certain aggregations (e.g. `"first"` fails with the default of `use_threads=True`).

### Are these changes tested?

Yes, added a test (similar to the one we have for `filter`) that would fail more than 50% of the time if the output were no longer ordered.

* Closes: #36709

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
@jorisvandenbossche jorisvandenbossche added this to the 14.0.0 milestone Oct 5, 2023
JerAguilon pushed a commit to JerAguilon/arrow that referenced this issue Oct 23, 2023
…group_by to have stable ordering (apache#36768)

loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
…group_by to have stable ordering (apache#36768)

dgreiss pushed a commit to dgreiss/arrow that referenced this issue Feb 19, 2024
…group_by to have stable ordering (apache#36768)
