Skip to content

Commit

Permalink
feat: Add REST and Websocket authorization hooks and interface (#3000)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Jun 27, 2019
1 parent 39ce7f4 commit 39af991
Show file tree
Hide file tree
Showing 15 changed files with 535 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.StatementExecutor;
import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder;
import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter;
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.resources.RootDocument;
Expand Down Expand Up @@ -330,8 +331,13 @@ public void configureBaseApplication(
config.property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0);
config.property(ServerProperties.WADL_FEATURE_DISABLE, true);

// Registers the REST security extensions
securityExtension.register(config, ksqlConfig);
// Controls the access to all REST endpoints
securityExtension.getAuthorizationProvider().ifPresent(
ac -> config.register(new KsqlAuthorizationFilter(ac))
);

// Registers any other security filters (i.e. user context impersonation)
securityExtension.register(config);
}

@Override
Expand Down Expand Up @@ -531,10 +537,14 @@ private void registerCommandTopic() {
}

private static KsqlSecurityExtension loadSecurityExtension(final KsqlConfig ksqlConfig) {
return Optional.ofNullable(ksqlConfig.getConfiguredInstance(
KsqlConfig.KSQL_SECURITY_EXTENSION_CLASS,
KsqlSecurityExtension.class
)).orElse(new KsqlDefaultSecurityExtension());
final KsqlSecurityExtension securityExtension = Optional.ofNullable(
ksqlConfig.getConfiguredInstance(
KsqlConfig.KSQL_SECURITY_EXTENSION_CLASS,
KsqlSecurityExtension.class
)).orElse(new KsqlDefaultSecurityExtension());

securityExtension.initialize(ksqlConfig);
return securityExtension;
}

private void displayWelcomeMessage() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.filters;

import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.rest.server.security.KsqlAuthorizationProvider;
import java.security.Principal;
import javax.annotation.Priority;
import javax.ws.rs.Priorities;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Authorization filter for REST endpoints.
*/
@Priority(Priorities.AUTHORIZATION)
public class KsqlAuthorizationFilter implements ContainerRequestFilter {
private static final Logger log = LoggerFactory.getLogger(KsqlAuthorizationFilter.class);

private final KsqlAuthorizationProvider authorizationProvider;

public KsqlAuthorizationFilter(final KsqlAuthorizationProvider authorizationProvider) {
this.authorizationProvider = authorizationProvider;
}

@Override
public void filter(final ContainerRequestContext requestContext) {
final Principal user = requestContext.getSecurityContext().getUserPrincipal();
final String method = requestContext.getMethod(); // i.e GET, POST
final String path = "/" + requestContext.getUriInfo().getPath();

try {
authorizationProvider.checkEndpointAccess(user, method, path);
} catch (final Throwable t) {
log.warn(String.format("User:%s is denied access to \"%s %s\"", user, method, path), t);
requestContext.abortWith(Errors.accessDenied(t.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public static int toErrorCode(final int statusCode) {
return statusCode * HTTP_TO_ERROR_CODE_MULTIPLIER;
}

public static Response accessDenied(final String msg) {
return Response
.status(FORBIDDEN)
.entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN, msg))
.build();
}

public static Response badRequest(final String msg) {
return Response
.status(BAD_REQUEST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ public void onOpen(final Session session, final EndpointConfig unused) {
log.debug("Opening websocket session {}", session.getId());

try {
// Check if the user has authorization to access this Websocket endpoint
checkEndpointAuthorization(session.getUserPrincipal(), QUERY_ENDPOINT_METHOD_NAME);
// Check if the user has authorization to open a WS session
checkAuthorization(session);

validateVersion(session);

Expand Down Expand Up @@ -264,15 +264,22 @@ public void onError(final Session session, final Throwable t) {
SessionUtil.closeSilently(session, CloseCodes.UNEXPECTED_CONDITION, t.getMessage());
}

private void checkEndpointAuthorization(final Principal userPrincipal, final String methodName) {
final Class<?> className = this.getClass();

if (!securityExtension.getAuthorizer().hasAccess(userPrincipal, className, methodName)) {
final String userName = (userPrincipal != null) ? userPrincipal.getName() : null;
throw new KsqlException(
String.format("User:%s is denied to access this cluster.", userName)
);
}
private void checkAuthorization(final Session session) {
final String method = "POST";
final String path = this.getClass().getAnnotation(ServerEndpoint.class).value();
final Principal user = session.getUserPrincipal();

securityExtension.getAuthorizationProvider().ifPresent(
provider -> {
try {
provider.checkEndpointAccess(user, method, path);
} catch (final Throwable t) {
log.warn(String.format("User:%s is denied access to Websocket "
+ "%s endpoint", user, path), t);
throw new KsqlException(t);
}
}
);
}

private void validateVersion(final Session session) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.security;

import java.security.Principal;

/**
* Interface that provides authorization to KSQL.
*/
public interface KsqlAuthorizationProvider {
/**
* Checks if the user is authorized to access the endpoint.
*
* @param user The user who is requesting access to the endpoint
* @param method The endpoint method used, i.e. POST, GET, DELETE
* @param path The endpoint path to access, i.e. "/ksql", "/ksql/terminate", "/query"*
*/
void checkEndpointAccess(Principal user, String method, String path);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.util.KsqlConfig;
import java.security.Principal;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;
import javax.ws.rs.core.Configurable;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand All @@ -33,14 +34,17 @@ public class KsqlDefaultSecurityExtension implements KsqlSecurityExtension {
private KsqlConfig ksqlConfig;

@Override
public KsqlAuthorizer getAuthorizer() {
// Return a dummy authorizer that authorizes any request
return (user, resourceClass, resourceMethod) -> true;
public void initialize(final KsqlConfig ksqlConfig) {
this.ksqlConfig = ksqlConfig;
}

@Override
public void register(final Configurable<?> configurable, final KsqlConfig ksqlConfig) {
this.ksqlConfig = ksqlConfig;
public Optional<KsqlAuthorizationProvider> getAuthorizationProvider() {
return Optional.empty();
}

@Override
public void register(final Configurable<?> configurable) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.security.Principal;
import java.util.Optional;
import java.util.function.Supplier;
import javax.ws.rs.core.Configurable;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand All @@ -33,21 +34,32 @@
*/
public interface KsqlSecurityExtension extends AutoCloseable {
/**
* @return The {@code KsqlAuthorizer} used to authorize access to KSQL resources.
* Initializes any implementation-specific resources for the security extension.
*
* @param ksqlConfig The KSQL configuration object
*/
void initialize(KsqlConfig ksqlConfig);

/**
* Returns the authorization provider used to verify access permissions on KSQL resources.
* </p>
* If no authorization is required/enabled for KSQL, then an {@code Optional.empty()} object must
* be returned.
*
* @return The (Optional) provider used for authorization requests.
*/
KsqlAuthorizer getAuthorizer();
Optional<KsqlAuthorizationProvider> getAuthorizationProvider();

/**
* Registers the security extension.
* Registers other security extension filters.
* </p>
* A {@link Configurable} is passed so that the extension can register REST filters to
* secure KSQL REST endpoints (i.e. Authorization filters).
* A {@link Configurable} is passed so that the extension can register other REST filters to
* to the KSQL REST endpoints (i.e. Impersonation context).
*
* @param configurable The {@link Configurable} object where to register the security plugins.
* @param ksqlConfig The KSQL configuration containing security required configs.
* @throws KsqlException If an error occurs while registering the REST security plugin.
*/
void register(Configurable<?> configurable, KsqlConfig ksqlConfig);
void register(Configurable<?> configurable);

/**
* Constructs a {@link org.apache.kafka.streams.KafkaClientSupplier} with the user's credentials.
Expand Down
Loading

0 comments on commit 39af991

Please sign in to comment.