ARROW-8376: [R] Add experimental interface to ScanTask/RecordBatch iterators

As an alternative to calling `ToTable()` to bring everything into memory, this exposes the stream of batches so that you can aggregate (or really do whatever) on each chunk. That makes it possible to work with the full dataset, which you otherwise couldn't handle unless it fit in memory.

On the NYC taxi dataset (10.5 years, 125 parquet files),

```r
tab <- ds %>%
  select(passenger_count) %>%
  map_batches(~count(., passenger_count)) %>%
  group_by(passenger_count) %>%
  summarize(n = sum(n))
```

gives me the tabulation of `passenger_count` in about 200 seconds (no parallelization). It also exposes all sorts of odd values in the data:

```
> as.data.frame(tab)
   passenger_count          n
1             -127          7
2             -123          1
3             -122          1
4             -119          1
5             -115          1
6             -101          1
7              -98          1
8              -96          1
9              -93          1
10             -92          1
11             -91          1
12             -79          1
13             -64          2
14             -63          1
15             -48       1508
16             -45          1
17             -43          4
18             -33          1
19             -31          1
20              -9          1
21              -7          1
22              -6          3
23              -2          1
24              -1         10
25               0    5809809
26               1 1078624900
27               2  227454966
28               3   67096194
29               4   32443710
30               5   9906444
31               6   37241244
32               7       1753
33               8       1437
34               9       1304
35              10         17
36              13          1
37              15          2
38              17          1
39              19          1
40              25          1
41              33          2
42              34          1
43              36          1
44              37          1
45              38          1
46              47          1
47              49         26
48              53          1
49              58          2
50              61          1
51              65          3
52              66          1
53              69          1
54              70          1
55              84          1
56              96          1
57              97          1
58             113          1
59             125          1
```
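
The query above is a two-stage aggregation: count within each batch, then sum the partial counts. A minimal sketch of that pattern in base R follows, assuming a small in-memory `data.frame` split into chunks as a stand-in for the `RecordBatch` stream. The names `trips`, `chunk_id`, and `count_chunk` are hypothetical and not part of this commit.

```r
# Stand-in for a dataset too large to aggregate in one pass.
# `trips` and the chunk size of 4 are made up for illustration.
trips <- data.frame(passenger_count = c(1, 2, 1, 1, 3, 2, 1, 0, 1, 2))
chunk_id <- ceiling(seq_len(nrow(trips)) / 4)
chunks <- split(trips, chunk_id)

# Map step: tabulate each chunk independently (the role played by
# count() inside map_batches() in the example above).
count_chunk <- function(chunk) {
  counts <- table(chunk$passenger_count)
  data.frame(
    passenger_count = as.numeric(names(counts)),
    n = as.vector(counts)
  )
}
partials <- do.call(rbind, lapply(chunks, count_chunk))

# Reduce step: sum the partial counts for each distinct value
# (the role played by group_by() + summarize(n = sum(n))).
tab <- aggregate(n ~ passenger_count, data = partials, FUN = sum)
print(tab)
```

Only the small per-chunk tables are ever held together in memory, which is why the second `sum(n)` step is needed: a value that appears in several chunks produces one partial row per chunk.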

Closes #6365 from nealrichardson/r-map-reduce

Authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Signed-off-by: François Saint-Jacques <fsaintjacques@gmail.com>
nealrichardson authored and fsaintjacques committed Apr 9, 2020
1 parent b7044a1 commit 9662dd6
Showing 11 changed files with 261 additions and 32 deletions.
3 changes: 3 additions & 0 deletions r/NAMESPACE
@@ -161,6 +161,7 @@ export(int64)
export(int8)
export(last_col)
export(list_of)
export(map_batches)
export(matches)
export(mmap_create)
export(mmap_open)
@@ -205,9 +206,11 @@ importFrom(assertthat,assert_that)
importFrom(bit64,print.integer64)
importFrom(bit64,str.integer64)
importFrom(methods,as)
importFrom(purrr,as_mapper)
importFrom(purrr,map)
importFrom(purrr,map2)
importFrom(purrr,map_chr)
importFrom(purrr,map_dfr)
importFrom(purrr,map_int)
importFrom(purrr,map_lgl)
importFrom(rlang,"%||%")
2 changes: 1 addition & 1 deletion r/R/arrow-package.R
@@ -16,7 +16,7 @@
# under the License.

#' @importFrom R6 R6Class
#' @importFrom purrr map map_int map_lgl map_chr map2
#' @importFrom purrr as_mapper map map2 map_chr map_dfr map_int map_lgl
#' @importFrom assertthat assert_that
#' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names
#' @importFrom Rcpp sourceCpp
8 changes: 8 additions & 0 deletions r/R/arrowExports.R

85 changes: 82 additions & 3 deletions r/R/dataset.R
@@ -415,9 +415,20 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
#'
#' @description
#' A `Scanner` iterates over a [Dataset]'s fragments and returns data
#' according to given row filtering and column projection. Use a
#' `ScannerBuilder`, from a `Dataset`'s `$NewScan()` method, to construct one.
#' according to given row filtering and column projection. A `ScannerBuilder`
#' can help create one.
#'
#' @section Factory:
#' `Scanner$create()` wraps the `ScannerBuilder` interface to make a `Scanner`.
#' It takes the following arguments:
#'
#' * `dataset`: A `Dataset` or `arrow_dplyr_query` object, as returned by the
#' `dplyr` methods on `Dataset`.
#' * `projection`: A character vector of column names to select
#' * `filter`: A `Expression` to filter the scanned rows by, or `TRUE` (default)
#' to keep all rows.
#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`
#' * `...`: Additional arguments, currently ignored
#' @section Methods:
#' `ScannerBuilder` has the following methods:
#'
@@ -440,9 +451,77 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
#' @export
Scanner <- R6Class("Scanner", inherit = ArrowObject,
public = list(
ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self))
ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)),
Scan = function() map(dataset___Scanner__Scan(self), shared_ptr, class = ScanTask)
)
)
Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_threads = TRUE, ...) {
if (inherits(dataset, "arrow_dplyr_query") && inherits(dataset$.data, "Dataset")) {
return(Scanner$create(
dataset$.data,
dataset$selected_columns,
dataset$filtered_rows,
use_threads,
...
))
}
assert_is(dataset, "Dataset")
scanner_builder <- dataset$NewScan()
if (use_threads) {
scanner_builder$UseThreads()
}
if (!is.null(projection)) {
scanner_builder$Project(projection)
}
if (!isTRUE(filter)) {
scanner_builder$Filter(filter)
}
scanner_builder$Finish()
}

ScanTask <- R6Class("ScanTask", inherit = ArrowObject,
public = list(
Execute = function() map(dataset___ScanTask__get_batches(self), shared_ptr, class = RecordBatch)
)
)

#' Apply a function to a stream of RecordBatches
#'
#' As an alternative to calling `collect()` on a `Dataset` query, you can
#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
#' This lets you aggregate on each chunk and pull the intermediate results into
#' a `data.frame` for further aggregation, even if you couldn't fit the whole
#' `Dataset` result in memory.
#'
#' This is experimental and not recommended for production use.
#'
#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
#' `dplyr` methods on `Dataset`.
#' @param FUN A function or `purrr`-style lambda expression to apply to each
#' batch
#' @param ... Additional arguments passed to `FUN`
#' @param .data.frame logical: collect the resulting chunks into a single
#' `data.frame`? Default `TRUE`
#' @export
map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
if (.data.frame) {
lapply <- map_dfr
}
scanner <- Scanner$create(ensure_group_vars(X))
FUN <- as_mapper(FUN)
# message("Making ScanTasks")
lapply(scanner$Scan(), function(scan_task) {
# This outer lapply could be parallelized
# message("Making Batches")
lapply(scan_task$Execute(), function(batch) {
# message("Processing Batch")
# This inner lapply cannot be parallelized
# TODO: wrap batch in arrow_dplyr_query with X$selected_columns and X$group_by_vars
# if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
FUN(batch, ...)
})
})
}

#' @usage NULL
#' @format NULL
50 changes: 28 additions & 22 deletions r/R/dplyr.R
@@ -229,39 +229,45 @@ set_filters <- function(.data, expressions) {
}

collect.arrow_dplyr_query <- function(x, ...) {
colnames <- x$selected_columns
# Be sure to retain any group_by vars
gv <- setdiff(dplyr::group_vars(x), names(colnames))
if (length(gv)) {
colnames <- c(colnames, set_names(gv))
}

x <- ensure_group_vars(x)
# Pull only the selected rows and cols into R
if (query_on_dataset(x)) {
# See dataset.R for Dataset and Scanner(Builder) classes
scanner_builder <- x$.data$NewScan()
scanner_builder$UseThreads()
scanner_builder$Project(colnames)
if (!isTRUE(x$filtered_rows)) {
scanner_builder$Filter(x$filtered_rows)
}
df <- as.data.frame(scanner_builder$Finish()$ToTable())
df <- Scanner$create(x)$ToTable()
} else {
# This is a Table/RecordBatch. See record-batch.R for the [ method
df <- as.data.frame(x$.data[x$filtered_rows, colnames, keep_na = FALSE])
df <- x$.data[x$filtered_rows, x$selected_columns, keep_na = FALSE]
}
# In case variables were renamed, apply those names
names(df) <- names(colnames)
df <- as.data.frame(df)
restore_dplyr_features(df, x)
}
collect.Table <- as.data.frame.Table
collect.RecordBatch <- as.data.frame.RecordBatch
collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)

ensure_group_vars <- function(x) {
if (inherits(x, "arrow_dplyr_query")) {
# Before pulling data from Arrow, make sure all group vars are in the projection
gv <- set_names(setdiff(dplyr::group_vars(x), names(x)))
x$selected_columns <- c(x$selected_columns, gv)
}
x
}

restore_dplyr_features <- function(df, query) {
# An arrow_dplyr_query holds some attributes that Arrow doesn't know about
# After pulling data into a data.frame, make sure these features are carried over

# In case variables were renamed, apply those names
if (ncol(df)) {
names(df) <- names(query)
}
# Preserve groupings, if present
if (length(x$group_by_vars)) {
df <- dplyr::grouped_df(df, dplyr::groups(x))
if (length(query$group_by_vars)) {
df <- dplyr::grouped_df(df, dplyr::groups(query))
}
df
}
collect.Table <- as.data.frame.Table
collect.RecordBatch <- as.data.frame.RecordBatch
collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)

#' @importFrom tidyselect vars_pull
pull.arrow_dplyr_query <- function(.data, var = -1) {
19 changes: 17 additions & 2 deletions r/man/Scanner.Rd

30 changes: 30 additions & 0 deletions r/man/map_batches.Rd

32 changes: 32 additions & 0 deletions r/src/arrowExports.cpp

2 changes: 2 additions & 0 deletions r/src/arrow_types.h
@@ -212,8 +212,10 @@ inline std::shared_ptr<T> extract(SEXP x) {
#include <arrow/json/reader.h>
#include <arrow/result.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
#include <arrow/util/checked_cast.h>
#include <arrow/util/compression.h>
#include <arrow/util/iterator.h>
#include <arrow/util/ubsan.h>
#include <arrow/visitor_inline.h>
#include <parquet/arrow/reader.h>
28 changes: 28 additions & 0 deletions r/src/dataset.cpp
@@ -233,4 +233,32 @@ std::shared_ptr<arrow::Table> dataset___Scanner__ToTable(
return VALUE_OR_STOP(scanner->ToTable());
}

// [[arrow::export]]
std::vector<std::shared_ptr<ds::ScanTask>> dataset___Scanner__Scan(
const std::shared_ptr<ds::Scanner>& scanner) {
auto it = VALUE_OR_STOP(scanner->Scan());
std::vector<std::shared_ptr<ds::ScanTask>> out;
std::shared_ptr<ds::ScanTask> scan_task;
// TODO(npr): can this iteration be parallelized?
for (auto st : it) {
scan_task = VALUE_OR_STOP(st);
out.push_back(scan_task);
}
return out;
}

// [[arrow::export]]
std::vector<std::shared_ptr<arrow::RecordBatch>> dataset___ScanTask__get_batches(
const std::shared_ptr<ds::ScanTask>& scan_task) {
arrow::RecordBatchIterator rbi;
rbi = VALUE_OR_STOP(scan_task->Execute());
std::vector<std::shared_ptr<arrow::RecordBatch>> out;
std::shared_ptr<arrow::RecordBatch> batch;
for (auto b : rbi) {
batch = VALUE_OR_STOP(b);
out.push_back(batch);
}
return out;
}

#endif