Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
867e147
Refactor ExecPlan building; use it in collect()
nealrichardson Aug 24, 2021
1bf8a07
Implement order_by_sink and sort results of summarize
nealrichardson Aug 24, 2021
d75f4bd
Cleanup
nealrichardson Aug 24, 2021
7e9aa03
Add some comments
nealrichardson Aug 24, 2021
2cb3ea6
summarize now doesn't evaluate
nealrichardson Aug 26, 2021
9a2cde5
Make dataset tests not assume row order
nealrichardson Aug 26, 2021
a1cd90f
Add support for derived grouping columns in summarize
nealrichardson Aug 26, 2021
90612b5
summarize() collapses the query and we can do things on it after
nealrichardson Aug 27, 2021
f6cf638
Rename test file
nealrichardson Aug 27, 2021
bd6e363
Refactor and fix tests
nealrichardson Aug 27, 2021
bcea9c8
Clarify comments and add todos for the collapse() work
nealrichardson Aug 27, 2021
11c7066
Add collapse()
nealrichardson Aug 30, 2021
cc2f0d7
Style and unskip test
nealrichardson Aug 30, 2021
2ea6d04
use arrange instead of hardcoding
jonkeane Aug 31, 2021
9e26457
Skip column metadata warning test
nealrichardson Sep 1, 2021
92d8d3f
Note breaking changes before I forget
nealrichardson Sep 1, 2021
88d07bb
Add options(arrow.summarise.sort), default FALSE
nealrichardson Sep 2, 2021
b7d6313
Skip all dataset tests on 32-bit windows rtools35
nealrichardson Sep 2, 2021
31ec558
Correct but not super satisfying print method
nealrichardson Sep 2, 2021
bd25135
sort more tests
nealrichardson Sep 2, 2021
f07b420
More sort
nealrichardson Sep 2, 2021
be2499e
Apply suggestions from code review
nealrichardson Sep 3, 2021
f7e3e54
Cleanups
nealrichardson Sep 3, 2021
a63acb9
Improve test verbosity on windows
nealrichardson Sep 3, 2021
3462b24
Skip all dataset tests on old 32-bit windows
nealrichardson Sep 3, 2021
4fa2684
Final final tweaks
nealrichardson Sep 3, 2021
ceecc8f
Fix python skip
nealrichardson Sep 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/r.yml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ jobs:
Sys.setenv(
RWINLIB_LOCAL = file.path(Sys.getenv("GITHUB_WORKSPACE"), "libarrow.zip"),
MAKEFLAGS = paste0("-j", parallel::detectCores()),
ARROW_R_DEV = TRUE,
"_R_CHECK_FORCE_SUGGESTS_" = FALSE
)
rcmdcheck::rcmdcheck("r",
Expand Down
1 change: 1 addition & 0 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ importFrom(bit64,print.integer64)
importFrom(bit64,str.integer64)
importFrom(methods,as)
importFrom(purrr,as_mapper)
importFrom(purrr,imap)
importFrom(purrr,imap_chr)
importFrom(purrr,keep)
importFrom(purrr,map)
Expand Down
5 changes: 5 additions & 0 deletions r/NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

# arrow 5.0.0.9000

## Breaking changes

* `dplyr::summarize()` on an in-memory Arrow Table or RecordBatch no longer eagerly evaluates. Call `compute()` or `collect()` to evaluate the query.
* Row order of data from a Dataset query is no longer deterministic. If you need a stable sort order, you should explicitly `arrange()` the query. For calls to `summarize()`, you can set `options(arrow.summarise.sort = TRUE)` to match the current `dplyr` behavior of sorting on the grouping columns.

# arrow 5.0.0

## More dplyr
Expand Down
4 changes: 2 additions & 2 deletions r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#' @importFrom stats quantile median na.omit na.exclude na.pass na.fail
#' @importFrom R6 R6Class
#' @importFrom purrr as_mapper map map2 map_chr map2_chr map_dfr map_int map_lgl keep imap_chr
#' @importFrom purrr as_mapper map map2 map_chr map2_chr map_dfr map_int map_lgl keep imap imap_chr
#' @importFrom assertthat assert_that is.string
#' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos
#' @importFrom rlang eval_tidy new_data_mask syms env new_environment env_bind as_label set_names exec
Expand All @@ -35,7 +35,7 @@
c(
"select", "filter", "collect", "summarise", "group_by", "groups",
"group_vars", "group_by_drop_default", "ungroup", "mutate", "transmute",
"arrange", "rename", "pull", "relocate", "compute"
"arrange", "rename", "pull", "relocate", "compute", "collapse"
)
)
for (cl in c("Dataset", "ArrowTabular", "arrow_dplyr_query")) {
Expand Down
8 changes: 6 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 5 additions & 9 deletions r/R/dataset-scan.R
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,14 @@ Scanner$create <- function(dataset,
projection = NULL,
filter = TRUE,
use_threads = option_use_threads(),
use_async = NULL,
use_async = getOption("arrow.use_async", FALSE),
batch_size = NULL,
fragment_scan_options = NULL,
...) {
if (is.null(use_async)) {
use_async <- getOption("arrow.use_async", FALSE)
}

if (inherits(dataset, "arrow_dplyr_query")) {
if (inherits(dataset$.data, "ArrowTabular")) {
# To handle mutate() on Table/RecordBatch, we need to collect(as_data_frame=FALSE) now
dataset <- dplyr::collect(dataset, as_data_frame = FALSE)
if (is_collapsed(dataset)) {
# TODO: Is there a way to get a RecordBatchReader rather than evaluating?
dataset$.data <- as_adq(dplyr::compute(dataset$.data))$.data
}

proj <- c(dataset$selected_columns, dataset$temp_columns)
Expand Down Expand Up @@ -117,7 +113,7 @@ Scanner$create <- function(dataset,
...
))
}
if (inherits(dataset, c("data.frame", "RecordBatch", "Table"))) {
if (inherits(dataset, c("data.frame", "ArrowTabular"))) {
dataset <- InMemoryDataset$create(dataset)
}
assert_is(dataset, "Dataset")
Expand Down
2 changes: 1 addition & 1 deletion r/R/dplyr-arrange.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ arrange.arrow_dplyr_query <- function(.data, ..., .by_group = FALSE) {
# Nothing to do
return(.data)
}
.data <- arrow_dplyr_query(.data)
.data <- as_adq(.data)
# find and remove any dplyr::desc() and tidy-eval
# the arrange expressions inside an Arrow data_mask
sorts <- vector("list", length(exprs))
Expand Down
74 changes: 59 additions & 15 deletions r/R/dplyr-collect.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,8 @@
# The following S3 methods are registered on load if dplyr is present

collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) {
x <- ensure_group_vars(x)
x <- ensure_arrange_vars(x) # this sets x$temp_columns
# Pull only the selected rows and cols into R
# See dataset.R for Dataset and Scanner(Builder) classes
tab <- Scanner$create(x)$ToTable()
# Arrange rows
if (length(x$arrange_vars) > 0) {
tab <- tab[
tab$SortIndices(names(x$arrange_vars), x$arrange_desc),
names(x$selected_columns), # this omits x$temp_columns from the result
drop = FALSE
]
}
# See query-engine.R for ExecPlan/Nodes
tab <- do_exec_plan(x)
if (as_data_frame) {
df <- as.data.frame(tab)
tab$invalidate()
Expand All @@ -47,16 +36,71 @@ collect.ArrowTabular <- function(x, as_data_frame = TRUE, ...) {
x
}
}
collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)
collect.Dataset <- function(x, ...) dplyr::collect(as_adq(x), ...)

# compute() on a query is collect() with the data.frame conversion switched off
compute.arrow_dplyr_query <- function(x, ...) {
  dplyr::collect(x, as_data_frame = FALSE)
}
# An ArrowTabular object is handed back exactly as it was received
compute.ArrowTabular <- function(x, ...) {
  x
}
# Datasets reuse the query method
compute.Dataset <- compute.arrow_dplyr_query

pull.arrow_dplyr_query <- function(.data, var = -1) {
.data <- arrow_dplyr_query(.data)
.data <- as_adq(.data)
var <- vars_pull(names(.data), !!enquo(var))
.data$selected_columns <- set_names(.data$selected_columns[var], var)
dplyr::collect(.data)[[1]]
}
pull.Dataset <- pull.ArrowTabular <- pull.arrow_dplyr_query

# TODO: Correctly handle group_vars after summarize; also in collapse()
restore_dplyr_features <- function(df, query) {
  # Some attributes of an arrow_dplyr_query (here: the grouping) are unknown
  # to Arrow, so they are reattached to the result after collect()

  # Ungrouped query: the result needs no further treatment
  if (length(query$group_by_vars) == 0) {
    return(df)
  }

  if (!is.data.frame(df)) {
    # Not a data.frame, so this is a Table coming from compute() or
    # collect(as_data_frame = FALSE): wrap it and copy the grouping over
    regrouped <- as_adq(df)
    regrouped$group_by_vars <- query$group_by_vars
    regrouped$drop_empty_groups <- query$drop_empty_groups
    return(regrouped)
  }

  # A data.frame result becomes a dplyr grouped_df with the query's groups
  dplyr::grouped_df(
    df,
    dplyr::group_vars(query),
    drop = dplyr::group_by_drop_default(query)
  )
}

collapse.arrow_dplyr_query <- function(x, ...) {
  # Store on the query the schema that implicit_schema() derives for it ...
  result_schema <- implicit_schema(x)
  x$schema <- result_schema
  # ... then wrap the whole query inside a fresh arrow_dplyr_query
  arrow_dplyr_query(x)
}
# For objects that are not yet queries, collapsing just wraps them in a query
collapse.ArrowTabular <- function(x, ...) arrow_dplyr_query(x)
collapse.Dataset <- collapse.ArrowTabular

implicit_schema <- function(.data) {
  # Build the schema for the query's output from the types of its expressions
  .data <- ensure_group_vars(.data)
  source_schema <- .data$.data$schema
  # Type of one expression, resolved against the source schema
  type_of <- function(expr) expr$type(source_schema)

  if (is.null(.data$aggregations)) {
    # No aggregation: one field per selected column
    out_fields <- map(.data$selected_columns, type_of)
    return(schema(!!!out_fields))
  }

  projected <- map(summarize_projection(.data), type_of)
  group_names <- .data$group_by_vars
  agg_names <- setdiff(names(projected), group_names)

  # The grouping columns go first: summarize can't do this itself because the
  # aggregate node signature wants them last, and they are projected into
  # this order after aggregation
  group_fields <- projected[group_names]
  # The remaining fields get their types inferred from the aggregation function
  agg_fields <- imap(
    projected[agg_names],
    function(input_type, agg_name) {
      output_type(.data$aggregations[[agg_name]][["fun"]], input_type)
    }
  )

  out_fields <- c(group_fields, agg_fields)
  schema(!!!out_fields)
}
2 changes: 1 addition & 1 deletion r/R/dplyr-filter.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ filter.arrow_dplyr_query <- function(.data, ..., .preserve = FALSE) {
return(.data)
}

.data <- arrow_dplyr_query(.data)
.data <- as_adq(.data)
# tidy-eval the filter expressions inside an Arrow data_mask
filters <- lapply(filts, arrow_eval, arrow_mask(.data))
bad_filters <- map_lgl(filters, ~ inherits(., "try-error"))
Expand Down
15 changes: 15 additions & 0 deletions r/R/dplyr-functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,18 @@ agg_funcs$n <- function() {
options = list()
)
}

output_type <- function(fun, input_type) {
  # Rough heuristics for the type an aggregation function returns.
  if (fun %in% c("any", "all")) {
    return(bool())
  }
  if (fun %in% c("mean", "stddev", "variance")) {
    return(float64())
  }
  # "sum" may upcast to a bigger type, but the input type is close enough;
  # for every other function assume the type is unchanged so nothing errors
  input_type
}
2 changes: 1 addition & 1 deletion r/R/dplyr-group-by.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ group_by.arrow_dplyr_query <- function(.data,
.add = FALSE,
add = .add,
.drop = dplyr::group_by_drop_default(.data)) {
.data <- arrow_dplyr_query(.data)
.data <- as_adq(.data)
new_groups <- enquos(...)
# ... can contain expressions (i.e. can add (or rename?) columns) and so we
# need to identify those and add them on to the query with mutate. Specifically,
Expand Down
17 changes: 10 additions & 7 deletions r/R/dplyr-mutate.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mutate.arrow_dplyr_query <- function(.data,
.before = NULL,
.after = NULL) {
call <- match.call()
exprs <- quos(...)
exprs <- ensure_named_exprs(quos(...))

.keep <- match.arg(.keep)
.before <- enquo(.before)
Expand All @@ -35,7 +35,7 @@ mutate.arrow_dplyr_query <- function(.data,
return(.data)
}

.data <- arrow_dplyr_query(.data)
.data <- as_adq(.data)

# Restrict the cases we support for now
if (length(dplyr::group_vars(.data)) > 0) {
Expand All @@ -45,11 +45,6 @@ mutate.arrow_dplyr_query <- function(.data,
return(abandon_ship(call, .data, "mutate() on grouped data not supported in Arrow"))
}

# Check for unnamed expressions and fix if any
unnamed <- !nzchar(names(exprs))
# Deparse and take the first element in case they're long expressions
names(exprs)[unnamed] <- map_chr(exprs[unnamed], as_label)

mask <- arrow_mask(.data)
results <- list()
for (i in seq_along(exprs)) {
Expand Down Expand Up @@ -133,3 +128,11 @@ check_transmute_args <- function(..., .keep, .before, .after) {
}
enquos(...)
}

ensure_named_exprs <- function(exprs) {
  # Any expression whose name is the empty string gets its as_label() text
  # as its name; names that are already set are kept
  expr_names <- names(exprs)
  is_blank <- !nzchar(expr_names)
  expr_names[is_blank] <- map_chr(exprs[is_blank], as_label)
  names(exprs) <- expr_names
  exprs
}
6 changes: 3 additions & 3 deletions r/R/dplyr-select.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ tbl_vars.arrow_dplyr_query <- function(x) names(x$selected_columns)

select.arrow_dplyr_query <- function(.data, ...) {
check_select_helpers(enexprs(...))
column_select(arrow_dplyr_query(.data), !!!enquos(...))
column_select(as_adq(.data), !!!enquos(...))
}
select.Dataset <- select.ArrowTabular <- select.arrow_dplyr_query

rename.arrow_dplyr_query <- function(.data, ...) {
check_select_helpers(enexprs(...))
column_select(arrow_dplyr_query(.data), !!!enquos(...), .FUN = vars_rename)
column_select(as_adq(.data), !!!enquos(...), .FUN = vars_rename)
}
rename.Dataset <- rename.ArrowTabular <- rename.arrow_dplyr_query

Expand Down Expand Up @@ -60,7 +60,7 @@ relocate.arrow_dplyr_query <- function(.data, ..., .before = NULL, .after = NULL
# at https://github.com/tidyverse/dplyr/blob/master/R/relocate.R
# TODO: revisit this after https://github.com/tidyverse/dplyr/issues/5829

.data <- arrow_dplyr_query(.data)
.data <- as_adq(.data)

# Assign the schema to the expressions
map(.data$selected_columns, ~ (.$schema <- .data$.data$schema))
Expand Down
Loading