Skip to content

Commit

Permalink
feat: Allow scalable push queries to handle rebalances (#7988)
Browse files Browse the repository at this point in the history
* feat: Allow scalable push queries to handle rebalances
  • Loading branch information
AlanConfluent committed Aug 18, 2021
1 parent 8ccef82 commit b3dbed3
Show file tree
Hide file tree
Showing 25 changed files with 740 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ private void doSubscribe(final Subscriber<? super T> subscriber) {
} catch (final Throwable t) {
sendError(new IllegalStateException("Exception encountered in onSubscribe", t));
}
if (isFailed()) {
sendError(new IllegalStateException(
"Cannot subscribe to failed publisher. Failure cause: " + failure));
}
afterSubscribe();
}

Expand Down
14 changes: 14 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ public class KsqlConfig extends AbstractConfig {
+ "functions, aggregations, or joins, but may include projections and filters.";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT = false;

public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY
= "ksql.query.push.scalable.new.node.continuity";
public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DOC =
"Whether new node continuity is enforced for scalable push queries. This means that it's an "
+ "error for an existing query to miss data processed on a newly added node";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DEFAULT = false;

public static final String KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED
= "ksql.query.push.scalable.interpreter.enabled";
public static final String KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED_DOC =
Expand Down Expand Up @@ -901,6 +908,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC
)
.define(
KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY,
Type.BOOLEAN,
KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DEFAULT,
Importance.LOW,
KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DOC
)
.define(
KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED,
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public class KsqlRequestConfig extends AbstractConfig {
private static final String KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DOC =
"Controls whether a ksql host forwards a push query request to another host";

public static final String KSQL_REQUEST_QUERY_PUSH_REGISTRY_START =
"request.ksql.query.push.registry.start";
public static final boolean KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DEFAULT = false;
private static final String KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DOC =
"Indicates whether a connecting node expects to be at the start of the registry data. After a"
+ "rebalance, this ensures we don't miss any data.";

private static ConfigDef buildConfigDef() {
final ConfigDef configDef = new ConfigDef()
.define(
Expand Down Expand Up @@ -88,6 +95,12 @@ private static ConfigDef buildConfigDef() {
KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DOC
).define(
KSQL_REQUEST_QUERY_PUSH_REGISTRY_START,
Type.BOOLEAN,
KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DOC
);
return configDef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.physical.scalablepush.PushPhysicalPlan;
import io.confluent.ksql.physical.scalablepush.PushPhysicalPlanBuilder;
import io.confluent.ksql.physical.scalablepush.PushQueryPreparer;
import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator;
import io.confluent.ksql.physical.scalablepush.PushRouting;
import io.confluent.ksql.physical.scalablepush.PushRoutingOptions;
Expand Down Expand Up @@ -272,7 +273,8 @@ ScalablePushQueryMetadata executeScalablePushQuery(
final PushPhysicalPlan physicalPlan = buildScalablePushPhysicalPlan(
logicalPlan,
analysis,
context
context,
pushRoutingOptions
);
final TransientQueryQueue transientQueryQueue
= new TransientQueryQueue(analysis.getLimitClause());
Expand All @@ -285,12 +287,15 @@ ScalablePushQueryMetadata executeScalablePushQuery(
final PushQueryQueuePopulator populator = () ->
pushRouting.handlePushQuery(serviceContext, physicalPlan, statement, pushRoutingOptions,
physicalPlan.getOutputSchema(), transientQueryQueue);
final PushQueryPreparer preparer = () ->
pushRouting.preparePushQuery(physicalPlan, statement, pushRoutingOptions);
final ScalablePushQueryMetadata metadata = new ScalablePushQueryMetadata(
physicalPlan.getOutputSchema(),
physicalPlan.getQueryId(),
transientQueryQueue,
resultType,
populator
populator,
preparer
);

return metadata;
Expand Down Expand Up @@ -452,12 +457,14 @@ private LogicalPlanNode buildAndValidateLogicalPlan(
private PushPhysicalPlan buildScalablePushPhysicalPlan(
final LogicalPlanNode logicalPlan,
final ImmutableAnalysis analysis,
final Context context
final Context context,
final PushRoutingOptions pushRoutingOptions
) {

final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder(
engineContext.getProcessingLogContext(),
ScalablePushQueryExecutionUtil.findQuery(engineContext, analysis)
ScalablePushQueryExecutionUtil.findQuery(engineContext, analysis),
pushRoutingOptions.getExpectingStartOfRegistryData()
);
return builder.buildPushPhysicalPlan(logicalPlan, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ public boolean isClosed() {

private void maybeNext(final Publisher publisher) {
List<?> row;
while (!isErrored(publisher) && (row = (List<?>)next()) != null) {
while (!isErrored(publisher) && (row = (List<?>) next(publisher)) != null) {
publisher.accept(row);
}
if (publisher.isFailed()) {
return;
}
if (!closed) {
if (timer >= 0) {
context.owner().cancelTimer(timer);
Expand Down Expand Up @@ -112,14 +115,23 @@ private boolean isErrored(final Publisher publisher) {

private void open(final Publisher publisher) {
VertxUtils.checkContext(context);
dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher)));
root.open();
maybeNext(publisher);
try {
dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher)));
root.open();
maybeNext(publisher);
} catch (Throwable t) {
publisher.sendException(t);
}
}

private Object next() {
private Object next(final Publisher publisher) {
VertxUtils.checkContext(context);
return root.next();
try {
return root.next();
} catch (final Throwable t) {
publisher.sendException(t);
return null;
}
}

public void close() {
Expand Down Expand Up @@ -149,6 +161,11 @@ public ScalablePushRegistry getScalablePushRegistry() {
return scalablePushRegistry;
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public Context getContext() {
return context;
}

public static class Publisher extends BufferedPublisher<List<?>> {

public Publisher(final Context ctx) {
Expand All @@ -162,5 +179,13 @@ public void reportDroppedRows() {
public void reportHasError() {
sendError(new RuntimeException("Persistent query has error"));
}

public void sendException(final Throwable e) {
sendError(e);
}

public boolean isFailed() {
return super.isFailed();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,20 @@ public class PushPhysicalPlanBuilder {

private final ProcessingLogContext processingLogContext;
private final PersistentQueryMetadata persistentQueryMetadata;
private final boolean expectingStartOfRegistryData;
private final Stacker contextStacker;
private final QueryId queryId;

public PushPhysicalPlanBuilder(
final ProcessingLogContext processingLogContext,
final PersistentQueryMetadata persistentQueryMetadata
final PersistentQueryMetadata persistentQueryMetadata,
final boolean expectingStartOfRegistryData
) {
this.processingLogContext = Objects.requireNonNull(
processingLogContext, "processingLogContext");
this.persistentQueryMetadata = Objects.requireNonNull(
persistentQueryMetadata, "persistentQueryMetadata");
this.expectingStartOfRegistryData = expectingStartOfRegistryData;
this.contextStacker = new Stacker();
queryId = uniqueQueryId();
}
Expand Down Expand Up @@ -160,7 +163,8 @@ private AbstractPhysicalOperator translateDataSourceNode(
final ScalablePushRegistry scalablePushRegistry =
persistentQueryMetadata.getScalablePushRegistry()
.orElseThrow(() -> new IllegalStateException("Scalable push registry cannot be found"));
return new PeekStreamOperator(scalablePushRegistry, logicalNode, queryId);
return new PeekStreamOperator(scalablePushRegistry, logicalNode, queryId,
expectingStartOfRegistryData);
}

private QueryId uniqueQueryId() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.scalablepush;

public interface PushQueryPreparer {

void prepare();
}

0 comments on commit b3dbed3

Please sign in to comment.