Skip to content

Commit

Permalink
feat: perform permission checks for KSQL service context
Browse files Browse the repository at this point in the history
This provides better error messages when both the user and the
KSQL service contexts do not have authorization to access the
topics.
  • Loading branch information
spena committed Aug 25, 2019
1 parent b252d53 commit 2279f5e
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static KsqlAuthorizationValidator create(
final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR);
if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) {
LOG.info("Forcing topic access validator");
return new KsqlAuthorizationValidatorImpl();
return new KsqlAuthorizationValidatorImpl(serviceContext);
} else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) {
return DUMMY_VALIDATOR;
}
Expand All @@ -52,7 +52,7 @@ public static KsqlAuthorizationValidator create(
if (isKafkaAuthorizerEnabled(adminClient)) {
if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) {
LOG.info("KSQL topic authorization checks enabled.");
return new KsqlAuthorizationValidatorImpl();
return new KsqlAuthorizationValidatorImpl(serviceContext);
}

LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.topic.SourceTopicsExtractor;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.common.acl.AclOperation;

/**
Expand All @@ -37,78 +41,86 @@
* This validator only works on Kakfa 2.3 or later.
*/
public class KsqlAuthorizationValidatorImpl implements KsqlAuthorizationValidator {
private final ServiceContext serverContext;

public KsqlAuthorizationValidatorImpl(final ServiceContext serverContext) {
this.serverContext = serverContext;
}

@Override
public void checkAuthorization(
final ServiceContext serviceContext,
final ServiceContext userContext,
final MetaStore metaStore,
final Statement statement
) {
if (statement instanceof Query) {
validateQuery(serviceContext, metaStore, (Query)statement);
validateQuery((Query)statement, userContext, metaStore);
} else if (statement instanceof InsertInto) {
validateInsertInto(serviceContext, metaStore, (InsertInto)statement);
validateInsertInto((InsertInto)statement, userContext, metaStore);
} else if (statement instanceof CreateAsSelect) {
validateCreateAsSelect(serviceContext, metaStore, (CreateAsSelect)statement);
validateCreateAsSelect((CreateAsSelect)statement, userContext, metaStore);
} else if (statement instanceof PrintTopic) {
validatePrintTopic(serviceContext, (PrintTopic)statement);
validatePrintTopic((PrintTopic) statement, userContext);
} else if (statement instanceof CreateSource) {
validateCreateSource((CreateSource)statement, userContext);
}
}

private void validateQuery(
final ServiceContext serviceContext,
final MetaStore metaStore,
final Query query
public void validateQuery(
final Query query,
final ServiceContext userContext,
final MetaStore metaStore
) {
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore);
extractor.process(query, null);
for (String kafkaTopic : extractor.getSourceTopics()) {
checkAccess(serviceContext, kafkaTopic, AclOperation.READ);
}
final Set<String> sourceTopics = getSourceTopics(query, metaStore);
checkRead(sourceTopics, userContext);
}

private void validateCreateSource(
final CreateSource createSource,
final ServiceContext userContext
) {
final String sourceTopic = createSource.getProperties().getKafkaTopic();

checkRead(Collections.singleton(sourceTopic), userContext);
maybeFailServerValidation(
(ctx) -> checkRead(Collections.singleton(sourceTopic), ctx)
);
}

private void validateCreateAsSelect(
final ServiceContext serviceContext,
final MetaStore metaStore,
final CreateAsSelect createAsSelect
final CreateAsSelect createAsSelect,
final ServiceContext userContext,
final MetaStore metaStore
) {
/*
* Check topic access for CREATE STREAM/TABLE AS SELECT statements.
*
* Validates Write on the target topic if exists, and Read on the query sources topics.
*
* The Create access is validated by the TopicCreateInjector which will attempt to create
* the target topic using the same ServiceContext used for validation.
*/

validateQuery(serviceContext, metaStore, createAsSelect.getQuery());

// At this point, the topic should have been created by the TopicCreateInjector
final String kafkaTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect);
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE);
validateQuery(createAsSelect.getQuery(), userContext, metaStore);
maybeFailServerValidation(
(ctx) -> validateQuery(createAsSelect.getQuery(), ctx, metaStore)
);
}

private void validateInsertInto(
final ServiceContext serviceContext,
final MetaStore metaStore,
final InsertInto insertInto
final InsertInto insertInto,
final ServiceContext userContext,
final MetaStore metaStore
) {
/*
* Check topic access for INSERT INTO statements.
*
* Validates Write on the target topic, and Read on the query sources topics.
*/

validateQuery(serviceContext, metaStore, insertInto.getQuery());

final String kafkaTopic = getSourceTopicName(metaStore, insertInto.getTarget().getSuffix());
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE);
validateQuery(insertInto.getQuery(), userContext, metaStore);
maybeFailServerValidation(
(ctx) -> validateQuery(insertInto.getQuery(), ctx, metaStore)
);

final String sinkTopic = getSourceTopicName(metaStore, insertInto.getTarget().getSuffix());
checkWrite(Collections.singleton(sinkTopic), userContext);
maybeFailServerValidation(
(ctx) -> checkWrite(Collections.singleton(sinkTopic), ctx)
);
}

private void validatePrintTopic(
final ServiceContext serviceContext,
final PrintTopic printTopic
final PrintTopic printTopic,
final ServiceContext userContext
) {
checkAccess(serviceContext, printTopic.getTopic().toString(), AclOperation.READ);
final String sourceTopic = printTopic.getTopic().toString();
checkRead(Collections.singleton(sourceTopic), userContext);
}

private String getSourceTopicName(final MetaStore metaStore, final String streamOrTable) {
Expand All @@ -121,31 +133,50 @@ private String getSourceTopicName(final MetaStore metaStore, final String stream
return dataSource.getKafkaTopicName();
}

/**
* Checks if the ServiceContext has access to the topic with the specified AclOperation.
*/
private void checkAccess(
final ServiceContext serviceContext,
final String topicName,
final AclOperation operation
private Set<String> getSourceTopics(final Query query, final MetaStore metaStore) {
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore);
extractor.process(query, null);
return extractor.getSourceTopics();
}

private void maybeFailServerValidation(
final Consumer<ServiceContext> consumer
) {
final Set<AclOperation> authorizedOperations = serviceContext.getTopicClient()
.describeTopic(topicName).authorizedOperations();

// Kakfa 2.2 or lower do not support authorizedOperations(). In case of running on a
// unsupported broker version, then the authorizeOperation will be null.
if (authorizedOperations != null && !authorizedOperations.contains(operation)) {
// This error message is similar to what Kafka throws when it cannot access the topic
// due to an authorization error. I used this message to keep a consistent message.
throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName));
try {
consumer.accept(serverContext);
} catch (final KsqlTopicAuthorizationException e) {
throw new KsqlServerException(String.format(
String.format("The KSQL service principal is not authorized to execute the command.")
), e);
} catch (final Exception e) {
throw new KsqlServerException(String.format(
String.format("The KSQL service principal failed to validate the command.")
), e);
}
}

private String getCreateAsSelectSinkTopic(
final MetaStore metaStore,
final CreateAsSelect createAsSelect
) {
return createAsSelect.getProperties().getKafkaTopic()
.orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName().getSuffix()));
private void checkRead(final Set<String> topics, final ServiceContext serviceContext) {
checkAccess(AclOperation.READ, topics, serviceContext.getTopicClient());
}

public void checkWrite(final Set<String> topics, final ServiceContext serviceContext) {
checkAccess(AclOperation.WRITE, topics, serviceContext.getTopicClient());
}

private void checkAccess(
final AclOperation operation,
final Set<String> topics,
final KafkaTopicClient topicClient
) {
topics.forEach(topic -> {
final Set<AclOperation> authorizedOperations = topicClient.describeTopic(topic)
.authorizedOperations();

// Kakfa 2.2 or lower do not support authorizedOperations(). In case of running on a
// unsupported broker version, then the authorizeOperation will be null.
if (authorizedOperations != null && !authorizedOperations.contains(operation)) {
throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topic));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void setUp() {
metaStore = new MetaStoreImpl(new InternalFunctionRegistry());
ksqlEngine = KsqlEngineTestUtil.createKsqlEngine(serviceContext, metaStore);

authorizationValidator = new KsqlAuthorizationValidatorImpl();
authorizationValidator = new KsqlAuthorizationValidatorImpl(serviceContext);
when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient);

givenTopic(TOPIC_NAME_1, TOPIC_1);
Expand Down Expand Up @@ -310,26 +310,6 @@ public void shouldCreateAsSelectExistingTopicWithWritePermissionsAllowed() {
// Above command should not throw any exception
}

@Test
public void shouldThrowWhenCreateAsSelectExistingStreamWithoutWritePermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ));
givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.READ));
final Statement statement = givenStatement(String.format(
"CREATE STREAM %s AS SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1)
);

// Then:
expectedException.expect(KsqlTopicAuthorizationException.class);
expectedException.expectMessage(String.format(
"Authorization denied to Write on topic(s): [%s]", TOPIC_2.name()
));


// When:
authorizationValidator.checkAuthorization(serviceContext, metaStore, statement);
}

@Test
public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() {
// Given:
Expand Down

0 comments on commit 2279f5e

Please sign in to comment.