From 631cba07ae88a97122eced535d28ff51fb4607c3 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 17 Nov 2025 16:06:23 +0530 Subject: [PATCH 1/3] KAFKA-19845: 2/N Addition share consumers test. --- .../clients/consumer/ShareConsumerTest.java | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index da253fa1c427e..359aa91b6e7fe 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -62,10 +62,13 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.test.NoRetryException; import org.apache.kafka.test.TestUtils; +import com.yammer.metrics.core.Meter; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Timeout; @@ -2953,6 +2956,9 @@ public void testRenewAcknowledgementOnPoll() { shareConsumer.commitSync(); assertEquals(15, acknowledgementsCommitted.get()); } + + // Verify that metric. + verifyYammerMetricCount("ackType=Renew", 5); } @ClusterTest @@ -2998,6 +3004,124 @@ public void testRenewAcknowledgementOnCommitSync() { shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); } } + + // Verify that metric. + verifyYammerMetricCount("ackType=Renew", 5); + } + + @ClusterTest + public void testRenewAcknowledgementInvalidStateRecord() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)) + ) { + AtomicInteger acknowledgementsCommitted = new AtomicInteger(0); + shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) -> + offsetsByTopicPartition.forEach((tip, offsets) -> acknowledgementsCommitted.addAndGet(offsets.size()))); + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message ".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(List.of(tp.topic())); + ConsumerRecords records = waitedPoll(shareConsumer, 2500L, 1); + assertEquals(1, records.count()); + + for (ConsumerRecord rec : records) { + shareConsumer.acknowledge(rec, AcknowledgeType.REJECT); + shareConsumer.commitSync(); + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rec, AcknowledgeType.RENEW)); + } + } + + // Verify that metric. + verifyYammerMetricCount("ackType=Renew", 0); // Because the share consumer itself should invalidate. + } + + @ClusterTest( + brokers = 1, + serverProperties = { + @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "12000"), + @ClusterConfigProperty(key = "group.share.min.record.lock.duration.ms", value = "12000"), + } + ) + public void testRenewAcknowledgementNoResultInPoll() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)) + ) { + AtomicInteger acknowledgementsCommitted = new AtomicInteger(0); + shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) -> + offsetsByTopicPartition.forEach((tip, offsets) -> acknowledgementsCommitted.addAndGet(offsets.size()))); + + for (int i = 0; i < 10; i++) { + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message " + i).getBytes()); + producer.send(record); + } + producer.flush(); + + shareConsumer.subscribe(List.of(tp.topic())); + ConsumerRecords records = waitedPoll(shareConsumer, 2500L, 10); + assertEquals(10, records.count()); + + int count = 0; + for (ConsumerRecord record : records) { + if (count % 2 == 0) { + shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); + } else { + shareConsumer.acknowledge(record, AcknowledgeType.RENEW); + } + count++; + } + + // 5 more records (total 15 produced). + for (int i = 10; i < 15; i++) { + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message " + i).getBytes()); + producer.send(record); + } + + // Get the rest of all 5 records. + records = waitedPoll(shareConsumer, 11500L, 0); // This will send the acks but not return next 5 records (10-15) + assertEquals(10, acknowledgementsCommitted.get()); + assertEquals(0, records.count()); + + // Verify that metric. + verifyYammerMetricCount("ackType=Renew", 5); + + // Renewal duration passed, now records will be back. + records = waitedPoll(shareConsumer, 2500L, 5); // Renewed records as well as 10-15 records. + assertEquals(5, records.count()); + for (ConsumerRecord record : records) { + shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); + } + + shareConsumer.commitSync(); + + records = waitedPoll(shareConsumer, 2500L, 5); // 10-15 records. + assertEquals(5, records.count()); + for (ConsumerRecord record : records) { + shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); + } + + shareConsumer.commitSync(); + + // Initial - 5 renew + 5 accept, Subsequent - 5 renewed accepted + 5 fresh accepted (10-15) + assertEquals(20, acknowledgementsCommitted.get()); + } + } + + private void verifyYammerMetricCount(String filterString, int val) { + com.yammer.metrics.core.Metric renewAck = KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() + .filter(entry -> entry.getKey().toString().contains(filterString)) + .findAny() + .orElseThrow(() -> new AssertionError("metric not found")) + .getValue(); + + assertEquals(val, ((Meter) renewAck).count()); } /** From b785a50f27eb172936dd40c5ca45b007c6e1a6dd Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 17 Nov 2025 16:44:52 +0530 Subject: [PATCH 2/3] remove metric check --- .../clients/consumer/ShareConsumerTest.java | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 359aa91b6e7fe..214fdceb7b215 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -62,13 +62,10 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; -import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.test.NoRetryException; import org.apache.kafka.test.TestUtils; -import com.yammer.metrics.core.Meter; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Timeout; @@ -2956,9 +2953,6 @@ public void testRenewAcknowledgementOnPoll() { shareConsumer.commitSync(); assertEquals(15, acknowledgementsCommitted.get()); } - - // Verify that metric. - verifyYammerMetricCount("ackType=Renew", 5); } @ClusterTest @@ -3004,9 +2998,6 @@ public void testRenewAcknowledgementOnCommitSync() { shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); } } - - // Verify that metric. - verifyYammerMetricCount("ackType=Renew", 5); } @ClusterTest @@ -3035,9 +3026,6 @@ public void testRenewAcknowledgementInvalidStateRecord() { assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rec, AcknowledgeType.RENEW)); } } - - // Verify that metric. - verifyYammerMetricCount("ackType=Renew", 0); // Because the share consumer itself should invalidate. } @ClusterTest( @@ -3089,9 +3077,6 @@ public void testRenewAcknowledgementNoResultInPoll() { assertEquals(10, acknowledgementsCommitted.get()); assertEquals(0, records.count()); - // Verify that metric. - verifyYammerMetricCount("ackType=Renew", 5); - // Renewal duration passed, now records will be back. records = waitedPoll(shareConsumer, 2500L, 5); // Renewed records as well as 10-15 records. assertEquals(5, records.count()); @@ -3114,16 +3099,6 @@ public void testRenewAcknowledgementNoResultInPoll() { } } - private void verifyYammerMetricCount(String filterString, int val) { - com.yammer.metrics.core.Metric renewAck = KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() - .filter(entry -> entry.getKey().toString().contains(filterString)) - .findAny() - .orElseThrow(() -> new AssertionError("metric not found")) - .getValue(); - - assertEquals(val, ((Meter) renewAck).count()); - } - /** * Util class to encapsulate state for a consumer/producer * being executed by an {@link ExecutorService}. From 871eadca62ffde4efac6ae4f03a067155869e486 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 17 Nov 2025 17:02:33 +0530 Subject: [PATCH 3/3] add metric validation --- .../clients/consumer/ShareConsumerTest.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 214fdceb7b215..49010d3a4cfc6 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -62,10 +62,14 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.test.NoRetryException; import org.apache.kafka.test.TestUtils; +import com.yammer.metrics.core.Meter; + +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Timeout; @@ -162,6 +166,11 @@ public void setup() { } } + @AfterEach + public void tearDown() { + kafka.utils.TestUtils.clearYammerMetrics(); + } + @ClusterTest public void testPollNoSubscribeFails() { try (ShareConsumer shareConsumer = createShareConsumer("group1")) { @@ -2953,6 +2962,7 @@ public void testRenewAcknowledgementOnPoll() { shareConsumer.commitSync(); assertEquals(15, acknowledgementsCommitted.get()); } + verifyYammerMetricCount("ackType=Renew", 5); } @ClusterTest @@ -2998,6 +3008,7 @@ public void testRenewAcknowledgementOnCommitSync() { shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); } } + verifyYammerMetricCount("ackType=Renew", 5); } @ClusterTest @@ -3026,6 +3037,7 @@ public void testRenewAcknowledgementInvalidStateRecord() { assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rec, AcknowledgeType.RENEW)); } } + verifyYammerMetricCount("ackType=Renew", 0); } @ClusterTest( @@ -3076,6 +3088,7 @@ public void testRenewAcknowledgementNoResultInPoll() { records = waitedPoll(shareConsumer, 11500L, 0); // This will send the acks but not return next 5 records (10-15) assertEquals(10, acknowledgementsCommitted.get()); assertEquals(0, records.count()); + verifyYammerMetricCount("ackType=Renew", 5); // Renewal duration passed, now records will be back. records = waitedPoll(shareConsumer, 2500L, 5); // Renewed records as well as 10-15 records. @@ -3097,6 +3110,17 @@ public void testRenewAcknowledgementNoResultInPoll() { // Initial - 5 renew + 5 accept, Subsequent - 5 renewed accepted + 5 fresh accepted (10-15) assertEquals(20, acknowledgementsCommitted.get()); } + verifyYammerMetricCount("ackType=Renew", 5); + } + + private void verifyYammerMetricCount(String filterString, int count) { + com.yammer.metrics.core.Metric renewAck = KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() + .filter(entry -> entry.getKey().toString().contains(filterString)) + .findAny() + .orElseThrow(() -> new AssertionError("metric not found")) + .getValue(); + + assertEquals(count, ((Meter) renewAck).count()); } /**