Skip to content

Commit

Permalink
chore: make CREATE OR REPLACE use execution step and pseudocolumn ver…
Browse files Browse the repository at this point in the history
…sions of original query (#8097)
  • Loading branch information
Sullivan-Patrick committed Oct 1, 2021
1 parent 07ec88a commit 2274da8
Show file tree
Hide file tree
Showing 14 changed files with 435 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.PlanInfo;
import io.confluent.ksql.execution.plan.PlanInfoExtractor;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
Expand Down Expand Up @@ -683,12 +685,24 @@ private ExecutorPlans planQuery(
&& engineContext.getQueryRegistry().getPersistentQuery(queryId).isPresent()) {
throw new KsqlException(String.format("Query ID '%s' already exists.", queryId));
}
final Optional<PersistentQueryMetadata> persistentQueryMetadata =
engineContext.getQueryRegistry().getPersistentQuery(queryId);

final Optional<PlanInfo> oldPlanInfo;

if (persistentQueryMetadata.isPresent()) {
final ExecutionStep<?> oldPlan = persistentQueryMetadata.get().getPhysicalPlan();
oldPlanInfo = Optional.of(oldPlan.extractPlanInfo(new PlanInfoExtractor()));
} else {
oldPlanInfo = Optional.empty();
}

final PhysicalPlan physicalPlan = queryEngine.buildPhysicalPlan(
logicalPlan,
config,
metaStore,
queryId
queryId,
oldPlanInfo
);
return new ExecutorPlans(logicalPlan, physicalPlan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.analyzer.Analysis;
import io.confluent.ksql.analyzer.QueryAnalyzer;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.plan.PlanInfo;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.tree.Query;
Expand Down Expand Up @@ -76,7 +77,8 @@ PhysicalPlan buildPhysicalPlan(
final LogicalPlanNode logicalPlanNode,
final SessionConfig config,
final MetaStore metaStore,
final QueryId queryId
final QueryId queryId,
final Optional<PlanInfo> oldPlanInfo
) {

final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -90,6 +92,6 @@ PhysicalPlan buildPhysicalPlan(
metaStore
);

return physicalPlanBuilder.buildPhysicalPlan(logicalPlanNode, queryId);
return physicalPlanBuilder.buildPhysicalPlan(logicalPlanNode, queryId, oldPlanInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.physical;

import io.confluent.ksql.execution.plan.PlanInfo;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.planner.LogicalPlanNode;
Expand All @@ -26,6 +27,7 @@
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.streams.StreamsBuilder;

public class PhysicalPlanBuilder {
Expand Down Expand Up @@ -54,15 +56,17 @@ public PhysicalPlanBuilder(

public PhysicalPlan buildPhysicalPlan(
final LogicalPlanNode logicalPlanNode,
final QueryId queryId
final QueryId queryId,
final Optional<PlanInfo> oldPlanInfo
) {
final OutputNode outputNode = logicalPlanNode.getNode()
.orElseThrow(() -> new IllegalArgumentException("Need an output node to build a plan"));

final PlanBuildContext buildContext = PlanBuildContext.of(
ksqlConfig,
serviceContext,
functionRegistry
functionRegistry,
oldPlanInfo
);

final SchemaKStream<?> resultStream = outputNode.buildStream(buildContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.plan.PlanInfo;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;

/**
* Contains all the context required to build an execution plan from a logical plan.
Expand All @@ -30,28 +32,29 @@ public final class PlanBuildContext {
private final KsqlConfig ksqlConfig;
private final ServiceContext serviceContext;
private final FunctionRegistry functionRegistry;
private final Optional<PlanInfo> planInfo;

public static PlanBuildContext of(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext,
final FunctionRegistry functionRegistry
final FunctionRegistry functionRegistry,
final Optional<PlanInfo> planInfo
) {
return new PlanBuildContext(
ksqlConfig,
serviceContext,
functionRegistry
);
ksqlConfig, serviceContext, functionRegistry, planInfo);
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP2")
private PlanBuildContext(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext,
final FunctionRegistry functionRegistry
final FunctionRegistry functionRegistry,
final Optional<PlanInfo> planInfo
) {
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry");
this.planInfo = requireNonNull(planInfo, "planInfo");
}

public ServiceContext getServiceContext() {
Expand All @@ -67,11 +70,16 @@ public FunctionRegistry getFunctionRegistry() {
return functionRegistry;
}

public Optional<PlanInfo> getPlanInfo() {
return planInfo;
}

public PlanBuildContext withKsqlConfig(final KsqlConfig newConfig) {
return of(
newConfig,
serviceContext,
functionRegistry
functionRegistry,
planInfo
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.plan.SourceStep;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.TableSourceV1;
import io.confluent.ksql.execution.plan.WindowedStreamSource;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.StepSchemaResolver;
Expand All @@ -35,6 +36,7 @@
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Set;
import org.apache.kafka.streams.kstream.Windowed;

/**
Expand Down Expand Up @@ -89,14 +91,16 @@ private static SchemaKStream<?> buildWindowedStream(
final WindowInfo windowInfo = dataSource.getKsqlTopic().getKeyFormat().getWindowInfo()
.orElseThrow(IllegalArgumentException::new);

final int pseudoColumnVersionToUse = determinePseudoColumnVersionToUse(buildContext);

final WindowedStreamSource step = ExecutionStepFactory.streamSourceWindowed(
contextStacker,
dataSource.getSchema(),
dataSource.getKafkaTopicName(),
Formats.from(dataSource.getKsqlTopic()),
windowInfo,
dataSource.getTimestampColumn(),
SystemColumns.getPseudoColumnVersionFromConfig(buildContext.getKsqlConfig())
pseudoColumnVersionToUse
);

return schemaKStream(
Expand All @@ -116,13 +120,15 @@ private static SchemaKStream<?> buildStream(
throw new IllegalArgumentException("windowed");
}

final int pseudoColumnVersionToUse = determinePseudoColumnVersionToUse(buildContext);

final StreamSource step = ExecutionStepFactory.streamSource(
contextStacker,
dataSource.getSchema(),
dataSource.getKafkaTopicName(),
Formats.from(dataSource.getKsqlTopic()),
dataSource.getTimestampColumn(),
SystemColumns.getPseudoColumnVersionFromConfig(buildContext.getKsqlConfig())
pseudoColumnVersionToUse
);

return schemaKStream(
Expand All @@ -141,6 +147,8 @@ private static SchemaKTable<?> buildWindowedTable(
final WindowInfo windowInfo = dataSource.getKsqlTopic().getKeyFormat().getWindowInfo()
.orElseThrow(IllegalArgumentException::new);

final int pseudoColumnVersionToUse = determinePseudoColumnVersionToUse(buildContext);

final SourceStep<KTableHolder<Windowed<GenericKey>>> step =
ExecutionStepFactory.tableSourceWindowed(
contextStacker,
Expand All @@ -149,7 +157,7 @@ private static SchemaKTable<?> buildWindowedTable(
Formats.from(dataSource.getKsqlTopic()),
windowInfo,
dataSource.getTimestampColumn(),
SystemColumns.getPseudoColumnVersionFromConfig(buildContext.getKsqlConfig())
pseudoColumnVersionToUse
);

return schemaKTable(
Expand All @@ -174,15 +182,33 @@ private static SchemaKTable<?> buildTable(

final SourceStep<KTableHolder<GenericKey>> step;

if (buildContext.getKsqlConfig().getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED)) {
final int pseudoColumnVersionToUse = determinePseudoColumnVersionToUse(buildContext);

// If the old query has a v1 table step, continue to use it.
// See https://github.com/confluentinc/ksql/pull/7990
boolean useOldExecutionStepVersion = false;
if (buildContext.getPlanInfo().isPresent()) {
final Set<ExecutionStep<?>> sourceSteps = buildContext.getPlanInfo().get().getSources();
useOldExecutionStepVersion = sourceSteps
.stream()
.anyMatch(executionStep -> executionStep instanceof TableSourceV1);
}

if (useOldExecutionStepVersion
&& pseudoColumnVersionToUse != SystemColumns.LEGACY_PSEUDOCOLUMN_VERSION_NUMBER) {
throw new IllegalStateException("TableSourceV2 was released in conjunction with pseudocolumn"
+ "version 1. Something has gone very wrong");
}
if (buildContext.getKsqlConfig().getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED)
&& !useOldExecutionStepVersion) {
step = ExecutionStepFactory.tableSource(
contextStacker,
dataSource.getSchema(),
dataSource.getKafkaTopicName(),
Formats.from(dataSource.getKsqlTopic()),
dataSource.getTimestampColumn(),
InternalFormats.of(keyFormat, Formats.from(dataSource.getKsqlTopic()).getValueFormat()),
SystemColumns.getPseudoColumnVersionFromConfig(buildContext.getKsqlConfig())
pseudoColumnVersionToUse
);

} else {
Expand All @@ -192,7 +218,7 @@ private static SchemaKTable<?> buildTable(
dataSource.getKafkaTopicName(),
Formats.from(dataSource.getKsqlTopic()),
dataSource.getTimestampColumn(),
SystemColumns.getPseudoColumnVersionFromConfig(buildContext.getKsqlConfig())
pseudoColumnVersionToUse
);
}

Expand All @@ -204,6 +230,21 @@ private static SchemaKTable<?> buildTable(
);
}

private static int determinePseudoColumnVersionToUse(final PlanBuildContext buildContext) {

// Assume statement is CREATE OR REPLACE if this is present, as it indicates that there was
// an existing query with the same ID. if it wasn't COR, it will fail later
if (buildContext.getPlanInfo().isPresent()) {
final Set<ExecutionStep<?>> sourceSteps = buildContext.getPlanInfo().get().getSources();

return sourceSteps.stream()
.map(SourceStep.class::cast)
.mapToInt(SourceStep::getPseudoColumnVersion)
.findAny().getAsInt();
}
return SystemColumns.getPseudoColumnVersionFromConfig(buildContext.getKsqlConfig());
}

private static <K> SchemaKStream<K> schemaKStream(
final PlanBuildContext buildContext,
final LogicalSchema schema,
Expand Down Expand Up @@ -243,5 +284,4 @@ private static LogicalSchema resolveSchema(
buildContext.getFunctionRegistry()
).resolve(step, dataSource.getSchema());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void shouldDoComparisons() throws Exception {
public void shouldDoComparisons_null() throws Exception {
ordersRow = GenericRow.genericRow(null, null, null, null, null, null, null, null, null);
assertOrdersError("1 = null",
compileTime("Unexpected error generating code for Test. expression:(1 = null)"),
compileTime("Unexpected error generating code for Test. expression: (1 = null)"),
compileTime("Invalid expression: Comparison with NULL not supported: INTEGER = NULL"));
assertOrders("ORDERID = 1", false);
assertOrders("ITEMID > 'a'", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -49,7 +50,8 @@ public void setup() {
runtimeBuildContext = PlanBuildContext.of(
ksqlConfig,
serviceContext,
functionRegistry
functionRegistry,
Optional.empty()
);
}

Expand Down
Loading

0 comments on commit 2274da8

Please sign in to comment.