diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index da253fa1c427e..49010d3a4cfc6 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -62,10 +62,14 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.test.NoRetryException; import org.apache.kafka.test.TestUtils; +import com.yammer.metrics.core.Meter; + +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Timeout; @@ -162,6 +166,11 @@ public void setup() { } } + @AfterEach + public void tearDown() { + kafka.utils.TestUtils.clearYammerMetrics(); + } + @ClusterTest public void testPollNoSubscribeFails() { try (ShareConsumer shareConsumer = createShareConsumer("group1")) { @@ -2953,6 +2962,7 @@ public void testRenewAcknowledgementOnPoll() { shareConsumer.commitSync(); assertEquals(15, acknowledgementsCommitted.get()); } + verifyYammerMetricCount("ackType=Renew", 5); } @ClusterTest @@ -2998,6 +3008,119 @@ public void testRenewAcknowledgementOnCommitSync() { shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); } } + verifyYammerMetricCount("ackType=Renew", 5); + } + + @ClusterTest + public void testRenewAcknowledgementInvalidStateRecord() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)) + ) { + AtomicInteger acknowledgementsCommitted = new AtomicInteger(0); + shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) -> + offsetsByTopicPartition.forEach((tip, offsets) -> acknowledgementsCommitted.addAndGet(offsets.size()))); + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message ".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(List.of(tp.topic())); + ConsumerRecords records = waitedPoll(shareConsumer, 2500L, 1); + assertEquals(1, records.count()); + + for (ConsumerRecord rec : records) { + shareConsumer.acknowledge(rec, AcknowledgeType.REJECT); + shareConsumer.commitSync(); + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rec, AcknowledgeType.RENEW)); + } + } + verifyYammerMetricCount("ackType=Renew", 0); + } + + @ClusterTest( + brokers = 1, + serverProperties = { + @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "12000"), + @ClusterConfigProperty(key = "group.share.min.record.lock.duration.ms", value = "12000"), + } + ) + public void testRenewAcknowledgementNoResultInPoll() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)) + ) { + AtomicInteger acknowledgementsCommitted = new AtomicInteger(0); + shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) -> + offsetsByTopicPartition.forEach((tip, offsets) -> acknowledgementsCommitted.addAndGet(offsets.size()))); + + for (int i = 0; i < 10; i++) { + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message " + i).getBytes()); + producer.send(record); + } + producer.flush(); + + shareConsumer.subscribe(List.of(tp.topic())); + ConsumerRecords records = waitedPoll(shareConsumer, 2500L, 10); + assertEquals(10, records.count()); + + int count = 0; + for (ConsumerRecord record : records) { + if (count % 2 == 0) { + shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); + } else { + shareConsumer.acknowledge(record, AcknowledgeType.RENEW); + } + count++; + } + + // 5 more records (total 15 produced). + for (int i = 10; i < 15; i++) { + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message " + i).getBytes()); + producer.send(record); + } + + // Get the rest of all 5 records. + records = waitedPoll(shareConsumer, 11500L, 0); // This will send the acks but not return next 5 records (10-15) + assertEquals(10, acknowledgementsCommitted.get()); + assertEquals(0, records.count()); + verifyYammerMetricCount("ackType=Renew", 5); + + // Renewal duration passed, now records will be back. + records = waitedPoll(shareConsumer, 2500L, 5); // Renewed records as well as 10-15 records. + assertEquals(5, records.count()); + for (ConsumerRecord record : records) { + shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); + } + + shareConsumer.commitSync(); + + records = waitedPoll(shareConsumer, 2500L, 5); // 10-15 records. + assertEquals(5, records.count()); + for (ConsumerRecord record : records) { + shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); + } + + shareConsumer.commitSync(); + + // Initial - 5 renew + 5 accept, Subsequent - 5 renewed accepted + 5 fresh accepted (10-15) + assertEquals(20, acknowledgementsCommitted.get()); + } + verifyYammerMetricCount("ackType=Renew", 5); + } + + private void verifyYammerMetricCount(String filterString, int count) { + com.yammer.metrics.core.Metric renewAck = KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() + .filter(entry -> entry.getKey().toString().contains(filterString)) + .findAny() + .orElseThrow(() -> new AssertionError("metric not found")) + .getValue(); + + assertEquals(count, ((Meter) renewAck).count()); } /**