-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-8880: Add overloaded function of Consumer.committed #7304
Changes from 10 commits
c2bf2ff
da9b4e5
c3ad1d6
649fc4f
b0fe68e
603d7b2
4168701
4573451
aefb5bf
57f2a9a
2bc8bf0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1735,7 +1735,10 @@ public long position(TopicPartition partition, final Duration timeout) { | |
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors | ||
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before | ||
* the timeout specified by {@code default.api.timeout.ms} expires. | ||
* | ||
* @deprecated since 2.4 Use {@link #committed(Set)} instead | ||
*/ | ||
@Deprecated | ||
@Override | ||
public OffsetAndMetadata committed(TopicPartition partition) { | ||
return committed(partition, Duration.ofMillis(defaultApiTimeoutMs)); | ||
|
@@ -1745,7 +1748,8 @@ public OffsetAndMetadata committed(TopicPartition partition) { | |
* Get the last committed offset for the given partition (whether the commit happened by this process or | ||
* another). This offset will be used as the position for the consumer in the event of a failure. | ||
* <p> | ||
* This call will block to do a remote call to get the latest committed offsets from the server. | ||
* This call will block until the position can be determined, an unrecoverable error is | ||
* encountered (in which case it is thrown to the caller), or the timeout expires. | ||
* | ||
* @param partition The partition to check | ||
* @param timeout The maximum amount of time to await the current committed offset | ||
|
@@ -1760,21 +1764,85 @@ public OffsetAndMetadata committed(TopicPartition partition) { | |
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors | ||
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before | ||
* expiration of the timeout | ||
* | ||
* @deprecated since 2.4 Use {@link #committed(Set, Duration)} instead | ||
*/ | ||
@Deprecated | ||
@Override | ||
public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { | ||
return committed(Collections.singleton(partition), timeout).get(partition); | ||
} | ||
|
||
/** | ||
* Get the last committed offsets for the given partitions (whether the commit happened by this process or | ||
* another). The returned offsets will be used as the position for the consumer in the event of a failure. | ||
* <p> | ||
* Partitions that do not have a committed offset would not be included in the returned map. | ||
* <p> | ||
* If any of the partitions requested do not exist, an exception would be thrown. | ||
* <p> | ||
* This call will do a remote call to get the latest committed offsets from the server, and will block until the | ||
* committed offsets are gotten successfully, an unrecoverable error is encountered (in which case it is thrown to | ||
* the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a | ||
* {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). | ||
* | ||
* @param partitions The partitions to check | ||
* @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets | ||
* would not be included in the returned result | ||
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this | ||
* function is called | ||
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while | ||
* this function is called | ||
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details | ||
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the | ||
* configured groupId. See the exception for more details | ||
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors | ||
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before | ||
* the timeout specified by {@code default.api.timeout.ms} expires. | ||
*/ | ||
@Override | ||
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) { | ||
return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs)); | ||
} | ||
|
||
/** | ||
* Get the last committed offsets for the given partitions (whether the commit happened by this process or | ||
* another). The returned offsets will be used as the position for the consumer in the event of a failure. | ||
* <p> | ||
* Partitions that do not have a committed offset would not be included in the returned map. | ||
* <p> | ||
* If any of the partitions requested do not exist, an exception would be thrown. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that we're batching calls it might be nice to return all of the valid ones we received and some marker for those we did not. Sans that, we should specify what type of exception you get and it would be nice to be able to get details about which partitions did not exist. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've thought about that when discussing about KIP-520, e.g. on admin client when getting committed offsets we return a As for the exception, it would indicate which topic-partition(s) do not exist. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds reasonable. Is the non-existant topic partition list accessible programmatically or just in the exception text? The former seems a bit nicer in allowing for potential recovery at runtime. |
||
* <p> | ||
* This call will block to do a remote call to get the latest committed offsets from the server. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Should the description here match https://github.com/apache/kafka/pull/7304/files#diff-267b7c1e68156c1301c56be63ae41dd0R1779-R1782 from above with the exception that the user specifies the timeout in this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ack. |
||
* | ||
* @param partitions The partitions to check | ||
* @param timeout The maximum amount of time to await the latest committed offsets | ||
* @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets | ||
* would not be included in the returned result | ||
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this | ||
* function is called | ||
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while | ||
* this function is called | ||
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details | ||
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the | ||
* configured groupId. See the exception for more details | ||
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors | ||
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before | ||
* expiration of the timeout | ||
*/ | ||
@Override | ||
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) { | ||
acquireAndEnsureOpen(); | ||
try { | ||
maybeThrowInvalidGroupIdException(); | ||
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets( | ||
Collections.singleton(partition), time.timer(timeout)); | ||
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); | ||
if (offsets == null) { | ||
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " + | ||
"committed offset for partition " + partition + " could be determined. Try tuning default.api.timeout.ms " + | ||
"larger to relax the threshold."); | ||
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " + | ||
"larger to relax the threshold."); | ||
} else { | ||
offsets.forEach(this::updateLastSeenEpochIfNewer); | ||
return offsets.get(partition); | ||
return offsets; | ||
} | ||
} finally { | ||
release(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: Probably obvious, but since this doc is pretty good about being clear on details, maybe it is worth pointing out that this is for the consumer group?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually even if the consumer is not part of a group and not using subscribe as well it can still commit offsets, and others can get its committed offsets as long as they know its group.id.