diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 998ac2c585d59..391796fc5dc2d 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -357,10 +357,41 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() throws TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer"); - TestUtils.waitForCondition(() -> { - shareConsumer.poll(Duration.ofMillis(500)); - return partitionOffsetsMap.containsKey(tp); - }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to callback"); + // The callback should be called before the return of the poll, even when there are no more records. + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(2000)); + assertEquals(0, records.count()); + assertTrue(partitionOffsetsMap.containsKey(tp)); + + // We expect no exception as the acknowledgement error code is null. + assertFalse(partitionExceptionMap.containsKey(tp)); + verifyShareGroupStateTopicRecordsProduced(); + } + } + + @ClusterTest + public void testAcknowledgementCommitCallbackSuccessfulAcknowledgementOnCommitSync() throws Exception { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer("group1")) { + + Map> partitionOffsetsMap = new HashMap<>(); + Map partitionExceptionMap = new HashMap<>(); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + + producer.send(record); + producer.flush(); + + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Set.of(tp.topic())); + + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer"); + + // The acknowledgement commit callback should be called before the commitSync returns + // once the records have been confirmed to have been acknowledged. + Map> result = shareConsumer.commitSync(); + assertEquals(1, result.size()); + assertTrue(partitionOffsetsMap.containsKey(tp)); // We expect no exception as the acknowledgement error code is null. assertFalse(partitionExceptionMap.containsKey(tp)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 7969750325662..4a7e19a6e5694 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -609,6 +609,15 @@ public synchronized ConsumerRecords poll(final Duration timeout) { throw e.cause(); } finally { kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs()); + + // Handle any acknowledgements which completed while we were waiting, but do not throw + // the exception because the fetched records would then not be returned to the caller + try { + handleCompletedAcknowledgements(false); + } catch (Throwable t) { + log.warn("Exception thrown in acknowledgement commit callback", t); + } + release(); } } @@ -752,8 +761,11 @@ public Map> commitSync(final Duration result.put(tip, Optional.of(exception)); } }); - return result; + // Handle any acknowledgements which completed while we were waiting + handleCompletedAcknowledgements(false); + + return result; } finally { wakeupTrigger.clearTask(); }