
[R] Complete ptype inferences and array conversions #65

Merged
paleolimbot merged 71 commits into apache:main from r-pkg-conversions on Nov 18, 2022

Conversation

paleolimbot
Member

@paleolimbot paleolimbot commented Nov 1, 2022

This PR implements a conversion system from struct ArrowArray and struct ArrowArrayStream to R vectors (in the "it works as a column in a data.frame" sense). An example to get started:

# remotes::install_github("apache/arrow-nanoarrow/r#65")
library(nanoarrow)

array <- as_nanoarrow_array(1:5)
infer_nanoarrow_ptype(array)
#> integer(0)
str(convert_array(array))
#>  int [1:5] 1 2 3 4 5
str(convert_array(array, to = double()))
#>  num [1:5] 1 2 3 4 5
str(convert_array(array, to = as.Date(character())))
#> Error in convert_array.default(array, to = as.Date(character())): Can't convert array <int32> to R vector of type Date

A more complex example using nycflights13 and streams:

# remotes::install_github("apache/arrow-nanoarrow/r#65")
library(nanoarrow)
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(nycflights13)

# single array
flights_array <- as_nanoarrow_array(flights)

# stream
flights_batch <- as_record_batch(flights)
chunk_start <- seq(0, nrow(flights_batch), by = 1024)
chunk_end <- pmin(chunk_start + 1024, nrow(flights))
chunk_length <- chunk_end - chunk_start
flights_batches <- Map(
  flights_batch$Slice,
  chunk_start,
  chunk_length
)

# converting a single array with ptype inference
tibble::as_tibble(infer_nanoarrow_ptype(flights_array))
#> # A tibble: 0 × 19
#> # … with 19 variables: year <int>, month <int>, day <int>, dep_time <int>,
#> #   sched_dep_time <int>, dep_delay <dbl>, arr_time <int>,
#> #   sched_arr_time <int>, arr_delay <dbl>, carrier <chr>, flight <int>,
#> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> #   hour <dbl>, minute <dbl>, time_hour <dttm>
tibble::as_tibble(convert_array(flights_array))
#> # A tibble: 336,776 × 19
#>     year month   day dep_time sched_de…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier
#>    <int> <int> <int>    <int>      <int>   <dbl>   <int>   <int>   <dbl> <chr>  
#>  1  2013     1     1      517        515       2     830     819      11 UA     
#>  2  2013     1     1      533        529       4     850     830      20 UA     
#>  3  2013     1     1      542        540       2     923     850      33 AA     
#>  4  2013     1     1      544        545      -1    1004    1022     -18 B6     
#>  5  2013     1     1      554        600      -6     812     837     -25 DL     
#>  6  2013     1     1      554        558      -4     740     728      12 UA     
#>  7  2013     1     1      555        600      -5     913     854      19 B6     
#>  8  2013     1     1      557        600      -3     709     723     -14 EV     
#>  9  2013     1     1      557        600      -3     838     846      -8 B6     
#> 10  2013     1     1      558        600      -2     753     745       8 AA     
#> # … with 336,766 more rows, 9 more variables: flight <int>, tailnum <chr>,
#> #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> #   minute <dbl>, time_hour <dttm>, and abbreviated variable names
#> #   ¹​sched_dep_time, ²​dep_delay, ³​arr_time, ⁴​sched_arr_time, ⁵​arr_delay

# stream of unknown size (slow: it collects, converts, and rbinds)
stream <- as_nanoarrow_array_stream(
  RecordBatchReader$create(batches = flights_batches)
)

tibble::as_tibble(convert_array_stream(stream))
#> # A tibble: 336,776 × 19
#>     year month   day dep_time sched_de…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier
#>    <int> <int> <int>    <int>      <int>   <dbl>   <int>   <int>   <dbl> <chr>  
#>  1  2013     1     1      517        515       2     830     819      11 UA     
#>  2  2013     1     1      533        529       4     850     830      20 UA     
#>  3  2013     1     1      542        540       2     923     850      33 AA     
#>  4  2013     1     1      544        545      -1    1004    1022     -18 B6     
#>  5  2013     1     1      554        600      -6     812     837     -25 DL     
#>  6  2013     1     1      554        558      -4     740     728      12 UA     
#>  7  2013     1     1      555        600      -5     913     854      19 B6     
#>  8  2013     1     1      557        600      -3     709     723     -14 EV     
#>  9  2013     1     1      557        600      -3     838     846      -8 B6     
#> 10  2013     1     1      558        600      -2     753     745       8 AA     
#> # … with 336,766 more rows, 9 more variables: flight <int>, tailnum <chr>,
#> #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> #   minute <dbl>, time_hour <dttm>, and abbreviated variable names
#> #   ¹​sched_dep_time, ²​dep_delay, ³​arr_time, ⁴​sched_arr_time, ⁵​arr_delay


# stream of known size (faster: it preallocates + fills but doesn't do ALTREP yet)
stream <- as_nanoarrow_array_stream(
  RecordBatchReader$create(batches = flights_batches)
)

tibble::as_tibble(convert_array_stream(stream, n = nrow(flights)))
#> # A tibble: 336,776 × 19
#>     year month   day dep_time sched_de…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier
#>    <int> <int> <int>    <int>      <int>   <dbl>   <int>   <int>   <dbl> <chr>  
#>  1  2013     1     1      517        515       2     830     819      11 UA     
#>  2  2013     1     1      533        529       4     850     830      20 UA     
#>  3  2013     1     1      542        540       2     923     850      33 AA     
#>  4  2013     1     1      544        545      -1    1004    1022     -18 B6     
#>  5  2013     1     1      554        600      -6     812     837     -25 DL     
#>  6  2013     1     1      554        558      -4     740     728      12 UA     
#>  7  2013     1     1      555        600      -5     913     854      19 B6     
#>  8  2013     1     1      557        600      -3     709     723     -14 EV     
#>  9  2013     1     1      557        600      -3     838     846      -8 B6     
#> 10  2013     1     1      558        600      -2     753     745       8 AA     
#> # … with 336,766 more rows, 9 more variables: flight <int>, tailnum <chr>,
#> #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> #   minute <dbl>, time_hour <dttm>, and abbreviated variable names
#> #   ¹​sched_dep_time, ²​dep_delay, ³​arr_time, ⁴​sched_arr_time, ⁵​arr_delay

Created on 2022-11-09 with reprex v2.0.2

These conversions are complicated because there are at least three types of sources to consider:

  • A single struct ArrowArray
  • A struct ArrowArrayStream whose number of rows is unknown (e.g., a streaming database result)
  • A struct ArrowArrayStream whose number of rows/chunks is known (e.g., most GDAL datasets or an Arrow Table/ChunkedArray)

There are also a few options for the target of a conversion:

  • A single R object
  • A portion of an R object (e.g., one that might have been preallocated in advance)
  • An ALTREP R object (i.e., where conversion is done lazily if/when elements are accessed)
  • A child of an R object (e.g., column in data frame, element in a list for nested types)

An additional source of complexity is that there is not one true answer for the "best" target for converting an Arrow type. For example, most int64 values can be exactly represented by an integer() or double() array but you also might want a bit64::integer64() if you need to roundtrip the value losslessly.

The conversion system proposed here implements conversions using one "converter" API with three entry points:

  • An optimized path for single array/record batch
  • A "convert one batch at a time and rbind them at the end" approach for streaming results with unknown size
  • A "preallocate + fill" approach for streaming results with known size

I tried to write the entry points in such a way that the internals could be refactored. In particular, the converter API can't generate ALTREP objects (mostly the reason an optimized path for a single array was needed). There is also a "materialize" API, used by the converter and by ALTREP classes; that API does the real work (e.g., converting int64 values to double values with NA sentinels in a loop). The converter bits are about nested types, structure, and making sure nothing is garbage collected until the values have been converted.

Finally, the API deals with the "no one true best target" problem by modeling conversions as "many to many" rather than "one Arrow type == one R vector type" (which is vaguely what the Arrow R package currently does). There's a good chance a user or developer already knows what R vector type is needed, and the proposed system lets the user provide that information (e.g., convert_array(some_array, to = character())). The default can be inferred using infer_nanoarrow_ptype(some_array). This is vaguely like (and was inspired by) vctrs::vec_ptype2() and vctrs::vec_cast().

This system is slightly different from the current system in the Arrow R package, which has a lot more tools at its disposal (e.g., the Table/ChunkedArray abstraction, compute functions for fast calculation of ranges, etc.). The Table/ChunkedArray abstraction makes the streaming problem more or less go away: everything can be streamed into a Table in Arrow form. Because columns are all converted as ALTREP by default, in theory the memory usage is the same (i.e., there is no need for a "preallocate + fill" approach to avoid two complete copies of the data existing at once).

The "many to many" model is another difference. I really like it because it provides the opportunity to decentralize conversions: the S3 object owner gets to decide how or if an Arrow array gets converted into R land.

Performance-wise, the main optimization the Arrow R package makes is ALTREP: by default, conversions are deferred until values are accessed. This is really good for things like reading a Parquet file, where the average user will just read_parquet() and is quite happy to have it return quickly with a preview of the first 10 rows (after which a bunch of columns might get discarded and never converted). The payoff is particularly apparent with strings: materializing strings into R land is expensive, and it's not any faster to do it up front. (It is faster to convert up front for other vector types if those values will be accessed, particularly in Arrow C++, which does non-string non-ALTREP conversions in parallel.) For the nanoarrow R package's conversions to be as fast as the Arrow R package's, there would need to be a chunked-array ALTREP equivalent and more optimized conversions between numeric types (probably using C++ templating).

@paleolimbot
Member Author

paleolimbot commented Nov 3, 2022

Still needs testing with more datasets, but these conversions are about as fast as the current implementations (although this has a lot to do with string materialization):

# remotes::install_github("apache/arrow-nanoarrow/r#65")
library(nanoarrow)
# latest master (i.e., with latest ALTREP improvement PR)
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(nycflights13)

flights <- nycflights13::flights

flights_table <- as_arrow_table(flights)
flights_array <- as_nanoarrow_array(flights_table)
n <- nrow(flights)

# with altrep, arrow is faster
bench::mark(
  arrow_altrep = as.data.frame(as.data.frame(flights_table)),
  nanoarrow = as.data.frame(as.data.frame(flights_array))
)
#> # A tibble: 2 × 6
#>   expression        min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>   <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 arrow_altrep  399.5µs 539.87µs     1815.    3.57MB     83.0
#> 2 nanoarrow      2.68ms   3.01ms      326.   28.38MB    341.

# with materialization nanoarrow is much faster because it's faster to
# subset non-ALTREP non-string vectors (probably?)
bench::mark(
  arrow_altrep = as.data.frame(as.data.frame(flights_table))[n:1, ],
  nanoarrow = as.data.frame(as.data.frame(flights_array))[n:1, ],
  min_iterations = 5
)
#> # A tibble: 2 × 6
#>   expression        min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>   <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 arrow_altrep   79.9ms   80.7ms      12.4    50.3MB     31.0
#> 2 nanoarrow      44.5ms   44.7ms      22.4    75.9MB    101.

# without altrep, they're about the same
withr::with_options(list(arrow.use_altrep = FALSE), {
  bench::mark(
    arrow_no_altrep = as.data.frame(as.data.frame(flights_table)),
    # Can't turn off altrep in nanoarrow yet
    nanoarrow = {
      df <- as.data.frame(as.data.frame(flights_array))
      nanoarrow:::nanoarrow_altrep_force_materialize(df, recursive = TRUE)
      df
    },
    min_iterations = 5
  )
})
#> # A tibble: 2 × 6
#>   expression           min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>      <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 arrow_no_altrep   21.2ms   21.6ms      46.1    38.5MB     26.3
#> 2 nanoarrow         23.2ms     24ms      41.9    38.5MB     37.2

# ...but that's mostly string materialization...Arrow does other materialization
# in parallel since it's only the allocation bit that has to happen on the main
# thread
flights <- nycflights13::flights |> 
  dplyr::select_if(is.numeric)
flights_table <- as_arrow_table(flights)
flights_array <- as_nanoarrow_array(flights_table)
withr::with_options(list(arrow.use_altrep = FALSE), {
  bench::mark(
    arrow_no_altrep = as.data.frame(as.data.frame(flights_table)),
    nanoarrow = as.data.frame(as.data.frame(flights_array)),
    min_iterations = 5
  )
})
#> # A tibble: 2 × 6
#>   expression           min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>      <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 arrow_no_altrep   1.64ms   1.86ms      532.    25.7MB     245.
#> 2 nanoarrow         2.04ms   2.53ms      392.    25.7MB     160.

Created on 2022-11-09 with reprex v2.0.2

// Avoid materializing the row.names if we can
return Rf_xlength(VECTOR_ELT(vec_sexp, 0));
} else {
return Rf_xlength(Rf_getAttrib(vec_sexp, R_RowNamesSymbol));
Contributor

You might be able to get around what Rf_getAttrib() does with the "row.names" attribute by using .row_names_info() or perhaps a function that goes through ATTRIB(). Something like:

cpp11::cpp_function('
SEXP row_names(SEXP x){ 
   for(SEXP atts = ATTRIB(x); atts != R_NilValue; atts = CDR(atts)){
      if (TAG(atts) == Rf_install("row.names")) return CAR(atts);
   }  
   return R_NilValue; 
}')
df <- data.frame(x = 1:10)
row_names(df)
#> [1]  NA -10

When this is of the form c(NA, -n) then n is the size.

Member Author

Thanks! I played with this a little bit, and if I go this route I have to implement inspecting for c(NA, nrow) and c(NA, -nrow) myself, and I'm a little worried I'll mess that up. It looks like row names are sufficiently ALTREP, even when expanded, in recent R (although I don't know how far back this goes) that computing the length shouldn't be an expensive operation?

df <- nanoarrow:::new_data_frame(x = 1:1e9, 1e9)
.Internal(inspect(df))
#> @141c96758 13 INTSXP g0c0 [OBJ,REF(2),ATT]  wrapper [srt=-2147483648,no_na=0]
#>   @141c93060 13 INTSXP g0c0 [REF(65535)]  1 : 1000000000 (compact)
#> ATTRIB:
#>   @141c96720 02 LISTSXP g0c0 [REF(1)] 
#>     TAG: @15780c9f0 01 SYMSXP g1c0 [MARK,REF(65535),LCK,gp=0x4000] "row.names" (has value)
#>     @141d29098 13 INTSXP g0c1 [REF(1)] (len=2, tl=0) -2147483648,1000000000
#>     TAG: @15780d1d0 01 SYMSXP g1c0 [MARK,REF(38209),LCK,gp=0x6000] "class" (has value)
#>     @141d29290 16 STRSXP g0c1 [REF(65535)] (len=1, tl=0)
#>       @157891e08 09 CHARSXP g1c2 [MARK,REF(576),gp=0x61,ATT] [ASCII] [cached] "data.frame"
str(attr(df, "row.names"))
#>  int [1:1000000000] 1 2 3 4 5 6 7 8 9 10 ...
bench::mark(attr(df, "row.names"))
#> # A tibble: 1 × 6
#>   expression                 min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>            <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 attr(df, "row.names")     41ns     82ns  8860040.        0B        0

Created on 2022-11-18 with reprex v2.0.2

SEXP nanoarrow_converter_result(SEXP converter_xptr) {
struct RConverter* converter = (struct RConverter*)R_ExternalPtrAddr(converter_xptr);
SEXP converter_shelter = R_ExternalPtrProtected(converter_xptr);
SEXP result = PROTECT(VECTOR_ELT(converter_shelter, 4));
Contributor

I think the PROTECT() is redundant here, as converter_shelter would already be protected by converter_xptr

// dispatch to find a convert_array() method (or error if there
// isn't one)
static SEXP call_convert_array(SEXP array_xptr, SEXP ptype_sexp) {
SEXP ns = PROTECT(R_FindNamespace(Rf_mkString("nanoarrow")));
Contributor

Rf_mkString("nanoarrow") should be protected, because R_FindNamespace() might allocate and then trigger a gc.

Or perhaps, since it's used in several places, it could be cached.

Contributor

Or maybe the result of R_FindNamespace() could be cached, e.g. this caches a few namespace environments in dplyr: https://github.com/tidyverse/dplyr/blob/935cd11694786ac82748d82f9e19600566b8fbcb/src/init.cpp#L76

// message than we can provide in a reasonable amount of C code here
static void call_stop_cant_convert_array(SEXP array_xptr, enum VectorType type,
SEXP ptype_sexp) {
int n_protected = 2;
Contributor

I don't think this should handle UNPROTECT()ing objects that were protected somewhere else. That should rather be the job of the function calling call_stop_cant_convert_array(), so here it should start at 0.

I assume stop_cant_convert_array() will eventually call stop() on the R side? So the UNPROTECT() is never reached anyway, and the protection stack is restored automatically by .Call().

if (Rf_inherits(result, "data.frame")) {
SEXP rownames = PROTECT(Rf_allocVector(INTSXP, 2));
INTEGER(rownames)[0] = NA_INTEGER;
INTEGER(rownames)[1] = array->length;
Contributor

I think both are supported, but -array->length is preferred

int nanoarrow_ptype_is_data_frame(SEXP ptype) {
return Rf_isObject(ptype) && TYPEOF(ptype) == VECSXP &&
(Rf_inherits(ptype, "data.frame") ||
(Rf_xlength(ptype) > 0 && Rf_getAttrib(ptype, R_NamesSymbol) != R_NilValue));
Contributor

This getAttrib() might materialize the row names vector, which can be avoided.

r/src/materialize.c (outdated review thread, resolved)
// then to be garbage collected and invalidate the converter
SEXP array_xptr =
PROTECT(R_MakeExternalPtr(converter->array_view.array, R_NilValue, R_NilValue));
Rf_setAttrib(array_xptr, R_ClassSymbol, Rf_mkString("nanoarrow_array"));
Contributor

I think Rf_mkString("nanoarrow_array") needs protection before it goes into Rf_setAttrib()

Rf_setAttrib(array_xptr, R_ClassSymbol, Rf_mkString("nanoarrow_array"));
SEXP schema_xptr =
PROTECT(R_MakeExternalPtr(converter->schema_view.schema, R_NilValue, R_NilValue));
Rf_setAttrib(schema_xptr, R_ClassSymbol, Rf_mkString("nanoarrow_schema"));
Contributor

same for Rf_mkString("nanoarrow_schema")

#include "materialize_dbl.h"
#include "nanoarrow.h"

static inline int nanoarrow_materialize_difftime(struct RConverter* converter) {
Contributor

It's a bit odd that these live in a header file.

Member Author

Agreed - I moved them out of materialize.c because that file was getting very crowded, but this solution is far from elegant. Most of these conversions would benefit from optimization for specific pairs of types, which may be a good opportunity to split out the .c files.

@paleolimbot paleolimbot merged commit e6e71d6 into apache:main Nov 18, 2022
@paleolimbot paleolimbot deleted the r-pkg-conversions branch November 18, 2022 20:34