-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add stream pull queries to websocket endpoint #8143
Conversation
/** | ||
* Unlike the other queries, stream pull queries are split into create and wait because the three | ||
* API endpoints all need to do different stuff before, in the middle of, and after these two | ||
* phases. One of them actually needs to wait on the pull query in a callback after starting the | ||
* query, so splitting it into two method calls was the most practical choice. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just noticed that this comment has become incorrect.
@@ -133,6 +133,7 @@ public synchronized void subscribe(final Flow.Subscriber<Collection<StreamedRow> | |||
this.queryMetadata = requireNonNull(queryMetadata, "queryMetadata"); | |||
|
|||
queryMetadata.setLimitHandler(this::setDone); | |||
queryMetadata.setCompletionHandler(this::setDone); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hooking in to the new completion callback. It looks like, similar to the HTTP/2 endpoint, limit queries don't print "limit reached", but unlike the HTTP/2 they actually do print "done" instead of just stopping.
It doesn't seem good that we managed to come up with three different ways to indicate the completion of a query in our three different query endpoints... But it's out of scope for this PR.
final StreamPullQueryMetadata queryMetadata = | ||
ksqlEngine.createStreamPullQuery( | ||
info.securityContext.getServiceContext(), | ||
analysis, | ||
configured, | ||
true | ||
); | ||
|
||
localCommands.ifPresent(lc -> lc.write(queryMetadata.getTransientQueryMetadata())); | ||
|
||
final PushQuerySubscription subscription = | ||
new PushQuerySubscription(exec, | ||
streamSubscriber, | ||
queryMetadata.getTransientQueryMetadata() | ||
); | ||
|
||
log.info( | ||
"Running query {}", | ||
queryMetadata.getTransientQueryMetadata().getQueryId().toString() | ||
); | ||
queryMetadata.getTransientQueryMetadata().start(); | ||
|
||
streamSubscriber.onSubscribe(subscription); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main part of the implementation. It's very similar to the HTTP/2 implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have been looking in the code and it seems we don't have a PushQueryPublisherTest
like we do for PullQueryPublisherTest
. Shouldn't we have one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I noticed that, too. We could add one, but I didn't bother because the publisher is already covered by other tests, and the pull query publisher test looked of dubious value to me.
@@ -344,6 +347,36 @@ public void shouldExecutePushQueryThatReturnsStreamOverWebSocketWithJsonContentT | |||
)); | |||
} | |||
|
|||
@Test | |||
public void shouldExecutePullQueryThatReturnsStreamOverWebSocketWithJsonContentType() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost all the code is tested in the other endpoints, so we just have a quick smoke test here.
In the future, I hope we can refactor RQTT into a functional test suite that runs against all the endpoints.
a3d582e
to
6291a71
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @vvcephei! It would be nice to have a PushQueryPublisherTest
like we do for pull queries but it's up to you if you want to do that since it's outside the scope of this PR
Finishing out the stream pull query implementation cycle, this adds support
to the websocket endpoint, building on the HTTP/1 and HTTP/2 endpoints.
Reviewer checklist