STORM-2296 Kafka spout no dup on leader changes, 1.x-branch #1888
waitForRefresh();
when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
This compares the exact same value with itself. Is this a mistake, or is the line redundant?
Right, there's the same error in another test that got copy-pasted here :)
Fixed in both places.
Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
List<PartitionManager> partitionManagersAfter = iterator.next();
for (PartitionManager before : partitionManagersBefore)
I'd rather create a new hash map for either the before or the after managers, keyed by partition, and compare the other list against that map. The nesting depth of the if statements is already 4.
Right, simplified the test.
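For readers following the thread, the map-based comparison suggested above could look roughly like the sketch below. It is not the code from this PR: `Manager` is a hypothetical, reduced stand-in for `PartitionManager`, and `indexByPartition` and `stateWasMoved` are illustrative names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionStateComparison {

    // Hypothetical stand-in for PartitionManager, reduced to what the check needs.
    static final class Manager {
        final int partition;
        final Object waitingToEmit;

        Manager(int partition, Object waitingToEmit) {
            this.partition = partition;
            this.waitingToEmit = waitingToEmit;
        }
    }

    // Flatten the per-task lists into one map keyed by partition.
    static Map<Integer, Manager> indexByPartition(List<List<Manager>> perTask) {
        Map<Integer, Manager> byPartition = new HashMap<>();
        for (List<Manager> managers : perTask) {
            for (Manager manager : managers) {
                byPartition.put(manager.partition, manager);
            }
        }
        return byPartition;
    }

    // True when every "after" manager is a new object that shares its state
    // with the "before" manager of the same partition, and no partition is lost.
    static boolean stateWasMoved(List<List<Manager>> before, List<List<Manager>> after) {
        Map<Integer, Manager> beforeByPartition = indexByPartition(before);
        int matched = 0;
        for (List<Manager> managers : after) {
            for (Manager current : managers) {
                Manager old = beforeByPartition.get(current.partition);
                if (old == null || old == current || old.waitingToEmit != current.waitingToEmit) {
                    return false;
                }
                matched++;
            }
        }
        return matched == beforeByPartition.size();
    }

    public static void main(String[] args) {
        Object state = new Object();
        List<List<Manager>> before = Arrays.asList(Arrays.asList(new Manager(0, state)));
        List<List<Manager>> after = Arrays.asList(Arrays.asList(new Manager(0, state)));
        System.out.println(stateWasMoved(before, after)); // prints "true"
    }
}
```

Looking managers up by partition keeps the check independent of how partitions are distributed across tasks before and after the refresh, and it removes the paired iteration over two nested lists.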
private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
// check if state was actually moved from old PartitionManager
assertNotSame(managerBefore, managerAfter);
assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit);
Let's check all the copied fields, not only _waitingToEmit.
I've added checks for the _emittedToOffset and _committedTo fields.
However, _failedMsgRetryManager and _pending are private, so we either do not check them or increase the fields' visibility to "package". Do you think this warrants a visibility change?
No, I think that's enough.
@ernisv
+1
@ernisv I can handle it if you mind, but then you need to close this PR manually after merging, since GitHub auto-close doesn't work against a non-master branch.
2bdcbdd to b076399
Ok, I've squashed all the changes into a single commit.
Currently, the Kafka spout emits duplicate tuples whenever a Kafka topic's leader changes.
When a leader change causes an exception, the PartitionManagers are simply recreated, losing the state that tracks which tuples were already emitted, so the new PartitionManager emits them again.
This is acceptable, since at-least-once delivery is still fulfilled, but it would be better not to emit duplicate data where possible.
Moreover, this can easily be avoided by moving the state related to emitted tuples from the old PartitionManager to the new one.
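The idea of moving state from the old manager to the new one can be illustrated with the minimal sketch below. It is an assumption-laden simplification, not the actual change: `Manager` is a hypothetical stand-in for `PartitionManager` that borrows three of the field names mentioned in the review, and `refresh` is an illustrative helper.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

class StateTransferSketch {

    // Hypothetical, simplified stand-in for PartitionManager.
    static final class Manager {
        final int partition;
        long emittedToOffset;
        long committedTo;
        LinkedList<Long> waitingToEmit = new LinkedList<>();

        // 'previous' may be null when the partition had no manager before.
        Manager(int partition, long startOffset, Manager previous) {
            this.partition = partition;
            if (previous == null) {
                // Fresh partition: start from the given offset.
                this.emittedToOffset = startOffset;
                this.committedTo = startOffset;
            } else {
                // Take over the emit state so already emitted tuples are not re-emitted.
                this.emittedToOffset = previous.emittedToOffset;
                this.committedTo = previous.committedTo;
                this.waitingToEmit = previous.waitingToEmit;
            }
        }
    }

    // Recreate managers after a refresh, handing each new manager the old
    // manager of the same partition, if one existed.
    static Map<Integer, Manager> refresh(Map<Integer, Manager> current,
                                         int[] partitions, long startOffset) {
        Map<Integer, Manager> refreshed = new HashMap<>();
        for (int partition : partitions) {
            refreshed.put(partition, new Manager(partition, startOffset, current.get(partition)));
        }
        return refreshed;
    }
}
```

In this sketch the new manager is a distinct object but shares the `waitingToEmit` list with the old one, which matches the `assertNotSame` / `assertSame` pair in the test snippet discussed above.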