ARROW-16690: [R][FlightRPC] Additional max_chunksize parameter in do_put method #13267

Merged: 8 commits, Aug 22, 2022

11 changes: 9 additions & 2 deletions r/R/flight.R
@@ -56,9 +56,11 @@ flight_disconnect <- function(client) {
 #' @param overwrite logical: if `path` exists on `client` already, should we
 #' replace it with the contents of `data`? Default is `TRUE`; if `FALSE` and
 #' `path` exists, the function will error.
+#' @param max_chunksize integer: Maximum size for RecordBatch chunks when a `data.frame` is sent.
+#' Individual chunks may be smaller depending on the chunk layout of individual columns.
 #' @return `client`, invisibly.
 #' @export
-flight_put <- function(client, data, path, overwrite = TRUE) {
+flight_put <- function(client, data, path, overwrite = TRUE, max_chunksize = NULL) {
   assert_is(data, c("data.frame", "Table", "RecordBatch"))

   if (!overwrite && flight_path_exists(client, path)) {
@@ -70,8 +72,13 @@ flight_put <- function(client, data, path, overwrite = TRUE) {

   py_data <- reticulate::r_to_py(data)
   writer <- client$do_put(descriptor_for_path(path), py_data$schema)[[1]]
-  if (inherits(data, "RecordBatch")) {
+  if (inherits(data, "RecordBatch") && !is.null(max_chunksize)) {
+    warning("`max_chunksize` is not supported for flight_put with RecordBatch")
+    writer$write_batch(py_data)
+  } else if (inherits(data, "RecordBatch")) {
     writer$write_batch(py_data)
+  } else if (!is.null(max_chunksize)) {
+    writer$write_table(py_data, max_chunksize)
   } else {
     writer$write_table(py_data)
   }
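
The four-way branch above reduces to two questions: is `data` a RecordBatch, and was `max_chunksize` supplied? The following base-R sketch mirrors that decision so the outcomes can be checked without a Flight server. `plan_flight_write()` and `fake_batch` are hypothetical names introduced only for this illustration; they are not part of arrow, and the sketch makes no network or pyarrow calls.

```r
# Hypothetical helper (not part of arrow): mirrors the branch order in the
# patched flight_put() and reports which writer call would be made.
plan_flight_write <- function(data, max_chunksize = NULL) {
  is_batch <- inherits(data, "RecordBatch")
  has_limit <- !is.null(max_chunksize)
  list(
    # RecordBatch input always goes through write_batch(); anything else
    # (data.frame or Table) goes through write_table().
    method = if (is_batch) "write_batch" else "write_table",
    # The limit is only forwarded on the write_table() path.
    passes_max_chunksize = !is_batch && has_limit,
    # A RecordBatch combined with a limit triggers the warning and the
    # limit is ignored.
    warns = is_batch && has_limit
  )
}

# Stand-in object carrying a "RecordBatch" class attribute, used only so
# the sketch runs without arrow installed.
fake_batch <- structure(list(), class = "RecordBatch")

str(plan_flight_write(mtcars, max_chunksize = 10))
str(plan_flight_write(fake_batch, max_chunksize = 10))
str(plan_flight_write(fake_batch))
```

Seen this way, the patch leaves the write path for RecordBatch input unchanged; the new argument only affects data that is written as a table.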
5 changes: 4 additions & 1 deletion r/man/flight_put.Rd

Generated file; diff not rendered by default.

14 changes: 14 additions & 0 deletions r/tests/testthat/test-python-flight.R
@@ -37,6 +37,20 @@ if (process_is_running("demo_flight_server")) {
       regexp = 'data must be a "data.frame", "Table", or "RecordBatch"'
     )
   })

+  test_that("flight_put with max_chunksize", {
+    flight_put(client, example_data, path = flight_obj, max_chunksize = 1)
+    expect_true(flight_path_exists(client, flight_obj))
+    expect_true(flight_obj %in% list_flights(client))
+    expect_warning(
+      flight_put(client, record_batch(example_data), path = flight_obj, max_chunksize = 123),
+      regexp = "`max_chunksize` is not supported for flight_put with RecordBatch"
+    )
+    expect_error(
+      flight_put(client, Array$create(c(1:3)), path = flight_obj),
+      regexp = 'data must be a "data.frame", "Table", or "RecordBatch"'
+    )
+  })
+
   test_that("flight_get", {
     expect_identical(as.data.frame(flight_get(client, flight_obj)), example_data)
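
The new test uses `max_chunksize = 1`, the smallest possible limit. As a rough illustration of what such a limit implies, the base-R sketch below computes the row ranges that a table would be cut into. It assumes that `max_chunksize` counts rows and that each column is a single contiguous chunk; when columns are already split into smaller chunks, the batches actually sent may be smaller, as the parameter documentation notes. `chunk_row_ranges()` is a hypothetical helper for this note and is not part of arrow or pyarrow.

```r
# Hypothetical helper (not part of arrow): row ranges produced by cutting
# n_rows rows into pieces of at most max_chunksize rows each.
# Assumption: max_chunksize is a row count.
chunk_row_ranges <- function(n_rows, max_chunksize) {
  stopifnot(n_rows >= 0, max_chunksize >= 1)
  if (n_rows == 0) {
    return(data.frame(start = numeric(0), end = numeric(0)))
  }
  starts <- seq(1, n_rows, by = max_chunksize)
  # The last piece is truncated at n_rows, so it may be shorter.
  ends <- pmin(starts + max_chunksize - 1, n_rows)
  data.frame(start = starts, end = ends)
}

chunk_row_ranges(10, 4)
nrow(chunk_row_ranges(10, 1))
```

Under these assumptions, 10 rows with a limit of 4 give the ranges 1-4, 5-8 and 9-10, and a limit of 1 gives one piece per row, which is the situation the test exercises.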