Skip to content

Commit

Permalink
feat(client): support (non-streaming) insert into in Java client (#5448)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed May 22, 2020
1 parent d91c016 commit 9e8234a
Show file tree
Hide file tree
Showing 18 changed files with 387 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.confluent.ksql.api.client.impl.ClientImpl;
import io.vertx.core.Vertx;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -71,9 +70,19 @@ public interface Client {
*/
BatchedQueryResult executeQuery(String sql, Map<String, Object> properties);

CompletableFuture<Void> insertInto(String streamName, Map<String, Object> row);
/**
* Inserts a row into a ksqlDB stream.
*
* <p>The {@code CompletableFuture} will be failed if a non-200 response is received from the
* server, or if the server encounters an error while processing the insertion.
*
* @param streamName name of the target stream
* @param row the row to insert. Keys are column names and values are column values.
* @return a future that completes once the server response is received
*/
CompletableFuture<Void> insertInto(String streamName, KsqlObject row);

Publisher<InsertAck> streamInserts(String streamName, Publisher<List<Object>> insertsPublisher);
Publisher<InsertAck> streamInserts(String streamName, Publisher<KsqlObject> insertsPublisher);

/**
* Terminates a push query with the specified query ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public KsqlArray add(final Boolean value) {
* @return a reference to this
*/
public KsqlArray add(final BigDecimal value) {
delegate.add(value);
delegate.add(value.doubleValue());
return this;
}

Expand All @@ -293,7 +293,7 @@ public KsqlArray add(final BigDecimal value) {
* @return a reference to this
*/
public KsqlArray add(final KsqlArray value) {
delegate.add(value);
delegate.add(KsqlArray.toJsonArray(value));
return this;
}

Expand All @@ -304,7 +304,7 @@ public KsqlArray add(final KsqlArray value) {
* @return a reference to this
*/
public KsqlArray add(final KsqlObject value) {
delegate.add(value);
delegate.add(KsqlObject.toJsonObject(value));
return this;
}

Expand Down Expand Up @@ -385,7 +385,7 @@ public int hashCode() {
return delegate.hashCode();
}

private static JsonArray toJsonArray(final KsqlArray ksqlArray) {
static JsonArray toJsonArray(final KsqlArray ksqlArray) {
return new JsonArray(ksqlArray.getList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public KsqlObject put(final String key, final Boolean value) {
* @return a reference to this
*/
public KsqlObject put(final String key, final BigDecimal value) {
delegate.put(key, value);
delegate.put(key, value.doubleValue());
return this;
}

Expand All @@ -312,7 +312,7 @@ public KsqlObject put(final String key, final BigDecimal value) {
* @return a reference to this
*/
public KsqlObject put(final String key, final KsqlArray value) {
delegate.put(key, value);
delegate.put(key, KsqlArray.toJsonArray(value));
return this;
}

Expand All @@ -324,7 +324,7 @@ public KsqlObject put(final String key, final KsqlArray value) {
* @return a reference to this
*/
public KsqlObject put(final String key, final KsqlObject value) {
delegate.put(key, value);
delegate.put(key, KsqlObject.toJsonObject(value));
return this;
}

Expand Down Expand Up @@ -422,7 +422,7 @@ public static KsqlObject fromArray(final List<String> keys, final KsqlArray valu
return ret;
}

private static JsonObject toJsonObject(final KsqlObject ksqlObject) {
static JsonObject toJsonObject(final KsqlObject ksqlObject) {
return new JsonObject(ksqlObject.getMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.InsertAck;
import io.confluent.ksql.api.client.KsqlClientException;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
Expand All @@ -40,7 +42,6 @@
import java.nio.charset.Charset;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -118,20 +119,42 @@ public BatchedQueryResult executeQuery(
}

@Override
public CompletableFuture<Void> insertInto(
final String streamName, final Map<String, Object> row) {
return null; // not yet implemented
public CompletableFuture<Void> insertInto(final String streamName, final KsqlObject row) {
final CompletableFuture<Void> cf = new CompletableFuture<>();

final Buffer requestBody = Buffer.buffer();
final JsonObject params = new JsonObject().put("target", streamName);
requestBody.appendBuffer(params.toBuffer()).appendString("\n");
requestBody.appendString(row.toJsonString()).appendString("\n");

makeRequest(
"/inserts-stream",
requestBody,
cf,
response -> handleResponse(response, cf, InsertsResponseHandler::new)
);

return cf;
}

@Override
public Publisher<InsertAck> streamInserts(
final String streamName, final Publisher<List<Object>> insertsPublisher) {
final String streamName, final Publisher<KsqlObject> insertsPublisher) {
return null; // not yet implemented
}

@Override
public CompletableFuture<Void> terminatePushQuery(final String queryId) {
return makeCloseQueryRequest(queryId);
final CompletableFuture<Void> cf = new CompletableFuture<>();

makeRequest(
"/close-query",
new JsonObject().put("queryId", queryId),
cf,
response -> handleCloseQueryResponse(response, cf)
);

return cf;
}

@Override
Expand All @@ -144,7 +167,7 @@ public void close() {

@FunctionalInterface
private interface ResponseHandlerSupplier<T extends CompletableFuture<?>> {
QueryResponseHandler<T> get(Context ctx, RecordParser recordParser, T cf);
ResponseHandler<T> get(Context ctx, RecordParser recordParser, T cf);
}

private <T extends CompletableFuture<?>> void makeQueryRequest(
Expand All @@ -159,26 +182,21 @@ private <T extends CompletableFuture<?>> void makeQueryRequest(
"/query-stream",
requestBody,
cf,
response -> handleQueryResponse(response, cf, responseHandlerSupplier)
response -> handleResponse(response, cf, responseHandlerSupplier)
);
}

private CompletableFuture<Void> makeCloseQueryRequest(final String queryId) {
final CompletableFuture<Void> cf = new CompletableFuture<>();

makeRequest(
"/close-query",
new JsonObject().put("queryId", queryId),
cf,
response -> handleCloseQueryResponse(response, cf)
);

return cf;
private <T extends CompletableFuture<?>> void makeRequest(
final String path,
final JsonObject requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler) {
makeRequest(path, requestBody.toBuffer(), cf, responseHandler);
}

private <T extends CompletableFuture<?>> void makeRequest(
final String path,
final JsonObject requestBody,
final Buffer requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler) {
HttpClientRequest request = httpClient.request(HttpMethod.POST,
Expand All @@ -189,20 +207,20 @@ private <T extends CompletableFuture<?>> void makeRequest(
if (clientOptions.isUseBasicAuth()) {
request = configureBasicAuth(request);
}
request.end(requestBody.toBuffer());
request.end(requestBody);
}

private HttpClientRequest configureBasicAuth(final HttpClientRequest request) {
return request.putHeader(AUTHORIZATION.toString(), basicAuthHeader);
}

private static <T extends CompletableFuture<?>> void handleQueryResponse(
private static <T extends CompletableFuture<?>> void handleResponse(
final HttpClientResponse response,
final T cf,
final ResponseHandlerSupplier<T> responseHandlerSupplier) {
if (response.statusCode() == OK.code()) {
final RecordParser recordParser = RecordParser.newDelimited("\n", response);
final QueryResponseHandler<T> responseHandler =
final ResponseHandler<T> responseHandler =
responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf);

recordParser.handler(responseHandler::handleBodyBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected void handleRow(final Buffer buff) {
}

@Override
protected void handleBodyEnd() {
protected void doHandleBodyEnd() {
if (!hasReadArguments) {
throw new IllegalStateException("Body ended before metadata received");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.KsqlClientException;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import java.util.concurrent.CompletableFuture;

public class InsertsResponseHandler extends ResponseHandler<CompletableFuture<Void>> {

private int numAcks;

InsertsResponseHandler(
final Context context, final RecordParser recordParser, final CompletableFuture<Void> cf) {
super(context, recordParser, cf);
}

@Override
protected void doHandleBodyBuffer(final Buffer buff) {
final JsonObject jsonObject = new JsonObject(buff);
final String status = jsonObject.getString("status");
if ("ok".equals(status)) {
numAcks++;
} else if ("error".equals(status)) {
cf.completeExceptionally(new KsqlClientException(String.format(
"Received error from /inserts-stream. Error code: %d. Message: %s",
jsonObject.getInteger("error_code"),
jsonObject.getString("message")
)));
} else {
throw new IllegalStateException(
"Unrecognized status response from /inserts-stream: " + status);
}
}

@Override
protected void doHandleException(final Throwable t) {
if (!cf.isDone()) {
cf.completeExceptionally(t);
}
}

@Override
protected void doHandleBodyEnd() {
if (numAcks != 1) {
throw new IllegalStateException(
"Received unexpected number of acks from /inserts-stream. "
+ "Expected: 1. Got: " + numAcks);
}

if (!cf.isDone()) {
cf.complete(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,62 +18,45 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.api.client.util.JsonMapper;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import java.util.concurrent.CompletableFuture;

abstract class QueryResponseHandler<T extends CompletableFuture<?>> {
abstract class QueryResponseHandler<T extends CompletableFuture<?>> extends ResponseHandler<T> {

private static ObjectMapper JSON_MAPPER = JsonMapper.get();

protected final Context context;
protected final RecordParser recordParser;
protected final T cf;
protected boolean hasReadArguments;

QueryResponseHandler(final Context context, final RecordParser recordParser, final T cf) {
this.context = context;
this.recordParser = recordParser;
this.cf = cf;
super(context, recordParser, cf);
}

public void handleBodyBuffer(final Buffer buff) {
checkContext();
@Override
protected void doHandleBodyBuffer(final Buffer buff) {
if (!hasReadArguments) {
handleArgs(buff);
} else {
handleRow(buff);
}
}

public void handleException(final Throwable t) {
checkContext();
@Override
protected void doHandleException(final Throwable t) {
if (!cf.isDone()) {
cf.completeExceptionally(t);
} else {
handleExceptionAfterFutureCompleted(t);
}
}

public void handleBodyEnd(final Void v) {
checkContext();
handleBodyEnd();
}

protected abstract void handleBodyEnd();

protected abstract void handleMetadata(QueryResponseMetadata queryResponseMetadata);

protected abstract void handleRow(Buffer buff);

protected abstract void handleExceptionAfterFutureCompleted(Throwable t);

protected void checkContext() {
VertxUtils.checkContext(context);
}

private void handleArgs(final Buffer buff) {
hasReadArguments = true;

Expand Down
Loading

0 comments on commit 9e8234a

Please sign in to comment.