KAFKA-8880: Add overloaded function of Consumer.committed #7304
Conversation
@guozhangwang just one minor nit, otherwise LGTM
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
nit: Should the description here match https://github.com/apache/kafka/pull/7304/files#diff-267b7c1e68156c1301c56be63ae41dd0R1779-R1782 from above with the exception that the user specifies the timeout in this case.
Ack.
LGTM, left some nits.
if (offsets == null) {
    throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
        "committed offset for partition " + partition + " could be determined. Try tuning default.api.timeout.ms " +
        "larger to relax the threshold.");
nit: partition -> partitions
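The error path under review can be sketched in plain Java without the Kafka client on the classpath. The `committed` helper and `SketchTimeoutException` below are hypothetical stand-ins for illustration, not the real API:

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;

public class CommittedTimeoutSketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(String message) { super(message); }
    }

    // Sketch of the error path: a null result from the offset lookup means the
    // timeout expired, and the message names all requested partitions (hence
    // the plural in the nit above, since the overload now takes a set).
    static Map<String, Long> committed(Set<String> partitions, Duration timeout,
                                       Map<String, Long> offsets) {
        if (offsets == null) {
            throw new SketchTimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
                "committed offset for partitions " + partitions + " could be determined. " +
                "Try tuning default.api.timeout.ms larger to relax the threshold.");
        }
        return offsets;
    }

    public static void main(String[] args) {
        try {
            committed(Set.of("topic-0"), Duration.ofMillis(100), null);
        } catch (SketchTimeoutException e) {
            System.out.println(e.getMessage()); // names the whole partition set
        }
    }
}
```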
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
* This call will do a remote call to get the latest committed offset from the server, and will block until the
nit: latest committed offset (or) last committed offset?
Should be latest committed offset
* the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a
* {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partitions The partition to check
nit: partition -> partitions
* {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partitions The partition to check
* @return The last committed offset and metadata or null if there was no prior commit
It's not clear whether the returned map is null, or the OffsetAndMetadata value will be null for the given partition.
Good catch!
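The contract settled on elsewhere in this review (partitions with no committed offset are simply absent from the returned map, rather than the map itself being null) can be sketched with plain Java maps. `committed` and `committedStore` below are hypothetical names for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CommittedMapContract {
    // Sketch of the clarified contract: the returned map itself is never null;
    // partitions with no prior commit are simply not included in it.
    static Map<String, Long> committed(Set<String> partitions, Map<String, Long> committedStore) {
        Map<String, Long> result = new HashMap<>();
        for (String tp : partitions) {
            Long offset = committedStore.get(tp);
            if (offset != null) {
                result.put(tp, offset); // only partitions with a prior commit
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> store = Map.of("t-0", 42L);
        Map<String, Long> result = committed(Set.of("t-0", "t-1"), store);
        System.out.println(result.containsKey("t-0")); // true
        System.out.println(result.containsKey("t-1")); // false: no prior commit
    }
}
```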
* This call will block to do a remote call to get the latest committed offsets from the server.
*
* @param partitions The partitions to check
* @param timeout The maximum amount of time to await the current committed offset
Not sure. the current -> to get the latest/last
cc @cpettitt-confluent @bbejeck for another look.
Mostly minor tweaks suggested.
* If any of the partitions requested do not exist, an exception would be thrown.
* <p>
* This call will do a remote call to get the latest committed offsets from the server, and will block until the
* committed offset is gotten successfully, an unrecoverable error is encountered (in which case it is thrown to
s/offset is/offsets are/
}

/**
* Get the last committed offsets for the given partitions (whether the commit happened by this process or |
Minor: Probably obvious, but since this doc is pretty good about being clear on details, maybe it is worth pointing out that this is for the consumer group?
Actually, even if the consumer is not part of a group and is not using subscribe, it can still commit offsets, and others can get its committed offsets as long as they know its group.id.
* <p>
* Partitions that do not have a committed offset would not be included in the returned map.
* <p>
* If any of the partitions requested do not exist, an exception would be thrown.
Now that we're batching calls it might be nice to return all of the valid ones we received and some marker for those we did not.
Sans that, we should specify what type of exception you get and it would be nice to be able to get details about which partitions did not exist.
I've thought about that when discussing KIP-520: on the admin client, when getting committed offsets we return a map<topic-partition, future<>>, and each future can either be an exception or the actual value. But for the consumer we do not have such APIs, so I've decided to stick with consistency.
As for the exception, it would indicate which topic-partition(s) do not exist.
Sounds reasonable. Is the non-existent topic partition list accessible programmatically or just in the exception text? The former seems a bit nicer in allowing for potential recovery at runtime.
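The programmatic access being asked about could look like the following. `UnknownPartitionsException` and `missingPartitions()` are entirely hypothetical names sketching the suggestion, not the actual Kafka API:

```java
import java.util.Set;

public class MissingPartitionsSketch {
    // Hypothetical exception carrying the offending partitions as data,
    // rather than only embedding them in the message text.
    static class UnknownPartitionsException extends RuntimeException {
        private final Set<String> missing;

        UnknownPartitionsException(Set<String> missing) {
            super("Partitions do not exist: " + missing);
            this.missing = missing;
        }

        // Callers can recover at runtime by inspecting the set directly.
        Set<String> missingPartitions() { return missing; }
    }

    public static void main(String[] args) {
        try {
            throw new UnknownPartitionsException(Set.of("bad-topic-0"));
        } catch (UnknownPartitionsException e) {
            System.out.println(e.missingPartitions().contains("bad-topic-0"));
        }
    }
}
```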
@@ -250,16 +251,23 @@ public boolean hasStateStores() {
     return stateMgr.changelogPartitions();
 }

-long committedOffsetForPartition(final TopicPartition partition) {
+Map<TopicPartition, Long> committedOffsetForPartition(final Set<TopicPartition> partitions) {
committedOffsetsForPartitions
}

-long offsetLimit(final TopicPartition partition) {
+private long offsetLimit(final TopicPartition partition) {
Thanks!
final long newLimit = committedOffsetForPartition(partition);
final Long previousLimit = offsetLimits.put(partition, newLimit);
-if (previousLimit > newLimit) {
+if (previousLimit != null && previousLimit > newLimit) {
We should either do this check for all updated limits or none.
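The null check in the diff above matters because `Map.put` returns null when there was no previous mapping, and unboxing that null into a primitive `long` throws a NullPointerException on the very first update. A minimal, self-contained demonstration (`limitRegressed` is a hypothetical helper name):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetLimitNullCheck {
    // Returns true only when a previous limit existed and was larger than the
    // new one. Note the Long (boxed) type: put(...) returns null when no
    // previous mapping existed, and unboxing null would throw NPE.
    static boolean limitRegressed(Map<String, Long> offsetLimits, String tp, long newLimit) {
        Long previousLimit = offsetLimits.put(tp, newLimit);
        return previousLimit != null && previousLimit > newLimit;
    }

    public static void main(String[] args) {
        Map<String, Long> offsetLimits = new HashMap<>();
        System.out.println(limitRegressed(offsetLimits, "t-0", 10L)); // false: first update
        System.out.println(limitRegressed(offsetLimits, "t-0", 5L));  // true: 10 > 5
    }
}
```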
Ack. Thinking about this a bit more, I think we do not need the updateableOffsetLimits since it always has the same topic-partitions as offsetLimits; instead we could just keep a flag and always ask for all topic-partitions in offsetLimits. WDYT?
+1 now that we're doing the call in batch.
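The flag-based alternative agreed on above could be sketched as follows. This is an illustrative model, not Streams code; `limitsNeedUpdate`, `maybeUpdateLimits`, and the fetcher callback are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class BatchedOffsetLimits {
    private final Map<String, Long> offsetLimits = new HashMap<>();
    // A single dirty flag replaces the separate updateableOffsetLimits map,
    // since both maps would always hold the same topic-partitions.
    private boolean limitsNeedUpdate = true;

    BatchedOffsetLimits(Set<String> partitions) {
        for (String tp : partitions) offsetLimits.put(tp, 0L);
    }

    // When the flag is set, refresh every partition with one batched lookup.
    void maybeUpdateLimits(Function<Set<String>, Map<String, Long>> committedFetcher) {
        if (limitsNeedUpdate) {
            offsetLimits.putAll(committedFetcher.apply(offsetLimits.keySet()));
            limitsNeedUpdate = false;
        }
    }

    long limitFor(String tp) { return offsetLimits.get(tp); }

    public static void main(String[] args) {
        BatchedOffsetLimits limits = new BatchedOffsetLimits(Set.of("t-0", "t-1"));
        limits.maybeUpdateLimits(tps -> Map.of("t-0", 7L, "t-1", 9L));
        System.out.println(limits.limitFor("t-0") + "," + limits.limitFor("t-1"));
    }
}
```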
-        final long limit) {
-    log.trace("Updating store offset limit for partition {} to {}", partition, limit);
-    offsetLimits.put(partition, limit);
+void putOffsetLimit(final Map<TopicPartition, Long> offsets) {
putOffsetLimits
@@ -481,7 +481,6 @@ void commit(final boolean startNewTransaction, final Map<TopicPartition, Long> p
     final long offset = entry.getValue() + 1;
     final long partitionTime = partitionTimes.get(partition);
     consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
-    stateMgr.putOffsetLimit(partition, offset);
This line is not needed since for stream task we only need to put offset limit once, which is before the restoration. During normal processing we do not need to set offset limit any more.
+1. That should not be needed.
Still LGTM, just one minor additional comment.
log.debug("A committed timestamp was detected: setting the partition time of partition {}"
    + " to {} in stream task {}", partition, committedTimestamp, this);
} else {
    log.debug("No committed timestamp was found in metadata for partition {}", partition);
If there is no metadata, would we want to use the latest timestamp seen so far for the StreamTask and use that to set PartitionGroup#setPartitionTime?
I followed the logic from the other PR that @RichardYuSTUG did; I'd leave it to Richard to explain why we did this :)
@bbejeck @guozhangwang Oops, looks like I missed this. Bill has a point here. I will probably log a JIRA to get this done.
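The fallback proposed above (to be tracked in a follow-up JIRA) might look like this minimal sketch. `resolvePartitionTime`, `maxObservedTimestamp`, and the `UNKNOWN` sentinel are hypothetical names for illustration, not Streams internals:

```java
public class PartitionTimeFallback {
    // Hypothetical sentinel for "no committed timestamp found in metadata".
    static final long UNKNOWN = -1L;

    // Prefer the committed timestamp from metadata; otherwise fall back to the
    // latest timestamp the task has observed so far, as suggested in review.
    static long resolvePartitionTime(long committedTimestamp, long maxObservedTimestamp) {
        return committedTimestamp != UNKNOWN ? committedTimestamp : maxObservedTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(resolvePartitionTime(1000L, 2000L));   // metadata wins
        System.out.println(resolvePartitionTime(UNKNOWN, 2000L)); // fallback used
    }
}
```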
retest this please
Merging to trunk; will submit a separate PR for docs change.
Conflicts:
* .gitignore: addition of clients/src/generated-test was near local additions for support-metrics.
* checkstyle/suppressions.xml: upstream refactoring of exclusions for generator were near the local changes for support-metrics.
* gradle.properties: scala version bump caused a minor conflict due to the kafka version change locally.
* gradle/dependencies.gradle: bcpkix version bump was near avro additions in the local version.

* apache-github/trunk: (49 commits)
  KAFKA-8471: Replace control requests/responses with automated protocol (apache#7353)
  MINOR: Don't generate unnecessary strings for debug logging in FetchSessionHandler (apache#7394)
  MINOR: fixed typo and removed outdated varilable name (apache#7402)
  KAFKA-8934: Create version file during build for Streams (apache#7397)
  KAFKA-8319: Make KafkaStreamsTest a non-integration test class (apache#7382)
  KAFKA-6883: Add toUpperCase support to sasl.kerberos.principal.to.local rule (KIP-309)
  KAFKA-8907; Return topic configs in CreateTopics response (KIP-525) (apache#7380)
  MINOR: Address review comments for KIP-504 authorizer changes (apache#7379)
  MINOR: add versioning to request and response headers (apache#7372)
  KAFKA-7273: Extend Connect Converter to support headers (apache#6362)
  MINOR: improve the Kafka RPC code generator (apache#7340)
  MINOR: Improve the org.apache.kafka.common.protocol code (apache#7344)
  KAFKA-8880: Docs on upgrade-guide (apache#7385)
  KAFKA-8179: do not suspend standby tasks during rebalance (apache#7321)
  KAFKA-8580: Compute RocksDB metrics (apache#7263)
  KAFKA-8880: Add overloaded function of Consumer.committed (apache#7304)
  HOTFIX: fix Kafka Streams upgrade note for broker backward compatibility (apache#7363)
  KAFKA-8848; Update system tests to use new AclAuthorizer (apache#7374)
  MINOR: remove unnecessary null check (apache#7299)
  KAFKA-6958: Overload methods for group and windowed stream to allow to name operation name using the new Named class (apache#6413)
  ...