Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13435; Group won't consume partitions added after static member restart #11559

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
*/
protected abstract void onJoinPrepare(int generation, String memberId);

/**
* TODO
*/
protected void onFollowerJoined() { }

/**
* Perform assignment for the group. This is used by the leader to push state to all the members
* of the group (e.g. to push partition assignments in the case of the new consumer)
Expand Down Expand Up @@ -672,6 +677,8 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
}

private RequestFuture<ByteBuffer> onJoinFollower() {
onFollowerJoined();

// send follower's sync group with an empty assignment
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// of offset commit requests, which may be invoked from the heartbeat thread
private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;

private boolean isLeader = false;
private Set<String> joinedSubscription;
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
Expand Down Expand Up @@ -217,6 +216,7 @@ public String protocolType() {
protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
log.debug("Joining group with current subscription: {}", subscriptions.subscription());
this.joinedSubscription = subscriptions.subscription();

JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();

List<String> topics = new ArrayList<>(joinedSubscription);
Expand Down Expand Up @@ -351,10 +351,6 @@ protected void onJoinComplete(int generation,
ByteBuffer assignmentBuffer) {
log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

// Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader)
assignmentSnapshot = null;

ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
Expand Down Expand Up @@ -606,6 +602,11 @@ private void maybeUpdateGroupSubscription(String assignorName,
}
}

@Override
protected void onFollowerJoined() {
assignmentSnapshot = metadataSnapshot;
}

@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Expand Down Expand Up @@ -633,8 +634,6 @@ protected Map<String, ByteBuffer> performAssignment(String leaderId,
// which ensures that all metadata changes will eventually be seen
updateGroupSubscription(allSubscribedTopics);

isLeader = true;

log.debug("Performing assignment using strategy {} with subscriptions {}", assignorName, subscriptions);

Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
Expand Down Expand Up @@ -749,8 +748,8 @@ protected void onJoinPrepare(int generation, String memberId) {
}
}

isLeader = false;
subscriptions.resetGroupSubscription();
assignmentSnapshot = null;

if (exception != null) {
throw new KafkaException("User rebalance callback throws an error", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1886,8 +1886,9 @@ private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error)

@Test
public void testAutoCommitDynamicAssignment() {
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
) {
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
client.updateMetadata(metadataResponse);

subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p));
subscriptions.seek(t1p, 100);
Expand All @@ -1901,6 +1902,8 @@ public void testAutoCommitDynamicAssignment() {
@Test
public void testAutoCommitRetryBackoff() {
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
client.updateMetadata(metadataResponse);

subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p));

Expand Down Expand Up @@ -1934,6 +1937,8 @@ public void testAutoCommitRetryBackoff() {
@Test
public void testAutoCommitAwaitsInterval() {
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
client.updateMetadata(metadataResponse);

subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p));

Expand Down Expand Up @@ -1971,6 +1976,8 @@ public void testAutoCommitAwaitsInterval() {
@Test
public void testAutoCommitDynamicAssignmentRebalance() {
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
client.updateMetadata(metadataResponse);

subscriptions.subscribe(singleton(topic1), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
Expand All @@ -1995,6 +2002,8 @@ public void testAutoCommitDynamicAssignmentRebalance() {
@Test
public void testAutoCommitManualAssignment() {
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
client.updateMetadata(metadataResponse);

subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
Expand All @@ -2010,6 +2019,8 @@ public void testAutoCommitManualAssignment() {
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
client.updateMetadata(metadataResponse);

subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
import kafka.server.QuotaType
import kafka.server.KafkaServer
import org.apache.kafka.clients.admin.NewPartitions
import org.apache.kafka.clients.admin.NewTopic

import scala.collection.mutable

Expand Down Expand Up @@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1,
"Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1")
}
}

@Test
def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {
val foo = "foo"
val foo0 = new TopicPartition(foo, 0)
val foo1 = new TopicPartition(foo, 1)

val admin = createAdminClient()
admin.createTopics(Seq(new NewTopic(foo, 1, 1.asInstanceOf[Short])).asJava).all.get

val consumerConfig = new Properties
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id")
consumerConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "my-instance-id")

val consumer1 = createConsumer(configOverrides = consumerConfig)
consumer1.subscribe(Seq(foo).asJava)
awaitAssignment(consumer1, Set(foo0))
consumer1.close()

val consumer2 = createConsumer(configOverrides = consumerConfig)
consumer2.subscribe(Seq(foo).asJava)
awaitAssignment(consumer2, Set(foo0))

admin.createPartitions(Map(foo -> NewPartitions.increaseTo(2)).asJava).all.get

awaitAssignment(consumer2, Set(foo0, foo1))

consumer2.close()
}
}