
ARROW-16703: [R] Refactor map_batches() so it can stream results #13650

Merged
paleolimbot merged 7 commits into apache:master from the map-batches branch on Jul 23, 2022

Conversation

paleolimbot
Member

No description provided.

@github-actions

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@paleolimbot
Member Author

@wjones127 I'd love your review on this when you have a chance!

A note that I'm going to keep this as a draft until #13397/ARROW-16444 is merged, because after that we can pass this type of record batch reader directly into the query engine (i.e., the awkward `(function(x) x$read_table())` additions in this PR can disappear, since we'll be running most queries inside the event loop required for `SafeCallIntoR()` to work).

@wjones127
Member

I think `as_record_batch_reader.function` is so cool! 😍 As a follow-up, we should add an example to the datasets vignette. It seems like it might be useful to show how to use it to generate a larger-than-memory simulated dataset.
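
For reference, a rough sketch of what such a vignette example might look like. The function-method contract here (called repeatedly with no arguments, each call returning something coercible to a record batch, NULL signalling the end of the stream, with a schema supplied separately) is assumed from the discussion in this PR rather than quoted from the final docs:

library(arrow, warn.conflicts = FALSE)

# Sketch only: simulate a larger-than-memory dataset by generating batches
# on demand instead of holding them all in memory at once.
batches_remaining <- 1000
simulated <- as_record_batch_reader(
  function() {
    if (batches_remaining == 0) {
      return(NULL)  # assumed to signal end-of-stream
    }
    batches_remaining <<- batches_remaining - 1
    data.frame(x = rnorm(1e5), y = runif(1e5))
  },
  schema = schema(x = float64(), y = float64())
)

# Only one batch is materialized at a time; the stream could then be written
# to disk (e.g. with write_dataset()) or consumed batch by batch:
simulated$read_next_batch()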

@@ -703,6 +703,7 @@ test_that("Dataset min_rows_per_group", {

row_group_sizes <- ds %>%
map_batches(~ record_batch(nrows = .$num_rows)) %>%
(function(x) x$read_table()) %>%
Member

Would collect() not work here?

Member Author

It doesn't right now because the requisite RunWithCapturedR() isn't there yet (it gets added here: https://github.com/apache/arrow/pull/13397/files#diff-0d1ff6f17f571f6a348848af7de9c05ed588d3339f46dd3bcf2808489f7dca92R132-R144 )

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

source_reader <- RecordBatchReader$create(
  batches = list(
    as_record_batch(mtcars[1:10, ]),
    as_record_batch(mtcars[11:20, ]),
    as_record_batch(mtcars[21:nrow(mtcars), ])
  )
)

reader <- source_reader |> 
  map_batches(~rbind(as.data.frame(.), as.data.frame(.))) 

dplyr::collect(reader)
#> Error in `dplyr::collect()` at r/R/dplyr-collect.R:43:48:
#> ! NotImplemented: Call to R from a non-R thread without calling RunWithCapturedR
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.h:242  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:559  iterator_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:337  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:351  ToRecordBatches()

Created on 2022-07-19 by the reprex package (v2.0.1)
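
For now the workaround (used in the tests this PR touches) is to read a Table directly off the reader that map_batches() returns, rather than going through the exec plan via collect(). A minimal sketch with a fresh reader:

source_reader <- RecordBatchReader$create(
  batches = list(
    as_record_batch(mtcars[1:16, ]),
    as_record_batch(mtcars[17:32, ])
  )
)

mapped <- source_reader |>
  map_batches(~rbind(as.data.frame(.), as.data.frame(.)))

# Reading from the reader directly works; it's the exec-plan path behind
# collect() that still needs RunWithCapturedR().
as.data.frame(mapped$read_table())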

Member

Ah right, I should have read your earlier comment to completion. Thanks for explaining!

NULL
} else {
as_record_batch(
do.call(FUN, c(list(batch, dots))),
Member

We should probably add a test to make sure the dots are being passed through.

Member Author

That was a really good catch (it needed to be `c(list(batch), dots)`)!
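
A minimal base-R illustration of why that mattered (plain values standing in for batches, not the actual map_batches() internals):

FUN <- function(batch, multiplier = 1) batch * multiplier
batch <- 2
dots <- list(multiplier = 10)

# Buggy form: `dots` is passed as one unnamed list argument, so `multiplier`
# never reaches FUN by name.
do.call(FUN, c(list(batch, dots)))
#> Error in batch * multiplier : non-numeric argument to binary operator

# Fixed form: `dots` is spliced into the argument list, so `multiplier = 10`
# is matched by name.
do.call(FUN, c(list(batch), dots))
#> [1] 20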

@paleolimbot paleolimbot marked this pull request as ready for review July 22, 2022 17:15
@paleolimbot
Member Author

@wjones127 any thoughts on whether or not this is too large of a change to merge? I don't know how many users we have of map_batches()...this does restrict what one can do with the result of map_batches() (for example, an exec plan that ends with head() won't work). One option is to add a lazy = TRUE|FALSE argument to allow the streaming behaviour but default to non-streaming like we did before.

@wjones127
Member

any thoughts on whether or not this is too large of a change to merge? I don't know how many users we have of map_batches()...this does restrict what one can do with the result of map_batches() (for example, an exec plan that ends with head() won't work). One option is to add a lazy = TRUE|FALSE argument to allow the streaming behaviour but default to non-streaming like we did before.

On one hand we do mark this as experimental, so I'm not super worried about breaking it right now. But I also find "an exec plan that ends with head() won't work" alarming. Why wouldn't that work?

@paleolimbot
Member Author

It is a bit alarming... it's because some exec plans that end in head() rely on an R-level RecordBatchReader where the ExecPlan is executing stuff in the background. The user-defined functions PR only works when we evaluate the whole plan into a table (so that we can guarantee all the R function calls have happened before we hop out of the event loop). I have ARROW-17178 for the follow-up to fix it, but it sounds like it would be best to add a lazy argument with a default that mirrors the current behaviour, so that users can opt in to the even more experimental behaviour.

@wjones127
Member

Okay, that’s less alarming than I thought. Let’s add that extra param then; it sounds like the best solution for now. Hopefully we can make it a goal to stabilize and document this in the next version, though; we’ve been messing with it for a while.
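
To make the trade-off concrete, a sketch of the user-facing behaviour being discussed; the argument name, default, and return types are illustrative only, not the final API:

# (ds: an open Dataset, as in the test diff above)

# Eager (default, mirroring the behaviour before this PR): the whole plan is
# evaluated, so downstream operations like head() keep working.
tbl <- ds %>%
  map_batches(~ record_batch(nrows = .$num_rows))

# Opt-in streaming: a RecordBatchReader that applies the function lazily,
# batch by batch, without materializing everything in memory.
reader <- ds %>%
  map_batches(~ record_batch(nrows = .$num_rows), lazy = TRUE)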

@paleolimbot paleolimbot merged commit 70904df into apache:master Jul 23, 2022
@paleolimbot paleolimbot deleted the map-batches branch July 23, 2022 12:10
@ursabot

ursabot commented Jul 24, 2022

Benchmark runs are scheduled for baseline = ee2e944 and contender = 70904df. 70904df is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Failed ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️1.57% ⬆️0.07%] test-mac-arm
[Failed ⬇️0.54% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.86% ⬆️0.11%] ursa-thinkcentre-m75q
Buildkite builds:
[Failed] 70904dff ec2-t3-xlarge-us-east-2
[Failed] 70904dff test-mac-arm
[Failed] 70904dff ursa-i9-9960x
[Finished] 70904dff ursa-thinkcentre-m75q
[Failed] ee2e9448 ec2-t3-xlarge-us-east-2
[Finished] ee2e9448 test-mac-arm
[Finished] ee2e9448 ursa-i9-9960x
[Finished] ee2e9448 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

kou pushed a commit that referenced this pull request Feb 20, 2023
…Hub issue numbers (#34260)

Rewrite the Jira issue numbers to the GitHub issue numbers, so that the GitHub issue numbers are automatically linked to the issues by pkgdown's auto-linking feature.

Issue numbers have been rewritten based on the following correspondence.
Also, the pkgdown settings have been changed and updated to link to GitHub.

I generated the Changelog page using the `pkgdown::build_news()` function and verified that the links work correctly.

---
ARROW-6338	#5198
ARROW-6364	#5201
ARROW-6323	#5169
ARROW-6278	#5141
ARROW-6360	#5329
ARROW-6533	#5450
ARROW-6348	#5223
ARROW-6337	#5399
ARROW-10850	#9128
ARROW-10624	#9092
ARROW-10386	#8549
ARROW-6994	#23308
ARROW-12774	#10320
ARROW-12670	#10287
ARROW-16828	#13484
ARROW-14989	#13482
ARROW-16977	#13514
ARROW-13404	#10999
ARROW-16887	#13601
ARROW-15906	#13206
ARROW-15280	#13171
ARROW-16144	#13183
ARROW-16511	#13105
ARROW-16085	#13088
ARROW-16715	#13555
ARROW-16268	#13550
ARROW-16700	#13518
ARROW-16807	#13583
ARROW-16871	#13517
ARROW-16415	#13190
ARROW-14821	#12154
ARROW-16439	#13174
ARROW-16394	#13118
ARROW-16516	#13163
ARROW-16395	#13627
ARROW-14848	#12589
ARROW-16407	#13196
ARROW-16653	#13506
ARROW-14575	#13160
ARROW-15271	#13170
ARROW-16703	#13650
ARROW-16444	#13397
ARROW-15016	#13541
ARROW-16776	#13563
ARROW-15622	#13090
ARROW-18131	#14484
ARROW-18305	#14581
ARROW-18285	#14615
* Closes: #33631

Authored-by: SHIMA Tatsuya <ts1s1andn@gmail.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
fatemehp pushed a commit to fatemehp/arrow that referenced this pull request Feb 24, 2023
…to GitHub issue numbers (apache#34260)
