
refactor(r): Use basic_array_stream() to improve array stream to data.frame conversion #279

Merged · 8 commits merged into apache:main on Aug 22, 2023

Conversation

paleolimbot (Member) commented on Aug 17, 2023

When collecting an array stream of unknown size into a data.frame, nanoarrow has pretty terrible performance. This is because it converts every batch individually and then calls c() or rbind() on the results. This is particularly bad when collecting many tiny batches (e.g., those returned by many ADBC drivers).

convert_array_stream() has long had a "preallocate + fill" mode that is used when size is explicitly set, and the recent addition of basic_array_stream() makes it possible to recreate an array stream from a previously-collected result. Together, these mean we can collect the whole stream, compute the total size, and then call convert_array_stream() with a known size. This is only about twice as fast as the old approach, but it significantly reduces memory consumption and leaves fewer code paths that need testing.
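The two-pass approach above can be sketched with the public API (the data here is illustrative toy input, not part of the PR):

```r
library(nanoarrow)

# Pretend these batches arrived one at a time from a stream of unknown size
batches <- list(
  as_nanoarrow_array(data.frame(x = 1:3)),
  as_nanoarrow_array(data.frame(x = 4:5))
)

# basic_array_stream() recreates a stream from the already-collected batches
stream <- basic_array_stream(batches)

# With the total size known, convert_array_stream() can preallocate the
# output once and fill it, instead of rbind()-ing per-batch data.frames
total_size <- sum(vapply(batches, function(a) a$length, double(1)))
df <- convert_array_stream(stream, size = total_size)
```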

It would be nice to support ALTREP in the future (at the very least for strings); however, I don't envision having the bandwidth to implement that before the next release.

@krlmlr Would you mind having a look at this to see if it seems reasonable?

Before this PR:

library(nanoarrow)

data_frames <- replicate(
  1000,
  nanoarrow:::vec_gen(
    data.frame(x = logical(), y = double(), z = character()),
    n = 1000
  ),
  simplify = FALSE
)

bench::mark(
  convert_known_size = {
    stream <- basic_array_stream(data_frames, validate = FALSE)
    convert_array_stream(stream, size = 1000 * 1000)
  },
  convert_unknown_size = {
    stream <- basic_array_stream(data_frames, validate = FALSE)
    as.data.frame(stream)
  },
  convert_arrow_altrep = {
    options(arrow.use_altrep = TRUE)
    stream <- basic_array_stream(data_frames, validate = FALSE)
    reader <- arrow::as_record_batch_reader(stream)
    as.data.frame(as.data.frame(reader))
  },
  convert_arrow = {
    options(arrow.use_altrep = FALSE)
    stream <- basic_array_stream(data_frames, validate = FALSE)
    reader <- arrow::as_record_batch_reader(stream)
    as.data.frame(as.data.frame(reader))
  },
  min_iterations = 20
)
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 4 × 6
#>   expression                min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>           <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 convert_known_size    196.9ms    234ms      3.85    23.1MB     4.23
#> 2 convert_unknown_size  375.8ms    479ms      2.12   429.3MB    13.2 
#> 3 convert_arrow_altrep   67.4ms    164ms      4.78    20.4MB     6.70
#> 4 convert_arrow         107.8ms    240ms      2.96    22.9MB     3.56

After this PR:

#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 4 × 6
#>   expression                min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>           <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 convert_known_size    203.4ms    225ms     3.99     23.2MB     3.99
#> 2 convert_unknown_size  266.5ms    396ms     0.895    23.1MB     2.60
#> 3 convert_arrow_altrep   68.5ms    214ms     3.76     20.4MB     3.76
#> 4 convert_arrow         130.6ms    227ms     2.93     22.9MB     3.23

Created on 2023-08-17 with reprex v2.0.2

codecov-commenter commented on Aug 17, 2023

Codecov Report

Merging #279 (233dcf3) into main (ad83497) will decrease coverage by 0.03%.
Report is 1 commit behind head on main.
The diff coverage is 95.23%.

@@            Coverage Diff             @@
##             main     #279      +/-   ##
==========================================
- Coverage   87.10%   87.07%   -0.03%     
==========================================
  Files          66       63       -3     
  Lines       10226     9879     -347     
==========================================
- Hits         8907     8602     -305     
+ Misses       1319     1277      -42     
Files Changed Coverage Δ
r/R/convert-array-stream.R 97.56% <95.23%> (+0.78%) ⬆️

... and 3 files with indirect coverage changes


@paleolimbot paleolimbot marked this pull request as ready for review August 17, 2023 20:16
krlmlr (Contributor) left a comment:

Thanks, looks good!

  n_batches <- n_batches + 1L
- batches[[n_batches]] <- .Call(nanoarrow_c_convert_array, array, to)
+ batches[[n_batches]] <- array
krlmlr (Contributor):

Without array doubling, the run time still might be superlinear for very many batches. I have no idea if this is of practical relevance, though.

fill_list <- function(n) {
  # Preallocate only 1024 slots; every assignment past the end forces R to
  # grow the list, which can make the total run time superlinear in n
  out <- vector("list", 1024)
  for (i in seq_len(n)) {
    out[[i]] <- i
  }
  # Return NULL so bench::mark() doesn't keep the large result alive
  NULL
}

bench::mark(fill_list(1e6), fill_list(2e6), fill_list(4e6), fill_list(8e6))
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 4 × 6
#>   expression            min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>       <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 fill_list(1e+06) 446.17ms 453.62ms     2.20   165.33MB     2.20
#> 2 fill_list(2e+06)    1.38s    1.38s     0.723  334.95MB     2.17
#> 3 fill_list(4e+06)    4.72s    4.72s     0.212  663.35MB     1.69
#> 4 fill_list(8e+06)    8.42s    8.42s     0.119    1.28GB     1.42

Created on 2023-08-20 with reprex v2.0.2
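The usual mitigation for this, sketched below (illustrative code, not part of this PR), is to double the list's capacity whenever it fills, which makes each append amortized O(1):

```r
fill_list_doubling <- function(n) {
  out <- vector("list", 1024)
  len <- 0L
  for (i in seq_len(n)) {
    if (len == length(out)) {
      # Double capacity in one allocation instead of growing per element
      out <- c(out, vector("list", length(out)))
    }
    len <- len + 1L
    out[[len]] <- i
  }
  # Trim the unused tail before returning
  out[seq_len(len)]
}
```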

paleolimbot (Member, Author):

It's a good point...I think if this becomes limiting the entire implementation should probably be moved to C or C++, which would also save the overhead of exposing arrays as external pointers. I added a comment to make sure anybody touching this code in the future knows about this!

  batches <- vector("list", 1024L)
  n_batches <- 0L
  get_next <- array_stream$get_next
- while (!is.null(array <- get_next(schema, validate = FALSE)) && (n_batches < n)) {
+ while (!is.null(array <- get_next(schema, validate = validate)) && (n_batches < n)) {
krlmlr (Contributor):

I prefer a break inside the loop for the n_batches < n case; it also makes it very clear when the check occurs.
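The style being suggested looks roughly like this (a runnable sketch with a toy get_next() standing in for the real stream API; names are illustrative):

```r
# Toy stream: yields each value once, then NULL when exhausted
make_toy_stream <- function(values) {
  i <- 0L
  function() {
    i <<- i + 1L
    if (i <= length(values)) values[[i]] else NULL
  }
}

get_next <- make_toy_stream(as.list(1:5))
n <- 3L  # collect at most n batches
batches <- vector("list", 1024L)
n_batches <- 0L

repeat {
  if (n_batches >= n) break           # limit check is explicit and visible
  array <- get_next()
  if (is.null(array)) break           # stream exhausted
  n_batches <- n_batches + 1L
  batches[[n_batches]] <- array
}

batches <- batches[seq_len(n_batches)]
```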

paleolimbot (Member, Author):

Done!

Comment on lines 86 to 93
lengths <- vapply(
  batches,
  # Use custom accessor because array$length in a loop is slow
  function(array) {
    .Call(nanoarrow_c_array_proxy, array, NULL, FALSE)$length
  },
  double(1)
)
krlmlr (Contributor):

Will this be faster and more elegant in C, or even with a single for loop, since you only seem to need sum(lengths)?
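The single-loop version being suggested would accumulate the total directly rather than materializing a lengths vector (illustrative sketch; toy stand-ins with a $length field replace real nanoarrow arrays here):

```r
# Toy batches: lists with a $length field, mimicking the accessor result
toy_batches <- list(
  list(length = 3),
  list(length = 2),
  list(length = 4)
)

# Accumulate the running total in one pass; no intermediate vector needed
total_length <- 0
for (array in toy_batches) {
  total_length <- total_length + array$length
}
```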

paleolimbot (Member, Author):

Done!

double(1)
)
basic_stream <- .Call(nanoarrow_c_basic_array_stream, batches, schema, FALSE)
convert_array_stream(basic_stream, to = to, size = sum(lengths))
krlmlr (Contributor):

Maybe something like this, to avoid the tail recursion and extra work?

Suggested change
- convert_array_stream(basic_stream, to = to, size = sum(lengths))
+ batches <- collect_array_stream(
+   array_stream,
+   n,
+   schema = schema,
+   validate = FALSE
+ )
+ # We are guaranteed to have only one batch here:
+ stopifnot(length(batches) == 1)
+ .Call(nanoarrow_c_convert_array, batches[[1]], to)

paleolimbot (Member, Author):

I did something slightly different because at this point length(batches) is guaranteed not to be 1. Hopefully the change makes that more obvious + faster + clearer!

paleolimbot (Member, Author):

Thank you for the review!

Documentation and Packaging failures are a sphinx update issue (#284).

@paleolimbot paleolimbot merged commit 7fecddf into apache:main Aug 22, 2023
11 of 13 checks passed
@paleolimbot paleolimbot deleted the convert-array-stream-no-rbind branch September 6, 2023 13:41
3 participants