ARROW-14634: [Flatbuffers] introduction of ColumnBag #11646
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW Opening JIRAs ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename the pull request title in the following format?
format/Message.fbs
Outdated
/// If not provided, all field nodes are included and this payload is
/// identical to a RecordBatch. Otherwise the reader needs to skip
/// top level FieldNodes that were not included.
includedNodes: [FieldNodeRange];
So to be clear, we can't do something like provide only a nested array - and implementations will need to validate that this only skips entire top level fields?
Can we mark this as experimental like how Tensor does?
Lines 18 to 20 in 939db7f
/// EXPERIMENTAL: Metadata for n-dimensional arrays, aka "tensors" or
/// "ndarrays". Arrow implementations in general are not required to implement
/// this type
In RecordBatch, the FieldNodes are listed in-order from first field node, to its children, and grandchildren, followed by the second field node. Note that if we include a top level field node we must include its children. This requirement certainly applies to array-types, and I assume it applies to nested structures -- but I have not used them enough to play with the idea.
I think there are a few options to represent which top level nodes to include.
- encoded BitSet, but it is too easy to create degenerate cases
- each FieldNode could include a third parameter -- but in flatbuffers this means that the struct is written down differently (I think if the struct is greater than 16B then it must be pre-written before constructing the flatbuffer table that uses it)
- include a parallel array with field node indicating which field offset, but this would be empty for child nodes
- what remains is a compromise: listing the ranges of columns that were included -- the use case I have in mind almost always has a single-digit number of ranges, but the number of columns can easily run into the tens of dozens.
> So to be clear, we can't do something like provide only a nested array - and implementations will need to validate that this only skips entire top level fields?
I can't quite tell what solution you are proposing here. I think client implementations do end up working exactly like you are saying, though. Could you elaborate on your idea or defend an alternative approach?
--
> Can we mark this as experimental like how Tensor does?
Absolutely, patch update incoming.
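For illustration, the range-based option could be decoded roughly as follows. This is only a sketch: `Range` is a hypothetical stand-in for FieldNodeRange, modeled here as a half-open `(start, end)` pair of top-level field indices, and `expand_included` is a made-up helper name. The actual struct layout may differ.

```python
from typing import List, Tuple

# Hypothetical stand-in for FieldNodeRange: a half-open (start, end) pair of
# top-level field indices. The real struct's field names are an assumption.
Range = Tuple[int, int]


def expand_included(ranges: List[Range], num_top_level: int) -> List[int]:
    """Return the included top-level field indices, validating the ranges."""
    included: List[int] = []
    prev_end = 0
    for start, end in ranges:
        # Ranges must appear in increasing order and must not overlap.
        if start < prev_end:
            raise ValueError("ranges must be increasing and non-overlapping")
        # Each range must be non-empty and stay within the schema.
        if not (start < end <= num_top_level):
            raise ValueError(f"invalid range ({start}, {end})")
        included.extend(range(start, end))
        prev_end = end
    return included
```

With a handful of ranges this stays compact even when the schema has many columns, which is the trade-off the range option is aiming for.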
Sorry, I guess what I mean is "Can we make it explicit that only top-level arrays can be skipped, and top-level arrays must be skipped as a whole", i.e. we can't "patch" a child of a nested array. (And that implementations must reject "degenerate" messages that skip, say, only one child of a struct, since that makes no sense in the first place, but should be validated.)
Ooh. I wish there was a way to tell. I don't think you can tell that a message is degenerate. The FieldNodes themselves do not identify which node or location in the schema they belong to. They are expected to be visited in-order (parent, children, grandchildren; then the parent's siblings -- the other root field nodes, etc). The point of FieldNodeRange is that this is the list of top level field nodes only -- not including nested nodes. So, as is typical, it is impossible to tell if the message was written incorrectly. You typically only find out from an out-of-bounds memory access where a node thinks it's an array but it's really just a column of uint64.
Point is, I think I'm preventing this by not being able to describe it on the wire.
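To make the visiting order concrete, here is a small sketch of the pre-order flattening described above. The `Field` class is a hypothetical, minimal schema node (a name plus children) and is not an Arrow API; only the ordering is the point.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Field:
    # Hypothetical minimal schema field: a name plus nested children.
    name: str
    children: List["Field"] = field(default_factory=list)


def preorder(f: Field) -> List[str]:
    """Names in the order their FieldNodes would appear: parent, then children."""
    out = [f.name]
    for child in f.children:
        out.extend(preorder(child))
    return out


def flatten(schema: List[Field]) -> List[str]:
    """Flatten every top-level field in schema order."""
    return [name for f in schema for name in preorder(f)]


schema = [
    Field("a"),
    Field("b", [Field("b.x"), Field("b.y", [Field("b.y.item")])]),
    Field("c"),
]
print(flatten(schema))  # ['a', 'b', 'b.x', 'b.y', 'b.y.item', 'c']
```

Skipping top-level field `b` removes the whole contiguous run `b, b.x, b.y, b.y.item`, which is why a range over top-level fields cannot describe a partially included nested field.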
I updated the comment. Maybe this makes the intention more obvious? Does this resolve your concern?
Yes, thanks.
I think consumers can validate, so long as they have the schema: they would traverse the schema as usual, and look at both nodes and includedNodes. If while traversing a type they ran into a buffer not included in includedNodes, then the message is invalid.
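A rough sketch of one such schema-driven check, under stated assumptions: the schema is modeled as hypothetical `(name, children)` tuples, `included` is the already-expanded list of top-level field indices, and only the total FieldNode count is compared. This catches some malformed messages, not all of them.

```python
from typing import List, Tuple

# Hypothetical schema representation: (name, children) tuples.
FieldT = Tuple[str, list]


def node_count(f: FieldT) -> int:
    """Number of FieldNodes a field contributes: itself plus all descendants."""
    _, children = f
    return 1 + sum(node_count(c) for c in children)


def check_node_count(schema: List[FieldT], included: List[int], num_nodes: int) -> None:
    """Raise if the payload's node count cannot match the included top-level fields."""
    expected = sum(node_count(schema[i]) for i in included)
    if expected != num_nodes:
        raise ValueError(f"expected {expected} field nodes, got {num_nodes}")
```

A matching count does not prove the payload is well formed; it only rules out messages whose nodes cannot line up with the included fields.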
Ah, wait, this is a list of top level nodes/fields only. Ok, I see the comments now, thanks - that works.
format/Message.fbs
Outdated
/// A data header describing the shared memory layout of a "bag" of "columns".
/// It is similar to a RecordBatch but not every top level FieldNode is required
/// to be included in the wire payload.
Thinking ahead - what do the APIs for this look like? In Java, would this be a "ragged" VectorSchemaRoot?
I am open to anything. I haven't gotten far enough to consider more than the high level: the ColumnBag doesn't know its size, so the user will be asking each column instead. Otherwise, I think the API should be pretty similar. I'd like to avoid any other divergence if that also makes sense to you.
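As a purely illustrative sketch of that idea, a container along these lines would expose length per column rather than per batch. Everything here is hypothetical (the `ColumnBag` class, its method names, and the assumption that columns may differ in length, which is still an open question in this thread); it is not a proposed API.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ColumnBag:
    # Hypothetical container: maps field name -> column values.
    # Columns that were not sent are simply absent from the mapping.
    columns: Dict[str, List[object]]

    def has_column(self, name: str) -> bool:
        """True if this top-level column was included in the payload."""
        return name in self.columns

    def column_length(self, name: str) -> int:
        """Length is asked per column; the bag itself has no row count."""
        return len(self.columns[name])


bag = ColumnBag({"id": [1, 2, 3], "tags": ["x"]})
print(bag.column_length("id"), bag.column_length("tags"), bag.has_column("price"))
# 3 1 False
```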
Understandable. In C++ we would need a container similar to RecordBatch, and I suppose in Java something similar.
@alamb FYI, pertaining to partition columns in datafusion
Thank you for the writeup and PR @nbauernfeind -- I added a link to the mailing list discussion and left a question on the context for this change in the google doc.
6a96992 to 9366a04
I've updated the PR based on feedback so far. I had to move FieldNode to Schema.fbs -- to avoid a cyclical dependency between Schema, Message, and ColumnBag.
@@ -501,6 +501,24 @@ struct Buffer {
length: long;
}

/// ----------------------------------------------------------------------
FieldNode seems more a part of Message than Schema? If it is needed as a top level type, let's make it a separate file?
/// be listed in strictly increasing order and be non-overlapping.
includedNodes: [FieldNodeRange];

/// Nodes correspond to the pre-ordered flattened logical schema
Are top-level field nodes going to be allowed to have separate lengths?
/// A data header describing the shared memory layout of a "bag" of "columns".
/// It is similar to a RecordBatch but not every top level node is required
/// to be included in the wire payload.
The proposal mentions that a different number of rows per column might also be important, but I might be misunderstanding?
@@ -37,7 +37,7 @@ enum MetadataVersion:short {
 /// >= 0.8.0 (December 2017). Non-backwards compatible with V3.
 V4,

-/// >= 1.0.0 (July 2020. Backwards compatible with V4 (V5 readers can read V4
+/// >= 1.0.0 (July 2020). Backwards compatible with V4 (V5 readers can read V4
There is an enum defined with "features"; we should add column bag to that enum. Unfortunately, I haven't had the time to integrate the enum, but the purpose was to allow clients to express to the server which capabilities they handle.
This Draft PR is the proposed flatbuffer change referenced on the mailing list with subject "[DISCUSS] next iteration of flatbuffer structures"
I wrote a small document with some details of the proposal: https://docs.google.com/document/d/1jsmmqLTyJkU8fx0sUGIqd6yu72N4v9uHFsuGSgB_DfE
This is the non-documentation side of the proposal.