KAFKA-6789: Handle COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE retriable errors in AdminClient API #5578
retest this please
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2471,11 +2469,10 @@ void handleFailure(Throwable throwable) {
         return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
     }

-    private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
-        Errors error = response.error();
+    private boolean handleRetriableError(Errors error, KafkaFutureImpl<?> future) {
I think handleRetriableError is a bit misleading: it handles both retriable and non-retriable errors. From this perspective the old naming was better.
@viktorsomogyi This error-handling method is now also applied to describeConsumerGroups, so the old name no longer fits perfectly. But I do agree the naming is a little misleading.
@@ -2567,8 +2564,11 @@ private void maybeAddConsumerGroup(ListGroupsResponse.Group group) {
     void handleResponse(AbstractResponse abstractResponse) {
         final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
         synchronized (results) {
-            if (response.error() != Errors.NONE) {
-                results.addError(response.error().exception(), node);
+            Errors error = response.error();
This is very similar to handleRetriableError; perhaps there is an opportunity to share common code.
Hi @omkreddy, if you don't mind, I've reviewed it too and left a few comments.
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));


//Retriable errors should be retried
NIT: extra whitespace
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));


//Retriable errors should be retried
NIT: same, whitespace
if (error.exception() instanceof RetriableException) {
    throw error.exception();
} else if (response.hasError()) {
This is the same, under the hood:
public boolean hasError() {
    return this.error != Errors.NONE;
}
So I think you can keep this?
95a14cc to 2477189
f843e73 to 90972cd
@hachikuji pinging for review
@cmccabe can you please review this?
AdminClient already has retry logic for consumer group requests. However, there are a few cases that we don't handle. One is where the leader has moved between the call to findCoordinator and our attempt to describe the group that the coordinator owns. This patch doesn't fix that case either, though.
This patch replaces a lot of checks for specific errors with checks to see if something inherits from RetriableException. As I've commented before on other JIRAs, I don't think this is a good idea for AdminClient. Specific exceptions have to be handled in a specific way based on what they indicate. For example, if there is a load in progress, maybe we need to back off for a little bit until it completes. If the coordinator has moved, we should try to send to the new coordinator. Just checking for what inherits from what tends to lead to the wrong behavior because people don't think about what each specific error case means and how it should be handled.
Also, what is the rationale for renaming COORDINATOR_LOADING_IN_PROGRESS to COORDINATOR_LOAD_IN_PROGRESS?
I can see that the patch adds retry logic to deleteConsumerGroups. However, I don't understand why we would want to do this. It seems like all the errors for deleting groups are not retryable and related to things like authorization, etc.
In general I think we should probably drop this PR and rethink the approach. It would be particularly useful to know what specific errors people want to handle that aren't handled now.
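The distinction drawn in this comment can be sketched roughly as follows. This is a minimal illustration only: `SketchError`, `SketchAction`, and the `retriable` flag are hypothetical stand-ins, not Kafka's `Errors` enum or `RetriableException` hierarchy, and the reactions chosen for each code are assumptions taken from the examples in the comment.

```java
// Hypothetical stand-ins for illustration only; these are not Kafka's real classes.
enum SketchAction { COMPLETE, RETRY, BACK_OFF_THEN_RETRY, FIND_NEW_COORDINATOR, FAIL }

enum SketchError {
    NONE(false),
    COORDINATOR_LOAD_IN_PROGRESS(true),
    NOT_COORDINATOR(true),
    GROUP_AUTHORIZATION_FAILED(false);

    // Stands in for "the error's exception extends RetriableException".
    private final boolean retriable;

    SketchError(boolean retriable) {
        this.retriable = retriable;
    }

    // Type-based check: every retriable error receives the same treatment.
    SketchAction byType() {
        if (this == NONE) {
            return SketchAction.COMPLETE;
        }
        return retriable ? SketchAction.RETRY : SketchAction.FAIL;
    }

    // Enumerated check: each error code is mapped to its own reaction.
    SketchAction byCode() {
        switch (this) {
            case NONE:
                return SketchAction.COMPLETE;
            case COORDINATOR_LOAD_IN_PROGRESS:
                return SketchAction.BACK_OFF_THEN_RETRY;
            case NOT_COORDINATOR:
                return SketchAction.FIND_NEW_COORDINATOR;
            default:
                return SketchAction.FAIL;
        }
    }
}
```

In this sketch `byType()` cannot tell a loading coordinator from a moved one, while `byCode()` forces a decision for each code that is listed and fails everything else.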
@cmccabe This PR's implementation is consistent with the existing code (listConsumerGroups), where we retry on COORDINATOR_LOAD_IN_PROGRESS and COORDINATOR_NOT_AVAILABLE errors. COORDINATOR_LOAD_IN_PROGRESS is not renamed; I just updated the comment to use the correct enum name. In deleteConsumerGroups as well, we retry only retriable exceptions, not all exceptions. The approach in this PR is the same as in #5595.
I agree that we should make
Understood. We certainly don't have to solve every problem in this PR. However, I think that the current name of the PR, "Add retry logic to AdminClient consumer group requests" is misleading, since it implies that there is no existing retry logic.
That's true -- my mistake. Your change just fixes a comment.
The approach in #5595 is to enumerate specific errors and handle them as needed. That's different from handling all subclasses of RetriableException the same way.
OK. I tried to extract the duplicate code into a common method, which led me to handle all RetriableExceptions the same way. I will revert some of the changes and handle specific errors instead.
90972cd to 5d4f636
@cmccabe @hachikuji I've updated the PR to handle the COORDINATOR_LOAD_IN_PROGRESS and COORDINATOR_NOT_AVAILABLE retriable errors in the consumer group API. This may help fix a few flaky tests.
Thanks for the patch. Just a couple comments.
    future.completeExceptionally(error.exception());
    return true;
} else if (error != Errors.NONE) {
    // TODO: KAFKA-6789, we can retry based on the error code. hanlde NOT_COORDINATOR error
We can probably drop this comment. I filed https://issues.apache.org/jira/browse/KAFKA-8341 for retrying NOT_COORDINATOR errors since that is a bit more work.
…LABLE retriable errors in AdminClient
5d4f636 to 75dedba
@hachikuji Thanks for the review. I've updated the PR; please take a look.
LGTM. Thanks for the patch. I'll merge after the build completes.
This patch adds support to retry all group operations after COORDINATOR_LOAD_IN_PROGRESS and COORDINATOR_NOT_AVAILABLE in AdminClient group operations. Previously we only had logic to retry after FindCoordinator failures. Reviewers: Yishun Guan <gyishun@gmail.com>, Viktor Somogyi <viktorsomogyi@gmail.com>, Jason Gustafson <jason@confluent.io>
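As a rough illustration of the behavior this summary describes, the sketch below retries a group operation only when the response carries one of the two coordinator errors and fails the future on any other error. It is a simplified stand-in under stated assumptions: `GroupOpError`, `run`, the simulated response iterator, and the `maxAttempts` bound are all hypothetical, and `CompletableFuture` is used in place of Kafka's own future type. It is not the code that was merged.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for illustration; not Kafka's real Errors enum.
enum GroupOpError {
    NONE,
    COORDINATOR_LOAD_IN_PROGRESS,
    COORDINATOR_NOT_AVAILABLE,
    GROUP_AUTHORIZATION_FAILED;

    // Only the two transient coordinator errors named in the patch summary are retried.
    boolean shouldRetry() {
        return this == COORDINATOR_LOAD_IN_PROGRESS || this == COORDINATOR_NOT_AVAILABLE;
    }

    // Consumes one simulated response per attempt (assumed test harness, not a Kafka API).
    static CompletableFuture<String> run(Iterator<GroupOpError> responses, int maxAttempts) {
        CompletableFuture<String> future = new CompletableFuture<>();
        for (int attempt = 1; attempt <= maxAttempts && responses.hasNext(); attempt++) {
            GroupOpError error = responses.next();
            if (error.shouldRetry()) {
                continue; // transient coordinator state: send the request again
            }
            if (error != NONE) {
                // Any other error is final and is reported through the future.
                future.completeExceptionally(new IllegalStateException(error.name()));
            } else {
                future.complete("ok after " + attempt + " attempt(s)");
            }
            return future;
        }
        // Ran out of attempts or simulated responses without a final answer.
        future.completeExceptionally(new IllegalStateException("retries exhausted"));
        return future;
    }
}
```

A real client would also need a backoff between attempts and an overall timeout; both are left out here to keep the retry decision itself visible.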
AdminClient describe groups, delete Groups, list Offsets API