feat: classify authorization exception as user error #7061

Merged
merged 11 commits into from
Feb 25, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import java.util.Objects;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code GroupAuthorizationClassifier} classifies missing consumer group ACLs as user error
*/
public class GroupAuthorizationClassifier implements QueryErrorClassifier {

  private static final Logger LOG = LoggerFactory.getLogger(GroupAuthorizationClassifier.class);

  private final String queryId;

  public GroupAuthorizationClassifier(final String queryId) {
    this.queryId = Objects.requireNonNull(queryId, "queryId");
  }

  @Override
  public Type classify(final Throwable e) {
    final Type type = e instanceof GroupAuthorizationException ? Type.USER : Type.UNKNOWN;

    if (type == Type.USER) {
      LOG.info(
          "Classified error as USER error based on missing consumer groups access rights."
              + " Query ID: {} Exception: {}",
          queryId,
          e);
    }

    return type;
  }

}
@@ -22,9 +22,7 @@
import org.slf4j.LoggerFactory;

/**
- * {@code MissingTopicClassifier} classifies errors by querying the broker
- * to check that all topics that the query relies on being accessible exist
- * and are accessible
+ * {@code MissingTopicClassifier} classifies missing source topic exceptions as user error
Contributor

Unrelated to this PR but I'm curious: what happens if an internal (changelog or repartition) topic is missing, or if a sink topic is missing? Does Streams throw a different type of exception in these cases?

Member Author

Atm, changelog and repartition topics would be created if they are missing. We would fail if the expected config does not match. (This will "change" with https://cwiki.apache.org/confluence/display/KAFKA/KIP-698%3A+Add+Explicit+User+Initialization+of+Broker-side+State+to+Kafka+Streams) -- Of course, it's only verified in a rebalance, but checking the source topic is also only done during a rebalance.

For output topics, Kafka Streams won't do anything, and thus the producer would fill up its write buffer and eventually block. So eventually Streams would crash.

But as ksqlDB checks for output topics, it should be ok?

Contributor

> For output topics, Kafka Streams won't do anything, and thus the producer would fill up its write buffer and eventually block. So eventually Streams would crash.

This is true for internal topics too, right? It seems like right now, for internal topics that are deleted, we'd block for max.block.ms and then throw an error classified as SYSTEM. Then we'd retry and recreate. After KIP-698 we'd block and then throw an error we will classify as USER on every retry.

> But as ksqlDB checks for output topics, it should be ok?

Where do we check?

Member Author

> this is true for internal topics too right?

Yes. Not sure how the error would be classified atm or after KIP-698, but it might be out of scope for this PR. I would like to focus on authorization errors for now.

> Where do we check?

Doesn't ksqlDB create output topics explicitly if they don't exist?

*/
public class MissingTopicClassifier implements QueryErrorClassifier {

@@ -225,10 +225,13 @@ public PersistentQueryMetadata buildPersistentQuery(
        applicationId
    ));

-    final QueryErrorClassifier topicClassifier = new MissingTopicClassifier(applicationId);
+    final QueryErrorClassifier userErrorClassifiers = new MissingTopicClassifier(applicationId)
+        .and(new TopicAuthorizationClassifier(applicationId))
+        .and(new GroupAuthorizationClassifier(applicationId))
+        .and(new TransactionAuthorizationClassifier(applicationId));
     final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId)
-        .map(topicClassifier::and)
-        .orElse(topicClassifier);
+        .map(userErrorClassifiers::and)
+        .orElse(userErrorClassifiers);

    return new PersistentQueryMetadata(
        statementText,
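The hunk above chains several classifiers with `and(...)`. The following is a minimal, self-contained sketch of how such a chain can behave. It is not the ksqlDB implementation: `Classifier`, `forType`, the stand-in exception classes, and the "first non-UNKNOWN verdict wins" rule are all assumptions made for illustration, and the real `QueryErrorClassifier.and` may resolve conflicting verdicts differently.

```java
import java.util.Objects;

public class ClassifierChainSketch {

  /** Stand-in for QueryError.Type, reduced to the two values used in this diff. */
  public enum Type { USER, UNKNOWN }

  /** Hypothetical, simplified stand-in for QueryErrorClassifier. */
  @FunctionalInterface
  public interface Classifier {
    Type classify(Throwable e);

    // Assumed combination rule: the first classifier that returns something
    // other than UNKNOWN decides; otherwise the next classifier is asked.
    default Classifier and(final Classifier other) {
      Objects.requireNonNull(other, "other");
      return e -> {
        final Type first = classify(e);
        return first != Type.UNKNOWN ? first : other.classify(e);
      };
    }
  }

  // Stand-ins for the Kafka authorization exceptions, so the sketch has no dependencies.
  public static class TopicAuthException extends RuntimeException { }

  public static class GroupAuthException extends RuntimeException { }

  /** Builds a classifier that reports USER for instances of the given exception type. */
  public static Classifier forType(final Class<? extends Throwable> clazz) {
    return e -> clazz.isInstance(e) ? Type.USER : Type.UNKNOWN;
  }

  public static final Classifier CHAIN =
      forType(TopicAuthException.class).and(forType(GroupAuthException.class));

  public static void main(final String[] args) {
    System.out.println(CHAIN.classify(new GroupAuthException()));
    System.out.println(CHAIN.classify(new IllegalStateException("unrelated")));
  }
}
```

With this rule, the order of the chained classifiers only matters when two of them would return different non-UNKNOWN types for the same exception, which cannot happen in the sketch because every classifier returns either USER or UNKNOWN.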
@@ -0,0 +1,52 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import java.util.Objects;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code TopicAuthorizationClassifier} classifies topic ACL exceptions as user error
*/
public class TopicAuthorizationClassifier implements QueryErrorClassifier {

  private static final Logger LOG = LoggerFactory.getLogger(TopicAuthorizationClassifier.class);

  private final String queryId;

  public TopicAuthorizationClassifier(final String queryId) {
    this.queryId = Objects.requireNonNull(queryId, "queryId");
  }

  @Override
  public Type classify(final Throwable e) {
    final Type type = e instanceof TopicAuthorizationException ? Type.USER : Type.UNKNOWN;

    if (type == Type.USER) {
      LOG.info(
          "Classified error as USER error based on missing topic access rights."
              + " Query ID: {} Exception: {}",
          queryId,
          e);
    }

    return type;
  }

}
@@ -0,0 +1,58 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import java.util.Objects;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code TransactionAuthorizationClassifier} classifies missing cluster ACLs as user error
*/
public class TransactionAuthorizationClassifier implements QueryErrorClassifier {

  private static final Logger LOG =
      LoggerFactory.getLogger(TransactionAuthorizationClassifier.class);

  private final String queryId;

  public TransactionAuthorizationClassifier(final String queryId) {
    this.queryId = Objects.requireNonNull(queryId, "queryId");
  }

  @Override
  public Type classify(final Throwable e) {
    final Type type =
        e instanceof StreamsException
Member Author

@guozhangwang It's unclear to me why this exception is wrapped with StreamsException while TopicAuthorizationException and GroupAuthorizationException are not.

For MissingSourceTopicException it's clear because it extends StreamsException and is thrown by KS directly.

Contributor

Member Author

But the producer should not wrap anything with StreamsException -- so why does KS wrap one producer exception but not the other? And how would the KIP change it (it seems the KIP is for the producer but not for KS)?

Contributor

TopicAuthorizationException: this can be thrown from both the consumer and the producer. For the consumer it could be thrown from partitionsFor, which KS uses via the global store; in that place we just do not wrap it and let it propagate all the way up. For the producer it may be thrown from the callback, in which case we do wrap it. This is an inconsistency.

GroupAuthorizationException: only thrown from the producer in the latest version with KIP-447, for which we should wrap in KS; in older versions it could be thrown from the consumer, and again we do not wrap it in KS.

TransactionalIdAuthorizationException: thrown from the producer, in the callback, where we always wrap.

Contributor

@rodesai Feb 24, 2021

In our other classifiers (e.g. the regex classifier) we walk the cause chain and try to find any exception that matches. That would be an approach to make this check more robust.

            && e.getCause() instanceof TransactionalIdAuthorizationException
            ? Type.USER
            : Type.UNKNOWN;

    if (type == Type.USER) {
      LOG.info(
          "Classified error as USER error based on missing transactional ID access rights."
              + " Query ID: {} Exception: {}",
          queryId,
          e);
    }

    return type;
  }

}
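The review thread suggests walking the cause chain instead of checking only the direct cause. A minimal sketch of that idea follows; it is an illustration, not code from this PR. `WrapperException` and `TxnAuthException` are hypothetical stand-ins for `StreamsException` and `TransactionalIdAuthorizationException`, and the `MAX_DEPTH` bound is an assumption added to guard against cyclic cause chains.

```java
public class CauseChainSketch {

  /** Stand-in for QueryError.Type, reduced to the two values used in this diff. */
  public enum Type { USER, UNKNOWN }

  /** Hypothetical stand-in for StreamsException. */
  public static class WrapperException extends RuntimeException {
    public WrapperException(final String message, final Throwable cause) {
      super(message, cause);
    }
  }

  /** Hypothetical stand-in for TransactionalIdAuthorizationException. */
  public static class TxnAuthException extends RuntimeException {
    public TxnAuthException(final String message) {
      super(message);
    }
  }

  // Assumed upper bound on how far to follow getCause(); protects against cycles.
  private static final int MAX_DEPTH = 32;

  /** Returns USER if the exception itself or any of its causes is a TxnAuthException. */
  public static Type classify(final Throwable e) {
    int depth = 0;
    for (Throwable t = e; t != null && depth < MAX_DEPTH; t = t.getCause()) {
      if (t instanceof TxnAuthException) {
        return Type.USER;
      }
      depth++;
    }
    return Type.UNKNOWN;
  }

  public static void main(final String[] args) {
    final Throwable nested = new WrapperException(
        "outer", new WrapperException("inner", new TxnAuthException("denied")));
    System.out.println(classify(nested));
    System.out.println(classify(new WrapperException("no auth cause", null)));
  }
}
```

Unlike the check in the diff, this sketch also matches an unwrapped exception and one wrapped more than once, so it is broader than the merged behavior; whether that is desirable depends on the wrapping inconsistencies described in the thread.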