ARROW-13465: [R] to_arrow() from duckdb #11032
Conversation
Force-pushed from 383cbb4 to 398bc03
Force-pushed from 87c3a96 to 5b9d598
Force-pushed from 931bc0c to 5192d72
@@ -840,6 +841,7 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
  }))));
}

<<<<<<< HEAD
Looks like you have a merge issue here
I'm working on this again this morning; I think I'm close to constructing the proper `arrow_dplyr_query` object.
Weird, this wasn't showing up in vscode, but was definitely there — should be fixed now regardless.
r/R/duckdb.R
Outdated
# * get the record batch reader from duckdb
# * produce the SourceNode
# * build an ExecPlan with that in place of the ScanNode you would have gotten from ExecNode_Scan
plan <- ExecPlan$create()
I don't think this will work because you're making this node with an ExecPlan here, but I think you'll be creating a different ExecPlan in `collect()`.

Why not return the RecordBatchReader from `duckdb::duckdb_fetch_record_batch(res)`? Then do the wrapping of that inside `plan$Build` (or even `plan$Scan`).
Oh that's an interesting idea. Let me explore that. AFAIU we can't simply return the RecordBatchReader, since we want to be able to continue building an `arrow_dplyr_query` here (unless we make all the dplyr methods available for RecordBatchReaders too).
Right. So to restate: allow an arrow_dplyr_query to hold a RecordBatchReader in .data (similar to how I extended it recently to allow holding an arrow_dplyr_query itself, in collapse). So you'd return an arrow_dplyr_query here, containing a RecordBatchReader.
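To make the restatement concrete, here is a rough sketch (names and the internal constructor call are assumptions for illustration; the actual code in this PR may differ):

```r
# Sketch only: wrap duckdb's RecordBatchReader in an arrow_dplyr_query,
# so dplyr verbs can keep building on it before any ExecPlan exists.
# `res` is assumed to be a duckdb query result, as elsewhere in this PR.
reader <- duckdb::duckdb_fetch_record_batch(res)

# arrow_dplyr_query() is arrow's internal query-builder constructor;
# here .data holds the reader rather than a Table or Dataset.
adq <- arrow:::arrow_dplyr_query(reader)
```

The point of the design is that `adq` behaves like any other `arrow_dplyr_query`, so filters and projections accumulate lazily and are only executed when the plan is built in `collect()`.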
@@ -59,6 +59,10 @@ ExecPlan <- R6Class("ExecPlan",
    Scan = function(dataset) {
      # Handle arrow_dplyr_query
      if (inherits(dataset, "arrow_dplyr_query")) {
        if(inherits(dataset$.data, "RecordBatchReader")) {
          return(ExecNode_ReadFromRecordBatchReader(self, dataset$.data))
        }
This doesn't (yet) work; I'm getting a segfault:
Warning: stack imbalance in '[[', 215 then 216
Warning: stack imbalance in 'is.null', 213 then 220
Warning: stack imbalance in '!', 212 then 219
Warning: stack imbalance in '&&', 210 then 217
Error: C stack usage 17587191479824 is too close to the limit
Warning: stack imbalance in '&&', 208 then 215
Warning: stack imbalance in '(', 207 then 214
Warning: stack imbalance in '||', 205 then 212
Warning: stack imbalance in 'if', 203 then 210
Warning: stack imbalance in '{', 199 then 218
Warning: stack imbalance in '{', 192 then 211
Warning: stack imbalance in 'is.null', 187 then 206
Warning: stack imbalance in '!', 186 then 205
Warning: stack imbalance in '&&', 184 then 203
Warning: stack imbalance in '&&', 182 then 201
Warning: stack imbalance in '(', 181 then 200
Warning: stack imbalance in '||', 179 then 198
Warning: stack imbalance in 'if', 177 then 196
Warning: stack imbalance in '{', 173 then 192
Warning: stack imbalance in '<-', 168 then 187
Warning: stack imbalance in '{', 164 then 183
Warning: stack imbalance in 'if', 162 then 181
*** caught segfault ***
address 0x0, cause 'memory not mapped'
Warning: stack imbalance in '>', 183 then 187
I'm digging into this now
This is a duckdb issue, right?
Yes, we believe so. I don't yet have a reprex, but I'm working on one.
r/R/duckdb.R
Outdated
# * build an ExecPlan with that in place of the ScanNode you would have gotten from ExecNode_Scan
# source_node <- ExecNode_ReadFromRecordBatchReader(plan, RBR)
.data <- duckdb::duckdb_fetch_record_batch(res)
structure(
You shouldn't have to copy this from `arrow_dplyr_query()`, should you?
Right, I did out of expedience, but will come back and clean this up to do the right thing later.
You might need to `collapse()` this further, judging from how you're handling this case in `plan$Scan`: you don't want any filter/projection at this level of nesting.
Ok, this is getting closer. I've added a test using our own RecordBatchReader export and then wrapping that with `arrow_dplyr_query()`. With the DuckDB reader, I'm still getting a number of segfaults; the messages and reported fault locations vary across runs (see the stack-imbalance warnings and segfault above).
Force-pushed from c78e54d to aaaa9f9
r/R/duckdb.R
Outdated
#' summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
#' to_arrow() %>%
#' collect()
to_arrow <- function(.data, as_table = TRUE) {
Ultimately, I don't believe we want to have `as_table = TRUE`, but I've added it for now to show that `arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res)$read_table())` (the default, `as_table = TRUE`) works just fine, but when we `arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))` we get a segfault.

We might need (or want) to release with the `$read_table()` version this time, and then upgrade to the version that doesn't convert to a table in 7.0.0 (after duckdb resolves the issue and releases, presuming the issue is on their side).
Were you going to remove this argument before merging?
Yes, sorry: I removed the code that used it, but not the option itself.
r/R/duckdb.R
Outdated
#' to_arrow() %>%
#' collect()
to_arrow <- function(.data, as_table = TRUE) {
  # TODO: figure out WTAF .data is before just doing stuff
I will also do this, since at the very least we should gate this to `tbl`s that are backed by duckdb connections.

There might be a few other options for things we could accept here too; in principle we could accept a `DBIResult` (again, backed by duckdb), which would make it easier to send SQL and avoid dbplyr entirely.
r/R/query-engine.R
Outdated
@@ -59,6 +59,10 @@ ExecPlan <- R6Class("ExecPlan",
    Scan = function(dataset) {
      # Handle arrow_dplyr_query
      if (inherits(dataset, "arrow_dplyr_query")) {
        if(inherits(dataset$.data, "RecordBatchReader")) {
-        if(inherits(dataset$.data, "RecordBatchReader")) {
+        if (inherits(dataset$.data, "RecordBatchReader")) {
      MakeBackgroundGenerator(std::move(batch_it), io_executor, max_q, q_restart));

  return std::function<Future<util::optional<ExecBatch>>()>([batch_gen] {
    // TODO(ARROW-14070) Awful workaround for MSVC 19.0 (Visual Studio 2015) bug.
@bkietz is this still needed? We dropped Visual Studio 2015, didn't we?
We have removed vs2015, so this can be simplified:
- ARROW_ASSIGN_OR_RAISE(
- auto batch_gen,
- MakeBackgroundGenerator(std::move(batch_it), io_executor, max_q, q_restart));
-
- return std::function<Future<util::optional<ExecBatch>>()>([batch_gen] {
- // TODO(ARROW-14070) Awful workaround for MSVC 19.0 (Visual Studio 2015) bug.
- //...
+ return MakeBackgroundGenerator(std::move(batch_it), io_executor, max_q, q_restart);
Would you mind making a suggestion that we can apply?
r/tests/testthat/test-dataset.R
Outdated
reader_adq <- arrow_dplyr_query(circle)

tab_from_c_new <- reader_adq %>%
  dplyr::collect()
Can you do more dplyr stuff here? filter/mutate/summarize something? Just to confirm that we can do more than just collect the reader into a table.
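For example (a sketch only, assuming the `dbl` column from the existing test fixture; note a RecordBatchReader can only be consumed once, so this would replace rather than follow the bare `collect()`):

```r
# Sketch: exercise a filter and a mutate through the reader-backed
# query before collecting, not just a bare collect().
tab_filtered <- reader_adq %>%
  dplyr::filter(dbl > 1) %>%
  dplyr::mutate(dbl_twice = dbl * 2) %>%
  dplyr::collect()
```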
  dplyr::collect()
expect_equal(
  tab_from_c_new %>%
    arrange(dbl),
You should be able to arrange before collect, that is supported.
I'm getting a segfault on that right now (though maybe I need a rebase to get the work that you've done to make that possible?). I'll add it as a TODO after we merge.
Yeah try rebasing
Hmmm this actually still seems to persist, even after a rebase: https://github.com/apache/arrow/pull/11032/checks?check_run_id=3895824638#step:9:11237
If you get a segfault, please make a follow-up JIRA.
r/tests/testthat/test-duckdb.R
Outdated
# And we can continue the pipeline
ds_rt <- ds %>%
  to_duckdb() %>%
  # factors don't roundtrip
Jira? duckdb issue number?
I've added duckdb/duckdb#1879 which talks about implementing them in duckdb
@github-actions autotune
Force-pushed from b30c344 to 4ce5e50
@@ -69,7 +69,22 @@ class SinkNode : public ExecNode {
      util::BackpressureOptions backpressure) {
    PushGenerator<util::optional<ExecBatch>> push_gen(std::move(backpressure));
    auto out = push_gen.producer();
-    *out_gen = std::move(push_gen);
+    *out_gen = [push_gen] {
+      // TODO(ARROW-14070) Awful workaround for MSVC 19.0 (Visual Studio 2015) bug.
@bkietz same here, could you suggest a fix?
Just undo this change? It seems that would be enough of a fix, unless I'm missing something.
constexpr int kDefaultBackgroundQRestart = 16;

ARROW_EXPORT
Result<std::function<Future<util::optional<ExecBatch>>()>> MakeReaderGenerator(
Can you add a docstring here?
I've added some here, though I'm sure they could be polished by those who know the style and execplan setup better.
Force-pushed from 20150a3 to 6b3368a
Co-authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Force-pushed from 870f19f to 38624c3
Co-authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Benchmark runs are scheduled for baseline = 0059d61 and contender = b868090. b868090 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
This isn't yet fully functional, but I've unmarked [WIP] to get fuller CI on it. This now works, with something like: