Skip to content

Commit

Permalink
feat: support RecordBatchReader on boxed trait objects (apache#4475)
Browse files Browse the repository at this point in the history
* Impl RBR for Box

* Require send to create a FFI stream
  • Loading branch information
wjones127 committed Jul 3, 2023
1 parent 414235e commit 5ea197d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
24 changes: 24 additions & 0 deletions arrow-array/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
}
}

impl<R: RecordBatchReader + ?Sized> RecordBatchReader for Box<R> {
fn schema(&self) -> SchemaRef {
self.as_ref().schema()
}
}

/// Trait for types that can write `RecordBatch`'s.
pub trait RecordBatchWriter {
/// Write a single batch to the writer.
Expand Down Expand Up @@ -1115,4 +1121,22 @@ mod tests {
// Cannot remove metadata
batch.with_schema(nullable_schema).unwrap_err();
}

#[test]
fn test_boxed_reader() {
// Make sure we can pass a boxed reader to a function generic over
// RecordBatchReader.
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let schema = Arc::new(schema);

let reader = RecordBatchIterator::new(std::iter::empty(), schema);
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);

fn get_size(reader: impl RecordBatchReader) -> usize {
reader.size_hint().0
}

let size = get_size(reader);
assert_eq!(size, 0);
}
}
10 changes: 5 additions & 5 deletions arrow/src/ffi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) {
}

struct StreamPrivateData {
batch_reader: Box<dyn RecordBatchReader>,
batch_reader: Box<dyn RecordBatchReader + Send>,
last_error: String,
}

Expand Down Expand Up @@ -157,7 +157,7 @@ impl Drop for FFI_ArrowArrayStream {

impl FFI_ArrowArrayStream {
/// Creates a new [`FFI_ArrowArrayStream`].
pub fn new(batch_reader: Box<dyn RecordBatchReader>) -> Self {
pub fn new(batch_reader: Box<dyn RecordBatchReader + Send>) -> Self {
let private_data = Box::new(StreamPrivateData {
batch_reader,
last_error: String::new(),
Expand Down Expand Up @@ -371,7 +371,7 @@ impl RecordBatchReader for ArrowArrayStreamReader {
/// Assumes that the pointer represents valid C Stream Interfaces, both in memory
/// representation and lifetime via the `release` mechanism.
pub unsafe fn export_reader_into_raw(
reader: Box<dyn RecordBatchReader>,
reader: Box<dyn RecordBatchReader + Send>,
out_stream: *mut FFI_ArrowArrayStream,
) {
let stream = FFI_ArrowArrayStream::new(reader);
Expand All @@ -388,13 +388,13 @@ mod tests {

struct TestRecordBatchReader {
schema: SchemaRef,
iter: Box<dyn Iterator<Item = Result<RecordBatch>>>,
iter: Box<dyn Iterator<Item = Result<RecordBatch>> + Send>,
}

impl TestRecordBatchReader {
pub fn new(
schema: SchemaRef,
iter: Box<dyn Iterator<Item = Result<RecordBatch>>>,
iter: Box<dyn Iterator<Item = Result<RecordBatch>> + Send>,
) -> Box<TestRecordBatchReader> {
Box::new(TestRecordBatchReader { schema, iter })
}
Expand Down

0 comments on commit 5ea197d

Please sign in to comment.