Skip to content

Commit

Permalink
feat: execute source table query plans (#8061)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Aug 28, 2021
1 parent 14328bc commit e2c3211
Show file tree
Hide file tree
Showing 21 changed files with 541 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public enum KsqlQueryType {
}

public enum PersistentQueryType {
CREATE_AS, INSERT
CREATE_SOURCE, CREATE_AS, INSERT
}

public enum KsqlQueryStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class SourceSchemas {
SourceSchemas(final Map<SourceName, LogicalSchema> sourceSchemas) {
this.sourceSchemas = ImmutableMap.copyOf(requireNonNull(sourceSchemas, "sourceSchemas"));

// This will fail
if (sourceSchemas.isEmpty()) {
throw new IllegalArgumentException("Must supply at least one schema");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
createTable.getIsSource()
);
metaStore.putSource(ksqlTable, createTable.isOrReplace());
metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources);

// Source tables only has a query source reference to itself. We don't need to register
// this source for source tables.
if (!createTable.getIsSource()) {
metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources);
}

return new DdlCommandResult(true, "Table created");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
Expand Down Expand Up @@ -80,12 +81,14 @@
import io.confluent.ksql.query.QueryRegistry;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
Expand Down Expand Up @@ -159,11 +162,20 @@ ExecuteResult execute(final KsqlPlan plan) {
}

final QueryPlan queryPlan = plan.getQueryPlan().get();
final DataSource sinkSource = engineContext.getMetaStore().getSource(queryPlan.getSink());

if (sinkSource != null && sinkSource.isSource()) {
throw new KsqlException(String.format("Cannot insert into read-only %s: %s",
sinkSource.getDataSourceType().getKsqlType().toLowerCase(), sinkSource.getName().text()));
final KsqlConstants.PersistentQueryType persistentQueryType =
plan.getPersistentQueryType().get();

// CREATE_SOURCE do not write to any topic. We check for read-only topics only for queries
// that attempt to write to a sink (i.e. INSERT or CREATE_AS).
if (persistentQueryType != KsqlConstants.PersistentQueryType.CREATE_SOURCE) {
final DataSource sinkSource = engineContext.getMetaStore()
.getSource(queryPlan.getSink().get());

if (sinkSource != null && sinkSource.isSource()) {
throw new KsqlException(String.format("Cannot insert into read-only %s: %s",
sinkSource.getDataSourceType().getKsqlType().toLowerCase(),
sinkSource.getName().text()));
}
}

final Optional<String> ddlResult = plan.getDdlCommand().map(ddl ->
Expand All @@ -177,7 +189,7 @@ ExecuteResult execute(final KsqlPlan plan) {
return ExecuteResult.of(executePersistentQuery(
queryPlan,
plan.getStatementText(),
plan.getDdlCommand().isPresent())
persistentQueryType)
);
}

Expand Down Expand Up @@ -444,7 +456,7 @@ private KsqlPlan sourceTablePlan(

final QueryPlan queryPlan = new QueryPlan(
getSourceNames(outputNode),
null,
Optional.empty(),
plans.physicalPlan.getPhysicalPlan(),
plans.physicalPlan.getQueryId()
);
Expand Down Expand Up @@ -505,7 +517,7 @@ KsqlPlan plan(final ConfiguredStatement<?> statement) {

final QueryPlan queryPlan = new QueryPlan(
getSourceNames(outputNode),
outputNode.getSinkName().get(),
outputNode.getSinkName(),
plans.physicalPlan.getPhysicalPlan(),
plans.physicalPlan.getQueryId()
);
Expand Down Expand Up @@ -547,13 +559,12 @@ private ExecutorPlans planQuery(
Optional.of(outputNode)
);
final QueryId queryId = QueryIdUtil.buildId(
statement.getStatement(),
engineContext,
engineContext.idGenerator(),
outputNode,
ksqlConfig.getBoolean(KsqlConfig.KSQL_CREATE_OR_REPLACE_ENABLED),
withQueryId,
statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isSource()
withQueryId
);

if (withQueryId.isPresent()
Expand Down Expand Up @@ -738,10 +749,24 @@ private String executeDdl(
}
}

private Set<DataSource> getSources(final QueryPlan queryPlan) {
final ImmutableSet.Builder<DataSource> sources = ImmutableSet.builder();
for (final SourceName name : queryPlan.getSources()) {
final DataSource dataSource = engineContext.getMetaStore().getSource(name);
if (dataSource == null) {
throw new KsqlException("Unknown source: " + name.toString(FormatOptions.noEscape()));
}

sources.add(dataSource);
}

return sources.build();
}

private PersistentQueryMetadata executePersistentQuery(
final QueryPlan queryPlan,
final String statementText,
final boolean createAsQuery
final KsqlConstants.PersistentQueryType persistentQueryType
) {
final QueryRegistry queryRegistry = engineContext.getQueryRegistry();
return queryRegistry.createOrReplacePersistentQuery(
Expand All @@ -751,11 +776,11 @@ private PersistentQueryMetadata executePersistentQuery(
engineContext.getMetaStore(),
statementText,
queryPlan.getQueryId(),
engineContext.getMetaStore().getSource(queryPlan.getSink()),
queryPlan.getSources(),
queryPlan.getSink().map(s -> engineContext.getMetaStore().getSource(s)),
getSources(queryPlan),
queryPlan.getPhysicalPlan(),
buildPlanSummary(queryPlan.getQueryId(), queryPlan.getPhysicalPlan()),
createAsQuery
persistentQueryType
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

package io.confluent.ksql.engine;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.util.KsqlConstants;
import java.util.Optional;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
Expand All @@ -35,6 +37,9 @@ public interface KsqlPlan {

KsqlPlan withoutQuery();

@JsonIgnore
Optional<KsqlConstants.PersistentQueryType> getPersistentQueryType();

static KsqlPlan ddlPlanCurrent(final String statementText, final DdlCommand ddlCommand) {
return new KsqlPlanV1(statementText, Optional.of(ddlCommand), Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.confluent.ksql.engine;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.util.KsqlConstants;
import java.util.Objects;
import java.util.Optional;

Expand Down Expand Up @@ -66,6 +68,27 @@ public KsqlPlan withoutQuery() {
return new KsqlPlanV1(statementText, ddlCommand, Optional.empty());
}

@Override
public Optional<KsqlConstants.PersistentQueryType> getPersistentQueryType() {
if (!queryPlan.isPresent()) {
return Optional.empty();
}

// CREATE_AS and CREATE_SOURCE commands contain a DDL command and a Query plan.
if (ddlCommand.isPresent()) {
if (ddlCommand.get() instanceof CreateTableCommand
&& ((CreateTableCommand) ddlCommand.get()).getIsSource()) {
return Optional.of(KsqlConstants.PersistentQueryType.CREATE_SOURCE);
} else {
return Optional.of(KsqlConstants.PersistentQueryType.CREATE_AS);
}
} else {
// INSERT INTO persistent queries are the only queries types that exist without a
// DDL command linked to the plan.
return Optional.of(KsqlConstants.PersistentQueryType.INSERT);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.common.collect.Iterables;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.query.QueryId;
Expand Down Expand Up @@ -82,35 +84,36 @@ private static void validateWithQueryId(final String queryId) {
/**
* Builds a {@link QueryId} for a physical plan specification.
*
* @param statement the statement that requires the query ID
* @param engineContext the context representing the current state of the engine
* @param idGenerator generates query ids
* @param outputNode the logical plan
* @param createOrReplaceEnabled whether or not the queryID can replace an existing one
* @return the {@link QueryId} to be used
*/
static QueryId buildId(
final Statement statement,
final EngineContext engineContext,
final QueryIdGenerator idGenerator,
final OutputNode outputNode,
final boolean createOrReplaceEnabled,
final Optional<String> withQueryId,
final boolean isSourceTable) {
final Optional<String> withQueryId) {
if (withQueryId.isPresent()) {
final String queryId = withQueryId.get().toUpperCase();
validateWithQueryId(queryId);
return new QueryId(queryId);
}
if (statement instanceof CreateTable && ((CreateTable) statement).isSource()) {
// Use the CST name as part of the QueryID
final String suffix = ((CreateTable) statement).getName().text().toUpperCase()
+ "_" + idGenerator.getNext().toUpperCase();
return new QueryId(ReservedQueryIdsPrefixes.CST + suffix);
}

if (!outputNode.getSinkName().isPresent()) {
if (isSourceTable) {
final String suffix = outputNode.getId().toString().toUpperCase()
+ "_" + idGenerator.getNext().toUpperCase();
return new QueryId(ReservedQueryIdsPrefixes.CST + suffix);
} else {
final String prefix =
"transient_" + outputNode.getSource().getLeftmostSourceNode().getAlias().text() + "_";
return new QueryId(prefix + Math.abs(ThreadLocalRandom.current().nextLong()));
}
final String prefix =
"transient_" + outputNode.getSource().getLeftmostSourceNode().getAlias().text() + "_";
return new QueryId(prefix + Math.abs(ThreadLocalRandom.current().nextLong()));
}

final KsqlStructuredDataOutputNode structured = (KsqlStructuredDataOutputNode) outputNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@
import io.confluent.ksql.query.QueryId;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

public final class QueryPlan {
private final ImmutableSet<SourceName> sources;
private final SourceName sink;
private final Optional<SourceName> sink;
private final ExecutionStep<?> physicalPlan;
private final QueryId queryId;

public QueryPlan(
@JsonProperty(value = "sources", required = true) final Set<SourceName> sources,
@JsonProperty(value = "sink", required = true) final SourceName sink,
@JsonProperty(value = "sink") final Optional<SourceName> sink,
@JsonProperty(value = "physicalPlan", required = true) final ExecutionStep<?> physicalPlan,
@JsonProperty(value = "queryId", required = true) final QueryId queryId
) {
Expand All @@ -48,7 +49,7 @@ public QueryPlan(
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

public SourceName getSink() {
public Optional<SourceName> getSink() {
return sink;
}

Expand Down

0 comments on commit e2c3211

Please sign in to comment.