Skip to content

Commit

Permalink
fix: classify SR missing subject and access rights query errors as US…
Browse files Browse the repository at this point in the history
…ER errors (#9072)
  • Loading branch information
spena committed Apr 29, 2022
1 parent 2843f00 commit 90a609d
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 1 deletion.
@@ -0,0 +1,49 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code MissingSubjectClassifier} classifies missing SR subjects exceptions as user error
*/
public class MissingSubjectClassifier implements QueryErrorClassifier {
private static final Logger LOG = LoggerFactory.getLogger(MissingSubjectClassifier.class);

private final String queryId;

public MissingSubjectClassifier(final String queryId) {
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

@Override
public Type classify(final Throwable e) {
final Type type = SchemaRegistryUtil.isSubjectNotFoundErrorCode(e) ? Type.USER : Type.UNKNOWN;

if (type == Type.USER) {
LOG.info(
"Classified error as USER error based on missing SR subject. Query ID: {} Exception: {}",
queryId,
e);
}

return type;
}
}
Expand Up @@ -641,7 +641,9 @@ private QueryErrorClassifier getConfiguredQueryErrorClassifier(
final String applicationId
) {
final QueryErrorClassifier userErrorClassifiers = new MissingTopicClassifier(applicationId)
.and(new AuthorizationClassifier(applicationId));
.and(new AuthorizationClassifier(applicationId))
.and(new MissingSubjectClassifier(applicationId))
.and(new SchemaAuthorizationClassifier(applicationId));
return buildConfiguredClassifiers(ksqlConfig, applicationId)
.map(userErrorClassifiers::and)
.orElse(userErrorClassifiers);
Expand Down
@@ -0,0 +1,51 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code SchemaAuthorizationClassifier} classifies authorization SR subjects exceptions
* as user error
*/
public class SchemaAuthorizationClassifier implements QueryErrorClassifier {
private static final Logger LOG = LoggerFactory.getLogger(SchemaAuthorizationClassifier.class);

private final String queryId;

public SchemaAuthorizationClassifier(final String queryId) {
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

@Override
public Type classify(final Throwable e) {
final Type type = SchemaRegistryUtil.isAuthErrorCode(e) ? Type.USER : Type.UNKNOWN;

if (type == Type.USER) {
LOG.info(
"Classified error as USER error based on missing SR subject access rights. "
+ "Query ID: {} Exception: {}",
queryId,
e);
}

return type;
}
}
@@ -0,0 +1,61 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class MissingSubjectClassifierTest {
@Test
public void shouldClassifyMissingSubjectAsUserError() {
// Given:
final Exception e = new RestClientException("foo", 404, 40401);

// When:
final QueryError.Type type = new MissingSubjectClassifier("").classify(e);

// Then:
assertThat(type, is(QueryError.Type.USER));
}

@Test
public void shouldClassifyNoMissingSubjectAsUnknownErrorCode() {
// Given:
final Exception e = new RestClientException("foo", 401, 40101);

// When:
final QueryError.Type type = new MissingSubjectClassifier("").classify(e);

// Then:
assertThat(type, is(QueryError.Type.UNKNOWN));
}

@Test
public void shouldClassifyOtherExceptionAsUnknownException() {
// Given:
final Exception e = new Exception("foo");

// When:
final QueryError.Type type = new MissingSubjectClassifier("").classify(e);

// Then:
assertThat(type, is(QueryError.Type.UNKNOWN));
}
}
@@ -0,0 +1,75 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.streams.errors.StreamsException;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class SchemaAuthorizationClassifierTest {
@Test
public void shouldClassifySRAuthorizationErrorCodeAsUserError() {
// Given:
final Exception e = new RestClientException("foo", 403, 40301);

// When:
final QueryError.Type type = new SchemaAuthorizationClassifier("").classify(e);

// Then:
assertThat(type, is(QueryError.Type.USER));
}

@Test
public void shouldClassifySRAuthenticationErrorCodeAsUserError() {
// Given:
final Exception e = new RestClientException("foo", 401, 403101);

// When:
final QueryError.Type type = new SchemaAuthorizationClassifier("").classify(e);

// Then:
assertThat(type, is(QueryError.Type.USER));
}

@Test
public void shouldClassifyNoAuthErrorSubjectAsUnknownErrorCode() {
// Given:
final Exception e = new RestClientException("foo", 404, 40401);

// When:
final QueryError.Type type = new SchemaAuthorizationClassifier("").classify(e);

// Then:
assertThat(type, is(QueryError.Type.UNKNOWN));
}

@Test
public void shouldClassifyOtherExceptionAsUnknownException() {
// Given:
final Exception e = new Exception("foo");

// When:
final QueryError.Type type = new SchemaAuthorizationClassifier("").classify(e);

// Then:
assertThat(type, is(QueryError.Type.UNKNOWN));
}
}

0 comments on commit 90a609d

Please sign in to comment.