GH-2170: Add a way to create a custom OffsetAndMetadata
frosiere committed Mar 23, 2022
1 parent 1d13671 commit cc1d11d
Showing 6 changed files with 192 additions and 9 deletions.
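In short, the commit introduces an OffsetAndMetadataProvider that the listener container consults whenever it creates an OffsetAndMetadata for a commit, so applications can attach custom metadata (for example, the group id) to committed offsets. A minimal usage sketch, modeled on the new test at the bottom of this diff; the topic name, listener body, and the consumerFactory bean are illustrative assumptions:

ContainerProperties containerProps = new ContainerProperties("someTopic");
containerProps.setGroupId("grp");
containerProps.setSyncCommits(false); // commit asynchronously, via commitAsync()
containerProps.setMessageListener((MessageListener<Integer, String>) data -> {
    // process the record
});
// New API in this commit: register the commit callback together with a provider
// that stamps each committed offset with the consumer group id as metadata.
containerProps.setCommitCallback(
        (offsets, exception) -> { /* inspect async commit results */ },
        (listenerMetadata, offset) -> new OffsetAndMetadata(offset, listenerMetadata.getGroupId()));
KafkaMessageListenerContainer<Integer, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
container.start();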
org/springframework/kafka/listener/AbstractMessageListenerContainer.java
@@ -34,6 +34,7 @@
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import org.springframework.beans.BeanUtils;
@@ -167,6 +168,10 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V>
if (this.containerProperties.getConsumerRebalanceListener() == null) {
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
}
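// the provider has no standalone setter (it is set together with the callback),
// so generic property copying cannot transfer it; carry it over explicitly here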
final OffsetCommitCallback commitCallback = containerProperties.getCommitCallback();
if (commitCallback != null) {
this.containerProperties.setCommitCallback(commitCallback, containerProperties.getOffsetAndMetadataProvider());
}
}

@Override
org/springframework/kafka/listener/ConsumerProperties.java
@@ -22,6 +22,7 @@
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;

import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -89,6 +90,11 @@ public class ConsumerProperties {
*/
private OffsetCommitCallback commitCallback;

/**
* A provider for {@link OffsetAndMetadata}; allows customization of the metadata associated with committed offsets.
*/
private OffsetAndMetadataProvider offsetAndMetadataProvider;

/**
* Whether or not to call consumer.commitSync() or commitAsync() when the
* container is responsible for commits. Default true.
@@ -275,6 +281,20 @@ public void setCommitCallback(OffsetCommitCallback commitCallback) {
this.commitCallback = commitCallback;
}

/**
* Set the commit callback and an offset and metadata provider; by default, a simple logging callback is used
* to log success at DEBUG level and failures at ERROR level.
* The callback is used when {@link #setSyncCommits(boolean) syncCommits} is false.
* @param commitCallback the callback.
* @param offsetAndMetadataProvider an offset and metadata provider.
* @since 2.8.5
* @see #setSyncCommits(boolean)
*/
public void setCommitCallback(OffsetCommitCallback commitCallback, OffsetAndMetadataProvider offsetAndMetadataProvider) {
this.commitCallback = commitCallback;
this.offsetAndMetadataProvider = offsetAndMetadataProvider;
}

/**
* Return the commit callback.
* @return the callback.
@@ -284,6 +304,15 @@ public OffsetCommitCallback getCommitCallback() {
return this.commitCallback;
}

/**
* Return the offset and metadata provider.
* @return the offset and metadata provider.
*/
@Nullable
public OffsetAndMetadataProvider getOffsetAndMetadataProvider() {
return this.offsetAndMetadataProvider;
}

/**
* Set whether or not to call consumer.commitSync() or commitAsync() when the
* container is responsible for commits. Default true.
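A note on the API shape above: the provider can only be configured through the two-argument setCommitCallback overload. Configuring only the callback therefore leaves the provider null, and the container falls back to a plain new OffsetAndMetadata(offset) (see createOffsetAndMetadata in the container diff below). A two-line sketch:

containerProps.setCommitCallback((offsets, exception) -> { }); // provider remains null
containerProps.setCommitCallback((offsets, exception) -> { },
        (listenerMetadata, offset) -> new OffsetAndMetadata(offset)); // sets both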
org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -1438,7 +1438,7 @@ private void fixTxOffsetsIfNeeded() {
return;
}
if (position > oamd.offset()) {
toFix.put(tp, new OffsetAndMetadata(position));
toFix.put(tp, createOffsetAndMetadata(position));
}
});
if (toFix.size() > 0) {
@@ -1910,7 +1910,7 @@ else if (record.offset() < offs.get(0)) {
private void ackImmediate(ConsumerRecord<K, V> record) {
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
createOffsetAndMetadata(record.offset() + 1));
this.commitLogger.log(() -> COMMITTING + commits);
if (this.producer != null) {
doSendOffsets(this.producer, commits);
@@ -1926,9 +1926,8 @@ else if (this.syncCommits) {
private void ackImmediate(ConsumerRecords<K, V> records) {
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
for (TopicPartition part : records.partitions()) {
commits.put(part,
new OffsetAndMetadata(records.records(part)
.get(records.records(part).size() - 1).offset() + 1));
commits.put(part, createOffsetAndMetadata(records.records(part)
.get(records.records(part).size() - 1).offset() + 1));
}
this.commitLogger.log(() -> COMMITTING + commits);
if (this.producer != null) {
@@ -2694,7 +2693,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
if (this.isRecordAck) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
createOffsetAndMetadata(record.offset() + 1));
if (this.producer == null) {
this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
if (this.syncCommits) {
@@ -2996,7 +2995,7 @@ private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
new OffsetAndMetadata(offset.getValue() + 1));
createOffsetAndMetadata(offset.getValue() + 1));
}
}
this.offsets.clear();
@@ -3079,6 +3078,26 @@ public String toString() {
+ "\n]";
}

private OffsetAndMetadata createOffsetAndMetadata(long offset) {
final OffsetAndMetadataProvider metadataProvider = this.containerProperties.getOffsetAndMetadataProvider();
return metadataProvider == null
? new OffsetAndMetadata(offset)
: metadataProvider.provide(new ConsumerAwareListenerMetadata(), offset);
}

private final class ConsumerAwareListenerMetadata implements ListenerMetadata {

@Override
public String getListenerId() {
return getBeanName();
}

@Override
public String getGroupId() {
return ListenerConsumer.this.consumerGroupId;
}
}

private final class ConsumerAcknowledgment implements Acknowledgment {

private final ConsumerRecord<K, V> record;
@@ -3272,8 +3291,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
try {
if (committed.get(partition) == null) { // no existing commit for this group
offsetsToCommit.put(partition,
new OffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
offsetsToCommit.put(partition, createOffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
}
}
catch (NoOffsetForPartitionException e) {
org/springframework/kafka/listener/ListenerMetadata.java (new file)
@@ -0,0 +1,39 @@
/*
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

/**
* Metadata associated with a {@link org.springframework.kafka.annotation.KafkaListener}.
*
* @author Francois Rosiere
* @since 2.8.5
* @see org.springframework.kafka.annotation.KafkaListener
*/
public interface ListenerMetadata {

/**
* Return the listener id.
* @return the listener id.
*/
String getListenerId();

/**
* Return the group id.
* @return the group id.
*/
String getGroupId();
}
org/springframework/kafka/listener/OffsetAndMetadataProvider.java (new file)
@@ -0,0 +1,40 @@
/*
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

/**
* Provider for {@link OffsetAndMetadata}. When offsets are committed asynchronously, the provider can be
* combined with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} for finer-grained control
* over how the {@link OffsetAndMetadata} is created.
*
* @author Francois Rosiere
* @since 2.8.5
* @see org.apache.kafka.clients.consumer.OffsetCommitCallback
*/
public interface OffsetAndMetadataProvider {

/**
* Provide an offset and metadata object for the given listener metadata and offset.
*
* @param listenerMetadata metadata associated with a listener.
* @param offset an offset.
* @return an offset and metadata.
*/
OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
}
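For illustration, a custom provider might stamp each commit with the listener id and a wall-clock timestamp. This is a hypothetical implementation, not part of the commit; the metadata format is arbitrary (bounded by the broker's offset.metadata.max.bytes setting):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

public class AuditingOffsetAndMetadataProvider implements OffsetAndMetadataProvider {

    @Override
    public OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset) {
        // the metadata argument is a free-form string stored alongside the committed offset
        return new OffsetAndMetadata(offset,
                listenerMetadata.getListenerId() + "@" + System.currentTimeMillis());
    }
}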
org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
@@ -26,12 +26,16 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.ArrayList;
@@ -3844,6 +3848,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {
container.stop();
}

@Test
public void testOffsetAndMetadataWithoutProvider() throws InterruptedException {
testOffsetAndMetadata(null, new OffsetAndMetadata(1));
}

@Test
public void testOffsetAndMetadataWithProvider() throws InterruptedException {
testOffsetAndMetadata((listenerMetadata, offset) ->
new OffsetAndMetadata(offset, listenerMetadata.getGroupId()),
new OffsetAndMetadata(1, "grp"));
}

@SuppressWarnings("unchecked")
private void testOffsetAndMetadata(OffsetAndMetadataProvider provider,
OffsetAndMetadata expectedOffsetAndMetadata) throws InterruptedException {
final ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
final Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
given(consumer.poll(any(Duration.class))).willAnswer(i -> new ConsumerRecords<>(
Map.of(
new TopicPartition("foo", 0),
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, 1, "foo"))
)
));
final ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor = ArgumentCaptor.forClass(Map.class);
final CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocation -> {
latch.countDown();
return null;
}).when(consumer).commitAsync(offsetsCaptor.capture(), any());
final ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
containerProps.setGroupId("grp");
containerProps.setClientId("clientId");
containerProps.setSyncCommits(false);
containerProps.setMessageListener((MessageListener<Integer, String>) data -> {
});
containerProps.setCommitCallback((offsets, exception) -> {
}, provider);
final KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(offsetsCaptor.getValue())
.hasSize(1)
.containsValue(expectedOffsetAndMetadata);
container.stop();
}

private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
Consumer<?, ?> consumer =
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);