ARROW-8376: [R] Add experimental interface to ScanTask/RecordBatch iterators #6365

Closed · wants to merge 7 commits
3 changes: 3 additions & 0 deletions r/NAMESPACE

@@ -161,6 +161,7 @@ export(int64)
 export(int8)
 export(last_col)
 export(list_of)
+export(map_batches)
 export(matches)
 export(mmap_create)
 export(mmap_open)
@@ -205,9 +206,11 @@ importFrom(assertthat,assert_that)
 importFrom(bit64,print.integer64)
 importFrom(bit64,str.integer64)
 importFrom(methods,as)
+importFrom(purrr,as_mapper)
 importFrom(purrr,map)
 importFrom(purrr,map2)
 importFrom(purrr,map_chr)
+importFrom(purrr,map_dfr)
 importFrom(purrr,map_int)
 importFrom(purrr,map_lgl)
 importFrom(rlang,"%||%")
2 changes: 1 addition & 1 deletion r/R/arrow-package.R

@@ -16,7 +16,7 @@
 # under the License.

 #' @importFrom R6 R6Class
-#' @importFrom purrr map map_int map_lgl map_chr map2
+#' @importFrom purrr as_mapper map map2 map_chr map_dfr map_int map_lgl
 #' @importFrom assertthat assert_that
 #' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names
 #' @importFrom Rcpp sourceCpp
8 changes: 8 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

85 changes: 82 additions & 3 deletions r/R/dataset.R

@@ -415,9 +415,20 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
 #'
 #' @description
 #' A `Scanner` iterates over a [Dataset]'s fragments and returns data
-#' according to given row filtering and column projection. Use a
-#' `ScannerBuilder`, from a `Dataset`'s `$NewScan()` method, to construct one.
+#' according to given row filtering and column projection. A `ScannerBuilder`
+#' can help create one.
+#'
+#' @section Factory:
+#' `Scanner$create()` wraps the `ScannerBuilder` interface to make a `Scanner`.
+#' It takes the following arguments:
+#'
+#' * `dataset`: A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#'   `dplyr` methods on `Dataset`.
+#' * `projection`: A character vector of column names to select.
+#' * `filter`: An `Expression` to filter the scanned rows by, or `TRUE` (default)
+#'   to keep all rows.
+#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`.
Review comment (Contributor):
Should `use_threads` default to `option(arrow.use_threads)`, for consistency with the other APIs?

Reply (Member, author):
Perhaps so, though at least these threads should be safer because they're in the C++ library and not the R bindings. I can make this change in my current PR, though.
+#' * `...`: Additional arguments, currently ignored.
 #' @section Methods:
 #' `ScannerBuilder` has the following methods:
 #'
@@ -440,9 +451,77 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
 #' @export
 Scanner <- R6Class("Scanner", inherit = ArrowObject,
   public = list(
-    ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self))
+    ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)),
+    Scan = function() map(dataset___Scanner__Scan(self), shared_ptr, class = ScanTask)
   )
 )
+Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_threads = TRUE, ...) {
+  if (inherits(dataset, "arrow_dplyr_query") && inherits(dataset$.data, "Dataset")) {
+    return(Scanner$create(
+      dataset$.data,
+      dataset$selected_columns,
+      dataset$filtered_rows,
+      use_threads,
+      ...
+    ))
+  }
+  assert_is(dataset, "Dataset")
+  scanner_builder <- dataset$NewScan()
+  if (use_threads) {
+    scanner_builder$UseThreads()
+  }
+  if (!is.null(projection)) {
+    scanner_builder$Project(projection)
+  }
+  if (!isTRUE(filter)) {
+    scanner_builder$Filter(filter)
+  }
+  scanner_builder$Finish()
+}
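A usage sketch, not part of this diff (the path and the columns `x` and `y` are hypothetical; `open_dataset()` is the package's existing entry point):

library(arrow)
library(dplyr)

ds <- open_dataset("/path/to/parquet_dir")

# Pass scan options explicitly...
tab1 <- Scanner$create(ds, projection = c("x", "y"))$ToTable()

# ...or pass a dplyr query; Scanner$create() reuses its projection and filter
q <- ds %>% select(x, y) %>% filter(x > 5)
scanner <- Scanner$create(q)
tab2 <- scanner$ToTable()  # materialize the whole scan as an Arrow Table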

+ScanTask <- R6Class("ScanTask", inherit = ArrowObject,
+  public = list(
+    Execute = function() map(dataset___ScanTask__get_batches(self), shared_ptr, class = RecordBatch)
+  )
+)
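A minimal sketch of the two-level iteration these classes expose, reusing the hypothetical `scanner` above: `$Scan()` returns a list of `ScanTask`s, and each task's `$Execute()` returns a list of `RecordBatch`es.

for (scan_task in scanner$Scan()) {
  for (batch in scan_task$Execute()) {
    # each `batch` is a RecordBatch holding one chunk of the dataset
    print(batch$num_rows)
  }
}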

+#' Apply a function to a stream of RecordBatches
+#'
+#' As an alternative to calling `collect()` on a `Dataset` query, you can
+#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
+#' This lets you aggregate on each chunk and pull the intermediate results into
+#' a `data.frame` for further aggregation, even if the whole `Dataset` result
+#' would not fit in memory.
+#'
+#' This is experimental and not recommended for production use.
+#'
+#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#'   `dplyr` methods on `Dataset`.
+#' @param FUN A function or `purrr`-style lambda expression to apply to each
+#'   batch.
+#' @param ... Additional arguments passed to `FUN`.
+#' @param .data.frame logical: should the resulting chunks be collected into a
+#'   single `data.frame`? Default `TRUE`.
+#' @export
+map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
+  if (.data.frame) {
+    lapply <- map_dfr
+  }
+  scanner <- Scanner$create(ensure_group_vars(X))
+  FUN <- as_mapper(FUN)
+  # message("Making ScanTasks")
+  lapply(scanner$Scan(), function(scan_task) {
+    # This outer lapply could be parallelized
+    # message("Making Batches")
+    lapply(scan_task$Execute(), function(batch) {
+      # message("Processing Batch")
+      # This inner lapply cannot be parallelized
+      # TODO: wrap batch in arrow_dplyr_query with X$selected_columns and
+      # X$group_by_vars if X is an arrow_dplyr_query, if some other arg
+      # (.dplyr?) == TRUE
+      FUN(batch, ...)
+    })
+  })
+}
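A usage sketch with the same hypothetical dataset and column: with the default `.data.frame = TRUE`, the per-batch results are row-bound by `purrr::map_dfr()` into a single `data.frame`.

ds %>%
  filter(x > 5) %>%
  map_batches(~ data.frame(batch_rows = .x$num_rows))
# one row per RecordBatch scanned; sum the column for a total row count,
# computed without ever holding the full filtered result in memory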

 #' @usage NULL
 #' @format NULL
50 changes: 28 additions & 22 deletions r/R/dplyr.R

@@ -229,39 +229,45 @@ set_filters <- function(.data, expressions) {
 }

 collect.arrow_dplyr_query <- function(x, ...) {
-  colnames <- x$selected_columns
-  # Be sure to retain any group_by vars
-  gv <- setdiff(dplyr::group_vars(x), names(colnames))
-  if (length(gv)) {
-    colnames <- c(colnames, set_names(gv))
-  }
-
+  x <- ensure_group_vars(x)
   # Pull only the selected rows and cols into R
   if (query_on_dataset(x)) {
     # See dataset.R for Dataset and Scanner(Builder) classes
-    scanner_builder <- x$.data$NewScan()
-    scanner_builder$UseThreads()
-    scanner_builder$Project(colnames)
-    if (!isTRUE(x$filtered_rows)) {
-      scanner_builder$Filter(x$filtered_rows)
-    }
-    df <- as.data.frame(scanner_builder$Finish()$ToTable())
+    df <- Scanner$create(x)$ToTable()
   } else {
     # This is a Table/RecordBatch. See record-batch.R for the [ method
-    df <- as.data.frame(x$.data[x$filtered_rows, colnames, keep_na = FALSE])
+    df <- x$.data[x$filtered_rows, x$selected_columns, keep_na = FALSE]
   }
-  # In case variables were renamed, apply those names
-  names(df) <- names(colnames)
+  df <- as.data.frame(df)
+  restore_dplyr_features(df, x)
+}
+collect.Table <- as.data.frame.Table
+collect.RecordBatch <- as.data.frame.RecordBatch
+collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)
+
+ensure_group_vars <- function(x) {
+  if (inherits(x, "arrow_dplyr_query")) {
+    # Before pulling data from Arrow, make sure all group vars are in the projection
+    gv <- set_names(setdiff(dplyr::group_vars(x), names(x)))
+    x$selected_columns <- c(x$selected_columns, gv)
+  }
+  x
+}
+
+restore_dplyr_features <- function(df, query) {
+  # An arrow_dplyr_query holds some attributes that Arrow doesn't know about.
+  # After pulling data into a data.frame, make sure these features are carried over.
+
+  # In case variables were renamed, apply those names
+  if (ncol(df)) {
+    names(df) <- names(query)
+  }
   # Preserve groupings, if present
-  if (length(x$group_by_vars)) {
-    df <- dplyr::grouped_df(df, dplyr::groups(x))
+  if (length(query$group_by_vars)) {
+    df <- dplyr::grouped_df(df, dplyr::groups(query))
   }
   df
 }
-collect.Table <- as.data.frame.Table
-collect.RecordBatch <- as.data.frame.RecordBatch
-collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)

 #' @importFrom tidyselect vars_pull
 pull.arrow_dplyr_query <- function(.data, var = -1) {
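A behavior sketch of what these helpers preserve, again with hypothetical columns `x` and `g`: renames and groupings declared in the query survive the round trip through Arrow.

ds %>%
  group_by(g) %>%
  select(y = x) %>%  # ensure_group_vars() adds `g` back to the projection
  collect()          # restore_dplyr_features() reapplies the name `y` and the grouping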
19 changes: 17 additions & 2 deletions r/man/Scanner.Rd

Some generated files are not rendered by default.

30 changes: 30 additions & 0 deletions r/man/map_batches.Rd

Some generated files are not rendered by default.

32 changes: 32 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions r/src/arrow_types.h
@@ -212,8 +212,10 @@ inline std::shared_ptr<T> extract(SEXP x) {
 #include <arrow/json/reader.h>
 #include <arrow/result.h>
 #include <arrow/type.h>
+#include <arrow/type_fwd.h>
 #include <arrow/util/checked_cast.h>
 #include <arrow/util/compression.h>
+#include <arrow/util/iterator.h>
 #include <arrow/util/ubsan.h>
 #include <arrow/visitor_inline.h>
 #include <parquet/arrow/reader.h>
28 changes: 28 additions & 0 deletions r/src/dataset.cpp

@@ -233,4 +233,32 @@ std::shared_ptr<arrow::Table> dataset___Scanner__ToTable(
   return VALUE_OR_STOP(scanner->ToTable());
 }

+// [[arrow::export]]
+std::vector<std::shared_ptr<ds::ScanTask>> dataset___Scanner__Scan(
+    const std::shared_ptr<ds::Scanner>& scanner) {
+  auto it = VALUE_OR_STOP(scanner->Scan());
+  std::vector<std::shared_ptr<ds::ScanTask>> out;
+  std::shared_ptr<ds::ScanTask> scan_task;
+  // TODO(npr): can this iteration be parallelized?
Review comment (Contributor):
It can, but it's a hazard: each ScanTask can be attached to an open file descriptor, so you may bust limits if you collect them all before aggregating them. That's why you want to consume them immediately, because then you control the number of resources in flight.

+  for (auto st : it) {
+    scan_task = VALUE_OR_STOP(st);
+    out.push_back(scan_task);
+  }
+  return out;
+}

+// [[arrow::export]]
+std::vector<std::shared_ptr<arrow::RecordBatch>> dataset___ScanTask__get_batches(
+    const std::shared_ptr<ds::ScanTask>& scan_task) {
+  arrow::RecordBatchIterator rbi;
+  rbi = VALUE_OR_STOP(scan_task->Execute());
+  std::vector<std::shared_ptr<arrow::RecordBatch>> out;
+  std::shared_ptr<arrow::RecordBatch> batch;
+  for (auto b : rbi) {
+    batch = VALUE_OR_STOP(b);
+    out.push_back(batch);
+  }
+  return out;
+}

 #endif
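Note that both bindings eagerly drain the C++ iterator into a vector before returning to R. Following the review comment above, a consumption sketch (hypothetical, reusing the `scanner` from the dataset.R section) that reduces each batch to a small summary as soon as it arrives, instead of accumulating raw batches:

summaries <- list()
for (task in scanner$Scan()) {
  for (batch in task$Execute()) {
    # summarize immediately so batches (and any file descriptor behind their
    # ScanTask) need not all be held open at once
    summaries[[length(summaries) + 1]] <- data.frame(rows = batch$num_rows)
  }
}
do.call(rbind, summaries)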