Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5154: Consumer fetches from revoked partitions when SyncGroup fails with disconnection [WIP] #3181

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -457,7 +457,6 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
joinResponse.memberId(), joinResponse.groupProtocol());
AbstractCoordinator.this.rejoinNeeded = false;
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
Expand Down Expand Up @@ -537,6 +536,7 @@ public void handle(SyncGroupResponse syncResponse,
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(syncResponse.memberAssignment());
AbstractCoordinator.this.rejoinNeeded = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought... future.complete(syncResponse.memberAssignment()) above will trigger the onSuccess callback registered through joinFuture.addListener, which will enable the heartbeat thread right away, and hence there is a (very small) race condition.

I think it is safer to just move the above line inside onSuccess (line 395) so that it is set before the heartbeat thread is enabled; we would then not need the AbstractCoordinator.this prefix either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dguy @hachikuji if it sounds good to you I can go ahead and make this change while merging.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

} else {
requestRejoin();

Expand Down Expand Up @@ -783,8 +783,9 @@ protected abstract class CoordinatorResponseHandler<R, T> extends RequestFutureA
/**
 * Handles a failed coordinator request.
 *
 * If the failure is a {@code DisconnectException}, the coordinator is marked dead via
 * {@code coordinatorDead()} before the error is propagated. In every case the exception
 * is raised on the supplied future so that callers observe the failure.
 *
 * @param e      the exception that caused the request to fail
 * @param future the future on which the failure is raised
 */
@Override
public void onFailure(RuntimeException e, RequestFuture<T> future) {
    // mark the coordinator as dead
    if (e instanceof DisconnectException) {
        coordinatorDead();
    }
    future.raise(e);
}

Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchResponse.PartitionData;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
Expand Down Expand Up @@ -98,6 +99,7 @@
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -1256,6 +1258,77 @@ public void testMetricConfigRecordingLevel() {
}
}

/**
 * Verifies that the consumer rejoins the group and resumes fetching after a SyncGroup
 * request fails because of a disconnection (KAFKA-5154).
 *
 * Scenario, driven entirely by the order of the prepared mock responses:
 * 1. The consumer joins as a follower, is assigned tp0 and performs an initial poll.
 * 2. A heartbeat response reports REBALANCE_IN_PROGRESS.
 * 3. The consumer rejoins as leader, but the SyncGroup response is delivered with a disconnect.
 * 4. The consumer looks up the coordinator again, rejoins and syncs successfully.
 * 5. A fetch for tp0 returns a record, which the final poll must surface.
 *
 * The consumer is always closed at the end so that it is not leaked into later tests
 * when an assertion fails.
 */
@Test
public void shouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 500;

    Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(topic, 1);
    Node node = cluster.nodes().get(0);

    Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 1000);

    try {
        consumer.subscribe(Collections.singleton(topic), getConsumerRebalanceListener(consumer));
        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

        // initial join as a follower, with tp0 assigned
        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator);

        client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
        client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);

        consumer.poll(0);

        // heartbeat fails due to rebalance in progress
        // (the matcher accepts any request; the response is queued for the coordinator node)
        client.prepareResponseFrom(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                return true;
            }
        }, new HeartbeatResponse(Errors.REBALANCE_IN_PROGRESS), coordinator);

        // join group
        final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(Collections.singletonList(topic)));

        // This member becomes the leader
        final JoinGroupResponse leaderResponse = new JoinGroupResponse(Errors.NONE, 1, assignor.name(), "memberId", "memberId",
                Collections.<String, ByteBuffer>singletonMap("memberId", byteBuffer));
        client.prepareResponseFrom(leaderResponse, coordinator);

        // sync group fails due to disconnect
        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator, true);

        // should try and find the new coordinator
        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);

        // rejoin group
        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator);

        // only answer a fetch that actually asks for tp0
        client.prepareResponseFrom(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(final AbstractRequest body) {
                return body instanceof FetchRequest && ((FetchRequest) body).fetchData().containsKey(tp0);
            }
        }, fetchResponse(tp0, 1, 1), node);

        // advance the mock clock by one heartbeat interval, then also wait in real time
        // NOTE(review): the real sleep presumably gives the background heartbeat thread a
        // chance to run before the final poll - confirm before shortening or removing it
        time.sleep(heartbeatIntervalMs);
        Thread.sleep(heartbeatIntervalMs);
        final ConsumerRecords<String, String> records = consumer.poll(0);
        assertFalse(records.isEmpty());
    } finally {
        // close with a zero timeout so the consumer and its background thread are released
        // without blocking; TimeUnit is fully qualified to avoid relying on a file-level import
        consumer.close(0, java.util.concurrent.TimeUnit.MILLISECONDS);
    }
}

private void consumerCloseTest(final long closeTimeoutMs,
List<? extends AbstractResponse> responses,
long waitMs,
Expand Down Expand Up @@ -1360,7 +1433,6 @@ private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsum
return new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

}

@Override
Expand Down