Skip to content

Commit

Permalink
spring-projectsGH-1554: Partition wildcard for initial offsets
Browse files Browse the repository at this point in the history
Resolves spring-projects#1554

When using manual assignment it was difficult to specify the initial
offset for multiple partitions - especially when dynamically determined.

Add a wildcard to indicate the offset should be applied to all partitions.

**cherry-pick to 2.5.x**
  • Loading branch information
garyrussell committed Jul 31, 2020
1 parent 069924c commit c80d300
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 30 deletions.
Expand Up @@ -555,22 +555,32 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
for (String partition : partitions) {
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
}

for (PartitionOffset partitionOffset : partitionOffsets) {
TopicPartitionOffset topicPartitionOffset =
new TopicPartitionOffset((String) topic,
resolvePartition(topic, partitionOffset),
resolveInitialOffset(topic, partitionOffset),
isRelative(topic, partitionOffset));
if (!result.contains(topicPartitionOffset)) {
result.add(topicPartitionOffset);
}
else {
throw new IllegalArgumentException(
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
topicPartitionOffset));
if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) {
result.forEach(tpo -> {
tpo.setOffset(resolveInitialOffset(tpo.getTopic(), partitionOffsets[0]));
tpo.setRelativeToCurrent(isRelative(tpo.getTopic(), partitionOffsets[0]));
});
}
else {
for (PartitionOffset partitionOffset : partitionOffsets) {
Assert.isTrue(!partitionOffset.partition().equals("*"), () ->
"Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result);
TopicPartitionOffset topicPartitionOffset =
new TopicPartitionOffset((String) topic,
resolvePartition(topic, partitionOffset),
resolveInitialOffset(topic, partitionOffset),
isRelative(topic, partitionOffset));
if (!result.contains(topicPartitionOffset)) {
result.add(topicPartitionOffset);
}
else {
throw new IllegalArgumentException(
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
topicPartitionOffset));
}
}
}
Assert.isTrue(result.size() > 0, () -> "At least one partition required for " + topic);
return result;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,9 +31,10 @@
public @interface PartitionOffset {

/**
* The partition within the topic to listen on.
* Property place holders and SpEL expressions are supported,
* which must resolve to Integer (or String that can be parsed as Integer).
* The partition within the topic to listen on. Property place holders and SpEL
* expressions are supported, which must resolve to Integer (or String that can be
* parsed as Integer). '*' indicates that the initial offset will be applied to all
* partitions in the encompassing {@link TopicPartition}
* @return partition within the topic.
*/
String partition();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,8 @@
String[] partitions() default {};

/**
* The partitions with initial offsets within the topic.
* The partitions with initial offsets within the topic. There must only be one
* instance of {@link PartitionOffset} if its 'partition' property is '*'.
* Partitions specified here can't be duplicated in the {@link #partitions()}.
* @return the {@link PartitionOffset} array.
*/
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -890,7 +891,7 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
else {
List<TopicPartitionOffset> topicPartitionsToAssign =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = new HashMap<>(topicPartitionsToAssign.size());
this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
this.definedPartitions.put(topicPartition.getTopicPartition(),
new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
Expand Down Expand Up @@ -2108,7 +2109,7 @@ private void initPartitionsIfNeeded() {
* called until we poll() the consumer. Users can use a ConsumerAwareRebalanceListener
* or a ConsumerSeekAware listener in that case.
*/
Map<TopicPartition, OffsetMetadata> partitions = new HashMap<>(this.definedPartitions);
Map<TopicPartition, OffsetMetadata> partitions = new LinkedHashMap<>(this.definedPartitions);
Set<TopicPartition> beginnings = partitions.entrySet().stream()
.filter(e -> SeekPosition.BEGINNING.equals(e.getValue().seekPosition))
.map(Entry::getKey)
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,11 +70,11 @@ public enum SeekPosition {

private final TopicPartition topicPartition;

private final Long offset;
private final SeekPosition position;

private final boolean relativeToCurrent;
private Long offset;

private final SeekPosition position;
private boolean relativeToCurrent;

/**
* Construct an instance with no initial offset management.
Expand Down Expand Up @@ -171,10 +171,28 @@ public Long getOffset() {
return this.offset;
}

/**
* Set the offset.
* @param offset the offset.
* @since 2.5.5
*/
public void setOffset(Long offset) {
this.offset = offset;
}

public boolean isRelativeToCurrent() {
return this.relativeToCurrent;
}

/**
* Set whether the offset is relative to the current position.
* @param relativeToCurrent true for relative to current.
* @since 2.5.5
*/
public void setRelativeToCurrent(boolean relativeToCurrent) {
this.relativeToCurrent = relativeToCurrent;
}

public SeekPosition getPosition() {
return this.position;
}
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
Expand Down Expand Up @@ -94,7 +95,10 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
this.registry.stop();
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
InOrder inOrder = inOrder(this.consumer);
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).assign(any(Collection.class));
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 0), 0L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 0L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer).commitSync(
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
Expand Down Expand Up @@ -137,11 +141,14 @@ public static class Config {

final CountDownLatch closeLatch = new CountDownLatch(1);

final CountDownLatch commitLatch = new CountDownLatch(7);
final CountDownLatch commitLatch = new CountDownLatch(6);

int count;

@KafkaListener(topics = "foo", groupId = "grp")
@KafkaListener(groupId = "grp",
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
partitions = "#{'0,1,2'.split(',')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
this.contents.add(in);
this.deliveries.add(delivery);
Expand Down
19 changes: 19 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Expand Up @@ -1209,6 +1209,7 @@ public void listen(String data) {
----
====

[[manual-assignment]]
You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets).
The following example shows how to do so:

Expand All @@ -1230,6 +1231,24 @@ You can specify each partition in the `partitions` or `partitionOffsets` attribu

As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see <<tip-assign-all-parts>>.

Starting with version 2.5.5, you can apply an initial offset to all defined partitions:

====
[source, java]
----
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
----
====

The `*` wildcard represents all partitions in the `partitions` attribute.
There must only be one `@PartitionOffset` with the wildcard in each `@TopicPartition`.

When using manual `AckMode`, you can also provide the listener with the `Acknowledgment`.
The following example also shows how to use a different container factory.

Expand Down
4 changes: 3 additions & 1 deletion src/reference/asciidoc/tips.adoc
Expand Up @@ -11,7 +11,8 @@ The following is an example of how to use the power of a SpEL expression to crea
[source, java]
----
@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
partitions = "#{@finder.partitions('compacted')}"))
partitions = "#{@finder.partitions('compacted')}"),
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
...
}
Expand Down Expand Up @@ -43,6 +44,7 @@ public static class PartitionFinder {

Using this in conjunction with `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest` will load all records each time the application is started.
You should also set the container's `AckMode` to `MANUAL` to prevent the container from committing offsets for a `null` consumer group.
Howewever, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see <<manual-assignment>> for more information.

[[ex-jdbc-sync]]
=== Example of Transaction Synchronization
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Expand Up @@ -15,3 +15,9 @@ See <<exactly-once>> for more information.

Various error handlers (that extend `FailedRecordProcessor`) and the `DefaultAfterRollbackProcessor` now reset the `BackOff` if recovery fails.
See <<seek-to-current>>, <<recovering-batch-eh>>, <<dead-letters>> and <<after-rollback>> for more information.

==== @KafkaLisener Changes

When using manual partition assignment, you can now specify a wildcard for determining which partitions should be reset to the initial offset.
(Also added in version 2.5.5).
See <<manual-assignment,manual assignment>> for more information.

0 comments on commit c80d300

Please sign in to comment.