
[Flink 16024][Connector][JDBC] Support FilterPushdown #20140

Closed

Conversation

@qingwei91 (Contributor) commented Jul 3, 2022

What is the purpose of the change

Implement filter pushdown in the JDBC connector source. This optimization avoids scanning the whole table into Flink when only a subset of the rows is needed.

Brief change log

  • Implement a SQL visitor that converts the Flink SQL expression AST into a JDBC SQL string (see the sketch after this list)
  • Implement filter pushdown in JDBCDynamicSource using the visitor
  • Add tests for both the visitor and JDBCDynamicSource
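
A minimal sketch of the visitor idea (the class name, constructor, and literal handling here are made up for illustration; ExpressionDefaultVisitor and the expression classes are Flink's, with import paths as of the 1.16-era flink-table-common):

import java.util.Optional;
import java.util.function.Function;

import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;

// Illustrative only: walks a resolved filter expression and renders a SQL
// fragment, yielding Optional.empty() for anything it cannot translate so
// that predicate can instead run inside Flink.
public class SqlFragmentVisitor extends ExpressionDefaultVisitor<Optional<String>> {

    private final Function<String, String> quoteIdentifier;

    public SqlFragmentVisitor(Function<String, String> quoteIdentifier) {
        this.quoteIdentifier = quoteIdentifier;
    }

    @Override
    public Optional<String> visit(FieldReferenceExpression field) {
        return Optional.of(quoteIdentifier.apply(field.getName()));
    }

    @Override
    public Optional<String> visit(ValueLiteralExpression literal) {
        // Rendering literals to raw strings is the weakness debated later in
        // this review; the merged design binds them as PreparedStatement
        // parameters instead. Only INT literals are handled in this sketch.
        return literal.getValueAs(Integer.class).map(String::valueOf);
    }

    @Override
    protected Optional<String> defaultMethod(Expression expression) {
        return Optional.empty();
    }
}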

Verifying this change

This change added tests and can be verified as follows:

  • Added an integration test in JdbcDynamicTableSourceITCase to make sure there is no regression in SQL filtering
  • Added JdbcFilterPushdownVisitorTest to verify that we generate SQL correctly

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not documented)

@flinkbot (Collaborator) commented Jul 3, 2022

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-runs the last Azure build

*
* @return {@link JdbcFilterPushdownVisitor}
*/
default JdbcFilterPushdownVisitor getFilterPushdownVisitor() {
@qingwei91 (author):

This should be dialect-specific so that it can be specialized to a specific SQL syntax.

I noticed we removed all default implementations from this interface; should I do the same for this method?


/** A {@link DynamicTableSource} for JDBC. */
@Internal
public class JdbcDynamicTableSource
        implements ScanTableSource,
                LookupTableSource,
                SupportsProjectionPushDown,
-               SupportsLimitPushDown {
+               SupportsLimitPushDown,
+               SupportsFilterPushDown {
@qingwei91 (author):

Support Filter Pushdown in JDBC source

* then be handled in Flink runtime.
*/
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
@qingwei91 (author):

Core implementation of this change:

We traverse each ResolvedExpression and produce a String if we know how to push it down to the SQL database, returning Optional.empty() if we cannot handle it.

Unhandled expressions are kept as Flink filters and evaluated in the job at runtime.
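
Filled out, the method from the diff above plausibly looks like this (a simplified sketch; filterVisitor and pushedDownPredicates stand in for the PR's actual fields, and imports are elided as in the surrounding diff):

@Override
public Result applyFilters(List<ResolvedExpression> filters) {
    List<ResolvedExpression> acceptedFilters = new ArrayList<>();
    List<ResolvedExpression> remainingFilters = new ArrayList<>();
    for (ResolvedExpression filter : filters) {
        // The visitor yields a SQL fragment when the predicate can be
        // expressed in the target dialect, Optional.empty() otherwise.
        Optional<String> pushedDown = filter.accept(filterVisitor);
        if (pushedDown.isPresent()) {
            acceptedFilters.add(filter);
            pushedDownPredicates.add(pushedDown.get());
        } else {
            remainingFilters.add(filter);
        }
    }
    // Accepted predicates become part of the scan query; the remainder is
    // evaluated by Flink at runtime.
    return Result.of(acceptedFilters, remainingFilters);
}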

@MartijnVisser (Contributor):

@qingwei91 Thanks for the PR. Can you make sure that your PR title and especially your commit messages conform to the Code Contribution guide? https://flink.apache.org/contributing/contribute-code.html

For example, your commit message should start with [FLINK-16024][Connector][JDBC] in this case.

@qingwei91 force-pushed the FLINK-16024/1.16-jdbc-filter branch from 3672733 to 16a7978 on July 5, 2022
@qingwei91 changed the title from "Flink 16024/1.16 jdbc filter" to "[Flink 16024][Connector][JDBC] Support FilterPushdown" on Jul 10, 2022
@qingwei91 (author):

@flinkbot run azure

1 similar comment
@qingwei91 (author):

@flinkbot run azure

@qingwei91 force-pushed the FLINK-16024/1.16-jdbc-filter branch from 8ed32ca to 6c97da0 on July 13, 2022
@qingwei91 (author):

@flinkbot run azure

@JingGe (Contributor) left a comment:

Thanks @qingwei91 for driving this PR.

@@ -53,7 +52,6 @@
 * Visitor to walk a Expression AST. Produces a String that can be used to pushdown the filter,
 * return Optional.empty() if we cannot pushdown the filter.
 */
-@Public
@JingGe (Contributor) commented Jul 15, 2022:

Changing a public interface is not allowed. Please check FLIP-196: Source API stability guarantees for more information.

@qingwei91 (author):

Hi @JingGe, thanks for the comment.

This is a file I newly added, and in my latest commit, I marked it as PublicEvolving: 6c97da0

Do you mean we cannot expose new API?
For instance, I added this here: https://github.com/apache/flink/pull/20140/files#diff-ae60653ffe2ac890a3c1b01da41405bcc4e6913949176c36edc009df5090c38fR157, which adds a new method and a new return type to JdbcDialect which is PublicEvolving, because filter pushdown might differ across JDBC dialect. Is this approach a problem?

@qingwei91 (author):

Hi @JingGe did you get a chance to check on this?

Contributor:

Do you mean we cannot expose new API?

You can definitely add a new API to the master branch, but the reason why it's problematic here is that you've changed the status of the API across your commits. When a reviewer checks individual commits, it shows that you've removed @Public and then, in a later commit, added @PublicEvolving again. It's better to squash your commits in this case to avoid this type of confusion.

@JingGe (Contributor) commented Jul 19, 2022:

Hi @qingwei91 Could you please explain why a public API is required? Does @Internal work too? Thanks.

@qingwei91 (author) commented Jul 20, 2022:

Hi @JingGe, I believe it has to be PublicEvolving, because this class is returned by a public method in JdbcDialect, which is a PublicEvolving class. Here's an example where it's failing if I mark it as Internal: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38493&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461

I will sort out the commits; sorry for the confusion.

@qingwei91 (author):

I've squashed the commits now and we have a green build.

Can any of you review this again? Thank you!

@MartijnVisser (Contributor):

@qingwei91 I can't review this since it is not my area of expertise. I would like to ask @hailin0 to help with a review due to #20304; otherwise I would like to ask @leonardBang if he can help out.

@MartijnVisser MartijnVisser removed their request for review July 29, 2022 09:36
@libenchao (Member):

@qingwei91 Thanks for your contribution. In the current design, I see you use ValueLiteralExpression#toString() to generate the string for literals. This may work for some cases, but not for all cases IMHO.
Consider the following cases:

  1. ValueLiteralExpression#toString() uses the Flink dialect to represent strings, and it hard-codes the quote character as '. However, in many DBMSs, ' is not the only choice.
  2. ValueLiteralExpression#toString() only handles the special character ' by escaping it. Many DBMSs need more special-character handling, e.g. the MySQL driver.
  3. Other types, e.g. TIMESTAMP, DATE, TIME, INTERVAL and so on, may suffer from this too, because we cannot assume that all DB dialects handle them in the same way.

A more general way to handle this is to use PreparedStatement.setXXX, just like we already do in TableSimpleStatementExecutor and JdbcRowDataLookupFunction. WDYT?
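
A tiny sketch of the idea (the table, URL, and values are made up; the point is that the literal travels as a bound parameter, so the driver, not our code, handles quoting and escaping):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PushdownWithParameters {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                        DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb");
                // The pushed-down predicate is rendered with '?' placeholders
                // instead of inlined literal strings.
                PreparedStatement stmt =
                        conn.prepareStatement(
                                "SELECT id, name FROM person WHERE name = ? AND age > ?")) {
            stmt.setString(1, "O'Brien"); // the driver escapes the quote safely
            stmt.setInt(2, 18);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getLong("id") + " " + rs.getString("name"));
                }
            }
        }
    }
}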

@qingwei91 (author):

Hi @libenchao , thanks for the review!

Thanks for pointing out the flaw, you're right. 🙇

On your recommended approach: I believe JdbcFilterPushdownVisitor needs to produce strings or some equivalent data so that it can be used by JdbcDynamicTableSource::getScanRuntimeProvider. How should I make use of PreparedStatement here? Maybe I am missing something?

I think you're pointing out a fundamental issue with this PR: SQL statement generation has to be dialect-specific, and my trying to provide a default implementation might be a lost cause here.

If we cannot go down the prepared-statement route, I can think of two ideas:

  1. Implement a dialect-specific method that converts a ValueLiteralExpression to a SQL string literal, and have JdbcFilterPushdownVisitor make use of it. We don't have to support all dialects in one go; we can simply fall back to in-memory filtering when a dialect isn't supported.
  2. Make JdbcFilterPushdownVisitor dialect-specific, so every implementation deals with dialect-specific things, including literal stringification. This is similar to the current approach; the only difference is that this PR currently provides a default implementation, whereas this option would stop doing that. My implementation is tested and used in production for SQL Server. Maybe I can rename it and make it specific to the SQL Server JDBC dialect? (The SQL Server dialect PR is still open: [Flink-14101][Connectors][Jdbc] SQL Server dialect #20235, so this would depend on it.) Likewise, we can fall back to in-memory filtering for dialects without an implementation.

I think option 1 is less code, but probably more fiddly and easier to break. Option 2 is likely going to be more code, but the separation is cleaner and less likely to break.

Let me know your thoughts. 😄

@libenchao (Member):

I believe JdbcFilterPushdownVisitor needs to produce strings or some equivalent data so that it can be used by JdbcDynamicTableSource::getScanRuntimeProvider. How should I make use of PreparedStatement here? Maybe I am missing something?

@qingwei91 Currently JdbcRowDataInputFormat already uses PreparedStatement, and 'scan.partition' is implemented using it, hence we can do it. JdbcFilterPushdownVisitor does not necessarily need to return String; it can return anything we need.
We may need to rethink the design, especially on the following points:

  1. What functions should we support, e.g. IN, BETWEEN?
  2. Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract it out to make it dialect-specific?

@qingwei91 (author):

Hi @libenchao, oh I see, I didn't spot that JdbcRowDataInputFormat is using PreparedStatement under the hood.

Thanks for pointing it out. This will be a larger change than I expected; I will give it a go.

  1. What functions should we support, e.g. IN, BETWEEN?

I think we can tackle this incrementally. I believe IN is supported out of the box, because Flink compiles IN into multiple X = Y conditions chained together by OR (e.g. x IN (1, 2) becomes x = 1 OR x = 2). I never looked into BETWEEN, though.

  2. Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract it out to make it dialect-specific?

I think ultimately we need to allow dialect specificity. Right now I've designed it such that the query generator (i.e. JdbcFilterPushdownVisitor) is part of JdbcDialect, so each dialect can provide its own instance to deal with it. Do you think this design is okay, or is there a better way?
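
In other words, something like the following on the dialect interface (a pared-down sketch: the default method appears in the diff near the top of this page, quoteIdentifier is an existing JdbcDialect method, and the visitor constructor shown here is assumed):

import java.io.Serializable;

// Sketch: each dialect hands out a visitor configured with its own
// identifier quoting; dialects with unusual syntax override the default.
public interface JdbcDialect extends Serializable {

    String quoteIdentifier(String identifier);

    default JdbcFilterPushdownVisitor getFilterPushdownVisitor() {
        return new JdbcFilterPushdownVisitor(this::quoteIdentifier);
    }
}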

@libenchao (Member):

I believe IN is supported out of the box, because Flink compiles IN into multiple X = Y conditions chained together by OR

Not exactly; we have a threshold (default 4).

I think we can tackle this incrementally.

I agree, we can start from some common functions, such as =, <>, <, >, <=, >=, IS NULL, IS NOT NULL, IN, and leave others to the future.
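
For intuition, these operators cover queries like the one below, where the WHERE clause can be evaluated by the database instead of inside Flink (a hedged, self-contained illustration; connection options and table names are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FilterPushdownExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        env.executeSql(
                "CREATE TABLE person (id BIGINT, age INT, name STRING) WITH ("
                        + " 'connector' = 'jdbc',"
                        + " 'url' = 'jdbc:mysql://localhost:3306/mydb',"
                        + " 'table-name' = 'person')");
        // With pushdown, age > 18 and name IS NOT NULL travel into the
        // JDBC scan query; unsupported predicates stay in Flink.
        env.executeSql("SELECT id FROM person WHERE age > 18 AND name IS NOT NULL")
                .print();
    }
}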

@HeChuanXUPT:
Wow, thanks, it works for me!

@qingwei91 force-pushed the FLINK-16024/1.16-jdbc-filter branch 4 times, most recently from cf84956 to f5516a5, on October 2, 2022
Implement Filter Pushdown for JDBC connector source using expression visitor pattern.
@qingwei91 (author):

@flinkbot run azure

@qingwei91 (author):

Hi @libenchao, thank you very much for your review 👍

I've addressed all of your concerns. On the TablePlanTest, do you mind checking whether that's how it's supposed to work? I don't think I understand the internals well enough to judge.

@libenchao (Member) left a comment:

One small tip: do not rebase/force-push before the reviewer asks you to, because it makes it hard for the reviewer to do an incremental review.

</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[id, time_col, real_col], where=[((time_col <> '11:11:11.000111') OR (double_col >= -1000.23))])
Member:

Why does the condition still exist in the Calc?

@qingwei91 (author):

I figured it out: it's because the TIME column type wasn't supported. I've added support here: 74fe5ee

Thanks for calling it out.
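
For reference, a hedged sketch of what TIME support could look like in the literal-rendering path, following a switch on the logical type root (the real fix is commit 74fe5ee, which may differ; renderLiteral and parameterize are made-up names for the visitor's literal handler and for wrapping a bound value):

// Hypothetical fragment of the literal-rendering path.
private Optional<ParameterizedPredicate> renderLiteral(ValueLiteralExpression litExp) {
    switch (litExp.getOutputDataType().getLogicalType().getTypeRoot()) {
        case TIME_WITHOUT_TIME_ZONE:
            // TIME literals surface as java.time.LocalTime; java.sql.Time
            // can later be bound via PreparedStatement.setTime().
            return litExp.getValueAs(LocalTime.class)
                    .map(Time::valueOf)
                    .map(this::parameterize);
        default:
            return Optional.empty();
    }
}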

@qingwei91 (author):

One small tip: do not rebase/force-push before the reviewer asks you to, because it makes it hard for the reviewer to do an incremental review.

Sorry, my bad. I was advised to squash in a previous PR, but of course that should only be done right before merging.

@qingwei91 (author):

@flinkbot run azure

LogicalType tpe = litExp.getOutputDataType().getLogicalType();
Class<?> typeCs = tpe.getClass();

if (SUPPORTED_DATA_TYPES.contains(typeCs)) {
Member:

How about changing this to something like:

switch (tpe.getTypeRoot()) {
    case INTEGER: ...
    case VARCHAR: ...
    ...
    default:
        return Optional.empty();
}

@qingwei91 (author):

@flinkbot run azure

@qingwei91 (author):

@flinkbot run azure

@libenchao (Member) left a comment:

@qingwei91 Thanks for the update. Please do not use 'squash' or 'force-push' unless you must or the reviewer asks you. (I went through all the code again and left several minor comments.)


@Override
public Optional<ParameterizedPredicate> visit(FieldReferenceExpression fieldReference) {
String predicateStr = (this.quoteIdentifierFunction.apply(fieldReference.toString()));
Member:

useless ( )

public class JdbcFilterPushdownPreparedStatementVisitor
extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {

private Function<String, String> quoteIdentifierFunction;
Member:

final

@qingwei91 (author):

@libenchao

Thanks for the update. Please do not use 'squash' or 'force-push' unless you must or the reviewer asks you. (I went through all the code again and left several minor comments.)

Sorry, with the last commit I thought to bundle all the changes I've made since your last review into 55d5227, so that you could review just that without noise. I won't do it again.

I will go through the comments and fix them.

Add test for IS NULL and IS NOT NULL
@qingwei91 (author) commented Nov 1, 2022:

Hi @libenchao, this is the new commit I added to address your comments:

42078a4

I also added support for IS NULL and IS NOT NULL, as these two are quite common.

@libenchao (Member) left a comment:

I also added support for IS NULL and IS NOT NULL, as these two are quite common.

This is great!
The PR LGTM with only one minor comment; after fixing that, I'll merge this.

…/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java


Prefer primitive

Co-authored-by: Benchao Li <libenchao@gmail.com>
@libenchao (Member) left a comment:

LGTM, merging. @qingwei91 Thanks for your contribution and for your patience and consistency during the review.

@libenchao closed this in 4967001 on Nov 2, 2022
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
dchristle pushed a commit to dchristle/flink that referenced this pull request Nov 18, 2022
akkinenivijay pushed a commit to krisnaru/flink that referenced this pull request Feb 11, 2023
}

@Override
public Optional<ParameterizedPredicate> visit(CallExpression call) {

grzegorz8:

@qingwei91 Is there any reason why LIKE was not added here? It seems to be as easy as the other binary operators below.
I will gladly add LIKE, but I just want to ensure I didn't overlook something tricky.

@qingwei91 (author):

Hi @grzegorz8, I think we simply missed it; thanks for noticing this.
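
Wiring LIKE in would plausibly look like the other binary operators (a sketch: BuiltInFunctionDefinitions is Flink's, while renderBinaryOperator mirrors the helper pattern used by this visitor and its exact name is assumed):

// Hypothetical addition inside visit(CallExpression call), next to the
// existing comparison operators:
if (BuiltInFunctionDefinitions.LIKE.equals(call.getFunctionDefinition())) {
    return renderBinaryOperator("LIKE", call.getResolvedChildren());
}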
