-
Notifications
You must be signed in to change notification settings - Fork 13.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10839: improve consumer group coordinator unavailable message #9729
KAFKA-10839: improve consumer group coordinator unavailable message #9729
Conversation
@@ -1355,7 +1362,7 @@ public void run() { | |||
} else if (heartbeat.sessionTimeoutExpired(now)) { | |||
// the session timeout has expired without seeing a successful heartbeat, so we should | |||
// probably make sure the coordinator is still healthy. | |||
markCoordinatorUnknown(); | |||
markCoordinatorUnknown("session timed out without heartbeat"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These messages feel a bit smelly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems ok. Maybe "without receiving a successful heartbeat response"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -257,7 +257,7 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) { | |||
} else if (coordinator != null && client.isUnavailable(coordinator)) { | |||
// we found the coordinator, but the connection has failed, so mark | |||
// it dead and backoff before retrying discovery | |||
markCoordinatorUnknown(); | |||
markCoordinatorUnknown("coordinator unavailable"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be extracted into a constant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with it. This is just informational and the literal value is easier to read.
markCoordinatorUnknown(false); | ||
|
||
protected synchronized void markCoordinatorUnknown(Errors error) { | ||
markCoordinatorUnknown(false, "error response {}" + error.message()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder if we can use the name of the error. Some of the messages will read awkwardly in the log message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, that was what I was trying to do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if (this.coordinator != null) { | ||
log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator); | ||
log.info("Group coordinator {} is unavailable or invalid due to cause: {}." | ||
+ "isDisconnected: {}. Rediscovery will attempted.", this.coordinator, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "Rediscovery will be attempted"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -1355,7 +1362,7 @@ public void run() { | |||
} else if (heartbeat.sessionTimeoutExpired(now)) { | |||
// the session timeout has expired without seeing a successful heartbeat, so we should | |||
// probably make sure the coordinator is still healthy. | |||
markCoordinatorUnknown(); | |||
markCoordinatorUnknown("session timed out without heartbeat"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems ok. Maybe "without receiving a successful heartbeat response"?
@@ -1311,7 +1311,7 @@ public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartitio | |||
future.raise(error); | |||
} else if (error == Errors.NOT_COORDINATOR) { | |||
// re-discover the coordinator and retry | |||
markCoordinatorUnknown(); | |||
markCoordinatorUnknown(error.message()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs fixing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the patch.
Missed this in #9729. The substitution in `markCoordinatorUnknown` does not work because the argument is not provided as a parameter. Reviewers: Ismael Juma <ismael@juma.me.uk>
…9729) When a consumer encounters an issue that triggers marking it to mark coordinator as unknown, the error message it prints does not give much context about the error that triggered it. This change includes the response error that triggered the transition or any other cause if not triggered by an error code in a response. Reviewers: Jason Gustafson <jason@confluent.io>
Missed this in #9729. The substitution in `markCoordinatorUnknown` does not work because the argument is not provided as a parameter. Reviewers: Ismael Juma <ismael@juma.me.uk>
When a consumer encounters an issue that triggers marking it to mark coordinator as unknown, the error message it prints does not give much context about the error that triggered it. This change includes the response error that triggered the transition or any other cause if not triggered by an error code in a response.