From 9637807e7a1342c01ba4c4a472d2205691f07f37 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Tue, 4 Feb 2020 16:33:32 -0800 Subject: [PATCH 1/7] Hacking and failing to compile --- r/R/arrowExports.R | 8 ++++++++ r/R/dataset.R | 9 +++++++++ r/src/arrowExports.cpp | 32 ++++++++++++++++++++++++++++++++ r/src/arrow_types.h | 1 + r/src/dataset.cpp | 21 +++++++++++++++++++++ r/tests/testthat/test-dataset.R | 5 +++++ 6 files changed, 76 insertions(+) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 0f6b758df1468..215917b182b43 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -456,6 +456,14 @@ dataset___Scanner__ToTable <- function(scanner){ .Call(`_arrow_dataset___Scanner__ToTable` , scanner) } +dataset___Scanner__ToBatchIterators <- function(scanner){ + .Call(`_arrow_dataset___Scanner__ToBatchIterators` , scanner) +} + +RBI_get_batches <- function(rbi){ + .Call(`_arrow_RBI_get_batches` , rbi) +} + shared_ptr_is_null <- function(xp){ .Call(`_arrow_shared_ptr_is_null` , xp) } diff --git a/r/R/dataset.R b/r/R/dataset.R index 78dec8e4b92dd..dc012a88a6120 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -444,6 +444,15 @@ Scanner <- R6Class("Scanner", inherit = ArrowObject, ) ) +lapply_scanner <- function(X, FUN, ...) { + lapply(dataset___Scanner__ToBatchIterators(X), function(rbi) { + batch_ptrs <- lapply(rbi, RBI_get_batches) + lapply(batch_ptrs, function(b) { + FUN(shared_ptr(RecordBatch, b), ...) + }) + }) +} + #' @usage NULL #' @format NULL #' @rdname Scanner diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index b38edfc4ded0a..d97099e4870e8 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1799,6 +1799,36 @@ RcppExport SEXP _arrow_dataset___Scanner__ToTable(SEXP scanner_sexp){ } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::vector> dataset___Scanner__ToBatchIterators(const std::shared_ptr& scanner); +RcppExport SEXP _arrow_dataset___Scanner__ToBatchIterators(SEXP scanner_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type scanner(scanner_sexp); + return Rcpp::wrap(dataset___Scanner__ToBatchIterators(scanner)); +END_RCPP +} +#else +RcppExport SEXP _arrow_dataset___Scanner__ToBatchIterators(SEXP scanner_sexp){ + Rf_error("Cannot call dataset___Scanner__ToBatchIterators(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::vector> RBI_get_batches(const std::shared_ptr& rbi); +RcppExport SEXP _arrow_RBI_get_batches(SEXP rbi_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type rbi(rbi_sexp); + return Rcpp::wrap(RBI_get_batches(rbi)); +END_RCPP +} +#else +RcppExport SEXP _arrow_RBI_get_batches(SEXP rbi_sexp){ + Rf_error("Cannot call RBI_get_batches(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // datatype.cpp #if defined(ARROW_R_WITH_ARROW) bool shared_ptr_is_null(SEXP xp); @@ -5863,6 +5893,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC) &_arrow_dataset___ScannerBuilder__schema, 1}, { "_arrow_dataset___ScannerBuilder__Finish", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Finish, 1}, { "_arrow_dataset___Scanner__ToTable", (DL_FUNC) &_arrow_dataset___Scanner__ToTable, 1}, + { "_arrow_dataset___Scanner__ToBatchIterators", (DL_FUNC) &_arrow_dataset___Scanner__ToBatchIterators, 1}, + { "_arrow_RBI_get_batches", (DL_FUNC) &_arrow_RBI_get_batches, 1}, { "_arrow_shared_ptr_is_null", (DL_FUNC) &_arrow_shared_ptr_is_null, 1}, { "_arrow_unique_ptr_is_null", (DL_FUNC) &_arrow_unique_ptr_is_null, 1}, { "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 632067e0f89d6..3cc19fc68403c 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -197,6 +197,7 @@ inline std::shared_ptr extract(SEXP x) { #if defined(ARROW_R_WITH_ARROW) #include +#include #include #include #include diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 00c480f0ec9cd..2cd2c9b4e0415 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -233,4 +233,25 @@ std::shared_ptr dataset___Scanner__ToTable( return VALUE_OR_STOP(scanner->ToTable()); } +// [[arrow::export]] +std::vector> dataset___Scanner__ToBatchIterators( + const std::shared_ptr& scanner) { + auto it = VALUE_OR_STOP(scanner->Scan()); + std::vector> out; + for (auto rbi : it) { + VALUE_OR_STOP(out.push_back(rbi)); + } + return(out); +} + +// [[arrow::export]] +std::vector> RBI_get_batches( + const std::shared_ptr& rbi) { + std::vector> out; + for (auto batch : rbi) { + VALUE_OR_STOP(out.push_back(batch)); + } + return(out); +} + #endif diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 94f10c2af47a2..08acc34c94bf3 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -206,6 +206,11 @@ test_that("Dataset with multiple file formats", { ) }) +test_that("lapply_batches", { + ds <- open_dataset(dataset_dir, partitioning = "part") + lapply_batches(ds$NewScan()$Finish(), print) +}) + test_that("partitioning = NULL to ignore partition information (but why?)", { ds <- open_dataset(hive_dir, partitioning = NULL) expect_identical(names(ds), names(df1)) # i.e. not c(names(df1), "group", "other") From 4459d77efe62159c56142223388aaa27c61c60ea Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 5 Feb 2020 13:58:52 -0800 Subject: [PATCH 2/7] Make it work and add a basic test --- r/R/arrowExports.R | 8 +++---- r/R/dataset.R | 42 ++++++++++++++++++++++++++++++--- r/R/dplyr.R | 11 +++------ r/src/arrowExports.cpp | 26 ++++++++++---------- r/src/arrow_types.h | 1 + r/src/dataset.cpp | 22 ++++++++++------- r/tests/testthat/test-dataset.R | 12 ++++++++-- 7 files changed, 84 insertions(+), 38 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 215917b182b43..a31d72b2272b6 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -456,12 +456,12 @@ dataset___Scanner__ToTable <- function(scanner){ .Call(`_arrow_dataset___Scanner__ToTable` , scanner) } -dataset___Scanner__ToBatchIterators <- function(scanner){ - .Call(`_arrow_dataset___Scanner__ToBatchIterators` , scanner) +dataset___Scanner__Scan <- function(scanner){ + .Call(`_arrow_dataset___Scanner__Scan` , scanner) } -RBI_get_batches <- function(rbi){ - .Call(`_arrow_RBI_get_batches` , rbi) +dataset___ScanTask__get_batches <- function(scan_task){ + .Call(`_arrow_dataset___ScanTask__get_batches` , scan_task) } shared_ptr_is_null <- function(xp){ diff --git a/r/R/dataset.R b/r/R/dataset.R index dc012a88a6120..42b2e578fb324 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -443,11 +443,47 @@ Scanner <- R6Class("Scanner", inherit = ArrowObject, ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)) ) ) +Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_threads = TRUE, ...) { + if (inherits(dataset, "arrow_dplyr_query") && inherits(dataset$.data, "Dataset")) { + return(Scanner$create( + dataset$.data, + dataset$selected_columns, + dataset$filtered_rows, + use_threads, + ... + )) + } + assert_is(dataset, "Dataset") + scanner_builder <- dataset$NewScan() + if (use_threads) { + scanner_builder$UseThreads() + } + if (!is.null(projection)) { + scanner_builder$Project(projection) + } + if (!isTRUE(filter)) { + scanner_builder$Filter(filter) + } + scanner_builder$Finish() +} -lapply_scanner <- function(X, FUN, ...) { - lapply(dataset___Scanner__ToBatchIterators(X), function(rbi) { - batch_ptrs <- lapply(rbi, RBI_get_batches) +map_batches <- function(X, FUN, ..., .data.frame = TRUE) { + if (.data.frame) { + lapply <- purrr::map_dfr + } + scanner <- Scanner$create(X) + # If X is arrow_dplyr_query and !all(names(X$selected_columns) == X$selected_columns) warn + FUN <- purrr::as_mapper(FUN) + message("Making ScanTasks") + scan_tasks <- dataset___Scanner__Scan(scanner) + message("Done making ScanTasks") + lapply(scan_tasks, function(rbi) { + # This outer lapply could be parallelized + message("Making Batches") + batch_ptrs <- dataset___ScanTask__get_batches(shared_ptr(Object, rbi)) lapply(batch_ptrs, function(b) { + message("Processing Batch") + # This inner lapply cannot be parallelized FUN(shared_ptr(RecordBatch, b), ...) }) }) diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 3f57742404dcc..a208cbaab7c8d 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -239,17 +239,12 @@ collect.arrow_dplyr_query <- function(x, ...) { # Pull only the selected rows and cols into R if (query_on_dataset(x)) { # See dataset.R for Dataset and Scanner(Builder) classes - scanner_builder <- x$.data$NewScan() - scanner_builder$UseThreads() - scanner_builder$Project(colnames) - if (!isTRUE(x$filtered_rows)) { - scanner_builder$Filter(x$filtered_rows) - } - df <- as.data.frame(scanner_builder$Finish()$ToTable()) + df <- Scanner$create(x)$ToTable() } else { # This is a Table/RecordBatch. See record-batch.R for the [ method - df <- as.data.frame(x$.data[x$filtered_rows, colnames, keep_na = FALSE]) + df <- x$.data[x$filtered_rows, colnames, keep_na = FALSE] } + df <- as.data.frame(df) # In case variables were renamed, apply those names names(df) <- names(colnames) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index d97099e4870e8..2a33dea7d3fb7 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1801,31 +1801,31 @@ RcppExport SEXP _arrow_dataset___Scanner__ToTable(SEXP scanner_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::vector> dataset___Scanner__ToBatchIterators(const std::shared_ptr& scanner); -RcppExport SEXP _arrow_dataset___Scanner__ToBatchIterators(SEXP scanner_sexp){ +std::vector> dataset___Scanner__Scan(const std::shared_ptr& scanner); +RcppExport SEXP _arrow_dataset___Scanner__Scan(SEXP scanner_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type scanner(scanner_sexp); - return Rcpp::wrap(dataset___Scanner__ToBatchIterators(scanner)); + return Rcpp::wrap(dataset___Scanner__Scan(scanner)); END_RCPP } #else -RcppExport SEXP _arrow_dataset___Scanner__ToBatchIterators(SEXP scanner_sexp){ - Rf_error("Cannot call dataset___Scanner__ToBatchIterators(). Please use arrow::install_arrow() to install required runtime libraries. "); +RcppExport SEXP _arrow_dataset___Scanner__Scan(SEXP scanner_sexp){ + Rf_error("Cannot call dataset___Scanner__Scan(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::vector> RBI_get_batches(const std::shared_ptr& rbi); -RcppExport SEXP _arrow_RBI_get_batches(SEXP rbi_sexp){ +std::vector> dataset___ScanTask__get_batches(const std::shared_ptr& scan_task); +RcppExport SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){ BEGIN_RCPP - Rcpp::traits::input_parameter&>::type rbi(rbi_sexp); - return Rcpp::wrap(RBI_get_batches(rbi)); + Rcpp::traits::input_parameter&>::type scan_task(scan_task_sexp); + return Rcpp::wrap(dataset___ScanTask__get_batches(scan_task)); END_RCPP } #else -RcppExport SEXP _arrow_RBI_get_batches(SEXP rbi_sexp){ - Rf_error("Cannot call RBI_get_batches(). Please use arrow::install_arrow() to install required runtime libraries. "); +RcppExport SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){ + Rf_error("Cannot call dataset___ScanTask__get_batches(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -5893,8 +5893,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC) &_arrow_dataset___ScannerBuilder__schema, 1}, { "_arrow_dataset___ScannerBuilder__Finish", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Finish, 1}, { "_arrow_dataset___Scanner__ToTable", (DL_FUNC) &_arrow_dataset___Scanner__ToTable, 1}, - { "_arrow_dataset___Scanner__ToBatchIterators", (DL_FUNC) &_arrow_dataset___Scanner__ToBatchIterators, 1}, - { "_arrow_RBI_get_batches", (DL_FUNC) &_arrow_RBI_get_batches, 1}, + { "_arrow_dataset___Scanner__Scan", (DL_FUNC) &_arrow_dataset___Scanner__Scan, 1}, + { "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) &_arrow_dataset___ScanTask__get_batches, 1}, { "_arrow_shared_ptr_is_null", (DL_FUNC) &_arrow_shared_ptr_is_null, 1}, { "_arrow_unique_ptr_is_null", (DL_FUNC) &_arrow_unique_ptr_is_null, 1}, { "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 3cc19fc68403c..8565801049f1e 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -215,6 +215,7 @@ inline std::shared_ptr extract(SEXP x) { #include #include #include +#include #include #include #include diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 2cd2c9b4e0415..d1cbb65b0f0b1 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -234,22 +234,28 @@ std::shared_ptr dataset___Scanner__ToTable( } // [[arrow::export]] -std::vector> dataset___Scanner__ToBatchIterators( +std::vector> dataset___Scanner__Scan( const std::shared_ptr& scanner) { auto it = VALUE_OR_STOP(scanner->Scan()); - std::vector> out; - for (auto rbi : it) { - VALUE_OR_STOP(out.push_back(rbi)); + std::vector> out; + std::shared_ptr scan_task; + for (auto st : it) { + scan_task = VALUE_OR_STOP(st); + out.push_back(scan_task); } return(out); } // [[arrow::export]] -std::vector> RBI_get_batches( - const std::shared_ptr& rbi) { +std::vector> dataset___ScanTask__get_batches( + const std::shared_ptr& scan_task) { + arrow::RecordBatchIterator rbi; + rbi = VALUE_OR_STOP(scan_task->Execute()); std::vector> out; - for (auto batch : rbi) { - VALUE_OR_STOP(out.push_back(batch)); + std::shared_ptr batch; + for (auto b : rbi) { + batch = VALUE_OR_STOP(b); + out.push_back(batch); } return(out); } diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 08acc34c94bf3..cae2bf3d72cea 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -206,9 +206,17 @@ test_that("Dataset with multiple file formats", { ) }) -test_that("lapply_batches", { +test_that("map_batches", { ds <- open_dataset(dataset_dir, partitioning = "part") - lapply_batches(ds$NewScan()$Finish(), print) + expect_equivalent( + ds %>% + filter(int > 5) %>% + select(int, lgl) %>% + map_batches(~summarize(., + min_int = min(int) + )), + tibble(min_int = c(6L, 101L)) + ) }) test_that("partitioning = NULL to ignore partition information (but why?)", { From 7997b880c4120bcdc7da3011d8de994a0e7cbcb5 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 5 Feb 2020 14:35:15 -0800 Subject: [PATCH 3/7] Doc and cleanup --- r/NAMESPACE | 3 +++ r/R/arrow-package.R | 2 +- r/R/dataset.R | 47 ++++++++++++++++++++++++++++++++++++-------- r/man/Scanner.Rd | 19 ++++++++++++++++-- r/man/map_batches.Rd | 30 ++++++++++++++++++++++++++++ r/src/dataset.cpp | 1 + 6 files changed, 91 insertions(+), 11 deletions(-) create mode 100644 r/man/map_batches.Rd diff --git a/r/NAMESPACE b/r/NAMESPACE index de26a53d3400a..f86bc21db9430 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -161,6 +161,7 @@ export(int64) export(int8) export(last_col) export(list_of) +export(map_batches) export(matches) export(mmap_create) export(mmap_open) @@ -205,9 +206,11 @@ importFrom(assertthat,assert_that) importFrom(bit64,print.integer64) importFrom(bit64,str.integer64) importFrom(methods,as) +importFrom(purrr,as_mapper) importFrom(purrr,map) importFrom(purrr,map2) importFrom(purrr,map_chr) +importFrom(purrr,map_dfr) importFrom(purrr,map_int) importFrom(purrr,map_lgl) importFrom(rlang,"%||%") diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index b04e735b5aedf..8fed1ea0861f7 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -16,7 +16,7 @@ # under the License. #' @importFrom R6 R6Class -#' @importFrom purrr map map_int map_lgl map_chr map2 +#' @importFrom purrr as_mapper map map2 map_chr map_dfr map_int map_lgl #' @importFrom assertthat assert_that #' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names #' @importFrom Rcpp sourceCpp diff --git a/r/R/dataset.R b/r/R/dataset.R index 42b2e578fb324..ff3607fdb6e55 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -415,9 +415,20 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat) #' #' @description #' A `Scanner` iterates over a [Dataset]'s fragments and returns data -#' according to given row filtering and column projection. Use a -#' `ScannerBuilder`, from a `Dataset`'s `$NewScan()` method, to construct one. +#' according to given row filtering and column projection. A `ScannerBuilder` +#' can help create one. #' +#' @section Factory: +#' `Scanner$create()` wraps the `ScannerBuilder` interface to make a `Scanner`. +#' It takes the following arguments: +#' +#' * `dataset`: A `Dataset` or `arrow_dplyr_query` object, as returned by the +#' `dplyr` methods on `Dataset`. +#' * `projection`: A character vector of column names to select +#' * `filter`: A `Expression` to filter the scanned rows by, or `TRUE` (default) +#' to keep all rows. +#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE` +#' * `...`: Additional arguments, currently ignored #' @section Methods: #' `ScannerBuilder` has the following methods: #' @@ -467,22 +478,42 @@ Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_thread scanner_builder$Finish() } +#' Apply a function to a stream of RecordBatches +#' +#' As an alternative to calling `collect()` on a `Dataset` query, you can +#' use this function to access the stream of `RecordBatch`es in the `Dataset`. +#' This lets you aggregate on each chunk and pull the intermediate results into +#' a `data.frame` for further aggregation, even if you couldn't fit the whole +#' `Dataset` result in memory. +#' +#' This is experimental and not recommended for production use. +#' +#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the +#' `dplyr` methods on `Dataset`. +#' @param FUN A function or `purrr`-style lambda expression to apply to each +#' batch +#' @param ... Additional arguments passed to `FUN` +#' @param .data.frame logical: collect the resulting chunks into a single +#' `data.frame`? Default `TRUE` +#' @export map_batches <- function(X, FUN, ..., .data.frame = TRUE) { if (.data.frame) { - lapply <- purrr::map_dfr + lapply <- map_dfr } scanner <- Scanner$create(X) # If X is arrow_dplyr_query and !all(names(X$selected_columns) == X$selected_columns) warn - FUN <- purrr::as_mapper(FUN) - message("Making ScanTasks") + # that renaming is not handled + # Likewise, we aren't incorporating any group_by yet + FUN <- as_mapper(FUN) + # message("Making ScanTasks") scan_tasks <- dataset___Scanner__Scan(scanner) - message("Done making ScanTasks") + # message("Done making ScanTasks") lapply(scan_tasks, function(rbi) { # This outer lapply could be parallelized - message("Making Batches") + # message("Making Batches") batch_ptrs <- dataset___ScanTask__get_batches(shared_ptr(Object, rbi)) lapply(batch_ptrs, function(b) { - message("Processing Batch") + # message("Processing Batch") # This inner lapply cannot be parallelized FUN(shared_ptr(RecordBatch, b), ...) }) diff --git a/r/man/Scanner.Rd b/r/man/Scanner.Rd index a665c0b331e2e..2b82d2029a420 100644 --- a/r/man/Scanner.Rd +++ b/r/man/Scanner.Rd @@ -6,9 +6,24 @@ \title{Scan the contents of a dataset} \description{ A \code{Scanner} iterates over a \link{Dataset}'s fragments and returns data -according to given row filtering and column projection. Use a -\code{ScannerBuilder}, from a \code{Dataset}'s \verb{$NewScan()} method, to construct one. +according to given row filtering and column projection. A \code{ScannerBuilder} +can help create one. } +\section{Factory}{ + +\code{Scanner$create()} wraps the \code{ScannerBuilder} interface to make a \code{Scanner}. +It takes the following arguments: +\itemize{ +\item \code{dataset}: A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by the +\code{dplyr} methods on \code{Dataset}. +\item \code{projection}: A character vector of column names to select +\item \code{filter}: A \code{Expression} to filter the scanned rows by, or \code{TRUE} (default) +to keep all rows. +\item \code{use_threads}: logical: should scanning use multithreading? Default \code{TRUE} +\item \code{...}: Additional arguments, currently ignored +} +} + \section{Methods}{ \code{ScannerBuilder} has the following methods: diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd new file mode 100644 index 0000000000000..67d97a8f65522 --- /dev/null +++ b/r/man/map_batches.Rd @@ -0,0 +1,30 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dataset.R +\name{map_batches} +\alias{map_batches} +\title{Apply a function to a stream of RecordBatches} +\usage{ +map_batches(X, FUN, ..., .data.frame = TRUE) +} +\arguments{ +\item{X}{A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by the +\code{dplyr} methods on \code{Dataset}.} + +\item{FUN}{A function or \code{purrr}-style lambda expression to apply to each +batch} + +\item{...}{Additional arguments passed to \code{FUN}} + +\item{.data.frame}{logical: collect the resulting chunks into a single +\code{data.frame}? Default \code{TRUE}} +} +\description{ +As an alternative to calling \code{collect()} on a \code{Dataset} query, you can +use this function to access the stream of \code{RecordBatch}es in the \code{Dataset}. +This lets you aggregate on each chunk and pull the intermediate results into +a \code{data.frame} for further aggregation, even if you couldn't fit the whole +\code{Dataset} result in memory. +} +\details{ +This is experimental and not recommended for production use. +} diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index d1cbb65b0f0b1..be0f8db818b81 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -239,6 +239,7 @@ std::vector> dataset___Scanner__Scan( auto it = VALUE_OR_STOP(scanner->Scan()); std::vector> out; std::shared_ptr scan_task; + // TODO(npr): can this iteration be parallelized? for (auto st : it) { scan_task = VALUE_OR_STOP(st); out.push_back(scan_task); From d053c1aa8118633d834b9c6fa6a112ecda46fbfa Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 6 Feb 2020 14:12:17 -0800 Subject: [PATCH 4/7] Refactor/cleanup --- r/R/dataset.R | 25 ++++++++++++---------- r/R/dplyr.R | 37 ++++++++++++++++++++------------- r/tests/testthat/test-dataset.R | 8 ++++--- 3 files changed, 42 insertions(+), 28 deletions(-) diff --git a/r/R/dataset.R b/r/R/dataset.R index ff3607fdb6e55..f12c99ee0bfb0 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -451,7 +451,8 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat) #' @export Scanner <- R6Class("Scanner", inherit = ArrowObject, public = list( - ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)) + ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)), + Scan = function() map(dataset___Scanner__Scan(self), shared_ptr, class = ScanTask) ) ) Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_threads = TRUE, ...) { @@ -478,6 +479,12 @@ Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_thread scanner_builder$Finish() } +ScanTask <- R6Class("ScanTask", inherit = Object, + public = list( + Execute = function() map(dataset___ScanTask__get_batches(self), shared_ptr, class = RecordBatch) + ) +) + #' Apply a function to a stream of RecordBatches #' #' As an alternative to calling `collect()` on a `Dataset` query, you can @@ -500,22 +507,18 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { if (.data.frame) { lapply <- map_dfr } - scanner <- Scanner$create(X) - # If X is arrow_dplyr_query and !all(names(X$selected_columns) == X$selected_columns) warn - # that renaming is not handled - # Likewise, we aren't incorporating any group_by yet + scanner <- Scanner$create(ensure_group_vars(X)) FUN <- as_mapper(FUN) # message("Making ScanTasks") - scan_tasks <- dataset___Scanner__Scan(scanner) - # message("Done making ScanTasks") - lapply(scan_tasks, function(rbi) { + lapply(scanner$Scan(), function(scan_task) { # This outer lapply could be parallelized # message("Making Batches") - batch_ptrs <- dataset___ScanTask__get_batches(shared_ptr(Object, rbi)) - lapply(batch_ptrs, function(b) { + lapply(scan_task$Execute(), function(batch) { # message("Processing Batch") # This inner lapply cannot be parallelized - FUN(shared_ptr(RecordBatch, b), ...) + # TODO: wrap batch in arrow_dplyr_query with X$selected_columns and X$group_by_vars + # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE + FUN(batch, ...) }) }) } diff --git a/r/R/dplyr.R b/r/R/dplyr.R index a208cbaab7c8d..0f26402e3ab3e 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -229,13 +229,7 @@ set_filters <- function(.data, expressions) { } collect.arrow_dplyr_query <- function(x, ...) { - colnames <- x$selected_columns - # Be sure to retain any group_by vars - gv <- setdiff(dplyr::group_vars(x), names(colnames)) - if (length(gv)) { - colnames <- c(colnames, set_names(gv)) - } - + x <- ensure_group_vars(x) # Pull only the selected rows and cols into R if (query_on_dataset(x)) { # See dataset.R for Dataset and Scanner(Builder) classes @@ -245,18 +239,33 @@ collect.arrow_dplyr_query <- function(x, ...) { df <- x$.data[x$filtered_rows, colnames, keep_na = FALSE] } df <- as.data.frame(df) - # In case variables were renamed, apply those names - names(df) <- names(colnames) + restore_dplyr_features(df, x) +} +collect.Table <- as.data.frame.Table +collect.RecordBatch <- as.data.frame.RecordBatch +collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...) +ensure_group_vars <- function(x) { + if (inherits(x, "arrow_dplyr_query")) { + # Before pulling data from Arrow, make sure all group vars are in the projection + gv <- set_names(setdiff(dplyr::group_vars(x), names(x))) + x$selected_columns <- c(x$selected_columns, gv) + } + x +} + +restore_dplyr_features <- function(df, query) { + # An arrow_dplyr_query holds some attributes that Arrow doesn't know about + # After pulling data into a data.frame, make sure these features are carried over + + # In case variables were renamed, apply those names + names(df) <- names(query) # Preserve groupings, if present - if (length(x$group_by_vars)) { - df <- dplyr::grouped_df(df, dplyr::groups(x)) + if (length(query$group_by_vars)) { + df <- dplyr::grouped_df(df, dplyr::groups(query)) } df } -collect.Table <- as.data.frame.Table -collect.RecordBatch <- as.data.frame.RecordBatch -collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...) #' @importFrom tidyselect vars_pull pull.arrow_dplyr_query <- function(.data, var = -1) { diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index cae2bf3d72cea..b86c9f8144d24 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -212,9 +212,11 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~summarize(., - min_int = min(int) - )), + map_batches( + ~summarize(., + min_int = min(int) + ) + ), tibble(min_int = c(6L, 101L)) ) }) From 8e3e846773e1bcf02951d96fdbed123c2cbe9b3a Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 6 Feb 2020 14:38:32 -0800 Subject: [PATCH 5/7] Test setup --- r/tests/testthat/test-dataset.R | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index b86c9f8144d24..14b5b6695670a 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -316,10 +316,21 @@ test_that("filter scalar validation doesn't crash (ARROW-7772)", { test_that("collect() on Dataset works (if fits in memory)", { expect_equal( collect(open_dataset(dataset_dir)), - rbind( - cbind(df1), - cbind(df2) - ) + rbind(df1, df2) + ) +}) + +test_that("count()", { + skip("count() is not a generic so we have to get here through summarize()") + ds <- open_dataset(dataset_dir) + df <- rbind(df1, df2) + expect_equal( + ds %>% + filter(int > 6, int < 108) %>% + count(chr), + df %>% + filter(int > 6, int < 108) %>% + count(chr) ) }) From 528ecfdfe55651facb6f18fdecb969868c17f318 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 8 Apr 2020 14:42:11 -0700 Subject: [PATCH 6/7] Fixes --- r/R/dataset.R | 2 +- r/R/dplyr.R | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/r/R/dataset.R b/r/R/dataset.R index f12c99ee0bfb0..ed3cf1e6ab9ce 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -479,7 +479,7 @@ Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_thread scanner_builder$Finish() } -ScanTask <- R6Class("ScanTask", inherit = Object, +ScanTask <- R6Class("ScanTask", inherit = ArrowObject, public = list( Execute = function() map(dataset___ScanTask__get_batches(self), shared_ptr, class = RecordBatch) ) diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 0f26402e3ab3e..1d3b5eef7debb 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -236,7 +236,7 @@ collect.arrow_dplyr_query <- function(x, ...) { df <- Scanner$create(x)$ToTable() } else { # This is a Table/RecordBatch. See record-batch.R for the [ method - df <- x$.data[x$filtered_rows, colnames, keep_na = FALSE] + df <- x$.data[x$filtered_rows, x$selected_columns, keep_na = FALSE] } df <- as.data.frame(df) restore_dplyr_features(df, x) @@ -259,7 +259,9 @@ restore_dplyr_features <- function(df, query) { # After pulling data into a data.frame, make sure these features are carried over # In case variables were renamed, apply those names - names(df) <- names(query) + if (ncol(df)) { + names(df) <- names(query) + } # Preserve groupings, if present if (length(query$group_by_vars)) { df <- dplyr::grouped_df(df, dplyr::groups(query)) From 9cbf733ef09c812db60618f57a075dbd9c4f1fb1 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 8 Apr 2020 15:11:33 -0700 Subject: [PATCH 7/7] lint --- r/src/arrow_types.h | 2 +- r/src/dataset.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 8565801049f1e..502ee4d2e4122 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -197,7 +197,6 @@ inline std::shared_ptr extract(SEXP x) { #if defined(ARROW_R_WITH_ARROW) #include -#include #include #include #include @@ -213,6 +212,7 @@ inline std::shared_ptr extract(SEXP x) { #include #include #include +#include #include #include #include diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index be0f8db818b81..8d87f1839f721 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -244,7 +244,7 @@ std::vector> dataset___Scanner__Scan( scan_task = VALUE_OR_STOP(st); out.push_back(scan_task); } - return(out); + return out; } // [[arrow::export]] @@ -258,7 +258,7 @@ std::vector> dataset___ScanTask__get_batches batch = VALUE_OR_STOP(b); out.push_back(batch); } - return(out); + return out; } #endif