Skip to content

Commit

Permalink
ARROW-16690: [R][FlightRPC] Additional max_chunksize parameter in do_…
Browse files Browse the repository at this point in the history
…put method (#13267)

**Summary**
An additional parameter in Flight do_put to specify chunk size in R.

**Problem**
Currently, all data is sent through in a single message. It's a likely scenario that users will want the ability to control the batch sizes without building a custom do_put method.

**Solution**
Additional (optional) parameter to specify chunk size.

Lead-authored-by: Christopher.Dunderdale <Christopher.Dunderdale@dyna-mo.com>
Co-authored-by: Christopher Dunderdale <47271795+thatstatsguy@users.noreply.github.com>
Signed-off-by: Dewey Dunnington <dewey@fishandwhistle.net>
  • Loading branch information
thatstatsguy committed Aug 22, 2022
1 parent 5f84335 commit 6d8624b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 3 deletions.
11 changes: 9 additions & 2 deletions r/R/flight.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ flight_disconnect <- function(client) {
#' @param overwrite logical: if `path` exists on `client` already, should we
#' replace it with the contents of `data`? Default is `TRUE`; if `FALSE` and
#' `path` exists, the function will error.
#' @param max_chunksize integer: Maximum size for RecordBatch chunks when a `data.frame` is sent.
#' Individual chunks may be smaller depending on the chunk layout of individual columns.
#' @return `client`, invisibly.
#' @export
flight_put <- function(client, data, path, overwrite = TRUE) {
flight_put <- function(client, data, path, overwrite = TRUE, max_chunksize = NULL) {
assert_is(data, c("data.frame", "Table", "RecordBatch"))

if (!overwrite && flight_path_exists(client, path)) {
Expand All @@ -70,8 +72,13 @@ flight_put <- function(client, data, path, overwrite = TRUE) {

py_data <- reticulate::r_to_py(data)
writer <- client$do_put(descriptor_for_path(path), py_data$schema)[[1]]
if (inherits(data, "RecordBatch")) {
if (inherits(data, "RecordBatch") && !is.null(max_chunksize)) {
warning("`max_chunksize` is not supported for flight_put with RecordBatch")
writer$write_batch(py_data)
} else if (inherits(data, "RecordBatch")) {
writer$write_batch(py_data)
} else if (!is.null(max_chunksize)) {
writer$write_table(py_data, max_chunksize)
} else {
writer$write_table(py_data)
}
Expand Down
5 changes: 4 additions & 1 deletion r/man/flight_put.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions r/tests/testthat/test-python-flight.R
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ if (process_is_running("demo_flight_server")) {
regexp = 'data must be a "data.frame", "Table", or "RecordBatch"'
)
})

test_that("flight_put with max_chunksize", {
flight_put(client, example_data, path = flight_obj, max_chunksize = 1)
expect_true(flight_path_exists(client, flight_obj))
expect_true(flight_obj %in% list_flights(client))
expect_warning(
flight_put(client, record_batch(example_data), path = flight_obj, max_chunksize = 123),
regexp = "`max_chunksize` is not supported for flight_put with RecordBatch"
)
expect_error(
flight_put(client, Array$create(c(1:3)), path = flight_obj),
regexp = 'data must be a "data.frame", "Table", or "RecordBatch"'
)
})

test_that("flight_get", {
expect_identical(as.data.frame(flight_get(client, flight_obj)), example_data)
Expand Down

0 comments on commit 6d8624b

Please sign in to comment.