Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8179: Part 7, cooperative rebalancing in Streams #7386

Merged
merged 18 commits into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@
<suppress checks="MethodLength"
files="RocksDBWindowStoreTest.java"/>

<suppress checks="MemberName"
files="StreamsPartitionAssignorTest.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,26 +355,29 @@ protected void onJoinComplete(int generation,
Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
addedPartitions.removeAll(ownedPartitions);

// Invoke user's revocation callback before changing assignment or updating state
if (protocol == RebalanceProtocol.COOPERATIVE) {
Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);

log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " +
"newly added partitions: {}, revoking partitions: {}",
log.info("Updating assignment with\n" +
"now assigned partitions: {}\n" +
"compare with previously owned partitions: {}\n" +
"newly added partitions: {}\n" +
"revoked partitions: {}\n",
Utils.join(assignedPartitions, ", "),
Utils.join(ownedPartitions, ", "),
Utils.join(addedPartitions, ", "),
Utils.join(revokedPartitions, ", "));

Utils.join(revokedPartitions, ", ")
);

if (!revokedPartitions.isEmpty()) {
// revoke partitions that was previously owned but no longer assigned;
// note that we should only change the assignment AFTER we've triggered
// the revoke callback
// revoke partitions that were previously owned but no longer assigned;
// note that we should only change the assignment (or update the assignor's state)
// AFTER we've triggered the revoke callback
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));

// if revoked any partitions, need to re-join the group afterwards
log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
requestRejoin();
}
}
Expand Down Expand Up @@ -679,7 +682,6 @@ protected void onJoinPrepare(int generation, String memberId) {
}
}


isLeader = false;
subscriptions.resetGroupSubscription();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,14 @@ public synchronized void assignFromSubscribed(Collection<TopicPartition> assignm
if (!this.partitionsAutoAssigned())
throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");

Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
for (TopicPartition tp : assignments) {
TopicPartitionState state = this.assignment.stateValue(tp);
if (state == null)
state = new TopicPartitionState();
assignedPartitionStates.put(tp, state);
}

Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(assignments);
assignmentId++;
this.assignment.set(assignedPartitionStates);
}
Expand Down Expand Up @@ -669,13 +675,6 @@ public synchronized ConsumerRebalanceListener rebalanceListener() {
return rebalanceListener;
}

private static Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) {
Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());
for (TopicPartition tp : assignments)
map.put(tp, new TopicPartitionState());
return map;
}

private static class TopicPartitionState {

private FetchState fetchState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ public void testUnauthorizedTopic() {
}

@Test
public void testFetchDuringRebalance() {
public void testFetchDuringEagerRebalance() {
buildFetcher();

subscriptions.subscribe(singleton(topicName), listener);
Expand All @@ -859,7 +859,9 @@ public void testFetchDuringRebalance() {

assertEquals(1, fetcher.sendFetches());

// Now the rebalance happens and fetch positions are cleared
// Now the eager rebalance happens and fetch positions are cleared
subscriptions.assignFromSubscribed(Collections.emptyList());

subscriptions.assignFromSubscribed(singleton(tp0));
client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
consumerClient.poll(time.timer(0));
Expand All @@ -868,6 +870,31 @@ public void testFetchDuringRebalance() {
assertTrue(fetcher.fetchedRecords().isEmpty());
}

/**
 * Checks that a fetch sent before a re-assignment is still delivered when the
 * partition (tp0) is assigned again without the assignment being emptied in between.
 */
@Test
public void testFetchDuringCooperativeRebalance() {
buildFetcher();

// Subscribe to the topic, assign tp0 and set its fetch position to offset 0
subscriptions.subscribe(singleton(topicName), listener);
subscriptions.assignFromSubscribed(singleton(tp0));
subscriptions.seek(tp0, 0);

client.updateMetadata(initialUpdateResponse);

// One fetch request is expected to be sent while tp0 is assigned
assertEquals(1, fetcher.sendFetches());

// Now the cooperative rebalance happens and fetch positions are NOT cleared for unrevoked partitions
subscriptions.assignFromSubscribed(singleton(tp0));

// Supply the response for the fetch that was sent before the re-assignment, then poll to process it
client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
consumerClient.poll(time.timer(0));

Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();

// The active fetch should NOT be ignored since the position for tp0 is still valid
assertEquals(1, fetchedRecords.size());
// NOTE(review): presumably this.records holds three records -- confirm against the test fixture
assertEquals(3, fetchedRecords.get(tp0).size());
}

@Test
public void testInFlightFetchOnPausedPartition() {
buildFetcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ boolean allTasksRunning() {
@Override
void closeTask(final StreamTask task, final boolean clean) {
if (suspended.containsKey(task.id())) {
task.closeSuspended(clean, false, null);
task.closeSuspended(clean, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice cleanup! In #5501 we moved the close / abort logic into the caller's suspend function, and we should actually have removed the parameter in that PR, but overlooked it.

} else {
task.close(clean, false);
}
}

/**
 * Reports whether the {@code restoring} collection currently holds any entry.
 *
 * @return {@code true} if {@code restoring} is non-empty, {@code false} otherwise
 */
boolean hasRestoringTasks() {
    final boolean noneRestoring = restoring.isEmpty();
    return !noneRestoring;
}

Set<TaskId> suspendedTaskIds() {
return suspended.keySet();
Expand Down Expand Up @@ -107,7 +111,7 @@ RuntimeException suspendOrCloseTasks(final Set<TaskId> revokedTasks,
} else if (restoring.containsKey(task)) {
revokedRestoringTasks.add(task);
} else if (!suspended.containsKey(task)) {
log.warn("Task {} was revoked but cannot be found in the assignment", task);
log.warn("Task {} was revoked but cannot be found in the assignment, may have been closed due to error", task);
}
}

Expand All @@ -131,7 +135,7 @@ private RuntimeException suspendRunningTasks(final Set<TaskId> runningTasksToSus
task.suspend();
suspended.put(id, task);
} catch (final TaskMigratedException closeAsZombieAndSwallow) {
// as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
// swallow and move on since we are rebalancing
log.info("Failed to suspend {} {} since it got migrated to another thread already. " +
"Closing it as zombie and move on.", taskTypeName, id);
firstException.compareAndSet(null, closeZombieTask(task));
Expand Down Expand Up @@ -248,7 +252,7 @@ private RuntimeException closeSuspended(final boolean isZombie,

try {
final boolean clean = !isZombie;
task.closeSuspended(clean, isZombie, null);
task.closeSuspended(clean, null);
} catch (final RuntimeException e) {
log.error("Failed to close suspended {} {} due to the following error:", taskTypeName, task.id(), e);
return e;
Expand All @@ -264,7 +268,6 @@ RuntimeException closeNotAssignedSuspendedTasks(final Set<TaskId> revokedTasks)
for (final TaskId revokedTask : revokedTasks) {
final StreamTask suspendedTask = suspended.get(revokedTask);

// task may not be in the suspended tasks if it was closed due to some error
if (suspendedTask != null) {
firstException.compareAndSet(null, closeSuspended(false, suspendedTask));
} else {
Expand Down Expand Up @@ -335,7 +338,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId,
return true;
} else {
log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
task.closeSuspended(true, false, null);
task.closeSuspended(true, null);
}
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,6 @@ public void commit() {
commitNeeded = false;
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we remove the standby task close?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We remove suspend and closeSuspended because we no longer suspend standby tasks at all as of PR 7321, but we forgot to clean up all the methods back then.

* <pre>
* - flush store
* - checkpoint store
* </pre>
*/
@Override
public void suspend() {
log.debug("Suspending");
flushAndCheckpointState();
}

private void flushAndCheckpointState() {
stateMgr.flush();
stateMgr.checkpoint(Collections.emptyMap());
Expand Down Expand Up @@ -162,13 +150,6 @@ public void close(final boolean clean,
taskClosed = true;
}

@Override
public void closeSuspended(final boolean clean,
final boolean isZombie,
final RuntimeException e) {
close(clean, isZombie);
}

/**
* Updates a state store using records from one change log partition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {
initialize(active);
}

if (needsRestoring.isEmpty() || restoreConsumer.assignment().isEmpty()) {
restoreConsumer.unsubscribe();
if (checkForCompletedRestoration()) {
return completedRestorers;
}

Expand Down Expand Up @@ -116,9 +115,7 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {

needsRestoring.removeAll(completedRestorers);

if (needsRestoring.isEmpty()) {
restoreConsumer.unsubscribe();
}
checkForCompletedRestoration();

return completedRestorers;
}
Expand Down Expand Up @@ -337,7 +334,14 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
return nextPosition;
}


/**
 * Checks whether anything is left in {@code needsRestoring}. If nothing is left,
 * logs that restoration has finished and unsubscribes the restore consumer.
 *
 * @return {@code true} if {@code needsRestoring} is empty (and the restore consumer
 *         was unsubscribed), {@code false} if restoration is still pending
 */
private boolean checkForCompletedRestoration() {
    final boolean restorationDone = needsRestoring.isEmpty();
    if (restorationDone) {
        log.info("Finished restoring all active tasks");
        restoreConsumer.unsubscribe();
    }
    return restorationDone;
}

private boolean hasPartition(final TopicPartition topicPartition) {
final List<PartitionInfo> partitions = partitionInfo.get(topicPartition.topic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,6 @@ private void initTopology() {
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
*/
@Override
public void suspend() {
log.debug("Suspending");
suspend(true, false);
Expand Down Expand Up @@ -674,10 +673,7 @@ private void closeTopology() {
}

// helper to avoid calling suspend() twice if a suspended task is not reassigned and closed
@Override
public void closeSuspended(final boolean clean,
final boolean isZombie,
RuntimeException firstException) {
void closeSuspended(final boolean clean, RuntimeException firstException) {
try {
closeStateManager(clean);
} catch (final RuntimeException e) {
Expand Down Expand Up @@ -729,7 +725,7 @@ public void close(boolean clean,
log.error("Could not close task due to the following error:", e);
}

closeSuspended(clean, isZombie, firstException);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice cleanup!

closeSuspended(clean, firstException);

taskClosed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ public class StreamThread extends Thread {
*/
public enum State implements ThreadStateTransitionValidator {

CREATED(1, 5), // 0
STARTING(2, 3, 5), // 1
PARTITIONS_REVOKED(3, 5), // 2
PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3
RUNNING(2, 3, 5), // 4
PENDING_SHUTDOWN(6), // 5
DEAD; // 6
CREATED(1, 5), // 0
STARTING(2, 3, 5), // 1
PARTITIONS_REVOKED(2, 3, 5), // 2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allow PARTITIONS_REVOKED to transition to itself (but callback is a no-op if no new partitions have been revoked)

PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3
RUNNING(2, 3, 5), // 4
PENDING_SHUTDOWN(6), // 5
DEAD; // 6

private final Set<Integer> validTransitions = new HashSet<>();

Expand Down Expand Up @@ -744,9 +744,9 @@ void runOnce() {
// to unblock the restoration as soon as possible
records = pollRequests(Duration.ZERO);
} else if (state == State.PARTITIONS_REVOKED) {
// try to fetch some records with normal poll time
// in order to wait long enough to get the join response
records = pollRequests(pollTime);
// try to fetch some records with zero poll millis to unblock
Copy link
Contributor Author

@ableegoldman ableegoldman Oct 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can potentially be in PARTITIONS_REVOKED with cooperative rebalancing so we don't want to just block doing nothing during the rebalance -- not sure if it's worth polling for zero since this is rare with cooperative, or some other time < pollTime? cc/ @guozhangwang

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In EAGER, during the PARTITIONS_REVOKED state we would not return any data from the consumer anyway. In future COOPERATIVE, even if we can return some data during the rebalance, the transition PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED would happen within a single consumer.poll call, and only very rarely would we stay in PARTITIONS_REVOKED after consumer.poll (if the subscription changed). So I think polling with zero sounds good to me.

Also note that in my PR for returning data in the middle of a rebalance, we still pass in non-zero timeout for finding the coordinator so that we are ensured to have one round-trip at least within that call.

// other useful work while waiting for the join response
records = pollRequests(Duration.ZERO);
} else if (state == State.RUNNING || state == State.STARTING) {
// try to fetch some records with normal poll time
// in order to get long polling
Expand Down Expand Up @@ -980,7 +980,12 @@ boolean maybeCommit() {
}
}

lastCommitMs = now;
if (committed == -1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a meta comment: maybe another (equally hacky?) way to do this is to expose the MemberState from ConsumerMetadata (we are exposing this in KIP-447). Then in Streams we can check that state after each poll call -- remember that the state can only change within the poll call -- and depending on that we can decide whether or not to commit.

As for now I think this way is fine, cannot really think of a better way that does not change public APIs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this trace for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it might be helpful to see that we are not committing some of what we've processed because we can't commit during a rebalance. But I don't think it's absolutely necessary, and I can take it out if you don't think it adds much?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay to leave it as TRACE, as practically we do not turn on TRACE that frequently.

log.trace("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
} else {
lastCommitMs = now;
}

processStandbyRecords = true;
} else {
committed = taskManager.maybeCommitActiveTasksPerUserRequested();
Expand Down
Loading