Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9645: Fallback to unsubscribe during Task Migrated #8220

Merged
merged 3 commits into from Mar 7, 2020

Conversation

abbccdda
Copy link
Contributor

@abbccdda abbccdda commented Mar 4, 2020

After #7312, we could still return data during the rebalance phase, which means it could be possible to find records without corresponding tasks. We have to fallback to the unsubscribe mode during task migrated as the assignment should be cleared out to keep sync with task manager state.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang
Copy link
Contributor

The issue is that when handling TaskMigrated, we call handleLostAll which closes all tasks, and then call enforceRebalance which sets the flag. However inside consumer the assignment are not cleared yet and would still contain the assigned partitions. That why in the next poll call, we could still return the data for those still-assigned partitions --- inside consumers there's a filtering that we would not return data that are from no longer assigned partitions --- and then hit this issue.

What I'm thinking now is, should we reset the assignment inside consumer when enforceRebalance, which would cause the subscription's owned partitions to be empty. If it is due to task-migrated, then it is fine since we know we have likely lost those partitions anyways; if it is only due to version probing then we still want to encode those owned partitions. Thoughts? cc @ableegoldman

@abbccdda
Copy link
Contributor Author

abbccdda commented Mar 5, 2020

Thanks Guozhang for the summary, I think there is also an expectation around the enforceRebalance API, which is whether we should set the AbstractCoordinator state to REBALANCING immediately as if we don't see a coordinator, we shall retry discovering but during this period, the state of consumer is not correct. This could be an orthogonal fix to the consumer in the meantime if you think this makes sense @ableegoldman

@abbccdda abbccdda changed the title KAFKA-9645: Remove Illegal State Check for Records Addition KAFKA-9645: Fallback to unsubscribe during Task Migrated Mar 6, 2020
@abbccdda
Copy link
Contributor Author

abbccdda commented Mar 6, 2020

Double checked with unit test that under old code path, we shall hit the reported exception:

java.lang.NullPointerException: Task was unexpectedly missing for partition topic1-1
	at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:986)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:751)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)
	at org.junit.Assert.assertThrows(Assert.java:1001)
	at org.junit.Assert.assertThrows(Assert.java:981)

Copy link
Contributor

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Boyang, this LGTM

@guozhangwang
Copy link
Contributor

Test this please

@guozhangwang
Copy link
Contributor

test this please

2 similar comments
@guozhangwang
Copy link
Contributor

test this please

@guozhangwang
Copy link
Contributor

test this please

@guozhangwang
Copy link
Contributor

test this please

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! After green build I will merge to trunk.

@@ -356,21 +358,10 @@ public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.LATEST);
}

// needed for cases where you make a second call to endOffsets
public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.


EasyMock.verify(taskManager);

// The Mock consumer shall throw as the assignment has been wiped out, but records are assigned.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm :) it reminds me that the mock consumer's behavior is not exactly the same as the actual consumer (the later would filter, the former would throw), but perhaps this worth a different PR to cleanup. @abbccdda could you file a JIRA for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang
Copy link
Contributor

test this please

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
4 participants