Skip to content

Commit

Permalink
Integrate new API with engine for query streaming plus some refactori…
Browse files Browse the repository at this point in the history
…ngs (#4495)
  • Loading branch information
purplefox committed Feb 11, 2020
1 parent de406c1 commit 055406e
Show file tree
Hide file tree
Showing 56 changed files with 2,797 additions and 939 deletions.
4 changes: 4 additions & 0 deletions ksql-api/pom.xml
Expand Up @@ -80,6 +80,10 @@
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-common</artifactId>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-engine</artifactId>
</dependency>

<!-- Required for running tests -->

Expand Down
20 changes: 20 additions & 0 deletions ksql-api/src/main/java/io/confluent/ksql/api/impl/Utils.java
Expand Up @@ -17,6 +17,7 @@

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.impl.VertxThread;

/**
* General purpose utils (not limited to the server, could be used by client too) for the API
Expand Down Expand Up @@ -45,4 +46,23 @@ public static <T> void connectPromise(final Future<T> future, final Promise<T> p
});
}

public static void checkIsWorker() {
checkThread(true);
}

public static void checkIsNotWorker() {
checkThread(false);
}

private static void checkThread(final boolean worker) {
final Thread thread = Thread.currentThread();
if (!(thread instanceof VertxThread)) {
throw new IllegalStateException("Not a Vert.x thread " + thread);
}
final VertxThread vertxThread = (VertxThread) thread;
if (vertxThread.isWorker() != worker) {
throw new IllegalStateException("Not a " + (worker ? "worker" : "event loop") + " thread");
}
}

}
@@ -0,0 +1,211 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.plugin;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.impl.Utils;
import io.confluent.ksql.api.server.BaseServerEndpoints;
import io.confluent.ksql.api.server.PushQueryHandler;
import io.confluent.ksql.api.spi.InsertsSubscriber;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.QueryMetadata;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.reactivestreams.Subscriber;

public class KsqlServerEndpoints extends BaseServerEndpoints {

private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final KsqlSecurityExtension securityExtension;
private final ServiceContextFactory theServiceContextFactory;

public interface ServiceContextFactory {

ServiceContext create(
KsqlConfig ksqlConfig,
Optional<String> authHeader,
KafkaClientSupplier kafkaClientSupplier,
Supplier<SchemaRegistryClient> srClientFactory
);
}

public KsqlServerEndpoints(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final KsqlSecurityExtension securityExtension,
final ServiceContextFactory theServiceContextFactory) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine);
this.ksqlConfig = Objects.requireNonNull(ksqlConfig);
this.securityExtension = Objects.requireNonNull(securityExtension);
this.theServiceContextFactory = Objects.requireNonNull(theServiceContextFactory);
}

@Override
protected PushQueryHandler createQuery(final String sql, final JsonObject properties,
final Context context, final WorkerExecutor workerExecutor,
final Consumer<GenericRow> rowConsumer) {
// Must be run on worker as all this stuff is slow
Utils.checkIsWorker();

final ServiceContext serviceContext = createServiceContext(new DummyPrincipal());
final ConfiguredStatement<Query> statement = createStatement(sql, properties.getMap());

final QueryMetadata queryMetadata = ksqlEngine
.executeQuery(serviceContext, statement, rowConsumer);
return new KsqlQueryHandle(queryMetadata, statement.getStatement().getLimit());
}

private ConfiguredStatement<Query> createStatement(final String queryString,
final Map<String, Object> properties) {
final List<ParsedStatement> statements = ksqlEngine.parse(queryString);
if ((statements.size() != 1)) {
throw new KsqlStatementException(
String.format("Expected exactly one KSQL statement; found %d instead", statements.size()),
queryString);
}
final PreparedStatement<?> ps = ksqlEngine.prepare(statements.get(0));
final Statement statement = ps.getStatement();
if (!(statement instanceof Query)) {
throw new KsqlStatementException("Not a query", queryString);
}
@SuppressWarnings("unchecked") final PreparedStatement<Query> psq =
(PreparedStatement<Query>) ps;
return ConfiguredStatement.of(psq, properties, ksqlConfig);
}

private ServiceContext createServiceContext(final Principal principal) {
// Creates a ServiceContext using the user's credentials, so the WS query topics are
// accessed with the user permission context (defaults to KSQL service context)

if (!securityExtension.getUserContextProvider().isPresent()) {
return createServiceContext(new DefaultKafkaClientSupplier(),
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get);
}

return securityExtension.getUserContextProvider()
.map(provider ->
createServiceContext(
provider.getKafkaClientSupplier(principal),
provider.getSchemaRegistryClientFactory(principal)
))
.get();
}

private ServiceContext createServiceContext(
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<SchemaRegistryClient> srClientFactory
) {
return theServiceContextFactory.create(ksqlConfig,
Optional.empty(),
kafkaClientSupplier, srClientFactory);
}

private static class DummyPrincipal implements Principal {

@Override
public String getName() {
return "NO_PRINCIPAL";
}
}

@Override
public InsertsSubscriber createInsertsSubscriber(final String target, final JsonObject properties,
final Subscriber<JsonObject> acksSubscriber) {
return null;
}

private static class KsqlQueryHandle implements PushQueryHandler {

private final QueryMetadata queryMetadata;
private final OptionalInt limit;

KsqlQueryHandle(final QueryMetadata queryMetadata, final OptionalInt limit) {
this.queryMetadata = queryMetadata;
this.limit = limit;
}

@Override
public List<String> getColumnNames() {
return colNamesFromSchema(queryMetadata.getLogicalSchema());
}

@Override
public List<String> getColumnTypes() {
return colTypesFromSchema(queryMetadata.getLogicalSchema());
}

@Override
public OptionalInt getLimit() {
return limit;
}

@Override
public void start() {
queryMetadata.start();
}

@Override
public void stop() {
queryMetadata.close();
}

private static List<String> colTypesFromSchema(final LogicalSchema logicalSchema) {
final List<Column> cols = logicalSchema.value();
final List<String> colTypes = new ArrayList<>(cols.size());
for (Column col : cols) {
colTypes.add(col.type().toString(FormatOptions.none()));
}
return colTypes;
}

private static List<String> colNamesFromSchema(final LogicalSchema logicalSchema) {
final List<Column> cols = logicalSchema.value();
final List<String> colNames = new ArrayList<>(cols.size());
for (Column col : cols) {
colNames.add(col.name().name());
}
return colNames;
}
}
}
Expand Up @@ -18,9 +18,7 @@
import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_INTERNAL_ERROR;

import io.confluent.ksql.api.server.protocol.ErrorResponse;
import io.confluent.ksql.api.server.protocol.InsertAck;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import java.util.Objects;
Expand All @@ -35,9 +33,6 @@
public class AcksSubscriber extends ReactiveSubscriber<JsonObject> {

private static final Logger log = LoggerFactory.getLogger(AcksSubscriber.class);
private static final int BATCH_SIZE = 4;
private static final Buffer OK_INSERT_RESPONSE_LINE = new InsertAck().toBuffer()
.appendString("\n");
private static final int REQUEST_BATCH_SIZE = 1000;

private final HttpServerResponse response;
Expand Down
Expand Up @@ -30,31 +30,36 @@ public class ApiServerConfig extends AbstractConfig {

private static final String PROPERTY_PREFIX = "apiserver.";

public static final String VERTICLE_INSTANCES = propertyName("verticle-instances");
public static final String VERTICLE_INSTANCES = propertyName("verticle.instances");
public static final int DEFAULT_VERTICLE_INSTANCES =
2 * Runtime.getRuntime().availableProcessors();
public static final String VERTICLE_INSTANCES_DOC =
"The number of server verticle instances to start. Usually you want at least many instances"
+ " as there are cores you want to use, as each instance is single threaded.";

public static final String LISTEN_HOST = propertyName("listen-host");
public static final String LISTEN_HOST = propertyName("listen.host");
public static final String DEFAULT_LISTEN_HOST = "0.0.0.0";
public static final String LISTEN_HOST_DOC =
"The hostname to listen on";

public static final String LISTEN_PORT = propertyName("listen-port");
public static final String LISTEN_PORT = propertyName("listen.port");
public static final int DEFAULT_LISTEN_PORT = 8089;
public static final String LISTEN_PORT_DOC =
"The port to listen on";

public static final String KEY_PATH = propertyName("key-path");
public static final String KEY_PATH = propertyName("key.path");
public static final String KEY_PATH_DOC =
"Path to key file";

public static final String CERT_PATH = propertyName("cert-path");
public static final String CERT_PATH = propertyName("cert.path");
public static final String CERT_PATH_DOC =
"Path to cert file";

public static final String WORKER_POOL_SIZE = propertyName("worker.pool.size");
public static final String WORKER_POOL_DOC =
"Max number of worker threads for executing blocking code";
public static final int DEFAULT_WORKER_POOL_SIZE = 100;

private static String propertyName(final String name) {
return KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX + PROPERTY_PREFIX + name;
}
Expand Down Expand Up @@ -89,7 +94,13 @@ private static String propertyName(final String name) {
Type.STRING,
null,
Importance.MEDIUM,
CERT_PATH_DOC);
CERT_PATH_DOC)
.define(
WORKER_POOL_SIZE,
Type.INT,
DEFAULT_WORKER_POOL_SIZE,
Importance.MEDIUM,
WORKER_POOL_DOC);

public ApiServerConfig(final Map<?, ?> map) {
super(CONFIG_DEF, map);
Expand Down

0 comments on commit 055406e

Please sign in to comment.