From 9662dd6a314af264699d99081953ece87771ebe9 Mon Sep 17 00:00:00 2001
From: Neal Richardson
Date: Thu, 9 Apr 2020 16:05:43 -0400
Subject: [PATCH] ARROW-8376: [R] Add experimental interface to ScanTask/RecordBatch iterators
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

As an alternative to calling `ToTable()` to bring everything into memory,
this exposes the stream of record batches so that you can aggregate (or
really do whatever) on each chunk. That gives you access to the full
dataset, which you otherwise couldn't work with unless it fit in memory.

On the NYC taxi dataset (10.5 years, 125 Parquet files),

```r
tab <- ds %>%
  select(passenger_count) %>%
  map_batches(~count(., passenger_count)) %>%
  group_by(passenger_count) %>%
  summarize(n = sum(n))
```

gives me the tabulation of `passenger_count` in about 200 seconds (no
parallelization). And you can see all sorts of weird features in the data:

```
> as.data.frame(tab)
   passenger_count          n
1             -127          7
2             -123          1
3             -122          1
4             -119          1
5             -115          1
6             -101          1
7              -98          1
8              -96          1
9              -93          1
10             -92          1
11             -91          1
12             -79          1
13             -64          2
14             -63          1
15             -48       1508
16             -45          1
17             -43          4
18             -33          1
19             -31          1
20              -9          1
21              -7          1
22              -6          3
23              -2          1
24              -1         10
25               0    5809809
26               1 1078624900
27               2  227454966
28               3   67096194
29               4   32443710
30               5   99064441
31               6   37241244
32               7       1753
33               8       1437
34               9       1304
35              10         17
36              13          1
37              15          2
38              17          1
39              19          1
40              25          1
41              33          2
42              34          1
43              36          1
44              37          1
45              38          1
46              47          1
47              49         26
48              53          1
49              58          2
50              61          1
51              65          3
52              66          1
53              69          1
54              70          1
55              84          1
56              96          1
57              97          1
58             113          1
59             125          1
```

Closes #6365 from nealrichardson/r-map-reduce

Authored-by: Neal Richardson
Signed-off-by: François Saint-Jacques
---
 r/NAMESPACE                     |  3 ++
 r/R/arrow-package.R             |  2 +-
 r/R/arrowExports.R              |  8 ++++
 r/R/dataset.R                   | 85 +++++++++++++++++++++++++++++++--
 r/R/dplyr.R                     | 50 ++++++++++---------
 r/man/Scanner.Rd                | 19 +++++++-
 r/man/map_batches.Rd            | 30 ++++++++++++
 r/src/arrowExports.cpp          | 32 +++++++++++++
 r/src/arrow_types.h             |  2 +
 r/src/dataset.cpp               | 28 +++++++++++
 r/tests/testthat/test-dataset.R | 34 +++++++++++--
 11 files changed, 261 insertions(+), 32 deletions(-)
 create mode 100644 r/man/map_batches.Rd

diff --git a/r/NAMESPACE b/r/NAMESPACE
index de26a53d3400a..f86bc21db9430 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -161,6 +161,7 @@ export(int64)
 export(int8)
 export(last_col)
 export(list_of)
+export(map_batches)
 export(matches)
 export(mmap_create)
 export(mmap_open)
@@ -205,9 +206,11 @@ importFrom(assertthat,assert_that)
 importFrom(bit64,print.integer64)
 importFrom(bit64,str.integer64)
 importFrom(methods,as)
+importFrom(purrr,as_mapper)
 importFrom(purrr,map)
 importFrom(purrr,map2)
 importFrom(purrr,map_chr)
+importFrom(purrr,map_dfr)
 importFrom(purrr,map_int)
 importFrom(purrr,map_lgl)
 importFrom(rlang,"%||%")
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index b04e735b5aedf..8fed1ea0861f7 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -16,7 +16,7 @@
 # under the License.
 
 #' @importFrom R6 R6Class
-#' @importFrom purrr map map_int map_lgl map_chr map2
+#' @importFrom purrr as_mapper map map2 map_chr map_dfr map_int map_lgl
 #' @importFrom assertthat assert_that
 #' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names
 #' @importFrom Rcpp sourceCpp
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 0f6b758df1468..a31d72b2272b6 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -456,6 +456,14 @@ dataset___Scanner__ToTable <- function(scanner){
   .Call(`_arrow_dataset___Scanner__ToTable` , scanner)
 }
 
+dataset___Scanner__Scan <- function(scanner){
+  .Call(`_arrow_dataset___Scanner__Scan` , scanner)
+}
+
+dataset___ScanTask__get_batches <- function(scan_task){
+  .Call(`_arrow_dataset___ScanTask__get_batches` , scan_task)
+}
+
 shared_ptr_is_null <- function(xp){
   .Call(`_arrow_shared_ptr_is_null` , xp)
 }
diff --git a/r/R/dataset.R b/r/R/dataset.R
index 78dec8e4b92dd..ed3cf1e6ab9ce 100644
--- a/r/R/dataset.R
+++ b/r/R/dataset.R
@@ -415,9 +415,20 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
 #'
 #' @description
 #' A `Scanner` iterates over a [Dataset]'s fragments and returns data
-#' according to given row filtering and column projection. Use a
-#' `ScannerBuilder`, from a `Dataset`'s `$NewScan()` method, to construct one.
+#' according to given row filtering and column projection. A `ScannerBuilder`
+#' can help create one.
 #'
+#' @section Factory:
+#' `Scanner$create()` wraps the `ScannerBuilder` interface to make a `Scanner`.
+#' It takes the following arguments:
+#'
+#' * `dataset`: A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#' `dplyr` methods on `Dataset`.
+#' * `projection`: A character vector of column names to select
+#' * `filter`: An `Expression` to filter the scanned rows by, or `TRUE` (default)
+#' to keep all rows.
+#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`
+#' * `...`: Additional arguments, currently ignored
 #' @section Methods:
 #' `ScannerBuilder` has the following methods:
 #'
@@ -440,9 +451,77 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
 #' @export
 Scanner <- R6Class("Scanner", inherit = ArrowObject,
   public = list(
-    ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self))
+    ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)),
+    Scan = function() map(dataset___Scanner__Scan(self), shared_ptr, class = ScanTask)
   )
 )
+
+Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_threads = TRUE, ...) {
+  if (inherits(dataset, "arrow_dplyr_query") && inherits(dataset$.data, "Dataset")) {
+    return(Scanner$create(
+      dataset$.data,
+      dataset$selected_columns,
+      dataset$filtered_rows,
+      use_threads,
+      ...
+    ))
+  }
+  assert_is(dataset, "Dataset")
+  scanner_builder <- dataset$NewScan()
+  if (use_threads) {
+    scanner_builder$UseThreads()
+  }
+  if (!is.null(projection)) {
+    scanner_builder$Project(projection)
+  }
+  if (!isTRUE(filter)) {
+    scanner_builder$Filter(filter)
+  }
+  scanner_builder$Finish()
+}
+
+ScanTask <- R6Class("ScanTask", inherit = ArrowObject,
+  public = list(
+    Execute = function() map(dataset___ScanTask__get_batches(self), shared_ptr, class = RecordBatch)
+  )
+)
+
+#' Apply a function to a stream of RecordBatches
+#'
+#' As an alternative to calling `collect()` on a `Dataset` query, you can
+#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
+#' This lets you aggregate on each chunk and pull the intermediate results into
+#' a `data.frame` for further aggregation, even if you couldn't fit the whole
+#' `Dataset` result in memory.
+#'
+#' This is experimental and not recommended for production use.
+#'
+#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#' `dplyr` methods on `Dataset`.
+#' @param FUN A function or `purrr`-style lambda expression to apply to each
+#' batch
+#' @param ... Additional arguments passed to `FUN`
+#' @param .data.frame logical: collect the resulting chunks into a single
+#' `data.frame`? Default `TRUE`
+#' @export
+map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
+  if (.data.frame) {
+    lapply <- map_dfr
+  }
+  scanner <- Scanner$create(ensure_group_vars(X))
+  FUN <- as_mapper(FUN)
+  # message("Making ScanTasks")
+  lapply(scanner$Scan(), function(scan_task) {
+    # This outer lapply could be parallelized
+    # message("Making Batches")
+    lapply(scan_task$Execute(), function(batch) {
+      # message("Processing Batch")
+      # This inner lapply cannot be parallelized
+      # TODO: wrap batch in arrow_dplyr_query with X$selected_columns and X$group_by_vars
+      # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
+      FUN(batch, ...)
+    })
+  })
+}
 
 #' @usage NULL
 #' @format NULL
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index 3f57742404dcc..1d3b5eef7debb 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -229,39 +229,45 @@ set_filters <- function(.data, expressions) {
 }
 
 collect.arrow_dplyr_query <- function(x, ...) {
-  colnames <- x$selected_columns
-  # Be sure to retain any group_by vars
-  gv <- setdiff(dplyr::group_vars(x), names(colnames))
-  if (length(gv)) {
-    colnames <- c(colnames, set_names(gv))
-  }
-
+  x <- ensure_group_vars(x)
   # Pull only the selected rows and cols into R
   if (query_on_dataset(x)) {
     # See dataset.R for Dataset and Scanner(Builder) classes
-    scanner_builder <- x$.data$NewScan()
-    scanner_builder$UseThreads()
-    scanner_builder$Project(colnames)
-    if (!isTRUE(x$filtered_rows)) {
-      scanner_builder$Filter(x$filtered_rows)
-    }
-    df <- as.data.frame(scanner_builder$Finish()$ToTable())
+    df <- Scanner$create(x)$ToTable()
   } else {
     # This is a Table/RecordBatch. See record-batch.R for the [ method
-    df <- as.data.frame(x$.data[x$filtered_rows, colnames, keep_na = FALSE])
+    df <- x$.data[x$filtered_rows, x$selected_columns, keep_na = FALSE]
   }
-  # In case variables were renamed, apply those names
-  names(df) <- names(colnames)
+  df <- as.data.frame(df)
+  restore_dplyr_features(df, x)
+}
+collect.Table <- as.data.frame.Table
+collect.RecordBatch <- as.data.frame.RecordBatch
+collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)
+
+ensure_group_vars <- function(x) {
+  if (inherits(x, "arrow_dplyr_query")) {
+    # Before pulling data from Arrow, make sure all group vars are in the projection
+    gv <- set_names(setdiff(dplyr::group_vars(x), names(x)))
+    x$selected_columns <- c(x$selected_columns, gv)
+  }
+  x
+}
+
+restore_dplyr_features <- function(df, query) {
+  # An arrow_dplyr_query holds some attributes that Arrow doesn't know about
+  # After pulling data into a data.frame, make sure these features are carried over
+
+  # In case variables were renamed, apply those names
+  if (ncol(df)) {
+    names(df) <- names(query)
+  }
   # Preserve groupings, if present
-  if (length(x$group_by_vars)) {
-    df <- dplyr::grouped_df(df, dplyr::groups(x))
+  if (length(query$group_by_vars)) {
+    df <- dplyr::grouped_df(df, dplyr::groups(query))
   }
   df
 }
-collect.Table <- as.data.frame.Table
-collect.RecordBatch <- as.data.frame.RecordBatch
-collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)
 
 #' @importFrom tidyselect vars_pull
 pull.arrow_dplyr_query <- function(.data, var = -1) {
diff --git a/r/man/Scanner.Rd b/r/man/Scanner.Rd
index a665c0b331e2e..2b82d2029a420 100644
--- a/r/man/Scanner.Rd
+++ b/r/man/Scanner.Rd
@@ -6,9 +6,24 @@
 \title{Scan the contents of a dataset}
 \description{
 A \code{Scanner} iterates over a \link{Dataset}'s fragments and returns data
-according to given row filtering and column projection. Use a
-\code{ScannerBuilder}, from a \code{Dataset}'s \verb{$NewScan()} method, to construct one.
+according to given row filtering and column projection. A \code{ScannerBuilder}
+can help create one.
 }
+\section{Factory}{
+
+\code{Scanner$create()} wraps the \code{ScannerBuilder} interface to make a \code{Scanner}.
+It takes the following arguments:
+\itemize{
+\item \code{dataset}: A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by the
+\code{dplyr} methods on \code{Dataset}.
+\item \code{projection}: A character vector of column names to select
+\item \code{filter}: An \code{Expression} to filter the scanned rows by, or \code{TRUE} (default)
+to keep all rows.
+\item \code{use_threads}: logical: should scanning use multithreading? Default \code{TRUE}
+\item \code{...}: Additional arguments, currently ignored
+}
+}
+
 \section{Methods}{
 
 \code{ScannerBuilder} has the following methods:
diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd
new file mode 100644
index 0000000000000..67d97a8f65522
--- /dev/null
+++ b/r/man/map_batches.Rd
@@ -0,0 +1,30 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/dataset.R
+\name{map_batches}
+\alias{map_batches}
+\title{Apply a function to a stream of RecordBatches}
+\usage{
+map_batches(X, FUN, ..., .data.frame = TRUE)
+}
+\arguments{
+\item{X}{A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by the
+\code{dplyr} methods on \code{Dataset}.}
+
+\item{FUN}{A function or \code{purrr}-style lambda expression to apply to each
+batch}
+
+\item{...}{Additional arguments passed to \code{FUN}}
+
+\item{.data.frame}{logical: collect the resulting chunks into a single
+\code{data.frame}? Default \code{TRUE}}
+}
+\description{
+As an alternative to calling \code{collect()} on a \code{Dataset} query, you can
+use this function to access the stream of \code{RecordBatch}es in the \code{Dataset}.
+This lets you aggregate on each chunk and pull the intermediate results into
+a \code{data.frame} for further aggregation, even if you couldn't fit the whole
+\code{Dataset} result in memory.
+}
+\details{
+This is experimental and not recommended for production use.
+}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index b38edfc4ded0a..2a33dea7d3fb7 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1799,6 +1799,36 @@ RcppExport SEXP _arrow_dataset___Scanner__ToTable(SEXP scanner_sexp){
 }
 #endif
 
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::vector<std::shared_ptr<ds::ScanTask>> dataset___Scanner__Scan(const std::shared_ptr<ds::Scanner>& scanner);
+RcppExport SEXP _arrow_dataset___Scanner__Scan(SEXP scanner_sexp){
+BEGIN_RCPP
+  Rcpp::traits::input_parameter<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
+  return Rcpp::wrap(dataset___Scanner__Scan(scanner));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_dataset___Scanner__Scan(SEXP scanner_sexp){
+  Rf_error("Cannot call dataset___Scanner__Scan(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::vector<std::shared_ptr<arrow::RecordBatch>> dataset___ScanTask__get_batches(const std::shared_ptr<ds::ScanTask>& scan_task);
+RcppExport SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){
+BEGIN_RCPP
+  Rcpp::traits::input_parameter<const std::shared_ptr<ds::ScanTask>&>::type scan_task(scan_task_sexp);
+  return Rcpp::wrap(dataset___ScanTask__get_batches(scan_task));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){
+  Rf_error("Cannot call dataset___ScanTask__get_batches(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
 // datatype.cpp
 #if defined(ARROW_R_WITH_ARROW)
 bool shared_ptr_is_null(SEXP xp);
@@ -5863,6 +5893,8 @@ static const R_CallMethodDef CallEntries[] = {
 	{ "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC) &_arrow_dataset___ScannerBuilder__schema, 1},
 	{ "_arrow_dataset___ScannerBuilder__Finish", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Finish, 1},
 	{ "_arrow_dataset___Scanner__ToTable", (DL_FUNC) &_arrow_dataset___Scanner__ToTable, 1},
+	{ "_arrow_dataset___Scanner__Scan", (DL_FUNC) &_arrow_dataset___Scanner__Scan, 1},
+	{ "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) &_arrow_dataset___ScanTask__get_batches, 1},
 	{ "_arrow_shared_ptr_is_null", (DL_FUNC) &_arrow_shared_ptr_is_null, 1},
 	{ "_arrow_unique_ptr_is_null", (DL_FUNC) &_arrow_unique_ptr_is_null, 1},
 	{ "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0},
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 632067e0f89d6..502ee4d2e4122 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -212,8 +212,10 @@ inline std::shared_ptr<T> extract(SEXP x) {
 #include
 #include
 #include
+#include
 #include
 #include
+#include
 #include
 #include
 #include
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 00c480f0ec9cd..8d87f1839f721 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -233,4 +233,32 @@ std::shared_ptr<arrow::Table> dataset___Scanner__ToTable(
   return VALUE_OR_STOP(scanner->ToTable());
 }
 
+// [[arrow::export]]
+std::vector<std::shared_ptr<ds::ScanTask>> dataset___Scanner__Scan(
+    const std::shared_ptr<ds::Scanner>& scanner) {
+  auto it = VALUE_OR_STOP(scanner->Scan());
+  std::vector<std::shared_ptr<ds::ScanTask>> out;
+  std::shared_ptr<ds::ScanTask> scan_task;
+  // TODO(npr): can this iteration be parallelized?
+  for (auto st : it) {
+    scan_task = VALUE_OR_STOP(st);
+    out.push_back(scan_task);
+  }
+  return out;
+}
+
+// [[arrow::export]]
+std::vector<std::shared_ptr<arrow::RecordBatch>> dataset___ScanTask__get_batches(
+    const std::shared_ptr<ds::ScanTask>& scan_task) {
+  arrow::RecordBatchIterator rbi;
+  rbi = VALUE_OR_STOP(scan_task->Execute());
+  std::vector<std::shared_ptr<arrow::RecordBatch>> out;
+  std::shared_ptr<arrow::RecordBatch> batch;
+  for (auto b : rbi) {
+    batch = VALUE_OR_STOP(b);
+    out.push_back(batch);
+  }
+  return out;
+}
+
 #endif
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 94f10c2af47a2..14b5b6695670a 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -206,6 +206,21 @@ test_that("Dataset with multiple file formats", {
   )
 })
 
+test_that("map_batches", {
+  ds <- open_dataset(dataset_dir, partitioning = "part")
+  expect_equivalent(
+    ds %>%
+      filter(int > 5) %>%
+      select(int, lgl) %>%
+      map_batches(
+        ~summarize(.,
+          min_int = min(int)
+        )
+      ),
+    tibble(min_int = c(6L, 101L))
+  )
+})
+
 test_that("partitioning = NULL to ignore partition information (but why?)", {
   ds <- open_dataset(hive_dir, partitioning = NULL)
   expect_identical(names(ds), names(df1)) # i.e. not c(names(df1), "group", "other")
@@ -301,10 +316,21 @@ test_that("filter scalar validation doesn't crash (ARROW-7772)", {
 
 test_that("collect() on Dataset works (if fits in memory)", {
   expect_equal(
     collect(open_dataset(dataset_dir)),
-    rbind(
-      cbind(df1),
-      cbind(df2)
-    )
+    rbind(df1, df2)
   )
 })
 
+test_that("count()", {
+  skip("count() is not a generic so we have to get here through summarize()")
+  ds <- open_dataset(dataset_dir)
+  df <- rbind(df1, df2)
+  expect_equal(
+    ds %>%
+      filter(int > 6, int < 108) %>%
+      count(chr),
+    df %>%
+      filter(int > 6, int < 108) %>%
+      count(chr)
+  )
+})
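
Since the commit message only demonstrates the high-level `map_batches()` wrapper, here is a minimal sketch of the low-level `Scanner`/`ScanTask` interface this patch adds. The dataset path and the `passenger_count` column are placeholders borrowed from the taxi example above; everything else uses only functions introduced in this patch or already exported by arrow.

```r
library(arrow)

# Placeholder: point this at any directory of Parquet files
ds <- open_dataset("path/to/nyc-taxi")

# Scanner$create() wraps the ScannerBuilder calls
# (NewScan/UseThreads/Project/Filter/Finish) shown in dataset.R above
scanner <- Scanner$create(ds, projection = "passenger_count")

# Scan() returns a list of ScanTasks; Execute() materializes one task's batches
for (scan_task in scanner$Scan()) {
  for (batch in scan_task$Execute()) {
    # Each `batch` is a RecordBatch; aggregate it, write it out, etc.
    print(nrow(batch))
  }
}
```

This is the same traversal `map_batches()` performs internally, minus the `as_mapper()`/`map_dfr()` plumbing.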