-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[FLINK-27580] Implement filter pushdown for TableStoreHiveStorageHandler #124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
JingsongLi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution, left some comments.
| public Optional<Predicate> convert() { | ||
| try { | ||
| return Optional.of(convertTree(tree)); | ||
| } catch (Throwable t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to catch a specific exception to avoid unexpected bugs.
| private static final Logger LOG = | ||
| LoggerFactory.getLogger(SearchArgumentToPredicateConverter.class); | ||
|
|
||
| private final ExpressionTree tree; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename tree to root?
There are several places where the exception reference to tree is wrong, and the same name tends to cause this problem
| } | ||
| } | ||
|
|
||
| private Predicate convertNotTree(ExpressionTree tree) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think negate logical for not should in flink-table-store-core.
| Preconditions.checkArgument(idx >= 0, "Column " + columnName + " not found."); | ||
| LogicalType columnType = columnTypes.get(idx); | ||
| switch (leaf.getOperator()) { | ||
| case EQUALS: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto (I think negate logical for not should in flink-table-store-core.)
| } else { | ||
| return FileStoreImpl.createWithValueCount( | ||
| tableLocation, new FileStoreOptions(options), user, partitionType, rowType); | ||
| private static class FileStoreWrapper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should have a FileStoreFactory in flink-table-store-core after #101
Then we can create Predicate from that. (Maybe get row type from it)
JingsongLi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
Filter pushdown is a critical optimization for sources as it can decrease number of records to read. Hive provides a
HiveStoragePredicateHandlerinterface for this purpose. We need to implement this interface inTableStoreHiveStorageHandler.