Skip to content

Commit

Permalink
feat: add config to disable pull queries when validation is required (#…
Browse files Browse the repository at this point in the history
…3879)

fixes #3863

 - Added `ksql.query.pull.skip.access.validator` to control if pull queries work without validation
 - By default, Pull queries error out, if auth validation is needed
 - Replaced DUMMY_VALIDATOR with Optional<> interface for KsqlAuthorizationValidatorFactory
 - Fixed some tests, added test cases
 - Applied on both `query` and websocket endpoints
  • Loading branch information
vinothchandar committed Nov 18, 2019
1 parent 28b37eb commit ccc636d
Show file tree
Hide file tree
Showing 20 changed files with 193 additions and 84 deletions.
1 change: 1 addition & 0 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class CliTest {
.builder(CLUSTER::bootstrapServers)
.withProperty(KsqlConfig.SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY,
KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention + 1)
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.build();

@ClassRule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class ImmutableProperties {
.add(KsqlConfig.KSQL_EXT_DIR)
.add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
.add(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

Expand Down
13 changes: 13 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ public class KsqlConfig extends AbstractConfig {
+ "\"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover "
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";
public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG =
"ksql.query.pull.skip.access.validator";
public static final boolean KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT = false;
public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC = "If \"true\", KSQL will "
+ " NOT enforce access validation checks for pull queries, which could expose Kafka topics"
+ " which are secured with ACLs. Please enable only after careful consideration."
+ " If \"false\", KSQL pull queries will fail against a secure Kafka cluster";

public static final String KSQL_QUERY_PULL_ENABLE_CONFIG = "ksql.query.pull.enable";
public static final String KSQL_QUERY_PULL_ENABLE_DOC =
Expand Down Expand Up @@ -604,6 +611,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC
).define(
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG,
Type.BOOLEAN,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import java.util.Optional;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
Expand All @@ -29,37 +30,35 @@ public final class KsqlAuthorizationValidatorFactory {
private static final Logger LOG = LoggerFactory
.getLogger(KsqlAuthorizationValidatorFactory.class);
private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name";
private static final KsqlAuthorizationValidator DUMMY_VALIDATOR =
(sc, metastore, statement) -> { };

private KsqlAuthorizationValidatorFactory() {
}

public static KsqlAuthorizationValidator create(
public static Optional<KsqlAuthorizationValidator> create(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext
) {
final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR);
if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) {
LOG.info("Forcing topic access validator");
return new KsqlAuthorizationValidatorImpl();
return Optional.of(new KsqlAuthorizationValidatorImpl());
} else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) {
return DUMMY_VALIDATOR;
return Optional.empty();
}

final Admin adminClient = serviceContext.getAdminClient();

if (isKafkaAuthorizerEnabled(adminClient)) {
if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) {
LOG.info("KSQL topic authorization checks enabled.");
return new KsqlAuthorizationValidatorImpl();
return Optional.of(new KsqlAuthorizationValidatorImpl());
}

LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka "
+ "version does not support authorizedOperations(). "
+ "KSQL topic authorization checks will not be enabled.");
}
return DUMMY_VALIDATOR;
return Optional.empty();
}

private static boolean isKafkaAuthorizerEnabled(final Admin adminClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public DefaultServiceContext(
private DefaultServiceContext(
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<Admin> adminClientSupplier,
final Function<Supplier<Admin>, KafkaTopicClient> topicClientSupplier,
final Function<Supplier<Admin>, KafkaTopicClient> topicClientProvider,
final Supplier<SchemaRegistryClient> srClientSupplier,
final Supplier<ConnectClient> connectClientSupplier,
final Supplier<SimpleKsqlClient> ksqlClientSupplier
Expand All @@ -100,7 +100,7 @@ private DefaultServiceContext(
this.kafkaClientSupplier = requireNonNull(kafkaClientSupplier, "kafkaClientSupplier");

this.topicClientSupplier = new MemoizedSupplier<>(
() -> topicClientSupplier.apply(this.adminClientSupplier));
() -> topicClientProvider.apply(this.adminClientSupplier));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
Expand All @@ -31,6 +30,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
Expand Down Expand Up @@ -78,44 +78,45 @@ public void shouldReturnAuthorizationValidator() {
givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet());

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, is(instanceOf(KsqlAuthorizationValidatorImpl.class)));
assertThat("validator should be present", validator.isPresent());
assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class)));
}

@Test
public void shouldReturnDummyValidator() {
public void shouldReturnEmptyValidator() {
// Given:
givenKafkaAuthorizer("", Collections.emptySet());

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class)));
assertThat(validator, is(Optional.empty()));
}

@Test
public void shouldReturnDummyValidatorIfNotEnabled() {
public void shouldReturnEmptyValidatorIfNotEnabled() {
// Given:
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF);

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then:
assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class)));
assertThat(validator, is(Optional.empty()));
verifyZeroInteractions(adminClient);
}

Expand All @@ -126,29 +127,30 @@ public void shouldReturnAuthorizationValidatorIfEnabled() {
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON);

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then:
assertThat(validator, instanceOf(KsqlAuthorizationValidatorImpl.class));
assertThat("validator should be present", validator.isPresent());
assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class)));
verifyZeroInteractions(adminClient);
}

@Test
public void shouldReturnDummyValidatorIfAuthorizedOperationsReturnNull() {
public void shouldReturnEmptyValidatorIfAuthorizedOperationsReturnNull() {
// Given:
givenKafkaAuthorizer("an-authorizer-class", null);

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class)));
assertThat(validator, is(Optional.empty()));
}

private void givenKafkaAuthorizer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class RestQueryTranslationTest {
private static final TestKsqlRestApp REST_APP = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.withStaticServiceContext(TEST_HARNESS::getServiceContext)
.build();

Expand Down Expand Up @@ -106,7 +107,7 @@ public void tearDown() {

@Test
public void shouldBuildAndExecuteQueries() {
try (RestTestExecutor testExecutor = textExecutor()) {
try (RestTestExecutor testExecutor = testExecutor()) {
testExecutor.buildAndExecuteQuery(testCase);
} catch (final AssertionError e) {
throw new AssertionError(e.getMessage()
Expand All @@ -119,7 +120,7 @@ public void shouldBuildAndExecuteQueries() {
}
}

private static RestTestExecutor textExecutor() {
private static RestTestExecutor testExecutor() {
return new RestTestExecutor(
REST_APP.getListeners().get(0),
TEST_HARNESS.getKafkaCluster(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,17 @@ private void waitForWarmStateStores(

final ImmutableList<Response> expectedResponse = ImmutableList.of(queryResponse);
final ImmutableList<String> statements = ImmutableList.of(querySql);
final long waitMs = 10;

final long threshold = System.currentTimeMillis() + MAX_STATIC_WARMUP.toMillis();
while (System.currentTimeMillis() < threshold) {
final RestResponse<QueryStream> resp = restClient.makeQueryRequest(querySql, null);
if (resp.isErroneous()) {
Thread.yield();
try {
Thread.sleep(waitMs);
} catch (InterruptedException e) {
// ignore
}
LOG.info("Server responded with an error code to a pull query. "
+ "This could be because the materialized store is not yet warm.");
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {
);

final StatementParser statementParser = new StatementParser(ksqlEngine);
final KsqlAuthorizationValidator authorizationValidator =
final Optional<KsqlAuthorizationValidator> authorizationValidator =
KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext);

container.addEndpoint(
Expand Down Expand Up @@ -496,7 +496,7 @@ static KsqlRestApplication buildApplication(

final KsqlSecurityExtension securityExtension = loadSecurityExtension(ksqlConfig);

final KsqlAuthorizationValidator authorizationValidator =
final Optional<KsqlAuthorizationValidator> authorizationValidator =
KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext);

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ public class DistributingExecutor {
private final CommandQueue commandQueue;
private final Duration distributedCmdResponseTimeout;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final KsqlAuthorizationValidator authorizationValidator;
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private final RequestValidator requestValidator;

public DistributingExecutor(
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final KsqlAuthorizationValidator authorizationValidator,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final RequestValidator requestValidator
) {
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
Expand Down Expand Up @@ -154,15 +154,17 @@ private void checkAuthorization(
final MetaStore metaStore = serverExecutionContext.getMetaStore();

// Check the User will be permitted to execute this statement
authorizationValidator.checkAuthorization(userServiceContext, metaStore, statement);
authorizationValidator.ifPresent(
validator ->
validator.checkAuthorization(userServiceContext, metaStore, statement));

try {
// Check the KSQL service principal will be permitted too
authorizationValidator.checkAuthorization(
serverExecutionContext.getServiceContext(),
metaStore,
statement
);
authorizationValidator.ifPresent(
validator -> validator.checkAuthorization(
serverExecutionContext.getServiceContext(),
metaStore,
statement));
} catch (final Exception e) {
throw new KsqlServerException("The KSQL server is not permitted to execute the command", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.regex.PatternSyntaxException;
Expand Down Expand Up @@ -97,7 +98,7 @@ public class KsqlResource implements KsqlConfigurable {
private final Duration distributedCmdResponseTimeout;
private final ActivenessRegistrar activenessRegistrar;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final KsqlAuthorizationValidator authorizationValidator;
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private RequestValidator validator;
private RequestHandler handler;

Expand All @@ -107,7 +108,7 @@ public KsqlResource(
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final KsqlAuthorizationValidator authorizationValidator
final Optional<KsqlAuthorizationValidator> authorizationValidator
) {
this(
ksqlEngine,
Expand All @@ -125,7 +126,7 @@ public KsqlResource(
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final KsqlAuthorizationValidator authorizationValidator
final Optional<KsqlAuthorizationValidator> authorizationValidator
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
Expand Down
Loading

0 comments on commit ccc636d

Please sign in to comment.