[FLINK-25379][connectors] Support limit push down in DATAGEN connector #18151
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot, and I help the community review pull requests. Automated checks: last check on commit b2fc0dd (Mon Dec 20 07:18:03 UTC 2021). Mention the bot in a comment to re-run the automated checks.
Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. The @flinkbot bot supports the following commands:
@flinkbot run azure
You can already specify the number of records for this connector per the documentation, see https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/datagen/
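For reference, the existing way to bound the connector is the 'number-of-rows' option from the linked documentation. A minimal sketch (the table and column names here are invented for illustration):

```sql
CREATE TABLE bounded_datagen (
  f0 INT
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '5'  -- the source becomes bounded and emits exactly 5 rows
);
```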
Nice change! Can you add a simple test in DataGeneratorConnectorITCase that checks that using the LIMIT keyword instead of the number-of-rows option works fine?
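The suggested test boils down to checking that a plain LIMIT query against an otherwise unbounded datagen table terminates with the right row count. Sketched in SQL (identifiers invented, not taken from the PR):

```sql
CREATE TABLE datagen_t (
  f0 INT
) WITH (
  'connector' = 'datagen'
);

-- With limit pushdown, the source itself stops after 5 rows rather than
-- Flink discarding rows downstream of the source.
SELECT * FROM datagen_t LIMIT 5;
```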
@MartijnVisser this PR adds support for the LIMIT keyword, that is, instead of defining the number-of-rows option.
Force-pushed from b2fc0dd to be07029.
Force-pushed from be07029 to 1f1e5df.
Force-pushed from 1f1e5df to 4bd3f99.
Thank you for the contribution @ZhangChaoming, just left another small comment and then it looks good!
I am wondering if LIMIT is the best option for the Datagen connector, given its special circumstances: it generates random rows every time. I think LIMIT implies that a user wants to limit the available (or, in this case, generated) results. If I applied a LIMIT in combination with a specific reading position for Kafka, I would get the same results over and over. That's not the case for Datagen, of course, because it generates random values. I'd rather be consistent in what LIMIT does for all connectors and keep the current number-of-rows method.
Let's keep in mind that the limit is already being applied, by Flink itself, outside the source. The pushdown just allows the source itself to aid with that. I don't see a great reason why the datagen connector should need to support this, but I certainly don't see harm in it either.
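To make the pushdown distinction concrete, here is a small Python sketch (not Flink code; all names are invented for illustration) of the two paths being discussed: the planner truncating results after the source versus the source honoring a pushed-down limit itself:

```python
import itertools

def datagen_source(num_rows, pushed_limit=None):
    """Yield synthetic rows; a pushed-down limit lets the source stop early."""
    bound = num_rows if pushed_limit is None else min(num_rows, pushed_limit)
    for i in range(bound):
        yield i

def planner_limit(rows, limit):
    """Without pushdown, the limit is applied outside the source."""
    return list(itertools.islice(rows, limit))

# Both paths produce the same 5 rows; pushdown merely avoids generating the rest.
without_pushdown = planner_limit(datagen_source(1000), 5)
with_pushdown = list(datagen_source(1000, pushed_limit=5))
print(without_pushdown == with_pushdown)  # True
```

In Flink's Table API this role is played by the SupportsLimitPushDown source ability; the sketch only models the semantics, not the actual interface.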
For me the question is what a user expects when they apply LIMIT. The fact that we don't (yet) support it is less relevant in that context. Is there anything on LIMIT in the SQL standard?
The harm I see is that we have two options to achieve apparently the same result, which could also conflict with each other ("What output can I expect when I set …").
A datagen table with a set number of rows is a bounded source.
But why should it behave differently than any other bounded source to which a limit is applied?
Force-pushed from 15cb79a to 74c203d.
If that's the case, then I agree that my argument is not applicable here.
It shouldn't, but Datagen itself behaves differently than any other bounded source because of its randomness.
@ZhangChaoming Please first have a look at the CI, because the build is currently not passing.
Force-pushed from 95205ff to ec8d224.
@MartijnVisser I cannot find any CI progress for this PR.
@ZhangChaoming It always updates the one from the first comment, which you can find here: #18151 (comment)
Force-pushed from fc9c6f6 to 849140b.
@flinkbot run azure
@MartijnVisser Thanks. The code style check failed; I will fix it.
@MartijnVisser The pipeline still failed, but it seems the error was not caused by my code.
@ZhangChaoming You should probably rebase your branch on the latest master to resolve those issues.
@MartijnVisser I think my code is safe, since the error was caused by testing …
@ZhangChaoming Still, I wouldn't merge it unless CI has fully passed. Note: you'll need a review from probably @slinkydeveloper before this can be merged.
@flinkbot run azure
@MartijnVisser The CI has passed. Could you please review this PR?
@ZhangChaoming I hope that @Airblader or @slinkydeveloper can take care of the review.
Force-pushed from 849140b to 2ce22ff.
Force-pushed from 2ce22ff to 15f9429.
@flinkbot run azure
@MartijnVisser Could you please help review this?
@slinkydeveloper Can you help with the review?
@slinkydeveloper Thanks for the review, and @ZhangChaoming thanks for the PR. I've merged it.
What is the purpose of the change

Support limit push down in DATAGEN connector.

Brief change log

Verifying this change

Apply LIMIT in a datagen table query; see DataGeneratorConnectorITCase#testLimit.

Does this pull request potentially affect one of the following parts:

@Public(Evolving): (yes / no)

Documentation