
[R] Write compressed data streams (particularly over S3) #31551

Closed

asfimport opened this issue Apr 7, 2022 · 4 comments

@asfimport

The Python bindings have CompressedOutputStream, but I don't see how we can do this on the R side (e.g. with write_csv_arrow()). It would be wonderful if we could both read and write compressed streams, particularly for CSV and particularly for remote filesystems, where this can provide considerable performance improvements.

(For comparison, readr will write a compressed stream automatically based on the extension of the given filename, e.g. readr::write_csv(data, "file.csv.gz") or readr::write_csv(data, "file.csv.xz").)

Reporter: Carl Boettiger / @cboettig
Assignee: Sam Albers / @boshek

PRs and other links:

Note: This issue was originally created as ARROW-16144. Please see the migration documentation for further details.

@asfimport

Dewey Dunnington / @paleolimbot:
I'm fairly sure this is implemented...is this the kind of behaviour you were looking for?

library(arrow, warn.conflicts = FALSE)

tf <- tempfile(fileext = ".gz")
write_csv_arrow(mtcars, tf)
readr::read_csv(gzfile(tf))
#> Rows: 32 Columns: 11
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> dbl (11): mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb
#> 
#> ℹ Use `spec()` to retrieve the full column specification for this data.
#> ℹ Specify the column types or set `show_col_types = FALSE` to quiet this message.
#> # A tibble: 32 × 11
#>      mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
#>    <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#>  1  21       6  160    110  3.9   2.62  16.5     0     1     4     4
#>  2  21       6  160    110  3.9   2.88  17.0     0     1     4     4
#>  3  22.8     4  108     93  3.85  2.32  18.6     1     1     4     1
#>  4  21.4     6  258    110  3.08  3.22  19.4     1     0     3     1
#>  5  18.7     8  360    175  3.15  3.44  17.0     0     0     3     2
#>  6  18.1     6  225    105  2.76  3.46  20.2     1     0     3     1
#>  7  14.3     8  360    245  3.21  3.57  15.8     0     0     3     4
#>  8  24.4     4  147.    62  3.69  3.19  20       1     0     4     2
#>  9  22.8     4  141.    95  3.92  3.15  22.9     1     0     4     2
#> 10  19.2     6  168.   123  3.92  3.44  18.3     1     0     4     4
#> # … with 22 more rows

@asfimport

Carl Boettiger / @cboettig:
Hi, sorry, but I believe this is not compressing the file at all. (Note that there's no need to use gzfile() in your call to readr, nor does it cause a failure if the file is not compressed; connection types can be confusing in R.)

Witness this example: with readr, changing the extension to include .gz results in a smaller file with a different hash, but with arrow I get an identical hash and file size, so I think no compression is happening.

library(arrow, warn.conflicts = FALSE)

tf <- tempfile(fileext = ".gz")
tf_plain <- tempfile(fileext = ".csv")

## In readr, compressed & non-compressed have different hashes as expected
readr::write_csv(mtcars, tf)
gz <- openssl::sha256(file(tf, "rb"))
readr::write_csv(mtcars, tf_plain)
plain <- openssl::sha256(file(tf_plain, "rb"))
testthat::expect_lt(fs::file_size(tf), fs::file_size(tf_plain))
testthat::expect_false(identical(gz, plain))

write_csv_arrow(mtcars, tf)
gz <- openssl::sha256(file(tf, "rb"))
write_csv_arrow(mtcars, tf_plain)
plain <- openssl::sha256(file(tf_plain, "rb"))

# these fail
testthat::expect_lt(fs::file_size(tf), fs::file_size(tf_plain))
#> Error: fs::file_size(tf) is not strictly less than fs::file_size(tf_plain). Difference: 0
testthat::expect_false(identical(gz, plain))
#> Error: identical(gz, plain) is not FALSE
#> 
#> `actual`:   TRUE 
#> `expected`: FALSE

I should also have mentioned that I'm hoping to use this with arrow against an S3 object store explicitly, e.g.

s3 <- s3_bucket("test", endpoint_override = "minio.thelio.carlboettiger.info")
path <- s3$path("cars.csv.gz")
write_csv_arrow(mtcars, path)  # write the (hopefully compressed) CSV to the bucket
openssl::sha256(url("https://minio.thelio.carlboettiger.info/test/cars.csv.gz"))

But that doesn't work either. I would also expect to have finer control over the output stream so that generic compressed objects can be written with the filesystem interface, e.g. using something like s3$OpenCompressedOutputStream, just as we can use s3$OpenOutputStream.
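
Something like the sketch below is what I have in mind; OpenCompressedOutputStream is hypothetical (it doesn't exist today), and the composed version underneath is an untested sketch:

# Hypothetical convenience method (does not exist in the R package today):
# out <- s3$OpenCompressedOutputStream("cars.csv.gz")

# Untested sketch composing the pieces that do exist:
out <- CompressedOutputStream$create(s3$OpenOutputStream("cars.csv.gz"))
write_csv_arrow(mtcars, out)
out$close()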

Thanks so much for your help. Arrow is just such an amazing library, and really a game-changer for data science workflows in R in particular. Very much appreciate all you do, and I eagerly watch for each release!

@asfimport

Dewey Dunnington / @paleolimbot:
Thank you for catching my error here! I knew that we did some compression detection, but it turns out that's only on read: https://github.com/apache/arrow/blob/master/r/R/io.R#L240-L298
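
For example, reading a compressed CSV should already work without explicitly decompressing, because the detection linked above happens when opening the file for reading (a quick, untested sketch):

library(arrow, warn.conflicts = FALSE)

tf <- tempfile(fileext = ".csv.gz")
readr::write_csv(mtcars, tf)  # readr gzip-compresses based on the extension
read_csv_arrow(tf)            # arrow detects the compression on read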

You can use OpenOutputStream and CompressedOutputStream for any filesystem (including S3), although we would need to implement the compression detection based on filename for this to "just work" with the .gz suffix:

library(arrow, warn.conflicts = FALSE)

dir <- tempfile()
dir.create(dir)
subdir <- file.path(dir, "bucket")
dir.create(subdir)


minio_server <- processx::process$new("minio", args = c("server", dir), supervise = TRUE)
Sys.sleep(1)
stopifnot(minio_server$is_alive())

s3_uri <- "s3://minioadmin:minioadmin@?scheme=http&endpoint_override=localhost%3A9000"
bucket <- s3_bucket(s3_uri)

data <- data.frame(x = 1:1e4)

out_compressed <- CompressedOutputStream$create(bucket$OpenOutputStream("bucket/data.csv.gz"))
write_csv_arrow(data, out_compressed)
out_compressed$close()


out <- bucket$OpenOutputStream("bucket/data.csv")
write_csv_arrow(data, out)
out$close()

file.size(file.path(subdir, "data.csv.gz"))
#> [1] 22627
file.size(file.path(subdir, "data.csv"))
#> [1] 48898

minio_server$interrupt()
#> [1] TRUE
Sys.sleep(1)
stopifnot(!minio_server$is_alive())

@asfimport

Neal Richardson / @nealrichardson:
Issue resolved by pull request #13183
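
With that change, extension-based compression on write should work roughly like this (a sketch, assuming the pull request adds filename-based compression detection to write_csv_arrow()):

library(arrow, warn.conflicts = FALSE)

tf <- tempfile(fileext = ".csv.gz")
write_csv_arrow(mtcars, tf)  # written as gzip-compressed CSV based on the extension
read_csv_arrow(tf)           # decompressed transparently on read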
