Skip to content

Commit

Permalink
feat: build ks app from an execution plan visitor
Browse files Browse the repository at this point in the history
This patch implements a visitor that iterates over the execution plan
and builds the final kstreams app. In addition to defining and
implementing the visitor, this required updating the type built by
many of the plan nodes to a wrapper class that includes both a kstream/
ktable, and a factory for building key serdes.

Now that we have this visior, we no longer need the code in SchemaKX
that makes calls into kafka streams, so that's all cleaned up.

Finally, we need to actually call the visitor to build the streams app.
For now that's happening in PhysicalPlanBuilder, but that will get moved
very soon.
  • Loading branch information
rodesai committed Oct 1, 2019
1 parent d83c787 commit d688767
Show file tree
Hide file tree
Showing 74 changed files with 1,506 additions and 1,282 deletions.
Expand Up @@ -17,8 +17,11 @@

import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.plan.PlanBuilder;
import io.confluent.ksql.execution.streams.KSPlanBuilder;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
Expand Down Expand Up @@ -59,7 +62,6 @@
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QuerySchemas;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -78,6 +80,8 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class PhysicalPlanBuilder {
Expand Down Expand Up @@ -152,6 +156,7 @@ public QueryMetadata buildPhysicalPlan(final LogicalPlanNode logicalPlanNode) {
ksqlConfig.getString(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);

return buildPlanForBareQuery(
ksqlQueryBuilder,
resultStream,
(KsqlBareOutputNode) outputNode,
getServiceId(),
Expand All @@ -169,20 +174,21 @@ public QueryMetadata buildPhysicalPlan(final LogicalPlanNode logicalPlanNode) {
ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG);

return buildPlanForStructuredOutputNode(
ksqlQueryBuilder,
logicalPlanNode.getStatementText(),
resultStream,
ksqlStructuredDataOutputNode,
getServiceId(),
persistanceQueryPrefix,
queryId,
ksqlQueryBuilder.getSchemas()
queryId
);
}

throw new KsqlException("Sink data source type unsupported: " + outputNode.getClass());
}

private QueryMetadata buildPlanForBareQuery(
final KsqlQueryBuilder ksqlQueryBuilder,
final SchemaKStream<?> schemaKStream,
final KsqlBareOutputNode bareOutputNode,
final String serviceId,
Expand All @@ -204,8 +210,18 @@ private QueryMetadata buildPlanForBareQuery(
processingLogContext
);

final KStream<?, GenericRow> kStream;
final PlanBuilder planBuilder = new KSPlanBuilder(ksqlQueryBuilder);
if (schemaKStream instanceof SchemaKTable) {
final KTable<?, GenericRow> kTable =
((SchemaKTable<?>) schemaKStream).getSourceTableStep().build(planBuilder).getTable();
kStream = kTable.toStream();
} else {
kStream = schemaKStream.getSourceStep().build(planBuilder).getStream();
}

final TransientQueryQueue<?> queue =
new TransientQueryQueue<>(schemaKStream, bareOutputNode.getLimit());
new TransientQueryQueue<>(kStream, bareOutputNode.getLimit());

final KafkaStreams streams = kafkaStreamsBuilder.buildKafkaStreams(builder, streamsProperties);

Expand All @@ -231,20 +247,22 @@ private QueryMetadata buildPlanForBareQuery(
}

private QueryMetadata buildPlanForStructuredOutputNode(
final KsqlQueryBuilder ksqlQueryBuilder,
final String sqlExpression,
final SchemaKStream<?> schemaKStream,
final KsqlStructuredDataOutputNode outputNode,
final String serviceId,
final String persistanceQueryPrefix,
final QueryId queryId,
final QuerySchemas schemas
final QueryId queryId
) {
final DataSourceType sourceType = (schemaKStream instanceof SchemaKTable)
? DataSourceType.KTABLE
: DataSourceType.KSTREAM;

final DataSource<?> sinkDataSource;
final PlanBuilder planBuilder = new KSPlanBuilder(ksqlQueryBuilder);
if (sourceType == DataSourceType.KTABLE) {
((SchemaKTable) schemaKStream).getSourceTableStep().build(planBuilder);
sinkDataSource = new KsqlTable<>(
sqlExpression,
outputNode.getIntoSourceName(),
Expand All @@ -255,6 +273,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
outputNode.getKsqlTopic()
);
} else {
schemaKStream.getSourceStep().build(planBuilder);
sinkDataSource = new KsqlStream<>(
sqlExpression,
outputNode.getIntoSourceName(),
Expand Down Expand Up @@ -315,7 +334,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
applicationId,
sinkDataSource.getKsqlTopic(),
topology,
schemas,
ksqlQueryBuilder.getSchemas(),
streamsProperties,
overriddenProperties,
queryCloseCallback
Expand Down
Expand Up @@ -16,14 +16,14 @@
package io.confluent.ksql.physical;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;

/**
Expand All @@ -37,13 +37,12 @@ class TransientQueryQueue<K> {
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue =
new LinkedBlockingQueue<>(100);

TransientQueryQueue(final SchemaKStream<K> schemaKStream, final OptionalInt limit) {
TransientQueryQueue(final KStream<?, GenericRow> kstream, final OptionalInt limit) {
this.callback = limit.isPresent()
? new LimitedQueueCallback(limit.getAsInt())
: new UnlimitedQueueCallback();

schemaKStream.getKstream()
.foreach(new TransientQueryQueue.QueuePopulator<>(rowQueue, callback));
kstream.foreach(new TransientQueryQueue.QueuePopulator<>(rowQueue, callback));
}

BlockingQueue<KeyValue<String, GenericRow>> getQueue() {
Expand Down
Expand Up @@ -23,9 +23,9 @@
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.StreamAggregate;
import io.confluent.ksql.execution.plan.StreamWindowedAggregate;
import io.confluent.ksql.execution.plan.TableWithKeySerdeFactory;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.MaterializedFactory;
import io.confluent.ksql.execution.streams.StreamAggregateBuilder;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.model.WindowType;
Expand All @@ -45,12 +45,10 @@
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Windowed;

public class SchemaKGroupedStream {

final KGroupedStream kgroupedStream;
final ExecutionStep<KGroupedStream<Struct, GenericRow>> sourceStep;
final KeyFormat keyFormat;
final KeySerde<Struct> keySerde;
Expand All @@ -61,7 +59,6 @@ public class SchemaKGroupedStream {
final MaterializedFactory materializedFactory;

SchemaKGroupedStream(
final KGroupedStream kgroupedStream,
final ExecutionStep<KGroupedStream<Struct, GenericRow>> sourceStep,
final KeyFormat keyFormat,
final KeySerde<Struct> keySerde,
Expand All @@ -71,7 +68,6 @@ public class SchemaKGroupedStream {
final FunctionRegistry functionRegistry
) {
this(
kgroupedStream,
sourceStep,
keyFormat,
keySerde,
Expand All @@ -84,7 +80,6 @@ public class SchemaKGroupedStream {
}

SchemaKGroupedStream(
final KGroupedStream kgroupedStream,
final ExecutionStep<KGroupedStream<Struct, GenericRow>> sourceStep,
final KeyFormat keyFormat,
final KeySerde<Struct> keySerde,
Expand All @@ -94,7 +89,6 @@ public class SchemaKGroupedStream {
final FunctionRegistry functionRegistry,
final MaterializedFactory materializedFactory
) {
this.kgroupedStream = kgroupedStream;
this.sourceStep = sourceStep;
this.keyFormat = Objects.requireNonNull(keyFormat, "keyFormat");
this.keySerde = Objects.requireNonNull(keySerde, "keySerde");
Expand Down Expand Up @@ -126,8 +120,7 @@ public SchemaKTable<?> aggregate(
) {
throwOnValueFieldCountMismatch(outputSchema, nonFuncColumnCount, aggregations);

final ExecutionStep<? extends KTable<?, GenericRow>> step;
final KTable table;
final ExecutionStep<? extends TableWithKeySerdeFactory<?>> step;
final KeySerde<?> newKeySerde;
final KeyFormat keyFormat;

Expand All @@ -145,12 +138,6 @@ public SchemaKTable<?> aggregate(
windowExpression.get().getKsqlWindowExpression()
);
step = aggregate;
table = StreamAggregateBuilder.build(
kgroupedStream,
aggregate,
queryBuilder,
materializedFactory
);
} else {
keyFormat = this.keyFormat;
newKeySerde = keySerde;
Expand All @@ -164,16 +151,9 @@ public SchemaKTable<?> aggregate(
aggregateSchema
);
step = aggregate;
table = StreamAggregateBuilder.build(
kgroupedStream,
aggregate,
queryBuilder,
materializedFactory
);
}

return new SchemaKTable(
table,
step,
keyFormat,
newKeySerde,
Expand Down
Expand Up @@ -26,7 +26,6 @@
import io.confluent.ksql.execution.plan.TableAggregate;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.MaterializedFactory;
import io.confluent.ksql.execution.streams.TableAggregateBuilder;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlAggregateFunction;
import io.confluent.ksql.metastore.model.KeyField;
Expand All @@ -46,11 +45,9 @@
import org.apache.kafka.streams.kstream.KGroupedTable;

public class SchemaKGroupedTable extends SchemaKGroupedStream {
private final KGroupedTable kgroupedTable;
private final ExecutionStep<KGroupedTable<Struct, GenericRow>> sourceTableStep;

SchemaKGroupedTable(
final KGroupedTable kgroupedTable,
final ExecutionStep<KGroupedTable<Struct, GenericRow>> sourceTableStep,
final KeyFormat keyFormat,
final KeySerde<Struct> keySerde,
Expand All @@ -60,7 +57,6 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream {
final FunctionRegistry functionRegistry
) {
this(
kgroupedTable,
sourceTableStep,
keyFormat,
keySerde,
Expand All @@ -72,7 +68,6 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream {
}

SchemaKGroupedTable(
final KGroupedTable kgroupedTable,
final ExecutionStep<KGroupedTable<Struct, GenericRow>> sourceTableStep,
final KeyFormat keyFormat,
final KeySerde<Struct> keySerde,
Expand All @@ -83,7 +78,6 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream {
final MaterializedFactory materializedFactory
) {
super(
null,
null,
keyFormat,
keySerde,
Expand All @@ -94,7 +88,6 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream {
materializedFactory
);

this.kgroupedTable = Objects.requireNonNull(kgroupedTable, "kgroupedTable");
this.sourceTableStep = Objects.requireNonNull(sourceTableStep, "sourceTableStep");
}

Expand Down Expand Up @@ -144,12 +137,6 @@ public SchemaKTable<Struct> aggregate(
);

return new SchemaKTable<>(
TableAggregateBuilder.build(
kgroupedTable,
step,
queryBuilder,
materializedFactory
),
step,
keyFormat,
keySerde,
Expand Down

0 comments on commit d688767

Please sign in to comment.