[R] Use FetchNode and OrderByNode #34437

Closed
nealrichardson opened this issue Mar 3, 2023 · 10 comments · Fixed by #34685

Comments

@nealrichardson
Member

Describe the enhancement requested

See #34059. There's at least one workaround we can remove, pushing the work into the ExecPlan instead of massaging the result afterward.

Component(s)

R

@westonpace
Member

This might wait until order_by is also ready. Then there is actually quite a bit that can be removed from the R bindings. We should be able to remove all references to ExecPlan in favor of DeclarationToReader.

@nealrichardson nealrichardson changed the title [R] Use FetchNode [R] Use FetchNode and OrderByNode Mar 21, 2023
@nealrichardson nealrichardson self-assigned this Mar 21, 2023
@nealrichardson
Member Author

I'll start this now. Will do order_by first, then tackle the Declaration refactor after.

@nealrichardson
Member Author

@westonpace I started wiring these up and have run into a couple of issues. If these are user error, I'd appreciate pointers on resolving them. The change to use these nodes is a pure refactor, in that I expect all existing tests to pass when using these nodes, and there are a lot of tests. I haven't gotten to everything yet but am hitting a couple of errors in C++:

  • FetchNode requires order_by first and raises a validation error if ordering is not set, which means a basic SELECT * LIMIT 10 wouldn't work. I recognize that this operation is non-deterministic, as the validation message states, but it should still be permitted, right? This is causing the tests of head() to fail.
  • I don't think this is a C++ regression, but for order_by, I get an error in one test that seems to be sorting on a dictionary column. I notice that [C++] Sorting dictionary array not implemented #29887 is open, and I think the reason the error is happening now has to do with a quirk in how the query is assembled in R: in the current implementation, we replace the sort on the dictionary column with a different sort before the sink node, so the unsupported sort doesn't end up happening. But I wanted to flag that as a nuisance that I'm surprised we haven't had to address yet.
── 1. Error (test-dplyr-summarize.R:617:3): min() and max() on character strings ───────────────────
Error in `compute.arrow_dplyr_query(x)`: Type error: Sorting not supported for type dictionary<values=string, indices=int8, ordered=0>

Also, am I correct that select_k_sink_node should be dropped and that there is no replacement select_k_node? IIRC it was never actually optimized and was just doing a full sort of the data, so while there would in theory be an efficient way to optimize an "order_by" then "fetch" as TopK, we don't have that implemented in Acero.
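
For illustration only, here is a minimal pure-Python sketch of the difference between "sort everything, then fetch k" and a bounded-heap top-k. It is not Acero code: the function names, the `batches` data, and the assumption that rows are plain numbers sorted ascending are all hypothetical.

```python
import heapq
from itertools import chain


def sort_then_fetch(batches, k):
    """Fully sort every row, then keep the first k (order_by followed by fetch)."""
    return sorted(chain.from_iterable(batches))[:k]


def top_k(batches, k):
    """Stream over the batches while holding at most k rows in memory."""
    if k <= 0:
        return []
    # heapq is a min-heap, so values are stored negated: heap[0] is then
    # the negation of the largest value currently kept.
    heap = []
    for batch in batches:
        for value in batch:
            if len(heap) < k:
                heapq.heappush(heap, -value)
            elif value < -heap[0]:
                # New value beats the worst kept value, so swap it in.
                heapq.heapreplace(heap, -value)
    return sorted(-v for v in heap)


# Hypothetical input: three batches of numeric rows.
batches = [[9, 3, 7], [1, 8], [4, 2, 6, 5]]
smallest_three = top_k(batches, 3)
```

Both functions return the same rows; the heap version just avoids materializing and sorting the whole input, which is the optimization described above as not yet implemented.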

@nealrichardson
Member Author

I tried just removing the validation in the fetch node like this:

--- a/cpp/src/arrow/compute/exec/fetch_node.cc
+++ b/cpp/src/arrow/compute/exec/fetch_node.cc
@@ -126,12 +126,6 @@ class FetchNode : public ExecNode, public TracedNode, util::SequencingQueue::Pro
 
   Status Validate() const override {
     ARROW_RETURN_NOT_OK(ExecNode::Validate());
-    if (inputs_[0]->ordering().is_unordered()) {
-      return Status::Invalid(
-          "Fetch node's input has no meaningful ordering and so limit/offset will be "
-          "non-deterministic.  Please establish order in some way (e.g. by inserting an "
-          "order_by node)");
-    }
     return Status::OK();
   }

and that caused the tests to hang.

@westonpace
Member

FetchNode requires order_by first and raises a validation error if ordering is not set, which means a basic SELECT * LIMIT 10 wouldn't work.

Once #34698 merges, you should be able to ask the scan to emit data in a deterministic order. For a dataset scan, this will be the order in which files are given to the dataset. If the dataset is created through discovery, this is usually (but not necessarily always) lexicographically ordered filenames.

Note that in-memory sources (record batch reader, table, etc.) already declare implicit ordering. So if your source is a table in memory then SELECT * LIMIT 10 should work (deterministically) today.

The performance penalty for doing so is pretty minor. So this should allow you to do SELECT * LIMIT 10 in a deterministic fashion.

However, it still wouldn't support something like SELECT * FROM left INNER JOIN right ON left.id = right.id LIMIT 10 because the join is going to randomly shuffle the data.

If we really want / need to support that non-deterministic approach then we can add a boolean flag to the fetch node options to allow_nondeterministic or something like that.

Also, am I correct that select_k_sink_node should be dropped and that there is no replacement select_k_node?

You are correct. It's still very doable but I don't see the purpose until someone gets around to implementing the more efficient solution. I think the only reason we had it before is because ordering and limit were both sink nodes and so you couldn't chain them.

and that caused the tests to hang.

Yes. The implementation will have to change slightly when we do a non-deterministic fetch but it's not too bad. We can just skip the sequencing queue and call process immediately (guarded by a mutex). Right now it's hanging because the sequencing queue just accumulates everything and never emits because it never sees the first batch (it should probably error though when it sees an unordered batch. That would be a nice cleanup).
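
To make the hang easier to picture, here is a toy pure-Python model of a queue that releases batches strictly in index order. It is a sketch under stated assumptions, not the actual `util::SequencingQueue`: the class layout, method names, and the use of index `-1` to stand for a batch with no ordering are hypothetical choices for this example.

```python
import heapq


class SequencingQueue:
    """Toy model: releases batches strictly in index order, starting at 0."""

    def __init__(self):
        self._heap = []         # (index, arrival counter, batch)
        self._next_index = 0    # index of the next batch allowed out
        self._counter = 0       # tie-breaker so batches are never compared

    def insert(self, index, batch):
        """Buffer a batch and return every batch that is now ready, in order."""
        heapq.heappush(self._heap, (index, self._counter, batch))
        self._counter += 1
        ready = []
        while self._heap and self._heap[0][0] == self._next_index:
            ready.append(heapq.heappop(self._heap)[2])
            self._next_index += 1
        return ready

    def pending_count(self):
        return len(self._heap)


# Ordered input arriving out of order is released in index order.
ordered = SequencingQueue()
out_ordered = []
for index, batch in [(1, "b"), (0, "a"), (2, "c")]:
    out_ordered.extend(ordered.insert(index, batch))

# Assumption: unordered batches carry index -1. Index 0 never arrives,
# so everything is buffered and nothing is ever released.
unordered = SequencingQueue()
out_unordered = []
for batch in ["a", "b", "c"]:
    out_unordered.extend(unordered.insert(-1, batch))
```

In this model the unordered case ends with an empty output and three buffered batches, which mirrors the "accumulates everything and never emits" behavior; raising an error on an unordered batch, as suggested, would replace the silent buffering.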

@nealrichardson
Member Author

Note that in-memory sources (record batch reader, table, etc.) already declare implicit ordering. So if your source is a table in memory then SELECT * LIMIT 10 should work (deterministically) today.

I'm pretty sure I saw this failing all over and not just on dataset tests. Most tests of acero in R just use tables and recordbatches.

@jorisvandenbossche
Member

I was just testing the FetchNode in python, and at least for a Table source, I seem to get deterministic behaviour:

import numpy as np
import pyarrow as pa
from pyarrow._acero import TableSourceNodeOptions, FetchNodeOptions, Declaration

table = pa.table({'a': np.arange(10_000_000)})
table = pa.Table.from_batches(table.to_batches(max_chunksize=1_000_000))

for _ in range(100):
    decl = Declaration.from_sequence([
        Declaration("table_source", TableSourceNodeOptions(table)), 
        Declaration("fetch", FetchNodeOptions(0, 5))
    ])

    assert decl.to_table()['a'].to_pylist() == [0, 1, 2, 3, 4]

@nealrichardson
Member Author

I checked again with the current state of my branch and 4 tests fail if I use the FetchNode, 3 of which are on Datasets. The other one is a query on a table, but fetch comes after aggregation:

library(arrow)
library(dplyr)

mtcars %>%
    arrow_table() %>%
    summarize(mean(hp)) %>%
    head() %>%
    collect()

# Error in `compute.arrow_dplyr_query()`:
# ! Invalid: Fetch node's input has no meaningful ordering and so limit/offset will be non-deterministic.  Please establish order in some way (e.g. by inserting an order_by node)

I guess it's reasonable that it should error (or at least warn)?

Seems like I should wait for #34698 to happen so that I'm not having to special-case datasets temporarily.

@nealrichardson
Member Author

One other question: this implicit order, can I reference it in some way? Is there some __magic_field I can reference? Suppose I wanted to reverse the implicit order. One might want to do that anyway, and since there doesn't seem to be a way with the FetchNode to take the tail of a table, I could reverse sort and take head.

@westonpace
Member

That's a fun question.

No, there is no __magic_field that we introduce for the implicit order. We could probably hack something into the scan node to give you one but if we had window functions, then a rank() would give you one, so I'd rather spend the time implementing some kind of basic window function support.

Since there is no magic column there are some things you cannot do with the implicit order. For example, you cannot "first order by X and then by implicit" (the implicit ordering is an entire ordering and not just a sort key). Since there is no column corresponding to it you also cannot "restore" it after some kind of order-destroying operation.

However...reversing it should be possible in theory. Though reversing a dataset and taking head is a very inefficient way to compute tail.

A much better feature, which (presumably) wouldn't be too difficult, would be to support a "reverse scan" that scans the data in reverse order. This would be a very efficient way to implement tail.
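
As a rough illustration of why a reverse scan is cheaper, the following pure-Python sketch compares the two ways of computing tail. It is not Acero code: the function names and the list-of-lists `batches` are hypothetical, and it assumes the source can hand out batches starting from the end.

```python
def tail_by_full_reverse(batches, n):
    """Materialize every row, reverse, take the head, then restore order."""
    rows = [row for batch in batches for row in batch]
    return rows[::-1][:n][::-1]


def tail_by_reverse_scan(batches, n):
    """Walk batches from the end and stop as soon as n rows are collected."""
    collected = []
    batches_read = 0
    for batch in reversed(batches):
        if len(collected) >= n:
            break
        batches_read += 1
        needed = n - len(collected)
        # Prepend the last `needed` rows of this batch to keep original order.
        collected = list(batch[-needed:]) + collected
    return collected, batches_read


# Hypothetical input: four batches with an implicit order 0..10.
batches = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
tail_rows, batches_read = tail_by_reverse_scan(batches, 4)
```

Both approaches return the same rows, but the reverse scan only touches the trailing batches it needs, while the full reverse reads and holds the entire input.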

thisisnic pushed a commit that referenced this issue Apr 11, 2023
### Rationale for this change

See also #32991. By using the new nodes, we're closer to having all dplyr query business happening inside the ExecPlan. Unfortunately, there are still two cases where we have to apply operations in R after running a query:

* #34941: Taking head/tail on unordered data, which gives non-deterministic results but should still be possible when the user just wants to see a slice of the result, any slice
* #34942: Implementing tail in the FetchNode or similar would enable removing more hacks and workarounds.

Once those are resolved, we can simplify further and then move to the new Declaration class.

### What changes are included in this PR?

This removes the use of different SinkNodes and many R-specific workarounds to support sorting and head/tail, so *almost* everything we do in a query should be represented in an ExecPlan.

### Are these changes tested?

Yes. This is mostly an internal refactor, but behavior changes are accompanied by test updates.

### Are there any user-facing changes?

The `show_query()` method will print slightly different ExecPlans. In many cases, they will be more informative. 

`tail()` now actually returns the tail of the data in cases where the data has an implicit order (currently only in-memory tables). Previously it was non-deterministic (and would return the head or some other slice of the data).

When printing query objects that include `summarize()` when the `arrow.summarize.sort = TRUE` option is set, the sorting is correctly printed.

It's unclear if there should be changes in performance; running benchmarks would be good but it's also not clear that our benchmarks cover all affected scenarios. 

* Closes: #34437
* Closes: #31980
* Closes: #31982

Authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Signed-off-by: Nic Crane <thisisnic@gmail.com>
@thisisnic thisisnic added this to the 12.0.0 milestone Apr 11, 2023
liujiacheng777 pushed a commit to LoongArch-Python/arrow that referenced this issue May 11, 2023
ArgusLi pushed a commit to Bit-Quill/arrow that referenced this issue May 15, 2023
rtpsw pushed a commit to rtpsw/arrow that referenced this issue May 16, 2023