Skip to content

Commit

Permalink
fix: Internal Server Error for /healthcheck endpoint in RBAC-enabled (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Oct 26, 2020
1 parent 68fb7af commit ebee5ec
Show file tree
Hide file tree
Showing 14 changed files with 338 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.confluent.ksql.rest.Errors.ERROR_CODE_UNAUTHORIZED;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.rest.server.KsqlRestConfig;
Expand All @@ -40,6 +41,8 @@
* Handler that calls any authentication plugin
*/
public class AuthenticationPluginHandler implements Handler<RoutingContext> {
public static final Set<String> KSQL_AUTHENTICATION_SKIP_PATHS = ImmutableSet
.of("/v1/metadata", "/v1/metadata/id", "/healthcheck");

private final Server server;
private final AuthenticationPlugin securityHandlerPlugin;
Expand All @@ -50,10 +53,9 @@ public AuthenticationPluginHandler(final Server server,
final AuthenticationPlugin securityHandlerPlugin) {
this.server = Objects.requireNonNull(server);
this.securityHandlerPlugin = Objects.requireNonNull(securityHandlerPlugin);
// We add in all the paths that don't require authorization from
// We add in all the paths that don't require authentication/authorization from
// KsqlAuthorizationProviderHandler
final Set<String> unauthenticatedPaths = new HashSet<>(
KsqlAuthorizationProviderHandler.PATHS_WITHOUT_AUTHORIZATION);
final Set<String> unauthenticatedPaths = new HashSet<>(KSQL_AUTHENTICATION_SKIP_PATHS);
// And then we add anything from the property authentication.skip.paths
// This preserves the behaviour from the previous Jetty based implementation
final List<String> unauthed = server.getConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,22 @@

package io.confluent.ksql.api.auth;

import static io.confluent.ksql.api.auth.AuthenticationPluginHandler.KSQL_AUTHENTICATION_SKIP_PATHS;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.security.KsqlAuthorizationProvider;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.WorkerExecutor;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
import java.util.Set;

/**
* Handler that calls a KsqlAuthorizationProvider plugin that can be used for custom authorization
*/
public class KsqlAuthorizationProviderHandler implements Handler<RoutingContext> {

public static final Set<String> PATHS_WITHOUT_AUTHORIZATION = ImmutableSet
.of("/v1/metadata", "/v1/metadata/id", "/healthcheck");

private final WorkerExecutor workerExecutor;
private final KsqlAuthorizationProvider ksqlAuthorizationProvider;

Expand All @@ -49,7 +45,7 @@ public void handle(final RoutingContext routingContext) {

final String path = routingContext.normalisedPath();

if (PATHS_WITHOUT_AUTHORIZATION.contains(path)) {
if (KSQL_AUTHENTICATION_SKIP_PATHS.contains(path)) {
routingContext.next();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,18 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
final Optional<Principal> principal = apiSecurityContext.getPrincipal();
final Optional<String> authHeader = apiSecurityContext.getAuthToken();

if (securityExtension == null || !securityExtension.getUserContextProvider().isPresent()) {
// A user context is not necessary if a user context provider is not present or the user
// principal is missing. If a failed authentication attempt results in a missing principle,
// then the authentication plugin will have already failed the connection before calling
// this method. Therefore, if we've reached this method with a missing principle, then this
// must be a valid connection that does not require authentication.
// For these cases, we create a default service context that the missing user can use.
final boolean requiresUserContext =
securityExtension != null
&& securityExtension.getUserContextProvider().isPresent()
&& principal.isPresent();

if (!requiresUserContext) {
return new KsqlSecurityContext(
principal,
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory,
Expand All @@ -71,8 +82,8 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
userServiceContextFactory.create(
ksqlConfig,
authHeader,
provider.getKafkaClientSupplier(principal.orElse(null)),
provider.getSchemaRegistryClientFactory(principal.orElse(null)),
provider.getKafkaClientSupplier(principal.get()),
provider.getSchemaRegistryClientFactory(principal.get()),
sharedClient)))
.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
Expand All @@ -28,7 +27,6 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.api.auth.AuthenticationPlugin;
import io.confluent.ksql.api.auth.KsqlAuthorizationProviderHandler;
import io.confluent.ksql.api.impl.DefaultKsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.KsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.MonitoredEndpoints;
Expand Down Expand Up @@ -117,13 +115,11 @@
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -739,7 +735,7 @@ static KsqlRestApplication buildApplication(
serviceContext,
ksqlEngine,
ksqlConfig,
injectPathsWithoutAuthentication(restConfig),
restConfig,
commandRunner,
commandStore,
statusResource,
Expand Down Expand Up @@ -1009,22 +1005,6 @@ private static DropwizardMetricsOptions setUpHttpMetrics(final KsqlConfig ksqlCo
return metricsOptions;
}

private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestConfig restConfig) {
final Set<String> authenticationSkipPaths = new HashSet<>(
restConfig.getList(KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_CONFIG)
);

authenticationSkipPaths.addAll(KsqlAuthorizationProviderHandler.PATHS_WITHOUT_AUTHORIZATION);

final Map<String, Object> restConfigs = restConfig.getOriginals();

// REST paths that are public and do not require authentication
restConfigs.put(KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_CONFIG,
Joiner.on(",").join(authenticationSkipPaths));

return new KsqlRestConfig(restConfigs);
}

@VisibleForTesting
static Map<String, String> toClientProps(final Map<String, Object> config) {
final Map<String, String> clientProps = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ private CompletableFuture<EndpointResponse> executeOldApiEndpointOnWorker(
private CompletableFuture<EndpointResponse> executeOldApiEndpoint(
final ApiSecurityContext apiSecurityContext,
final Function<KsqlSecurityContext, EndpointResponse> functionCall) {

final KsqlSecurityContext ksqlSecurityContext = ksqlSecurityContextProvider
.provide(apiSecurityContext);

Expand Down
63 changes: 49 additions & 14 deletions ksqldb-rest-app/src/test/java/io/confluent/ksql/api/ApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.api.utils.SendStream;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
Expand All @@ -53,6 +54,40 @@ public class ApiTest extends BaseApiTest {

protected static final List<JsonObject> DEFAULT_INSERT_ROWS = generateInsertRows();

@Test
@CoreApiTest
public void shouldExecuteInfoRquest() throws Exception {
// When
HttpResponse<Buffer> response = sendGetRequest("/info");

// Then
assertThat(response.statusCode(), is(200));
assertThat(response.statusMessage(), is("OK"));
QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
assertThat(queryResponse.responseObject.getJsonObject("KsqlServerInfo").getString("version"),
is(AppInfo.getVersion()));
assertThat(queryResponse.responseObject.getJsonObject("KsqlServerInfo").getString("kafkaClusterId"),
is("kafka-cluster-id"));
assertThat(queryResponse.responseObject.getJsonObject("KsqlServerInfo").getString("ksqlServiceId"),
is("ksql-service-id"));
}

@Test
@CoreApiTest
public void shouldExecuteServerMetadataIdRequest() throws Exception {
// When
HttpResponse<Buffer> response = sendGetRequest("/v1/metadata/id");

// Then
assertThat(response.statusCode(), is(200));
assertThat(response.statusMessage(), is("OK"));
QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
assertThat(queryResponse.responseObject.getJsonObject("scope").getJsonObject("clusters")
.getString("kafka-cluster"), is("kafka-cluster-id"));
assertThat(queryResponse.responseObject.getJsonObject("scope").getJsonObject("clusters")
.getString("ksql-cluster"), is("ksql-service-id"));
}

@Test
@CoreApiTest
public void shouldExecutePullQuery() throws Exception {
Expand All @@ -63,7 +98,7 @@ public void shouldExecutePullQuery() throws Exception {
requestBody.put("properties", properties);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream", requestBody.toBuffer());
HttpResponse<Buffer> response = sendPostRequest("/query-stream", requestBody.toBuffer());

// Then
assertThat(response.statusCode(), is(200));
Expand Down Expand Up @@ -220,7 +255,7 @@ public void shouldHandleQueryWithMissingSql() throws Exception {
JsonObject requestBody = new JsonObject().put("foo", "bar");

// When
HttpResponse<Buffer> response = sendRequest("/query-stream", requestBody.toBuffer());
HttpResponse<Buffer> response = sendPostRequest("/query-stream", requestBody.toBuffer());

// Then
assertThat(response.statusCode(), is(400));
Expand All @@ -238,7 +273,7 @@ public void shouldHandleErrorInProcessingQuery() throws Exception {
testEndpoints.setRowsBeforePublisherError(DEFAULT_JSON_ROWS.size() - 1);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream",
HttpResponse<Buffer> response = sendPostRequest("/query-stream",
DEFAULT_PUSH_QUERY_REQUEST_BODY.toBuffer());

// Then
Expand Down Expand Up @@ -285,7 +320,7 @@ public void shouldCloseQuery() throws Exception {

VertxCompletableFuture<HttpResponse<Void>> responseFuture = new VertxCompletableFuture<>();
// Make the request to stream a query
sendRequest("/query-stream", (request) ->
sendPostRequest("/query-stream", (request) ->
request
.as(BodyCodec.pipe(writeStream))
.sendJsonObject(DEFAULT_PUSH_QUERY_REQUEST_BODY, responseFuture)
Expand Down Expand Up @@ -314,7 +349,7 @@ public void shouldCloseQuery() throws Exception {

// Now send another request to close the query
JsonObject closeQueryRequestBody = new JsonObject().put("queryId", queryId);
HttpResponse<Buffer> closeQueryResponse = sendRequest("/close-query",
HttpResponse<Buffer> closeQueryResponse = sendPostRequest("/close-query",
closeQueryRequestBody.toBuffer());
assertThat(closeQueryResponse.statusCode(), is(200));

Expand All @@ -335,7 +370,7 @@ public void shouldHandleMissingQueryIDInCloseQuery() throws Exception {
JsonObject closeQueryRequestBody = new JsonObject().put("foo", "bar");

// When
HttpResponse<Buffer> response = sendRequest("/close-query",
HttpResponse<Buffer> response = sendPostRequest("/close-query",
closeQueryRequestBody.toBuffer());

// Then
Expand All @@ -355,7 +390,7 @@ public void shouldHandleUnknownQueryIDInCloseQuery() throws Exception {
JsonObject closeQueryRequestBody = new JsonObject().put("queryId", "xyzfasgf");

// When
HttpResponse<Buffer> response = sendRequest("/close-query",
HttpResponse<Buffer> response = sendPostRequest("/close-query",
closeQueryRequestBody.toBuffer());

// Then
Expand All @@ -380,7 +415,7 @@ public void shouldInsertWithAcksStream() throws Exception {
}

// When
HttpResponse<Buffer> response = sendRequest("/inserts-stream", requestBody);
HttpResponse<Buffer> response = sendPostRequest("/inserts-stream", requestBody);

// Then
assertThat(response.statusCode(), is(200));
Expand Down Expand Up @@ -411,7 +446,7 @@ public void shouldStreamInserts() throws Exception {
// When

// Make an HTTP request but keep the request body and response streams open
sendRequest("/inserts-stream", (request) ->
sendPostRequest("/inserts-stream", (request) ->
request
.as(BodyCodec.pipe(writeStream))
.sendStream(readStream, fut)
Expand Down Expand Up @@ -464,7 +499,7 @@ public void shouldHandleMissingTargetInInserts() throws Exception {
JsonObject requestBody = new JsonObject();

// When
HttpResponse<Buffer> response = sendRequest("/inserts-stream",
HttpResponse<Buffer> response = sendPostRequest("/inserts-stream",
requestBody.toBuffer().appendString("\n"));

// Then
Expand Down Expand Up @@ -494,7 +529,7 @@ public void shouldHandleErrorInProcessingInserts() throws Exception {

// When

HttpResponse<Buffer> response = sendRequest("/inserts-stream", requestBody);
HttpResponse<Buffer> response = sendPostRequest("/inserts-stream", requestBody);

// Then

Expand Down Expand Up @@ -533,7 +568,7 @@ public void shouldHandleMalformedJsonInInsertsStream() throws Exception {

// When

HttpResponse<Buffer> response = sendRequest("/inserts-stream", requestBody);
HttpResponse<Buffer> response = sendPostRequest("/inserts-stream", requestBody);

// Then

Expand Down Expand Up @@ -758,7 +793,7 @@ private void shouldRejectInvalidQuery(final String query) throws Exception {
JsonObject requestBody = new JsonObject().put("sql", query);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream",
HttpResponse<Buffer> response = sendPostRequest("/query-stream",
requestBody.toBuffer());

// Then
Expand All @@ -778,7 +813,7 @@ private void shouldRejectWhenInternalErrorInProcessingQuery(final String query)
JsonObject requestBody = new JsonObject().put("sql", query);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream",
HttpResponse<Buffer> response = sendPostRequest("/query-stream",
requestBody.toBuffer());

// Then
Expand Down
Loading

0 comments on commit ebee5ec

Please sign in to comment.