New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: do not continue processing records on permission errors in schema registry #10103
fix: do not continue processing records on permission errors in schema registry #10103
Conversation
|
1 similar comment
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a couple of comments of using the Throwables
class instead of using recursion ourselves. The logic is good, so +1 for this PR. But If you can use Throwables
, that will make the code simpler I think.
ksqldb-common/src/main/java/io/confluent/ksql/errors/LogMetricAndContinueExceptionHandler.java
Outdated
Show resolved
Hide resolved
ksqldb-common/src/main/java/io/confluent/ksql/errors/LogMetricAndContinueExceptionHandler.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java
Outdated
Show resolved
Hide resolved
@spena thanks for the review. I updated the code, seems like you'll need to reapprove it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved based on Sergio's recent comment , I see throwable has been used. Thanks @lucasbru
Description
KSQL currently treats an exception due to missing permission on schema registry during deserialization as any other deserialization exception, and the default action for deserialization exception is log and continue.
The permission issue should stop KSQL from processing.
With this change:
LogMetricAndContinueExceptionHandler
for aRestClientException
(can have multiple levels of wrapping) to get the error code and check if we have to deal with an authorization error.FAIL
from thedeserializationExceptionHandler
for such exceptions.StreamsUncaughtExceptionHandler
. There, we make sure that the exception is classified as aUSER
error so that on-call is not alerted. The uncaught exception handler will restart the thread and retry processing records.The
StreamThreads
for that query will end up in a loop that will restart theStreamThread
and retry processing the next record. So we effectively get that the query is blocked until the permission error is resolved.Testing done
Tested the change in a local setup connected to ccloud schema registry.
Unit tests included.
Reviewer checklist