KAFKA-14401: Fail the worker if any uncaught exception comes in WorkThread #13361
base: trunk
Conversation
Hi @rohits64! Thanks for picking this up. It can be particularly painful when the WorkThread dies and there is no feedback to the Herder/operator. Failures in the offset storage can prevent offset commits, and failures in the status storage can lead to stale statuses.
First, let's make sure you are signed up on Apache Jira, and if you could assign this ticket to yourself, that would be great (just send a mail to dev@kafka.apache.org and ask for a Jira account with your preferred username).
It would also be nice if we could discuss what is expected when the WorkThread is killed.
Currently, we already retry forever for both TimeoutException and RetriableException. In several environments, when the exception is persistent (for example, persistent timeouts), the work thread can get into an infinite retry loop (we have seen cases of this on production systems). This is dangerous because (1) there is no visibility to the users other than logs of these retries, and (2) over time the callbacks might fill the queue. That is an issue we need to address separately, but it sets useful context here.
Since the KafkaBasedLog mainly consumes/produces the Kafka topics, we are already retrying the exceptions mentioned above. However, any other runtime exception here should IMO be treated as fatal and handled accordingly. Two options seem obvious (both sketched below):
- We could retry the exception some static number of times with exponential backoff and then fail the WorkThread. The exception must be propagated back to the Herder, which fails the Worker.
- We could fail the worker as soon as we see the exception. Again, we propagate the error back to the Herder and fail it.
In either case, since worker failures trigger rebalances, the connectors/tasks come back healthy rather than staying stuck in a weird state.
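To make the two options concrete, here is a minimal sketch of option 1, with option 2 being the final `catch` block. This is illustrative only: `runOnce()`, `failWorker()`, and the backoff values are hypothetical stand-ins, not the actual KafkaBasedLog API.

```java
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;

class WorkThreadSketch extends Thread {
    private static final int MAX_RETRIES = 10;   // assumed static retry budget

    @Override
    public void run() {
        long backoffMs = 100L;                   // assumed initial backoff
        int attempts = 0;
        while (!isInterrupted()) {
            try {
                runOnce();                       // one consume/produce/dispatch iteration
                attempts = 0;                    // a healthy iteration resets the budget
                backoffMs = 100L;
            } catch (RetriableException e) {     // also covers TimeoutException, its subclass
                if (++attempts > MAX_RETRIES) {
                    failWorker(new ConnectException("Exhausted retries in work thread", e));
                    return;
                }
                Utils.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 10_000L);   // exponential backoff, capped
            } catch (Throwable t) {
                // Option 2: treat anything else as fatal and surface it immediately.
                failWorker(new ConnectException("Unexpected exception in work thread", t));
                return;
            }
        }
    }

    private void runOnce() { /* poll the consumer and invoke pending callbacks */ }

    private void failWorker(ConnectException e) { /* propagate to the Herder */ }
}
```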
WDYT of this @rohits64, @vamossagar12? Are there any concerns I am missing here?
```java
// resume the thread in case of any unexpected exception encountered
thread = new WorkThread();
thread.start();
```
It might not be an acceptable solution to restart forever. However, if we were to retry, we could keep using the same thread rather than spawning a new one; we have precedent for retrying on RetriableException and TimeoutException, since the thread itself is stateless.
Hi @mukkachaitanya, thanks for the inputs. I think we can go with exponential backoff and a static number of retries. I have made the changes; currently the number of retries is set to 10, which should be sufficient.
I don't think we should add retries when we already know that the exceptions caught here are non-retriable. Additionally, it may be unsafe or incorrect to retry on an arbitrary exception, and may produce unexpected behavior in the consumer or in the worker. @rohits64 I also think the PR as-is does not address the latter point that @mukkachaitanya made: we need to propagate these failures to the asynchronous callers, not just let this thread die (with or without retries). As-is, this PR does not address the problem in the title. Thanks for looking into this; it's certainly not good for these failures to silently stall the worker indefinitely!
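One way to do that propagation, sketched here with hypothetical names (`fatalError`, `fail`, `readLogEndOffsetCallbacks`) around Connect's real `Callback` interface: complete every queued callback exceptionally and reject new work once the thread has failed, so callers unblock instead of hanging.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.Callback;

class FatalErrorPropagationSketch {
    private final Deque<Callback<Void>> readLogEndOffsetCallbacks = new ArrayDeque<>();
    private volatile Throwable fatalError;   // assumed field, set once on failure

    // Called by the work thread when it hits a non-retriable exception.
    synchronized void fail(Throwable t) {
        fatalError = t;
        // Complete every pending callback exceptionally so async callers unblock.
        while (!readLogEndOffsetCallbacks.isEmpty())
            readLogEndOffsetCallbacks.poll().onCompletion(t, null);
    }

    // Called by producers to the queue; new work is rejected after failure.
    synchronized void readToEnd(Callback<Void> callback) {
        if (fatalError != null) {
            callback.onCompletion(new ConnectException("Work thread has died", fatalError), null);
            return;
        }
        readLogEndOffsetCallbacks.add(callback);
        // ... signal the work thread as usual ...
    }
}
```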
Fair point @gharris1727! I agree. I think the retries came into the picture because we were already retrying on RetriableException and TimeoutException.
Thanks for the PR @rohits64. I agree with @gharris1727 and @mukkachaitanya that we shouldn't add the overhead of retries in this case. Also, as pointed out, the original problem of notifying the producers to the queue that the WorkThread is dead and can't handle any more requests isn't handled.
Thanks @mukkachaitanya, @gharris1727 and @vamossagar12 for looking into this. As pointed out, it is fair to fail instead of retrying, since there might be unexpected behaviour with unhandled exceptions. I will make further changes so that the worker fails rather than the thread just dying silently.
Yeah, TimeoutException is a tricky one because it is generally thrown after having waited for some amount of time (like …)
```java
if (offsetLog.unexpectedExceptionCaught.get()) {
    throw new ConnectException("Unexpected Exception caught", offsetLog.exception);
}
```
QQ - what if ConnectException is thrown inside the readToEnd method instead, on the next line? Then we may not need to access variables from the KafkaBasedLog class explicitly.
ConnectException can be thrown inside the readToEnd method; then we need to handle it at every place where it is called, instead of just in KafkaOffsetBackingStore. That would be the better approach, as it would handle all cases of this bug.
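A rough sketch of that alternative, with `workThreadFailure` as an assumed field: readToEnd itself throws when the work thread has died, so every call site sees the failure without inspecting KafkaBasedLog internals.

```java
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.Callback;

class ReadToEndSketch {
    private volatile Throwable workThreadFailure;   // assumed field, set by the work thread

    public void readToEnd(Callback<Void> callback) {
        // Fail fast at every call site instead of only in KafkaOffsetBackingStore.
        if (workThreadFailure != null)
            throw new ConnectException("Work thread is dead", workThreadFailure);
        // ... enqueue the callback and signal the work thread as usual ...
    }
}
```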
Remove redundant commit
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Here the WorkThread dies if any unexpected exception is encountered, and the connectors/tasks are not aware of this: they keep submitting callbacks to KafkaBasedLog with nobody processing them. I have added an uncaught exception handler so that any uncaught exception is detected and the Worker fails.
EDIT:
Instead of starting a new thread, I have added retry with exponential backoff and a limited number of retries (currently 10).
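For reference, a minimal sketch of the uncaught-exception-handler wiring the description mentions. The field names mirror the ones the diff checks (`unexpectedExceptionCaught`, `exception`), but the surrounding class and method are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class UncaughtHandlerSketch {
    private static final Logger log = LoggerFactory.getLogger(UncaughtHandlerSketch.class);
    final AtomicBoolean unexpectedExceptionCaught = new AtomicBoolean(false);
    volatile Throwable exception;

    Thread startWorkThread(Runnable loop) {
        Thread thread = new Thread(loop, "KafkaBasedLog Work Thread");
        thread.setUncaughtExceptionHandler((t, e) -> {
            log.error("Uncaught exception in {}", t.getName(), e);
            exception = e;
            unexpectedExceptionCaught.set(true);   // polled by the backing store, which then fails the worker
        });
        thread.start();
        return thread;
    }
}
```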