feat: classify authorization exception as user error #7061
@confluentinc It looks like @mjsax just signed our Contributor License Agreement. 👍 Always at your service, clabot |
LGTM! It might make sense to test this locally to make sure that this is exactly the error type that is raised (and not wrapped in something else)
if (type == Type.USER) {
  LOG.info(
      "Classified error as USER error based on missing topic. Query ID: {} Exception: {}",
I think you meant to update this error message
}

@Test
public void shouldClassifyNoMissingTopicAsUnknownError() {
- public void shouldClassifyNoMissingTopicAsUnknownError() {
+ public void shouldClassifyNoMissingAclAsUnknownError() {
@@ -0,0 +1,52 @@
/*
 * Copyright 2020 Confluent Inc.
nit: year (and also for the new test file)
@mjsax can we also knock out https://confluentinc.atlassian.net/browse/KCI-237 in this patch?
Add integration test for error classification
Sorry to keep you adding more into this PR, but besides KCI-237, here are a few more exceptions we can knock out :)
@guozhangwang It seem an |
@Override
public Type classify(final Throwable e) {
  final Type type =
      e instanceof StreamsException
@guozhangwang It's unclear to me why this exception is wrapped with StreamsException while TopicAuthorizationException and GroupAuthorizationException are not. For MissingSourceTopicException it's clear, because it extends StreamsException and is thrown by KS directly.
I think it should not, and it should be fixed via https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
cc @abbccdda
But the producer should not wrap anything with StreamsException -- so why does KS wrap the one but not the other producer exception? And how would the KIP change it (it seems the KIP is for the producer but not for KS)?
TopicAuthorizationException: this can be thrown from both the consumer and the producer. For the consumer, it can be thrown from partitionsFor, which KS uses via the global store; in that place we do not wrap it and let it propagate all the way up. For the producer, it may be thrown from the callback, where we do wrap it. This is an inconsistency.
GroupAuthorizationException: only thrown from the producer in the latest version with KIP-447, in which case we should wrap it in KS; in older versions it could be thrown from the consumer, and again we do not wrap it in KS.
TransactionalIdAuthorizationException: thrown from the producer, in the callback, where we always wrap.
In our other classifiers (e.g. the regex classifier) we walk the cause stack and try to find any exception that matches. That would be an approach to make this check more robust.
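For illustration only, walking the cause stack could look roughly like the sketch below. CauseChain and its find method are hypothetical names, not ksqlDB code, and SecurityException merely stands in for Kafka's AuthorizationException so that the example compiles without Kafka on the classpath. The depth limit of 32 is likewise an arbitrary assumption.

```java
import java.util.Optional;

// Hypothetical helper (not part of ksqlDB): searches an exception's cause chain.
final class CauseChain {

  private CauseChain() {
  }

  // Returns the first throwable in the chain, starting with e itself, that is
  // an instance of the given type, or an empty Optional if none matches.
  static <T extends Throwable> Optional<T> find(final Throwable e, final Class<T> type) {
    Throwable current = e;
    // Bound the walk so that a cyclic cause chain cannot loop forever.
    for (int depth = 0; current != null && depth < 32; depth++) {
      if (type.isInstance(current)) {
        return Optional.of(type.cast(current));
      }
      current = current.getCause();
    }
    return Optional.empty();
  }

  public static void main(final String[] args) {
    // SecurityException is a stand-in for a wrapped authorization error.
    final Throwable wrapped =
        new RuntimeException("wrapper", new SecurityException("not authorized"));
    System.out.println(find(wrapped, SecurityException.class).isPresent());     // true
    System.out.println(find(wrapped, IllegalStateException.class).isPresent()); // false
  }
}
```

With a helper like this, a classifier would give the same answer whether or not Streams wrapped the exception, which sidesteps the inconsistency discussed above.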
Thanks @mjsax -- LGTM with minor suggestions inline. I'm not sure about the answer to your question about why some exceptions are wrapped and others aren't. ksqlDB directly passes the exception (from the call to kafkaStreams.setUncaughtExceptionHandler(...)) to the QueryErrorClassifier, so there's no unwrapping done in ksqlDB itself. Would have to dive into the Streams code to understand why some are wrapped and others aren't.
 * {@code MissingTopicClassifier} classifies errors by querying the broker
 * to check that all topics that the query relies on being accessible exist
 * and are accessible
 * {@code MissingTopicClassifier} classifies missing source topic exceptions as user error
Unrelated to this PR but I'm curious: what happens if an internal (changelog or repartition) topic is missing, or if a sink topic is missing? Does Streams throw a different type of exception in these cases?
Atm, changelog and repartition topics would be created. We would fail if the expected config does not match. (This will "change" with https://cwiki.apache.org/confluence/display/KAFKA/KIP-698%3A+Add+Explicit+User+Initialization+of+Broker-side+State+to+Kafka+Streams.) Of course, it's only verified in a rebalance, but checking the source topic is also only done during a rebalance.
For output topics, Kafka Streams won't do anything, and thus the producer would fill up its write buffer and eventually block. Eventually Streams would crash.
But as ksqlDB checks for output topics, it should be ok?
> For output topics, Kafka Streams won't do anything, and thus the producer would fill up its write buffer and eventually block. Eventually Streams would crash.
This is true for internal topics too, right? It seems like right now, for internal topics that are deleted, we'd block for max.block.ms and then throw an error classified as SYSTEM. Then we'd retry and recreate. After KIP-698 we'd block and then throw an error that we will classify as USER on every retry.
> But as ksqlDB checks for output topics, it should be ok?
Where do we check?
> This is true for internal topics too, right?
Yes. Not sure how the error would be classified atm or after KIP-698. But it might be out of scope for this PR. Would like to focus on authorization errors for now.
> Where do we check?
Doesn't ksqlDB create output topics explicitly if they don't exist?
queryMetadata.start();
assertThatEventually(
    "",
nit: empty error message.
@Test
public void shouldClassifyMissingSourceTopicExceptionAsUserError() {
  // Given:
  final String serviceId = "my-service-id_"; // Defaults to "default_"
These new tests are hard to read since it's not clear to me which parts of the "givens" are relevant for the test and which are just boilerplate. Can we refactor the common parts/boilerplate into a helper method (e.g., givenTestSetupWithAclsForQuery()) so only the relevant parts are left in each test?
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId);
ksqlConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
Why does this cause a transaction ID authorization exception? (Can we add a quick comment so readers know that's the purpose of setting this config in this test?)
…rizationClassifierTest.java Co-authored-by: Victoria Xia <victoria.f.xia281@gmail.com>
Why |
LGTM! nice integration tests :)
Description
If we receive an AuthorizationException, it indicates a configuration issue; thus, we should classify it as a user error.
Testing done
Added corresponding unit tests.
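As a rough sketch of the classification described above (not the code from this PR), a type-based classifier might look like the following. AuthorizationClassifierSketch is a hypothetical name, the Type enum only mirrors the USER, SYSTEM and UNKNOWN categories mentioned in the conversation, and the local AuthorizationException is a stand-in for org.apache.kafka.common.errors.AuthorizationException so the example runs without Kafka on the classpath.

```java
// Hypothetical classifier; the names mirror the PR, but this is not its code.
final class AuthorizationClassifierSketch {

  // Assumed to mirror the error categories mentioned in the review thread.
  enum Type { USER, SYSTEM, UNKNOWN }

  private AuthorizationClassifierSketch() {
  }

  // Authorization failures point at missing ACLs, i.e. a configuration issue,
  // so they are reported as USER errors; everything else stays UNKNOWN here.
  static Type classify(final Throwable e) {
    return e instanceof AuthorizationException ? Type.USER : Type.UNKNOWN;
  }

  public static void main(final String[] args) {
    System.out.println(classify(new AuthorizationException("missing ACL"))); // USER
    System.out.println(classify(new IllegalStateException("other")));        // UNKNOWN
  }
}

// Stand-in for Kafka's AuthorizationException (an assumption for this sketch).
class AuthorizationException extends RuntimeException {
  AuthorizationException(final String message) {
    super(message);
  }
}
```

Because Kafka's TopicAuthorizationException, GroupAuthorizationException and TransactionalIdAuthorizationException all extend AuthorizationException, a single instanceof check of this kind would cover the three cases discussed in the review, provided the exception is not wrapped.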
Reviewer checklist