Skip to content

Commit

Permalink
feat(client): add getInfo method to java client (#7030)
Browse files Browse the repository at this point in the history
* chore: add getInfo method to java client

* address review comments

* other small fixes
  • Loading branch information
Zara Lim committed Feb 19, 2021
1 parent 1b18c96 commit b09f003
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ public interface Client {
*/
CompletableFuture<SourceDescription> describeSource(String sourceName);

/**
* Returns metadata about the ksqlDB server.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @return metadata for server
*/
CompletableFuture<ServerInfo> serverInfo();

/**
* Closes the underlying HTTP client.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

/**
* Metadata for a ksqlDB server.
*/
public interface ServerInfo {

/**
* @return the ksqlDB version the server is running
*/
String getServerVersion();

/**
* @return the Kafka cluster id
*/
String getKafkaClusterId();

/**
* @return the ksqlDB service id
*/
String getKsqlServiceId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import io.confluent.ksql.api.client.ServerInfo;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.TableInfo;
Expand All @@ -28,7 +29,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
final class AdminResponseHandlers {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private AdminResponseHandlers() {
}
Expand Down Expand Up @@ -98,6 +101,25 @@ static void handleDescribeSourceResponse(
}
}

static void handleServerInfoResponse(
final JsonObject serverInfoEntity,
final CompletableFuture<ServerInfo> cf
) {
final JsonObject source = serverInfoEntity.getJsonObject("KsqlServerInfo");

try {
final ServerInfoImpl serverInfo = new ServerInfoImpl(
source.getString("version"),
source.getString("kafkaClusterId"),
source.getString("ksqlServiceId")
);
cf.complete(serverInfo);
} catch (Exception e) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + serverInfoEntity));
}
}

static boolean isListStreamsResponse(final JsonObject ksqlEntity) {
return getListStreamsResponse(ksqlEntity).isPresent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.ServerInfo;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class ClientImpl implements Client {
private static final String INSERTS_ENDPOINT = "/inserts-stream";
private static final String CLOSE_QUERY_ENDPOINT = "/close-query";
private static final String KSQL_ENDPOINT = "/ksql";
private static final String INFO_ENDPOINT = "/info";

private final ClientOptions clientOptions;
private final Vertx vertx;
Expand Down Expand Up @@ -148,7 +150,7 @@ public CompletableFuture<Void> insertInto(final String streamName, final KsqlObj
requestBody.appendBuffer(params.toBuffer()).appendString("\n");
requestBody.appendString(row.toJsonString()).appendString("\n");

makeRequest(
makePostRequest(
INSERTS_ENDPOINT,
requestBody,
cf,
Expand All @@ -169,7 +171,7 @@ public CompletableFuture<AcksPublisher> streamInserts(
final JsonObject params = new JsonObject().put("target", streamName);
requestBody.appendBuffer(params.toBuffer()).appendString("\n");

makeRequest(
makePostRequest(
"/inserts-stream",
requestBody,
cf,
Expand All @@ -186,7 +188,7 @@ public CompletableFuture<AcksPublisher> streamInserts(
public CompletableFuture<Void> terminatePushQuery(final String queryId) {
final CompletableFuture<Void> cf = new CompletableFuture<>();

makeRequest(
makePostRequest(
CLOSE_QUERY_ENDPOINT,
new JsonObject().put("queryId", queryId),
cf,
Expand All @@ -210,7 +212,7 @@ public CompletableFuture<ExecuteStatementResult> executeStatement(
return cf;
}

makeRequest(
makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", sql).put("streamsProperties", properties),
cf,
Expand All @@ -228,7 +230,7 @@ public CompletableFuture<ExecuteStatementResult> executeStatement(
public CompletableFuture<List<StreamInfo>> listStreams() {
final CompletableFuture<List<StreamInfo>> cf = new CompletableFuture<>();

makeRequest(
makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "list streams;"),
cf,
Expand All @@ -243,7 +245,7 @@ public CompletableFuture<List<StreamInfo>> listStreams() {
public CompletableFuture<List<TableInfo>> listTables() {
final CompletableFuture<List<TableInfo>> cf = new CompletableFuture<>();

makeRequest(
makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "list tables;"),
cf,
Expand All @@ -258,7 +260,7 @@ public CompletableFuture<List<TableInfo>> listTables() {
public CompletableFuture<List<TopicInfo>> listTopics() {
final CompletableFuture<List<TopicInfo>> cf = new CompletableFuture<>();

makeRequest(
makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "list topics;"),
cf,
Expand All @@ -273,7 +275,7 @@ public CompletableFuture<List<TopicInfo>> listTopics() {
public CompletableFuture<List<QueryInfo>> listQueries() {
final CompletableFuture<List<QueryInfo>> cf = new CompletableFuture<>();

makeRequest(
makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "list queries;"),
cf,
Expand All @@ -288,7 +290,7 @@ public CompletableFuture<List<QueryInfo>> listQueries() {
public CompletableFuture<SourceDescription> describeSource(final String sourceName) {
final CompletableFuture<SourceDescription> cf = new CompletableFuture<>();

makeRequest(
makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "describe " + sourceName + ";"),
cf,
Expand All @@ -299,6 +301,21 @@ public CompletableFuture<SourceDescription> describeSource(final String sourceNa
return cf;
}

@Override
public CompletableFuture<ServerInfo> serverInfo() {
final CompletableFuture<ServerInfo> cf = new CompletableFuture<>();

makeGetRequest(
INFO_ENDPOINT,
new JsonObject(),
cf,
response -> handleObjectResponse(
response, cf, AdminResponseHandlers::handleServerInfoResponse)
);

return cf;
}

@Override
public void close() {
httpClient.close();
Expand All @@ -325,37 +342,55 @@ private <T extends CompletableFuture<?>> void makeQueryRequest(
) {
final JsonObject requestBody = new JsonObject().put("sql", sql).put("properties", properties);

makeRequest(
makePostRequest(
QUERY_STREAM_ENDPOINT,
requestBody,
cf,
response -> handleStreamedResponse(response, cf, responseHandlerSupplier)
);
}

private <T extends CompletableFuture<?>> void makeRequest(
private <T extends CompletableFuture<?>> void makeGetRequest(
final String path,
final JsonObject requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler) {
makeRequest(path, requestBody.toBuffer(), cf, responseHandler);
makeRequest(path, requestBody.toBuffer(), cf, responseHandler, true, HttpMethod.GET);
}

private <T extends CompletableFuture<?>> void makeRequest(
private <T extends CompletableFuture<?>> void makePostRequest(
final String path,
final JsonObject requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler) {
makePostRequest(path, requestBody.toBuffer(), cf, responseHandler);
}

private <T extends CompletableFuture<?>> void makePostRequest(
final String path,
final Buffer requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler) {
makeRequest(path, requestBody, cf, responseHandler, true);
makePostRequest(path, requestBody, cf, responseHandler, true);
}

private <T extends CompletableFuture<?>> void makeRequest(
private <T extends CompletableFuture<?>> void makePostRequest(
final String path,
final Buffer requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler,
final boolean endRequest) {
HttpClientRequest request = httpClient.request(HttpMethod.POST,
makeRequest(path, requestBody, cf, responseHandler, endRequest, HttpMethod.POST);
}

private <T extends CompletableFuture<?>> void makeRequest(
final String path,
final Buffer requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler,
final boolean endRequest,
final HttpMethod method) {
HttpClientRequest request = httpClient.request(method,
serverSocketAddress, clientOptions.getPort(), clientOptions.getHost(),
path,
responseHandler)
Expand Down Expand Up @@ -445,6 +480,21 @@ private static <T> void handleSingleEntityResponse(
}
}

private static <T> void handleObjectResponse(
final HttpClientResponse response,
final CompletableFuture<T> cf,
final SingleEntityResponseHandler<T> responseHandler
) {
if (response.statusCode() == OK.code()) {
response.bodyHandler(buffer -> {
final JsonObject entity = buffer.toJsonObject();
responseHandler.accept(entity, cf);
});
} else {
handleErrorResponse(response, cf);
}
}

private static <T extends CompletableFuture<?>> void handleErrorResponse(
final HttpClientResponse response,
final T cf
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.ServerInfo;
import java.util.Objects;

public class ServerInfoImpl implements ServerInfo {
private final String serverVersion;
private final String kafkaClusterId;
private final String ksqlServiceId;

public ServerInfoImpl(
final String serverVersion,
final String kafkaClusterId,
final String ksqlServiceId
) {
this.serverVersion = Objects.requireNonNull(serverVersion, "serverVersion");
this.kafkaClusterId = Objects.requireNonNull(kafkaClusterId, "kafkaClusterId");
this.ksqlServiceId = Objects.requireNonNull(ksqlServiceId, "ksqlServiceId");
}

@Override
public String getServerVersion() {
return serverVersion;
}

@Override
public String getKafkaClusterId() {
return kafkaClusterId;
}

@Override
public String getKsqlServiceId() {
return ksqlServiceId;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ServerInfoImpl that = (ServerInfoImpl) o;
return serverVersion.equals(that.serverVersion)
&& kafkaClusterId.equals(that.kafkaClusterId)
&& ksqlServiceId.equals(that.ksqlServiceId);
}

@Override
public int hashCode() {
return Objects.hash(serverVersion, kafkaClusterId, ksqlServiceId);
}

@Override
public String toString() {
return "ServerInfo{"
+ "serverVersion='" + serverVersion + '\''
+ ", kafkaClusterId='" + kafkaClusterId + '\''
+ ", ksqlServiceId='" + ksqlServiceId + '\''
+ '}';
}
}
Loading

0 comments on commit b09f003

Please sign in to comment.