ARROW-16144: [R] Write compressed data streams (particularly over S3) #13183

Closed
wants to merge 15 commits
23 changes: 18 additions & 5 deletions r/R/io.R
@@ -270,7 +270,7 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
      file <- ReadableFile$create(file)
    }

-    if (!identical(compression, "uncompressed")) {
+    if (is_compressed(compression)) {
      file <- CompressedInputStream$create(file, compression)
    }
  } else if (inherits(file, c("raw", "Buffer"))) {
@@ -292,7 +292,7 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
  file
}

-make_output_stream <- function(x, filesystem = NULL) {
+make_output_stream <- function(x, filesystem = NULL, compression = NULL) {
Member:

One thing to watch out for here: people sometimes name their parquet files something.parquet.snappy, but you wouldn't use a CompressedOutputStream for that; you'd pass the compression option to the parquet writer itself. I would guess the make_readable_file() path handles this already, so maybe it can serve as a model (or maybe it doesn't and needs to).
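
For illustration, parquet compression is a writer option rather than a stream wrapper; a minimal sketch (file names hypothetical):

library(arrow, warn.conflicts = FALSE)
# parquet: the writer compresses column chunks internally
write_parquet(data.frame(x = 1:5), "data.snappy.parquet", compression = "snappy")
# csv has no internal compression, so the sink itself has to be wrapped
write_csv_arrow(data.frame(x = 1:5), "data.csv.gz")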

Contributor Author:

So for something.parquet.snappy (or even something.snappy.parquet) I think it works, because "snappy" isn't included here:

arrow/r/R/io.R

Lines 325 to 330 in 3df2e05

switch(tools::file_ext(path),
  bz2 = "bz2",
  gz = "gzip",
  lz4 = "lz4",
  zst = "zstd",
  "uncompressed"
)

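That is, unrecognized extensions fall through to the switch default, e.g.:

tools::file_ext("something.parquet.snappy")
#> [1] "snappy"
# "snappy" matches none of the cases, so detect_compression() returns "uncompressed"
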
But if someone tries something like this, we do get an error that isn't super informative. I think this is outside the scope of this PR, so could the resolution here be to open another ticket for this specifically?

library(arrow, warn.conflicts = FALSE)
tf <- tempfile(fileext = ".parquet.gz")
write_parquet(data.frame(x = 1:5), tf, compression = "gzip", compression_level = 5)
read_parquet(tf)
#> Error: file must be a "RandomAccessFile"
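
A hypothetical guard (placement and condition assumed, not part of this PR) could make that failure clearer:

# sketch: raise a friendlier error where the RandomAccessFile check happens
if (inherits(file, "CompressedInputStream")) {
  stop(
    "Parquet files must be read from a RandomAccessFile, not a compressed stream.\n",
    "Use the `compression` argument of write_parquet() instead of a compressed ",
    "file extension.",
    call. = FALSE
  )
}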

Member:

Sure, this fails on master too, so it's fine to make a separate JIRA (please link to it here when you make it).

  if (inherits(x, "connection")) {
    if (!isOpen(x)) {
      open(x, "wb")
@@ -309,11 +309,21 @@ make_output_stream <- function(x, filesystem = NULL) {
    filesystem <- fs_and_path$fs
    x <- fs_and_path$path
  }

  if (is.null(compression)) {
    # Infer compression from sink
    compression <- detect_compression(x)
  }

  assert_that(is.string(x))
-  if (is.null(filesystem)) {
-    FileOutputStream$create(x)
+  if (is.null(filesystem) && is_compressed(compression)) {
+    CompressedOutputStream$create(x, compression) ## compressed local
+  } else if (is.null(filesystem) && !is_compressed(compression)) {
+    FileOutputStream$create(x) ## uncompressed local
+  } else if (!is.null(filesystem) && is_compressed(compression)) {
+    CompressedOutputStream$create(filesystem$OpenOutputStream(x), compression) ## compressed remote
   } else {
-    filesystem$OpenOutputStream(x)
+    filesystem$OpenOutputStream(x) ## uncompressed remote
   }
}
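
To make the four branches concrete, a usage sketch (bucket name hypothetical; assumes S3 credentials are already configured):

library(arrow, warn.conflicts = FALSE)
df <- data.frame(x = 1:5)
write_csv_arrow(df, "local.csv")                     # uncompressed local
write_csv_arrow(df, "local.csv.gz")                  # compressed local
write_csv_arrow(df, "s3://my-bucket/remote.csv")     # uncompressed remote
write_csv_arrow(df, "s3://my-bucket/remote.csv.gz")  # compressed remote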

@@ -322,6 +332,9 @@ detect_compression <- function(path) {
    return("uncompressed")
  }

  # Remove any trailing slashes, which FileSystem$from_uri may add
  path <- gsub("/$", "", path)
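  # (without this, e.g. tools::file_ext("test.csv.gz/") finds no extension,
  # and the file would be treated as uncompressed)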

  switch(tools::file_ext(path),
    bz2 = "bz2",
    gz = "gzip",
...
4 changes: 4 additions & 0 deletions r/R/util.R
@@ -211,3 +211,7 @@ handle_csv_read_error <- function(e, schema, call) {
  }
  abort(msg, call = call)
}

is_compressed <- function(compression) {
  !identical(compression, "uncompressed")
}
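
A quick check of the helper against detect_compression() (sketch):

is_compressed(detect_compression("data.csv.gz"))  # TRUE ("gzip")
is_compressed(detect_compression("data.csv"))     # FALSE ("uncompressed")
is_compressed(NULL)                               # TRUE, so callers resolve NULL via detect_compression() first
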
17 changes: 17 additions & 0 deletions r/tests/testthat/test-csv.R
@@ -564,6 +564,23 @@ test_that("write_csv_arrow can write from RecordBatchReader objects", {
  expect_equal(nrow(tbl_in), 3)
})

test_that("read/write compressed file successfully", {
skip_if_not_available("gzip")
tfgz <- tempfile(fileext = ".csv.gz")
tf <- tempfile(fileext = ".csv")
on.exit(unlink(tf))
on.exit(unlink(tfgz))

write_csv_arrow(tbl, tf)
write_csv_arrow(tbl, tfgz)
expect_lt(file.size(tfgz), file.size(tf))

expect_identical(
read_csv_arrow(tfgz),
tbl
)
})

test_that("read_csv_arrow() can read sub-second timestamps with col_types T setting (ARROW-15599)", {
  tbl <- tibble::tibble(time = c("2018-10-07 19:04:05.000", "2018-10-07 19:04:05.001"))
  tf <- tempfile()
...
20 changes: 20 additions & 0 deletions r/tests/testthat/test-s3-minio.R
@@ -54,6 +54,26 @@ if (arrow_with_s3() && process_is_running("minio server")) {
    )
  })

test_that("read/write compressed csv by filesystem", {
skip_if_not_available("gzip")
dat <- tibble(x = seq(1, 10, by = 0.2))
write_csv_arrow(dat, fs$path(minio_path("test.csv.gz")))
expect_identical(
read_csv_arrow(fs$path(minio_path("test.csv.gz"))),
dat
)
})

test_that("read/write csv by filesystem", {
skip_if_not_available("gzip")
dat <- tibble(x = seq(1, 10, by = 0.2))
write_csv_arrow(dat, fs$path(minio_path("test.csv")))
expect_identical(
read_csv_arrow(fs$path(minio_path("test.csv"))),
dat
)
})

test_that("read/write stream", {
write_ipc_stream(example_data, fs$path(minio_path("test3.ipc")))
expect_identical(
Expand Down