Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Internal Server Error for /healthcheck endpoint in RBAC-enabled #6482

Merged
merged 7 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.confluent.ksql.rest.Errors.ERROR_CODE_UNAUTHORIZED;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.rest.server.KsqlRestConfig;
Expand All @@ -40,6 +41,8 @@
* Handler that calls any authentication plugin
*/
public class AuthenticationPluginHandler implements Handler<RoutingContext> {
public static final Set<String> KSQL_AUTHENTICATION_SKIP_PATHS = ImmutableSet
.of("/v1/metadata", "/v1/metadata/id", "/healthcheck");

private final Server server;
private final AuthenticationPlugin securityHandlerPlugin;
Expand All @@ -50,10 +53,9 @@ public AuthenticationPluginHandler(final Server server,
final AuthenticationPlugin securityHandlerPlugin) {
this.server = Objects.requireNonNull(server);
this.securityHandlerPlugin = Objects.requireNonNull(securityHandlerPlugin);
// We add in all the paths that don't require authorization from
// We add in all the paths that don't require authentication/authorization from
// KsqlAuthorizationProviderHandler
final Set<String> unauthenticatedPaths = new HashSet<>(
KsqlAuthorizationProviderHandler.PATHS_WITHOUT_AUTHORIZATION);
final Set<String> unauthenticatedPaths = new HashSet<>(KSQL_AUTHENTICATION_SKIP_PATHS);
// And then we add anything from the property authentication.skip.paths
// This preserves the behaviour from the previous Jetty based implementation
final List<String> unauthed = server.getConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,22 @@

package io.confluent.ksql.api.auth;

import static io.confluent.ksql.api.auth.AuthenticationPluginHandler.KSQL_AUTHENTICATION_SKIP_PATHS;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.security.KsqlAuthorizationProvider;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.WorkerExecutor;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
import java.util.Set;

/**
* Handler that calls a KsqlAuthorizationProvider plugin that can be used for custom authorization
*/
public class KsqlAuthorizationProviderHandler implements Handler<RoutingContext> {

public static final Set<String> PATHS_WITHOUT_AUTHORIZATION = ImmutableSet
.of("/v1/metadata", "/v1/metadata/id", "/healthcheck");

private final WorkerExecutor workerExecutor;
private final KsqlAuthorizationProvider ksqlAuthorizationProvider;

Expand All @@ -49,7 +45,7 @@ public void handle(final RoutingContext routingContext) {

final String path = routingContext.normalisedPath();

if (PATHS_WITHOUT_AUTHORIZATION.contains(path)) {
if (KSQL_AUTHENTICATION_SKIP_PATHS.contains(path)) {
routingContext.next();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,18 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
final Optional<Principal> principal = apiSecurityContext.getPrincipal();
final Optional<String> authHeader = apiSecurityContext.getAuthToken();

if (securityExtension == null || !securityExtension.getUserContextProvider().isPresent()) {
// A user context is not necessary if a user context provider is not present or the user
// principal is missing. If a failed authentication attempt results in a missing principle,
// then the authentication plugin will have already failed the connection before calling
// this method. Therefore, if we've reached this method with a missing principle, then this
// must be a valid connection that does not require authentication.
// For these cases, we create a default service context that the missing user can use.
final boolean requiresUserContext =
securityExtension != null
&& securityExtension.getUserContextProvider().isPresent()
&& principal.isPresent();

if (!requiresUserContext) {
return new KsqlSecurityContext(
principal,
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory,
Expand All @@ -71,8 +82,8 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
userServiceContextFactory.create(
ksqlConfig,
authHeader,
provider.getKafkaClientSupplier(principal.orElse(null)),
provider.getSchemaRegistryClientFactory(principal.orElse(null)),
provider.getKafkaClientSupplier(principal.get()),
provider.getSchemaRegistryClientFactory(principal.get()),
sharedClient)))
.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
Expand All @@ -28,7 +27,6 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.api.auth.AuthenticationPlugin;
import io.confluent.ksql.api.auth.KsqlAuthorizationProviderHandler;
import io.confluent.ksql.api.impl.DefaultKsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.KsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.MonitoredEndpoints;
Expand Down Expand Up @@ -117,13 +115,11 @@
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -739,7 +735,7 @@ static KsqlRestApplication buildApplication(
serviceContext,
ksqlEngine,
ksqlConfig,
injectPathsWithoutAuthentication(restConfig),
restConfig,
commandRunner,
commandStore,
statusResource,
Expand Down Expand Up @@ -1009,22 +1005,6 @@ private static DropwizardMetricsOptions setUpHttpMetrics(final KsqlConfig ksqlCo
return metricsOptions;
}

private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestConfig restConfig) {
final Set<String> authenticationSkipPaths = new HashSet<>(
restConfig.getList(KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_CONFIG)
);

authenticationSkipPaths.addAll(KsqlAuthorizationProviderHandler.PATHS_WITHOUT_AUTHORIZATION);

final Map<String, Object> restConfigs = restConfig.getOriginals();

// REST paths that are public and do not require authentication
restConfigs.put(KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_CONFIG,
Joiner.on(",").join(authenticationSkipPaths));

return new KsqlRestConfig(restConfigs);
}

@VisibleForTesting
static Map<String, String> toClientProps(final Map<String, Object> config) {
final Map<String, String> clientProps = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ private CompletableFuture<EndpointResponse> executeOldApiEndpointOnWorker(
private CompletableFuture<EndpointResponse> executeOldApiEndpoint(
final ApiSecurityContext apiSecurityContext,
final Function<KsqlSecurityContext, EndpointResponse> functionCall) {

final KsqlSecurityContext ksqlSecurityContext = ksqlSecurityContextProvider
.provide(apiSecurityContext);

Expand Down
63 changes: 49 additions & 14 deletions ksqldb-rest-app/src/test/java/io/confluent/ksql/api/ApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.api.utils.SendStream;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
Expand All @@ -53,6 +54,40 @@ public class ApiTest extends BaseApiTest {

protected static final List<JsonObject> DEFAULT_INSERT_ROWS = generateInsertRows();

@Test
@CoreApiTest
public void shouldExecuteInfoRquest() throws Exception {
// When
HttpResponse<Buffer> response = sendGetRequest("/info");

// Then
assertThat(response.statusCode(), is(200));
assertThat(response.statusMessage(), is("OK"));
QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
assertThat(queryResponse.responseObject.getJsonObject("KsqlServerInfo").getString("version"),
is(AppInfo.getVersion()));
assertThat(queryResponse.responseObject.getJsonObject("KsqlServerInfo").getString("kafkaClusterId"),
is("kafka-cluster-id"));
assertThat(queryResponse.responseObject.getJsonObject("KsqlServerInfo").getString("ksqlServiceId"),
is("ksql-service-id"));
}

@Test
@CoreApiTest
public void shouldExecuteServerMetadataIdRequest() throws Exception {
// When
HttpResponse<Buffer> response = sendGetRequest("/v1/metadata/id");

// Then
assertThat(response.statusCode(), is(200));
assertThat(response.statusMessage(), is("OK"));
QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
assertThat(queryResponse.responseObject.getJsonObject("scope").getJsonObject("clusters")
.getString("kafka-cluster"), is("kafka-cluster-id"));
assertThat(queryResponse.responseObject.getJsonObject("scope").getJsonObject("clusters")
.getString("ksql-cluster"), is("ksql-service-id"));
}

@Test
@CoreApiTest
public void shouldExecutePullQuery() throws Exception {
Expand All @@ -63,7 +98,7 @@ public void shouldExecutePullQuery() throws Exception {
requestBody.put("properties", properties);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream", requestBody.toBuffer());
HttpResponse<Buffer> response = sendPostRequest("/query-stream", requestBody.toBuffer());

// Then
assertThat(response.statusCode(), is(200));
Expand Down Expand Up @@ -220,7 +255,7 @@ public void shouldHandleQueryWithMissingSql() throws Exception {
JsonObject requestBody = new JsonObject().put("foo", "bar");

// When
HttpResponse<Buffer> response = sendRequest("/query-stream", requestBody.toBuffer());
HttpResponse<Buffer> response = sendPostRequest("/query-stream", requestBody.toBuffer());

// Then
assertThat(response.statusCode(), is(400));
Expand All @@ -238,7 +273,7 @@ public void shouldHandleErrorInProcessingQuery() throws Exception {
testEndpoints.setRowsBeforePublisherError(DEFAULT_JSON_ROWS.size() - 1);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream",
HttpResponse<Buffer> response = sendPostRequest("/query-stream",
DEFAULT_PUSH_QUERY_REQUEST_BODY.toBuffer());

// Then
Expand Down Expand Up @@ -285,7 +320,7 @@ public void shouldCloseQuery() throws Exception {

VertxCompletableFuture<HttpResponse<Void>> responseFuture = new VertxCompletableFuture<>();
// Make the request to stream a query
sendRequest("/query-stream", (request) ->
sendPostRequest("/query-stream", (request) ->
request
.as(BodyCodec.pipe(writeStream))
.sendJsonObject(DEFAULT_PUSH_QUERY_REQUEST_BODY, responseFuture)
Expand Down Expand Up @@ -314,7 +349,7 @@ public void shouldCloseQuery() throws Exception {

// Now send another request to close the query
JsonObject closeQueryRequestBody = new JsonObject().put("queryId", queryId);
HttpResponse<Buffer> closeQueryResponse = sendRequest("/close-query",
HttpResponse<Buffer> closeQueryResponse = sendPostRequest("/close-query",
closeQueryRequestBody.toBuffer());
assertThat(closeQueryResponse.statusCode(), is(200));

Expand All @@ -335,7 +370,7 @@ public void shouldHandleMissingQueryIDInCloseQuery() throws Exception {
JsonObject closeQueryRequestBody = new JsonObject().put("foo", "bar");

// When
HttpResponse<Buffer> response = sendRequest("/close-query",
HttpResponse<Buffer> response = sendPostRequest("/close-query",
closeQueryRequestBody.toBuffer());

// Then
Expand All @@ -355,7 +390,7 @@ public void shouldHandleUnknownQueryIDInCloseQuery() throws Exception {
JsonObject closeQueryRequestBody = new JsonObject().put("queryId", "xyzfasgf");

// When
HttpResponse<Buffer> response = sendRequest("/close-query",
HttpResponse<Buffer> response = sendPostRequest("/close-query",
closeQueryRequestBody.toBuffer());

// Then
Expand All @@ -380,7 +415,7 @@ public void shouldInsertWithAcksStream() throws Exception {
}

// When
HttpResponse<Buffer> response = sendRequest("/inserts-stream", requestBody);
HttpResponse<Buffer> response = sendPostRequest("/inserts-stream", requestBody);

// Then
assertThat(response.statusCode(), is(200));
Expand Down Expand Up @@ -411,7 +446,7 @@ public void shouldStreamInserts() throws Exception {
// When

// Make an HTTP request but keep the request body and response streams open
sendRequest("/inserts-stream", (request) ->
sendPostRequest("/inserts-stream", (request) ->
request
.as(BodyCodec.pipe(writeStream))
.sendStream(readStream, fut)
Expand Down Expand Up @@ -464,7 +499,7 @@ public void shouldHandleMissingTargetInInserts() throws Exception {
JsonObject requestBody = new JsonObject();

// When
HttpResponse<Buffer> response = sendRequest("/inserts-stream",
HttpResponse<Buffer> response = sendPostRequest("/inserts-stream",
requestBody.toBuffer().appendString("\n"));

// Then
Expand Down Expand Up @@ -494,7 +529,7 @@ public void shouldHandleErrorInProcessingInserts() throws Exception {

// When

HttpResponse<Buffer> response = sendRequest("/inserts-stream", requestBody);
HttpResponse<Buffer> response = sendPostRequest("/inserts-stream", requestBody);

// Then

Expand Down Expand Up @@ -533,7 +568,7 @@ public void shouldHandleMalformedJsonInInsertsStream() throws Exception {

// When

HttpResponse<Buffer> response = sendRequest("/inserts-stream", requestBody);
HttpResponse<Buffer> response = sendPostRequest("/inserts-stream", requestBody);

// Then

Expand Down Expand Up @@ -758,7 +793,7 @@ private void shouldRejectInvalidQuery(final String query) throws Exception {
JsonObject requestBody = new JsonObject().put("sql", query);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream",
HttpResponse<Buffer> response = sendPostRequest("/query-stream",
requestBody.toBuffer());

// Then
Expand All @@ -778,7 +813,7 @@ private void shouldRejectWhenInternalErrorInProcessingQuery(final String query)
JsonObject requestBody = new JsonObject().put("sql", query);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream",
HttpResponse<Buffer> response = sendPostRequest("/query-stream",
requestBody.toBuffer());

// Then
Expand Down
Loading