Skip to content

Commit

Permalink
feat: Add an endpoint for returning the query limit configuration (#6353
Browse files Browse the repository at this point in the history
)

* feat: Add an endpoint for returning the query limit configuration

* fix compile error

* Address review comments

* Address review comments and update docs
  • Loading branch information
Zara Lim committed Oct 12, 2020
1 parent 4c7c9b5 commit 84d202d
Show file tree
Hide file tree
Showing 15 changed files with 282 additions and 1 deletion.
23 changes: 23 additions & 0 deletions docs/developer-guide/ksqldb-rest-api/info-endpoint.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,26 @@ Your output should resemble:
}
```

To view non-sensitive server configurations, you can use the `/v1/configs` endpoint:

```bash
curl -sX GET "http://localhost:8088/v1/configs" | jq '.'
```

Your output should resemble:

```json
{
"configs": {
"ksql.query.persistent.active.limit": 20
}
}
```

To view a specific endpoint, you can add a query:

```bash
curl -sX GET "http://localhost:8088/v1/configs?name=ksql.query.persistent.active.limit" | jq '.'
```

Currently, the only configuration that is visible is `ksql.query.persistent.active.limit`.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -205,7 +206,10 @@ private Router setupRouter() {
.produces(Versions.KSQL_V1_JSON)
.produces(JSON_CONTENT_TYPE)
.handler(this::handleWebsocket);

router.route(HttpMethod.GET, "/v1/configs")
.produces(Versions.KSQL_V1_JSON)
.produces(JSON_CONTENT_TYPE)
.handler(this::handleConfigRequest);
return router;
}

Expand Down Expand Up @@ -326,6 +330,15 @@ private void handleWebsocket(final RoutingContext routingContext) {
server.getWorkerExecutor(), apiSecurityContext);
}

private void handleConfigRequest(final RoutingContext routingContext) {
final List<String> requestedConfigs = routingContext.queryParam("name");
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints
.executeConfig(requestedConfigs, DefaultApiSecurityContext.create(routingContext))
);
}

private static void chcHandler(final RoutingContext routingContext) {
routingContext.response().putHeader(HttpHeaders.CONTENT_TYPE.toString(), "application/json")
.end(new JsonObject().toBuffer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -105,6 +106,9 @@ CompletableFuture<EndpointResponse> executeLagReport(LagReportingMessage lagRepo
CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
ApiSecurityContext apiSecurityContext);

CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs, ApiSecurityContext apiSecurityContext);

// This is the legacy websocket based query streaming API
void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.ConfigResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.HeartbeatResource;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
Expand Down Expand Up @@ -183,6 +184,7 @@ public final class KsqlRestApplication implements Executable {
private final HealthCheckResource healthCheckResource;
private volatile ServerMetadataResource serverMetadataResource;
private volatile WSQueryEndpoint wsQueryEndpoint;
private final ConfigResource configResource;
@SuppressWarnings("UnstableApiUsage")
private volatile ListeningScheduledExecutorService oldApiWebsocketExecutor;
private final Vertx vertx;
Expand Down Expand Up @@ -280,6 +282,7 @@ public static SourceName getCommandsStreamName() {
this.ksqlConfigNoPort,
this.commandRunner);
this.queryMonitor = requireNonNull(ksqlQueryMonitor, "ksqlQueryMonitor");
this.configResource = new ConfigResource(ksqlConfig);
MetricCollectors.addConfigurableReporter(ksqlConfigNoPort);
this.pullQueryMetrics = requireNonNull(pullQueryMetrics, "pullQueryMetrics");
log.debug("ksqlDB API server instance created");
Expand Down Expand Up @@ -342,6 +345,7 @@ public void startAsync() {
healthCheckResource,
serverMetadataResource,
wsQueryEndpoint,
configResource,
pullQueryMetrics
);
apiServer = new Server(vertx, ksqlRestConfig, endpoints, securityExtension,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.ConfigResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.HeartbeatResource;
import io.confluent.ksql.rest.server.resources.KsqlResource;
Expand All @@ -52,6 +53,7 @@
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -78,6 +80,7 @@ public class KsqlServerEndpoints implements Endpoints {
private final HealthCheckResource healthCheckResource;
private final ServerMetadataResource serverMetadataResource;
private final WSQueryEndpoint wsQueryEndpoint;
private final ConfigResource configResource;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

// CHECKSTYLE_RULES.OFF: ParameterNumber
Expand All @@ -96,6 +99,7 @@ public KsqlServerEndpoints(
final HealthCheckResource healthCheckResource,
final ServerMetadataResource serverMetadataResource,
final WSQueryEndpoint wsQueryEndpoint,
final ConfigResource configResource,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
) {

Expand All @@ -115,6 +119,7 @@ public KsqlServerEndpoints(
this.healthCheckResource = Objects.requireNonNull(healthCheckResource);
this.serverMetadataResource = Objects.requireNonNull(serverMetadataResource);
this.wsQueryEndpoint = Objects.requireNonNull(wsQueryEndpoint);
this.configResource = Objects.requireNonNull(configResource);
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics);
}

Expand Down Expand Up @@ -253,6 +258,19 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
ksqlSecurityContext -> serverMetadataResource.getServerClusterId());
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
final List<String> requestedConfigs,
final ApiSecurityContext apiSecurityContext) {
if (requestedConfigs.size() == 0) {
return executeOldApiEndpoint(apiSecurityContext,
ksqlSecurityContext -> configResource.getAllConfigs());
} else {
return executeOldApiEndpoint(apiSecurityContext,
ksqlSecurityContext -> configResource.getConfigs(requestedConfigs));
}
}

@Override
public void executeWebsocketStream(final ServerWebSocket webSocket, final MultiMap requestParams,
final WorkerExecutor workerExecutor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.entity.ConfigResponse;
import io.confluent.ksql.util.KsqlConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigResource {
private final Map<String, Object> visibleConfigs = new HashMap<>();

public ConfigResource(final KsqlConfig ksqlConfig) {
setVisibleConfigs(ksqlConfig);
}

private void setVisibleConfigs(final KsqlConfig ksqlConfig) {
visibleConfigs.put(
KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG,
ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
);
}

public EndpointResponse getConfigs(final List<String> requestedConfigs) {
final Map<String, Object> configs = new HashMap<>();
for (String config : requestedConfigs) {
final Object value = visibleConfigs.get(config);
if (value != null) {
configs.put(config, value);
}
}
return EndpointResponse.ok(new ConfigResponse(configs));
}

public EndpointResponse getAllConfigs() {
return EndpointResponse.ok(new ConfigResponse(visibleConfigs));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
return null;
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs, ApiSecurityContext apiSecurityContext) {
return null;
}

@Override
public void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.streams.ReadStream;
import io.vertx.ext.web.codec.BodyCodec;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -247,6 +248,13 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
return null;
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs,
ApiSecurityContext apiSecurityContext) {
return null;
}

@Override
public void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
return null;
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs,
ApiSecurityContext apiSecurityContext) {
return null;
}

@Override
public void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
return null;
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs,
ApiSecurityContext apiSecurityContext) {
return null;
}

@Override
public void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatus.Status;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.ConfigResponse;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.ServerClusterId;
import io.confluent.ksql.rest.entity.ServerInfo;
Expand All @@ -62,6 +63,7 @@
import io.confluent.ksql.test.util.secure.ClientTrustStore;
import io.confluent.ksql.test.util.secure.Credentials;
import io.confluent.ksql.test.util.secure.SecureKafkaHelper;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PageViewDataProvider;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
Expand Down Expand Up @@ -289,6 +291,28 @@ public void shouldExecuteServerMetadataIdRequest() {
assertThat(response, is(notNullValue()));
}

@Test
public void shouldExecuteAllConfigsRequest() {
// When:
final ConfigResponse response = RestIntegrationTestUtil.makeConfigRequest(REST_APP);

// Then:
assertThat(response.getConfigs().get(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG), is(notNullValue()));
}

@Test
public void shouldExecuteConfigRequest() {
// When:
final ConfigResponse response = RestIntegrationTestUtil.makeConfigRequest(
REST_APP,
Arrays.asList(new String[] {"foo", KsqlConfig.CONNECT_URL_PROPERTY, KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG})
);

// Then:
assertThat(response.getConfigs().keySet().size(), is(1));
assertThat(response.getConfigs().get(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG), is(notNullValue()));
}

@Test
public void shouldExecuteRootDocumentRequest() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.rest.entity.CommandStatus.Status;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.ConfigResponse;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand Down Expand Up @@ -155,6 +156,31 @@ static ServerClusterId makeServerMetadataIdRequest(final TestKsqlRestApp restApp
}
}

static ConfigResponse makeConfigRequest(final TestKsqlRestApp restApp) {
try (final KsqlRestClient restClient = restApp.buildKsqlClient(Optional.empty())) {

final RestResponse<ConfigResponse> res = restClient.makeConfigRequest();

throwOnError(res);

return res.getResponse();
}
}

static ConfigResponse makeConfigRequest(
final TestKsqlRestApp restApp,
final List<String> requestedConfigs
) {
try (final KsqlRestClient restClient = restApp.buildKsqlClient(Optional.empty())) {

final RestResponse<ConfigResponse> res = restClient.makeConfigRequest(requestedConfigs);

throwOnError(res);

return res.getResponse();
}
}

static List<StreamedRow> makeQueryRequest(
final TestKsqlRestApp restApp,
final String sql,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.ConfigResponse;
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.entity.HeartbeatResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
Expand Down Expand Up @@ -149,6 +150,14 @@ public RestResponse<ClusterStatusResponse> makeClusterStatusRequest() {
return target().getClusterStatus();
}

public RestResponse<ConfigResponse> makeConfigRequest() {
return target().getConfigRequest();
}

public RestResponse<ConfigResponse> makeConfigRequest(final List<String> requestedConfigs) {
return target().getConfigRequest(requestedConfigs);
}

public CompletableFuture<RestResponse<LagReportingResponse>> makeAsyncLagReportingRequest(
final LagReportingMessage lagReportingMessage
) {
Expand Down
Loading

0 comments on commit 84d202d

Please sign in to comment.