Skip to content

Commit

Permalink
feat: classify authorization exception as user error (#7061)
Browse files Browse the repository at this point in the history
Co-authored-by: Victoria Xia <victoria.f.xia281@gmail.com>
  • Loading branch information
mjsax and vcrfxia committed Feb 25, 2021
1 parent dcfdb0d commit a74b77c
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import java.util.Objects;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code AuthorizationClassifier} classifies missing ACLs as user error
*/
public class AuthorizationClassifier implements QueryErrorClassifier {

private static final Logger LOG =
LoggerFactory.getLogger(AuthorizationClassifier.class);

private final String queryId;

public AuthorizationClassifier(final String queryId) {
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

@Override
public Type classify(final Throwable e) {
final Type type =
e instanceof AuthorizationException
|| e instanceof StreamsException
&& e.getCause() instanceof AuthorizationException
? Type.USER
: Type.UNKNOWN;

if (type == Type.USER) {
LOG.info(
"Classified error as USER error based on missing access rights."
+ " Query ID: {} Exception: {}",
queryId,
e);
}

return type;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import org.slf4j.LoggerFactory;

/**
* {@code MissingTopicClassifier} classifies errors by querying the broker
* to check that all topics that the query relies on being accessible exist
* and are accessible
* {@code MissingTopicClassifier} classifies missing source topic exceptions as user error
*/
public class MissingTopicClassifier implements QueryErrorClassifier {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,11 @@ public PersistentQueryMetadata buildPersistentQuery(
applicationId
));

final QueryErrorClassifier topicClassifier = new MissingTopicClassifier(applicationId);
final QueryErrorClassifier userErrorClassifiers = new MissingTopicClassifier(applicationId)
.and(new AuthorizationClassifier(applicationId));
final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId)
.map(topicClassifier::and)
.orElse(topicClassifier);
.map(userErrorClassifiers::and)
.orElse(userErrorClassifiers);

return new PersistentQueryMetadata(
statementText,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
import static org.apache.kafka.common.resource.ResourceType.GROUP;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

Expand All @@ -44,6 +46,8 @@
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryError.Type;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.SequentialQueryIdGenerator;
import io.confluent.ksql.services.DisabledKsqlClient;
Expand All @@ -60,6 +64,8 @@
import io.confluent.ksql.util.OrderDataProvider;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -74,8 +80,13 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
Expand All @@ -98,6 +109,8 @@ public class SecureIntegrationTest {
private static final Credentials SUPER_USER = VALID_USER1;
private static final Credentials NORMAL_USER = VALID_USER2;
private static final AtomicInteger COUNTER = new AtomicInteger(0);
private static final String SERVICE_ID = "my-service-id_";
private static final String QUERY_ID_PREFIX = "_confluent-ksql-" + SERVICE_ID;

public static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness
.builder()
Expand Down Expand Up @@ -210,37 +223,136 @@ public void shouldRunQueryAgainstKafkaClusterOverSaslSsl() {
@Test
public void shouldWorkWithMinimalPrefixedAcls() {
// Given:
final String serviceId = "my-service-id_"; // Defaults to "default_"
final String prefix = "_confluent-ksql-" + serviceId;

final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);

givenTestSetupWithAclsForQuery();
givenAllowAcl(NORMAL_USER,
resource(CLUSTER, "kafka-cluster"),
ops(DESCRIBE_CONFIGS));
resource(CLUSTER, "kafka-cluster"),
ops(DESCRIBE_CONFIGS));
givenTestSetupWithConfig(ksqlConfig);

givenAllowAcl(NORMAL_USER,
resource(TOPIC, INPUT_TOPIC),
ops(READ));
// Then:
assertCanRunRepartitioningKsqlQuery();
assertCanAccessClusterConfig();
}

givenAllowAcl(NORMAL_USER,
resource(TOPIC, outputTopic),
ops(CREATE /* as the topic doesn't exist yet*/, WRITE));
@Test
public void shouldClassifyMissingSourceTopicExceptionAsUserError() {
// Given:
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);

givenAllowAcl(NORMAL_USER,
prefixedResource(TOPIC, prefix),
ops(ALL));
givenTestSetupWithAclsForQuery();
givenTestSetupWithConfig(ksqlConfig);

givenAllowAcl(NORMAL_USER,
prefixedResource(GROUP, prefix),
ops(ALL));
// When:
topicClient.deleteTopics(Collections.singleton(INPUT_TOPIC));
assertThatEventually(
"Wait for async topic deleting",
() -> topicClient.isTopicExists(outputTopic),
is(false)
);

// Then:
assertQueryFailsWithUserError(
String.format(
"CREATE STREAM %s AS SELECT * FROM %s;",
outputTopic,
INPUT_STREAM
),
String.format(
"%s: One or more source topics were missing during rebalance",
MissingSourceTopicException.class.getName()
)
);
}

@Test
public void shouldClassifyTopicAuthorizationExceptionAsUserError() {
// Given:
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);

givenTestSetupWithAclsForQuery();
givenTestSetupWithConfig(ksqlConfig);

// When:
TEST_HARNESS.getKafkaCluster().addUserAcl(
NORMAL_USER.username,
AclPermissionType.DENY,
resource(TOPIC, INPUT_TOPIC),
ops(READ)
);

// Then:
assertCanRunRepartitioningKsqlQuery();
assertCanAccessClusterConfig(prefix);
assertQueryFailsWithUserError(
String.format(
"CREATE STREAM %s AS SELECT * FROM %s;",
outputTopic,
INPUT_STREAM
),
String.format(
"%s: Not authorized to access topics: [%s]",
TopicAuthorizationException.class.getName(),
INPUT_TOPIC
)
);
}

@Test
public void shouldClassifyGroupAuthorizationExceptionAsUserError() {
// Given:
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);

givenTestSetupWithAclsForQuery();
givenTestSetupWithConfig(ksqlConfig);

TEST_HARNESS.getKafkaCluster().addUserAcl(
NORMAL_USER.username,
AclPermissionType.DENY,
prefixedResource(GROUP, QUERY_ID_PREFIX),
ops(ALL)
);

// Then:
assertQueryFailsWithUserError(
String.format(
"CREATE STREAM %s AS SELECT * FROM %s;",
outputTopic,
INPUT_STREAM
),
String.format(
"%s: Not authorized to access group: %squery_",
GroupAuthorizationException.class.getName(),
QUERY_ID_PREFIX
) + "%s"
);
}

@Test
public void shouldClassifyTransactionIdAuthorizationExceptionAsUserError() {
// Given:
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);
ksqlConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);

givenTestSetupWithAclsForQuery(); // does not authorize TX, but we enabled EOS above
givenTestSetupWithConfig(ksqlConfig);

// Then:
assertQueryFailsWithUserError(
String.format(
"CREATE STREAM %s AS SELECT * FROM %s;",
outputTopic,
INPUT_STREAM
),
String.format(
"%s: Error encountered trying to initialize transactions [stream-thread [Time-limited test]]",
StreamsException.class.getName()
)
);
}

// Requires correctly configured schema-registry running
Expand Down Expand Up @@ -268,6 +380,24 @@ public void shouldRunQueryAgainstSecureSchemaRegistry() {
}
}

private void givenTestSetupWithAclsForQuery() {
givenAllowAcl(NORMAL_USER,
resource(TOPIC, INPUT_TOPIC),
ops(READ));

givenAllowAcl(NORMAL_USER,
resource(TOPIC, outputTopic),
ops(CREATE /* as the topic doesn't exist yet*/, WRITE));

givenAllowAcl(NORMAL_USER,
prefixedResource(TOPIC, QUERY_ID_PREFIX),
ops(ALL));

givenAllowAcl(NORMAL_USER,
prefixedResource(GROUP, QUERY_ID_PREFIX),
ops(ALL));
}

private static void givenAllowAcl(final Credentials credentials,
final ResourcePattern resource,
final Set<AclOperation> ops) {
Expand All @@ -289,21 +419,26 @@ private void givenTestSetupWithConfig(final Map<String, Object> ksqlConfigs) {
}

private void assertCanRunSimpleKsqlQuery() {
assertCanRunKsqlQuery("CREATE STREAM %s AS SELECT * FROM %s;",
outputTopic, INPUT_STREAM);
assertCanRunKsqlQuery(
"CREATE STREAM %s AS SELECT * FROM %s;",
outputTopic,
INPUT_STREAM
);
}

private void assertCanRunRepartitioningKsqlQuery() {
assertCanRunKsqlQuery("CREATE TABLE %s AS SELECT itemid, count(*) "
+ "FROM %s WINDOW TUMBLING (size 5 second) GROUP BY itemid;",
outputTopic, INPUT_STREAM);
assertCanRunKsqlQuery(
"CREATE TABLE %s AS SELECT itemid, count(*) FROM %s GROUP BY itemid;",
outputTopic,
INPUT_STREAM
);
}

private void assertCanAccessClusterConfig(final String resourcePrefix) {
private void assertCanAccessClusterConfig() {
// Creating topic with default replicas causes topic client to query cluster config to get
// default replica count:
serviceContext.getTopicClient()
.createTopic(resourcePrefix + "-foo", 1, TopicProperties.DEFAULT_REPLICAS);
.createTopic(QUERY_ID_PREFIX + "-foo", 1, TopicProperties.DEFAULT_REPLICAS);
}

private void assertCanRunKsqlQuery(
Expand All @@ -321,6 +456,29 @@ private void assertCanRunKsqlQuery(
TEST_HARNESS.verifyAvailableRecords(outputTopic, greaterThan(0));
}

private void assertQueryFailsWithUserError(
final String query,
final String errorMsg
) {
final QueryMetadata queryMetadata = KsqlEngineTestUtil
.execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0);

queryMetadata.start();
assertThatEventually(
"Wait for query to fail",
() -> queryMetadata.getQueryErrors().size() > 0,
is(true)
);

for (final QueryError error : queryMetadata.getQueryErrors()) {
assertThat(error.getType(), is(Type.USER));
assertThat(
error.getErrorMessage().split("\n")[0],
is(String.format(errorMsg, queryMetadata.getQueryId()))
);
}
}

private static Map<String, Object> getBaseKsqlConfig() {
final Map<String, Object> configs = new HashMap<>(KsqlConfigTestUtil.baseTestConfig());
configs.put(
Expand Down Expand Up @@ -382,6 +540,6 @@ private void executePersistentQuery(final String queryString,
.execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0);

queryMetadata.start();
queryId = ((PersistentQueryMetadata) queryMetadata).getQueryId();
queryId = queryMetadata.getQueryId();
}
}

0 comments on commit a74b77c

Please sign in to comment.