
Initial Arrow support #866

Merged
hannes merged 39 commits into master on Aug 26, 2020

Conversation

@hannes (Member) commented Aug 25, 2020

Apache Arrow defines a "standardized column-oriented memory format that is able to represent flat and hierarchical data for efficient analytic operations". There has been a long-standing feature request to support fetching DuckDB result sets as Arrow arrays (#151). In this PR, we define two interfaces between DuckDB and Arrow:

  1. Reading Arrow Arrays as tables from DuckDB queries
  2. Fetching DuckDB query results as Arrow Arrays

Thankfully, Arrow has defined a lightweight "C Data Interface" that allows us to provide both interfaces without any build or runtime dependency on the Arrow library itself. This interface is currently being extended to also allow streaming data in and out of Arrow without said dependency. DuckDB already adopts the proposed streaming interface internally.
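
For reference, the streaming half of that C interface is just a small struct of callbacks, roughly as defined in the Arrow C stream interface proposal (reproduced here for illustration, not code from this PR):

struct ArrowArrayStream {
    // get the schema shared by all batches in the stream
    int (*get_schema)(struct ArrowArrayStream *, struct ArrowSchema *out);
    // get the next batch; an array with a NULL release callback signals end of stream
    int (*get_next)(struct ArrowArrayStream *, struct ArrowArray *out);
    // description of the last error, if any
    const char *(*get_last_error)(struct ArrowArrayStream *);
    // release resources held by the stream
    void (*release)(struct ArrowArrayStream *);
    void *private_data;
};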

For now, the Arrow bridge is implemented for DuckDB's C++ and Python APIs. There are also some restrictions on the kind of Arrow arrays that can be passed (see the sketch after this list):

  • Data types: Only signed integer types, floating point types and strings are supported
  • Dictionaries are not supported
  • Nested types are not supported
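
To make the restriction concrete, here is a rough sketch (not code from this PR; MapArrowFormat is a hypothetical helper) of how the supported types line up with the format strings of the C data interface; anything outside this set is rejected for now:

LogicalType MapArrowFormat(const std::string &format) {
    if (format == "c") return LogicalType::TINYINT;  // int8
    if (format == "s") return LogicalType::SMALLINT; // int16
    if (format == "i") return LogicalType::INTEGER;  // int32
    if (format == "l") return LogicalType::BIGINT;   // int64
    if (format == "f") return LogicalType::FLOAT;    // float32
    if (format == "g") return LogicalType::DOUBLE;   // float64
    if (format == "u") return LogicalType::VARCHAR;  // utf8 string
    throw std::runtime_error("Unsupported Arrow type: " + format);
}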

Below are some usage examples.

1. Reading Arrow Arrays as tables from DuckDB queries

Reading Arrow arrays from DuckDB is implemented as a table-producing function, arrow_scan. This function takes an ArrowArrayStream* parameter. The Python API provides a wrapper so we can use pyarrow.Table instances.

Using the Python Relation API, we can for example run arbitrary SQL on an Arrow Table from a Parquet file and convert to Pandas:

import duckdb
import pyarrow.parquet

my_arrow_table = pyarrow.parquet.read_table(parquet_filename) # for example
rel = duckdb.from_arrow_table(my_arrow_table) # or just arrow()
print(rel.query("arrow", "SELECT * FROM arrow LIMIT 10").df())

From C++:

ArrowArrayStream stream;
// fill stream with some meaning, then
auto result = con.TableFunction("arrow_scan", {Value::POINTER((uintptr_t)&stream)})->Execute();
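
"Fill stream with some meaning" boils down to installing the four stream callbacks from the C stream interface. A minimal hand-rolled sketch (not code from this PR; FillStream is a hypothetical helper that advertises a single INT32 column and immediately reports end of stream):

static void ReleaseSchema(ArrowSchema *schema) {
    schema->release = nullptr;
}

static int StreamGetSchema(ArrowArrayStream *, ArrowSchema *out) {
    static ArrowSchema child;
    static ArrowSchema *children[] = {&child};
    memset(&child, 0, sizeof(child));
    child.format = "i"; // int32 column named "a"
    child.name = "a";
    child.release = ReleaseSchema;

    memset(out, 0, sizeof(*out));
    out->format = "+s"; // top-level schema describes a record batch as a struct
    out->n_children = 1;
    out->children = children;
    out->release = ReleaseSchema;
    return 0;
}

static int StreamGetNext(ArrowArrayStream *, ArrowArray *out) {
    memset(out, 0, sizeof(*out));
    out->release = nullptr; // an array without a release callback means end of stream
    return 0;
}

static const char *StreamGetLastError(ArrowArrayStream *) {
    return nullptr;
}

static void StreamRelease(ArrowArrayStream *stream) {
    stream->release = nullptr;
}

void FillStream(ArrowArrayStream &stream) {
    stream.get_schema = StreamGetSchema;
    stream.get_next = StreamGetNext;
    stream.get_last_error = StreamGetLastError;
    stream.release = StreamRelease;
    stream.private_data = nullptr;
}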

2. Fetching DuckDB query results as Arrow Table

Using the Python DB API:

import duckdb

con = duckdb.connect()
con.execute('select 44')
print(con.fetch_arrow_table()) # or just arrow()

Using the Python Relation API:

print(duckdb.values([42]).to_arrow_table()) # or just arrow() 

From C++: (already a streaming API!)

DuckDB db(nullptr);
Connection con(db);
auto result = con.Query("SELECT 42");

ArrowSchema arrow_schema;
result->ToArrowSchema(&arrow_schema);
// do something with arrow_schema's contents

while(true) {
    ArrowArray arrow_chunk;
    auto chunk = result->Fetch();
    if (!chunk || chunk->size() == 0) break;
    chunk->ToArrowArray(&arrow_chunk);
    // Do something with arrow_chunk's contents
}
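
One caveat (an assumption based on the C data interface contract rather than something spelled out in this PR): the consumer owns what it receives, so once a chunk and the schema have been processed their release callbacks should be invoked:

    // inside the loop, after the arrow_chunk contents have been consumed
    if (arrow_chunk.release) {
        arrow_chunk.release(&arrow_chunk);
    }

// after the loop
if (arrow_schema.release) {
    arrow_schema.release(&arrow_schema);
}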

See tools/pythonpkg/tests/test_arrow.py and test/api/test_arrow.cpp for some more examples.

Many thanks to @wesm, @pitrou and @fsaintjacques for making this possible!

@hannes requested a review from Mytherin on Aug 25, 2020, 15:37

@hannes (Member Author) commented Aug 25, 2020

@Mytherin can you have a look please?

@Mytherin (Collaborator): Looks good to me

@hannes linked an issue on Aug 26, 2020 that may be closed by this pull request

@wesm left a comment

Cool! Thanks for being the guinea pig on the C interface iterator. Getting that working well going the other direction will help us harden the interactions with pyarrow; that could be tested in a branch and then merged later once it's available in the production pyarrow packages.

@pitrou may have more detailed comments on the specifics of the C interface.

} else if (format == "d:38,0") { // decimal128
    return_types.push_back(LogicalType::HUGEINT);
} else if (format == "u") {
    return_types.push_back(LogicalType::VARCHAR);

@wesm: Are your VARCHARs required to be UTF-8? Curious how Arrow BINARY might be handled.

@hannes (Member Author): They are required to be UTF-8, yes.

@Mytherin (Collaborator): To add to that, we have a LogicalType::BLOB that can handle arbitrary binary blobs.

bitset<STANDARD_VECTOR_SIZE + 8> temp_nullmask;
memcpy(&temp_nullmask, (uint8_t *)array.buffers[0] + bit_offset / 8, n_bitmask_bytes + 1);

temp_nullmask >>= (bit_offset % 8); // why this has to be a right shift is a mystery to me

    StringVector::AddString(output.data[col_idx], cptr, str_len);
    break;
case UnicodeType::UNICODE:
    // this regrettably copies to normalize

@wesm: Why is that?

for (idx_t row = 0; row < output.size(); row++) {
    auto source_idx = data.chunk_offset + row;

    auto ms = src_ptr[source_idx] / 1000000; // nanoseconds

@wesm: Need to use the time unit from the schema -- this will be incorrect for units other than nanoseconds?

@hannes (Member Author): Yes, we need to do another iteration on the many time types that Arrow seems to have.
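
To illustrate, a hedged sketch of what unit-aware handling might look like: the Arrow format string encodes the unit ("tss:", "tsm:", "tsu:", "tsn:" followed by an optional timezone), so the conversion factor has to come from the schema instead of being hard-coded to nanoseconds. The helper name and the millisecond target are assumptions for illustration only:

int64_t TimestampToMilliseconds(int64_t value, const std::string &format) {
    if (format.rfind("tss:", 0) == 0) {
        return value * 1000; // seconds -> milliseconds
    } else if (format.rfind("tsm:", 0) == 0) {
        return value; // already milliseconds
    } else if (format.rfind("tsu:", 0) == 0) {
        return value / 1000; // microseconds -> milliseconds
    } else if (format.rfind("tsn:", 0) == 0) {
        return value / 1000000; // nanoseconds -> milliseconds
    }
    throw std::runtime_error("unsupported timestamp format: " + format);
}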


out_schema->children = root_holder->children.get();

out_schema->format = "+s"; // struct apparently

@wesm: More precisely you could say that the C interface uses a struct to communicate an Arrow record batch (to avoid having an unwieldy dichotomy between an Array and a RecordBatch).

https://github.com/apache/arrow/blob/master/docs/source/format/CDataInterface.rst#record-batches

@hannes (Member Author): I am quite happy with the struct approach.

        batches.append(batch_import_func((uint64_t)&data, (uint64_t)&schema));
    }
    return from_batches_func(batches, schema_obj);
}

@wesm: As soon as pyarrow has built-in support for the C interface stream/iterator (i.e. hopefully in the next release!) this can be basically dropped.

@hannes merged commit 9d86f78 into master on Aug 26, 2020

@wesm commented Aug 26, 2020

Need to open a follow up issue to fix the timestamp issue?

@hannes (Member Author) commented Aug 26, 2020

@wesm yes, it's in #868

Linked issue: Support Arrow for table scans and result sets