ARROW-8390: [R] Expose schema unification features
Recent changes made dataset creation no longer scan all files and find a unified schema by default. This patch exposes an option on `$Inspect` and `$Finish` to re-enable the old behavior. It also adds a wrapper around `UnifySchemas` for general use and uses it in `Dataset$create()` when making a `UnionDataset`.

Dataset tests aren't really exercising the new options; hoping those are sufficiently tested in the C++ library.
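For context, a sketch of how the new surface is meant to be used — the directory path is a placeholder, and this is an illustration of the behavior described above, not a verbatim excerpt from the patch:

```r
library(arrow)

# Default for file paths: only the first file is inspected (fast path)
ds <- open_dataset("/path/to/parquet/dir")

# Opt in to scanning every file and unifying their schemas
ds <- open_dataset("/path/to/parquet/dir", unify_schemas = TRUE)

# The same option is exposed on the factory methods
factory <- dataset_factory("/path/to/parquet/dir")
schm <- factory$Inspect(unify_schemas = TRUE)  # unified Schema from all fragments
ds <- factory$Finish(unify_schemas = TRUE)
```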

Closes #6890 from nealrichardson/unify-schemas

Authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>
nealrichardson committed Apr 10, 2020
1 parent 4e9ced6 commit 988a3f8
Showing 13 changed files with 168 additions and 32 deletions.
1 change: 1 addition & 0 deletions r/NAMESPACE
```diff
@@ -195,6 +195,7 @@ export(uint16)
 export(uint32)
 export(uint64)
 export(uint8)
+export(unify_schemas)
 export(utf8)
 export(write_arrow)
 export(write_feather)
```
3 changes: 2 additions & 1 deletion r/NEWS.md
```diff
@@ -67,8 +67,9 @@ See `vignette("python", package = "arrow")` for details.
 * Installation on Linux now builds the C++ library from source by default, with some compression libraries disabled. For a faster, richer build, set the environment variable `NOT_CRAN=true`. See `vignette("install", package = "arrow")` for details and more options.
 * Source installation is faster and more reliable on more Linux distributions.
 
-## Other bug fixes
+## Other bug fixes and enhancements
 
+* `unify_schemas()` to create a `Schema` containing the union of fields in multiple schemas
 * Timezones are faithfully preserved in roundtrip between R and Arrow
 * `read_feather()` and other reader functions close any file connections they open
 * Arrow R6 objects no longer have namespace collisions when the `R.oo` package is also loaded
```
12 changes: 8 additions & 4 deletions r/R/arrowExports.R


45 changes: 35 additions & 10 deletions r/R/dataset.R
```diff
@@ -43,19 +43,35 @@
 #' * `NULL` for no partitioning
 #'
 #' The default is to autodetect Hive-style partitions.
+#' @param unify_schemas logical: should all data fragments (files, `Dataset`s)
+#' be scanned in order to create a unified schema from them? If `FALSE`, only
+#' the first fragment will be inspected for its schema. Use this
+#' fast path when you know and trust that all fragments have an identical schema.
+#' The default is `FALSE` when creating a dataset from a file path (because
+#' there may be many files and scanning may be slow) but `TRUE` when `sources`
+#' is a list of `Dataset`s (because there should be few `Dataset`s in the list
+#' and their `Schema`s are already in memory).
 #' @param ... additional arguments passed to `dataset_factory()` when
 #' `sources` is a file path, otherwise ignored.
 #' @return A [Dataset] R6 object. Use `dplyr` methods on it to query the data,
 #' or call [`$NewScan()`][Scanner] to construct a query directly.
 #' @export
 #' @seealso `vignette("dataset", package = "arrow")`
 #' @include arrow-package.R
-open_dataset <- function(sources, schema = NULL, partitioning = hive_partition(), ...) {
+open_dataset <- function(sources,
+                         schema = NULL,
+                         partitioning = hive_partition(),
+                         unify_schemas = NULL,
+                         ...) {
   if (is_list_of(sources, "Dataset")) {
     if (is.null(schema)) {
-      # Take the first one.
-      # Someday, we should expose a way to unify schemas
-      schema <- sources[[1]]$schema
+      if (is.null(unify_schemas) || isTRUE(unify_schemas)) {
+        # Default is to unify schemas here
+        schema <- unify_schemas(schemas = map(sources, ~.$schema))
+      } else {
+        # Take the first one.
+        schema <- sources[[1]]$schema
+      }
     }
     # Enforce that all datasets have the same schema
     sources <- lapply(sources, function(x) {
@@ -65,7 +81,8 @@ open_dataset <- function(sources, schema = NULL, partitioning = hive_partition()
     return(shared_ptr(UnionDataset, dataset___UnionDataset__create(sources, schema)))
   }
   factory <- DatasetFactory$create(sources, partitioning = partitioning, ...)
-  factory$Finish(schema)
+  # Default is _not_ to inspect/unify schemas
+  factory$Finish(schema, isTRUE(unify_schemas))
 }
@@ -92,8 +109,14 @@ open_dataset <- function(sources, schema = NULL, partitioning = hive_partition()
 #' For the `DatasetFactory$create()` factory method, see [dataset_factory()], an
 #' alias for it. A `DatasetFactory` has:
 #'
-#' - `$Inspect()`: Returns a common [Schema] for all data discovered by the factory.
-#' - `$Finish(schema)`: Returns a `Dataset`
+#' - `$Inspect(unify_schemas)`: If `unify_schemas` is `TRUE`, all fragments
+#' will be scanned and a unified [Schema] will be created from them; if `FALSE`
+#' (default), only the first fragment will be inspected for its schema. Use this
+#' fast path when you know and trust that all fragments have an identical schema.
+#' - `$Finish(schema, unify_schemas)`: Returns a `Dataset`. If `schema` is provided,
+#' it will be used for the `Dataset`; if omitted, a `Schema` will be created from
+#' inspecting the fragments (files) in the dataset, following `unify_schemas`
+#' as described above.
 #'
 #' `FileSystemDatasetFactory$create()` is a lower-level factory method and
 #' takes the following arguments:
@@ -231,15 +254,17 @@ UnionDataset <- R6Class("UnionDataset", inherit = Dataset,
 #' @export
 DatasetFactory <- R6Class("DatasetFactory", inherit = ArrowObject,
   public = list(
-    Finish = function(schema = NULL) {
+    Finish = function(schema = NULL, unify_schemas = FALSE) {
       if (is.null(schema)) {
-        ptr <- dataset___DatasetFactory__Finish1(self)
+        ptr <- dataset___DatasetFactory__Finish1(self, unify_schemas)
       } else {
         ptr <- dataset___DatasetFactory__Finish2(self, schema)
       }
       shared_ptr(Dataset, ptr)$..dispatch()
     },
-    Inspect = function() shared_ptr(Schema, dataset___DatasetFactory__Inspect(self))
+    Inspect = function(unify_schemas = FALSE) {
+      shared_ptr(Schema, dataset___DatasetFactory__Inspect(self, unify_schemas))
+    }
   )
 )
 DatasetFactory$create <- function(x,
```
16 changes: 16 additions & 0 deletions r/R/schema.R
```diff
@@ -131,3 +131,19 @@ read_schema <- function(stream, ...) {
     return(shared_ptr(Schema, ipc___ReadSchema_InputStream(stream)))
   }
 }
+
+#' Combine and harmonize schemas
+#'
+#' @param ... [Schema]s to unify
+#' @param schemas Alternatively, a list of schemas
+#' @return A `Schema` with the union of fields contained in the inputs
+#' @export
+#' @examples
+#' \dontrun{
+#' a <- schema(b = double(), c = bool())
+#' z <- schema(b = double(), k = utf8())
+#' unify_schemas(a, z)
+#' }
+unify_schemas <- function(..., schemas = list(...)) {
+  shared_ptr(Schema, arrow__UnifySchemas(schemas))
+}
```
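The wrapper can also be used standalone; a short sketch of the intended behavior, mirroring the unit test added in this patch (requires the arrow package at this commit):

```r
library(arrow)

a <- schema(b = double(), c = bool())
z <- schema(b = double(), k = utf8())

# Fields are matched by name; the result holds the union of fields.
# Per the new test, this equals schema(b = double(), c = bool(), k = utf8())
unify_schemas(a, z)

# Equivalently, pass a list via the `schemas` argument:
unify_schemas(schemas = list(a, z))
```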
1 change: 1 addition & 0 deletions r/_pkgdown.yml
```diff
@@ -105,6 +105,7 @@ reference:
 - title: Arrow data types and schema
   contents:
     - Schema
+    - unify_schemas
     - type
     - dictionary
     - Field
```
10 changes: 8 additions & 2 deletions r/man/Dataset.Rd


17 changes: 16 additions & 1 deletion r/man/open_dataset.Rd


26 changes: 26 additions & 0 deletions r/man/unify_schemas.Rd


38 changes: 28 additions & 10 deletions r/src/arrowExports.cpp


16 changes: 12 additions & 4 deletions r/src/dataset.cpp
```diff
@@ -76,8 +76,12 @@ std::vector<std::string> dataset___FileSystemDataset__files(
 
 // [[arrow::export]]
 std::shared_ptr<ds::Dataset> dataset___DatasetFactory__Finish1(
-    const std::shared_ptr<ds::DatasetFactory>& factory) {
-  return VALUE_OR_STOP(factory->Finish());
+    const std::shared_ptr<ds::DatasetFactory>& factory, bool unify_schemas) {
+  ds::FinishOptions opts;
+  if (unify_schemas) {
+    opts.inspect_options.fragments = ds::InspectOptions::kInspectAllFragments;
+  }
+  return VALUE_OR_STOP(factory->Finish(opts));
 }
 
 // [[arrow::export]]
@@ -89,8 +93,12 @@ std::shared_ptr<ds::Dataset> dataset___DatasetFactory__Finish2(
 
 // [[arrow::export]]
 std::shared_ptr<arrow::Schema> dataset___DatasetFactory__Inspect(
-    const std::shared_ptr<ds::DatasetFactory>& factory) {
-  return VALUE_OR_STOP(factory->Inspect());
+    const std::shared_ptr<ds::DatasetFactory>& factory, bool unify_schemas) {
+  ds::InspectOptions opts;
+  if (unify_schemas) {
+    opts.fragments = ds::InspectOptions::kInspectAllFragments;
+  }
+  return VALUE_OR_STOP(factory->Inspect(opts));
 }
 
 // [[arrow::export]]
```
6 changes: 6 additions & 0 deletions r/src/schema.cpp
```diff
@@ -100,4 +100,10 @@ bool Schema__Equals(const std::shared_ptr<arrow::Schema>& schema,
   return schema->Equals(*other, check_metadata);
 }
 
+// [[arrow::export]]
+std::shared_ptr<arrow::Schema> arrow__UnifySchemas(
+    const std::vector<std::shared_ptr<arrow::Schema>>& schemas) {
+  return VALUE_OR_STOP(arrow::UnifySchemas(schemas));
+}
+
 #endif
```
9 changes: 9 additions & 0 deletions r/tests/testthat/test-schema.R
```diff
@@ -100,3 +100,12 @@ test_that("Schema$Equals", {
   # Non-schema object
   expect_false(a$Equals(42))
 })
+
+test_that("unify_schemas", {
+  a <- schema(b = double(), c = bool())
+  z <- schema(b = double(), k = utf8())
+  expect_equal(
+    unify_schemas(a, z),
+    schema(b = double(), c = bool(), k = utf8())
+  )
+})
```
