Commit
feat: add REST /metadata path to display KSQL server information (replaces /info) (#3313)

This new path (/metadata) will replace the /info path in the next KSQL major release. It is added to provide a standard API across Confluent components for exposing information about the current cluster.
spena committed Sep 19, 2019
1 parent 730c913 commit 8be29b9
Showing 12 changed files with 576 additions and 16 deletions.
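
To illustrate the new API surface, here is a minimal smoke test of the two endpoints this commit adds (GET /metadata and GET /metadata/id). This is only a sketch: the localhost:8088 address assumes a default local KSQL server, and the JSON shape of the responses is defined by the ServerMetadata and ServerClusterId entities, which are not shown in this diff.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MetadataEndpointExample {
  public static void main(String[] args) throws Exception {
    // GET /metadata returns the full ServerMetadata entity;
    // GET /metadata/id returns only the ServerClusterId portion.
    for (String path : new String[]{"/metadata", "/metadata/id"}) {
      URL url = new URL("http://localhost:8088" + path);
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setRequestProperty("Accept", "application/json");
      try (BufferedReader reader = new BufferedReader(
          new InputStreamReader(conn.getInputStream()))) {
        System.out.println(path + " -> " + reader.readLine());
      } finally {
        conn.disconnect();
      }
    }
  }
}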
@@ -21,6 +21,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
@@ -33,6 +35,8 @@
public final class KafkaClusterUtil {
private static final Logger LOG = LoggerFactory.getLogger(KafkaClusterUtil.class);

private static final long DESCRIBE_CLUSTER_TIMEOUT_SECONDS = 30;

private KafkaClusterUtil() {

}
@@ -76,4 +80,15 @@ public static Config getConfig(final Admin adminClient) {
throw new KsqlServerException("Could not get Kafka cluster configuration!", e);
}
}

public static String getKafkaClusterId(final ServiceContext serviceContext) {
try {
return serviceContext.getAdminClient()
.describeCluster()
.clusterId()
.get(DESCRIBE_CLUSTER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final Exception e) {
throw new RuntimeException("Failed to get Kafka cluster information", e);
}
}
}
@@ -20,6 +20,7 @@

import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
@@ -50,6 +51,7 @@
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.resources.RootDocument;
import io.confluent.ksql.rest.server.resources.ServerInfoResource;
import io.confluent.ksql.rest.server.resources.ServerMetadataResource;
import io.confluent.ksql.rest.server.resources.StatusResource;
import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource;
import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint;
@@ -75,6 +77,7 @@
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfig;
import io.confluent.rest.validation.JacksonMessageBodyProvider;
import java.io.Console;
import java.io.OutputStreamWriter;
@@ -90,6 +93,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -167,6 +171,7 @@ public static String getCommandsStreamName() {
final Consumer<KsqlConfig> rocksDBConfigSetterHandler
) {
super(config);

this.serviceContext = requireNonNull(serviceContext, "serviceContext");
this.ksqlConfigNoPort = requireNonNull(ksqlConfig, "ksqlConfig");
this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine");
@@ -193,6 +198,7 @@ public static String getCommandsStreamName() {
public void setupResources(final Configurable<?> config, final KsqlRestConfig appConfig) {
config.register(rootDocument);
config.register(new ServerInfoResource(serviceContext, ksqlConfigNoPort));
config.register(ServerMetadataResource.create(serviceContext, ksqlConfigNoPort));
config.register(statusResource);
config.register(ksqlResource);
config.register(streamedQueryResource);
@@ -540,7 +546,7 @@ static KsqlRestApplication buildApplication(
serviceContext,
ksqlEngine,
ksqlConfig,
restConfig,
injectPathsWithoutAuthentication(restConfig),
commandRunner,
commandStore,
rootDocument,
@@ -660,4 +666,20 @@ private KsqlConfig buildConfigWithPort() {

return new KsqlConfig(props);
}

private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestConfig restConfig) {
final Set<String> authenticationSkipPaths = new HashSet<>(
restConfig.getList(RestConfig.AUTHENTICATION_SKIP_PATHS)
);

authenticationSkipPaths.addAll(KsqlAuthorizationFilter.getPathsWithoutAuthorization());

final Map<String, Object> restConfigs = restConfig.getOriginals();

// REST paths that are public and do not require authentication
restConfigs.put(RestConfig.AUTHENTICATION_SKIP_PATHS,
Joiner.on(",").join(authenticationSkipPaths));

return new KsqlRestConfig(restConfigs);
}
}
@@ -16,12 +16,19 @@
package io.confluent.ksql.rest.server.filters;

import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.server.resources.ServerMetadataResource;
import io.confluent.ksql.security.KsqlAuthorizationProvider;
import java.lang.reflect.Method;
import java.security.Principal;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Priority;
import javax.ws.rs.Path;
import javax.ws.rs.Priorities;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -32,6 +39,9 @@
public class KsqlAuthorizationFilter implements ContainerRequestFilter {
private static final Logger log = LoggerFactory.getLogger(KsqlAuthorizationFilter.class);

private static final Set<String> PATHS_WITHOUT_AUTHORIZATION =
getPathsFrom(ServerMetadataResource.class);

private final KsqlAuthorizationProvider authorizationProvider;

public KsqlAuthorizationFilter(final KsqlAuthorizationProvider authorizationProvider) {
@@ -44,6 +54,10 @@ public void filter(final ContainerRequestContext requestContext) {
final String method = requestContext.getMethod(); // i.e GET, POST
final String path = "/" + requestContext.getUriInfo().getPath();

if (!requiresAuthorization(path)) {
return;
}

try {
authorizationProvider.checkEndpointAccess(user, method, path);
} catch (final Throwable t) {
@@ -52,4 +66,29 @@ public void filter(final ContainerRequestContext requestContext) {
requestContext.abortWith(Errors.accessDenied(t.getMessage()));
}
}

public static Set<String> getPathsWithoutAuthorization() {
return PATHS_WITHOUT_AUTHORIZATION;
}

private boolean requiresAuthorization(final String path) {
return !PATHS_WITHOUT_AUTHORIZATION.contains(path);
}

private static Set<String> getPathsFrom(final Class<?> resourceClass) {
final Set<String> paths = new HashSet<>();
final String mainPath = StringUtils.stripEnd(
resourceClass.getAnnotation(Path.class).value(), "/"
);

paths.add(mainPath);
for (Method m : resourceClass.getMethods()) {
if (m.isAnnotationPresent(Path.class)) {
paths.add(mainPath + "/"
+ StringUtils.strip(m.getAnnotation(Path.class).value(), "/"));
}
}

return Collections.unmodifiableSet(paths);
}
}
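
As a quick sanity check of the reflection logic above, here is a tiny sketch (assuming the ksql-rest-app module is on the classpath) that prints the paths the filter will skip; given the @Path annotations on ServerMetadataResource in this commit, the set should contain /metadata and /metadata/id.

import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter;

public class SkipPathsExample {
  public static void main(String[] args) {
    // Derived from ServerMetadataResource's class-level @Path("/metadata")
    // and its method-level @Path("/id"); expected: [/metadata, /metadata/id]
    System.out.println(KsqlAuthorizationFilter.getPathsWithoutAuthorization());
  }
}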
@@ -18,10 +18,10 @@
import com.google.common.base.Suppliers;
import io.confluent.ksql.rest.entity.ServerInfo;
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.Version;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -32,31 +32,18 @@
@Path("/info")
@Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON})
public class ServerInfoResource {
private static final long DESCRIBE_CLUSTER_TIMEOUT_SECONDS = 30;

private final Supplier<ServerInfo> serverInfo;

public ServerInfoResource(final ServiceContext serviceContext, final KsqlConfig ksqlConfig) {
this.serverInfo = Suppliers.memoize(
() -> new ServerInfo(
Version.getVersion(),
getKafkaClusterId(serviceContext),
KafkaClusterUtil.getKafkaClusterId(serviceContext),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
)
)::get;
}

private static String getKafkaClusterId(final ServiceContext serviceContext) {
try {
return serviceContext.getAdminClient()
.describeCluster()
.clusterId()
.get(DESCRIBE_CLUSTER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final Exception e) {
throw new RuntimeException("Failed to get Kafka cluster information", e);
}
}

@GET
public Response get() {
return Response.ok(serverInfo.get()).build();
@@ -0,0 +1,64 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import io.confluent.ksql.rest.entity.ServerClusterId;
import io.confluent.ksql.rest.entity.ServerMetadata;
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.Version;
import java.util.Objects;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/metadata")
@Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON})
public final class ServerMetadataResource {
private final ServerMetadata serverMetadata;

private ServerMetadataResource(final ServerMetadata serverMetadata) {
this.serverMetadata = Objects.requireNonNull(serverMetadata, "serverMetadata");
}

@GET
public Response getServerMetadata() {
return Response.ok(serverMetadata).build();
}

@GET
@Path("/id")
public Response getServerClusterId() {
return Response.ok(serverMetadata.getClusterId()).build();
}

public static ServerMetadataResource create(
final ServiceContext serviceContext,
final KsqlConfig ksqlConfig
) {
return new ServerMetadataResource(new ServerMetadata(
Version.getVersion(),
ServerClusterId.of(
KafkaClusterUtil.getKafkaClusterId(serviceContext),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
)
));
}
}
@@ -39,6 +39,8 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
@@ -89,6 +91,32 @@ public void filterShouldAbortIfAuthorizationIsDenied() {
is("access denied"));
}

@Test
public void filterShouldContinueOnUnauthorizedMetadataPath() {
// Given:
ContainerRequest request = givenRequestContext(userPrincipal, "GET", "metadata");

// When:
authorizationFilter.filter(request);

// Then:
assertThat(request.getAbortResponse(), is(nullValue()));
verifyZeroInteractions(authorizationProvider);
}

@Test
public void filterShouldContinueOnUnauthorizedMetadataIdPath() {
// Given:
ContainerRequest request = givenRequestContext(userPrincipal, "GET", "metadata/id");

// When:
authorizationFilter.filter(request);

// Then:
assertThat(request.getAbortResponse(), is(nullValue()));
verifyZeroInteractions(authorizationProvider);
}

private ContainerRequest givenRequestContext(
final Principal principal,
final String method,
@@ -1,3 +1,18 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import static org.hamcrest.Matchers.equalTo;