
ARROW-3479: [R] Support to write record_batch as stream #2727

Closed
wants to merge 7 commits

Conversation

@javierluraschi (Contributor) commented Oct 8, 2018

Using this PR as a WIP to efficiently transfer data from R to Spark using Arrow.

This PR might be ultimately closed and not merged, but thought it would be good to give visibility as to what I'm exploring.

Specifically, I'm working on supporting efficient execution of:

library(sparklyr)
sc <- spark_connect(master = "local")
system.time({
  tbl_data <- sdf_copy_to(sc, data.frame(y = runif(10^6, 0, 1)), "data", overwrite = TRUE)
})

Currently, without this PR and without using arrow:

system.time({
  tbl_data <- sdf_copy_to(sc, data.frame(y = runif(10^6, 0, 1)), "data", overwrite = TRUE)
})
   user  system elapsed 
  1.120   0.087   3.482 

Using arrow is down to:

library(arrow)
system.time({
  tbl_data <- sdf_copy_to(sc, data.frame(y = runif(10^6, 0, 1)), "data", overwrite = TRUE)
})
   user  system elapsed 
  0.222   0.029   0.641 

and down to the following while using record$to_raw() from this PR instead of record$to_file():

   user  system elapsed 
  0.102   0.007   0.351 

@wesm (Member) left a comment

Can you open a JIRA for this and maybe write a unit test that round trips to the R raw type?
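A round-trip test of the kind requested could follow the pattern sketched below. This is a minimal, self-contained C++ illustration of the round-trip property only; `to_stream`/`from_stream` here are hypothetical stand-ins, and the real unit test would go through the arrow R bindings and the R `raw` type:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical serializer pair standing in for the R binding's
// to-stream conversion and a matching reader.
std::vector<uint8_t> to_stream(const std::string& batch) {
  return std::vector<uint8_t>(batch.begin(), batch.end());
}

std::string from_stream(const std::vector<uint8_t>& raw) {
  return std::string(raw.begin(), raw.end());
}

// The property the unit test should check: decode(encode(x)) == x.
bool round_trips(const std::string& batch) {
  return from_stream(to_stream(batch)) == batch;
}
```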

r/src/RecordBatch.cpp
@@ -23,6 +23,7 @@
num_rows = function() RecordBatch__num_rows(self),
schema = function() `arrow::Schema`$new(RecordBatch__schema(self)),
to_file = function(path) invisible(RecordBatch__to_file(self, fs::path_abs(path))),
to_raw = function() RecordBatch__to_raw(self),
Member:

Maybe `to_stream_raw`?

Contributor Author:

How about `to_stream()`? I think the name implies that the returned data is `raw()`.

std::shared_ptr<arrow::ipc::RecordBatchWriter> mockWriter;
R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamWriter::Open(mockSink.get(),
batch->schema(),
&mockWriter));
Member:

Use the function arrow::ipc::WriteRecordBatchStream({batch}, &mock_writer) to save yourself about 3 lines.

MemoryPool* pool = default_memory_pool();
RawVector RecordBatch__to_stream(const std::shared_ptr<arrow::RecordBatch>& batch) {
std::unique_ptr<io::MockOutputStream> mockSink;
mockSink.reset(new io::MockOutputStream());
Member:

Just declare this on the stack

io::MockOutputStream mock_sink;

RawVector res(mockSink->GetExtentBytesWritten());

std::unique_ptr<RawVectorOutputStream> sink;
sink.reset(new RawVectorOutputStream(res));
Member:

This seems a bit elaborate. Is there a way to get the pointer to the raw memory in res? Then you can just use FixedSizeBufferWriter https://github.com/apache/arrow/blob/master/cpp/src/arrow/io/memory.h#L103 cc @romainfrancois

Contributor:

res.begin()

Member:

OK, then RawVectorOutputStream is not needed
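The pattern settled on in this thread — count bytes with a mock sink, allocate the R vector, then write directly into its memory — can be sketched without the Arrow API. In this self-contained illustration, `CountingSink` plays the role of `arrow::io::MockOutputStream` and `FixedBufferSink` plays the role of `arrow::io::FixedSizeBufferWriter` wrapped around `res.begin()`; all names here are illustrative, not Arrow's:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Pass 1: a sink that only counts bytes (MockOutputStream analogue).
struct CountingSink {
  size_t written = 0;
  void Write(const void* data, size_t n) {
    (void)data;  // a dry run: only the size matters
    written += n;
  }
};

// Pass 2: a sink that writes into caller-owned memory through a raw
// pointer (FixedSizeBufferWriter over res.begin() analogue).
struct FixedBufferSink {
  uint8_t* pos;
  explicit FixedBufferSink(uint8_t* dest) : pos(dest) {}
  void Write(const void* data, size_t n) {
    std::memcpy(pos, data, n);
    pos += n;
  }
};

// Serialize `payload` with the two-pass pattern and return the buffer.
std::vector<uint8_t> two_pass_stream(const std::string& payload) {
  CountingSink counter;
  counter.Write(payload.data(), payload.size());  // measure
  std::vector<uint8_t> res(counter.written);      // RawVector analogue
  FixedBufferSink sink(res.data());               // res.begin() analogue
  sink.Write(payload.data(), payload.size());     // real write
  return res;
}
```

The payoff of this design is that the destination vector is allocated once at exactly the right size and no intermediate copy or custom stream class is needed.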

R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamWriter::Open(sink.get(), batch->schema(), &writer));
R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamWriter::Open(sink.get(),
batch->schema(),
&writer));

R_ERROR_NOT_OK(writer->WriteRecordBatch(*batch));
R_ERROR_NOT_OK(writer->Close());
Member:

Ditto

@romainfrancois (Contributor)

I think we should start with a simpler OutputStream first, e.g. replace $to_file with a write_record_batch <- function(batch, stream){ ... } S3 generic, or maybe even a stream generic with double dispatch on both what is streamed and the stream, so that we can do e.g.:

batch <- ...
stream(batch, output_stream(...))
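The proposed `stream(what, where)` generic dispatches on both arguments. A loose C++ analogue of that double dispatch, using plain overloading, is sketched below; all of the types and the `stream` overloads are hypothetical stand-ins for the R design, not actual Arrow classes:

```cpp
#include <cassert>
#include <string>

// Hypothetical payload and sink types mirroring the proposed R design.
struct RecordBatch { std::string data; };
struct FileStream  { std::string path; std::string contents; };
struct RawStream   { std::string bytes; };

// "Double dispatch" via overloading: one stream() per (payload, sink)
// pair, so adding a new sink type means adding one overload.
std::string stream(const RecordBatch& b, FileStream& out) {
  out.contents = b.data;  // stand-in for writing an IPC file
  return "record_batch->file:" + out.path;
}

std::string stream(const RecordBatch& b, RawStream& out) {
  out.bytes = b.data;     // stand-in for writing an in-memory stream
  return "record_batch->raw";
}
```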

@javierluraschi (Contributor, Author)

Here is the JIRA issue: https://issues.apache.org/jira/browse/ARROW-3479

@javierluraschi javierluraschi changed the title [WIP] Improvements to support R to Spark in socket serialization ARROW-3479: [R] Support to write record_batch as stream Oct 9, 2018
@javierluraschi (Contributor, Author)

@romainfrancois that makes sense to me; however, I'd still like this PR as it is to make progress in sparklyr. I'm not making the sparklyr work public for several months, so feel free to override this function with a more appropriate binding. I can also take a look at this at some point, but first I want to get data from Spark to R implemented, since R to Spark is currently in a decent place.

@codecov-io

Codecov Report

Merging #2727 into master will increase coverage by 0.94%.
The diff coverage is n/a.


@@            Coverage Diff             @@
##           master    #2727      +/-   ##
==========================================
+ Coverage   87.57%   88.52%   +0.94%     
==========================================
  Files         402      341      -61     
  Lines       61454    57649    -3805     
==========================================
- Hits        53821    51031    -2790     
+ Misses       7561     6618     -943     
+ Partials       72        0      -72
Impacted Files
rust/src/record_batch.rs
go/arrow/datatype_nested.go
rust/src/util/bit_util.rs
go/arrow/math/uint64_amd64.go
go/arrow/internal/testing/tools/bool.go
go/arrow/internal/bitutil/bitutil.go
go/arrow/memory/memory_avx2_amd64.go
go/arrow/array/null.go
rust/src/lib.rs
rust/src/array.rs
... and 51 more

Last update f4f6269...0e302a5.

@romainfrancois (Contributor)

LGTM, but I might still change the interface for streaming out, after #2714 is merged

@wesm (Member) left a comment

+1, thanks @javierluraschi!
