KAFKA-8880: Add overloaded function of Consumer.committed (#7304)
1. Add the overloaded functions.
2. Update the code in Streams to use the batch API for better latency (this applies both to active StreamTasks when initializing their offsets and to StandbyTasks when updating offset limits).
3. Also update all unit tests to replace the deprecated APIs.

Reviewers: Christopher Pettitt <cpettitt@confluent.io>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Bill Bejeck <bill@confluent.io>
guozhangwang committed Sep 24, 2019
1 parent 1ae0956 commit bcc0237
Showing 17 changed files with 235 additions and 127 deletions.
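The core of the change is visible in the diffs below: the single-partition committed(TopicPartition) lookups are deprecated in favor of a batch committed(Set<TopicPartition>) call that fetches all requested offsets in one broker round trip. A minimal migration sketch, assuming a consumer already configured with a group.id and two partitions of a hypothetical example-topic:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedMigrationSketch {
    static void printCommitted(Consumer<String, String> consumer) {
        TopicPartition p0 = new TopicPartition("example-topic", 0);
        TopicPartition p1 = new TopicPartition("example-topic", 1);

        // Before (deprecated in 2.4): one blocking round trip per partition.
        // OffsetAndMetadata m0 = consumer.committed(p0);
        // OffsetAndMetadata m1 = consumer.committed(p1);

        // After: a single blocking call fetches both committed offsets at once,
        // which is the latency win the Streams changes rely on.
        Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(p0, p1));
        Map<TopicPartition, OffsetAndMetadata> offsets = consumer.committed(partitions);
        offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
    }
}
```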
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -154,13 +154,25 @@ public interface Consumer<K, V> extends Closeable {
/**
* @see KafkaConsumer#committed(TopicPartition)
*/
@Deprecated
OffsetAndMetadata committed(TopicPartition partition);

/**
* @see KafkaConsumer#committed(TopicPartition, Duration)
*/
@Deprecated
OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);

/**
* @see KafkaConsumer#committed(Set)
*/
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);

/**
* @see KafkaConsumer#committed(Set, Duration)
*/
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, final Duration timeout);

/**
* @see KafkaConsumer#metrics()
*/
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1735,7 +1735,10 @@ public long position(TopicPartition partition, final Duration timeout) {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* the timeout specified by {@code default.api.timeout.ms} expires.
*
* @deprecated since 2.4 Use {@link #committed(Set)} instead
*/
@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
@@ -1745,7 +1748,8 @@ public OffsetAndMetadata committed(TopicPartition partition) {
* Get the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
* This call will block until the position can be determined, an unrecoverable error is
* encountered (in which case it is thrown to the caller), or the timeout expires.
*
* @param partition The partition to check
* @param timeout The maximum amount of time to await the current committed offset
@@ -1760,21 +1764,85 @@ public OffsetAndMetadata committed(TopicPartition partition) {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* expiration of the timeout
*
* @deprecated since 2.4 Use {@link #committed(Set, Duration)} instead
*/
@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
return committed(Collections.singleton(partition), timeout).get(partition);
}

/**
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
* Partitions that do not have a committed offset would not be included in the returned map.
* <p>
* If any of the partitions requested do not exist, an exception would be thrown.
* <p>
* This call will do a remote call to get the latest committed offsets from the server, and will block until the
* committed offsets are gotten successfully, an unrecoverable error is encountered (in which case it is thrown to
* the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a
* {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partitions The partitions to check
* @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
* would not be included in the returned result
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* the timeout specified by {@code default.api.timeout.ms} expires.
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}

/**
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
* Partitions that do not have a committed offset would not be included in the returned map.
* <p>
* If any of the partitions requested do not exist, an exception would be thrown.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
*
* @param partitions The partitions to check
* @param timeout The maximum amount of time to await the latest committed offsets
* @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
* would not be included in the returned result
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* expiration of the timeout
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
Collections.singleton(partition), time.timer(timeout));
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
"committed offset for partition " + partition + " could be determined. Try tuning default.api.timeout.ms " +
"larger to relax the threshold.");
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
"larger to relax the threshold.");
} else {
offsets.forEach(this::updateLastSeenEpochIfNewer);
return offsets.get(partition);
return offsets;
}
} finally {
release();
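As the new javadoc above spells out, partitions with no committed offset are simply absent from the returned map rather than mapped to a placeholder, so callers should handle a null lookup per partition. A hedged sketch of one way to consume the result (the helper name, timeout, and fallback policy are illustrative, not part of the commit):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

final class CommittedLookupSketch {
    // Returns the committed offset for tp, or fallback when the group has never
    // committed for it (the key is absent from the map) or the lookup times out.
    static long committedOrDefault(Consumer<?, ?> consumer, TopicPartition tp, long fallback) {
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    consumer.committed(Collections.singleton(tp), Duration.ofSeconds(5));
            OffsetAndMetadata committed = offsets.get(tp); // may be null: no committed offset
            return committed == null ? fallback : committed.offset();
        } catch (TimeoutException e) {
            // The Duration overload throws if the offsets cannot be fetched in time;
            // the no-timeout overload is bounded by default.api.timeout.ms instead.
            return fallback;
        }
    }
}
```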
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -42,6 +42,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


/**
@@ -290,20 +291,33 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
subscriptions.seek(partition, offsetAndMetadata.offset());
}

@Deprecated
@Override
public synchronized OffsetAndMetadata committed(TopicPartition partition) {
ensureNotClosed();
if (subscriptions.isAssigned(partition)) {
return committed.get(partition);
}
return new OffsetAndMetadata(0);
public synchronized OffsetAndMetadata committed(final TopicPartition partition) {
return committed(Collections.singleton(partition)).get(partition);
}

@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
public OffsetAndMetadata committed(final TopicPartition partition, final Duration timeout) {
return committed(partition);
}

@Override
public synchronized Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
ensureNotClosed();

return partitions.stream()
.filter(committed::containsKey)
.collect(Collectors.toMap(tp -> tp, tp -> subscriptions.isAssigned(tp) ?
committed.get(tp) : new OffsetAndMetadata(0)));
}

@Override
public synchronized Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
return committed(partitions);
}

@Override
public synchronized long position(TopicPartition partition) {
ensureNotClosed();
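The MockConsumer now routes the deprecated single-partition lookup through the batch path and filters out partitions it has no commit for. A small sketch of how a test might drive it (the setup calls are the standard MockConsumer API; the topic name is made up):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockCommittedSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("test", 0);
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        toCommit.put(tp, new OffsetAndMetadata(2L));
        consumer.commitSync(toCommit);

        // The batch lookup only returns partitions present in the mock's committed
        // map, mirroring the KafkaConsumer contract of omitting unknown partitions.
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(tp));
        System.out.println(committed.get(tp).offset()); // prints 2
    }
}
```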
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -696,20 +696,20 @@ public void testCommitsFetchedDuringAssign() {

// fetch offset for one topic
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator);
assertEquals(offset1, consumer.committed(tp0).offset());
assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

consumer.assign(Arrays.asList(tp0, tp1));

// fetch offset for two topics
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp0, offset1);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(offset1, consumer.committed(tp0).offset());
assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

offsets.remove(tp0);
offsets.put(tp1, offset2);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(offset2, consumer.committed(tp1).offset());
assertEquals(offset2, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
consumer.close(Duration.ofMillis(0));
}

@@ -1137,7 +1137,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() {

// fetch offset for one topic
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator);
assertEquals(0, consumer.committed(tp0).offset());
assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

// verify that assignment immediately changes
assertTrue(consumer.assignment().equals(singleton(tp0)));
@@ -1195,7 +1195,7 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() {
client.prepareResponseFrom(
offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE),
coordinator);
assertEquals(0, consumer.committed(tp0).offset());
assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

// verify that assignment immediately changes
assertTrue(consumer.assignment().equals(singleton(tp0)));
@@ -1256,12 +1256,12 @@ public void testOffsetOfPausedPartitions() {
offsets.put(tp1, 0L);

client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(0, consumer.committed(tp0).offset());
assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());

offsets.remove(tp0);
offsets.put(tp1, 0L);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(0, consumer.committed(tp1).offset());
assertEquals(0, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());

// fetch and verify consumer's position in the two partitions
final Map<TopicPartition, Long> offsetResponse = new HashMap<>();
@@ -1356,7 +1356,7 @@ public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
}

try {
newConsumer((String) null).committed(tp0);
newConsumer((String) null).committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
@@ -1383,7 +1383,7 @@ public void testOperationsByAssigningConsumerWithDefaultGroupId() {
consumer.assign(singleton(tp0));

try {
consumer.committed(tp0);
consumer.committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
@@ -1636,7 +1636,7 @@ public void testCommitSyncAuthenticationFailure() {
@Test(expected = AuthenticationException.class)
public void testCommittedAuthenticationFaiure() {
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
consumer.committed(tp0);
consumer.committed(Collections.singleton(tp0)).get(tp0);
}

@Test
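Several of the updated tests above pin down the group.id requirement: both new overloads go through maybeThrowInvalidGroupIdException before fetching, so a consumer built without a group id rejects the lookup before any network call. A sketch of that behavior (the broker address and topic name are illustrative):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoGroupIdSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Deliberately no ConsumerConfig.GROUP_ID_CONFIG.

        TopicPartition tp0 = new TopicPartition("topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp0));
            consumer.committed(Collections.singleton(tp0)); // throws before any network I/O
        } catch (InvalidGroupIdException expected) {
            System.out.println("committed() requires group.id: " + expected.getMessage());
        }
    }
}
```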
clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -55,9 +55,10 @@ public void testSimpleMock() {
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
final TopicPartition tp = new TopicPartition("test", 0);
assertEquals(2L, consumer.position(tp));
consumer.commitSync();
assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset());
}

@SuppressWarnings("deprecation")
@@ -81,9 +82,10 @@ public void testSimpleMockDeprecated() {
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
final TopicPartition tp = new TopicPartition("test", 0);
assertEquals(2L, consumer.position(tp));
consumer.commitSync();
assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset());
}

@Test
core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -108,7 +108,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {

if (records.nonEmpty) {
consumer.commitSync()
assertEquals(consumer.position(tp), consumer.committed(tp).offset)
assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)

if (consumer.position(tp) == numRecords) {
consumer.seekToBeginning(Collections.emptyList())
@@ -153,7 +153,7 @@ } else if (coin == 2) {
} else if (coin == 2) {
info("Committing offset.")
consumer.commitSync()
assertEquals(consumer.position(tp), consumer.committed(tp).offset)
assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
}
}
}
@@ -485,7 +485,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
consumer.poll(time.Duration.ofSeconds(3L))
assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS))
if (committedRecords > 0)
assertEquals(committedRecords, consumer.committed(tp).offset)
assertEquals(committedRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
consumer.close()
}
