KAFKA-8880: Add overloaded function of Consumer.committed #7304

Merged (11 commits, Sep 24, 2019)
Consumer.java
@@ -154,13 +154,25 @@ public interface Consumer<K, V> extends Closeable {
/**
* @see KafkaConsumer#committed(TopicPartition)
*/
@Deprecated
OffsetAndMetadata committed(TopicPartition partition);

/**
* @see KafkaConsumer#committed(TopicPartition, Duration)
*/
@Deprecated
OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);

/**
* @see KafkaConsumer#committed(Set)
*/
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);

/**
* @see KafkaConsumer#committed(Set, Duration)
*/
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, final Duration timeout);

/**
* @see KafkaConsumer#metrics()
*/
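For readers skimming the interface change, here is a minimal usage sketch of the new batch overloads, assuming Kafka 2.4+ on the classpath; the broker address, group id, topic name, and class name are illustrative, not taken from this PR.

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedMigrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "my-group");                // illustrative
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("my-topic", 0); // illustrative
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Deprecated single-partition overload: one blocking lookup per
            // partition, returning null if there is no committed offset.
            @SuppressWarnings("deprecation")
            OffsetAndMetadata single = consumer.committed(tp);

            // New batch overload: one lookup for the whole set; partitions
            // without a committed offset are simply absent from the map.
            Map<TopicPartition, OffsetAndMetadata> batch =
                    consumer.committed(Collections.singleton(tp), Duration.ofSeconds(30));
            System.out.println("single=" + single + ", batch=" + batch.get(tp));
        }
    }
}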
KafkaConsumer.java
@@ -1735,7 +1735,10 @@ public long position(TopicPartition partition, final Duration timeout) {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* the timeout specified by {@code default.api.timeout.ms} expires.
*
* @deprecated since 2.4 Use {@link #committed(Set)} instead
*/
@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
@@ -1745,7 +1748,8 @@ public OffsetAndMetadata committed(TopicPartition partition) {
* Get the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
- * This call will block to do a remote call to get the latest committed offsets from the server.
+ * This call will block until the position can be determined, an unrecoverable error is
+ * encountered (in which case it is thrown to the caller), or the timeout expires.
*
* @param partition The partition to check
* @param timeout The maximum amount of time to await the current committed offset
@@ -1760,21 +1764,85 @@ public OffsetAndMetadata committed(TopicPartition partition) {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* expiration of the timeout
*
* @deprecated since 2.4 Use {@link #committed(Set, Duration)} instead
*/
@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
return committed(Collections.singleton(partition), timeout).get(partition);
}

/**
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
Contributor:
Minor: Probably obvious, but since this doc is pretty good about being clear on details, maybe it is worth pointing out that this is for the consumer group?

Contributor Author:
Actually, even if the consumer is not part of a group and is not using subscribe, it can still commit offsets, and others can read its committed offsets as long as they know its group.id.

* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
* Partitions that do not have a committed offset will not be included in the returned map.
* <p>
* If any of the requested partitions do not exist, an exception will be thrown.
* <p>
* This call will do a remote call to get the latest committed offsets from the server, and will block until the
* committed offsets are fetched successfully, an unrecoverable error is encountered (in which case it is thrown to
* the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a
* {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partitions The partitions to check
* @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
* will not be included in the returned result
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* the timeout specified by {@code default.api.timeout.ms} expires.
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}

/**
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
* Partitions that do not have a committed offset will not be included in the returned map.
* <p>
* If any of the requested partitions do not exist, an exception will be thrown.
Contributor:
Now that we're batching calls it might be nice to return all of the valid ones we received and some marker for those we did not.

Sans that, we should specify what type of exception you get and it would be nice to be able to get details about which partitions did not exist.

Contributor Author:
I've thought about that while discussing KIP-520: on the admin client, when getting committed offsets we return a map<topic-partition, future<>>, and each future can hold either an exception or the actual value. But the consumer has no such APIs, so I've decided to stick with consistency.

As for the exception, it would indicate which topic-partition(s) do not exist.

Contributor:
Sounds reasonable. Is the non-existent topic partition list accessible programmatically, or just in the exception text? The former seems a bit nicer in allowing for potential recovery at runtime.

* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
Contributor:
nit: Should the description here match https://github.com/apache/kafka/pull/7304/files#diff-267b7c1e68156c1301c56be63ae41dd0R1779-R1782 from above with the exception that the user specifies the timeout in this case.

Contributor Author:
Ack.

*
* @param partitions The partitions to check
* @param timeout The maximum amount of time to await the latest committed offsets
* @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
* will not be included in the returned result
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* expiration of the timeout
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
- Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
-     Collections.singleton(partition), time.timer(timeout));
+ Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
"committed offset for partition " + partition + " could be determined. Try tuning default.api.timeout.ms " +
"larger to relax the threshold.");
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
"larger to relax the threshold.");
} else {
offsets.forEach(this::updateLastSeenEpochIfNewer);
- return offsets.get(partition);
+ return offsets;
}
} finally {
release();
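A hedged caller-side sketch of the contract described in the javadoc above: the batch call either returns a map, from which partitions with no committed offset are absent, or throws, for example a TimeoutException. The class name, helper name, and timeout are illustrative.

import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class CommittedBatchLookup {
    // Fetches committed offsets for a batch of partitions in one remote call,
    // then reports the partitions that have no committed offset yet.
    static void printCommitted(Consumer<?, ?> consumer, Set<TopicPartition> requested) {
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    consumer.committed(requested, Duration.ofSeconds(10));
            offsets.forEach((tp, oam) ->
                    System.out.printf("%s -> offset %d%n", tp, oam.offset()));
            Set<TopicPartition> noCommit = new HashSet<>(requested);
            noCommit.removeAll(offsets.keySet()); // absent means no committed offset
            System.out.println("no committed offset yet: " + noCommit);
        } catch (TimeoutException e) {
            // Thrown when the offsets cannot be fetched before the timeout
            // (or default.api.timeout.ms for the overload without a Duration).
            System.err.println("could not fetch committed offsets: " + e.getMessage());
        }
    }
}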
MockConsumer.java
@@ -42,6 +42,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


/**
@@ -290,20 +291,33 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
subscriptions.seek(partition, offsetAndMetadata.offset());
}

@Deprecated
@Override
- public synchronized OffsetAndMetadata committed(TopicPartition partition) {
-     ensureNotClosed();
-     if (subscriptions.isAssigned(partition)) {
-         return committed.get(partition);
-     }
-     return new OffsetAndMetadata(0);
+ public synchronized OffsetAndMetadata committed(final TopicPartition partition) {
+     return committed(Collections.singleton(partition)).get(partition);
}

@Deprecated
@Override
- public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
+ public OffsetAndMetadata committed(final TopicPartition partition, final Duration timeout) {
return committed(partition);
}

@Override
public synchronized Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
ensureNotClosed();

return partitions.stream()
.filter(committed::containsKey)
.collect(Collectors.toMap(tp -> tp, tp -> subscriptions.isAssigned(tp) ?
committed.get(tp) : new OffsetAndMetadata(0)));
}

@Override
public synchronized Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
return committed(partitions);
}

@Override
public synchronized long position(TopicPartition partition) {
ensureNotClosed();
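To show the MockConsumer changes above in use, a hedged unit-test-style sketch; the topic name, offset values, and class name are illustrative. Offsets committed through the mock come back through the new batch API, and partitions with no entry in the committed map are filtered out of the result.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockCommittedSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer =
                new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("test", 0);
        consumer.assign(Collections.singleton(tp));

        // Commit through the mock, then read back via the new batch overload.
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(2L)));
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(tp));
        System.out.println(committed.get(tp).offset()); // prints 2

        // A partition that was never committed to is absent from the result.
        System.out.println(committed.containsKey(new TopicPartition("test", 1))); // false
    }
}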
KafkaConsumerTest.java
@@ -696,20 +696,20 @@ public void testCommitsFetchedDuringAssign() {

// fetch offset for one topic
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator);
- assertEquals(offset1, consumer.committed(tp0).offset());
+ assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

consumer.assign(Arrays.asList(tp0, tp1));

// fetch offset for two topics
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp0, offset1);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
- assertEquals(offset1, consumer.committed(tp0).offset());
+ assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

offsets.remove(tp0);
offsets.put(tp1, offset2);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
- assertEquals(offset2, consumer.committed(tp1).offset());
+ assertEquals(offset2, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
consumer.close(Duration.ofMillis(0));
}

@@ -1137,7 +1137,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() {

// fetch offset for one topic
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator);
- assertEquals(0, consumer.committed(tp0).offset());
+ assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

// verify that assignment immediately changes
assertTrue(consumer.assignment().equals(singleton(tp0)));
@@ -1195,7 +1195,7 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() {
client.prepareResponseFrom(
offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE),
coordinator);
- assertEquals(0, consumer.committed(tp0).offset());
+ assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

// verify that assignment immediately changes
assertTrue(consumer.assignment().equals(singleton(tp0)));
@@ -1256,12 +1256,12 @@ public void testOffsetOfPausedPartitions() {
offsets.put(tp1, 0L);

client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
- assertEquals(0, consumer.committed(tp0).offset());
+ assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

offsets.remove(tp0);
offsets.put(tp1, 0L);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
- assertEquals(0, consumer.committed(tp1).offset());
+ assertEquals(0, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());

// fetch and verify consumer's position in the two partitions
final Map<TopicPartition, Long> offsetResponse = new HashMap<>();
@@ -1356,7 +1356,7 @@ public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
}

try {
- newConsumer((String) null).committed(tp0);
+ newConsumer((String) null).committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
@@ -1383,7 +1383,7 @@ public void testOperationsByAssigningConsumerWithDefaultGroupId() {
consumer.assign(singleton(tp0));

try {
- consumer.committed(tp0);
+ consumer.committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
@@ -1636,7 +1636,7 @@ public void testCommitSyncAuthenticationFailure() {
@Test(expected = AuthenticationException.class)
public void testCommittedAuthenticationFaiure() {
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
- consumer.committed(tp0);
+ consumer.committed(Collections.singleton(tp0)).get(tp0);
}

@Test
MockConsumerTest.java
@@ -55,9 +55,10 @@ public void testSimpleMock() {
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
- assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
+ final TopicPartition tp = new TopicPartition("test", 0);
+ assertEquals(2L, consumer.position(tp));
consumer.commitSync();
- assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
+ assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset());
}

@SuppressWarnings("deprecation")
@@ -81,9 +82,10 @@ public void testSimpleMockDeprecated() {
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
- assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
+ final TopicPartition tp = new TopicPartition("test", 0);
+ assertEquals(2L, consumer.position(tp));
consumer.commitSync();
- assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
+ assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset());
}

@Test
ConsumerBounceTest.scala
@@ -108,7 +108,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {

if (records.nonEmpty) {
consumer.commitSync()
- assertEquals(consumer.position(tp), consumer.committed(tp).offset)
+ assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)

if (consumer.position(tp) == numRecords) {
consumer.seekToBeginning(Collections.emptyList())
@@ -153,7 +153,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
} else if (coin == 2) {
info("Committing offset.")
consumer.commitSync()
- assertEquals(consumer.position(tp), consumer.committed(tp).offset)
+ assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
}
}
}
@@ -485,7 +485,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
consumer.poll(time.Duration.ofSeconds(3L))
assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS))
if (committedRecords > 0)
- assertEquals(committedRecords, consumer.committed(tp).offset)
+ assertEquals(committedRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
consumer.close()
}

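The Scala assertions above all check one invariant; here is a hedged Java rendering of it, with the class name, helper name, and timeout as illustrative assumptions.

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitInvariant {
    // After a successful commitSync(), the committed offset of an assigned
    // partition equals the consumer's current position; the Scala tests
    // assert this via consumer.committed(Set(tp).asJava).get(tp).offset.
    static void assertCommittedMatchesPosition(Consumer<?, ?> consumer, TopicPartition tp) {
        consumer.commitSync();
        long position = consumer.position(tp);
        OffsetAndMetadata committed =
                consumer.committed(Collections.singleton(tp), Duration.ofSeconds(10)).get(tp);
        if (committed == null || committed.offset() != position) {
            throw new AssertionError("committed " + committed + " != position " + position);
        }
    }
}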