ARROW-3490: [R] streaming of arrow objects to streams
This makes `write_record_batch` and `write_table` generic with dispatch on the stream type.

```r
write_record_batch <- function(x, stream, ...){
  UseMethod("write_record_batch", stream)
}
write_table <- function(x, stream, ...) {
  UseMethod("write_table", stream)
}
```

The `stream` argument can be any of the following, depending on the use case:

- an `arrow::ipc::RecordBatchWriter` created with either `record_batch_stream_writer()` or `record_batch_file_writer()`. This is the lowest level: the method calls the writer's `$WriteBatch()` or `$WriteTable()` method, depending on what is being streamed.

- an `arrow::io::OutputStream`: this first creates an `arrow::ipc::RecordBatchStreamWriter` and streams into it. In particular, this *does not* add the ARROW1 bytes that mark Arrow files.

- an `fs_path` from 📦 `fs`: this opens an `arrow::ipc::RecordBatchFileWriter` and streams to it, so that the file gets the ARROW1 bytes.

- a `character`: we assert that it is of length one and then call the `fs_path` method.

- a `raw()`, which is used only for its type: in that case we stream into a byte buffer and return it as a raw vector.
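
The dispatch-and-delegate pattern described in this list can be sketched in base R without the arrow package. This is only an illustration: `write_thing`, the `toy_path` class and the returned strings are hypothetical stand-ins, not part of this commit.

```r
# Toy generic that dispatches on its *second* argument, like the
# write_record_batch()/write_table() generics in this commit.
write_thing <- function(x, stream, ...) {
  UseMethod("write_thing", stream)
}

# character: check that it is a single string, then delegate to the
# path method (mirrors the character -> fs_path delegation).
write_thing.character <- function(x, stream, ...) {
  stopifnot(length(stream) == 1L)
  write_thing(x, structure(stream, class = "toy_path"), ...)
}

# Hypothetical path class standing in for fs_path.
write_thing.toy_path <- function(x, stream, ...) {
  paste0("file:", unclass(stream))
}

# raw(): the value is ignored, only its type selects the method.
write_thing.raw <- function(x, stream, ...) {
  "bytes"
}

res_chr <- write_thing(1:3, "out.arrow")
res_raw <- write_thing(1:3, raw())
```

Passing the dispatch object explicitly to `UseMethod()` is what lets the data (`x`) stay in first position while the destination picks the method.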

Some examples:

``` r
library(arrow)
tbl <- tibble::tibble(
  int = 1:10, dbl = as.numeric(1:10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  chr = letters[1:10]
)
batch <- record_batch(tbl)

tf <- tempfile()

# stream the batch to the file
write_record_batch(batch, tf)

# same
write_record_batch(batch, fs::path_abs(tf))

# to an OutputStream
file_stream <- file_output_stream(tf)
write_record_batch(batch, file_stream)
file_stream$Close()

# to a RecordBatchFileWriter
file_stream <- file_output_stream(tf)
file_writer <- record_batch_file_writer(file_stream, batch$schema())
write_record_batch(batch, file_writer)
file_writer$Close()
file_stream$Close()

# get the bytes directly
write_record_batch(batch, raw())
#>   [1] 04 01 00 00 10 00 00 00 00 00 0a 00 0c 00 06 00 05 00 08 00 0a 00 00
#>  [24] 00 00 01 03 00 0c 00 00 00 08 00 08 00 00 00 04 00 08 00 00 00 04 00
#>  [47] 00 00 04 00 00 00 9c 00 00 00 58 00 00 00 2c 00 00 00 04 00 00 00 84
#>  [70] ff ff ff 00 00 01 05 14 00 00 00 0c 00 00 00 04 00 00 00 00 00 00 00
#>  [93] dc ff ff ff 03 00 00 00 63 68 72 00 a8 ff ff ff 00 00 01 06 18 00 00
#> [116] 00 10 00 00 00 04 00 00 00 00 00 00 00 04 00 04 00 04 00 00 00 03 00
#> [139] 00 00 6c 67 6c 00 d0 ff ff ff 00 00 01 03 20 00 00 00 14 00 00 00 04
#> [162] 00 00 00 00 00 00 00 00 00 06 00 08 00 06 00 06 00 00 00 00 00 02 00
#> [185] 03 00 00 00 64 62 6c 00 10 00 14 00 08 00 06 00 07 00 0c 00 00 00 10
#> [208] 00 10 00 00 00 00 00 01 02 24 00 00 00 14 00 00 00 04 00 00 00 00 00
#> [231] 00 00 08 00 0c 00 08 00 07 00 08 00 00 00 00 00 00 01 20 00 00 00 03
#> [254] 00 00 00 69 6e 74 00 00 00 00 00 2c 01 00 00 14 00 00 00 00 00 00 00
#> [277] 0c 00 16 00 06 00 05 00 08 00 0c 00 0c 00 00 00 00 03 03 00 18 00 00
#> [300] 00 c8 00 00 00 00 00 00 00 00 00 0a 00 18 00 0c 00 04 00 08 00 0a 00
#> [323] 00 00 ac 00 00 00 10 00 00 00 0a 00 00 00 00 00 00 00 00 00 00 00 09
#> [346] 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
#> [369] 00 00 00 00 28 00 00 00 00 00 00 00 28 00 00 00 00 00 00 00 00 00 00
#> [392] 00 00 00 00 00 28 00 00 00 00 00 00 00 50 00 00 00 00 00 00 00 78 00
#> [415] 00 00 00 00 00 00 08 00 00 00 00 00 00 00 80 00 00 00 00 00 00 00 08
#> [438] 00 00 00 00 00 00 00 88 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
#> [461] 88 00 00 00 00 00 00 00 30 00 00 00 00 00 00 00 b8 00 00 00 00 00 00
#> [484] 00 10 00 00 00 00 00 00 00 00 00 00 00 04 00 00 00 0a 00 00 00 00 00
#> [507] 00 00 00 00 00 00 00 00 00 00 0a 00 00 00 00 00 00 00 00 00 00 00 00
#> [530] 00 00 00 0a 00 00 00 00 00 00 00 03 00 00 00 00 00 00 00 0a 00 00 00
#> [553] 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 01 00 00 00 02 00 00
#> [576] 00 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 00 08 00
#> [599] 00 00 09 00 00 00 0a 00 00 00 00 00 00 00 00 00 f0 3f 00 00 00 00 00
#> [622] 00 00 40 00 00 00 00 00 00 08 40 00 00 00 00 00 00 10 40 00 00 00 00
#> [645] 00 00 14 40 00 00 00 00 00 00 18 40 00 00 00 00 00 00 1c 40 00 00 00
#> [668] 00 00 00 20 40 00 00 00 00 00 00 22 40 00 00 00 00 00 00 24 40 6b 03
#> [691] 00 00 00 00 00 00 22 01 00 00 00 00 00 00 00 00 00 00 01 00 00 00 02
#> [714] 00 00 00 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 00
#> [737] 08 00 00 00 09 00 00 00 0a 00 00 00 00 00 00 00 61 62 63 64 65 66 67
#> [760] 68 69 6a 00 00 00 00 00 00 00 00 00 00
```
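
The raw vector printed above comes from the stream writer, so it does not begin with the ARROW1 bytes that the file writer adds. A minimal base R check for that prefix might look like the sketch below. `has_arrow_file_magic` is a hypothetical helper, not an arrow function, and the two padding bytes after the magic string are an assumption about the file layout.

```r
# Hypothetical helper: TRUE if `bytes` starts with the magic string "ARROW1".
has_arrow_file_magic <- function(bytes) {
  magic <- charToRaw("ARROW1")
  length(bytes) >= length(magic) &&
    identical(bytes[seq_along(magic)], magic)
}

# First eight bytes of the stream output printed above.
stream_head <- as.raw(c(0x04, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00))

# Assumed start of a file-format payload: the magic string plus padding.
file_head <- c(charToRaw("ARROW1"), as.raw(c(0x00, 0x00)))

has_arrow_file_magic(stream_head)
has_arrow_file_magic(file_head)
```

Only the second call should return `TRUE`, which matches the distinction between the `OutputStream` and `fs_path` methods.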

Created on 2018-10-12 by the [reprex package](https://reprex.tidyverse.org) (v0.2.1.9000)

Author: Romain Francois <romain@purrple.cat>

Closes #2749 from romainfrancois/ARROW-3490/stream-2 and squashes the following commits:

ce4ec06 <Romain Francois> type promotion for types that do not exist in R
338f75f <Romain Francois> More flexible read_table
5cb8dbd <Romain Francois> more flexible read_record_batch with various dispatch
072b7f0 <Romain Francois> + BufferOutputStream
f301f1e <Romain Francois> ⏪ to write_record_batch, write_table and write_arrow
a17b375 <Romain Francois> Trying less double dispatch
9b9a6b8 <Romain Francois> roxygen
2ae2ab3 <Romain Francois> - to_file and to_stream - write_arrow + stream.data.frame
8d0e581 <Romain Francois> stream.arrow::Table methods
e1f62cc <Romain Francois> R6 arrrow::io::FixedSizeBufferWriter
80ea2b7 <Romain Francois> R6 arrow::io::MockOutputSream
a93933a <Romain Francois> +close_on_exit, local_tempfile
ac20df3 <Romain Francois> + stream
romainfrancois authored and wesm committed Oct 18, 2018
1 parent 4ed4053 commit d3ec690
Showing 35 changed files with 1,392 additions and 414 deletions.
9 changes: 7 additions & 2 deletions r/DESCRIPTION
@@ -24,10 +24,12 @@ Imports:
vctrs (>= 0.0.0.9000),
fs,
tibble,
crayon
crayon,
withr
Remotes:
r-lib/vctrs,
RcppCore/Rcpp
RcppCore/Rcpp,
romainfrancois/withr@bug-79/defer
Roxygen: list(markdown = TRUE)
RoxygenNote: 6.1.0.9000
Suggests:
@@ -43,6 +45,8 @@ Collate:
'List.R'
'RcppExports.R'
'RecordBatch.R'
'RecordBatchReader.R'
'RecordBatchWriter.R'
'Schema.R'
'Struct.R'
'Table.R'
@@ -51,5 +55,6 @@ Collate:
'dictionary.R'
'io.R'
'memory_pool.R'
'on_exit.R'
'reexports-tibble.R'
'zzz.R'
39 changes: 35 additions & 4 deletions r/NAMESPACE
@@ -13,19 +13,43 @@ S3method(buffer,numeric)
S3method(buffer,raw)
S3method(buffer_reader,"arrow::Buffer")
S3method(buffer_reader,default)
S3method(fixed_size_buffer_writer,"arrow::Buffer")
S3method(fixed_size_buffer_writer,default)
S3method(length,"arrow::Array")
S3method(names,"arrow::RecordBatch")
S3method(print,"arrow-enum")
S3method(read_record_batch,"arrow::io::BufferReader")
S3method(read_record_batch,"arrow::io::RandomAccessFile")
S3method(read_record_batch,"arrow::ipc::RecordBatchFileReader")
S3method(read_record_batch,"arrow::ipc::RecordBatchStreamReader")
S3method(read_record_batch,character)
S3method(read_record_batch,fs_path)
S3method(read_record_batch,raw)
S3method(read_table,"arrow::io::BufferReader")
S3method(read_table,"arrow::io::RandomAccessFile")
S3method(read_table,"arrow::ipc::RecordBatchFileReader")
S3method(read_table,"arrow::ipc::RecordBatchStreamReader")
S3method(read_table,character)
S3method(read_table,fs_path)
S3method(read_table,raw)
S3method(record_batch_file_reader,"arrow::io::RandomAccessFile")
S3method(record_batch_file_reader,character)
S3method(record_batch_file_reader,fs_path)
S3method(record_batch_stream_reader,"arrow::io::InputStream")
S3method(record_batch_stream_reader,raw)
S3method(write_arrow,"arrow::RecordBatch")
S3method(write_arrow,"arrow::Table")
S3method(write_arrow,data.frame)
S3method(write_record_batch,"arrow::io::OutputStream")
S3method(write_record_batch,"arrow::ipc::RecordBatchWriter")
S3method(write_record_batch,character)
S3method(write_record_batch,fs_path)
S3method(write_record_batch,raw)
S3method(write_table,"arrow::io::OutputStream")
S3method(write_table,"arrow::ipc::RecordBatchWriter")
S3method(write_table,character)
S3method(write_table,fs_path)
S3method(write_table,raw)
export(DateUnit)
export(FileMode)
export(StatusCode)
@@ -35,13 +59,16 @@ export(array)
export(as_tibble)
export(boolean)
export(buffer)
export(buffer_output_stream)
export(buffer_reader)
export(chunked_array)
export(date32)
export(date64)
export(decimal)
export(dictionary)
export(file_open)
export(file_output_stream)
export(fixed_size_buffer_writer)
export(float16)
export(float32)
export(float64)
@@ -52,11 +79,16 @@ export(int8)
export(list_of)
export(mmap_create)
export(mmap_open)
export(mock_output_stream)
export(null)
export(read_arrow)
export(read_record_batch)
export(read_table)
export(record_batch)
export(record_batch_file_reader)
export(record_batch_file_writer)
export(record_batch_stream_reader)
export(record_batch_stream_writer)
export(schema)
export(struct)
export(table)
@@ -69,17 +101,16 @@ export(uint64)
export(uint8)
export(utf8)
export(write_arrow)
export(write_record_batch)
export(write_table)
importFrom(R6,R6Class)
importFrom(Rcpp,sourceCpp)
importFrom(assertthat,assert_that)
importFrom(glue,glue)
importFrom(purrr,map)
importFrom(purrr,map2)
importFrom(purrr,map_chr)
importFrom(purrr,map_int)
importFrom(rlang,dots_n)
importFrom(rlang,quo_name)
importFrom(rlang,seq2)
importFrom(rlang,set_names)
importFrom(tibble,as_tibble)
importFrom(withr,defer_parent)
useDynLib(arrow, .registration = TRUE)
4 changes: 4 additions & 0 deletions r/R/R6.R
@@ -40,6 +40,10 @@
},
pointer_address = function(){
Object__pointer_address(self$pointer())
},

is_null = function(){
Object__is_null(self)
}
)
)
132 changes: 100 additions & 32 deletions r/R/RcppExports.R

Generated file; diff not rendered.

46 changes: 3 additions & 43 deletions r/R/RecordBatch.R
@@ -22,8 +22,6 @@
num_columns = function() RecordBatch__num_columns(self),
num_rows = function() RecordBatch__num_rows(self),
schema = function() `arrow::Schema`$new(RecordBatch__schema(self)),
to_file = function(path) invisible(RecordBatch__to_file(self, fs::path_abs(path))),
to_stream = function() RecordBatch__to_stream(self),
column = function(i) `arrow::Array`$new(RecordBatch__column(self, i)),
column_name = function(i) RecordBatch__column_name(self, i),
names = function() RecordBatch__names(self),
@@ -40,7 +38,9 @@
} else {
`arrow::RecordBatch`$new(RecordBatch__Slice2(self, offset, length))
}
}
},

serialize = function(output_stream, ...) write_record_batch(self, output_stream, ...)
)
)

@@ -67,43 +67,3 @@
record_batch <- function(.data){
`arrow::RecordBatch`$new(RecordBatch__from_dataframe(.data))
}

#' Read a single record batch from a stream
#'
#' @param stream input stream
#'
#' @details `stream` can be a `arrow::io::RandomAccessFile` stream as created by [file_open()] or [mmap_open()] or a path.
#'
#' @export
read_record_batch <- function(stream){
UseMethod("read_record_batch")
}

#' @export
read_record_batch.character <- function(stream){
assert_that(length(stream) == 1L)
read_record_batch(fs::path_abs(stream))
}

#' @export
read_record_batch.fs_path <- function(stream){
stream <- file_open(stream); on.exit(stream$Close())
read_record_batch(stream)
}

#' @export
`read_record_batch.arrow::io::RandomAccessFile` <- function(stream){
`arrow::RecordBatch`$new(read_record_batch_RandomAccessFile(stream))
}

#' @export
`read_record_batch.arrow::io::BufferReader` <- function(stream){
`arrow::RecordBatch`$new(read_record_batch_BufferReader(stream))
}

#' @export
read_record_batch.raw <- function(stream){
stream <- buffer_reader(stream); on.exit(stream$Close())
read_record_batch(stream)
}
