Skip to content

Commit

Permalink
feat: Add consistency vector handling to CLI and Java client (#8264)
Browse files Browse the repository at this point in the history
* add consistency token to rest client

added consistency token to rest client and cli

http2 not working

added CT to queue

rebased and tests pass

rename consistencytoken

javadoc

added test for ws, made ct in client string

rebase and fix conflicts

merge different pr's

address comments

fixed test after rename

fix test after rename

address comments and fix

* clean tests, address comments

* supress warning

* test

* test and cleaning

* blank lines

* minor

* address comments

* minor

* address comments
  • Loading branch information
vpapavas committed Nov 4, 2021
1 parent 58d400b commit a651677
Show file tree
Hide file tree
Showing 56 changed files with 1,217 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.netty.handler.codec.http.HttpHeaderNames.AUTHORIZATION;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
Expand All @@ -35,6 +36,8 @@
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KsqlRequestConfig;
import io.confluent.ksql.util.VertxSslOptionsFactory;
import io.vertx.core.Context;
import io.vertx.core.Handler;
Expand All @@ -60,6 +63,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
Expand All @@ -81,6 +85,8 @@ public class ClientImpl implements Client {
private final String basicAuthHeader;
private final boolean ownedVertx;
private final Map<String, Object> sessionVariables;
private final Map<String, Object> requestProperties;
private final AtomicReference<String> serializedConsistencyVector;

/**
* {@code Client} instances should be created via {@link Client#create(ClientOptions)}, NOT via
Expand Down Expand Up @@ -108,21 +114,29 @@ private ClientImpl(final ClientOptions clientOptions, final Vertx vertx,
this.serverSocketAddress =
SocketAddress.inetSocketAddress(clientOptions.getPort(), clientOptions.getHost());
this.sessionVariables = new HashMap<>();
this.serializedConsistencyVector = new AtomicReference<>();
this.requestProperties = new HashMap<>();
}

@Override
public CompletableFuture<StreamedQueryResult> streamQuery(final String sql) {
return streamQuery(sql, Collections.emptyMap());
return streamQuery(sql, new HashMap<>());
}

@Override
public CompletableFuture<StreamedQueryResult> streamQuery(
final String sql,
final Map<String, Object> properties
) {
if (ConsistencyOffsetVector.isConsistencyVectorEnabled(properties)) {
requestProperties.put(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR,
serializedConsistencyVector.get());
}
final CompletableFuture<StreamedQueryResult> cf = new CompletableFuture<>();
makeQueryRequest(sql, properties, cf,
(ctx, rp, fut, req) -> new StreamQueryResponseHandler(ctx, rp, fut));
(ctx, rp, fut, req) -> new StreamQueryResponseHandler(
ctx, rp, fut, serializedConsistencyVector));
return cf;
}

Expand All @@ -136,13 +150,19 @@ public BatchedQueryResult executeQuery(
final String sql,
final Map<String, Object> properties
) {
if (ConsistencyOffsetVector.isConsistencyVectorEnabled(properties)) {
requestProperties.put(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR,
serializedConsistencyVector.get());
}
final BatchedQueryResult result = new BatchedQueryResultImpl();
makeQueryRequest(
sql,
properties,
result,
(context, recordParser, cf, request) -> new ExecuteQueryResponseHandler(
context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows())
context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows(),
serializedConsistencyVector)
);
return result;
}
Expand Down Expand Up @@ -418,6 +438,11 @@ public Map<String, Object> getVariables() {
return new HashMap<>(sessionVariables);
}

@VisibleForTesting
public String getSerializedConsistencyVector() {
return serializedConsistencyVector.get();
}

@Override
public void close() {
httpClient.close();
Expand Down Expand Up @@ -445,7 +470,8 @@ private <T extends CompletableFuture<?>> void makeQueryRequest(
final JsonObject requestBody = new JsonObject()
.put("sql", sql)
.put("properties", properties)
.put("sessionVariables", sessionVariables);
.put("sessionVariables", sessionVariables)
.put("requestProperties", requestProperties);

makePostRequest(
QUERY_STREAM_ENDPOINT,
Expand Down Expand Up @@ -542,15 +568,14 @@ private HttpClientRequest configureBasicAuth(final HttpClientRequest request) {
return request.putHeader(AUTHORIZATION.toString(), basicAuthHeader);
}

private static <T extends CompletableFuture<?>> void handleStreamedResponse(
private <T extends CompletableFuture<?>> void handleStreamedResponse(
final HttpClientResponse response,
final T cf,
final StreamedResponseHandlerSupplier<T> responseHandlerSupplier) {
if (response.statusCode() == OK.code()) {
final RecordParser recordParser = RecordParser.newDelimited("\n", response);
final ResponseHandler<T> responseHandler =
responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf, response.request());

recordParser.handler(responseHandler::handleBodyBuffer);
recordParser.endHandler(responseHandler::handleBodyEnd);
recordParser.exceptionHandler(responseHandler::handleException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,15 +43,18 @@ public class ExecuteQueryResponseHandler extends QueryResponseHandler<BatchedQue
private List<String> columnNames;
private List<ColumnType> columnTypes;
private Map<String, Integer> columnNameToIndex;
private AtomicReference<String> serializedConsistencyVector;

ExecuteQueryResponseHandler(
final Context context,
final RecordParser recordParser,
final BatchedQueryResult cf,
final int maxRows) {
final int maxRows,
final AtomicReference<String> serializedCV) {
super(context, recordParser, cf);
this.maxRows = maxRows;
this.rows = new ArrayList<>();
this.serializedConsistencyVector = Objects.requireNonNull(serializedCV, "serializedCV");
}

@Override
Expand All @@ -61,14 +67,31 @@ protected void handleMetadata(final QueryResponseMetadata queryResponseMetadata)

@Override
protected void handleRow(final Buffer buff) {
final JsonArray values = new JsonArray(buff);
if (rows.size() < maxRows) {
rows.add(new RowImpl(columnNames, columnTypes, values, columnNameToIndex));
final Row row;
final Object json = buff.toJson();

if (json instanceof JsonObject) {
final JsonObject jsonObject = (JsonObject) json;
// This is the serialized consistency vector
// Don't add it to the result list since the user should not see it
if (jsonObject.getMap() != null && jsonObject.getMap().containsKey("consistencyToken")) {
serializedConsistencyVector.set((String) ((JsonObject) json).getMap().get(
"consistencyToken"));
} else {
throw new RuntimeException("Could not decode JSON, expected consistency toke: " + json);
}
} else if (json instanceof JsonArray) {
final JsonArray values = new JsonArray(buff);
if (rows.size() < maxRows) {
rows.add(new RowImpl(columnNames, columnTypes, values, columnNameToIndex));
} else {
throw new KsqlClientException(
"Reached max number of rows that may be returned by executeQuery(). "
+ "Increase the limit via ClientOptions#setExecuteQueryMaxResultRows(). "
+ "Current limit: " + maxRows);
}
} else {
throw new KsqlClientException(
"Reached max number of rows that may be returned by executeQuery(). "
+ "Increase the limit via ClientOptions#setExecuteQueryMaxResultRows(). "
+ "Current limit: " + maxRows);
throw new RuntimeException("Could not decode JSON: " + json);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class StreamQueryResponseHandler
extends QueryResponseHandler<CompletableFuture<StreamedQueryResult>> {

private StreamedQueryResultImpl queryResult;
private Map<String, Integer> columnNameToIndex;
private boolean paused;
private AtomicReference<String> serializedConsistencyVector;

StreamQueryResponseHandler(final Context context, final RecordParser recordParser,
final CompletableFuture<StreamedQueryResult> cf) {
final CompletableFuture<StreamedQueryResult> cf,
final AtomicReference<String> serializedCV) {
super(context, recordParser, cf);
this.serializedConsistencyVector = Objects.requireNonNull(serializedCV, "serializedCV");
}

@Override
Expand All @@ -57,10 +62,10 @@ protected void handleRow(final Buffer buff) {
if (queryResult == null) {
throw new IllegalStateException("handleRow called before metadata processed");
}

final Object json = buff.toJson();
final Row row;
if (json instanceof JsonArray) {
final Row row = new RowImpl(
row = new RowImpl(
queryResult.columnNames(),
queryResult.columnTypes(),
(JsonArray) json,
Expand All @@ -73,10 +78,17 @@ protected void handleRow(final Buffer buff) {
paused = true;
}
} else if (json instanceof JsonObject) {
final JsonObject error = (JsonObject) json;
queryResult.handleError(new KsqlException(
error.getString("message")
));
final JsonObject jsonObject = (JsonObject) json;
// This is the serialized consistency vector
// Don't add it to the publisher's buffer since the user should not see it
if (jsonObject.getMap() != null && jsonObject.getMap().containsKey("consistencyToken")) {
serializedConsistencyVector.set((String) ((JsonObject) json).getMap().get(
"consistencyToken"));
} else {
queryResult.handleError(new KsqlException(
jsonObject.getString("message")
));
}
} else {
throw new RuntimeException("Could not decode JSON: " + json);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

public final class JsonMapper {

Expand All @@ -30,7 +31,8 @@ public final class JsonMapper {
.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true))
.registerModule(new Jdk8Module());
}

private JsonMapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.util.RowUtil;
import io.netty.buffer.ByteBuf;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.junit.Before;
Expand Down Expand Up @@ -224,5 +222,4 @@ public void shouldImplementHashCodeAndEquals() {
)
.testEquals();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.StructuredTypesDataProvider;
import io.confluent.ksql.util.TestDataProvider;
import io.vertx.core.Vertx;
Expand All @@ -108,7 +107,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.zookeeper.ZooKeeperClientException;
Expand Down
Loading

0 comments on commit a651677

Please sign in to comment.