Skip to content

Commit

Permalink
Add filesystem arg to read/write functions + tests; update docs and a…
Browse files Browse the repository at this point in the history
…ssorted cleanup
  • Loading branch information
nealrichardson committed Sep 17, 2020
1 parent 08da35a commit 5864fbf
Show file tree
Hide file tree
Showing 17 changed files with 168 additions and 38 deletions.
1 change: 1 addition & 0 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ S3method(mean,Scalar)
S3method(min,Array)
S3method(min,ChunkedArray)
S3method(names,Dataset)
S3method(names,FeatherReader)
S3method(names,RecordBatch)
S3method(names,ScannerBuilder)
S3method(names,Schema)
Expand Down
9 changes: 6 additions & 3 deletions r/R/csv.R
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
#' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
#' @param convert_options see [file reader options][CsvReadOptions]
#' @param read_options see [file reader options][CsvReadOptions]
#' @param filesystem A [FileSystem] where `file` can be found if it is a
#' string file path; default is the local file system
#' @param as_data_frame Should the function return a `data.frame` (default) or
#' an Arrow [Table]?
#'
Expand Down Expand Up @@ -98,6 +100,7 @@ read_delim_arrow <- function(file,
parse_options = NULL,
convert_options = NULL,
read_options = NULL,
filesystem = NULL,
as_data_frame = TRUE) {

if (is.null(parse_options)) {
Expand All @@ -119,7 +122,7 @@ read_delim_arrow <- function(file,
}

if (!inherits(file, "InputStream")) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
reader <- CsvTableReader$create(
Expand Down Expand Up @@ -206,7 +209,7 @@ read_tsv_arrow <- function(file,
#' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods
#' take the following arguments:
#'
#' - `file` A character path to a local file, or an Arrow input stream
#' - `file` An Arrow [InputStream]
#' - `convert_options` (CSV only), `parse_options`, `read_options`: see
#' [CsvReadOptions]
#' - `...` additional parameters.
Expand All @@ -227,7 +230,7 @@ CsvTableReader$create <- function(file,
parse_options = CsvParseOptions$create(),
convert_options = CsvConvertOptions$create(),
...) {
file <- make_readable_file(file)
assert_is(file, "InputStream")
shared_ptr(
CsvTableReader,
csv___TableReader__Make(file, read_options, parse_options, convert_options)
Expand Down
18 changes: 12 additions & 6 deletions r/R/feather.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream]
#' @param filesystem A [FileSystem] where `sink` should be written if it is a
#' string file path; default is the local file system
#' @param version integer Feather file version. Version 2 is the current.
#' Version 1 is the more limited legacy format.
#' @param chunk_size For V2 files, the number of rows that each chunk of data
Expand Down Expand Up @@ -52,6 +54,7 @@
#' @include arrow-package.R
write_feather <- function(x,
sink,
filesystem = NULL,
version = 2,
chunk_size = 65536L,
compression = c("default", "lz4", "uncompressed", "zstd"),
Expand Down Expand Up @@ -106,7 +109,7 @@ write_feather <- function(x,
assert_is(x, "Table")

if (is.string(sink)) {
sink <- make_output_stream(sink)
sink <- make_output_stream(sink, filesystem)
on.exit(sink$close())
}
assert_is(sink, "OutputStream")
Expand Down Expand Up @@ -141,17 +144,16 @@ write_feather <- function(x,
#' # Can select columns
#' df <- read_feather(tf, col_select = starts_with("d"))
#' }
read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, filesystem = NULL, ...) {
if (!inherits(file, "RandomAccessFile")) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
reader <- FeatherReader$create(file, ...)

all_columns <- ipc___feather___Reader__column_names(reader)
col_select <- enquo(col_select)
columns <- if (!quo_is_null(col_select)) {
vars_select(all_columns, !!col_select)
vars_select(names(reader), !!col_select)
}

out <- reader$Read(columns)
Expand Down Expand Up @@ -198,10 +200,14 @@ FeatherReader <- R6Class("FeatherReader", inherit = ArrowObject,
),
active = list(
# versions are officially 2 for V1 and 3 for V2 :shrug:
version = function() ipc___feather___Reader__version(self) - 1L
version = function() ipc___feather___Reader__version(self) - 1L,
column_names = function() ipc___feather___Reader__column_names(self)
)
)

#' @export
names.FeatherReader <- function(x) x$column_names

FeatherReader$create <- function(file, mmap = TRUE, ...) {
assert_is(file, "RandomAccessFile")
shared_ptr(FeatherReader, ipc___feather___Reader__Open(file))
Expand Down
10 changes: 7 additions & 3 deletions r/R/io.R
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,16 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
file
}

make_output_stream <- function(x) {
make_output_stream <- function(x, filesystem = NULL) {
if (is_url(x)) {
fs_and_path <- FileSystem$from_uri(x)
fs_and_path$fs$OpenOutputStream(fs_and_path$path)
} else {
filesystem = fs_and_path$fs
x <- fs_and_path$path
}
if (is.null(filesystem)) {
FileOutputStream$create(x)
} else {
filesystem$OpenOutputStream(x)
}
}

Expand Down
10 changes: 6 additions & 4 deletions r/R/ipc_stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
#' serialize data to a buffer.
#' [RecordBatchWriter] for a lower-level interface.
#' @export
write_ipc_stream <- function(x, sink, ...) {
write_ipc_stream <- function(x, sink, filesystem = NULL, ...) {
x_out <- x # So we can return the data we got
if (is.data.frame(x)) {
x <- Table$create(x)
}
if (is.string(sink)) {
sink <- make_output_stream(sink)
sink <- make_output_stream(sink, filesystem)
on.exit(sink$close())
}
assert_is(sink, "OutputStream")
Expand Down Expand Up @@ -90,16 +90,18 @@ write_to_raw <- function(x, format = c("stream", "file")) {
#' open.
#' @param as_data_frame Should the function return a `data.frame` (default) or
#' an Arrow [Table]?
#' @param filesystem A [FileSystem] where `file` can be found if it is a
#' string file path; default is the local file system
#' @param ... extra parameters passed to `read_feather()`.
#'
#' @return A `data.frame` if `as_data_frame` is `TRUE` (the default), or an
#' Arrow [Table] otherwise
#' @seealso [read_feather()] for writing IPC files. [RecordBatchReader] for a
#' lower-level interface.
#' @export
read_ipc_stream <- function(file, as_data_frame = TRUE, ...) {
read_ipc_stream <- function(file, as_data_frame = TRUE, filesystem = NULL, ...) {
if (!inherits(file, "InputStream")) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}

Expand Down
13 changes: 10 additions & 3 deletions r/R/json.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@
#' ', tf, useBytes=TRUE)
#' df <- read_json_arrow(tf)
#' }
read_json_arrow <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
read_json_arrow <- function(file,
col_select = NULL,
as_data_frame = TRUE,
filesystem = NULL,
...) {
if (!inherits(file, "InputStream")) {
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
tab <- JsonTableReader$create(file, ...)$Read()

col_select <- enquo(col_select)
Expand Down Expand Up @@ -64,8 +72,7 @@ JsonTableReader$create <- function(file,
read_options = JsonReadOptions$create(),
parse_options = JsonParseOptions$create(),
...) {

file <- make_readable_file(file)
assert_is(file, "InputStream")
shared_ptr(
JsonTableReader,
json___TableReader__Make(file, read_options, parse_options)
Expand Down
13 changes: 8 additions & 5 deletions r/R/parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ read_parquet <- function(file,
col_select = NULL,
as_data_frame = TRUE,
props = ParquetReaderProperties$create(),
filesystem = NULL,
...) {
if (is.string(file)) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
reader <- ParquetFileReader$create(file, props = props, ...)
Expand All @@ -58,9 +59,10 @@ read_parquet <- function(file,
#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
#' This function enables you to write Parquet files from R.
#'
#' @param x An [arrow::Table][Table], or an object convertible to it.
#' @param sink an [arrow::io::OutputStream][OutputStream] or a string
#' interpreted as a file path or URI
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream]
#' @param filesystem A [FileSystem] where `sink` should be written if it is a
#' string file path; default is the local file system
#' @param chunk_size chunk size in number of rows. If NULL, the total number of rows is used.
#' @param version parquet version, "1.0" or "2.0". Default "1.0". Numeric values
#' are coerced to character.
Expand Down Expand Up @@ -112,6 +114,7 @@ read_parquet <- function(file,
#' @export
write_parquet <- function(x,
sink,
filesystem = NULL,
chunk_size = NULL,
# writer properties
version = NULL,
Expand All @@ -130,7 +133,7 @@ write_parquet <- function(x,
}

if (is.string(sink)) {
sink <- make_output_stream(sink)
sink <- make_output_stream(sink, filesystem)
on.exit(sink$close())
} else if (!inherits(sink, "OutputStream")) {
abort("sink must be a file path or an OutputStream")
Expand Down
2 changes: 1 addition & 1 deletion r/man/CsvTableReader.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions r/man/read_delim_arrow.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion r/man/read_feather.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion r/man/read_ipc_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion r/man/read_json_arrow.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions r/man/read_parquet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions r/man/write_feather.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion r/man/write_ipc_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5864fbf

Please sign in to comment.