
[FLINK-17426][blink planner] Dynamic Source supportsLimit pushdown #12964

Merged 7 commits on Jul 31, 2020

Conversation

liuyongvs (Contributor)

What is the purpose of the change

  • Make DynamicSource support the LimitPushDown rule

Verifying this change

This change added tests and can be verified as follows:

  • Added LimitTest to verify the plan
  • Extended LimitITCase (batch only) to verify the limited result
  • Made TestValueSource support LimitPushDown

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 984b8a1 (Thu Jul 23 02:22:57 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Jul 23, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@liuyongvs (Contributor, Author)

Hi @godfreyhe @wuchong, it is here. Thanks for your review.

godfreyhe (Contributor) left a comment:

Thanks for the contribution, I left some comments

return onlyLimit
&& tableSourceTable != null
&& tableSourceTable.tableSource() instanceof SupportsLimitPushDown
&& !Arrays.stream(tableSourceTable.extraDigests()).anyMatch(str -> str.startsWith("limit=["));
godfreyhe (Contributor):

nit: simpler way: use noneMatch instead of ! + anyMatch.
Btw, it would be better to also update the matches method of PushFilterIntoTableSourceScanRule.

liuyongvs (Contributor, Author):

ok
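The suggested noneMatch form can be sketched with plain Java streams. This is a minimal illustration only; the helper name and digest strings below are hypothetical, standing in for tableSourceTable.extraDigests():

```java
import java.util.Arrays;

public class LimitDigestCheck {
    // Hypothetical helper mirroring the matches() condition: the rule should
    // only fire when no extra digest records a previously pushed-down limit.
    static boolean limitNotYetPushedDown(String[] extraDigests) {
        // noneMatch expresses the condition positively, without the leading "!"
        return Arrays.stream(extraDigests).noneMatch(s -> s.startsWith("limit=["));
    }

    public static void main(String[] args) {
        System.out.println(limitNotYetPushedDown(new String[] {"project=[a, c]"})); // true
        System.out.println(limitNotYetPushedDown(new String[] {"limit=[5]"}));      // false
    }
}
```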


// update extraDigests
String[] newExtraDigests = new String[0];
if (limit > 0) {
godfreyhe (Contributor):

We should update the digest anyway; otherwise the rule will be applied in an endless loop when the limit is 0.

liuyongvs (Contributor, Author):

But it will look strange, like limit=[0], and the FlinkLimit0RemoveRule will remove it anyway.

godfreyhe (Contributor):

We can't rely on other rules to ensure the correctness of this rule; we must make sure each rule is correct by itself.
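A minimal sketch of the unconditional digest update the reviewer asks for (the helper name is hypothetical; in the actual rule the new digest is attached to the new TableSourceTable):

```java
import java.util.Arrays;

public class DigestUpdate {
    // Append the limit digest regardless of the limit value, so that even for
    // limit == 0 the next matches() call sees "limit=[0]" and the rule does
    // not fire again in an endless loop.
    static String[] appendLimitDigest(String[] oldDigests, long limit) {
        String[] newDigests = Arrays.copyOf(oldDigests, oldDigests.length + 1);
        newDigests[oldDigests.length] = "limit=[" + limit + "]";
        return newDigests;
    }
}
```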

TableSourceTable newTableSourceTable = applyLimit(limit, tableSourceTable);

FlinkLogicalTableSourceScan newScan = FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable);
call.transformTo(sort.copy(sort.getTraitSet(), Collections.singletonList(newScan)));
godfreyhe (Contributor):

It would be better to put sort.copy(sort.getTraitSet(), Collections.singletonList(newScan)) on a single line, which would make debugging easier.

liuyongvs (Contributor, Author):

ok

@@ -359,7 +361,7 @@ private ChangelogMode parseChangelogMode(String string) {
/**
* Values {@link DynamicTableSource} for testing.
*/
private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, SupportsLimitPushDown {
godfreyhe (Contributor):

this line is too long...

liuyongvs (Contributor, Author):

ok

* implement.
* 3.We can support limit with offset, we can push down offset + fetch to table source.
*/
public class PushLimitIntoTableSourceScanRule extends RelOptRule {
godfreyhe (Contributor):

Please add a rule test to verify this rule, just like PushFilterIntoTableSourceScanRuleTest.

liuyongvs (Contributor, Author):

The unit test is LimitTest.

godfreyhe (Contributor) Jul 24, 2020:

A rule test is different from a plan test. A rule test focuses on the new rule you implement, and only the few necessary supporting rules are added to the rule set to help the test. A plan test verifies the plan based on the whole rule set.

godfreyhe (Contributor) left a comment:

Thanks for the update, LGTM overall. I left some minor comments.
Btw, please fix the typo in the title: plannger -> planner.


// update extraDigests
String[] newExtraDigests = new String[0];
if (limit >= 0) {
godfreyhe (Contributor):

How about we remove the if?

liuyongvs (Contributor, Author) Jul 28, 2020:

I think it cannot be removed. Although it is not a problem now, Calcite may add support for the LIMIT x, y syntax (the MySQL limit/offset syntax), where y represents the limit and can be negative, such as -1; you can refer to https://www.cnblogs.com/acm-bingzi/p/msqlLimit.html.
You can also read the Calcite code in FlinkSqlParserImpl.OrderedQueryOrExpr, which has some comments.
That is why I added the test testMysqlLimit.

godfreyhe (Contributor):

Why not add the limit >= 0 check to the matches method?
Btw, LIMIT x, y can be expressed as LIMIT y OFFSET x in standard SQL.

liuyongvs (Contributor, Author):

Yes, Calcite's fetch member is currently null when there is no limit, and Calcite does not support the MySQL LIMIT x, y syntax. If it did, then in LIMIT 5, -1 the -1 would represent the end, i.e. [offset 5, end), and the parser would likely translate -1 to a null fetch. In that case we would not need the limit >= 0 check.
So I will do as you say: remove the limit >= 0 check and not add the limitation to matches.

<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- FlinkLogicalSort(fetch=[5])
+- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 5]]], fields=[a, b, c])
godfreyhe (Contributor):

Why is the digest pattern not limit=5?

liuyongvs (Contributor, Author) Jul 28, 2020:

This is PushLimitIntoLegacyTableSourceScanRuleTest, which tests the legacy rule; see the code of TestLegacyLimitableTableSource.explainSource. You can compare PushProjectIntoLegacyTableSourceScanRuleTest and PushProjectIntoTableSourceScanRuleTest; the difference is the same:
new: LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c]]])
Legacy: LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestSource(physical fields: a, c)]]])

godfreyhe (Contributor):

Sorry, this comment should be at line 129 in PushLimitIntoTableSourceScanRuleTest.xml.

}

@Test
def testCannotPushDownWithoutLimit(): Unit = {
godfreyhe (Contributor):

nit: unify the case of Pushdown

liuyongvs (Contributor, Author):

ok

@liuyongvs liuyongvs changed the title [FLINK-17426][blink plannger] Dynamic Source supportsLimit pushdown [FLINK-17426][blink planner] Dynamic Source supportsLimit pushdown Jul 28, 2020
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- FlinkLogicalSort(fetch=[5])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 5]]], fields=[a, b, c])
godfreyhe (Contributor):

Why is the digest pattern not limit=5?

liuyongvs (Contributor, Author):

Sorry, it should be removed. I replaced testLimitWithoutOffset with testCanPushdownLimitWithoutOffset and forgot to remove this.

godfreyhe (Contributor) left a comment:

Thanks for the update, LGTM. cc @wuchong

TableStats newTableStats = new TableStats(newRowCount);
FlinkStatistic newStatistic = FlinkStatistic.builder()
.statistic(statistic)
.tableStats(newTableStats)
godfreyhe (Contributor):

nit: -> .tableStats(new TableStats(newRowCount))

return oldTableSourceTable.copy(
newTableSource,
newStatistic,
newExtraDigests
godfreyhe (Contributor):

nit: newExtraDigests is only used once; just inline new String[] {"limit=[" + limit + "]"} here.

wuchong (Member) left a comment:

LGTM

@wuchong wuchong merged commit 3ab1a1c into apache:master Jul 31, 2020
twalthr pushed a commit to twalthr/flink that referenced this pull request Aug 3, 2020