New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids li… #9387
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit ae64263 (Fri Aug 23 10:21:17 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering what is the reason of having a byte rate limit instead of message rate limit?
…mit with a FlinkConnectorRateLimiter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Xeli Sorry for the late reply. I did not notice that you have updated the patch. I think there is a misunderstanding on the "global rate limit" here. We might want to fix that.
@@ -92,6 +93,10 @@ public void open(Configuration configuration) throws Exception { | |||
|
|||
getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck); | |||
|
|||
//convert per-subtask-limit to global rate limit, as FlinkConnectorRateLimiter::setRate expects a global rate limit. | |||
rateLimiter.setRate(messagePerSecondRateLimit * getRuntimeContext().getNumberOfParallelSubtasks()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if this is the correct logic. I think the rate limiter mentioned it is a "global" rate limit meaning the configuration itself is applied globally. But the effect of rate limit is still per subtask. I don't think there is any communication in the rate limiter among different subtasks to ensure it is a rate limit shared by all the subtasks.
With the current code, if I have 2 subtasks. And I want to have each subtask to consume 100 records/second. I have to set the config to 50, this gives me 100 messages per subtask and 200 messages/s for the entire job. That seems very confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Talked to @Xeli offline. Actually the rate limit is a global rate as the value was divided by the parallelism in the rate limiter implementation. So the current patch does set the per task rate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Xeli Thanks for the patch. LGTM. I fixed two minor things when merging the PR. Hope that is OK.
|
||
//verify handling messages | ||
verify(rateLimiter, times(1)).acquire(receivedMessages.get(0).getSerializedSize() + receivedMessages.get(1).getSerializedSize()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test was left from the previous commit while the rate limit was still on bytes. I fixed it when merging the code.
*/ | ||
public PubSubSourceBuilder<OUT> withMaxMessageToAcknowledge(int maxMessageToAcknowledge) { | ||
this.maxMessageToAcknowledge = maxMessageToAcknowledge; | ||
public PubSubSourceBuilder<OUT> withRateLimit(int messagePerSecondRateLimit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this method to withMessageRateLimit
in case we will have bytes rate limit in the future.
What is the purpose of the change
Follow up on PubSub connector PR. This replaces the max acknowledgement id limit by a FlinkConnectorRateLimiter
Brief change log
Verifying this change
I've made sure this change is covered by existing unit tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: Yes, unless we can add this to 1.9. The pubsub connector has not yet been released.Documentation