feat: Convert predicate to arrow filter and push down to parquet reader #295
Conversation
cc @viirya Is this ready for review, or do you still need to make more updates?
@liurenjie1024 It is ready for review. I will fix the conflicts.
Hi @viirya, thanks for this PR! The general idea of reusing Arrow kernels looks great, but I found some small problems which could be improved.
crates/iceberg/src/arrow.rs
Outdated
```rust
PredicateOperator::NotNull => Ok(Box::new(ArrowPredicateFn::new(
    self.projection_mask.clone(),
    move |batch| {
        let column = batch.column(term_index);
```
This may be incorrect for nested columns. I think we should either return a projection_mask for each leaf column, or implement a general-purpose flatten method for struct arrays.
The Parquet reader API will flatten the required columns based on the projection_mask we provide. I.e., if the projection mask selects one nested column `a.b`, it will be the first column of the record batch when calling `evaluate` of the `ArrowPredicate` API.
I did some tests, but it seems it doesn't work like this. Say the schema is like the following:
```
message {
  struct a {
    int b
  }
}
```
And if we pass `ProjectionMask::leaves([0])`, it will return a struct array, so `batch.column(term_index)` will return a `StructArray`.
Hmm, that's what I got from reading the Parquet code. Let me take a further look and test.
Just wrote a test in arrow-rs to check it. Yeah, for a nested column such as `struct.a`, the batch passed to `evaluate` contains a struct array with the column `a`. Looks like Parquet will project the requested column indices and construct the enclosing nested type (i.e., the struct) before passing the batch to `evaluate`.
Added the test to arrow-rs to clarify its usage: apache/arrow-rs#5600
I think having a flatten method for struct arrays sounds like the simpler way. I'm looking in arrow-rs to see if there is an existing one; if not, we need to implement it here.
> This may be incorrect for nested columns. I think we should either return a projection_mask for each leaf column, or implement a general-purpose flatten method for struct arrays.

I changed it to return a projection_mask for each leaf column; it is pretty straightforward to implement. Please let me know if it looks good to you. Thanks.
I've addressed some of the above reviews. I will resolve the other reviews soon. Thanks.
@liurenjie1024 I've addressed all comments. Thank you.
@liurenjie1024 Thanks for the review. Sorry for the delay. I addressed the comments by rewriting the visitors using the new API, and I replied with some further questions.
Thanks @viirya for this great PR! It's really hard work, and you did it in quite an elegant way. I found some small problems to fix.
crates/iceberg/src/arrow/reader.rs
Outdated
```rust
fn always_true(&mut self) -> Result<Self::T> {
    Ok(Box::new(ArrowPredicateFn::new(
        self.projection_mask.clone(),
        |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
```
This may not be a blocker, but is it possible to build a const array in Arrow?
As far as I know, there is no const array in Arrow for now. There are dictionary arrays, which I think are close to a const array, but the `ArrowPredicateFn` API defines the return type to be `BooleanArray`.
I've created an issue to track this: apache/arrow-rs#5701
crates/iceberg/src/arrow/reader.rs
Outdated
```rust
    reference: &BoundReference,
    _predicate: &BoundPredicate,
) -> Result<Self::T> {
    let projected_mask = self.bound_reference(reference)?;
```
This seems incorrect to me. Let's say the predicate is `a is null AND b > 1`; then the batch passed to this `ArrowPredicateFn` is constructed from the projection mask of [a, b]. I think one possible solution is to use the same projection mask for all predicates, and pass the column_idx to get_leaf_column.
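The indexing concern above can be sketched without arrow-rs. When a single projection mask covers [a, b], each sub-predicate must address columns by their position in the combined projected batch, not by their original schema indices. A minimal illustrative model (columns are `Vec<Option<i64>>` here; the function name and layout are hypothetical, not code from this PR):

```rust
// Hypothetical model of evaluating `a is null AND b > 1` against a batch
// projected with one mask over [a, b]: `a` lands at index 0 and `b` at
// index 1 of the projected batch, whatever their original column indices.
fn eval_a_is_null_and_b_gt_1(batch: &[Vec<Option<i64>>]) -> Vec<bool> {
    let a = &batch[0]; // position of `a` in the combined projection
    let b = &batch[1]; // position of `b` in the combined projection
    a.iter()
        .zip(b.iter())
        .map(|(av, bv)| av.is_none() && bv.map_or(false, |v| v > 1))
        .collect()
}
```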
Hmm, the `RowFilter` API design accepts multiple `ArrowPredicate`s. Each `ArrowPredicate` has its own `projection`, and the API doc says the predicate will be passed batches that contain the columns specified in that `projection`. I think the batch should be projected per `ArrowPredicate` and its `projection`.
I will test it in arrow-rs to verify.
Ah, no. Although I used multiple `ArrowPredicate`s before, it was changed to a single `ArrowPredicate` after your suggestion. So now we generate only one `ArrowPredicate`.
Hmm, let me think about how to deal with it.
As we use one `ArrowPredicate` to represent the entire predicate, we should use one projection mask which contains all leaf columns in the predicate.
But this brings another question: how do we access the correct array from the `RecordBatch`? For a top-level column it is straightforward, but for a nested column I haven't found a way to get it quickly.
I created an issue in arrow-rs: apache/arrow-rs#5699
Based on what I found and the kind reply on the issue, I think there is currently no way to do nested projection on a RecordBatch.
Implementing that feature in arrow-rs might block this, so I tend to support top-level columns only in this PR.
WDYT, @liurenjie1024?
I agree that we can support top-level columns only first to move on.
```rust
}

#[tokio::test]
async fn test_filter_on_arrow_is_not_null() {
```
Thanks for the tests! Is it possible to add several test cases for more complex predicates such as `AND` and `OR`?
Yes. I added some more tests using `AND` and `OR`.
crates/iceberg/src/arrow/reader.rs
Outdated
```rust
    },
)))
} else {
    self.build_always_true()
```
When a column is missing, I think we should treat it as null, so this should be false?
Yes, fixed it.
```rust
    Ok(BooleanArray::from(vec![true; batch.num_rows()]))
})))
} else {
    self.build_always_true()
```
The Iceberg spec has no definition for `is_nan` on NULL, but the Java implementation defines it as false: https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/api/src/main/java/org/apache/iceberg/util/NaNUtil.java#L25
Okay
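The convention agreed above can be expressed as a tiny sketch, modeling a nullable float cell as `Option<f64>`. This mirrors the Java NaNUtil behavior linked above; it is an illustrative model, not code from this PR:

```rust
// `is_nan` on a nullable value: NULL is never NaN, matching the Java
// NaNUtil behavior referenced above (false for a null input).
fn is_nan(value: Option<f64>) -> bool {
    value.map_or(false, f64::is_nan)
}
```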
crates/iceberg/src/arrow/reader.rs
Outdated
```rust
    },
)))
} else {
    self.build_always_true()
```
The Java impl's comparator uses nulls-first ordering: https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/api/src/main/java/org/apache/iceberg/expressions/Literals.java#L616
I think we should return false here?
Okay
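The null handling for binary comparisons discussed here can be sketched the same way: when the filtered value is NULL, the comparison evaluates to false and the row is filtered out. A hypothetical model (not this PR's actual code, which operates on Arrow arrays rather than scalars):

```rust
// Hypothetical scalar model of a binary comparison predicate (here `>`)
// on a nullable column value: a NULL operand yields false, so rows with
// missing values never pass the filter.
fn gt(value: Option<i64>, literal: i64) -> bool {
    value.map_or(false, |v| v > literal)
}
```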
@liurenjie1024 I've addressed your comments. Please take a look when you can. Thanks.
Thanks @viirya for this effort!
Oh, sorry, it seems we need to resolve conflicts. Otherwise LGTM, thanks!
Thanks @liurenjie1024. I just resolved the conflicts.
Thanks @viirya for this great effort!
Thanks @liurenjie1024 for your review!
This implements row filtering when reading Parquet files in an Iceberg scan. It is achieved by converting predicates into an Arrow filter, which is used to filter rows during reading in the Parquet reader.
This implements AlwaysTrue, AlwaysFalse, And, Or, Not, Binary, and partial Unary predicates. Some Unary and Set predicates remain unimplemented because there are no existing Arrow kernels to use for them. I'll implement them in follow-up work.
Closes #265