Skip to content

Commit

Permalink
feat(client): support DDL/DML statements in Java client (#5775)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Jul 15, 2020
1 parent 7f5b843 commit 53ca76f
Show file tree
Hide file tree
Showing 8 changed files with 977 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,48 @@ public interface Client {
*/
CompletableFuture<Void> terminatePushQuery(String queryId);

/**
* Sends a SQL request to the ksqlDB server. This method supports 'CREATE', 'CREATE ... AS
* SELECT', 'DROP', 'TERMINATE', and 'INSERT INTO ... AS SELECT' statements.
*
* <p>Each request should contain exactly one statement. Requests that contain multiple statements
* will be rejected by the client, in the form of failing the {@code CompletableFuture}, and the
* request will not be sent to the server.
*
* <p>The {@code CompletableFuture} is completed once a response is received from the server.
* Note that the actual execution of the submitted statement is asynchronous, so the statement
* may not have been executed by the time the {@code CompletableFuture} is completed.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param sql the request to be executed
* @return a future that completes once the server response is received
*/
CompletableFuture<Void> executeStatement(String sql);

/**
* Sends a SQL request with the specified properties to the ksqlDB server. This method supports
* 'CREATE', 'CREATE ... AS SELECT', 'DROP', 'TERMINATE', and 'INSERT INTO ... AS SELECT'
* statements.
*
* <p>Each request should contain exactly one statement. Requests that contain multiple statements
* will be rejected by the client, in the form of failing the {@code CompletableFuture}, and the
* request will not be sent to the server.
*
* <p>The {@code CompletableFuture} is completed once a response is received from the server.
* Note that the actual execution of the submitted statement is asynchronous, so the statement
* may not have been executed by the time the {@code CompletableFuture} is completed.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param sql the request to be executed
* @param properties properties associated with the request
* @return a future that completes once the server response is received
*/
CompletableFuture<Void> executeStatement(String sql, Map<String, Object> properties);

/**
* Returns the list of ksqlDB streams from the ksqlDB server's metastore.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,123 @@ private AdminResponseHandlers() {
static void handleListStreamsResponse(
final JsonObject streamsListEntity,
final CompletableFuture<List<StreamInfo>> cf
) {
final Optional<List<StreamInfo>> streams = getListStreamsResponse(streamsListEntity);
if (streams.isPresent()) {
cf.complete(streams.get());
} else {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + streamsListEntity));
}
}

static void handleListTablesResponse(
final JsonObject tablesListEntity,
final CompletableFuture<List<TableInfo>> cf
) {
final Optional<List<TableInfo>> tables = getListTablesResponse(tablesListEntity);
if (tables.isPresent()) {
cf.complete(tables.get());
} else {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + tablesListEntity));
}
}

static void handleListTopicsResponse(
final JsonObject kafkaTopicsListEntity,
final CompletableFuture<List<TopicInfo>> cf
) {
final Optional<List<TopicInfo>> topics = getListTopicsResponse(kafkaTopicsListEntity);
if (topics.isPresent()) {
cf.complete(topics.get());
} else {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + kafkaTopicsListEntity));
}
}

static void handleListQueriesResponse(
final JsonObject queriesEntity,
final CompletableFuture<List<QueryInfo>> cf
) {
final Optional<List<QueryInfo>> queries = getListQueriesResponse(queriesEntity);
if (queries.isPresent()) {
cf.complete(queries.get());
} else {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + queriesEntity));
}
}

static boolean isListStreamsResponse(final JsonObject ksqlEntity) {
return getListStreamsResponse(ksqlEntity).isPresent();
}

static boolean isListTablesResponse(final JsonObject ksqlEntity) {
return getListTablesResponse(ksqlEntity).isPresent();
}

static boolean isListTopicsResponse(final JsonObject ksqlEntity) {
return getListTopicsResponse(ksqlEntity).isPresent();
}

static boolean isListQueriesResponse(final JsonObject ksqlEntity) {
return getListQueriesResponse(ksqlEntity).isPresent();
}

static boolean isDescribeSourceResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonObject("sourceDescription") != null;
}

static boolean isDescribeOrListFunctionResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonArray("functions") != null;
}

static boolean isExplainQueryResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonObject("queryDescription") != null;
}

static boolean isListPropertiesResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonArray("properties") != null;
}

static boolean isListTypesResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonObject("types") != null;
}

static boolean isListConnectorsResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonArray("connectors") != null;
}

static boolean isDescribeConnectorResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getString("connectorClass") != null;
}

static boolean isCreateConnectorResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonObject("info") != null;
}

static boolean isDropConnectorResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getString("connectorName") != null;
}

static boolean isConnectErrorResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getString("errorMessage") != null;
}

/**
* Attempts to parse the provided response entity as a {@code StreamsListEntity}.
*
* @param streamsListEntity response entity
* @return optional containing parsed result if successful, else empty
*/
private static Optional<List<StreamInfo>> getListStreamsResponse(
final JsonObject streamsListEntity
) {
try {
final JsonArray streams = streamsListEntity.getJsonArray("streams");
cf.complete(streams.stream()
return Optional.of(streams.stream()
.map(o -> (JsonObject) o)
.map(o -> new StreamInfoImpl(
o.getString("name"),
Expand All @@ -47,18 +160,22 @@ static void handleListStreamsResponse(
.collect(Collectors.toList())
);
} catch (Exception e) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + streamsListEntity));
return Optional.empty();
}
}

static void handleListTablesResponse(
final JsonObject tablesListEntity,
final CompletableFuture<List<TableInfo>> cf
/**
* Attempts to parse the provided response entity as a {@code TablesListEntity}.
*
* @param tablesListEntity response entity
* @return optional containing parsed result if successful, else empty
*/
private static Optional<List<TableInfo>> getListTablesResponse(
final JsonObject tablesListEntity
) {
try {
final JsonArray tables = tablesListEntity.getJsonArray("tables");
cf.complete(tables.stream()
return Optional.of(tables.stream()
.map(o -> (JsonObject) o)
.map(o -> new TableInfoImpl(
o.getString("name"),
Expand All @@ -68,22 +185,26 @@ static void handleListTablesResponse(
.collect(Collectors.toList())
);
} catch (Exception e) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + tablesListEntity));
return Optional.empty();
}
}

static void handleListTopicsResponse(
final JsonObject kafkaTopicsListEntity,
final CompletableFuture<List<TopicInfo>> cf
/**
* Attempts to parse the provided response entity as a {@code KafkaTopicsListEntity}.
*
* @param kafkaTopicsListEntity response entity
* @return optional containing parsed result if successful, else empty
*/
private static Optional<List<TopicInfo>> getListTopicsResponse(
final JsonObject kafkaTopicsListEntity
) {
try {
final JsonArray topics = kafkaTopicsListEntity.getJsonArray("topics");
cf.complete(topics.stream()
return Optional.of(topics.stream()
.map(o -> (JsonObject) o)
.map(o -> {
final List<Integer> replicaInfo = o.getJsonArray("replicaInfo").stream()
.map(v -> (Integer)v)
.map(v -> (Integer) v)
.collect(Collectors.toList());
return new TopicInfoImpl(
o.getString("name"),
Expand All @@ -93,18 +214,20 @@ static void handleListTopicsResponse(
.collect(Collectors.toList())
);
} catch (Exception e) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + kafkaTopicsListEntity));
return Optional.empty();
}
}

static void handleListQueriesResponse(
final JsonObject queriesEntity,
final CompletableFuture<List<QueryInfo>> cf
) {
/**
* Attempts to parse the provided response entity as a {@code QueriesEntity}.
*
* @param queriesEntity response entity
* @return optional containing parsed result if successful, else empty
*/
private static Optional<List<QueryInfo>> getListQueriesResponse(final JsonObject queriesEntity) {
try {
final JsonArray queries = queriesEntity.getJsonArray("queries");
cf.complete(queries.stream()
return Optional.of(queries.stream()
.map(o -> (JsonObject) o)
.map(o -> {
final QueryType queryType = QueryType.valueOf(o.getString("queryType"));
Expand Down Expand Up @@ -139,8 +262,7 @@ static void handleListQueriesResponse(
.collect(Collectors.toList())
);
} catch (Exception e) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + queriesEntity));
return Optional.empty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.api.client.impl;

import static io.confluent.ksql.api.client.impl.DdlDmlRequestValidators.validateExecuteStatementRequest;
import static io.netty.handler.codec.http.HttpHeaderNames.AUTHORIZATION;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.reactivestreams.Publisher;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
Expand Down Expand Up @@ -190,6 +192,34 @@ public CompletableFuture<Void> terminatePushQuery(final String queryId) {
return cf;
}

@Override
public CompletableFuture<Void> executeStatement(final String sql) {
return executeStatement(sql, Collections.emptyMap());
}

@Override
public CompletableFuture<Void> executeStatement(
final String sql, final Map<String, Object> properties) {
final CompletableFuture<Void> cf = new CompletableFuture<>();

if (!validateExecuteStatementRequest(sql, cf)) {
return cf;
}

makeRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", sql).put("streamsProperties", properties),
cf,
response -> handleSingleEntityResponse(
response,
cf,
DdlDmlResponseHandlers::handleExecuteStatementResponse,
DdlDmlResponseHandlers::handleUnexpectedNumResponseEntities)
);

return cf;
}

@Override
public CompletableFuture<List<StreamInfo>> listStreams() {
final CompletableFuture<List<StreamInfo>> cf = new CompletableFuture<>();
Expand Down Expand Up @@ -360,13 +390,23 @@ private static <T> void handleSingleEntityResponse(
final HttpClientResponse response,
final CompletableFuture<T> cf,
final SingleEntityResponseHandler<T> responseHandler
) {
handleSingleEntityResponse(response, cf, responseHandler,
numEntities -> new IllegalStateException(
"Unexpected number of entities in server response: " + numEntities));
}

private static <T> void handleSingleEntityResponse(
final HttpClientResponse response,
final CompletableFuture<T> cf,
final SingleEntityResponseHandler<T> responseHandler,
final Function<Integer, RuntimeException> multipleEntityErrorSupplier
) {
if (response.statusCode() == OK.code()) {
response.bodyHandler(buffer -> {
final JsonArray entities = buffer.toJsonArray();
if (entities.size() != 1) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected number of entities in server response: " + entities.size()));
cf.completeExceptionally(multipleEntityErrorSupplier.apply(entities.size()));
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.exception.KsqlClientException;
import java.util.concurrent.CompletableFuture;

final class DdlDmlRequestValidators {

private DdlDmlRequestValidators() {
}

static boolean validateExecuteStatementRequest(
final String sql,
final CompletableFuture<Void> cf
) {
if (!sql.contains(";")) {
cf.completeExceptionally(new KsqlClientException(
"Missing semicolon in SQL for executeStatement() request."));
return false;
}

if (sql.indexOf(";") != sql.lastIndexOf(";")) {
cf.completeExceptionally(new KsqlClientException(
"executeStatement() may only be used to execute one statement at a time."));
return false;
}

return true;
}
}
Loading

0 comments on commit 53ca76f

Please sign in to comment.