Skip to content

Commit

Permalink
feat: add stream pull queries to websocket endpoint (#8143)
Browse files Browse the repository at this point in the history
Finishing out the stream pull query implementation cycle, this adds support
to the websocket endpoint, building on the HTTP/1 and HTTP/2 endpoints.
  • Loading branch information
vvcephei committed Sep 22, 2021
1 parent 2abb098 commit 57626f3
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,6 @@ public TransientQueryMetadata executeTransientQuery(
}
}

/**
* Unlike the other queries, stream pull queries are split into create and wait because the three
* API endpoints all need to do different stuff before, in the middle of, and after these two
* phases. One of them actually needs to wait on the pull query in a callback after starting the
* query, so splitting it into two method calls was the most practical choice.
*/
public StreamPullQueryMetadata createStreamPullQuery(
final ServiceContext serviceContext,
final ImmutableAnalysis analysis,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ static class PushQuerySubscription extends PollingSubscription<Collection<Stream
this.queryMetadata = requireNonNull(queryMetadata, "queryMetadata");

queryMetadata.setLimitHandler(this::setDone);
queryMetadata.setCompletionHandler(this::setDone);
queryMetadata.setUncaughtExceptionHandler(
e -> {
setError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.PullQueryValidator;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
Expand All @@ -44,6 +43,7 @@
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.resources.streaming.PushQueryPublisher.PushQuerySubscription;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.ScalablePushUtil;
Expand All @@ -52,6 +52,7 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.StreamPullQueryMetadata;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
Expand Down Expand Up @@ -297,11 +298,30 @@ private void handleQuery(final RequestContext info, final Query query,
return;
}
case KSTREAM: {
throw new KsqlStatementException(
"Pull queries are not supported on streams."
+ PullQueryValidator.PULL_QUERY_SYNTAX_HELP,
statement.getStatementText()
final StreamPullQueryMetadata queryMetadata =
ksqlEngine.createStreamPullQuery(
info.securityContext.getServiceContext(),
analysis,
configured,
true
);

localCommands.ifPresent(lc -> lc.write(queryMetadata.getTransientQueryMetadata()));

final PushQuerySubscription subscription =
new PushQuerySubscription(exec,
streamSubscriber,
queryMetadata.getTransientQueryMetadata()
);

log.info(
"Running query {}",
queryMetadata.getTransientQueryMetadata().getQueryId().toString()
);
queryMetadata.getTransientQueryMetadata().start();

streamSubscriber.onSubscribe(subscription);
return;
}
default:
throw new KsqlStatementException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -102,6 +103,7 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -217,6 +219,7 @@ public class RestApiTest {
.withProperties(ClientTrustStore.trustStoreProps())
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED, true)
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_V2_ENABLED, true)
.withProperty(KsqlConfig.KSQL_QUERY_STREAM_PULL_QUERY_ENABLED, true)
.build();

@ClassRule
Expand Down Expand Up @@ -344,6 +347,36 @@ public void shouldExecutePushQueryThatReturnsStreamOverWebSocketWithJsonContentT
));
}

@Test
public void shouldExecutePullQueryThatReturnsStreamOverWebSocketWithJsonContentType() {
// When:
final List<String> messages = makeWebSocketRequest(
"SELECT * from " + PAGE_VIEW_STREAM + ";",
MediaType.APPLICATION_JSON,
MediaType.APPLICATION_JSON
);

// Then:
assertThat(messages, equalTo(
ImmutableList.of(
"[{\"name\":\"PAGEID\",\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}}"
+ ",{\"name\":\"USERID\",\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}}"
+ ",{\"name\":\"VIEWTIME\",\"schema\":{\"type\":\"BIGINT\",\"fields\":null,\"memberSchema\":null}}]",
"{\"row\":{\"columns\":[\"PAGE_1\",\"USER_1\",1]}}",
"{\"row\":{\"columns\":[\"PAGE_2\",\"USER_2\",2]}}",
"{\"row\":{\"columns\":[\"PAGE_3\",\"USER_4\",3]}}",
"{\"row\":{\"columns\":[\"PAGE_4\",\"USER_3\",4]}}",
"{\"row\":{\"columns\":[\"PAGE_5\",\"USER_0\",5]}}",
"{\"row\":{\"columns\":[\"PAGE_5\",\"USER_2\",6]}}",
"{\"row\":{\"columns\":[\"PAGE_5\",\"USER_3\",7]}}",
// This is a bit weird, but it's clearly what the code is meant to produce.
// I'm unsure if it's ok to change this to make more sense, or if user code depends
// on this completion message.
"{\"error\":\"done\"}"
)
));
}

@Test
public void shouldExecutePushQueryThatReturnsTableOverWebSocketWithJsonContentType() {
// When:
Expand Down

0 comments on commit 57626f3

Please sign in to comment.