fix: KafkaSpout is blocked in AutoCommitMode #1863

Closed
wants to merge 2 commits into from

@mingmxu mingmxu commented Jan 5, 2017

What's the issue?
When a Storm topology runs in 'At-Most-Once' mode (ack executor=0), KafkaSpout stops emitting records after some time.

What's the cause?
The function poll() decides whether more data needs to be pulled from the Kafka cluster. Its condition, `!waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets`, is always `FALSE` once the threshold is reached, because `numUncommittedOffsets` keeps increasing and is never reset in 'At-Most-Once' mode.

What's the solution?
I changed the condition to `!waitingToEmit() && (numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode)`, so that in AutoCommitMode polling no longer depends on the number of uncommitted offsets; see the inline comments.
Also, tracking emitted(msgId) and `numUncommittedOffsets` is not required in AutoCommitMode.
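
For readers who want to see the effect of the two conditions side by side, here is a minimal standalone sketch. It is not the KafkaSpout source: the class `PollGateSketch`, its method names, and the batch and threshold values are hypothetical and exist only for illustration. The simulation assumes the spout is never waiting to emit and that the uncommitted counter is never reset, as described above for 'At-Most-Once' mode.

```java
// Illustrative sketch only. PollGateSketch and its methods are hypothetical
// names, not part of storm-kafka-client.
final class PollGateSketch {

    // Condition as quoted in the description, before the change.
    static boolean shouldPollBefore(boolean waitingToEmit,
                                    long numUncommittedOffsets,
                                    long maxUncommittedOffsets) {
        return !waitingToEmit && numUncommittedOffsets < maxUncommittedOffsets;
    }

    // Condition as quoted in the description, after the change.
    static boolean shouldPollAfter(boolean waitingToEmit,
                                   long numUncommittedOffsets,
                                   long maxUncommittedOffsets,
                                   boolean consumerAutoCommitMode) {
        return !waitingToEmit
                && (numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode);
    }

    // Counts how many of `rounds` poll attempts are allowed when every
    // successful poll adds `batchSize` to a counter that is never reset.
    static int countPolls(int rounds, int batchSize, long maxUncommittedOffsets,
                          boolean consumerAutoCommitMode, boolean useChangedCondition) {
        long numUncommittedOffsets = 0;
        int polls = 0;
        for (int i = 0; i < rounds; i++) {
            boolean poll = useChangedCondition
                    ? shouldPollAfter(false, numUncommittedOffsets,
                                      maxUncommittedOffsets, consumerAutoCommitMode)
                    : shouldPollBefore(false, numUncommittedOffsets, maxUncommittedOffsets);
            if (poll) {
                polls++;
                // The counter keeps growing here on purpose, to show that the
                // changed condition alone is enough to keep polling.
                numUncommittedOffsets += batchSize;
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        System.out.println("polls with original condition: "
                + countPolls(10, 5, 10, true, false));
        System.out.println("polls with changed condition:  "
                + countPolls(10, 5, 10, true, true));
    }
}
```

With the original condition the simulated spout stops polling as soon as the counter reaches the threshold, while the changed condition keeps polling in auto-commit mode. The second part of the change (not tracking the counter at all in AutoCommitMode) is left out of the sketch.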

@srdo
Contributor

srdo commented Jan 12, 2017

Nice find :) You should go to https://issues.apache.org/jira/ and create an issue for this. Then rename this PR to mention the jira issue number (see the other open PRs for an example). The commit message should probably also mention the issue number.

The fix itself looks fine to me.

@HeartSaVioR
Contributor

@xumingmin Could you rebase this? STORM-2225 touches many places in storm-kafka-client, so you might need to check that this is still valid.

@mingmxu
Author

mingmxu commented Feb 3, 2017

@srdo I created STORM-2340.

@HeartSaVioR, let me check against the latest code and open a new pull request.

@mingmxu
Author

mingmxu commented Feb 3, 2017

@HeartSaVioR The issue is still there, so I created a new pull request, #1919, after rebasing.

@mingmxu
Author

mingmxu commented Feb 6, 2017

Closing this one; please use #1919 instead.

@mingmxu mingmxu closed this Feb 6, 2017