ARROW-9869: [R] Implement full S3FileSystem/S3Options constructor #8197

Closed
wants to merge 13 commits
6 changes: 4 additions & 2 deletions dev/tasks/homebrew-formulae/travis.osx.r.yml
@@ -45,9 +45,11 @@ before_install:
- sed -i.bak -E -e 's@https://github.com/apache/arrow.git"$@{{ arrow.remote }}.git", :revision => "{{ arrow.head }}"@' tools/apache-arrow.rb && rm -f tools/apache-arrow.rb.bak
# Sometimes crossbow gives a remote URL with .git and sometimes not. Make sure there's only one
- sed -i.bak -E -e 's@.git.git@.git@' tools/apache-arrow.rb && rm -f tools/apache-arrow.rb.bak
# Get minio for S3 testing
- brew install minio/stable/minio
script:
- Rscript -e 'install.packages("rcmdcheck")'
- Rscript -e 'install.packages(c("rcmdcheck", "sys"))'
# Note that this is not --as-cran. CRAN doesn't do macOS checks --as-cran
- travis_wait Rscript -e "rcmdcheck::rcmdcheck(build_args = '--no-build-vignettes', args = c('--no-manual', '--ignore-vignettes', '--run-donttest'), error_on = 'warning', check_dir = 'check')"
- travis_wait Rscript -e "minio_dir <- tempfile(); dir.create(minio_dir); pid <- sys::exec_background('minio', c('server', minio_dir)); on.exit(tools::pskill(pid)); rcmdcheck::rcmdcheck(build_args = '--no-build-vignettes', args = c('--no-manual', '--ignore-vignettes', '--run-donttest'), error_on = 'warning', check_dir = 'check')"
# If there's a build failure, it's probably in this log. Let's print it regardless though
- cat check/arrow.Rcheck/00install.out
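The new CI one-liner above, expanded for readability (a sketch only — it assumes `minio` is on the `PATH` and the `sys` and `rcmdcheck` packages are installed, as the preceding steps arrange):

```r
# Start a throwaway minio server for the S3 tests, run the package check,
# and kill the server when the R session exits.
minio_dir <- tempfile()
dir.create(minio_dir)
pid <- sys::exec_background("minio", c("server", minio_dir))
on.exit(tools::pskill(pid))

rcmdcheck::rcmdcheck(
  build_args = "--no-build-vignettes",
  args = c("--no-manual", "--ignore-vignettes", "--run-donttest"),
  error_on = "warning",
  check_dir = "check"
)
```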
2 changes: 1 addition & 1 deletion docs/source/python/filesystems.rst
@@ -43,7 +43,7 @@ and Amazon S3-compatible storage (:class:`S3FileSystem`).
Usage
-----

A FileSystem object can be created with one of the constuctors (and check the
A FileSystem object can be created with one of the constructors (and check the
respective constructor for its options)::

>>> from pyarrow import fs
3 changes: 3 additions & 0 deletions r/NAMESPACE
@@ -72,6 +72,7 @@ S3method(mean,Scalar)
S3method(min,Array)
S3method(min,ChunkedArray)
S3method(names,Dataset)
S3method(names,FeatherReader)
S3method(names,RecordBatch)
S3method(names,ScannerBuilder)
S3method(names,Schema)
@@ -287,6 +288,7 @@ importFrom(rlang,enquos)
importFrom(rlang,env)
importFrom(rlang,env_bind)
importFrom(rlang,eval_tidy)
importFrom(rlang,exec)
importFrom(rlang,is_false)
importFrom(rlang,is_integerish)
importFrom(rlang,list2)
@@ -309,6 +311,7 @@ importFrom(tidyselect,vars_rename)
importFrom(tidyselect,vars_select)
importFrom(utils,head)
importFrom(utils,install.packages)
importFrom(utils,modifyList)
importFrom(utils,tail)
importFrom(vctrs,s3_register)
importFrom(vctrs,vec_ptype_abbr)
2 changes: 1 addition & 1 deletion r/NEWS.md
@@ -28,7 +28,7 @@
## AWS S3 support

* S3 support is now enabled in binary macOS and Windows (Rtools40 only, i.e. R >= 4.0) packages. To enable it on Linux, you will need to build and install `aws-sdk-cpp` from source, then set the environment variable `EXTRA_CMAKE_FLAGS="-DARROW_S3=ON -DAWSSDK_SOURCE=SYSTEM"` prior to building the R package (with bundled C++ build, not with Arrow system libraries) from source.
* File readers and writers (`read_parquet()`, `write_feather()`, et al.) now accept an `s3://` URI as the source or destination file, as do `open_dataset()` and `write_dataset()`. See `vignette("fs", package = "arrow")` for details.
* File readers and writers (`read_parquet()`, `write_feather()`, et al.), as well as `open_dataset()` and `write_dataset()`, allow you to access resources on S3 (or on file systems that emulate S3) either by providing an `s3://` URI or by passing an additional `filesystem` argument. See `vignette("fs", package = "arrow")` for details.

## Computation

2 changes: 1 addition & 1 deletion r/R/arrow-package.R
@@ -18,7 +18,7 @@
#' @importFrom R6 R6Class
#' @importFrom purrr as_mapper map map2 map_chr map_dfr map_int map_lgl
#' @importFrom assertthat assert_that is.string
#' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names
#' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names exec
#' @importFrom tidyselect vars_select
#' @useDynLib arrow, .registration = TRUE
#' @keywords internal
8 changes: 2 additions & 6 deletions r/R/arrowExports.R


9 changes: 6 additions & 3 deletions r/R/csv.R
@@ -68,6 +68,8 @@
#' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
#' @param convert_options see [file reader options][CsvReadOptions]
#' @param read_options see [file reader options][CsvReadOptions]
#' @param filesystem A [FileSystem] where `file` can be found if it is a
#' string file path; default is the local file system
#' @param as_data_frame Should the function return a `data.frame` (default) or
#' an Arrow [Table]?
#'
@@ -98,6 +100,7 @@ read_delim_arrow <- function(file,
parse_options = NULL,
convert_options = NULL,
read_options = NULL,
filesystem = NULL,
as_data_frame = TRUE) {

if (is.null(parse_options)) {
@@ -119,7 +122,7 @@
}

if (!inherits(file, "InputStream")) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
reader <- CsvTableReader$create(
@@ -206,7 +209,7 @@ read_tsv_arrow <- function(file,
#' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods
#' take the following arguments:
#'
#' - `file` A character path to a local file, or an Arrow input stream
#' - `file` An Arrow [InputStream]
#' - `convert_options` (CSV only), `parse_options`, `read_options`: see
#' [CsvReadOptions]
#' - `...` additional parameters.
@@ -227,7 +230,7 @@ CsvTableReader$create <- function(file,
parse_options = CsvParseOptions$create(),
convert_options = CsvConvertOptions$create(),
...) {
file <- make_readable_file(file)
assert_is(file, "InputStream")
shared_ptr(
CsvTableReader,
csv___TableReader__Make(file, read_options, parse_options, convert_options)
18 changes: 12 additions & 6 deletions r/R/feather.R
@@ -25,6 +25,8 @@
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream]
#' @param filesystem A [FileSystem] where `sink` should be written if it is a
#' string file path; default is the local file system
#' @param version integer Feather file version. Version 2 is the current.
#' Version 1 is the more limited legacy format.
#' @param chunk_size For V2 files, the number of rows that each chunk of data
@@ -52,6 +54,7 @@
#' @include arrow-package.R
write_feather <- function(x,
sink,
filesystem = NULL,
version = 2,
chunk_size = 65536L,
compression = c("default", "lz4", "uncompressed", "zstd"),
@@ -106,7 +109,7 @@ write_feather <- function(x,
assert_is(x, "Table")

if (is.string(sink)) {
sink <- make_output_stream(sink)
sink <- make_output_stream(sink, filesystem)
on.exit(sink$close())
}
assert_is(sink, "OutputStream")
@@ -141,17 +144,16 @@
#' # Can select columns
#' df <- read_feather(tf, col_select = starts_with("d"))
#' }
read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, filesystem = NULL, ...) {
if (!inherits(file, "RandomAccessFile")) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
reader <- FeatherReader$create(file, ...)

all_columns <- ipc___feather___Reader__column_names(reader)
col_select <- enquo(col_select)
columns <- if (!quo_is_null(col_select)) {
vars_select(all_columns, !!col_select)
vars_select(names(reader), !!col_select)
}

out <- reader$Read(columns)
@@ -198,10 +200,14 @@ FeatherReader <- R6Class("FeatherReader", inherit = ArrowObject,
),
active = list(
# versions are officially 2 for V1 and 3 for V2 :shrug:
version = function() ipc___feather___Reader__version(self) - 1L
version = function() ipc___feather___Reader__version(self) - 1L,
column_names = function() ipc___feather___Reader__column_names(self)
)
)

#' @export
names.FeatherReader <- function(x) x$column_names

FeatherReader$create <- function(file, mmap = TRUE, ...) {
assert_is(file, "RandomAccessFile")
shared_ptr(FeatherReader, ipc___feather___Reader__Open(file))
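A sketch of what the new `filesystem` argument to `read_feather()`/`write_feather()` enables. All names below are hypothetical — the bucket, credentials, and minio endpoint are placeholders, not part of this PR:

```r
library(arrow)

# Connect to an S3-compatible store; every value here is a placeholder
fs <- S3FileSystem$create(
  access_key = "minioadmin",
  secret_key = "minioadmin",
  scheme = "http",
  endpoint_override = "localhost:9000"
)

# Write and read back a Feather file on that filesystem
write_feather(mtcars, "my-bucket/mtcars.feather", filesystem = fs)
df <- read_feather("my-bucket/mtcars.feather", filesystem = fs)
```

Without `filesystem`, a string path is resolved against the local file system, as before.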
94 changes: 84 additions & 10 deletions r/R/filesystem.R
@@ -122,11 +122,39 @@ FileSelector$create <- function(base_dir, allow_not_found = FALSE, recursive = F
#'
#' @section Factory:
#'
#' The `$create()` factory methods instantiate the `FileSystem` object and
#' take the following arguments, depending on the subclass:
#' `LocalFileSystem$create()` returns the object and takes no arguments.
#'
#' - no argument is needed for instantiating a `LocalFileSystem`
#' - `base_path` and `base_fs` for instantiating a `SubTreeFileSystem`
#' `SubTreeFileSystem$create()` takes the following arguments:
#'
#' - `base_path`, a string path
#' - `base_fs`, a `FileSystem` object
#'
#' `S3FileSystem$create()` optionally takes arguments:
#'
#' - `anonymous`: logical, default `FALSE`. If true, will not attempt to look up
#' credentials using standard AWS configuration methods.
#' - `access_key`, `secret_key`: authentication credentials. If one is provided,
#' the other must be as well. If both are provided, they will override any
#' AWS configuration set at the environment level.
#' - `session_token`: optional string for authentication along with
#' `access_key` and `secret_key`
#' - `role_arn`: string AWS ARN of an AccessRole. If provided instead of `access_key` and
#' `secret_key`, temporary credentials will be fetched by assuming this role.
#' - `session_name`: optional string identifier for the assumed role session.
#' - `external_id`: optional unique string identifier that might be required
#' when you assume a role in another account.
#' - `load_frequency`: integer, frequency (in seconds) with which temporary
#' credentials from an assumed role session will be refreshed. Default is
#' 900 (i.e. 15 minutes)
#' - `region`: AWS region to connect to. If omitted, the AWS library will
#' provide a sensible default based on client configuration, falling back
#' to "us-east-1" if no other alternatives are found.
#' - `endpoint_override`: If non-empty, override region with a connect string
#' such as "localhost:9000". This is useful for connecting to file systems
#' that emulate S3.
#' - `scheme`: S3 connection transport (default "https")
#' - `background_writes`: logical, whether `OutputStream` writes will be issued
#' in the background, without blocking (default `TRUE`)
#'
#' @section Methods:
#'
@@ -279,13 +307,56 @@ LocalFileSystem$create <- function() {
#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @importFrom utils modifyList
#' @export
S3FileSystem <- R6Class("S3FileSystem", inherit = FileSystem)
S3FileSystem$create <- function() {
fs___EnsureS3Initialized()
shared_ptr(S3FileSystem, fs___S3FileSystem__create())
S3FileSystem$create <- function(anonymous = FALSE, ...) {
args <- list2(...)
if (anonymous) {
invalid_args <- intersect(c("access_key", "secret_key", "session_token", "role_arn", "session_name", "external_id", "load_frequency"), names(args))
if (length(invalid_args)) {
stop("Cannot specify ", oxford_paste(invalid_args), " when anonymous = TRUE", call. = FALSE)
}
} else {
keys_present <- length(intersect(c("access_key", "secret_key"), names(args)))
if (keys_present == 1) {
stop("Key authentication requires both access_key and secret_key", call. = FALSE)
}
if ("session_token" %in% names(args) && keys_present != 2) {
stop(
"In order to initialize a session with temporary credentials, ",
"both secret_key and access_key must be provided ",
"in addition to session_token.",
call. = FALSE
)
}
arn <- "role_arn" %in% names(args)
if (keys_present == 2 && arn) {
stop("Cannot provide both key authentication and role_arn", call. = FALSE)
}
arn_extras <- intersect(c("session_name", "external_id", "load_frequency"), names(args))
if (length(arn_extras) > 0 && !arn) {
stop("Cannot specify ", oxford_paste(arn_extras), " without providing a role_arn string", call. = FALSE)
}
}
args <- c(modifyList(default_s3_options, args), anonymous = anonymous)
shared_ptr(S3FileSystem, exec(fs___S3FileSystem__create, !!!args))
}

default_s3_options <- list(
access_key = "",
secret_key = "",
session_token = "",
role_arn = "",
session_name = "",
external_id = "",
load_frequency = 900L,
region = "",
endpoint_override = "",
scheme = "",
background_writes = TRUE
)

arrow_with_s3 <- function() {
.Call(`_s3_available`)
}
@@ -295,9 +366,12 @@
#' @rdname FileSystem
#' @export
SubTreeFileSystem <- R6Class("SubTreeFileSystem", inherit = FileSystem)
SubTreeFileSystem$create <- function(base_path, base_fs) {
xp <- fs___SubTreeFileSystem__create(clean_path_rel(base_path), base_fs)
shared_ptr(SubTreeFileSystem, xp)
SubTreeFileSystem$create <- function(base_path, base_fs = NULL) {
fs_and_path <- get_path_and_filesystem(base_path, base_fs)
shared_ptr(
SubTreeFileSystem,
fs___SubTreeFileSystem__create(fs_and_path$path, fs_and_path$fs)
)
}

#' Copy files between FileSystems
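The argument validation added to `S3FileSystem$create()` above can be exercised directly; these sketches show the combinations the constructor rejects (credential values are placeholders, and the error messages paraphrase the `stop()` calls in the diff):

```r
# Anonymous access excludes any credential arguments
S3FileSystem$create(anonymous = TRUE, access_key = "placeholder")
# -> error: Cannot specify access_key when anonymous = TRUE

# Key authentication requires both halves of the key pair
S3FileSystem$create(access_key = "placeholder")
# -> error: Key authentication requires both access_key and secret_key

# Explicit keys and an assumed role are mutually exclusive
S3FileSystem$create(
  access_key = "placeholder", secret_key = "placeholder",
  role_arn = "arn:aws:iam::000000000000:role/placeholder"
)
# -> error: Cannot provide both key authentication and role_arn
```

Arguments not supplied fall back to `default_s3_options`, so a bare `S3FileSystem$create()` defers entirely to the AWS SDK's standard credential lookup.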
10 changes: 7 additions & 3 deletions r/R/io.R
@@ -257,12 +257,16 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
file
}

make_output_stream <- function(x) {
make_output_stream <- function(x, filesystem = NULL) {
if (is_url(x)) {
fs_and_path <- FileSystem$from_uri(x)
fs_and_path$fs$OpenOutputStream(fs_and_path$path)
} else {
filesystem = fs_and_path$fs
x <- fs_and_path$path
}
if (is.null(filesystem)) {
FileOutputStream$create(x)
} else {
filesystem$OpenOutputStream(x)
}
}

10 changes: 6 additions & 4 deletions r/R/ipc_stream.R
@@ -35,13 +35,13 @@
#' serialize data to a buffer.
#' [RecordBatchWriter] for a lower-level interface.
#' @export
write_ipc_stream <- function(x, sink, ...) {
write_ipc_stream <- function(x, sink, filesystem = NULL, ...) {
x_out <- x # So we can return the data we got
if (is.data.frame(x)) {
x <- Table$create(x)
}
if (is.string(sink)) {
sink <- make_output_stream(sink)
sink <- make_output_stream(sink, filesystem)
on.exit(sink$close())
}
assert_is(sink, "OutputStream")
@@ -90,16 +90,18 @@ write_to_raw <- function(x, format = c("stream", "file")) {
#' open.
#' @param as_data_frame Should the function return a `data.frame` (default) or
#' an Arrow [Table]?
#' @param filesystem A [FileSystem] where `file` can be found if it is a
#' string file path; default is the local file system
#' @param ... extra parameters passed to `read_feather()`.
#'
#' @return A `data.frame` if `as_data_frame` is `TRUE` (the default), or an
#' Arrow [Table] otherwise
#' @seealso [read_feather()] for writing IPC files. [RecordBatchReader] for a
#' lower-level interface.
#' @export
read_ipc_stream <- function(file, as_data_frame = TRUE, ...) {
read_ipc_stream <- function(file, as_data_frame = TRUE, filesystem = NULL, ...) {
if (!inherits(file, "InputStream")) {
[Review comment — Contributor]
There is a lot of this happening. Does this really need both file and filesystem? Can this be something like:

read_ipc_stream(filesystem$open(file))

?

[Reply — Member, author]
Unfortunately, it's spelled filesystem$OpenInputStream(file) if it's for reading, or filesystem$OpenOutputStream(file) for writing. I resisted adding the extra argument, but I thought it was less bad than requiring users to learn those.

But we could think of other solutions. One possibility is having some sort of FileLocator object that is filesystem + path; we've started exploring that in places in the C++ library, and I've been looking into similar things for ARROW-9870. Maybe we could revisit this question in that PR (next week-ish)?

file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}

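As the review exchange above notes, the `filesystem` argument is shorthand for opening the stream yourself. Given a FileSystem object `fs` (e.g. from `S3FileSystem$create()`; the path below is a placeholder), the two forms are equivalent sketches:

```r
# Form 1: let read_ipc_stream resolve the string path against fs
df1 <- read_ipc_stream("my-bucket/batches.arrow", filesystem = fs)

# Form 2: open the InputStream explicitly and pass it in
stream <- fs$OpenInputStream("my-bucket/batches.arrow")
df2 <- read_ipc_stream(stream)
stream$close()
```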
13 changes: 10 additions & 3 deletions r/R/json.R
@@ -35,7 +35,15 @@
#' ', tf, useBytes=TRUE)
#' df <- read_json_arrow(tf)
#' }
read_json_arrow <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
read_json_arrow <- function(file,
col_select = NULL,
as_data_frame = TRUE,
filesystem = NULL,
...) {
if (!inherits(file, "InputStream")) {
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
tab <- JsonTableReader$create(file, ...)$Read()

col_select <- enquo(col_select)
@@ -64,8 +72,7 @@ JsonTableReader$create <- function(file,
read_options = JsonReadOptions$create(),
parse_options = JsonParseOptions$create(),
...) {

file <- make_readable_file(file)
assert_is(file, "InputStream")
shared_ptr(
JsonTableReader,
json___TableReader__Make(file, read_options, parse_options)