Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add REST and Websocket authorization hooks and interface #3000

Merged
merged 7 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.StatementExecutor;
import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder;
import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter;
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.resources.RootDocument;
Expand Down Expand Up @@ -330,8 +331,13 @@ public void configureBaseApplication(
config.property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0);
config.property(ServerProperties.WADL_FEATURE_DISABLE, true);

// Registers the REST security extensions
securityExtension.register(config, ksqlConfig);
// Controls the access to all REST endpoints
securityExtension.getAuthorizationProvider().ifPresent(
ac -> config.register(new KsqlAuthorizationFilter(ac))
);

// Registers any other security filters (i.e. user context impersonation)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this comment isn't really helpful, and is a bit confusing (since the impersonation stuff is really a detail of the plugin)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I was actually going to remove this call in another PR. My plan is to wrap the impersonation implementation into a call like extension.getUserContext(principal), and make that call from the REST and WS endpoints to get the Kafka and SR clients for that user.

This PR is only authorization so I didn't want to add more functionality.

securityExtension.register(config);
}

@Override
Expand Down Expand Up @@ -531,10 +537,14 @@ private void registerCommandTopic() {
}

private static KsqlSecurityExtension loadSecurityExtension(final KsqlConfig ksqlConfig) {
return Optional.ofNullable(ksqlConfig.getConfiguredInstance(
KsqlConfig.KSQL_SECURITY_EXTENSION_CLASS,
KsqlSecurityExtension.class
)).orElse(new KsqlDefaultSecurityExtension());
final KsqlSecurityExtension securityExtension = Optional.ofNullable(
ksqlConfig.getConfiguredInstance(
KsqlConfig.KSQL_SECURITY_EXTENSION_CLASS,
KsqlSecurityExtension.class
)).orElse(new KsqlDefaultSecurityExtension());

securityExtension.initialize(ksqlConfig);
return securityExtension;
}

private void displayWelcomeMessage() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.filters;

import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.rest.server.security.KsqlAuthorizationProvider;
import java.security.Principal;
import javax.annotation.Priority;
import javax.ws.rs.Priorities;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Authorization filter for REST endpoints.
*/
@Priority(Priorities.AUTHORIZATION)
public class KsqlAuthorizationFilter implements ContainerRequestFilter {
private static final Logger log = LoggerFactory.getLogger(KsqlAuthorizationFilter.class);

private final KsqlAuthorizationProvider authorizationProvider;

public KsqlAuthorizationFilter(final KsqlAuthorizationProvider authorizationProvider) {
this.authorizationProvider = authorizationProvider;
}

@Override
public void filter(final ContainerRequestContext requestContext) {
final Principal user = requestContext.getSecurityContext().getUserPrincipal();
final String method = requestContext.getMethod(); // i.e GET, POST
final String path = "/" + requestContext.getUriInfo().getPath();

try {
authorizationProvider.checkEndpointAccess(user, method, path);
} catch (final Throwable t) {
log.warn(String.format("User:%s is denied access to \"%s %s\"", user, method, path), t);
requestContext.abortWith(Errors.accessDenied(t.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public static int toErrorCode(final int statusCode) {
return statusCode * HTTP_TO_ERROR_CODE_MULTIPLIER;
}

public static Response accessDenied(final String msg) {
return Response
.status(FORBIDDEN)
.entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN, msg))
.build();
}

public static Response badRequest(final String msg) {
return Response
.status(BAD_REQUEST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ public void onOpen(final Session session, final EndpointConfig unused) {
log.debug("Opening websocket session {}", session.getId());

try {
// Check if the user has authorization to access this Websocket endpoint
checkEndpointAuthorization(session.getUserPrincipal(), QUERY_ENDPOINT_METHOD_NAME);
// Check if the user has authorization to open a WS session
checkAuthorization(session);

validateVersion(session);

Expand Down Expand Up @@ -264,15 +264,22 @@ public void onError(final Session session, final Throwable t) {
SessionUtil.closeSilently(session, CloseCodes.UNEXPECTED_CONDITION, t.getMessage());
}

private void checkEndpointAuthorization(final Principal userPrincipal, final String methodName) {
final Class<?> className = this.getClass();

if (!securityExtension.getAuthorizer().hasAccess(userPrincipal, className, methodName)) {
final String userName = (userPrincipal != null) ? userPrincipal.getName() : null;
throw new KsqlException(
String.format("User:%s is denied to access this cluster.", userName)
);
}
private void checkAuthorization(final Session session) {
final String method = "POST";
final String path = this.getClass().getAnnotation(ServerEndpoint.class).value();
final Principal user = session.getUserPrincipal();

securityExtension.getAuthorizationProvider().ifPresent(
provider -> {
try {
provider.checkEndpointAccess(user, method, path);
} catch (final Throwable t) {
log.warn(String.format("User:%s is denied access to Websocket "
+ "%s endpoint", user, path), t);
throw new KsqlException(t);
}
}
);
}

private void validateVersion(final Session session) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.security;

import java.security.Principal;

/**
* Interface that provides authorization to KSQL.
*/
public interface KsqlAuthorizationProvider {
/**
* Checks if the user is authorized to access the endpoint.
*
* @param user The user who is requesting access to the endpoint
* @param method The endpoint method used, i.e. POST, GET, DELETE
* @param path The endpoint path to access, i.e. "/ksql", "/ksql/terminate", "/query"*
*/
void checkEndpointAccess(Principal user, String method, String path);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.util.KsqlConfig;
import java.security.Principal;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;
import javax.ws.rs.core.Configurable;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand All @@ -33,14 +34,17 @@ public class KsqlDefaultSecurityExtension implements KsqlSecurityExtension {
private KsqlConfig ksqlConfig;

@Override
public KsqlAuthorizer getAuthorizer() {
// Return a dummy authorizer that authorizes any request
return (user, resourceClass, resourceMethod) -> true;
public void initialize(final KsqlConfig ksqlConfig) {
this.ksqlConfig = ksqlConfig;
}

@Override
public void register(final Configurable<?> configurable, final KsqlConfig ksqlConfig) {
this.ksqlConfig = ksqlConfig;
public Optional<KsqlAuthorizationProvider> getAuthorizationProvider() {
return Optional.empty();
}

@Override
public void register(final Configurable<?> configurable) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.security.Principal;
import java.util.Optional;
import java.util.function.Supplier;
import javax.ws.rs.core.Configurable;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand All @@ -33,21 +34,32 @@
*/
public interface KsqlSecurityExtension extends AutoCloseable {
/**
* @return The {@code KsqlAuthorizer} used to authorize access to KSQL resources.
* Initializes any implementation-specific resources for the security extension.
*
* @param ksqlConfig The KSQL configuration object
*/
void initialize(KsqlConfig ksqlConfig);

/**
* Returns the authorization provider used to verify access permissions on KSQL resources.
* </p>
* If no authorization is required/enabled for KSQL, then an {@code Optional.empty()} object must
* be returned.
*
* @return The (Optional) provider used for authorization requests.
*/
KsqlAuthorizer getAuthorizer();
Optional<KsqlAuthorizationProvider> getAuthorizationProvider();

/**
* Registers the security extension.
* Registers other security extension filters.
* </p>
* A {@link Configurable} is passed so that the extension can register REST filters to
* secure KSQL REST endpoints (i.e. Authorization filters).
* A {@link Configurable} is passed so that the extension can register other REST filters to
* to the KSQL REST endpoints (i.e. Impersonation context).
*
* @param configurable The {@link Configurable} object where to register the security plugins.
* @param ksqlConfig The KSQL configuration containing security required configs.
* @throws KsqlException If an error occurs while registering the REST security plugin.
*/
void register(Configurable<?> configurable, KsqlConfig ksqlConfig);
void register(Configurable<?> configurable);

/**
* Constructs a {@link org.apache.kafka.streams.KafkaClientSupplier} with the user's credentials.
Expand Down
Loading