Doc and cleanup
nealrichardson committed Feb 5, 2020
1 parent b94c7b0 commit 0b81419
Showing 6 changed files with 91 additions and 11 deletions.
3 changes: 3 additions & 0 deletions r/NAMESPACE
@@ -165,6 +165,7 @@ export(int64)
export(int8)
export(last_col)
export(list_of)
+export(map_batches)
export(matches)
export(mmap_create)
export(mmap_open)
@@ -207,9 +208,11 @@ importFrom(assertthat,assert_that)
importFrom(bit64,print.integer64)
importFrom(bit64,str.integer64)
importFrom(methods,as)
+importFrom(purrr,as_mapper)
importFrom(purrr,map)
importFrom(purrr,map2)
importFrom(purrr,map_chr)
+importFrom(purrr,map_dfr)
importFrom(purrr,map_int)
importFrom(purrr,map_lgl)
importFrom(rlang,"%||%")
2 changes: 1 addition & 1 deletion r/R/arrow-package.R
@@ -16,7 +16,7 @@
# under the License.

#' @importFrom R6 R6Class
-#' @importFrom purrr map map_int map_lgl map_chr map2
+#' @importFrom purrr as_mapper map map2 map_chr map_dfr map_int map_lgl
#' @importFrom assertthat assert_that
#' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names
#' @importFrom Rcpp sourceCpp
47 changes: 39 additions & 8 deletions r/R/dataset.R
@@ -348,9 +348,20 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
 #'
 #' @description
 #' A `Scanner` iterates over a [Dataset]'s fragments and returns data
-#' according to given row filtering and column projection. Use a
-#' `ScannerBuilder`, from a `Dataset`'s `$NewScan()` method, to construct one.
+#' according to given row filtering and column projection. A `ScannerBuilder`
+#' can help create one.
 #'
+#' @section Factory:
+#' `Scanner$create()` wraps the `ScannerBuilder` interface to make a `Scanner`.
+#' It takes the following arguments:
+#'
+#' * `dataset`: A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#' `dplyr` methods on `Dataset`.
+#' * `projection`: A character vector of column names to select
+#' * `filter`: An `Expression` to filter the scanned rows by, or `TRUE` (default)
+#' to keep all rows.
+#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`
+#' * `...`: Additional arguments, currently ignored
 #' @section Methods:
 #' `ScannerBuilder` has the following methods:
 #'
@@ -397,22 +408,42 @@ Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_thread
scanner_builder$Finish()
}
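
The `Scanner$create()` factory documented in this commit could be exercised roughly as follows. This is a sketch, not part of the commit; the dataset path, partitioning key, and column names are hypothetical:

```r
library(arrow)

# Open a multi-file dataset (path and partitioning are hypothetical)
ds <- open_dataset("/path/to/parquet/files", partitioning = "year")

# Build a Scanner directly, instead of driving ds$NewScan() by hand
scanner <- Scanner$create(
  ds,
  projection = c("x", "y"),  # character vector of columns to keep
  filter = TRUE,             # TRUE (the default) keeps all rows
  use_threads = TRUE         # scan with multithreading
)
```

Internally the factory just drives the `ScannerBuilder` (`UseThreads()`, `Project()`, `Filter()`) and calls `$Finish()`, as the code above shows.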

+#' Apply a function to a stream of RecordBatches
+#'
+#' As an alternative to calling `collect()` on a `Dataset` query, you can
+#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
+#' This lets you aggregate on each chunk and pull the intermediate results into
+#' a `data.frame` for further aggregation, even if you couldn't fit the whole
+#' `Dataset` result in memory.
+#'
+#' This is experimental and not recommended for production use.
+#'
+#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#' `dplyr` methods on `Dataset`.
+#' @param FUN A function or `purrr`-style lambda expression to apply to each
+#' batch
+#' @param ... Additional arguments passed to `FUN`
+#' @param .data.frame logical: collect the resulting chunks into a single
+#' `data.frame`? Default `TRUE`
+#' @export
 map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
   if (.data.frame) {
-    lapply <- purrr::map_dfr
+    lapply <- map_dfr
   }
   scanner <- Scanner$create(X)
   # If X is arrow_dplyr_query and !all(names(X$selected_columns) == X$selected_columns) warn
-  FUN <- purrr::as_mapper(FUN)
-  message("Making ScanTasks")
+  # that renaming is not handled
+  # Likewise, we aren't incorporating any group_by yet
+  FUN <- as_mapper(FUN)
+  # message("Making ScanTasks")
   scan_tasks <- dataset___Scanner__Scan(scanner)
-  message("Done making ScanTasks")
+  # message("Done making ScanTasks")
   lapply(scan_tasks, function(rbi) {
     # This outer lapply could be parallelized
-    message("Making Batches")
+    # message("Making Batches")
     batch_ptrs <- dataset___ScanTask__get_batches(shared_ptr(Object, rbi))
     lapply(batch_ptrs, function(b) {
-      message("Processing Batch")
+      # message("Processing Batch")
       # This inner lapply cannot be parallelized
       FUN(shared_ptr(RecordBatch, b), ...)
     })
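Given the documented signature, a typical call might look like the sketch below. This is illustrative only: the `open_dataset()` path and the column name `x` are hypothetical, and `$num_rows` is the standard `RecordBatch` row-count binding:

```r
library(arrow)
library(dplyr)

ds <- open_dataset("/path/to/dataset")  # hypothetical path

# Per-batch partial aggregation: count the rows in each RecordBatch.
# With .data.frame = TRUE (the default), the chunk results are
# row-bound into a single data.frame via purrr::map_dfr().
counts <- ds %>%
  select(x) %>%  # hypothetical column
  map_batches(~data.frame(n = .$num_rows))

# Combine the intermediate results without loading the full Dataset
total_rows <- sum(counts$n)
```

The point of the `.data.frame = TRUE` default is visible here: each batch is reduced to a small intermediate, and only those intermediates are held in memory at once.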
19 changes: 17 additions & 2 deletions r/man/Scanner.Rd

Some generated files are not rendered by default.

30 changes: 30 additions & 0 deletions r/man/map_batches.Rd


1 change: 1 addition & 0 deletions r/src/dataset.cpp
@@ -205,6 +205,7 @@ std::vector<std::shared_ptr<ds::ScanTask>> dataset___Scanner__Scan(
auto it = VALUE_OR_STOP(scanner->Scan());
std::vector<std::shared_ptr<ds::ScanTask>> out;
std::shared_ptr<ds::ScanTask> scan_task;
+  // TODO(npr): can this iteration be parallelized?
for (auto st : it) {
scan_task = VALUE_OR_STOP(st);
out.push_back(scan_task);
