From e2c3211c23eac9c92806d013c7ad8e4f1bef7ae7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Fri, 27 Aug 2021 22:01:18 -0500 Subject: [PATCH] feat: execute source table query plans (#8061) --- .../io/confluent/ksql/util/KsqlConstants.java | 2 +- .../ksql/analyzer/SourceSchemas.java | 1 + .../ksql/ddl/commands/DdlCommandExec.java | 8 +- .../confluent/ksql/engine/EngineExecutor.java | 55 +++++-- .../io/confluent/ksql/engine/KsqlPlan.java | 5 + .../io/confluent/ksql/engine/KsqlPlanV1.java | 23 +++ .../io/confluent/ksql/engine/QueryIdUtil.java | 25 +-- .../io/confluent/ksql/engine/QueryPlan.java | 7 +- .../confluent/ksql/query/QueryExecutor.java | 87 +++++++--- .../confluent/ksql/query/QueryRegistry.java | 7 +- .../ksql/query/QueryRegistryImpl.java | 92 ++++++----- .../ksql/ddl/commands/DdlCommandExecTest.java | 14 +- .../confluent/ksql/engine/KsqlEngineTest.java | 35 ++++ .../confluent/ksql/engine/KsqlPlanV1Test.java | 78 +++++++++ .../ksql/engine/QueryIdUtilTest.java | 58 ++++--- .../confluent/ksql/engine/QueryPlanTest.java | 14 +- .../ksql/query/QueryExecutorTest.java | 69 +++++--- .../ksql/query/QueryRegistryImplTest.java | 155 +++++++++++++----- ...istentQueriesInSharedRuntimesImplTest.java | 5 +- .../ksql/rest/server/computation/Command.java | 2 +- .../resources/ksql-plan-schema/schema.json | 2 +- 21 files changed, 541 insertions(+), 203 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java index 49de6a71adc1..57c84532bee9 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java @@ -53,7 +53,7 @@ public enum KsqlQueryType { } public enum PersistentQueryType { - CREATE_AS, INSERT + CREATE_SOURCE, CREATE_AS, INSERT } public enum KsqlQueryStatus { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/SourceSchemas.java b/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/SourceSchemas.java index ecc9e80d799e..44fdfed387d6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/SourceSchemas.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/SourceSchemas.java @@ -39,6 +39,7 @@ public final class SourceSchemas { SourceSchemas(final Map sourceSchemas) { this.sourceSchemas = ImmutableMap.copyOf(requireNonNull(sourceSchemas, "sourceSchemas")); + // This will fail if (sourceSchemas.isEmpty()) { throw new IllegalArgumentException("Must supply at least one schema"); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java index 5cac8a2b6e57..259d04e5d88a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java @@ -130,7 +130,13 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable) createTable.getIsSource() ); metaStore.putSource(ksqlTable, createTable.isOrReplace()); - metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources); + + // Source tables only has a query source reference to itself. We don't need to register + // this source for source tables. + if (!createTable.getIsSource()) { + metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources); + } + return new DdlCommandResult(true, "Table created"); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index 23913444a73b..41572c49bf1c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -18,6 +18,7 @@ import static io.confluent.ksql.metastore.model.DataSource.DataSourceType; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; @@ -80,12 +81,14 @@ import io.confluent.ksql.query.QueryRegistry; import io.confluent.ksql.query.TransientQueryQueue; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.utils.FormatOptions; import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.RefinementInfo; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -159,11 +162,20 @@ ExecuteResult execute(final KsqlPlan plan) { } final QueryPlan queryPlan = plan.getQueryPlan().get(); - final DataSource sinkSource = engineContext.getMetaStore().getSource(queryPlan.getSink()); - - if (sinkSource != null && sinkSource.isSource()) { - throw new KsqlException(String.format("Cannot insert into read-only %s: %s", - sinkSource.getDataSourceType().getKsqlType().toLowerCase(), sinkSource.getName().text())); + final KsqlConstants.PersistentQueryType persistentQueryType = + plan.getPersistentQueryType().get(); + + // CREATE_SOURCE do not write to any topic. We check for read-only topics only for queries + // that attempt to write to a sink (i.e. INSERT or CREATE_AS). + if (persistentQueryType != KsqlConstants.PersistentQueryType.CREATE_SOURCE) { + final DataSource sinkSource = engineContext.getMetaStore() + .getSource(queryPlan.getSink().get()); + + if (sinkSource != null && sinkSource.isSource()) { + throw new KsqlException(String.format("Cannot insert into read-only %s: %s", + sinkSource.getDataSourceType().getKsqlType().toLowerCase(), + sinkSource.getName().text())); + } } final Optional ddlResult = plan.getDdlCommand().map(ddl -> @@ -177,7 +189,7 @@ ExecuteResult execute(final KsqlPlan plan) { return ExecuteResult.of(executePersistentQuery( queryPlan, plan.getStatementText(), - plan.getDdlCommand().isPresent()) + persistentQueryType) ); } @@ -444,7 +456,7 @@ private KsqlPlan sourceTablePlan( final QueryPlan queryPlan = new QueryPlan( getSourceNames(outputNode), - null, + Optional.empty(), plans.physicalPlan.getPhysicalPlan(), plans.physicalPlan.getQueryId() ); @@ -505,7 +517,7 @@ KsqlPlan plan(final ConfiguredStatement statement) { final QueryPlan queryPlan = new QueryPlan( getSourceNames(outputNode), - outputNode.getSinkName().get(), + outputNode.getSinkName(), plans.physicalPlan.getPhysicalPlan(), plans.physicalPlan.getQueryId() ); @@ -547,13 +559,12 @@ private ExecutorPlans planQuery( Optional.of(outputNode) ); final QueryId queryId = QueryIdUtil.buildId( + statement.getStatement(), engineContext, engineContext.idGenerator(), outputNode, ksqlConfig.getBoolean(KsqlConfig.KSQL_CREATE_OR_REPLACE_ENABLED), - withQueryId, - statement.getStatement() instanceof CreateTable - && ((CreateTable) statement.getStatement()).isSource() + withQueryId ); if (withQueryId.isPresent() @@ -738,10 +749,24 @@ private String executeDdl( } } + private Set getSources(final QueryPlan queryPlan) { + final ImmutableSet.Builder sources = ImmutableSet.builder(); + for (final SourceName name : queryPlan.getSources()) { + final DataSource dataSource = engineContext.getMetaStore().getSource(name); + if (dataSource == null) { + throw new KsqlException("Unknown source: " + name.toString(FormatOptions.noEscape())); + } + + sources.add(dataSource); + } + + return sources.build(); + } + private PersistentQueryMetadata executePersistentQuery( final QueryPlan queryPlan, final String statementText, - final boolean createAsQuery + final KsqlConstants.PersistentQueryType persistentQueryType ) { final QueryRegistry queryRegistry = engineContext.getQueryRegistry(); return queryRegistry.createOrReplacePersistentQuery( @@ -751,11 +776,11 @@ private PersistentQueryMetadata executePersistentQuery( engineContext.getMetaStore(), statementText, queryPlan.getQueryId(), - engineContext.getMetaStore().getSource(queryPlan.getSink()), - queryPlan.getSources(), + queryPlan.getSink().map(s -> engineContext.getMetaStore().getSource(s)), + getSources(queryPlan), queryPlan.getPhysicalPlan(), buildPlanSummary(queryPlan.getQueryId(), queryPlan.getPhysicalPlan()), - createAsQuery + persistentQueryType ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlPlan.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlPlan.java index 7eec436e698a..cf2d64a681a3 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlPlan.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlPlan.java @@ -15,10 +15,12 @@ package io.confluent.ksql.engine; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.confluent.ksql.execution.ddl.commands.DdlCommand; +import io.confluent.ksql.util.KsqlConstants; import java.util.Optional; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) @@ -35,6 +37,9 @@ public interface KsqlPlan { KsqlPlan withoutQuery(); + @JsonIgnore + Optional getPersistentQueryType(); + static KsqlPlan ddlPlanCurrent(final String statementText, final DdlCommand ddlCommand) { return new KsqlPlanV1(statementText, Optional.of(ddlCommand), Optional.empty()); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlPlanV1.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlPlanV1.java index 5036d26a9ee7..2596912eb494 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlPlanV1.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlPlanV1.java @@ -16,7 +16,9 @@ package io.confluent.ksql.engine; import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.ksql.execution.ddl.commands.CreateTableCommand; import io.confluent.ksql.execution.ddl.commands.DdlCommand; +import io.confluent.ksql.util.KsqlConstants; import java.util.Objects; import java.util.Optional; @@ -66,6 +68,27 @@ public KsqlPlan withoutQuery() { return new KsqlPlanV1(statementText, ddlCommand, Optional.empty()); } + @Override + public Optional getPersistentQueryType() { + if (!queryPlan.isPresent()) { + return Optional.empty(); + } + + // CREATE_AS and CREATE_SOURCE commands contain a DDL command and a Query plan. + if (ddlCommand.isPresent()) { + if (ddlCommand.get() instanceof CreateTableCommand + && ((CreateTableCommand) ddlCommand.get()).getIsSource()) { + return Optional.of(KsqlConstants.PersistentQueryType.CREATE_SOURCE); + } else { + return Optional.of(KsqlConstants.PersistentQueryType.CREATE_AS); + } + } else { + // INSERT INTO persistent queries are the only queries types that exist without a + // DDL command linked to the plan. + return Optional.of(KsqlConstants.PersistentQueryType.INSERT); + } + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryIdUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryIdUtil.java index 82582832cc81..f9d7e3a2ed9c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryIdUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryIdUtil.java @@ -18,6 +18,8 @@ import com.google.common.collect.Iterables; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.name.SourceName; +import io.confluent.ksql.parser.tree.CreateTable; +import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode; import io.confluent.ksql.planner.plan.OutputNode; import io.confluent.ksql.query.QueryId; @@ -82,6 +84,7 @@ private static void validateWithQueryId(final String queryId) { /** * Builds a {@link QueryId} for a physical plan specification. * + * @param statement the statement that requires the query ID * @param engineContext the context representing the current state of the engine * @param idGenerator generates query ids * @param outputNode the logical plan @@ -89,28 +92,28 @@ private static void validateWithQueryId(final String queryId) { * @return the {@link QueryId} to be used */ static QueryId buildId( + final Statement statement, final EngineContext engineContext, final QueryIdGenerator idGenerator, final OutputNode outputNode, final boolean createOrReplaceEnabled, - final Optional withQueryId, - final boolean isSourceTable) { + final Optional withQueryId) { if (withQueryId.isPresent()) { final String queryId = withQueryId.get().toUpperCase(); validateWithQueryId(queryId); return new QueryId(queryId); } + if (statement instanceof CreateTable && ((CreateTable) statement).isSource()) { + // Use the CST name as part of the QueryID + final String suffix = ((CreateTable) statement).getName().text().toUpperCase() + + "_" + idGenerator.getNext().toUpperCase(); + return new QueryId(ReservedQueryIdsPrefixes.CST + suffix); + } if (!outputNode.getSinkName().isPresent()) { - if (isSourceTable) { - final String suffix = outputNode.getId().toString().toUpperCase() - + "_" + idGenerator.getNext().toUpperCase(); - return new QueryId(ReservedQueryIdsPrefixes.CST + suffix); - } else { - final String prefix = - "transient_" + outputNode.getSource().getLeftmostSourceNode().getAlias().text() + "_"; - return new QueryId(prefix + Math.abs(ThreadLocalRandom.current().nextLong())); - } + final String prefix = + "transient_" + outputNode.getSource().getLeftmostSourceNode().getAlias().text() + "_"; + return new QueryId(prefix + Math.abs(ThreadLocalRandom.current().nextLong())); } final KsqlStructuredDataOutputNode structured = (KsqlStructuredDataOutputNode) outputNode; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryPlan.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryPlan.java index 8f5b970ac987..418c5f67af8d 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryPlan.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryPlan.java @@ -25,17 +25,18 @@ import io.confluent.ksql.query.QueryId; import java.util.Comparator; import java.util.Objects; +import java.util.Optional; import java.util.Set; public final class QueryPlan { private final ImmutableSet sources; - private final SourceName sink; + private final Optional sink; private final ExecutionStep physicalPlan; private final QueryId queryId; public QueryPlan( @JsonProperty(value = "sources", required = true) final Set sources, - @JsonProperty(value = "sink", required = true) final SourceName sink, + @JsonProperty(value = "sink") final Optional sink, @JsonProperty(value = "physicalPlan", required = true) final ExecutionStep physicalPlan, @JsonProperty(value = "queryId", required = true) final QueryId queryId ) { @@ -48,7 +49,7 @@ public QueryPlan( this.queryId = Objects.requireNonNull(queryId, "queryId"); } - public SourceName getSink() { + public Optional getSink() { return sink; } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 8b0faf3ed681..1815fcbceda7 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -49,6 +49,8 @@ import io.confluent.ksql.properties.PropertiesUtil; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; @@ -75,6 +77,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.streams.StreamsBuilder; @@ -247,8 +250,8 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime( final KsqlConstants.PersistentQueryType persistentQueryType, final String statementText, final QueryId queryId, - final DataSource sinkDataSource, - final Set sources, + final Optional sinkDataSource, + final Set sources, final ExecutionStep physicalPlan, final String planSummary, final QueryMetadata.Listener listener, @@ -258,10 +261,32 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime( final String applicationId = QueryApplicationId.build(ksqlConfig, true, queryId); final Map streamsProperties = buildStreamsProperties(applicationId, queryId); + final LogicalSchema logicalSchema; + final KeyFormat keyFormat; + final ValueFormat valueFormat; + + switch (persistentQueryType) { + // CREATE_SOURCE does not have a sink, so the schema is obtained from the query source + case CREATE_SOURCE: + final DataSource dataSource = Iterables.getOnlyElement(sources); + + logicalSchema = dataSource.getSchema(); + keyFormat = dataSource.getKsqlTopic().getKeyFormat(); + valueFormat = dataSource.getKsqlTopic().getValueFormat(); + + break; + default: + logicalSchema = sinkDataSource.get().getSchema(); + keyFormat = sinkDataSource.get().getKsqlTopic().getKeyFormat(); + valueFormat = sinkDataSource.get().getKsqlTopic().getValueFormat(); + + break; + } + final PhysicalSchema querySchema = PhysicalSchema.from( - sinkDataSource.getSchema(), - sinkDataSource.getKsqlTopic().getKeyFormat().getFeatures(), - sinkDataSource.getKsqlTopic().getValueFormat().getFeatures() + logicalSchema, + keyFormat.getFeatures(), + valueFormat.getFeatures() ); final RuntimeBuildContext runtimeBuildContext = buildContext( @@ -277,7 +302,7 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime( querySchema.logicalSchema(), result, allPersistentQueries, - sinkDataSource.getKsqlTopic().getKeyFormat().isWindowed(), + keyFormat.isWindowed(), streamsProperties, ksqlConfig ); @@ -289,7 +314,7 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime( materializationProviderBuilderFactory.materializationProviderBuilder( info, querySchema, - sinkDataSource.getKsqlTopic().getKeyFormat(), + keyFormat, streamsProperties, applicationId )); @@ -304,8 +329,8 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime( persistentQueryType, statementText, querySchema, - sources, - Optional.of(sinkDataSource), + sources.stream().map(DataSource::getName).collect(Collectors.toSet()), + sinkDataSource, planSummary, queryId, materializationProviderBuilder, @@ -333,15 +358,15 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( final KsqlConstants.PersistentQueryType persistentQueryType, final String statementText, final QueryId queryId, - final DataSource sinkDataSource, - final Set sources, + final Optional sinkDataSource, + final Set sources, final ExecutionStep physicalPlan, final String planSummary, final QueryMetadata.Listener listener, final Supplier> allPersistentQueries ) { final SharedKafkaStreamsRuntime sharedKafkaStreamsRuntime = getKafkaStreamsInstance( - sources, + sources.stream().map(DataSource::getName).collect(Collectors.toSet()), queryId); final String applicationId = sharedKafkaStreamsRuntime @@ -350,12 +375,33 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( .toString(); final Map streamsProperties = sharedKafkaStreamsRuntime.getStreamProperties(); + final LogicalSchema logicalSchema; + final KeyFormat keyFormat; + final ValueFormat valueFormat; + + switch (persistentQueryType) { + // CREATE_SOURCE does not have a sink, so the schema is obtained from the query source + case CREATE_SOURCE: + final DataSource dataSource = Iterables.getOnlyElement(sources); + + logicalSchema = dataSource.getSchema(); + keyFormat = dataSource.getKsqlTopic().getKeyFormat(); + valueFormat = dataSource.getKsqlTopic().getValueFormat(); + + break; + default: + logicalSchema = sinkDataSource.get().getSchema(); + keyFormat = sinkDataSource.get().getKsqlTopic().getKeyFormat(); + valueFormat = sinkDataSource.get().getKsqlTopic().getValueFormat(); + + break; + } + final PhysicalSchema querySchema = PhysicalSchema.from( - sinkDataSource.getSchema(), - sinkDataSource.getKsqlTopic().getKeyFormat().getFeatures(), - sinkDataSource.getKsqlTopic().getValueFormat().getFeatures() + logicalSchema, + keyFormat.getFeatures(), + valueFormat.getFeatures() ); - final NamedTopologyStreamsBuilder namedTopologyStreamsBuilder = new NamedTopologyStreamsBuilder( queryId.toString() ); @@ -373,7 +419,7 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( querySchema.logicalSchema(), result, allPersistentQueries, - sinkDataSource.getKsqlTopic().getKeyFormat().isWindowed(), + keyFormat.isWindowed(), streamsProperties, ksqlConfig ); @@ -385,7 +431,7 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( materializationProviderBuilderFactory.materializationProviderBuilder( info, querySchema, - sinkDataSource.getKsqlTopic().getKeyFormat(), + keyFormat, streamsProperties, applicationId )); @@ -400,7 +446,7 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( persistentQueryType, statementText, querySchema, - sources, + sources.stream().map(DataSource::getName).collect(Collectors.toSet()), planSummary, applicationId, topology, @@ -411,13 +457,12 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( materializationProviderBuilder, physicalPlan, getUncaughtExceptionProcessingLogger(queryId), - Optional.of(sinkDataSource), + sinkDataSource, listener, classifier, streamsProperties, scalablePushRegistry ); - } private SharedKafkaStreamsRuntime getKafkaStreamsInstance( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistry.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistry.java index 1c1e8f133bbe..a306341275f0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistry.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistry.java @@ -24,6 +24,7 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; @@ -91,11 +92,11 @@ PersistentQueryMetadata createOrReplacePersistentQuery( MetaStore metaStore, String statementText, QueryId queryId, - DataSource sinkDataSource, - Set sources, + Optional sinkDataSource, + Set sources, ExecutionStep physicalPlan, String planSummary, - boolean createAsQuery + KsqlConstants.PersistentQueryType persistentQueryType ); // CHECKSTYLE_RULES.ON: ParameterNumberCheck diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java index bbf1f0db6267..9c379e6a4788 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java @@ -174,7 +174,7 @@ public TransientQueryMetadata createTransientQuery( new ListenerImpl(), new StreamsBuilder() ); - registerQuery(serviceContext, metaStore, query, false); + registerTransientQuery(serviceContext, metaStore, query); return query; } @@ -187,11 +187,11 @@ public PersistentQueryMetadata createOrReplacePersistentQuery( final MetaStore metaStore, final String statementText, final QueryId queryId, - final DataSource sinkDataSource, - final Set sources, + final Optional sinkDataSource, + final Set sources, final ExecutionStep physicalPlan, final String planSummary, - final boolean createAsQuery) { + final KsqlConstants.PersistentQueryType persistentQueryType) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck final QueryExecutor executor = executorFactory.create( config, @@ -208,9 +208,7 @@ public PersistentQueryMetadata createOrReplacePersistentQuery( if (ksqlConfig.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) { query = executor.buildPersistentQueryInSharedRuntime( ksqlConfig, - createAsQuery - ? KsqlConstants.PersistentQueryType.CREATE_AS - : KsqlConstants.PersistentQueryType.INSERT, + persistentQueryType, statementText, queryId, sinkDataSource, @@ -223,9 +221,7 @@ public PersistentQueryMetadata createOrReplacePersistentQuery( } else { query = executor.buildPersistentQueryInDedicatedRuntime( ksqlConfig, - createAsQuery - ? KsqlConstants.PersistentQueryType.CREATE_AS - : KsqlConstants.PersistentQueryType.INSERT, + persistentQueryType, statementText, queryId, sinkDataSource, @@ -237,7 +233,7 @@ public PersistentQueryMetadata createOrReplacePersistentQuery( new StreamsBuilder() ); } - registerQuery(serviceContext, metaStore, query, createAsQuery); + registerPersistentQuery(serviceContext, metaStore, query); return query; } @@ -316,45 +312,56 @@ public void close(final boolean closePersistent) { } } - private void registerQuery( + private void registerPersistentQuery( final ServiceContext serviceContext, final MetaStore metaStore, - final QueryMetadata query, - final boolean createAsQuery + final PersistentQueryMetadata persistentQuery ) { - if (query instanceof PersistentQueryMetadata) { - final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) query; - final QueryId queryId = persistentQuery.getQueryId(); - - // don't use persistentQueries.put(queryId) here because oldQuery.close() - // will remove any query with oldQuery.getQueryId() from the map of persistent - // queries - final PersistentQueryMetadata oldQuery = persistentQueries.get(queryId); - if (oldQuery != null) { - oldQuery.getPhysicalPlan() - .validateUpgrade(((PersistentQueryMetadata) query).getPhysicalPlan()); - - // don't close the old query so that we don't delete the changelog - // topics and the state store, instead use QueryMetadata#stop - oldQuery.stop(); - unregisterQuery(oldQuery); - } + final QueryId queryId = persistentQuery.getQueryId(); + + // don't use persistentQueries.put(queryId) here because oldQuery.close() + // will remove any query with oldQuery.getQueryId() from the map of persistent + // queries + final PersistentQueryMetadata oldQuery = persistentQueries.get(queryId); + if (oldQuery != null) { + oldQuery.getPhysicalPlan().validateUpgrade((persistentQuery).getPhysicalPlan()); + + // don't close the old query so that we don't delete the changelog + // topics and the state store, instead use QueryMetadata#stop + oldQuery.stop(); + unregisterQuery(oldQuery); + } - // Initialize the query before it's exposed to other threads via the map/sets. - persistentQuery.initialize(); - persistentQueries.put(queryId, persistentQuery); - if (createAsQuery) { + // Initialize the query before it's exposed to other threads via the map/sets. + persistentQuery.initialize(); + persistentQueries.put(queryId, persistentQuery); + switch (persistentQuery.getPersistentQueryType()) { + case CREATE_SOURCE: + createAsQueries.put(Iterables.getOnlyElement(persistentQuery.getSourceNames()), queryId); + break; + case CREATE_AS: createAsQueries.put(persistentQuery.getSinkName().get(), queryId); - } else { - // Only INSERT queries exist beside CREATE_AS + break; + case INSERT: sinkAndSources(persistentQuery).forEach(sourceName -> insertQueries.computeIfAbsent(sourceName, x -> Collections.synchronizedSet(new HashSet<>())).add(queryId)); - } - } else { - // Initialize the query before it's exposed to other threads via {@link allLiveQueries}. - query.initialize(); + break; + default: + // do nothing } + + allLiveQueries.put(persistentQuery.getQueryId(), persistentQuery); + notifyCreate(serviceContext, metaStore, persistentQuery); + } + + private void registerTransientQuery( + final ServiceContext serviceContext, + final MetaStore metaStore, + final TransientQueryMetadata query + ) { + // Initialize the query before it's exposed to other threads via {@link allLiveQueries}. + query.initialize(); allLiveQueries.put(query.getQueryId(), query); notifyCreate(serviceContext, metaStore, query); } @@ -366,6 +373,9 @@ private void unregisterQuery(final QueryMetadata query) { persistentQueries.remove(queryId); switch (persistentQuery.getPersistentQueryType()) { + case CREATE_SOURCE: + createAsQueries.remove(Iterables.getOnlyElement(persistentQuery.getSourceNames())); + break; case CREATE_AS: createAsQueries.remove(persistentQuery.getSinkName().get()); break; diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java index f4aaed858576..876f142f8f04 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java @@ -7,6 +7,7 @@ import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableSet; import io.confluent.ksql.execution.ddl.commands.AlterSourceCommand; import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand; import io.confluent.ksql.execution.ddl.commands.CreateTableCommand; @@ -314,18 +315,17 @@ public void shouldAddNormalTableWhenNoTypeIsSpecified() { @Test public void shouldAddSourceTable() { // Given: - final CreateTableCommand cmd = buildCreateTable( - SourceName.of("t1"), - false, - true - ); + final CreateTableCommand cmd = buildCreateTable(TABLE_NAME, false, true); // When: - cmdExec.execute(SQL_TEXT, cmd, true, NO_QUERY_SOURCES); + cmdExec.execute(SQL_TEXT, cmd, true, ImmutableSet.of(TABLE_NAME)); // Then: - final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1")); + final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(TABLE_NAME); assertThat(ksqlTable.isSource(), is(true)); + + // Check query source was not added as constraints for source tables + assertThat(metaStore.getSourceConstraints(TABLE_NAME), is(NO_QUERY_SOURCES)); } @Test diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 0ed7c041bbe8..ff8358ec12c6 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -55,6 +56,7 @@ import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.MutableMetaStore; +import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; @@ -64,7 +66,11 @@ import io.confluent.ksql.parser.tree.CreateTable; import io.confluent.ksql.parser.tree.DropTable; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.SystemColumns; +import io.confluent.ksql.schema.ksql.types.SqlBaseType; +import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.services.FakeKafkaConsumerGroupClient; import io.confluent.ksql.services.FakeKafkaTopicClient; import io.confluent.ksql.services.ServiceContext; @@ -169,6 +175,35 @@ public void shouldCreatePersistentQueries() { is(SourceName.of("FOO"))); } + @Test + public void shouldCreateSourceTablesQueries() { + // Given: + givenTopicsExist("s0_topic", "t1_topic"); + + // When: + final List queries = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create source table t1 (f0 bigint primary key, f1 double, f2 boolean) " + + "with (kafka_topic='t1_topic', value_format='json');", + KSQL_CONFIG, + Collections.emptyMap()); + + // Then: + assertThat(queries, hasSize(1)); + assertThat(queries.get(0), is(instanceOf(PersistentQueryMetadata.class))); + + final PersistentQueryMetadata metadata = (PersistentQueryMetadata) queries.get(0); + assertThat(metadata.getPersistentQueryType(), + is(KsqlConstants.PersistentQueryType.CREATE_SOURCE)); + assertThat((metadata).getSink(), is(Optional.empty())); + assertThat((metadata).getSinkName(), is(Optional.empty())); + assertThat((metadata).getDataSourceType(), is(Optional.empty())); + assertThat((metadata).getResultTopic(), is(Optional.empty())); + assertThat(metadata.getLogicalSchema().key(), hasItems(hasFullName("F0"))); + assertThat(metadata.getLogicalSchema().value(), hasItems(hasFullName("F1"), hasFullName("F2"))); + } + @Test public void shouldNotHaveRowTimeAndRowKeyColumnsInPersistentQueryValueSchema() { // When: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlPlanV1Test.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlPlanV1Test.java index a2534b3956e3..948eb35778f5 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlPlanV1Test.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlPlanV1Test.java @@ -15,12 +15,21 @@ package io.confluent.ksql.engine; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.when; + import com.google.common.testing.EqualsTester; +import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand; +import io.confluent.ksql.execution.ddl.commands.CreateTableCommand; import io.confluent.ksql.execution.ddl.commands.DdlCommand; import java.util.Optional; + +import io.confluent.ksql.util.KsqlConstants; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @SuppressWarnings("UnstableApiUsage") @@ -47,4 +56,73 @@ public void shouldImplementEquals() { .addEqualityGroup(new KsqlPlanV1("foo", Optional.of(ddlCommand1), Optional.of(queryPlan2))) .testEquals(); } + + @Test + public void shouldReturnNoPersistentQueryTypeOnPlansWithoutQueryPlans() { + // Given: + final KsqlPlanV1 plan = new KsqlPlanV1( + "stmt", + Optional.of(ddlCommand1), + Optional.empty()); + + // When/Then: + assertThat(plan.getPersistentQueryType(), is(Optional.empty())); + } + + @Test + public void shouldReturnInsertPersistentQueryTypeOnPlansWithoutDdlCommands() { + // Given: + final KsqlPlanV1 plan = new KsqlPlanV1( + "stmt", + Optional.empty(), + Optional.of(queryPlan1)); + + // When/Then: + assertThat(plan.getPersistentQueryType(), + is(Optional.of(KsqlConstants.PersistentQueryType.INSERT))); + } + + @Test + public void shouldReturnCreateSourcePersistentQueryTypeOnCreateSourceTable() { + // Given: + final CreateTableCommand ddlCommand = Mockito.mock(CreateTableCommand.class); + when(ddlCommand.getIsSource()).thenReturn(true); + final KsqlPlanV1 plan = new KsqlPlanV1( + "stmt", + Optional.of(ddlCommand), + Optional.of(queryPlan1)); + + // When/Then: + assertThat(plan.getPersistentQueryType(), + is(Optional.of(KsqlConstants.PersistentQueryType.CREATE_SOURCE))); + } + + @Test + public void shouldReturnCreateAsPersistentQueryTypeOnCreateTable() { + // Given: + final CreateTableCommand ddlCommand = Mockito.mock(CreateTableCommand.class); + when(ddlCommand.getIsSource()).thenReturn(false); + final KsqlPlanV1 plan = new KsqlPlanV1( + "stmt", + Optional.of(ddlCommand), + Optional.of(queryPlan1)); + + // When/Then: + assertThat(plan.getPersistentQueryType(), + is(Optional.of(KsqlConstants.PersistentQueryType.CREATE_AS))); + } + + @Test + public void shouldReturnCreateAsPersistentQueryTypeOnCreateStream() { + // Given: + final CreateStreamCommand ddlCommand = Mockito.mock(CreateStreamCommand.class); + final KsqlPlanV1 plan = new KsqlPlanV1( + "stmt", + Optional.of(ddlCommand), + Optional.of(queryPlan1)); + + // When/Then: + assertThat(plan.getPersistentQueryType(), + is(Optional.of(KsqlConstants.PersistentQueryType.CREATE_AS))); + } } \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/QueryIdUtilTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/QueryIdUtilTest.java index dd593db62fda..e24e556f5a2d 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/QueryIdUtilTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/QueryIdUtilTest.java @@ -19,11 +19,14 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.name.SourceName; +import io.confluent.ksql.parser.tree.CreateTable; +import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.planner.plan.DataSourceNode; import io.confluent.ksql.planner.plan.KsqlBareOutputNode; import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode; @@ -62,6 +65,8 @@ public class QueryIdUtilTest { private DataSourceNode dataSourceNode; @Mock private SourceName sourceName; + @Mock + private Statement statement; @Before public void setup() { when(engineContext.getQueryRegistry()).thenReturn(queryRegistry); @@ -78,8 +83,8 @@ public void shouldGenerateUniqueRandomIdsForTransientQueries() { // When: long numUniqueIds = IntStream.range(0, 100) - .mapToObj(i -> QueryIdUtil.buildId(engineContext, idGenerator, transientPlan, - false, Optional.empty(), false)) + .mapToObj(i -> QueryIdUtil.buildId(statement, engineContext, idGenerator, transientPlan, + false, Optional.empty())) .distinct() .count(); @@ -94,8 +99,8 @@ public void shouldComputeQueryIdCorrectlyForInsertInto() { when(idGenerator.getNext()).thenReturn("1"); // When: - final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, - false, Optional.empty(), false); + final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, + false, Optional.empty()); // Then: assertThat(queryId, is(new QueryId("INSERTQUERY_1"))); @@ -112,8 +117,8 @@ public void shouldComputeQueryIdCorrectlyForNewStream() { when(queryRegistry.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of()); // When: - final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, - false, Optional.empty(), false); + final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, + false, Optional.empty()); // Then: assertThat(queryId, is(new QueryId("CSAS_FOO_1"))); } @@ -129,8 +134,8 @@ public void shouldComputeQueryIdCorrectlyForNewTable() { when(queryRegistry.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of()); // When: - final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, - false, Optional.empty(), false); + final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, + false, Optional.empty()); // Then: assertThat(queryId, is(new QueryId("CTAS_FOO_1"))); @@ -139,13 +144,14 @@ public void shouldComputeQueryIdCorrectlyForNewTable() { @Test public void shouldComputeQueryIdCorrectlyForNewSourceTable() { // Given: - when(plan.getSinkName()).thenReturn(Optional.empty()); - when(plan.getId()).thenReturn(new PlanNodeId("FOO")); + final CreateTable createTableStmt = mock(CreateTable.class); + when(createTableStmt.getName()).thenReturn(SourceName.of("FOO")); + when(createTableStmt.isSource()).thenReturn(true); when(idGenerator.getNext()).thenReturn("1"); // When: - final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, - false, Optional.empty(), true); + final QueryId queryId = QueryIdUtil.buildId(createTableStmt, engineContext, idGenerator, plan, + false, Optional.empty()); // Then: assertThat(queryId, is(new QueryId("CST_FOO_1"))); @@ -160,8 +166,8 @@ public void shouldReuseExistingQueryId() { .thenReturn(ImmutableSet.of(new QueryId("CTAS_FOO_10"))); // When: - final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, - true, Optional.empty(), false); + final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, + true, Optional.empty()); // Then: assertThat(queryId, is(new QueryId("CTAS_FOO_10"))); @@ -177,8 +183,8 @@ public void shouldThrowOnReuseIfCreateOrReplacedIsDisabled() { .thenReturn(ImmutableSet.of(new QueryId("CTAS_FOO_10"))); // When: - QueryIdUtil.buildId(engineContext, idGenerator, plan, - false, Optional.empty(), false); + QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, + false, Optional.empty()); } @Test @@ -191,8 +197,8 @@ public void shouldThrowIfMultipleQueriesExist() { // When: final KsqlException e = assertThrows(KsqlException.class, () -> - QueryIdUtil.buildId(engineContext, idGenerator, plan, false, - Optional.empty(), false)); + QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, false, + Optional.empty())); // Then: assertThat(e.getMessage(), containsString("there are multiple queries writing")); @@ -201,8 +207,8 @@ public void shouldThrowIfMultipleQueriesExist() { @Test public void shouldReturnWithQueryIdInUppercase(){ // When: - final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, - false, Optional.of("my_query_id"), false); + final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, + false, Optional.of("my_query_id")); // Then: assertThat(queryId, is(new QueryId("MY_QUERY_ID"))); @@ -213,8 +219,8 @@ public void shouldThrowIfWithQueryIdIsReserved() { // When: final Exception e = assertThrows( Exception.class, - () -> QueryIdUtil.buildId(engineContext, idGenerator, plan, - false, Optional.of("insertquery_custom"), false) + () -> QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, + false, Optional.of("insertquery_custom")) ); // Then: @@ -228,8 +234,8 @@ public void shouldThrowIfWithQueryIdIsNotValid() { // When: final Exception e = assertThrows( Exception.class, - () -> QueryIdUtil.buildId(engineContext, idGenerator, plan, - false, Optional.of("with space"), false) + () -> QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, + false, Optional.of("with space")) ); // Then: @@ -247,8 +253,8 @@ public void shouldCreateTransientQueryIdWithSourceName() { when(sourceName.text()).thenReturn(SOURCE); // When: - final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, transientPlan, - false, Optional.empty(), false); + final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, transientPlan, + false, Optional.empty()); // Then: assertThat(queryId.toString(), containsString("transient_source")); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/QueryPlanTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/QueryPlanTest.java index 655fde6c3264..0068d28d3ebb 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/QueryPlanTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/QueryPlanTest.java @@ -20,6 +20,8 @@ import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; + +import java.util.Optional; import java.util.Set; import org.junit.Before; import org.junit.Test; @@ -56,12 +58,12 @@ public void setup() { public void shouldImplementEquals() { new EqualsTester() .addEqualityGroup( - new QueryPlan(sources1, sink1, plan1, id1), - new QueryPlan(sources1, sink1, plan1, id1)) - .addEqualityGroup(new QueryPlan(sources2, sink1, plan1, id1)) - .addEqualityGroup(new QueryPlan(sources1, sink2, plan1, id1)) - .addEqualityGroup(new QueryPlan(sources1, sink1, plan2, id1)) - .addEqualityGroup(new QueryPlan(sources1, sink1, plan1, id2)) + new QueryPlan(sources1, Optional.of(sink1), plan1, id1), + new QueryPlan(sources1, Optional.of(sink1), plan1, id1)) + .addEqualityGroup(new QueryPlan(sources2, Optional.of(sink1), plan1, id1)) + .addEqualityGroup(new QueryPlan(sources1, Optional.of(sink2), plan1, id1)) + .addEqualityGroup(new QueryPlan(sources1, Optional.of(sink1), plan2, id1)) + .addEqualityGroup(new QueryPlan(sources1, Optional.of(sink1), plan1, id2)) .testEquals(); } } \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java index ec763c5f3041..0c9bc4d97dd5 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java @@ -49,7 +49,6 @@ import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.SerdeFeatures; import io.confluent.ksql.serde.ValueFormat; -import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; @@ -65,7 +64,7 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Set; -import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerInterceptor; @@ -79,7 +78,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; import org.junit.Before; @@ -88,6 +86,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -96,8 +95,8 @@ public class QueryExecutorTest { private static final String STATEMENT_TEXT = "KSQL STATEMENT"; private static final QueryId QUERY_ID = new QueryId("queryid"); private static final QueryId QUERY_ID_2 = new QueryId("queryid-2"); - private static final Set SOURCES - = ImmutableSet.of(SourceName.of("foo"), SourceName.of("bar")); + private static final Set SOURCES + = ImmutableSet.of(givenSource("foo"), givenSource("bar")); private static final SourceName SINK_NAME = SourceName.of("baz"); private static final String STORE_NAME = "store"; private static final String SUMMARY = "summary"; @@ -152,10 +151,6 @@ public class QueryExecutorTest { @Mock private ServiceContext serviceContext; @Mock - private KafkaTopicClient topicClient; - @Mock - private Consumer closeCallback; - @Mock private KafkaStreamsBuilder kafkaStreamsBuilder; @Mock private StreamsBuilder streamsBuilder; @@ -168,8 +163,6 @@ public class QueryExecutorTest { @Mock private Topology topology; @Mock - private TopologyDescription topoDesc; - @Mock private KsMaterializationFactory ksMaterializationFactory; @Mock private KsMaterialization ksMaterialization; @@ -251,7 +244,7 @@ public void shouldBuildTransientQueryCorrectly() { final TransientQueryMetadata queryMetadata = queryBuilder.buildTransientQuery( STATEMENT_TEXT, QUERY_ID, - SOURCES, + SOURCES.stream().map(DataSource::getName).collect(Collectors.toSet()), physicalPlan, SUMMARY, TRANSIENT_SINK_SCHEMA, @@ -265,7 +258,8 @@ public void shouldBuildTransientQueryCorrectly() { // Then: assertThat(queryMetadata.getStatementString(), equalTo(STATEMENT_TEXT)); - assertThat(queryMetadata.getSourceNames(), equalTo(SOURCES)); + assertThat(queryMetadata.getSourceNames(), equalTo(SOURCES.stream() + .map(DataSource::getName).collect(Collectors.toSet()))); assertThat(queryMetadata.getExecutionPlan(), equalTo(SUMMARY)); assertThat(queryMetadata.getTopology(), is(topology)); assertThat(queryMetadata.getOverriddenProperties(), equalTo(OVERRIDES)); @@ -296,7 +290,8 @@ public void shouldBuildCreateAsPersistentQueryCorrectly() { assertThat(queryMetadata.getSinkName().get(), equalTo(SINK_NAME)); assertThat(queryMetadata.getPhysicalSchema(), equalTo(SINK_PHYSICAL_SCHEMA)); assertThat(queryMetadata.getResultTopic(), is(Optional.of(ksqlTopic))); - assertThat(queryMetadata.getSourceNames(), equalTo(SOURCES)); + assertThat(queryMetadata.getSourceNames(), equalTo(SOURCES.stream() + .map(DataSource::getName).collect(Collectors.toSet()))); assertThat(queryMetadata.getDataSourceType().get(), equalTo(DataSourceType.KSTREAM)); assertThat(queryMetadata.getExecutionPlan(), equalTo(SUMMARY)); assertThat(queryMetadata.getTopology(), is(topology)); @@ -329,7 +324,8 @@ public void shouldBuildInsertPersistentQueryCorrectly() { assertThat(queryMetadata.getSinkName().get(), equalTo(SINK_NAME)); assertThat(queryMetadata.getPhysicalSchema(), equalTo(SINK_PHYSICAL_SCHEMA)); assertThat(queryMetadata.getResultTopic(), is(Optional.of(ksqlTopic))); - assertThat(queryMetadata.getSourceNames(), equalTo(SOURCES)); + assertThat(queryMetadata.getSourceNames(), equalTo(SOURCES.stream() + .map(DataSource::getName).collect(Collectors.toSet()))); assertThat(queryMetadata.getDataSourceType().get(), equalTo(DataSourceType.KSTREAM)); assertThat(queryMetadata.getExecutionPlan(), equalTo(SUMMARY)); assertThat(queryMetadata.getTopology(), is(topology)); @@ -355,6 +351,30 @@ public void shouldBuildPersistentQueryWithCorrectStreamsApp() { verify(kafkaStreams).start(); } + @Test + public void shouldStartCreateSourceQueryWithMaterializationProvider() { + when(ksqlConfig.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)).thenReturn(true); + + // Given: + final DataSource source = givenSource("foo"); + when(source.getSchema()).thenReturn(SINK_SCHEMA); + when(source.getKsqlTopic()).thenReturn(ksqlTopic); + final PersistentQueryMetadata queryMetadata = buildPersistentQuery( + ImmutableSet.of(source), + KsqlConstants.PersistentQueryType.CREATE_SOURCE, + QUERY_ID, + Optional.empty() + ); + queryMetadata.initialize(); + queryMetadata.start(); + + // When: + final Optional result = queryMetadata.getMaterialization(QUERY_ID, stacker); + + // Then: + assertThat(result.get(), is(materialization)); + } + @Test public void shouldStartPersistentQueryWithCorrectMaterializationProvider() { // Given: @@ -623,8 +643,7 @@ public void shouldMakePersistentQueriesWithDifferentSources() { QUERY_ID); PersistentQueryMetadata queryMetadata2 = buildPersistentQuery( - ImmutableSet.of(SourceName.of("food"), - SourceName.of("bard")), + ImmutableSet.of(givenSource("food"), givenSource("bard")), KsqlConstants.PersistentQueryType.CREATE_AS, QUERY_ID); assertThat("did not chose the same runtime", queryMetadata.getKafkaStreams().equals(queryMetadata2.getKafkaStreams())); @@ -708,9 +727,16 @@ private void givenTransientQuery() { when(streamHolder.getStream()).thenReturn(kstream); } - private PersistentQueryMetadata buildPersistentQuery(final Set sourceNames, + private PersistentQueryMetadata buildPersistentQuery(final Set sources, final KsqlConstants.PersistentQueryType persistentQueryType, final QueryId queryId) { + return buildPersistentQuery(sources, persistentQueryType, queryId, Optional.of(sink)); + } + + private PersistentQueryMetadata buildPersistentQuery(final Set sources, + final KsqlConstants.PersistentQueryType persistentQueryType, + final QueryId queryId, + final Optional sink) { if (ksqlConfig.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) { return queryBuilder.buildPersistentQueryInSharedRuntime( ksqlConfig, @@ -718,7 +744,7 @@ private PersistentQueryMetadata buildPersistentQuery(final Set sourc STATEMENT_TEXT, queryId, sink, - sourceNames, + sources, physicalPlan, SUMMARY, queryListener, @@ -740,4 +766,9 @@ private PersistentQueryMetadata buildPersistentQuery(final Set sourc } } + private static DataSource givenSource(final String name) { + final DataSource source = Mockito.mock(DataSource.class); + when(source.getName()).thenReturn(SourceName.of(name)); + return source; + } } \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryRegistryImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryRegistryImplTest.java index d42e3424212a..7ad4a51b2488 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryRegistryImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryRegistryImplTest.java @@ -1,5 +1,8 @@ package io.confluent.ksql.query; +import static io.confluent.ksql.util.KsqlConstants.PersistentQueryType.CREATE_AS; +import static io.confluent.ksql.util.KsqlConstants.PersistentQueryType.CREATE_SOURCE; +import static io.confluent.ksql.util.KsqlConstants.PersistentQueryType.INSERT; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; @@ -51,9 +54,12 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; +import org.mockito.Mockito; @RunWith(Parameterized.class) public class QueryRegistryImplTest { + private static final SourceName SINK = SourceName.of("source"); + @Mock private SessionConfig config; @Mock @@ -74,6 +80,8 @@ public class QueryRegistryImplTest { private ServiceContext serviceContext; @Mock private MetaStore metaStore; + @Mock + private DataSource source; @Captor private ArgumentCaptor queryListenerCaptor; @SuppressWarnings("Unused") @@ -106,7 +114,8 @@ public void setup() { public void shouldGetAllLiveQueries() { // Given: final Set queries = ImmutableSet.of( - givenCreate(registry, "q1", "source", "sink", true), + givenCreate(registry, "q1", "source1", Optional.of("sink"), CREATE_AS), + givenCreate(registry, "q2", "source2", Optional.empty(), CREATE_SOURCE), givenCreateTransient(registry, "transient1") ); @@ -121,7 +130,8 @@ public void shouldGetAllLiveQueries() { @Test public void shouldGetAllLiveQueriesSandbox() { // Given: - givenCreate(registry, "q1", "source", "sink", true); + givenCreate(registry, "q1", "source", Optional.of("sink"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.empty(), CREATE_SOURCE); givenCreateTransient(registry, "transient1"); final QueryRegistry sandbox = registry.createSandbox(); @@ -133,45 +143,56 @@ public void shouldGetAllLiveQueriesSandbox() { final Set ids = listed.stream() .map(q -> q.getQueryId().toString()) .collect(Collectors.toSet()); - assertThat(ids, contains("q1", "transient1")); + assertThat(ids, contains("q1", "q2", "transient1")); } @Test public void shouldGetPersistentQueries() { // Given: - final PersistentQueryMetadata q1 = givenCreate(registry, "q1", "source", "sink1", true); - final PersistentQueryMetadata q2 = givenCreate(registry, "q2", "source", "sink2", false); + final PersistentQueryMetadata q1 = givenCreate(registry, "q1", "source", + Optional.of("sink1"), CREATE_AS); + final PersistentQueryMetadata q2 = givenCreate(registry, "q2", "source", + Optional.of("sink2"), INSERT); + final PersistentQueryMetadata q3 = givenCreate(registry, "q3", "source", + Optional.empty(), CREATE_SOURCE); // When: final Map persistent = registry.getPersistentQueries(); // Then: - assertThat(persistent.size(), is(2)); + assertThat(persistent.size(), is(3)); assertThat(persistent.get(new QueryId("q1")), is(q1)); assertThat(persistent.get(new QueryId("q2")), is(q2)); + assertThat(persistent.get(new QueryId("q3")), is(q3)); } @Test public void shouldGetQuery() { // Given: final TransientQueryMetadata q1 = givenCreateTransient(registry, "transient1"); - final PersistentQueryMetadata q2 = givenCreate(registry, "q1", "source", "sink1", true); + final PersistentQueryMetadata q2 = givenCreate(registry, "q1", "source", + Optional.of("sink1"), CREATE_AS); + final PersistentQueryMetadata q3 = givenCreate(registry, "q2", "source", + Optional.empty(), CREATE_SOURCE); // When: final QueryMetadata queryMetadata1 = registry.getQuery(q1.getQueryId()).get(); final QueryMetadata queryMetadata2 = registry.getQuery(q2.getQueryId()).get(); + final QueryMetadata queryMetadata3 = registry.getQuery(q3.getQueryId()).get(); // Then: assertThat(queryMetadata1, is(q1)); assertThat(queryMetadata2, is(q2)); + assertThat(queryMetadata3, is(q3)); } @Test public void shouldGetQueriesWithSink() { // Given: - givenCreate(registry, "q1", "source", "sink1", true); - givenCreate(registry, "q2", "source", "sink1", false); - givenCreate(registry, "q3", "source", "sink2", false); + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.of("sink1"), INSERT); + givenCreate(registry, "q3", "source", Optional.of("sink2"), INSERT); + givenCreate(registry, "q4", "source", Optional.empty(), CREATE_SOURCE); // When: final Set queries = registry.getQueriesWithSink(SourceName.of("sink1")); @@ -183,9 +204,10 @@ public void shouldGetQueriesWithSink() { @Test public void shouldGetQueriesWithSinkFromSandbox() { // Given: - givenCreate(registry, "q1", "source", "sink1", true); - givenCreate(registry, "q2", "source", "sink1", false); - givenCreate(registry, "q3", "source", "sink2", false); + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.of("sink1"), INSERT); + givenCreate(registry, "q3", "source", Optional.of("sink2"), INSERT); + givenCreate(registry, "q4", "source", Optional.empty(), CREATE_SOURCE); final QueryRegistry sandbox = registry.createSandbox(); // When: @@ -198,9 +220,10 @@ public void shouldGetQueriesWithSinkFromSandbox() { @Test public void shouldGetQueryThatCreatedSource() { // Given: - final QueryMetadata query = givenCreate(registry, "q1", "source", "sink1", true); - givenCreate(registry, "q2", "source", "sink1", false); - givenCreate(registry, "q3", "source", "sink2", false); + final QueryMetadata query = givenCreate(registry, "q1", "source", + Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.of("sink1"), INSERT); + givenCreate(registry, "q3", "source", Optional.of("sink2"), INSERT); // When: final Optional found = registry.getCreateAsQuery(SourceName.of("sink1")); @@ -212,9 +235,9 @@ public void shouldGetQueryThatCreatedSource() { @Test public void shouldGetQueryThatCreatedSourceOnSandbox() { // Given: - givenCreate(registry, "q1", "source", "sink1", true); - givenCreate(registry, "q2", "source", "sink1", false); - givenCreate(registry, "q3", "source", "sink2", false); + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.of("sink1"), INSERT); + givenCreate(registry, "q3", "source", Optional.of("sink2"), INSERT); final QueryRegistry sandbox = registry.createSandbox(); // When: @@ -226,10 +249,10 @@ public void shouldGetQueryThatCreatedSourceOnSandbox() { @Test public void shouldGetQueriesInsertingIntoOrReadingFromSource() { - givenCreate(registry, "q1", "source", "sink1", true); - givenCreate(registry, "q2", "source", "sink1", false); - givenCreate(registry, "q3", "source", "sink1", false); - givenCreate(registry, "q4", "sink1", "sink2", false); + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.of("sink1"), INSERT); + givenCreate(registry, "q3", "source", Optional.of("sink1"), INSERT); + givenCreate(registry, "q4", "sink1", Optional.of("sink2"), INSERT); // When: final Set queries = registry.getInsertQueries(SourceName.of("sink1"), (n, q) -> true); @@ -241,50 +264,85 @@ public void shouldGetQueriesInsertingIntoOrReadingFromSource() { @Test public void shouldRemoveQueryFromCreateAsQueriesWhenTerminatingCreateAsQuery() { // Given: - givenCreate(registry, "q1", "source", "sink1", true); - givenCreate(registry, "i1", "source", "sink1", false); + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.empty(), CREATE_SOURCE); + givenCreate(registry, "i1", "source", Optional.of("sink1"), INSERT); final QueryRegistry sandbox = registry.createSandbox(); final QueryMetadata q1 = sandbox.getPersistentQuery(new QueryId("q1")).get(); + final QueryMetadata q2 = sandbox.getPersistentQuery(new QueryId("q2")).get(); final QueryMetadata i1 = sandbox.getPersistentQuery(new QueryId("i1")).get(); // When: q1.close(); final Optional createAsQueries = sandbox.getCreateAsQuery(SourceName.of("sink1")); + final Optional createSourceQueries = + sandbox.getCreateAsQuery(SourceName.of("source")); final Set insertQueries = sandbox.getInsertQueries(SourceName.of("sink1"), (n, q) -> true); // Then: assertThat(createAsQueries, equalTo(Optional.empty())); + assertThat(createSourceQueries, equalTo(Optional.of(q2))); assertThat(insertQueries, contains(i1.getQueryId())); } @Test public void shouldRemoveQueryFromInsertQueriesWhenTerminatingInsertQuery() { // Given: - givenCreate(registry, "q1", "source", "sink1", true); - givenCreate(registry, "i1", "source", "sink1", false); + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.empty(), CREATE_SOURCE); + givenCreate(registry, "i1", "source", Optional.of("sink1"), INSERT); final QueryRegistry sandbox = registry.createSandbox(); final QueryMetadata q1 = sandbox.getPersistentQuery(new QueryId("q1")).get(); + final QueryMetadata q2 = sandbox.getPersistentQuery(new QueryId("q2")).get(); final QueryMetadata i1 = sandbox.getPersistentQuery(new QueryId("i1")).get(); // When: i1.close(); final QueryMetadata createAsQueries = sandbox.getCreateAsQuery(SourceName.of("sink1")).get(); + final QueryMetadata createSourceQueries = + sandbox.getCreateAsQuery(SourceName.of("source")).get(); final Set insertQueries = sandbox.getInsertQueries(SourceName.of("sink1"), (n, q) -> true); // Then: assertThat(createAsQueries, equalTo(q1)); + assertThat(createSourceQueries, equalTo(q2)); assertThat(insertQueries, empty()); } + @Test + public void shouldRemoveQueryFromCreateAsQueriesWhenTerminatingCreateSourceQuery() { + // Given: + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.empty(), CREATE_SOURCE); + givenCreate(registry, "i1", "source", Optional.of("sink1"), INSERT); + final QueryRegistry sandbox = registry.createSandbox(); + final QueryMetadata q1 = sandbox.getPersistentQuery(new QueryId("q1")).get(); + final QueryMetadata q2 = sandbox.getPersistentQuery(new QueryId("q2")).get(); + final QueryMetadata i1 = sandbox.getPersistentQuery(new QueryId("i1")).get(); + + // When: + q2.close(); + final QueryMetadata createAsQueries = sandbox.getCreateAsQuery(SourceName.of("sink1")).get(); + final Optional createSourceQueries = + sandbox.getCreateAsQuery(SourceName.of("source")); + final Set insertQueries = + sandbox.getInsertQueries(SourceName.of("sink1"), (n, q) -> true); + + // Then: + assertThat(createAsQueries, equalTo(q1)); + assertThat(createSourceQueries, equalTo(Optional.empty())); + assertThat(insertQueries, contains(i1.getQueryId())); + } + @Test public void shouldCopyInsertsOnSandbox() { // Given: - givenCreate(registry, "q1", "source", "sink1", true); - givenCreate(registry, "q2", "source", "sink1", false); - givenCreate(registry, "q3", "source", "sink1", false); + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); + givenCreate(registry, "q2", "source", Optional.of("sink1"), INSERT); + givenCreate(registry, "q3", "source", Optional.of("sink1"), INSERT); final QueryRegistry sandbox = registry.createSandbox(); // When: @@ -297,7 +355,8 @@ public void shouldCopyInsertsOnSandbox() { @Test public void shouldCallListenerOnCreate() { // Given/When: - final QueryMetadata query = givenCreate(registry, "q1", "source", "sink1", true); + final QueryMetadata query = givenCreate(registry, "q1", "source", + Optional.of("sink1"), CREATE_AS); // Then: verify(listener1).onCreate(serviceContext, metaStore, query); @@ -353,7 +412,8 @@ public void shouldCallSandboxOnCreate() { final QueryRegistry sandbox = registry.createSandbox(); // When: - final PersistentQueryMetadata q = givenCreate(sandbox, "q1", "source", "sink1", true); + final PersistentQueryMetadata q = givenCreate(sandbox, "q1", "source", + Optional.of("sink1"), CREATE_AS); // Then: verify(sandboxListener).onCreate(serviceContext, metaStore, q); @@ -364,7 +424,7 @@ public void shouldCallSandboxOnCreate() { @Test public void shouldCallSandboxOnCloseOld() { // Given: - givenCreate(registry, "q1", "source", "sink1", true); + givenCreate(registry, "q1", "source", Optional.of("sink1"), CREATE_AS); final QueryRegistry sandbox = registry.createSandbox(); final QueryMetadata sandboxQuery = sandbox.getPersistentQuery(new QueryId("q1")).get(); @@ -400,7 +460,7 @@ private QueryMetadata.Listener givenCreateGetListener( final QueryRegistry registry, final String id ) { - givenCreate(registry, id, "source", "sink1", true); + givenCreate(registry, id, "source", Optional.of("sink1"), CREATE_AS); if (!sharedRuntimes) { verify(executor).buildPersistentQueryInDedicatedRuntime( any(), any(), any(), any(), any(), any(), any(), any(), queryListenerCaptor.capture(), any(), any()); @@ -411,24 +471,31 @@ private QueryMetadata.Listener givenCreateGetListener( return queryListenerCaptor.getValue(); } + private DataSource toSource(final String name) { + final DataSource source = Mockito.mock(DataSource.class); + return source; + } + private PersistentQueryMetadata givenCreate( final QueryRegistry registry, final String id, final String source, - final String sink, - boolean createAs + final Optional sink, + KsqlConstants.PersistentQueryType persistentQueryType ) { final QueryId queryId = new QueryId(id); final PersistentQueryMetadata query = mock(PersistentQueryMetadataImpl.class); final DataSource sinkSource = mock(DataSource.class); - when(sinkSource.getName()).thenReturn(SourceName.of(sink)); + + sink.ifPresent(s -> { + when(sinkSource.getName()).thenReturn(SourceName.of(s)); + when(query.getSinkName()).thenReturn(Optional.of(SourceName.of(s))); + }); + when(query.getQueryId()).thenReturn(queryId); - when(query.getSinkName()).thenReturn(Optional.of(SourceName.of(sink))); when(query.getSink()).thenReturn(Optional.of(sinkSource)); when(query.getSourceNames()).thenReturn(ImmutableSet.of(SourceName.of(source))); - when(query.getPersistentQueryType()).thenReturn(createAs - ? KsqlConstants.PersistentQueryType.CREATE_AS - : KsqlConstants.PersistentQueryType.INSERT); + when(query.getPersistentQueryType()).thenReturn(persistentQueryType); if (sharedRuntimes) { when(executor.buildPersistentQueryInSharedRuntime( any(), any(), any(), any(), any(), any(), any(), any(), any(), any()) @@ -447,11 +514,11 @@ private PersistentQueryMetadata givenCreate( metaStore, "sql", queryId, - sinkSource, - ImmutableSet.of(SourceName.of(source)), + Optional.of(sinkSource), + ImmutableSet.of(toSource(source)), mock(ExecutionStep.class), "plan-summary", - createAs + persistentQueryType ); return query; } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueriesInSharedRuntimesImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueriesInSharedRuntimesImplTest.java index ac9f5ec870a5..34e3d0e24aea 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueriesInSharedRuntimesImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueriesInSharedRuntimesImplTest.java @@ -16,10 +16,10 @@ package io.confluent.ksql.util; import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.logging.processing.ProcessingLogger; @@ -30,7 +30,6 @@ import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.query.QuerySchemas; import io.confluent.ksql.util.QueryMetadata.Listener; -import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -84,7 +83,7 @@ public void setUp() { KsqlConstants.PersistentQueryType.CREATE_AS, SQL, physicalSchema, - Collections.EMPTY_SET, + ImmutableSet.of(), EXECUTION_PLAN, APPLICATION_ID, topology, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java index 12e6dc1be5f8..1204aa09014e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java @@ -37,7 +37,7 @@ public class Command { @VisibleForTesting - public static final int VERSION = 9; + public static final int VERSION = 10; private final String statement; private final Map overwriteProperties; diff --git a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json index cb65f9d9d268..e9373b879338 100644 --- a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json +++ b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json @@ -288,7 +288,7 @@ "type" : "string" } }, - "required" : [ "sources", "sink", "physicalPlan", "queryId" ] + "required" : [ "sources", "physicalPlan", "queryId" ] }, "StreamAggregate" : { "type" : "object",