
ARROW-3490: [R] streaming of arrow objects to streams #2749

Closed

Conversation

@romainfrancois (Contributor) commented Oct 12, 2018

This makes write_record_batch and write_table generic with dispatch on the stream type.

write_record_batch <- function(x, stream, ...) {
  UseMethod("write_record_batch", stream)
}
write_table <- function(x, stream, ...) {
  UseMethod("write_table", stream)
}

The stream argument can be several different things, depending on the use case:

  • an arrow::ipc::RecordBatchWriter created with either record_batch_stream_writer() or record_batch_file_writer(). This is the lowest level; it calls the writer's $WriteBatch() or $WriteTable() method, depending on what is being streamed.

  • an arrow::io::OutputStream: this first creates an arrow::ipc::RecordBatchStreamWriter and streams into it. In particular, this does not add the magic bytes of the arrow file format.

  • an fs_path from 📦 fs: this opens an arrow::ipc::RecordBatchFileWriter and streams to it, so that the file gets the ARROW1 magic bytes.

  • a character: we just assert it is of length one and then call the fs_path method (a sketch of this method follows the list).

  • a raw(), which is just used for its type: in that case we stream into a byte buffer and return it as a raw vector.
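For illustration, a minimal sketch of what the character method might look like, assuming the fs_path method described above (the body is an assumption, not the exact code in this PR):

write_record_batch.character <- function(x, stream, ...) {
  # a character path is only valid if it is a single string
  stopifnot(length(stream) == 1L)
  # delegate to the fs_path method, which opens a RecordBatchFileWriter
  write_record_batch(x, fs::path_abs(stream), ...)
}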

Some examples:

library(arrow)
tbl <- tibble::tibble(
  int = 1:10, dbl = as.numeric(1:10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  chr = letters[1:10]
)
batch <- record_batch(tbl)

tf <- tempfile()

# stream the batch to the file
write_record_batch(batch, tf)

# same
write_record_batch(batch, fs::path_abs(tf))

# to an OutputStream
file_stream <- file_output_stream(tf)
write_record_batch(batch, file_stream)
file_stream$Close()

# to a RecordBatchFileWriter
file_stream <- file_output_stream(tf)
file_writer <- record_batch_file_writer(file_stream, batch$schema())
write_record_batch(batch, file_writer)
file_writer$Close()
file_stream$Close()

# get the bytes directly
write_record_batch(batch, raw())
#>   [1] 04 01 00 00 10 00 00 00 00 00 0a 00 0c 00 06 00 05 00 08 00 0a 00 00
#>  [24] 00 00 01 03 00 0c 00 00 00 08 00 08 00 00 00 04 00 08 00 00 00 04 00
#>  [47] 00 00 04 00 00 00 9c 00 00 00 58 00 00 00 2c 00 00 00 04 00 00 00 84
#>  [70] ff ff ff 00 00 01 05 14 00 00 00 0c 00 00 00 04 00 00 00 00 00 00 00
#>  [93] dc ff ff ff 03 00 00 00 63 68 72 00 a8 ff ff ff 00 00 01 06 18 00 00
#> [116] 00 10 00 00 00 04 00 00 00 00 00 00 00 04 00 04 00 04 00 00 00 03 00
#> [139] 00 00 6c 67 6c 00 d0 ff ff ff 00 00 01 03 20 00 00 00 14 00 00 00 04
#> [162] 00 00 00 00 00 00 00 00 00 06 00 08 00 06 00 06 00 00 00 00 00 02 00
#> [185] 03 00 00 00 64 62 6c 00 10 00 14 00 08 00 06 00 07 00 0c 00 00 00 10
#> [208] 00 10 00 00 00 00 00 01 02 24 00 00 00 14 00 00 00 04 00 00 00 00 00
#> [231] 00 00 08 00 0c 00 08 00 07 00 08 00 00 00 00 00 00 01 20 00 00 00 03
#> [254] 00 00 00 69 6e 74 00 00 00 00 00 2c 01 00 00 14 00 00 00 00 00 00 00
#> [277] 0c 00 16 00 06 00 05 00 08 00 0c 00 0c 00 00 00 00 03 03 00 18 00 00
#> [300] 00 c8 00 00 00 00 00 00 00 00 00 0a 00 18 00 0c 00 04 00 08 00 0a 00
#> [323] 00 00 ac 00 00 00 10 00 00 00 0a 00 00 00 00 00 00 00 00 00 00 00 09
#> [346] 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
#> [369] 00 00 00 00 28 00 00 00 00 00 00 00 28 00 00 00 00 00 00 00 00 00 00
#> [392] 00 00 00 00 00 28 00 00 00 00 00 00 00 50 00 00 00 00 00 00 00 78 00
#> [415] 00 00 00 00 00 00 08 00 00 00 00 00 00 00 80 00 00 00 00 00 00 00 08
#> [438] 00 00 00 00 00 00 00 88 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
#> [461] 88 00 00 00 00 00 00 00 30 00 00 00 00 00 00 00 b8 00 00 00 00 00 00
#> [484] 00 10 00 00 00 00 00 00 00 00 00 00 00 04 00 00 00 0a 00 00 00 00 00
#> [507] 00 00 00 00 00 00 00 00 00 00 0a 00 00 00 00 00 00 00 00 00 00 00 00
#> [530] 00 00 00 0a 00 00 00 00 00 00 00 03 00 00 00 00 00 00 00 0a 00 00 00
#> [553] 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 01 00 00 00 02 00 00
#> [576] 00 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 00 08 00
#> [599] 00 00 09 00 00 00 0a 00 00 00 00 00 00 00 00 00 f0 3f 00 00 00 00 00
#> [622] 00 00 40 00 00 00 00 00 00 08 40 00 00 00 00 00 00 10 40 00 00 00 00
#> [645] 00 00 14 40 00 00 00 00 00 00 18 40 00 00 00 00 00 00 1c 40 00 00 00
#> [668] 00 00 00 20 40 00 00 00 00 00 00 22 40 00 00 00 00 00 00 24 40 6b 03
#> [691] 00 00 00 00 00 00 22 01 00 00 00 00 00 00 00 00 00 00 01 00 00 00 02
#> [714] 00 00 00 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 00
#> [737] 08 00 00 00 09 00 00 00 0a 00 00 00 00 00 00 00 61 62 63 64 65 66 67
#> [760] 68 69 6a 00 00 00 00 00 00 00 00 00 00

Created on 2018-10-12 by the reprex package (v0.2.1.9000)

@romainfrancois (Contributor, Author):

This currently fails because of a documentation/roxygen issue:

N  checking S3 generic/method consistency (1.3s)
   Found the following apparent S3 methods exported but not registered:
     stream.arrow::RecordBatch stream.arrow::Table
   See section ‘Registering S3 methods’ in the ‘Writing R Extensions’
   manual.
...
W  checking Rd \usage sections ...
   Undocumented arguments in documentation object 'record_batch_stream_writer'
     ‘schema’
   
   Functions with \usage entries need to have the appropriate \alias
   entries, and all their arguments documented.
   The \usage entries must correspond to syntactically valid R code.
   See chapter ‘Writing R documentation files’ in the ‘Writing R
   Extensions’ manual.
   S3 methods shown with full name in documentation object 'stream.arrow::RecordBatch':
     ‘stream.arrow::RecordBatch’
   
   S3 methods shown with full name in documentation object 'stream.arrow::Table':
     ‘stream.arrow::Table’
   
   The \usage entries for S3 methods should use the \method markup and not
   their full name.
   See chapter ‘Writing R documentation files’ in the ‘Writing R
   Extensions’ manual.

Not quite sure what the workaround is.
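One possible workaround, sketched here as an assumption rather than a verified fix: bypass roxygen for these methods and register them by hand in NAMESPACE, where quoted generic and class names are allowed:

# hand-written NAMESPACE entries instead of roxygen-generated ones
S3method("stream", "arrow::RecordBatch")
S3method("stream", "arrow::Table")

On the Rd side, the \usage entries would then use \method{stream}{arrow::RecordBatch} markup rather than the full method name.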

@romainfrancois (Contributor, Author):

Independently of this, @javierluraschi I'd like your input on stream, as a replacement for $to_stream()

@romainfrancois (Contributor, Author):

I'd have used serialize like in 🐍 (Python), but there's already base::serialize

@wesm (Member) commented Oct 12, 2018

Hm, I'm concerned the term "stream" might cause confusion with the other kinds of streams. If serialize is unavailable, we could use some synonym.

@romainfrancois (Contributor, Author) commented Oct 12, 2018

Maybe we can make serialize generic? @hadley

@codecov-io:

Codecov Report

Merging #2749 into master will decrease coverage by 0.01%.
The diff coverage is n/a.


@@            Coverage Diff             @@
##           master    #2749      +/-   ##
==========================================
- Coverage   87.57%   87.56%   -0.02%     
==========================================
  Files         403      403              
  Lines       61483    61483              
==========================================
- Hits        53845    53835      -10     
- Misses       7568     7574       +6     
- Partials       70       74       +4
Impacted Files Coverage Δ
go/arrow/math/int64_avx2_amd64.go 0% <0%> (-100%) ⬇️
go/arrow/memory/memory_avx2_amd64.go 0% <0%> (-100%) ⬇️
go/arrow/math/float64_avx2_amd64.go 0% <0%> (-100%) ⬇️
go/arrow/math/uint64_avx2_amd64.go 0% <0%> (-100%) ⬇️
go/arrow/memory/memory_amd64.go 28.57% <0%> (-14.29%) ⬇️
go/arrow/math/math_amd64.go 31.57% <0%> (-5.27%) ⬇️
go/arrow/math/float64_amd64.go 33.33% <0%> (ø) ⬆️
go/arrow/math/int64_amd64.go 33.33% <0%> (ø) ⬆️
go/arrow/math/uint64_amd64.go 33.33% <0%> (ø) ⬆️
go/arrow/math/uint64_sse4_amd64.go 100% <0%> (+100%) ⬆️
... and 3 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@romainfrancois (Contributor, Author):

Still interested in input about the functionality, and the choices of what underlying stream is used for each case.

And I guess we need a name for both ways; maybe repurpose the read_arrow / write_arrow pair we initially had.

@javierluraschi (Contributor):

The replacement of $to_stream() looks great; looks like I would use stream(df, raw()) to send to Spark and then df <- read_table(raw_stream) to get the data frame back from Spark. I'll give it a shot and close my PR once this is merged.

Regarding naming, I like stream(), but I'm not sure I understand which other streams Arrow provides that are not of RecordBatch type; assuming there are others, we could consider renaming this to record_batch_stream().
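As a usage sketch of the round trip described above, using the generics as they eventually landed in this PR (the thread was still calling the writer stream() at this point):

bytes <- write_arrow(df, raw())   # R side: data frame -> raw bytes to ship to Spark
tab   <- read_table(bytes)        # and back: raw bytes -> arrow::Table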

@kou changed the title from "Arrow 3490. streaming of arrow objects to streams" to "Arrow 3490: [R] streaming of arrow objects to streams" on Oct 12, 2018
@javierluraschi (Contributor) commented Oct 13, 2018

@romainfrancois one more comment... related to https://github.com/apache/arrow/pull/2714/files#r224517253, and specifically:

Reading a single record batch alone given a known schema

I just found a relevant use case for optimizing R/Spark. I still have a few days of work implementing another piece of sparklyr, but this feature is definitely useful; I'll spare you the details unless you want them. Let me know if I misread the PR and this is in fact already implemented here; otherwise, looking forward to it.

The way I would expect this to work (not sure if Python is the same) would be something like:

schema <- read_schema(raw_schema)
df1 <- read_table(raw_batch_1, schema)
df2 <- read_table(raw_batch_2, schema)
df3 <- read_table(raw_batch_3, schema)

@kou changed the title from "Arrow 3490: [R] streaming of arrow objects to streams" to "ARROW-3490: [R] streaming of arrow objects to streams" on Oct 13, 2018
@romainfrancois (Contributor, Author):

Maybe I'll just roll back to write_table and write_record_batch and have them be S3 generics, i.e. drop having a single double-dispatch generic.

@romainfrancois (Contributor, Author):

@javierluraschi you're one step ahead again. I'll do reading from various streams in the next PR.

@romainfrancois (Contributor, Author):

I rolled back ⏪ to:

  • write_record_batch to stream an arrow::RecordBatch
  • write_table to stream an arrow::Table

The previous stream is now write_arrow, which dispatches on its first argument, with methods for (a sketch follows this list):

  • arrow::RecordBatch -> write_record_batch
  • arrow::Table -> write_table
  • data.frame -> ~ write_record_batch(record_batch(.), ...)
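For illustration, a minimal sketch of how that dispatch might be wired up (the bodies simply delegate as described above; methods whose class contains :: need backtick-quoted names):

write_arrow <- function(x, stream, ...) {
  UseMethod("write_arrow")
}

`write_arrow.arrow::RecordBatch` <- function(x, stream, ...) {
  write_record_batch(x, stream, ...)
}

`write_arrow.arrow::Table` <- function(x, stream, ...) {
  write_table(x, stream, ...)
}

write_arrow.data.frame <- function(x, stream, ...) {
  # a data frame is first converted to a record batch, then streamed
  write_record_batch(record_batch(x), stream, ...)
}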

@romainfrancois (Contributor, Author):

Ready to be merged as far as I'm concerned

@wesm (Member) commented Oct 15, 2018

Will review later today or tomorrow

@romainfrancois (Contributor, Author):

I have a few more commits implementing changes to read_table and read_record_batch.

@wesm should I merge them here, or create another PR once this one is squashed?

@wesm (Member) commented Oct 17, 2018

Plan to review this tomorrow (I'm on US/Eastern time, so feel free to add more commits if you are working earlier in the day)

@romainfrancois (Contributor, Author):

Alright, I folded in the other branches I had:

  • type promotion (e.g. promoting an arrow::Array of int16 to an R integer vector). There is no support for casting arrays yet, so this lacks testing.

  • read_record_batch and read_table from various streams

@romainfrancois (Contributor, Author) commented Oct 17, 2018

read_record_batch is now generic, with the following methods (a sketch of one of them follows the list):

  • arrow::ipc::RecordBatchFileReader: takes a 0-based i and reads that record batch with $ReadRecordBatch(i)

  • arrow::ipc::RecordBatchStreamReader: uses the $ReadNext() method. This always returns an arrow::RecordBatch, but it might be null; I've added an is_null() method to Object (the base class for all arrow R6 objects).

  • arrow::io::BufferReader: opens an arrow::ipc::RecordBatchStreamReader from the buffer reader and redispatches

  • raw: dispatches to the arrow::io::BufferReader method

  • arrow::io::RandomAccessFile: opens an arrow::ipc::RecordBatchFileReader and redispatches

  • fs::fs_path: opens an arrow::io::ReadableFile with file_open(), dispatches to the arrow::io::RandomAccessFile method, and closes the arrow::io::ReadableFile on exit

  • character: dispatches to the fs::fs_path method
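For illustration, a minimal sketch of the fs::fs_path method under the description above (the exact body is an assumption):

`read_record_batch.fs_path` <- function(stream, ...) {
  file <- file_open(stream)
  # close the underlying arrow::io::ReadableFile even on error
  on.exit(file$Close())
  # redispatch on the arrow::io::RandomAccessFile method
  read_record_batch(file, ...)
}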

@romainfrancois (Contributor, Author):

Similar dispatch rules apply for read_table, except that the lowest-level methods differ (a usage sketch follows):

  • arrow::ipc::RecordBatchFileReader calls the Table__from_RecordBatchFileReader function, so that the loop to retrieve all the record batches is done on the C++ side.

  • arrow::ipc::RecordBatchStreamReader calls the Table__from_RecordBatchStreamReader function, for the same reason.
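As a usage sketch of the stream-reader path, assuming the helpers shown elsewhere in this thread (tbl is a data frame as in the earlier examples):

bytes  <- write_arrow(tbl, raw())
reader <- record_batch_stream_reader(bytes)
tab    <- read_table(reader)   # the loop over batches happens on the C++ side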

@romainfrancois (Contributor, Author):

The test covers various ways to read a record batch (the same is done for tables):

test_that("read_record_batch handles various streams (ARROW-3450, ARROW-3505)", {
  tbl <- tibble::tibble(
    int = 1:10, dbl = as.numeric(1:10),
    lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
    chr = letters[1:10]
  )
  batch <- record_batch(tbl)
  tf <- local_tempfile()
  write_record_batch(batch, tf)

  bytes <- write_record_batch(batch, raw())
  buf_reader <- buffer_reader(bytes)

  batch1 <- read_record_batch(tf)
  batch2 <- read_record_batch(fs::path_abs(tf))

  readable_file <- close_on_exit(file_open(tf))
  batch3 <- read_record_batch(readable_file)

  mmap_file <- close_on_exit(mmap_open(tf))
  batch4 <- read_record_batch(mmap_file)

  batch5 <- read_record_batch(bytes)
  batch6 <- read_record_batch(buf_reader)

  stream_reader <- record_batch_stream_reader(bytes)
  batch7 <- read_record_batch(stream_reader)

  file_reader <- record_batch_file_reader(tf)
  batch8 <- read_record_batch(file_reader)

  expect_equal(batch, batch1)
  expect_equal(batch, batch2)
  expect_equal(batch, batch3)
  expect_equal(batch, batch4)
  expect_equal(batch, batch5)
  expect_equal(batch, batch6)
  expect_equal(batch, batch7)
  expect_equal(batch, batch8)
})

@javierluraschi (Contributor):

@wesm could we merge this one, or are we missing anything? From my side, I've just moved the sparklyr PR (sparklyr/sparklyr#1611) to use this new interface.

@wesm (Member) commented Oct 18, 2018

I'm pretty waterlogged but reviewing this now

@wesm (Member) left a review:

I have some questions about names and about how shared_ptr<T>(nullptr) is handled. I opened a few follow-up JIRAs.

I'm going to merge this in the interest of helping things move along, but please discuss the naming issues and decide on something unambiguous to distinguish the "write a single record batch to an existing IPC stream" and "write a complete IPC stream" concepts.

r/R/R6.R (review thread resolved)
r/R/RecordBatchReader.R (review thread resolved)
#' @export
`read_record_batch.arrow::io::RandomAccessFile` <- function(stream, ...) {
  reader <- record_batch_file_reader(stream)
  reader$ReadRecordBatch(0)
}
@wesm (Member):
Should this assert that there is only one batch in the stream?

@romainfrancois (Contributor, Author):
Maybe, but is it not useful to be able to just read the first one?

#' @export
`read_record_batch.arrow::io::BufferReader` <- function(stream, ...) {
  reader <- record_batch_stream_reader(stream)
  reader$ReadNext()
}
@wesm (Member):
Should this read a second time and assert that the reader returns null?

@romainfrancois (Contributor, Author):
Same as above.

#' @param ... extra parameters
#'
#' @export
write_record_batch <- function(x, stream, ...) {
  UseMethod("write_record_batch", stream)
}
@wesm (Member):
I'm struggling a bit with the name of the function and what it actually does. This writes a single record batch as the IPC streaming format, including the schema. See e.g. some Python versions of similar things:

https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_ipc.py#L599

See the RecordBatch.serialize implementation and Schema.serialize. These write a single IPC message (just the record batch, no schema) in memory, though.

It is true that if you create a stream writer yourself and then call write_record_batch(x, writer) multiple times, you can write multiple messages, but the function that

  • creates a stream writer from an output sink
  • writes a single batch to it
  • closes the writer

should probably be called something different (see the sketch below). Can leave that to a follow-up patch.
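To make the two concepts concrete, a usage sketch (sink and another_batch are placeholders; helper names as used elsewhere in this PR):

# (1) write individual batches into an existing IPC stream:
#     the writer owns the schema and can accept many batches
writer <- record_batch_stream_writer(sink, batch$schema())
write_record_batch(batch, writer)
write_record_batch(another_batch, writer)
writer$Close()

# (2) the one-shot helper: creates a stream writer on the sink,
#     writes the schema and a single batch, then closes the writer
write_record_batch(batch, sink)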

@romainfrancois (Contributor, Author):
What it does depends on the stream: if it's already a RecordBatchWriter, then it just serializes the record batch.


#' Write an object to a stream
#'
#' @param x An object to stream
@wesm (Member):

Need to make a more precise statement about what these functions do, per the comment above. Something like "Writes a complete Arrow IPC stream, including the schema, to the output sink to serialize the input object".

It might be useful to describe things as "output sinks" to distinguish them from "IPC streams".

@romainfrancois (Contributor, Author):

Yes. In the meantime, it's just so that the build passes. As above, we need to discuss which function does what.

using value_type = typename TypeTraits<Type>::ArrayType::value_type;

auto n = array->length();
auto start = reinterpret_cast<const value_type*>(array->data()->buffers[1]->data()) +
@wesm (Member):

This can be null. See GetValues<T> used in the Arrow C++ codebase.

case Type::INT64:
return arrow::r::promotion_Array_to_Vector<REALSXP, arrow::Int64Type>(array);
case Type::UINT64:
return arrow::r::promotion_Array_to_Vector<REALSXP, arrow::UInt64Type>(array);
@wesm (Member):

Should these error on data loss? I opened https://issues.apache.org/jira/browse/ARROW-3553

@romainfrancois (Contributor, Author):

It should. However, I'll probably switch to using the bit64 package in a follow-up PR. It just uses a numeric vector (an array of double) as the host and interprets the bytes as int64_t.

This way, there is no loss.
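As a small illustration of why that representation is lossless (bit64 is on CRAN; the value below is 2^53 + 1, which a plain double cannot represent exactly):

library(bit64)
x <- as.integer64("9007199254740993")
class(x)    # "integer64"
typeof(x)   # "double": the storage is a double vector,
            # but the bytes are reinterpreted as int64_t
x           # 9007199254740993, exact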

@wesm closed this in d3ec690 on Oct 18, 2018
wesm pushed a commit that referenced this pull request Oct 20, 2018
…tr), use bits64::integer64

Follow up to #2749 with the following fixes:

 - shared_ptr of `nullptr` don't construct R6 objects
 - int64 uses `bit64::integer64`, based on r-lib/vctrs#121

Author: Romain Francois <romain@purrple.cat>

Closes #2795 from romainfrancois/ARROW-3553/int64 and squashes the following commits:

49c34bb <Romain Francois> using lower case cpp files
cf35994 <Romain Francois> lint
018092a <Romain Francois> bounds check on RecordBatchFileReader$ReadRecordBatch
3200352 <Romain Francois> - test
089f951 <Romain Francois> additional check
72ccdcb <Romain Francois> replace direct class$new(.) by construct(class, .)
5b10dd1 <Romain Francois> R -> bit64::integer64
3e6bc9a <Romain Francois> using integer64 class for arrow (int64) -> R conversion.
@romainfrancois deleted the ARROW-3490/stream-2 branch on October 20, 2018 at 18:16