Skip to content

Commit

Permalink
refactor: pull query execution (#7874)
Browse files Browse the repository at this point in the history
* Consolidate ad-hoc query analysis
* Clarify the type of query in various "execute query" methods in the codebase
* Misc refactors to prepare for stream pull queries

Reviewers: Almog Gavra, Guozhang Wang
  • Loading branch information
vvcephei committed Aug 10, 2021
1 parent 61d6172 commit 2e1d635
Show file tree
Hide file tree
Showing 15 changed files with 309 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,8 @@
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.ConnectExecutable;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
Expand All @@ -97,7 +94,6 @@
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -174,7 +170,8 @@ public class ClientIntegrationTest {
private static final String EMPTY_TEST_STREAM_2 = EMPTY_TEST_DATA_PROVIDER_2.sourceName();

private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;";
private static final String PULL_QUERY = "SELECT * from " + AGG_TABLE + " WHERE K=" + AN_AGG_KEY + ";";
private static final String PULL_QUERY_ON_TABLE =
"SELECT * from " + AGG_TABLE + " WHERE K=" + AN_AGG_KEY + ";";
private static final int PUSH_QUERY_LIMIT_NUM_ROWS = 2;
private static final String PUSH_QUERY_WITH_LIMIT =
"SELECT * FROM " + TEST_STREAM + " EMIT CHANGES LIMIT " + PUSH_QUERY_LIMIT_NUM_ROWS + ";";
Expand Down Expand Up @@ -370,24 +367,29 @@ public void shouldStreamPushQuerySync() throws Exception {
}

@Test
public void shouldStreamPullQueryAsync() throws Exception {
public void shouldStreamPullQueryOnTableAsync() throws Exception {
// When
final StreamedQueryResult streamedQueryResult = client.streamQuery(PULL_QUERY).get();
final StreamedQueryResult streamedQueryResult = client.streamQuery(PULL_QUERY_ON_TABLE).get();

// Then
assertThat(streamedQueryResult.columnNames(), is(PULL_QUERY_COLUMN_NAMES));
assertThat(streamedQueryResult.columnTypes(), is(PULL_QUERY_COLUMN_TYPES));
assertThat(streamedQueryResult.queryID(), is(nullValue()));

shouldReceivePullQueryRow(streamedQueryResult);
shouldReceiveRows(
streamedQueryResult,
1,
ClientIntegrationTest::verifyPullQueryRows,
true
);

assertThatEventually(streamedQueryResult::isComplete, is(true));
}

@Test
public void shouldStreamPullQuerySync() throws Exception {
public void shouldStreamPullQueryOnTableSync() throws Exception {
// When
final StreamedQueryResult streamedQueryResult = client.streamQuery(PULL_QUERY).get();
final StreamedQueryResult streamedQueryResult = client.streamQuery(PULL_QUERY_ON_TABLE).get();

// Then
assertThat(streamedQueryResult.columnNames(), is(PULL_QUERY_COLUMN_NAMES));
Expand Down Expand Up @@ -483,7 +485,7 @@ public void shouldAllowSubscribeStreamedQueryResultIfComplete() throws Exception
@Test
public void shouldExecutePullQuery() throws Exception {
// When
final BatchedQueryResult batchedQueryResult = client.executeQuery(PULL_QUERY);
final BatchedQueryResult batchedQueryResult = client.executeQuery(PULL_QUERY_ON_TABLE);

// Then
assertThat(batchedQueryResult.queryID().get(), is(nullValue()));
Expand Down Expand Up @@ -1356,15 +1358,6 @@ private static void verifyStreamRowWithIndex(final Row row, final int index) {
assertThat(obj.toString(), is(obj.toJsonString()));
}

private static void shouldReceivePullQueryRow(final Publisher<Row> publisher) {
shouldReceiveRows(
publisher,
1,
ClientIntegrationTest::verifyPullQueryRows,
true
);
}

private static void verifyPullQueryRows(final List<Row> rows) {
assertThat(rows, hasSize(1));
verifyPullQueryRow(rows.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql;

import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.execution.streams.RoutingOptions;
Expand Down Expand Up @@ -146,7 +147,7 @@ default PreparedStatement<?> prepare(ParsedStatement stmt) {
* Executes a query using the supplied service context.
* @return the query metadata
*/
TransientQueryMetadata executeQuery(
TransientQueryMetadata executeTransientQuery(
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
boolean excludeTombstones
Expand All @@ -163,7 +164,8 @@ TransientQueryMetadata executeQuery(
* call PullQueryResult.start to start the query.
* @return the rows that are the result of the query evaluation.
*/
PullQueryResult executePullQuery(
PullQueryResult executeTablePullQuery(
ImmutableAnalysis analysis,
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
HARouting routing,
Expand All @@ -185,6 +187,7 @@ PullQueryResult executePullQuery(
* @return A ScalablePushQueryMetadata object
*/
ScalablePushQueryMetadata executeScalablePushQuery(
ImmutableAnalysis analysis,
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
PushRouting pushRouting,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,12 @@ private enum CustomExecutors {
}),
QUERY(Query.class, (executionContext, stmt, props) -> {
return ExecuteResult.of(
executionContext.executeQuery(executionContext.getServiceContext(), stmt.cast(), false));
executionContext.executeTransientQuery(
executionContext.getServiceContext(),
stmt.cast(),
false
)
);
})
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import com.google.common.collect.Iterables;
import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.QueryAnalyzer;
import io.confluent.ksql.analyzer.RewrittenAnalysis;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.QueryExecutionUtil.ColumnReferenceRewriter;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.RoutingOptions;
Expand Down Expand Up @@ -165,7 +162,8 @@ ExecuteResult execute(final KsqlPlan plan) {
* @param pullQueryMetrics JMX metrics
* @return the rows that are the result of evaluating the pull query
*/
PullQueryResult executePullQuery(
PullQueryResult executeTablePullQuery(
final ImmutableAnalysis analysis,
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingOptions routingOptions,
Expand All @@ -183,12 +181,6 @@ PullQueryResult executePullQuery(
RoutingNodeType routingNodeType = null;

try {
final QueryAnalyzer queryAnalyzer = new QueryAnalyzer(engineContext.getMetaStore(),
NO_OUTPUT_TOPIC_PREFIX);
final ImmutableAnalysis analysis = new RewrittenAnalysis(
queryAnalyzer.analyze(statement.getStatement(), Optional.empty()),
new ColumnReferenceRewriter()::process
);
// Do not set sessionConfig.getConfig to true! The copying is inefficient and slows down pull
// query performance significantly. Instead use QueryPlannerOptions which check overrides
// deliberately.
Expand Down Expand Up @@ -263,6 +255,7 @@ PullQueryResult executePullQuery(
}

ScalablePushQueryMetadata executeScalablePushQuery(
final ImmutableAnalysis analysis,
final ConfiguredStatement<Query> statement,
final PushRouting pushRouting,
final PushRoutingOptions pushRoutingOptions,
Expand All @@ -271,12 +264,6 @@ ScalablePushQueryMetadata executeScalablePushQuery(
) {
final SessionConfig sessionConfig = statement.getSessionConfig();
try {
final QueryAnalyzer queryAnalyzer = new QueryAnalyzer(engineContext.getMetaStore(),
NO_OUTPUT_TOPIC_PREFIX);
final ImmutableAnalysis analysis = new RewrittenAnalysis(
queryAnalyzer.analyze(statement.getStatement(), Optional.empty()),
new ColumnReferenceRewriter()::process
);
final KsqlConfig ksqlConfig = sessionConfig.getConfig(false);
final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan(
statement, analysis, ksqlConfig, queryPlannerOptions, true);
Expand Down Expand Up @@ -316,7 +303,7 @@ ScalablePushQueryMetadata executeScalablePushQuery(


@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
TransientQueryMetadata executeQuery(
TransientQueryMetadata executeTransientQuery(
final ConfiguredStatement<Query> statement,
final boolean excludeTombstones
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.analyzer.Analysis;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.QueryAnalyzer;
import io.confluent.ksql.analyzer.RewrittenAnalysis;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
Expand Down Expand Up @@ -68,7 +72,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class KsqlEngine implements KsqlExecutionContext, Closeable {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class);

Expand Down Expand Up @@ -269,15 +275,15 @@ public ExecuteResult execute(
}

@Override
public TransientQueryMetadata executeQuery(
public TransientQueryMetadata executeTransientQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final boolean excludeTombstones
) {
try {
final TransientQueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getSessionConfig())
.executeQuery(statement, excludeTombstones);
.executeTransientQuery(statement, excludeTombstones);
return query;
} catch (final KsqlStatementException e) {
throw e;
Expand All @@ -289,6 +295,7 @@ public TransientQueryMetadata executeQuery(

@Override
public ScalablePushQueryMetadata executeScalablePushQuery(
final ImmutableAnalysis analysis,
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final PushRouting pushRouting,
Expand All @@ -298,13 +305,19 @@ public ScalablePushQueryMetadata executeScalablePushQuery(
) {
final ScalablePushQueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getSessionConfig())
.executeScalablePushQuery(statement, pushRouting, pushRoutingOptions, queryPlannerOptions,
.executeScalablePushQuery(
analysis,
statement,
pushRouting,
pushRoutingOptions,
queryPlannerOptions,
context);
return query;
}

@Override
public PullQueryResult executePullQuery(
public PullQueryResult executeTablePullQuery(
final ImmutableAnalysis analysis,
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final HARouting routing,
Expand All @@ -319,7 +332,8 @@ public PullQueryResult executePullQuery(
serviceContext,
statement.getSessionConfig()
)
.executePullQuery(
.executeTablePullQuery(
analysis,
statement,
routing,
routingOptions,
Expand Down Expand Up @@ -373,6 +387,26 @@ public static boolean isExecutableStatement(final Statement statement) {
|| statement instanceof Query;
}

/**
* For analyzing queries that you know won't have an output topic, such as pull queries.
*/
public ImmutableAnalysis analyzeQueryWithNoOutputTopic(
final Query query,
final String queryText) {

final QueryAnalyzer queryAnalyzer = new QueryAnalyzer(getMetaStore(), "");
final Analysis analysis;
try {
analysis = queryAnalyzer.analyze(query, Optional.empty());
} catch (final KsqlException e) {
throw new KsqlStatementException(e.getMessage(), queryText, e);
}
return new RewrittenAnalysis(
analysis,
new QueryExecutionUtil.ColumnReferenceRewriter()::process
);
}

private static final class CleanupListener implements QueryEventListener {
final QueryCleanupService cleanupService;
final ServiceContext serviceContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
Expand Down Expand Up @@ -158,7 +159,7 @@ public ExecuteResult execute(
}

@Override
public TransientQueryMetadata executeQuery(
public TransientQueryMetadata executeTransientQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final boolean excludeTombstones
Expand All @@ -167,11 +168,12 @@ public TransientQueryMetadata executeQuery(
engineContext,
serviceContext,
statement.getSessionConfig()
).executeQuery(statement, excludeTombstones);
).executeTransientQuery(statement, excludeTombstones);
}

@Override
public PullQueryResult executePullQuery(
public PullQueryResult executeTablePullQuery(
final ImmutableAnalysis analysis,
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final HARouting routing,
Expand All @@ -184,7 +186,8 @@ public PullQueryResult executePullQuery(
engineContext,
serviceContext,
statement.getSessionConfig()
).executePullQuery(
).executeTablePullQuery(
analysis,
statement,
routing,
routingOptions,
Expand All @@ -196,6 +199,7 @@ public PullQueryResult executePullQuery(

@Override
public ScalablePushQueryMetadata executeScalablePushQuery(
final ImmutableAnalysis analysis,
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final PushRouting pushRouting,
Expand All @@ -208,6 +212,7 @@ public ScalablePushQueryMetadata executeScalablePushQuery(
serviceContext,
statement.getSessionConfig()
).executeScalablePushQuery(
analysis,
statement,
pushRouting,
pushRoutingOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static TransientQueryMetadata executeQuery(
final ConfiguredStatement<Query> configured = ConfiguredStatement.of(
prepared, SessionConfig.of(ksqlConfig, overriddenProperties)).cast();
try {
return engine.executeQuery(serviceContext, configured, false);
return engine.executeTransientQuery(serviceContext, configured, false);
} catch (final KsqlStatementException e) {
// use the original statement text in the exception so that tests
// can easily check that the failed statement is the input statement
Expand Down
Loading

0 comments on commit 2e1d635

Please sign in to comment.