Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-15280: [R] Expose FileSystemFactoryOptions #13171

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

150 changes: 123 additions & 27 deletions r/R/dataset-factory.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ DatasetFactory$create <- function(x,
format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"),
partitioning = NULL,
hive_style = NA,
factory_options = list(),
...) {
if (is_list_of(x, "DatasetFactory")) {
return(dataset___UnionDatasetFactory__Make(x))
Expand All @@ -58,9 +59,26 @@ DatasetFactory$create <- function(x,

if (length(info) > 1 || info[[1]]$type == FileType$File) {
# x looks like a vector of one or more file paths (not a directory path)
return(FileSystemDatasetFactory$create(path_and_fs$fs, NULL, path_and_fs$path, format))
return(FileSystemDatasetFactory$create(
path_and_fs$fs,
NULL,
path_and_fs$path,
format,
factory_options = factory_options
))
}

partitioning <- handle_partitioning(partitioning, path_and_fs, hive_style)
selector <- FileSelector$create(
path_and_fs$path,
allow_not_found = FALSE,
recursive = TRUE
)

FileSystemDatasetFactory$create(path_and_fs$fs, selector, NULL, format, partitioning, factory_options)
}

handle_partitioning <- function(partitioning, path_and_fs, hive_style) {
# Handle partitioning arg in cases where it is "character" or "Schema"
if (!is.null(partitioning) && !inherits(partitioning, c("Partitioning", "PartitioningFactory"))) {
if (!is_false(hive_style)) {
Expand Down Expand Up @@ -120,14 +138,7 @@ DatasetFactory$create <- function(x,
}
}
}

selector <- FileSelector$create(
path_and_fs$path,
allow_not_found = FALSE,
recursive = TRUE
)

FileSystemDatasetFactory$create(path_and_fs$fs, selector, NULL, format, partitioning)
partitioning
}

#' Create a DatasetFactory
Expand Down Expand Up @@ -161,20 +172,38 @@ DatasetFactory$create <- function(x,
#' it is assumed to be "text".
#' @param partitioning One of
#' * A `Schema`, in which case the file paths relative to `sources` will be
#' parsed, and path segments will be matched with the schema fields. For
#' example, `schema(year = int16(), month = int8())` would create partitions
#' for file paths like "2019/01/file.parquet", "2019/02/file.parquet", etc.
#' parsed, and path segments will be matched with the schema fields. For
#' example, `schema(year = int16(), month = int8())` would create partitions
#' for file paths like "2019/01/file.parquet", "2019/02/file.parquet", etc.
#' * A character vector that defines the field names corresponding to those
#' path segments (that is, you're providing the names that would correspond
#' to a `Schema` but the types will be autodetected)
#' path segments (that is, you're providing the names that would correspond
#' to a `Schema` but the types will be autodetected)
#' * A `HivePartitioning` or `HivePartitioningFactory`, as returned
#' by [hive_partition()] which parses explicit or autodetected fields from
#' Hive-style path segments
#' by [hive_partition()] which parses explicit or autodetected fields from
#' Hive-style path segments
#' * `NULL` for no partitioning
#' @param hive_style Logical: if `partitioning` is a character vector or a
#' `Schema`, should it be interpreted as specifying Hive-style partitioning?
#' Default is `NA`, which means to inspect the file paths for Hive-style
#' partitioning and behave accordingly.
#' @param factory_options list of optional FileSystemFactoryOptions:
#' * `partition_base_dir`: string path segment prefix to ignore when
#' discovering partition information with DirectoryPartitioning. Not
#' meaningful (ignored with a warning) for HivePartitioning, nor is it
#' valid when providing a vector of file paths.
#' * `exclude_invalid_files`: logical: should files that are not valid data
#' files be excluded? Default is `FALSE` because checking all files up
#' front incurs I/O and thus will be slower, especially on remote
#' filesystems. If false and there are invalid files, there will be an
#' error at scan time. This is the only FileSystemFactoryOption that is
#' valid for both when providing a directory path in which to discover
#' files and when providing a vector of file paths.
#' * `selector_ignore_prefixes`: character vector of file prefixes to ignore
#' when discovering files in a directory. If invalid files can be excluded
#' by a common filename prefix this way, you can avoid the I/O cost of
#' `exclude_invalid_files`. Not valid when providing a vector of file paths
#' (but if you're providing the file list, you can filter invalid files
#' yourself).
#' @param ... Additional format-specific options, passed to
#' `FileFormat$create()`. For CSV options, note that you can specify them either
#' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
Expand All @@ -198,7 +227,8 @@ FileSystemDatasetFactory$create <- function(filesystem,
selector = NULL,
paths = NULL,
format,
partitioning = NULL) {
partitioning = NULL,
factory_options = list()) {
assert_is(filesystem, "FileSystem")
is.null(selector) || assert_is(selector, "FileSelector")
is.null(paths) || assert_is(paths, "character")
Expand All @@ -208,23 +238,89 @@ FileSystemDatasetFactory$create <- function(filesystem,
)
assert_is(format, "FileFormat")
if (!is.null(paths)) {
assert_that(is.null(partitioning), msg = "Partitioning not supported with paths")
assert_that(
is.null(partitioning),
msg = "Partitioning not supported with paths"
)
# Validate that exclude_invalid_files is only option provided
# All other options are only relevant for the FileSelector method
invalid_opts <- setdiff(names(factory_options), "exclude_invalid_files")
if (length(invalid_opts)) {
stop(
"Invalid factory_options for creating a Dataset from a vector of file paths: ",
oxford_paste(invalid_opts),
call. = FALSE
)
}
return(dataset___FileSystemDatasetFactory__MakePaths(
filesystem,
paths,
format,
isTRUE(factory_options[["exclude_invalid_files"]])
))
}

if (!is.null(paths)) {
ptr <- dataset___FileSystemDatasetFactory__Make0(filesystem, paths, format)
} else if (is.null(partitioning)) {
ptr <- dataset___FileSystemDatasetFactory__Make1(filesystem, selector, format)
} else if (inherits(partitioning, "PartitioningFactory")) {
ptr <- dataset___FileSystemDatasetFactory__Make3(filesystem, selector, format, partitioning)
dataset___FileSystemDatasetFactory__Make(
filesystem,
selector,
format,
fsf_options(factory_options, partitioning)
)
}

fsf_options <- function(factory_options, partitioning) {
# Validate FileSystemFactoryOptions and put partitioning in it
valid_opts <- c(
"partition_base_dir",
"exclude_invalid_files",
"selector_ignore_prefixes"
)
invalid_opts <- setdiff(names(factory_options), valid_opts)
if (length(invalid_opts)) {
stop("Invalid factory_options: ", oxford_paste(invalid_opts), call. = FALSE)
}
if (!is.null(factory_options$partition_base_dir)) {
if (
inherits(partitioning, "HivePartitioning") ||
(
inherits(partitioning, "PartitioningFactory") &&
identical(partitioning$type_name, "hive")
)
) {
warning(
"factory_options$partition_base_dir is not meaningful for Hive partitioning",
call. = FALSE
)
} else {
assert_that(is.string(factory_options$partition_base_dir))
}
}

exclude <- factory_options$exclude_invalid_files %||% FALSE
if (!(isTRUE(exclude) || is_false(exclude))) {
stop(
"factory_options$exclude_invalid_files must be TRUE/FALSE",
call. = FALSE
)
}

if (!is.character(factory_options$selector_ignore_prefixes %||% character())) {
stop(
"factory_options$selector_ignore_prefixes must be a character vector",
call. = FALSE
)
}

if (inherits(partitioning, "PartitioningFactory")) {
factory_options[["partitioning_factory"]] <- partitioning
} else if (inherits(partitioning, "Partitioning")) {
ptr <- dataset___FileSystemDatasetFactory__Make2(filesystem, selector, format, partitioning)
} else {
factory_options[["partitioning"]] <- partitioning
} else if (!is.null(partitioning)) {
stop(
"Expected 'partitioning' to be NULL, PartitioningFactory or Partitioning",
call. = FALSE
)
}

ptr
factory_options
}
3 changes: 3 additions & 0 deletions r/R/dataset.R
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
#' is a directory path/URI or vector of file paths/URIs, otherwise ignored.
#' These may include `format` to indicate the file format, or other
#' format-specific options (see [read_csv_arrow()], [read_parquet()] and [read_feather()] on how to specify these).
#' @inheritParams dataset_factory
#' @return A [Dataset] R6 object. Use `dplyr` methods on it to query the data,
#' or call [`$NewScan()`][Scanner] to construct a query directly.
#' @export
Expand Down Expand Up @@ -178,6 +179,7 @@ open_dataset <- function(sources,
hive_style = NA,
unify_schemas = NULL,
format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"),
factory_options = list(),
...) {
stop_if_no_datasets()

Expand Down Expand Up @@ -212,6 +214,7 @@ open_dataset <- function(sources,
format = format,
schema = schema,
hive_style = hive_style,
factory_options = factory_options,
...
)
tryCatch(
Expand Down
22 changes: 22 additions & 0 deletions r/man/dataset_factory.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions r/man/open_dataset.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading