diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 3437624c499c0..5a81c8dea136d 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -24,9 +24,13 @@
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -106,18 +110,20 @@
 import java.util.stream.Stream;
 
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.DEFAULT_POLL_INTERVAL_MS;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-@SuppressWarnings("ClassFanOutComplexity")
+@SuppressWarnings({"ClassFanOutComplexity", "ClassDataAbstractionCoupling"})
 @Timeout(1200)
 @Tag("integration")
 @ClusterTestDefaults(
@@ -2163,11 +2169,7 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
         List<Integer> curShareCoordNodeId = null;
         try {
-            curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+            curShareCoordNodeId = topicPartitionLeader(admin, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
         } catch (Exception e) {
             fail(e);
         }
@@ -2182,11 +2184,7 @@
 
         List<Integer> newShareCoordNodeId = null;
         try {
-            newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+            newShareCoordNodeId = topicPartitionLeader(admin, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
         } catch (Exception e) {
             fail(e);
         }
@@ -3159,6 +3157,285 @@ private void verifyYammerMetricCount(String filterString, int count) {
         assertEquals(count, ((Meter) renewAck).count());
     }
 
+    @ClusterTest
+    public void testDescribeShareGroupOffsetsForEmptySharePartition() {
+        String groupId = "group1";
+        try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created.
+            shareConsumer.poll(Duration.ofMillis(2000));
+            SharePartitionOffsetInfo sharePartitionOffsetInfo = sharePartitionOffsetInfo(adminClient, groupId, tp);
+            // Since the partition is empty and no records have been consumed, the share partition startOffset will be
+            // -1. Thus, there will be no description for the share partition.
+            assertNull(sharePartitionOffsetInfo);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForSingleShareConsumer() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForMultipleShareConsumers() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(groupId);
+             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer1.subscribe(List.of(tp.topic()));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Polling share consumer 1 to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer1, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer1.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            // Polling share consumer 2 this time.
+            waitedPoll(shareConsumer2, 2500L, 1);
+            // Since the consumed record hasn't been acknowledged yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer2.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithReleaseAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with the persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now release the record - it should be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+            // After releasing, the lag should be 1 because the record is released for redelivery.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // The record is now consumed again.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record to mark it as consumed.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting the record, the lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithRejectAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with the persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now reject the record - it should not be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.REJECT));
+            shareConsumer.commitSync();
+            // After rejecting, the lag should be 0 because the record is permanently rejected and the offset moves forward.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
+        }
+    )
+    public void testSharePartitionLagOnGroupCoordinatorMovement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            String topicName = "testTopicWithReplicas";
+            // Create a topic with replication factor 3.
+            createTopic(topicName, 1, 3);
+            TopicPartition tp = new TopicPartition(topicName, 0);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Produce the first record and consume it.
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            List<Integer> curGroupCoordNodeId;
+            // Find the broker which is the group coordinator for the share group.
+            curGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, 0);
+            assertEquals(1, curGroupCoordNodeId.size());
+            // Shut down the coordinator broker.
+            KafkaBroker broker = cluster.brokers().get(curGroupCoordNodeId.get(0));
+            cluster.shutdownBroker(curGroupCoordNodeId.get(0));
+            // Wait for it to be completely shut down.
+            broker.awaitShutdown();
+            // Wait for the leaders of the share coordinator, group coordinator and topic partition to be elected, if needed, on a different broker.
+            TestUtils.waitForCondition(() -> {
+                List<Integer> newShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
+                List<Integer> newGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, 0);
+                List<Integer> newTopicPartitionLeader = topicPartitionLeader(adminClient, tp.topic(), tp.partition());
+
+                return newShareCoordNodeId.size() == 1 && !Objects.equals(newShareCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curGroupCoordNodeId.get(0));
+            }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
+            // After the group coordinator shutdown, check that the lag is still 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
+        }
+    )
+    public void testSharePartitionLagOnShareCoordinatorMovement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            String topicName = "testTopicWithReplicas";
+            // Create a topic with replication factor 3.
+            createTopic(topicName, 1, 3);
+            TopicPartition tp = new TopicPartition(topicName, 0);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Produce the first record and consume it.
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            List<Integer> curShareCoordNodeId;
+            // Find the broker which is the share coordinator for the share partition.
+            curShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
+            assertEquals(1, curShareCoordNodeId.size());
+            // Shut down the coordinator broker.
+            KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
+            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+            // Wait for it to be completely shut down.
+            broker.awaitShutdown();
+            // Wait for the leaders of the share coordinator, group coordinator and topic partition to be elected, if needed, on a different broker.
+            TestUtils.waitForCondition(() -> {
+                List<Integer> newShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
+                List<Integer> newGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, 0);
+                List<Integer> newTopicPartitionLeader = topicPartitionLeader(adminClient, tp.topic(), tp.partition());
+
+                return newShareCoordNodeId.size() == 1 && !Objects.equals(newShareCoordNodeId.get(0), curShareCoordNodeId.get(0)) &&
+                    newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curShareCoordNodeId.get(0)) &&
+                    newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curShareCoordNodeId.get(0));
+            }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
+            // After the share coordinator shutdown and the new leader election, check that the lag is still 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Util class to encapsulate state for a consumer/producer
      * being executed by an {@link ExecutorService}.
@@ -3470,6 +3747,35 @@ private void alterShareIsolationLevel(String groupId, String newValue) {
         }
     }
 
+    private List<Integer> topicPartitionLeader(Admin adminClient, String topicName, int partition) throws InterruptedException, ExecutionException {
+        return adminClient.describeTopics(List.of(topicName)).allTopicNames().get().get(topicName)
+            .partitions().stream()
+            .filter(info -> info.partition() == partition)
+            .map(info -> info.leader().id())
+            .filter(info -> info != -1)
+            .toList();
+    }
+
+    private SharePartitionOffsetInfo sharePartitionOffsetInfo(Admin adminClient, String groupId, TopicPartition tp) throws InterruptedException, ExecutionException {
+        ListShareGroupOffsetsResult result = adminClient.listShareGroupOffsets(
+            Map.of(groupId, new ListShareGroupOffsetsSpec().topicPartitions(List.of(tp))),
+            new ListShareGroupOffsetsOptions().timeoutMs(30000)
+        );
+        return result.partitionsToOffsetInfo(groupId).get().get(tp);
+    }
+
+    private void verifySharePartitionLag(Admin adminClient, String groupId, TopicPartition tp, long expectedLag) throws InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            SharePartitionOffsetInfo sharePartitionOffsetInfo = sharePartitionOffsetInfo(adminClient, groupId, tp);
+            System.out.println("Current share partition description: " + sharePartitionOffsetInfo);
+            return sharePartitionOffsetInfo != null &&
+                sharePartitionOffsetInfo.lag().isPresent() &&
+                sharePartitionOffsetInfo.lag().get() == expectedLag;
+        }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to retrieve share partition lag");
+    }
+
     private void alterShareRecordLockDurationMs(String groupId, int newValue) {
         ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();