Skip to content

Commit

Permalink
GH-33526: [R] Implement new function open_dataset_csv with signature …
Browse files Browse the repository at this point in the history
…more closely matching read_csv_arrow (#33614)

This PR implements a wrapper around `open_dataset()` specifically for value-delimited files. It takes the parameters from `open_dataset()` and appends the parameters of `read_csv_arrow()` which are compatible with `open_dataset()`. This should make it easier for users to switch between the two, e.g.:

``` r
library(arrow)
library(dplyr)

# Set up directory for examples
tf <- tempfile()
dir.create(tf)
on.exit(unlink(tf))
df <- data.frame(x = c("1", "2", "NULL"))

file_path <- file.path(tf, "file1.txt")
write.table(df, file_path, sep = ",", row.names = FALSE)

read_csv_arrow(file_path, na = c("", "NA", "NULL"), col_names = "y", skip = 1)
#> # A tibble: 3 × 1
#>       y
#>   <int>
#> 1     1
#> 2     2
#> 3    NA

open_csv_dataset(file_path, na = c("", "NA", "NULL"), col_names = "y", skip = 1) %>% collect()
#> # A tibble: 3 × 1
#>       y
#>   <int>
#> 1     1
#> 2     2
#> 3    NA
```

This PR also hooks up the "na" (readr-style) parameter to "null_values" (i.e. CSVConvertOptions parameter).

In the process of making this PR, I also refactored `CsvFileFormat$create()`.  Unfortunately, many changes needed to be made at once, which has considerably increasing the size/complexity of this PR.

Authored-by: Nic Crane <thisisnic@gmail.com>
Signed-off-by: Nic Crane <thisisnic@gmail.com>
  • Loading branch information
thisisnic authored and raulcd committed Jan 18, 2023
1 parent c2199dc commit 6633251
Show file tree
Hide file tree
Showing 10 changed files with 648 additions and 91 deletions.
3 changes: 3 additions & 0 deletions r/NAMESPACE
Expand Up @@ -348,7 +348,10 @@ export(new_extension_type)
export(null)
export(num_range)
export(one_of)
export(open_csv_dataset)
export(open_dataset)
export(open_delim_dataset)
export(open_tsv_dataset)
export(read_csv_arrow)
export(read_delim_arrow)
export(read_feather)
Expand Down
2 changes: 1 addition & 1 deletion r/R/csv.R
Expand Up @@ -500,7 +500,7 @@ CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L, nu
)
}

readr_to_csv_read_options <- function(skip = 0, col_names = TRUE, col_types = NULL) {
readr_to_csv_read_options <- function(skip = 0, col_names = TRUE) {
if (isTRUE(col_names)) {
# C++ default to parse is 0-length string array
col_names <- character(0)
Expand Down
252 changes: 168 additions & 84 deletions r/R/dataset-format.R
Expand Up @@ -53,7 +53,7 @@
#' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`)
#' @rdname FileFormat
#' @name FileFormat
#' @examplesIf arrow_with_dataset() && tolower(Sys.info()[["sysname"]]) != "windows"
#' @examplesIf arrow_with_dataset()
#' ## Semi-colon delimited files
#' # Set up directory for examples
#' tf <- tempfile()
Expand Down Expand Up @@ -113,107 +113,105 @@ ParquetFileFormat$create <- function(...,
#' @export
IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)

#' @usage NULL
#' @format NULL
#' @rdname FileFormat
#' CSV dataset file format
#'
#' @description
#' A `CSVFileFormat` is a [FileFormat] subclass which holds information about how to
#' read and parse the files included in a CSV `Dataset`.
#'
#' @section Factory:
#' `CSVFileFormat$create()` can take options in the form of lists passed through as `parse_options`,
#' `read_options`, or `convert_options` parameters. Alternatively, readr-style options can be passed
#' through individually. While it is possible to pass in `CSVReadOptions`, `CSVConvertOptions`, and `CSVParseOptions`
#' objects, this is not recommended as options set in these objects are not validated for compatibility.
#'
#' @return A `CsvFileFormat` object
#' @rdname CsvFileFormat
#' @name CsvFileFormat
#' @seealso [FileFormat]
#' @examplesIf arrow_with_dataset()
#' # Set up directory for examples
#' tf <- tempfile()
#' dir.create(tf)
#' on.exit(unlink(tf))
#' df <- data.frame(x = c("1", "2", "NULL"))
#' write.table(df, file.path(tf, "file1.txt"), sep = ",", row.names = FALSE)
#'
#' # Create CsvFileFormat object with Arrow-style null_values option
#' format <- CsvFileFormat$create(convert_options = list(null_values = c("", "NA", "NULL")))
#' open_dataset(tf, format = format)
#'
#' # Use readr-style options
#' format <- CsvFileFormat$create(na = c("", "NA", "NULL"))
#' open_dataset(tf, format = format)
#'
#' @export
CsvFileFormat <- R6Class("CsvFileFormat", inherit = FileFormat)
CsvFileFormat$create <- function(...,
opts = csv_file_format_parse_options(...),
convert_options = csv_file_format_convert_opts(...),
read_options = csv_file_format_read_opts(...)) {
check_csv_file_format_args(...)
# Evaluate opts first to catch any unsupported arguments
force(opts)

options <- list(...)
schema <- options[["schema"]]
if (!is.null(schema) && !inherits(schema, "Schema")) {
abort(paste0(
"`schema` must be an object of class 'Schema' not '",
class(schema)[1],
"'."
))
}

if (!inherits(read_options, "CsvReadOptions")) {
read_options <- do.call(CsvReadOptions$create, read_options)
}
CsvFileFormat$create <- function(...) {
dots <- list(...)
options <- check_csv_file_format_args(dots)
check_schema(options[["schema"]], options[["read_options"]]$column_names)

if (!inherits(convert_options, "CsvConvertOptions")) {
convert_options <- do.call(CsvConvertOptions$create, convert_options)
}

if (!inherits(opts, "CsvParseOptions")) {
opts <- do.call(CsvParseOptions$create, opts)
}

column_names <- read_options$column_names
schema_names <- names(schema)
dataset___CsvFileFormat__Make(options$parse_options, options$convert_options, options$read_options)
}

if (!is.null(schema) && !identical(schema_names, column_names)) {
missing_from_schema <- setdiff(column_names, schema_names)
missing_from_colnames <- setdiff(schema_names, column_names)
message_colnames <- NULL
message_schema <- NULL
message_order <- NULL
# Check all arguments are valid
check_csv_file_format_args <- function(args) {
options <- list(
parse_options = args$parse_options,
convert_options = args$convert_options,
read_options = args$read_options,
schema = args$schema
)

if (length(missing_from_colnames) > 0) {
message_colnames <- paste(
oxford_paste(missing_from_colnames, quote_symbol = "`"),
"not present in `column_names`"
)
}
check_unsupported_args(args)
check_unrecognised_args(args)

if (length(missing_from_schema) > 0) {
message_schema <- paste(
oxford_paste(missing_from_schema, quote_symbol = "`"),
"not present in `schema`"
)
}
# Evaluate parse_options first to catch any unsupported arguments
if (is.null(args$parse_options)) {
options$parse_options <- do.call(csv_file_format_parse_opts, args)
} else if (is.list(args$parse_options)) {
options$parse_options <- do.call(CsvParseOptions$create, args$parse_options)
}

if (length(missing_from_schema) == 0 && length(missing_from_colnames) == 0) {
message_order <- "`column_names` and `schema` field names match but are not in the same order"
}
if (is.null(args$convert_options)) {
options$convert_options <- do.call(csv_file_format_convert_opts, args)
} else if (is.list(args$convert_options)) {
options$convert_options <- do.call(CsvConvertOptions$create, args$convert_options)
}

abort(
c(
"Values in `column_names` must match `schema` field names",
x = message_order,
x = message_schema,
x = message_colnames
)
)
if (is.null(args$read_options)) {
options$read_options <- do.call(csv_file_format_read_opts, args)
} else if (is.list(args$read_options)) {
options$read_options <- do.call(CsvReadOptions$create, args$read_options)
}

dataset___CsvFileFormat__Make(opts, convert_options, read_options)
options
}

# Check all arguments are valid
check_csv_file_format_args <- function(...) {
opts <- list(...)
check_unsupported_args <- function(args) {
opt_names <- get_opt_names(args)

# Filter out arguments meant for CsvConvertOptions/CsvReadOptions
convert_opts <- c(names(formals(CsvConvertOptions$create)))
supported_convert_opts <- c(names(formals(CsvConvertOptions$create)), "na")

read_opts <- c(
supported_read_opts <- c(
names(formals(CsvReadOptions$create)),
names(formals(readr_to_csv_read_options))
)

# We only currently support all of the readr options for parseoptions
parse_opts <- c(
supported_parse_opts <- c(
names(formals(CsvParseOptions$create)),
names(formals(readr_to_csv_parse_options))
)

opt_names <- names(opts)

# Catch any readr-style options specified with full option names that are
# supported by read_delim_arrow() (and its wrappers) but are not yet
# supported here
unsup_readr_opts <- setdiff(
names(formals(read_delim_arrow)),
c(convert_opts, read_opts, parse_opts, "schema")
c(supported_convert_opts, supported_read_opts, supported_parse_opts, "schema")
)

is_unsup_opt <- opt_names %in% unsup_readr_opts
Expand All @@ -228,9 +226,36 @@ check_csv_file_format_args <- function(...) {
call. = FALSE
)
}
}

# unlists "parse_options", "convert_options", "read_options" and returns them along with
# names of options passed in individually via args. `get_opt_names()` ignores any
# CSV*Options objects passed in as these are not validated - users must ensure they've
# chosen reasonable values in this case.
get_opt_names <- function(args) {
opt_names <- names(args)

# extract names of parse_options, read_options, and convert_options
if ("parse_options" %in% names(args) && is.list(args[["parse_options"]])) {
opt_names <- c(opt_names, names(args[["parse_options"]]))
}

if ("read_options" %in% names(args) && is.list(args[["read_options"]])) {
opt_names <- c(opt_names, names(args[["read_options"]]))
}

if ("convert_options" %in% names(args) && is.list(args[["convert_options"]])) {
opt_names <- c(opt_names, names(args[["convert_options"]]))
}

setdiff(opt_names, c("parse_options", "read_options", "convert_options"))
}

check_unrecognised_args <- function(opts) {
# Catch any options with full or partial names that do not match any of the
# recognized Arrow C++ option names or readr-style option names
opt_names <- get_opt_names(opts)

arrow_opts <- c(
names(formals(CsvParseOptions$create)),
names(formals(CsvReadOptions$create)),
Expand All @@ -240,7 +265,8 @@ check_csv_file_format_args <- function(...) {

readr_opts <- c(
names(formals(readr_to_csv_parse_options)),
names(formals(readr_to_csv_read_options))
names(formals(readr_to_csv_read_options)),
"na"
)

is_arrow_opt <- !is.na(pmatch(opt_names, arrow_opts))
Expand Down Expand Up @@ -271,26 +297,74 @@ check_ambiguous_options <- function(passed_opts, opts1, opts2) {
}
}

check_schema <- function(schema, column_names) {
if (!is.null(schema) && !inherits(schema, "Schema")) {
abort(paste0(
"`schema` must be an object of class 'Schema' not '",
class(schema)[1],
"'."
))
}

schema_names <- names(schema)

if (!is.null(schema) && !identical(schema_names, column_names)) {
missing_from_schema <- setdiff(column_names, schema_names)
missing_from_colnames <- setdiff(schema_names, column_names)
message_colnames <- NULL
message_schema <- NULL
message_order <- NULL

if (length(missing_from_colnames) > 0) {
message_colnames <- paste(
oxford_paste(missing_from_colnames, quote_symbol = "`"),
"not present in `column_names`"
)
}

if (length(missing_from_schema) > 0) {
message_schema <- paste(
oxford_paste(missing_from_schema, quote_symbol = "`"),
"not present in `schema`"
)
}

if (length(missing_from_schema) == 0 && length(missing_from_colnames) == 0) {
message_order <- "`column_names` and `schema` field names match but are not in the same order"
}

abort(
c(
"Values in `column_names` must match `schema` field names",
x = message_order,
x = message_schema,
x = message_colnames
)
)
}
}

# Support both readr-style option names and Arrow C++ option names
csv_file_format_parse_options <- function(...) {
csv_file_format_parse_opts <- function(...) {
opts <- list(...)
# Filter out arguments meant for CsvConvertOptions/CsvReadOptions
convert_opts <- names(formals(CsvConvertOptions$create))
convert_opts <- c(names(formals(CsvConvertOptions$create)), "na", "convert_options")
read_opts <- c(
names(formals(CsvReadOptions$create)),
names(formals(readr_to_csv_read_options))
names(formals(readr_to_csv_read_options)),
"read_options"
)
opts[convert_opts] <- NULL
opts[read_opts] <- NULL
opts[["schema"]] <- NULL
opt_names <- names(opts)
opts[["parse_options"]] <- NULL
opt_names <- get_opt_names(opts)

arrow_opts <- c(names(formals(CsvParseOptions$create)))
readr_opts <- c(names(formals(readr_to_csv_parse_options)))

is_arrow_opt <- !is.na(pmatch(opt_names, arrow_opts))
is_readr_opt <- !is.na(pmatch(opt_names, readr_opts))

# Catch options with ambiguous partial names (such as "del") that make it
# unclear whether the user is specifying Arrow C++ options ("delimiter") or
# readr-style options ("delim")
Expand All @@ -313,28 +387,38 @@ csv_file_format_parse_options <- function(...) {
csv_file_format_convert_opts <- function(...) {
opts <- list(...)
# Filter out arguments meant for CsvParseOptions/CsvReadOptions
arrow_opts <- names(formals(CsvParseOptions$create))
arrow_opts <- c(names(formals(CsvParseOptions$create)), "parse_options")
readr_opts <- names(formals(readr_to_csv_parse_options))
read_opts <- c(
names(formals(CsvReadOptions$create)),
names(formals(readr_to_csv_read_options))
names(formals(readr_to_csv_read_options)),
"read_options"
)
opts[arrow_opts] <- NULL
opts[readr_opts] <- NULL
opts[read_opts] <- NULL
opts[["schema"]] <- NULL
opts[["convert_options"]] <- NULL

# map "na" to "null_values"
if ("na" %in% names(opts)) {
opts[["null_values"]] <- opts[["na"]]
opts[["na"]] <- NULL
}

do.call(CsvConvertOptions$create, opts)
}

csv_file_format_read_opts <- function(schema = NULL, ...) {
opts <- list(...)
# Filter out arguments meant for CsvParseOptions/CsvConvertOptions
arrow_opts <- names(formals(CsvParseOptions$create))
arrow_opts <- c(names(formals(CsvParseOptions$create)), "parse_options")
readr_opts <- names(formals(readr_to_csv_parse_options))
convert_opts <- names(formals(CsvConvertOptions$create))
convert_opts <- c(names(formals(CsvConvertOptions$create)), "na", "convert_options")
opts[arrow_opts] <- NULL
opts[readr_opts] <- NULL
opts[convert_opts] <- NULL
opts[["read_options"]] <- NULL

opt_names <- names(opts)
arrow_opts <- c(names(formals(CsvReadOptions$create)))
Expand Down

0 comments on commit 6633251

Please sign in to comment.