From 57626f36e9387be2746d808acda5da0055141595 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Wed, 22 Sep 2021 14:20:37 -0500 Subject: [PATCH] feat: add stream pull queries to websocket endpoint (#8143) Finishing out the stream pull query implementation cycle, this adds support to the websocket endpoint, building on the HTTP/1 and HTTP/2 endpoints. --- .../io/confluent/ksql/engine/KsqlEngine.java | 6 ---- .../streaming/PushQueryPublisher.java | 1 + .../resources/streaming/WSQueryEndpoint.java | 30 ++++++++++++++--- .../ksql/rest/integration/RestApiTest.java | 33 +++++++++++++++++++ 4 files changed, 59 insertions(+), 11 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index cb5a546c7a9b..51bb973a0c89 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -314,12 +314,6 @@ public TransientQueryMetadata executeTransientQuery( } } - /** - * Unlike the other queries, stream pull queries are split into create and wait because the three - * API endpoints all need to do different stuff before, in the middle of, and after these two - * phases. One of them actually needs to wait on the pull query in a callback after starting the - * query, so splitting it into two method calls was the most practical choice. - */ public StreamPullQueryMetadata createStreamPullQuery( final ServiceContext serviceContext, final ImmutableAnalysis analysis, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java index ffc10bad49ba..d02f72c55e82 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java @@ -133,6 +133,7 @@ static class PushQuerySubscription extends PollingSubscription { setError(e); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 0a8ee76a303b..f8e83392468d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.analyzer.ImmutableAnalysis; -import io.confluent.ksql.analyzer.PullQueryValidator; import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; @@ -44,6 +43,7 @@ import io.confluent.ksql.rest.server.LocalCommands; import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; +import io.confluent.ksql.rest.server.resources.streaming.PushQueryPublisher.PushQuerySubscription; import io.confluent.ksql.rest.util.CommandStoreUtil; import io.confluent.ksql.rest.util.ConcurrencyLimiter; import io.confluent.ksql.rest.util.ScalablePushUtil; @@ -52,6 +52,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlStatementException; +import io.confluent.ksql.util.StreamPullQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import io.vertx.core.Context; import io.vertx.core.MultiMap; @@ -297,11 +298,30 @@ private void handleQuery(final RequestContext info, final Query query, return; } case KSTREAM: { - throw new KsqlStatementException( - "Pull queries are not supported on streams." - + PullQueryValidator.PULL_QUERY_SYNTAX_HELP, - statement.getStatementText() + final StreamPullQueryMetadata queryMetadata = + ksqlEngine.createStreamPullQuery( + info.securityContext.getServiceContext(), + analysis, + configured, + true + ); + + localCommands.ifPresent(lc -> lc.write(queryMetadata.getTransientQueryMetadata())); + + final PushQuerySubscription subscription = + new PushQuerySubscription(exec, + streamSubscriber, + queryMetadata.getTransientQueryMetadata() + ); + + log.info( + "Running query {}", + queryMetadata.getTransientQueryMetadata().getQueryId().toString() ); + queryMetadata.getTransientQueryMetadata().start(); + + streamSubscriber.onSubscribe(subscription); + return; } default: throw new KsqlStatementException( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index bcd417e20cce..c5c1678bef32 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -38,6 +38,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -102,6 +103,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -217,6 +219,7 @@ public class RestApiTest { .withProperties(ClientTrustStore.trustStoreProps()) .withProperty(KsqlConfig.KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED, true) .withProperty(KsqlConfig.KSQL_QUERY_PUSH_V2_ENABLED, true) + .withProperty(KsqlConfig.KSQL_QUERY_STREAM_PULL_QUERY_ENABLED, true) .build(); @ClassRule @@ -344,6 +347,36 @@ public void shouldExecutePushQueryThatReturnsStreamOverWebSocketWithJsonContentT )); } + @Test + public void shouldExecutePullQueryThatReturnsStreamOverWebSocketWithJsonContentType() { + // When: + final List messages = makeWebSocketRequest( + "SELECT * from " + PAGE_VIEW_STREAM + ";", + MediaType.APPLICATION_JSON, + MediaType.APPLICATION_JSON + ); + + // Then: + assertThat(messages, equalTo( + ImmutableList.of( + "[{\"name\":\"PAGEID\",\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}}" + + ",{\"name\":\"USERID\",\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}}" + + ",{\"name\":\"VIEWTIME\",\"schema\":{\"type\":\"BIGINT\",\"fields\":null,\"memberSchema\":null}}]", + "{\"row\":{\"columns\":[\"PAGE_1\",\"USER_1\",1]}}", + "{\"row\":{\"columns\":[\"PAGE_2\",\"USER_2\",2]}}", + "{\"row\":{\"columns\":[\"PAGE_3\",\"USER_4\",3]}}", + "{\"row\":{\"columns\":[\"PAGE_4\",\"USER_3\",4]}}", + "{\"row\":{\"columns\":[\"PAGE_5\",\"USER_0\",5]}}", + "{\"row\":{\"columns\":[\"PAGE_5\",\"USER_2\",6]}}", + "{\"row\":{\"columns\":[\"PAGE_5\",\"USER_3\",7]}}", + // This is a bit weird, but it's clearly what the code is meant to produce. + // I'm unsure if it's ok to change this to make more sense, or if user code depends + // on this completion message. + "{\"error\":\"done\"}" + ) + )); + } + @Test public void shouldExecutePushQueryThatReturnsTableOverWebSocketWithJsonContentType() { // When: