diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/reactive/BasePublisher.java b/ksqldb-common/src/main/java/io/confluent/ksql/reactive/BasePublisher.java index 95c1fc67162a..47f1c5f54ac7 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/reactive/BasePublisher.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/reactive/BasePublisher.java @@ -164,6 +164,10 @@ private void doSubscribe(final Subscriber subscriber) { } catch (final Throwable t) { sendError(new IllegalStateException("Exception encountered in onSubscribe", t)); } + if (isFailed()) { + sendError(new IllegalStateException( + "Cannot subscribe to failed publisher. Failure cause: " + failure)); + } afterSubscribe(); } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index e9b0fba891ff..52be8f948242 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -285,6 +285,13 @@ public class KsqlConfig extends AbstractConfig { + "functions, aggregations, or joins, but may include projections and filters."; public static final boolean KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT = false; + public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY + = "ksql.query.push.scalable.new.node.continuity"; + public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DOC = + "Whether new node continuity is enforced for scalable push queries. This means that it's an " + + "error for an existing query to miss data processed on a newly added node"; + public static final boolean KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DEFAULT = false; + public static final String KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED = "ksql.query.push.scalable.interpreter.enabled"; public static final String KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED_DOC = @@ -901,6 +908,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC ) + .define( + KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY, + Type.BOOLEAN, + KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DEFAULT, + Importance.LOW, + KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DOC + ) .define( KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED, Type.BOOLEAN, diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java index 5b7213ba5129..cd051a88a351 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java @@ -56,6 +56,13 @@ public class KsqlRequestConfig extends AbstractConfig { private static final String KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DOC = "Controls whether a ksql host forwards a push query request to another host"; + public static final String KSQL_REQUEST_QUERY_PUSH_REGISTRY_START = + "request.ksql.query.push.registry.start"; + public static final boolean KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DEFAULT = false; + private static final String KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DOC = + "Indicates whether a connecting node expects to be at the start of the registry data. After a" + + "rebalance, this ensures we don't miss any data."; + private static ConfigDef buildConfigDef() { final ConfigDef configDef = new ConfigDef() .define( @@ -88,6 +95,12 @@ private static ConfigDef buildConfigDef() { KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DEFAULT, ConfigDef.Importance.LOW, KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DOC + ).define( + KSQL_REQUEST_QUERY_PUSH_REGISTRY_START, + Type.BOOLEAN, + KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DEFAULT, + ConfigDef.Importance.LOW, + KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DOC ); return configDef; } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index c48408173025..66c123513ba5 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -45,6 +45,7 @@ import io.confluent.ksql.physical.pull.PullQueryResult; import io.confluent.ksql.physical.scalablepush.PushPhysicalPlan; import io.confluent.ksql.physical.scalablepush.PushPhysicalPlanBuilder; +import io.confluent.ksql.physical.scalablepush.PushQueryPreparer; import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator; import io.confluent.ksql.physical.scalablepush.PushRouting; import io.confluent.ksql.physical.scalablepush.PushRoutingOptions; @@ -272,7 +273,8 @@ ScalablePushQueryMetadata executeScalablePushQuery( final PushPhysicalPlan physicalPlan = buildScalablePushPhysicalPlan( logicalPlan, analysis, - context + context, + pushRoutingOptions ); final TransientQueryQueue transientQueryQueue = new TransientQueryQueue(analysis.getLimitClause()); @@ -285,12 +287,15 @@ ScalablePushQueryMetadata executeScalablePushQuery( final PushQueryQueuePopulator populator = () -> pushRouting.handlePushQuery(serviceContext, physicalPlan, statement, pushRoutingOptions, physicalPlan.getOutputSchema(), transientQueryQueue); + final PushQueryPreparer preparer = () -> + pushRouting.preparePushQuery(physicalPlan, statement, pushRoutingOptions); final ScalablePushQueryMetadata metadata = new ScalablePushQueryMetadata( physicalPlan.getOutputSchema(), physicalPlan.getQueryId(), transientQueryQueue, resultType, - populator + populator, + preparer ); return metadata; @@ -452,12 +457,14 @@ private LogicalPlanNode buildAndValidateLogicalPlan( private PushPhysicalPlan buildScalablePushPhysicalPlan( final LogicalPlanNode logicalPlan, final ImmutableAnalysis analysis, - final Context context + final Context context, + final PushRoutingOptions pushRoutingOptions ) { final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder( engineContext.getProcessingLogContext(), - ScalablePushQueryExecutionUtil.findQuery(engineContext, analysis) + ScalablePushQueryExecutionUtil.findQuery(engineContext, analysis), + pushRoutingOptions.getExpectingStartOfRegistryData() ); return builder.buildPushPhysicalPlan(logicalPlan, context); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java index a814f08998e2..b6b88412c927 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java @@ -82,9 +82,12 @@ public boolean isClosed() { private void maybeNext(final Publisher publisher) { List row; - while (!isErrored(publisher) && (row = (List)next()) != null) { + while (!isErrored(publisher) && (row = (List) next(publisher)) != null) { publisher.accept(row); } + if (publisher.isFailed()) { + return; + } if (!closed) { if (timer >= 0) { context.owner().cancelTimer(timer); @@ -112,14 +115,23 @@ private boolean isErrored(final Publisher publisher) { private void open(final Publisher publisher) { VertxUtils.checkContext(context); - dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher))); - root.open(); - maybeNext(publisher); + try { + dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher))); + root.open(); + maybeNext(publisher); + } catch (Throwable t) { + publisher.sendException(t); + } } - private Object next() { + private Object next(final Publisher publisher) { VertxUtils.checkContext(context); - return root.next(); + try { + return root.next(); + } catch (final Throwable t) { + publisher.sendException(t); + return null; + } } public void close() { @@ -149,6 +161,11 @@ public ScalablePushRegistry getScalablePushRegistry() { return scalablePushRegistry; } + @SuppressFBWarnings(value = "EI_EXPOSE_REP") + public Context getContext() { + return context; + } + public static class Publisher extends BufferedPublisher> { public Publisher(final Context ctx) { @@ -162,5 +179,13 @@ public void reportDroppedRows() { public void reportHasError() { sendError(new RuntimeException("Persistent query has error")); } + + public void sendException(final Throwable e) { + sendError(e); + } + + public boolean isFailed() { + return super.isFailed(); + } } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilder.java index 84c3280c72b6..0b2cae8c1b71 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilder.java @@ -49,17 +49,20 @@ public class PushPhysicalPlanBuilder { private final ProcessingLogContext processingLogContext; private final PersistentQueryMetadata persistentQueryMetadata; + private final boolean expectingStartOfRegistryData; private final Stacker contextStacker; private final QueryId queryId; public PushPhysicalPlanBuilder( final ProcessingLogContext processingLogContext, - final PersistentQueryMetadata persistentQueryMetadata + final PersistentQueryMetadata persistentQueryMetadata, + final boolean expectingStartOfRegistryData ) { this.processingLogContext = Objects.requireNonNull( processingLogContext, "processingLogContext"); this.persistentQueryMetadata = Objects.requireNonNull( persistentQueryMetadata, "persistentQueryMetadata"); + this.expectingStartOfRegistryData = expectingStartOfRegistryData; this.contextStacker = new Stacker(); queryId = uniqueQueryId(); } @@ -160,7 +163,8 @@ private AbstractPhysicalOperator translateDataSourceNode( final ScalablePushRegistry scalablePushRegistry = persistentQueryMetadata.getScalablePushRegistry() .orElseThrow(() -> new IllegalStateException("Scalable push registry cannot be found")); - return new PeekStreamOperator(scalablePushRegistry, logicalNode, queryId); + return new PeekStreamOperator(scalablePushRegistry, logicalNode, queryId, + expectingStartOfRegistryData); } private QueryId uniqueQueryId() { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushQueryPreparer.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushQueryPreparer.java new file mode 100644 index 000000000000..fe3643f1e4ac --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushQueryPreparer.java @@ -0,0 +1,21 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.physical.scalablepush; + +public interface PushQueryPreparer { + + void prepare(); +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java index 5703b645d23a..83cfac4770b0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java @@ -16,32 +16,46 @@ package io.confluent.ksql.physical.scalablepush; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.GenericRow; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.physical.pull.HARouting; import io.confluent.ksql.physical.scalablepush.locator.PushLocator.KsqlNode; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.TransientQueryQueue; import io.confluent.ksql.reactive.BaseSubscriber; import io.confluent.ksql.reactive.BufferedPublisher; import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlRequestConfig; +import io.confluent.ksql.util.VertxUtils; import io.vertx.core.Context; +import java.util.Collection; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import org.reactivestreams.Subscription; import org.slf4j.Logger; @@ -51,7 +65,26 @@ public class PushRouting implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(HARouting.class); + private static final long CLUSTER_CHECK_INTERVAL_MS = 1000; + private static final long HOST_CACHE_EXPIRATION_MS = 1000; + + private final Function> registryToNodes; + private final long clusterCheckInterval; + private final boolean backgroundRetries; + public PushRouting() { + this(createLoadingCache(), CLUSTER_CHECK_INTERVAL_MS, true); + } + + @VisibleForTesting + public PushRouting( + final Function> registryToNodes, + final long clusterCheckInterval, + final boolean backgroundRetries + ) { + this.registryToNodes = registryToNodes; + this.clusterCheckInterval = clusterCheckInterval; + this.backgroundRetries = backgroundRetries; } @Override @@ -70,13 +103,39 @@ public CompletableFuture handlePushQuery( final LogicalSchema outputSchema, final TransientQueryQueue transientQueryQueue ) { - final List hosts = pushPhysicalPlan.getScalablePushRegistry() - .getLocator() - .locate() + final Set hosts = getInitialHosts(pushPhysicalPlan, statement, pushRoutingOptions); + + final PushConnectionsHandle pushConnectionsHandle = new PushConnectionsHandle(); + // Returns a future with the handle once the initial connection is made + final CompletableFuture result = connectToHosts( + serviceContext, pushPhysicalPlan, statement, hosts, outputSchema, + transientQueryQueue, pushConnectionsHandle, false); + // Only check for new nodes if this is the source node + if (backgroundRetries && !pushRoutingOptions.getHasBeenForwarded()) { + checkForNewHostsOnContext(serviceContext, pushPhysicalPlan, statement, hosts, outputSchema, + transientQueryQueue, pushConnectionsHandle); + } + return result; + } + + public void preparePushQuery( + final PushPhysicalPlan pushPhysicalPlan, + final ConfiguredStatement statement, + final PushRoutingOptions pushRoutingOptions + ) { + // Ensure that we have the expected hosts and the below doesn't throw an exception. + getInitialHosts(pushPhysicalPlan, statement, pushRoutingOptions); + } + + private Set getInitialHosts( + final PushPhysicalPlan pushPhysicalPlan, + final ConfiguredStatement statement, + final PushRoutingOptions pushRoutingOptions + ) { + final Set hosts = registryToNodes.apply(pushPhysicalPlan.getScalablePushRegistry()) .stream() - .filter(node -> !pushRoutingOptions.getIsSkipForwardRequest() || node.isLocal()) - .distinct() - .collect(Collectors.toList()); + .filter(node -> !pushRoutingOptions.getHasBeenForwarded() || node.isLocal()) + .collect(Collectors.toSet()); if (hosts.isEmpty()) { LOG.error("Unable to execute push query: {}. No nodes executing persistent queries", @@ -85,9 +144,7 @@ public CompletableFuture handlePushQuery( "Unable to execute push query. No nodes executing persistent queries %s", statement.getStatementText())); } - - return connectToHosts(serviceContext, pushPhysicalPlan, statement, hosts, outputSchema, - transientQueryQueue); + return hosts; } /** @@ -98,18 +155,34 @@ private CompletableFuture connectToHosts( final ServiceContext serviceContext, final PushPhysicalPlan pushPhysicalPlan, final ConfiguredStatement statement, - final List hosts, + final Collection hosts, final LogicalSchema outputSchema, - final TransientQueryQueue transientQueryQueue + final TransientQueryQueue transientQueryQueue, + final PushConnectionsHandle pushConnectionsHandle, + final boolean dynamicallyAddedNode ) { final Map> futureMap = new LinkedHashMap<>(); - final CompletableFuture errorCallback = new CompletableFuture<>(); for (final KsqlNode node : hosts) { + pushConnectionsHandle.add( + node, new RoutingResult(RoutingResultStatus.IN_PROGRESS, () -> { })); + final CompletableFuture callback = new CompletableFuture<>(); + callback.handle((v, t) -> { + if (t == null) { + pushConnectionsHandle.get(node) + .ifPresent(result -> { + result.close(); + result.updateStatus(RoutingResultStatus.COMPLETE); + }); + LOG.info("Host {} completed request {}.", node, pushPhysicalPlan.getQueryId()); + } else { + pushConnectionsHandle.completeExceptionally(t); + } + return null; + }); futureMap.put(node, executeOrRouteQuery( node, statement, serviceContext, pushPhysicalPlan, outputSchema, - transientQueryQueue, errorCallback::completeExceptionally)); + transientQueryQueue, callback, dynamicallyAddedNode)); } - final PushConnectionsHandle pushConnectionsHandle = new PushConnectionsHandle(errorCallback); return CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])) .thenApply(v -> { for (final KsqlNode node : hosts) { @@ -125,15 +198,20 @@ private CompletableFuture connectToHosts( .map(Entry::getKey) .findFirst() .orElse(null); - LOG.warn("Error routing query {} to host {} at timestamp {} with exception {}", - statement.getStatementText(), node, System.currentTimeMillis(), t.getCause()); - - pushConnectionsHandle.close(); - pushConnectionsHandle.completeExceptionally( - new KsqlException(String.format( - "Unable to execute push query \"%s\". %s", - statement.getStatementText(), t.getCause().getMessage()))); - + LOG.warn("Error routing query {} id {} to host {} at timestamp {} with exception {}", + statement.getStatementText(), pushPhysicalPlan.getQueryId(), node, + System.currentTimeMillis(), t.getCause()); + + // We only fail the whole thing if this is not a new dynamically added node. We allow + // retries in that case and don't fail the original request. + pushConnectionsHandle.get(node) + .ifPresent(result -> result.updateStatus(RoutingResultStatus.FAILED)); + if (!dynamicallyAddedNode) { + pushConnectionsHandle.completeExceptionally( + new KsqlException(String.format( + "Unable to execute push query \"%s\". %s", + statement.getStatementText(), t.getCause().getMessage()))); + } return pushConnectionsHandle; }); } @@ -146,11 +224,13 @@ static CompletableFuture executeOrRouteQuery( final PushPhysicalPlan pushPhysicalPlan, final LogicalSchema outputSchema, final TransientQueryQueue transientQueryQueue, - final Consumer errorCallback + final CompletableFuture callback, + final boolean dynamicallyAddedNode ) { if (node.isLocal()) { - LOG.debug("Query {} executed locally at host {} at timestamp {}.", - statement.getStatementText(), node.location(), System.currentTimeMillis()); + LOG.debug("Query {} id {} executed locally at host {} at timestamp {}.", + statement.getStatementText(), pushPhysicalPlan.getQueryId(), node.location(), + System.currentTimeMillis()); final AtomicReference>> publisherRef = new AtomicReference<>(null); return CompletableFuture.completedFuture(null) @@ -158,7 +238,7 @@ static CompletableFuture executeOrRouteQuery( .thenApply(publisher -> { publisherRef.set(publisher); publisher.subscribe(new LocalQueryStreamSubscriber(publisher.getContext(), - transientQueryQueue, errorCallback)); + transientQueryQueue, callback, node, pushPhysicalPlan.getQueryId())); return new RoutingResult(RoutingResultStatus.SUCCESS, () -> { pushPhysicalPlan.close(); publisher.close(); @@ -184,11 +264,11 @@ static CompletableFuture executeOrRouteQuery( final AtomicReference> publisherRef = new AtomicReference<>(null); final CompletableFuture> publisherFuture - = forwardTo(node, statement, serviceContext, outputSchema); + = forwardTo(node, statement, serviceContext, outputSchema, dynamicallyAddedNode); return publisherFuture.thenApply(publisher -> { publisherRef.set(publisher); publisher.subscribe(new RemoteStreamSubscriber(publisher.getContext(), transientQueryQueue, - errorCallback)); + callback, node, pushPhysicalPlan.getQueryId())); return new RoutingResult(RoutingResultStatus.SUCCESS, publisher::close); }).exceptionally(t -> { LOG.error("Error forwarding query {} to node {}", @@ -210,12 +290,14 @@ private static CompletableFuture> forwardTo( final KsqlNode owner, final ConfiguredStatement statement, final ServiceContext serviceContext, - final LogicalSchema outputSchema + final LogicalSchema outputSchema, + final boolean dynamicallyAddedNode ) { // Add skip forward flag to properties final Map requestProperties = ImmutableMap.of( KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING, true, - KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true); + KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true, + KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_REGISTRY_START, dynamicallyAddedNode); final CompletableFuture>> future = serviceContext .getKsqlClient() @@ -238,13 +320,110 @@ private static CompletableFuture> forwardTo( }); } + private void checkForNewHostsOnContext( + final ServiceContext serviceContext, + final PushPhysicalPlan pushPhysicalPlan, + final ConfiguredStatement statement, + final Set hosts, + final LogicalSchema outputSchema, + final TransientQueryQueue transientQueryQueue, + final PushConnectionsHandle pushConnectionsHandle + ) { + pushPhysicalPlan.getContext().runOnContext(v -> + checkForNewHosts(serviceContext, pushPhysicalPlan, statement, outputSchema, + transientQueryQueue, pushConnectionsHandle)); + } + + private void checkForNewHosts( + final ServiceContext serviceContext, + final PushPhysicalPlan pushPhysicalPlan, + final ConfiguredStatement statement, + final LogicalSchema outputSchema, + final TransientQueryQueue transientQueryQueue, + final PushConnectionsHandle pushConnectionsHandle + ) { + VertxUtils.checkContext(pushPhysicalPlan.getContext()); + if (pushConnectionsHandle.isClosed()) { + return; + } + final Set updatedHosts = registryToNodes.apply( + pushPhysicalPlan.getScalablePushRegistry()); + final Set hosts = pushConnectionsHandle.getActiveHosts(); + final Set newHosts = Sets.difference(updatedHosts, hosts).stream() + .filter(node -> !node.isLocal()) + .filter(node -> + pushConnectionsHandle.get(node) + .map(routingResult -> routingResult.getStatus() != RoutingResultStatus.IN_PROGRESS) + .orElse(true)) + .collect(Collectors.toSet()); + final Set removedHosts = Sets.difference(hosts, updatedHosts); + if (newHosts.size() > 0) { + LOG.info("Dynamically adding new hosts {} for {}", newHosts, pushPhysicalPlan.getQueryId()); + connectToHosts(serviceContext, pushPhysicalPlan, statement, newHosts, outputSchema, + transientQueryQueue, pushConnectionsHandle, true); + } + if (removedHosts.size() > 0) { + LOG.info("Dynamically removing hosts {} for {}", removedHosts, pushPhysicalPlan.getQueryId()); + for (final KsqlNode node : removedHosts) { + final RoutingResult result = pushConnectionsHandle.remove(node); + result.close(); + result.updateStatus(RoutingResultStatus.REMOVED); + } + } + pushPhysicalPlan.getContext().owner().setTimer(clusterCheckInterval, timerId -> + checkForNewHosts(serviceContext, pushPhysicalPlan, statement, outputSchema, + transientQueryQueue, pushConnectionsHandle)); + } + + private static Set loadCurrentHosts(final ScalablePushRegistry scalablePushRegistry) { + return new HashSet<>(scalablePushRegistry + .getLocator() + .locate()); + } + + private static Function> createLoadingCache() { + final LoadingCache> cache = CacheBuilder.newBuilder() + .maximumSize(40) + .expireAfterWrite(HOST_CACHE_EXPIRATION_MS, TimeUnit.MILLISECONDS) + .build(new CacheLoader>() { + @Override + public Set load(final ScalablePushRegistry scalablePushRegistry) { + return loadCurrentHosts(scalablePushRegistry); + } + }); + return cache::getUnchecked; + } + + /** + * The status for a connection + */ public enum RoutingResultStatus { - SUCCESS + // The host connection is being set up but is not connected yet. + IN_PROGRESS, + // The host connection was successful and in use + SUCCESS, + // The host connection was closed and isn't active, but hasn't yet been removed from the map + // since they're part of the known set of hosts. + COMPLETE, + // The connection has been removed since the host is no longer running the persistent query. + REMOVED, + // The request to the other host failed on the last try + FAILED; + + static boolean isHostActive(final RoutingResultStatus status) { + switch (status) { + case IN_PROGRESS: + case SUCCESS: + return true; + default: + return false; + } + } } public static class RoutingResult { - private final RoutingResultStatus status; private final AutoCloseable closeable; + private volatile RoutingResultStatus status; public RoutingResult(final RoutingResultStatus status, final AutoCloseable closeable) { this.status = status; @@ -262,6 +441,10 @@ public void close() { public RoutingResultStatus getStatus() { return status; } + + public void updateStatus(final RoutingResultStatus status) { + this.status = status; + } } /** @@ -270,15 +453,21 @@ public RoutingResultStatus getStatus() { private static class RemoteStreamSubscriber extends BaseSubscriber { private final TransientQueryQueue transientQueryQueue; - private final Consumer errorCallback; + private final CompletableFuture callback; + private final KsqlNode node; + private final QueryId queryId; private boolean closed; RemoteStreamSubscriber(final Context context, final TransientQueryQueue transientQueryQueue, - final Consumer errorCallback) { + final CompletableFuture callback, + final KsqlNode node, + final QueryId queryId) { super(context); this.transientQueryQueue = transientQueryQueue; - this.errorCallback = errorCallback; + this.callback = callback; + this.node = node; + this.queryId = queryId; } @Override @@ -291,32 +480,46 @@ protected synchronized void handleValue(final StreamedRow row) { if (closed) { return; } - if (row.isTerminal()) { + if (row.getFinalMessage().isPresent()) { close(); return; } if (row.getRow().isPresent()) { if (!transientQueryQueue.acceptRowNonBlocking(null, GenericRow.fromList(row.getRow().get().getColumns()))) { - errorCallback.accept(new KsqlException("Hit limit of request queue")); + callback.completeExceptionally(new KsqlException("Hit limit of request queue")); close(); return; } } + if (row.getErrorMessage().isPresent()) { + final KsqlErrorMessage errorMessage = row.getErrorMessage().get(); + LOG.error("Received error from remote node {} and id {}: {}", node, queryId, errorMessage); + callback.completeExceptionally(new KsqlException("Remote server had an error: " + + errorMessage.getErrorCode() + " - " + errorMessage.getMessage())); + close(); + return; + } makeRequest(1); } @Override protected void handleComplete() { + close(); } @Override protected void handleError(final Throwable t) { - errorCallback.accept(t); + LOG.error("Received error from remote node {} for id {}: {}", node, queryId, t.getMessage(), + t); + callback.completeExceptionally(t); + close(); } synchronized void close() { closed = true; + // Is a noop if already completed in some manner + callback.complete(null); context.runOnContext(v -> cancel()); } } @@ -327,17 +530,23 @@ synchronized void close() { private static class LocalQueryStreamSubscriber extends BaseSubscriber> { private final TransientQueryQueue transientQueryQueue; - private final Consumer errorCallback; + private final CompletableFuture callback; + private final KsqlNode localNode; + private final QueryId queryId; private boolean closed; LocalQueryStreamSubscriber( final Context context, final TransientQueryQueue transientQueryQueue, - final Consumer errorCallback + final CompletableFuture callback, + final KsqlNode localNode, + final QueryId queryId ) { super(context); this.transientQueryQueue = transientQueryQueue; - this.errorCallback = errorCallback; + this.callback = callback; + this.localNode = localNode; + this.queryId = queryId; } @Override @@ -351,7 +560,7 @@ protected synchronized void handleValue(final List row) { return; } if (!transientQueryQueue.acceptRowNonBlocking(null, GenericRow.fromList(row))) { - errorCallback.accept(new KsqlException("Hit limit of request queue")); + callback.completeExceptionally(new KsqlException("Hit limit of request queue")); close(); return; } @@ -361,15 +570,20 @@ protected synchronized void handleValue(final List row) { @Override protected void handleComplete() { + close(); } @Override protected void handleError(final Throwable t) { - errorCallback.accept(t); + LOG.error("Received error from local node {} for id {}: {}", localNode, queryId, + t.getMessage(), t); + callback.completeExceptionally(t); } synchronized void close() { closed = true; + // Is a noop if already completed in some manner + callback.complete(null); context.runOnContext(v -> cancel()); } } @@ -383,8 +597,8 @@ public static class PushConnectionsHandle { private final CompletableFuture errorCallback; @SuppressFBWarnings(value = "EI_EXPOSE_REP") - public PushConnectionsHandle(final CompletableFuture errorCallback) { - this.errorCallback = errorCallback; + public PushConnectionsHandle() { + this.errorCallback = new CompletableFuture<>(); // If anything calls the error callback. all results are closed. errorCallback.exceptionally(t -> { @@ -397,8 +611,12 @@ public void add(final KsqlNode ksqlNode, final RoutingResult result) { results.put(ksqlNode, result); } - public void remove(final KsqlNode ksqlNode) { - results.remove(ksqlNode); + public RoutingResult remove(final KsqlNode ksqlNode) { + return results.remove(ksqlNode); + } + + public Optional get(final KsqlNode ksqlNode) { + return Optional.ofNullable(results.getOrDefault(ksqlNode, null)); } public void close() { @@ -407,6 +625,21 @@ public void close() { } } + public Set getAllHosts() { + return ImmutableSet.copyOf(results.keySet()); + } + + public Set getActiveHosts() { + return results.entrySet().stream() + .filter(e -> RoutingResultStatus.isHostActive(e.getValue().getStatus())) + .map(Entry::getKey) + .collect(ImmutableSet.toImmutableSet()); + } + + public boolean isClosed() { + return errorCallback.isDone(); + } + public void onException(final Consumer consumer) { errorCallback.exceptionally(t -> { consumer.accept(t); @@ -416,6 +649,7 @@ public void onException(final Consumer consumer) { public void completeExceptionally(final Throwable throwable) { if (!errorCallback.isDone()) { + close(); errorCallback.completeExceptionally(throwable); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRoutingOptions.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRoutingOptions.java index 63ac173feba1..9e65f6386d55 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRoutingOptions.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRoutingOptions.java @@ -21,5 +21,9 @@ public interface PushRoutingOptions { // If we should avoid skipping forwarding the request because it's already been forwarded. - boolean getIsSkipForwardRequest(); + boolean getHasBeenForwarded(); + + // When a rebalance occurs and we connect to a new node, we don't want to miss anything, so we + // set this flag indicating we should error if this expectation isn't met. + boolean getExpectingStartOfRegistryData(); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.java index eaf9d1c5c054..47cbf67b6e16 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.java @@ -56,23 +56,27 @@ public class ScalablePushRegistry implements ProcessorSupplier processingQueues = new ConcurrentHashMap<>(); private boolean closed = false; + private volatile boolean hasReceivedData = false; public ScalablePushRegistry( final PushLocator pushLocator, final LogicalSchema logicalSchema, final boolean isTable, - final boolean windowed + final boolean windowed, + final boolean newNodeContinuityEnforced ) { this.pushLocator = pushLocator; this.logicalSchema = logicalSchema; this.isTable = isTable; this.windowed = windowed; + this.newNodeContinuityEnforced = newNodeContinuityEnforced; } public synchronized void close() { @@ -83,10 +87,16 @@ public synchronized void close() { closed = true; } - public synchronized void register(final ProcessingQueue processingQueue) { + public synchronized void register( + final ProcessingQueue processingQueue, + final boolean expectingStartOfRegistryData + ) { if (closed) { throw new IllegalStateException("Shouldn't register after closing"); } + if (hasReceivedData && newNodeContinuityEnforced && expectingStartOfRegistryData) { + throw new IllegalStateException("New node missed data"); + } processingQueues.put(processingQueue.getQueryId(), processingQueue); } @@ -117,6 +127,7 @@ int numRegistered() { @SuppressWarnings("unchecked") private void handleRow(final Record record) { + hasReceivedData = true; final Object key = record.key(); final GenericRow value = record.value(); @@ -187,7 +198,8 @@ public static Optional create( final Supplier> allPersistentQueries, final boolean isTable, final boolean windowed, - final Map streamsProperties + final Map streamsProperties, + final boolean newNodeContinuityEnforced ) { final Object appServer = streamsProperties.get(StreamsConfig.APPLICATION_SERVER_CONFIG); if (appServer == null) { @@ -207,6 +219,7 @@ public static Optional create( } final PushLocator pushLocator = new AllHostsLocator(allPersistentQueries, localhost); - return Optional.of(new ScalablePushRegistry(pushLocator, logicalSchema, isTable, windowed)); + return Optional.of(new ScalablePushRegistry(pushLocator, logicalSchema, isTable, windowed, + newNodeContinuityEnforced)); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.java index 5e5ebf8ef7c4..b5c66f19460a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.java @@ -26,7 +26,6 @@ import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Collectors; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.StreamsMetadata; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.internals.StreamsMetadataImpl; @@ -60,7 +59,6 @@ public List locate() { } return currentQueries.stream() - .filter(persistentQueryMetadata -> persistentQueryMetadata.getState() == State.RUNNING) .map(QueryMetadata::getAllMetadata) .filter(Objects::nonNull) .flatMap(Collection::stream) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java index 984b13c9b0f7..d6b128f54e0a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java @@ -32,20 +32,23 @@ public class PeekStreamOperator extends AbstractPhysicalOperator implements Push private final DataSourceNode logicalNode; private final ScalablePushRegistry scalablePushRegistry; private final ProcessingQueue processingQueue; + private final boolean expectingStartOfRegistryData; public PeekStreamOperator( final ScalablePushRegistry scalablePushRegistry, final DataSourceNode logicalNode, - final QueryId queryId + final QueryId queryId, + final boolean expectingStartOfRegistryData ) { this.scalablePushRegistry = scalablePushRegistry; this.logicalNode = logicalNode; this.processingQueue = new ProcessingQueue(queryId); + this.expectingStartOfRegistryData = expectingStartOfRegistryData; } @Override public void open() { - scalablePushRegistry.register(processingQueue); + scalablePushRegistry.register(processingQueue, expectingStartOfRegistryData); } @Override diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 95e881b53b2d..f3a9127a9822 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -214,7 +214,8 @@ private static Optional applyScalablePushProcessor( isTable = false; } final Optional registry = ScalablePushRegistry.create(schema, - allPersistentQueries, isTable, windowed, streamsProperties); + allPersistentQueries, isTable, windowed, streamsProperties, + ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY)); registry.ifPresent(r -> stream.process(registry.get())); return registry; } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java index f4bfa9e9fe42..d256bfc705a0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java @@ -16,6 +16,7 @@ package io.confluent.ksql.util; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.physical.scalablepush.PushQueryPreparer; import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator; import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle; import io.confluent.ksql.query.BlockingRowQueue; @@ -34,6 +35,7 @@ public class ScalablePushQueryMetadata implements PushQueryMetadata { private final BlockingRowQueue rowQueue; private final ResultType resultType; private final PushQueryQueuePopulator pushQueryQueuePopulator; + private final PushQueryPreparer pushQueryPreparer; // Future for the start of the connections, which creates a handle private CompletableFuture startFuture = new CompletableFuture<>(); @@ -47,26 +49,40 @@ public ScalablePushQueryMetadata( final QueryId queryId, final BlockingRowQueue blockingRowQueue, final ResultType resultType, - final PushQueryQueuePopulator pushQueryQueuePopulator + final PushQueryQueuePopulator pushQueryQueuePopulator, + final PushQueryPreparer pushQueryPreparer ) { this.logicalSchema = logicalSchema; this.queryId = queryId; this.rowQueue = blockingRowQueue; this.resultType = resultType; this.pushQueryQueuePopulator = pushQueryQueuePopulator; + this.pushQueryPreparer = pushQueryPreparer; + } + + /** + * Prepare to start. Any exceptions thrown here will result in an error return code rather than + * an error written to the stream. + */ + public void prepare() { + // Any exceptions aren't meant to trickle up to the caller. This will result in non ok error + // codes and is good for fast failing. + pushQueryPreparer.prepare(); } @Override public void start() { - pushQueryQueuePopulator.run().thenApply(handle -> { - startFuture.complete(handle); - handle.onException(runningFuture::completeExceptionally); - return null; - }).exceptionally(t -> { - startFuture.completeExceptionally(t); - runningFuture.completeExceptionally(t); - return null; - }); + CompletableFuture.completedFuture(null) + .thenCompose(v -> pushQueryQueuePopulator.run()) + .thenApply(handle -> { + startFuture.complete(handle); + handle.onException(runningFuture::completeExceptionally); + return null; + }).exceptionally(t -> { + startFuture.completeExceptionally(t); + runningFuture.completeExceptionally(t); + return null; + }); } @Override diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilderTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilderTest.java index 31c382295b1d..37f649472144 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilderTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilderTest.java @@ -91,7 +91,7 @@ public void setUp() { public void shouldBuildPhysicalPlan() { // Given: final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder(logContext, - persistentQueryMetadata); + persistentQueryMetadata, false); // When: final PushPhysicalPlan pushPhysicalPlan = @@ -111,7 +111,7 @@ public void shouldThrowOnNoOutputNode() { // Given: when(logicalPlanNode.getNode()).thenReturn(Optional.empty()); final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder(logContext, - persistentQueryMetadata); + persistentQueryMetadata, false); // When: final Exception e = assertThrows( @@ -128,7 +128,7 @@ public void shouldThrowOnNotBareOutputNode() { // Given: when(logicalPlanNode.getNode()).thenReturn(Optional.of(mock(OutputNode.class))); final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder(logContext, - persistentQueryMetadata); + persistentQueryMetadata, false); // When: final Exception e = assertThrows( @@ -147,7 +147,7 @@ public void shouldThrowOnUnknownLogicalNode() { // Given: when(ksqlBareOutputNode.getSource()).thenReturn(mock(PlanNode.class)); final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder(logContext, - persistentQueryMetadata); + persistentQueryMetadata, false); // When: final Exception e = assertThrows( @@ -164,7 +164,7 @@ public void shouldThrowOnMultipleSources() { // Given: when(projectNode.getSources()).thenReturn(ImmutableList.of(filterNode, dataSourceNode)); final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder(logContext, - persistentQueryMetadata); + persistentQueryMetadata, false); // When: final Exception e = assertThrows( @@ -182,7 +182,7 @@ public void shouldThrowOnNoDataSourceOperator() { // Given: when(filterNode.getSources()).thenReturn(ImmutableList.of()); final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder(logContext, - persistentQueryMetadata); + persistentQueryMetadata, false); // When: final Exception e = assertThrows( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java index 98be7ab0f624..5285bec72d97 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java @@ -4,6 +4,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; @@ -157,6 +158,41 @@ public void shouldStopOnHasError() throws InterruptedException { assertThat(pushPhysicalPlan.isClosed(), is(true)); } + @Test + public void shouldThrowErrorOnOpen() throws InterruptedException { + final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, + scalablePushRegistry, pushDataSourceOperator, context); + doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); + doThrow(new RuntimeException("Error on open")).when(root).open(); + + final BufferedPublisher> publisher = pushPhysicalPlan.execute(); + final TestSubscriber> subscriber = new TestSubscriber<>(); + publisher.subscribe(subscriber); + + while (subscriber.getError() == null) { + Thread.sleep(100); + } + assertThat(subscriber.getError().getMessage(), containsString("Error on open")); + } + + @Test + public void shouldThrowErrorOnNext() throws InterruptedException { + final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, + scalablePushRegistry, pushDataSourceOperator, context); + doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); + when(pushDataSourceOperator.droppedRows()).thenReturn(false); + doThrow(new RuntimeException("Error on next")).when(root).next(); + + final BufferedPublisher> publisher = pushPhysicalPlan.execute(); + final TestSubscriber> subscriber = new TestSubscriber<>(); + publisher.subscribe(subscriber); + + while (subscriber.getError() == null) { + Thread.sleep(100); + } + assertThat(subscriber.getError().getMessage(), containsString("Error on next")); + } + public static class TestSubscriber implements Subscriber { private Subscription sub; diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java index f9558083db07..c55a23cd106c 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java @@ -10,10 +10,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericRow; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle; +import io.confluent.ksql.physical.scalablepush.PushRouting.RoutingResult; +import io.confluent.ksql.physical.scalablepush.PushRouting.RoutingResultStatus; import io.confluent.ksql.physical.scalablepush.locator.PushLocator; import io.confluent.ksql.physical.scalablepush.locator.PushLocator.KsqlNode; import io.confluent.ksql.query.TransientQueryQueue; @@ -25,7 +28,6 @@ import io.confluent.ksql.services.SimpleKsqlClient; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KeyValue; -import io.confluent.ksql.util.KsqlConfig; import io.vertx.core.Context; import io.vertx.core.Vertx; import java.net.URI; @@ -36,6 +38,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -93,13 +96,14 @@ public void setUp() { when(sessionConfig.getOverrides()).thenReturn(ImmutableMap.of()); when(serviceContext.getKsqlClient()).thenReturn(simpleKsqlClient); when(pushPhysicalPlan.getScalablePushRegistry()).thenReturn(scalablePushRegistry); + when(pushPhysicalPlan.getContext()).thenReturn(context); when(scalablePushRegistry.getLocator()).thenReturn(locator); when(locator.locate()).thenReturn(ImmutableList.of(ksqlNodeLocal, ksqlNodeRemote)); when(ksqlNodeLocal.location()).thenReturn(URI.create("http://localhost:8088")); when(ksqlNodeLocal.isLocal()).thenReturn(true); when(ksqlNodeRemote.location()).thenReturn(URI.create("http://remote:8088")); when(ksqlNodeRemote.isLocal()).thenReturn(false); - when(pushRoutingOptions.getIsSkipForwardRequest()).thenReturn(false); + when(pushRoutingOptions.getHasBeenForwarded()).thenReturn(false); transientQueryQueue = new TransientQueryQueue(OptionalInt.empty()); } @@ -159,10 +163,195 @@ public void shouldSucceed_forward() throws ExecutionException, InterruptedExcept assertThat(rows.contains(REMOTE_ROW2.getRow().get().getColumns()), is(true)); } + @Test + public void shouldSucceed_addRemoteNode() throws ExecutionException, InterruptedException { + // Given: + final AtomicReference> nodes = new AtomicReference<>( + ImmutableSet.of(ksqlNodeLocal)); + final PushRouting routing = new PushRouting(sqr -> nodes.get(), 50, true); + BufferedPublisher> localPublisher = new BufferedPublisher<>(context); + BufferedPublisher remotePublisher = new BufferedPublisher<>(context); + when(pushPhysicalPlan.execute()).thenReturn(localPublisher); + when(simpleKsqlClient.makeQueryRequestStreamed(any(), any(), any(), any())) + .thenReturn(createFuture(RestResponse.successful(200, remotePublisher))); + + // When: + CompletableFuture future = + routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, + outputSchema, transientQueryQueue); + future.get(); + context.runOnContext(v -> { + localPublisher.accept(LOCAL_ROW1); + localPublisher.accept(LOCAL_ROW2); + }); + + Set> rows = new HashSet<>(); + while (rows.size() < 2) { + final KeyValue, GenericRow> kv = transientQueryQueue.poll(); + if (kv == null) { + Thread.sleep(100); + continue; + } + rows.add(kv.value().values()); + } + + nodes.set(ImmutableSet.of(ksqlNodeLocal, ksqlNodeRemote)); + context.runOnContext(v -> { + remotePublisher.accept(REMOTE_ROW1); + remotePublisher.accept(REMOTE_ROW2); + }); + + // Then: + while (rows.size() < 4) { + final KeyValue, GenericRow> kv = transientQueryQueue.poll(); + if (kv == null) { + Thread.sleep(100); + continue; + } + rows.add(kv.value().values()); + } + assertThat(rows.contains(LOCAL_ROW1), is(true)); + assertThat(rows.contains(LOCAL_ROW2), is(true)); + assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true)); + assertThat(rows.contains(REMOTE_ROW2.getRow().get().getColumns()), is(true)); + } + + @Test + public void shouldSucceed_removeRemoteNode() throws ExecutionException, InterruptedException { + // Given: + final AtomicReference> nodes = new AtomicReference<>( + ImmutableSet.of(ksqlNodeLocal, ksqlNodeRemote)); + final PushRouting routing = new PushRouting(sqr -> nodes.get(), 50, true); + BufferedPublisher> localPublisher = new BufferedPublisher<>(context); + BufferedPublisher remotePublisher = new BufferedPublisher<>(context); + when(pushPhysicalPlan.execute()).thenReturn(localPublisher); + when(simpleKsqlClient.makeQueryRequestStreamed(any(), any(), any(), any())) + .thenReturn(createFuture(RestResponse.successful(200, remotePublisher))); + + // When: + CompletableFuture future = + routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, + outputSchema, transientQueryQueue); + final PushConnectionsHandle handle = future.get(); + context.runOnContext(v -> { + localPublisher.accept(LOCAL_ROW1); + localPublisher.accept(LOCAL_ROW2); + remotePublisher.accept(REMOTE_ROW1); + remotePublisher.accept(REMOTE_ROW2); + }); + + Set> rows = new HashSet<>(); + while (rows.size() < 4) { + final KeyValue, GenericRow> kv = transientQueryQueue.poll(); + if (kv == null) { + Thread.sleep(100); + continue; + } + rows.add(kv.value().values()); + } + + final RoutingResult result = handle.get(ksqlNodeRemote).get(); + nodes.set(ImmutableSet.of(ksqlNodeLocal)); + while (handle.get(ksqlNodeRemote).isPresent()) { + Thread.sleep(100); + continue; + } + + // Then: + assertThat(rows.contains(LOCAL_ROW1), is(true)); + assertThat(rows.contains(LOCAL_ROW2), is(true)); + assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true)); + assertThat(rows.contains(REMOTE_ROW2.getRow().get().getColumns()), is(true)); + assertThat(result.getStatus(), is(RoutingResultStatus.REMOVED)); + } + + @Test + public void shouldSucceed_remoteNodeComplete() throws ExecutionException, InterruptedException { + // Given: + final AtomicReference> nodes = new AtomicReference<>( + ImmutableSet.of(ksqlNodeLocal, ksqlNodeRemote)); + final PushRouting routing = new PushRouting(sqr -> nodes.get(), 50, false); + BufferedPublisher> localPublisher = new BufferedPublisher<>(context); + BufferedPublisher remotePublisher = new BufferedPublisher<>(context); + when(pushPhysicalPlan.execute()).thenReturn(localPublisher); + when(simpleKsqlClient.makeQueryRequestStreamed(any(), any(), any(), any())) + .thenReturn(createFuture(RestResponse.successful(200, remotePublisher))); + + // When: + CompletableFuture future = + routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, + outputSchema, transientQueryQueue); + final PushConnectionsHandle handle = future.get(); + context.runOnContext(v -> { + localPublisher.accept(LOCAL_ROW1); + localPublisher.accept(LOCAL_ROW2); + remotePublisher.accept(REMOTE_ROW1); + remotePublisher.accept(REMOTE_ROW2); + remotePublisher.complete(); + }); + + Set> rows = new HashSet<>(); + while (rows.size() < 4) { + final KeyValue, GenericRow> kv = transientQueryQueue.poll(); + if (kv == null) { + Thread.sleep(100); + continue; + } + rows.add(kv.value().values()); + } + while (!handle.get(ksqlNodeRemote).isPresent() + || handle.get(ksqlNodeRemote).get().getStatus() != RoutingResultStatus.COMPLETE) { + Thread.sleep(100); + continue; + } + + // Then: + assertThat(rows.contains(LOCAL_ROW1), is(true)); + assertThat(rows.contains(LOCAL_ROW2), is(true)); + assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true)); + assertThat(rows.contains(REMOTE_ROW2.getRow().get().getColumns()), is(true)); + assertThat(handle.get(ksqlNodeRemote).get().getStatus(), is(RoutingResultStatus.COMPLETE)); + } + + @Test + public void shouldFail_remoteNodeException() throws ExecutionException, InterruptedException { + // Given: + final AtomicReference> nodes = new AtomicReference<>( + ImmutableSet.of(ksqlNodeLocal, ksqlNodeRemote)); + final PushRouting routing = new PushRouting(sqr -> nodes.get(), 50, true); + BufferedPublisher> localPublisher = new BufferedPublisher<>(context); + TestRemotePublisher remotePublisher = new TestRemotePublisher(context); + when(pushPhysicalPlan.execute()).thenReturn(localPublisher); + when(simpleKsqlClient.makeQueryRequestStreamed(any(), any(), any(), any())) + .thenReturn(createFuture(RestResponse.successful(200, remotePublisher))); + + // When: + CompletableFuture future = + routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, + outputSchema, transientQueryQueue); + final PushConnectionsHandle handle = future.get(); + final AtomicReference exception = new AtomicReference<>(null); + handle.onException(exception::set); + context.runOnContext(v -> { + localPublisher.accept(LOCAL_ROW1); + localPublisher.accept(LOCAL_ROW2); + remotePublisher.error(new RuntimeException("Random error")); + }); + + while (exception.get() == null) { + Thread.sleep(100); + continue; + } + + // Then: + assertThat(exception.get().getMessage(), containsString("Random error")); + } + + @Test public void shouldSucceed_justForwarded() throws ExecutionException, InterruptedException { // Given: - when(pushRoutingOptions.getIsSkipForwardRequest()).thenReturn(true); + when(pushRoutingOptions.getHasBeenForwarded()).thenReturn(true); final PushRouting routing = new PushRouting(); BufferedPublisher> localPublisher = new BufferedPublisher<>(context); when(pushPhysicalPlan.execute()).thenReturn(localPublisher); @@ -195,7 +384,7 @@ public void shouldSucceed_justForwarded() throws ExecutionException, Interrupted @Test public void shouldFail_duringPlanExecute() throws ExecutionException, InterruptedException { // Given: - when(pushRoutingOptions.getIsSkipForwardRequest()).thenReturn(true); + when(pushRoutingOptions.getHasBeenForwarded()).thenReturn(true); final PushRouting routing = new PushRouting(); when(pushPhysicalPlan.execute()).thenThrow(new RuntimeException("Error!")); @@ -249,7 +438,7 @@ public void shouldFail_errorRemoteCall() throws ExecutionException, InterruptedE public void shouldFail_hitRequestLimitLocal() throws ExecutionException, InterruptedException { // Given: transientQueryQueue = new TransientQueryQueue(OptionalInt.empty(), 1, 100); - when(pushRoutingOptions.getIsSkipForwardRequest()).thenReturn(true); + when(pushRoutingOptions.getHasBeenForwarded()).thenReturn(true); final PushRouting routing = new PushRouting(); BufferedPublisher> localPublisher = new BufferedPublisher<>(context); when(pushPhysicalPlan.execute()).thenReturn(localPublisher); @@ -311,4 +500,15 @@ public void shouldFail_hitRequestLimitRemote() throws ExecutionException, Interr assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true)); assertThat(handle.getError().getMessage(), containsString("Hit limit of request queue")); } + + private static class TestRemotePublisher extends BufferedPublisher { + + public TestRemotePublisher(Context ctx) { + super(ctx); + } + + public void error(final Throwable e) { + sendError(e); + } + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java index c42b5ba8ee3d..998c11fe41db 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java @@ -72,7 +72,7 @@ public void setUp() { @Test public void shouldRegisterAndGetQueueOffer_nonWindowed() { // Given: - ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, false, false); + ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, false, false, false); when(record.key()).thenReturn(genericKey); when(record.value()).thenReturn(genericRow); when(record.timestamp()).thenReturn(TIMESTAMP); @@ -80,7 +80,7 @@ public void shouldRegisterAndGetQueueOffer_nonWindowed() { when(genericRow.values()).thenAnswer(a -> VALUE); // When: - registry.register(processingQueue); + registry.register(processingQueue, false); assertThat(registry.numRegistered(), is(1)); // Then: @@ -96,7 +96,7 @@ public void shouldRegisterAndGetQueueOffer_nonWindowed() { @Test public void shouldRegisterAndGetQueueOffer_windowed() { // Given: - ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, true, true); + ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, true, true, false); when(record.key()).thenReturn(windowed); when(record.value()).thenReturn(genericRow); when(record.timestamp()).thenReturn(TIMESTAMP); @@ -107,7 +107,7 @@ public void shouldRegisterAndGetQueueOffer_windowed() { // When: - registry.register(processingQueue); + registry.register(processingQueue, false); assertThat(registry.numRegistered(), is(1)); // Then: @@ -121,10 +121,28 @@ public void shouldRegisterAndGetQueueOffer_windowed() { assertThat(registry.numRegistered(), is(0)); } + @Test + public void shouldEnforceNewNodeContinuity() { + // Given: + ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, true, true, true); + when(record.key()).thenReturn(windowed); + when(record.value()).thenReturn(genericRow); + + // When: + final Processor processor = registry.get(); + processor.init(processorContext); + processor.process(record); + final Exception e = assertThrows(IllegalStateException.class, + () -> registry.register(processingQueue, true)); + + // Then: + assertThat(e.getMessage(), containsString("New node missed data")); + } + @Test public void shouldCatchException() { // Given: - ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, false, false); + ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, false, false, false); when(record.key()).thenReturn(genericKey); when(record.value()).thenReturn(genericRow); when(record.timestamp()).thenReturn(TIMESTAMP); @@ -133,7 +151,7 @@ public void shouldCatchException() { when(processingQueue.offer(any())).thenThrow(new RuntimeException("Error!")); // When: - registry.register(processingQueue); + registry.register(processingQueue, false); // Then: final Processor processor = registry.get(); @@ -146,7 +164,8 @@ public void shouldCreate() { // When: final Optional registry = ScalablePushRegistry.create(SCHEMA, Collections::emptyList, false, false, - ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, "http://localhost:8088")); + ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, "http://localhost:8088"), + false); // Then: assertThat(registry.isPresent(), is(true)); @@ -158,7 +177,7 @@ public void shouldCreate_badApplicationServer() { final Exception e = assertThrows( IllegalArgumentException.class, () -> ScalablePushRegistry.create(SCHEMA, Collections::emptyList, false, false, - ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, 123)) + ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, 123), false) ); // Then @@ -171,7 +190,7 @@ public void shouldCreate_badUrlApplicationServer() { final Exception e = assertThrows( IllegalArgumentException.class, () -> ScalablePushRegistry.create(SCHEMA, Collections::emptyList, false, false, - ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, "abc")) + ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, "abc"), false) ); // Then @@ -183,7 +202,7 @@ public void shouldCreate_noApplicationServer() { // When final Optional registry = ScalablePushRegistry.create(SCHEMA, Collections::emptyList, false, false, - ImmutableMap.of()); + ImmutableMap.of(), false); // Then assertThat(registry.isPresent(), is(false)); @@ -192,8 +211,8 @@ public void shouldCreate_noApplicationServer() { @Test public void shouldCallOnErrorOnQueue() { // Given - ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, false, false); - registry.register(processingQueue); + ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, false, false, false); + registry.register(processingQueue, false); // When registry.onError(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperatorTest.java index db6242c856d8..6b844e0aac0e 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperatorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperatorTest.java @@ -3,6 +3,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -41,14 +42,15 @@ public class PeekStreamOperatorTest { @Test public void shouldGetRowsFromOperator() { // Given: - final PeekStreamOperator locator = new PeekStreamOperator(registry, dataSourceNode, QUERY_ID); + final PeekStreamOperator locator = new PeekStreamOperator(registry, dataSourceNode, QUERY_ID, + false); locator.setNewRowCallback(newRowCallback); // When: locator.open(); // Then: - verify(registry, times(1)).register(processingQueueCaptor.capture()); + verify(registry, times(1)).register(processingQueueCaptor.capture(), eq(false)); final ProcessingQueue processingQueue = processingQueueCaptor.getValue(); processingQueue.offer(row1); processingQueue.offer(row2); @@ -63,7 +65,8 @@ public void shouldGetRowsFromOperator() { @Test public void shouldDefaultToFalseForHasErrorOnQueue() { // Given: - final PeekStreamOperator locator = new PeekStreamOperator(registry, dataSourceNode, QUERY_ID); + final PeekStreamOperator locator = new PeekStreamOperator(registry, dataSourceNode, QUERY_ID, + false); // When: locator.open(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/ScalablePushQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/ScalablePushQueryMetadataTest.java index ad3ad40e5360..d4a90d11a85f 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/ScalablePushQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/ScalablePushQueryMetadataTest.java @@ -62,7 +62,8 @@ public void setUp() { new QueryId("queryid"), blockingRowQueue, ResultType.STREAM, - populator + populator, + () -> { } ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 94b7492ee96d..25b7455c6ab2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -179,6 +179,7 @@ private QueryPublisher createScalablePushQueryPublisher( final ScalablePushQueryMetadata query = ksqlEngine .executeScalablePushQuery(analysis, serviceContext, statement, pushRouting, routingOptions, plannerOptions, context); + query.prepare(); publisher.setQueryHandle(new KsqlQueryHandle(query), false, true); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigRoutingOptions.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigRoutingOptions.java index aede665a2573..19c84fc635e2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigRoutingOptions.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigRoutingOptions.java @@ -31,11 +31,20 @@ public PushQueryConfigRoutingOptions( } @Override - public boolean getIsSkipForwardRequest() { + public boolean getHasBeenForwarded() { if (requestProperties.containsKey(KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING)) { return (Boolean) requestProperties.get( KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING); } return KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DEFAULT; } + + @Override + public boolean getExpectingStartOfRegistryData() { + if (requestProperties.containsKey(KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_REGISTRY_START)) { + return (Boolean) requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_REGISTRY_START); + } + return KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DEFAULT; + } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java index f20267264acf..cc1f43e9063c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java @@ -35,6 +35,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KeyValue; import io.confluent.ksql.util.PushQueryMetadata; +import io.confluent.ksql.util.ScalablePushQueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import io.vertx.core.Context; import java.util.Collection; @@ -139,10 +140,11 @@ public synchronized void subscribe(final Flow.Subscriber final ImmutableAnalysis analysis = ksqlEngine.analyzeQueryWithNoOutputTopic(query.getStatement(), query.getStatementText()); - - queryMetadata = ksqlEngine + final ScalablePushQueryMetadata pushQueryMetadata = ksqlEngine .executeScalablePushQuery(analysis, serviceContext, query, pushRouting, routingOptions, plannerOptions, context); + pushQueryMetadata.prepare(); + queryMetadata = pushQueryMetadata; } else { queryMetadata = ksqlEngine .executeTransientQuery(serviceContext, query, true); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 3303d0dbb619..6719cfa27858 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -508,6 +508,7 @@ private EndpointResponse handleScalablePushQuery( final ScalablePushQueryMetadata query = ksqlEngine .executeScalablePushQuery(analysis, serviceContext, configured, pushRouting, routingOptions, plannerOptions, context); + query.prepare(); final QueryStreamWriter queryStreamWriter = new QueryStreamWriter( query, diff --git a/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/StreamPublisher.java b/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/StreamPublisher.java index 4df180fd9c4a..e436392862e0 100644 --- a/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/StreamPublisher.java +++ b/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/StreamPublisher.java @@ -72,6 +72,7 @@ public static Buffer toJsonMsg(final Buffer responseLine, final boolean stripArr } }) .endHandler(v -> complete()); + response.request().connection().closeHandler(v -> complete()); } public void close() {