Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: New API - integrate push queries into backend #4495

Merged
merged 1 commit into from Feb 11, 2020

Conversation

purplefox
Copy link
Contributor

@purplefox purplefox commented Feb 8, 2020

Description

Implements #4256

  • Integrates query streaming functionality with ksql engine
  • If the right ksql config setting is provided then starting the ksql rest app will also start the new API. The config setting is disabled by default. This is of course a temporary hack so we can have both the new API and and the old API running in parallel so we can run the "ride share app" demo. Once we have migrated the whole API, ksql-rest-app will be removed completely.
  • There's a bit of duplication in EngineExecutor/QueryExecutor to support the new way of creating transient queries. This is temporary and will disappear when we remove ksql-rest-app so won't need to support the old way of creating transient queries.
  • A bunch of refactoring around test classes.
  • Probably some other minor bits and pieces

Testing done

New integration test for the new API
More api tests
More unit tests

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@purplefox purplefox changed the title Server queries feat: Integrate new APIn query streaming into ksqlDB backend Feb 8, 2020
@purplefox purplefox changed the title feat: Integrate new APIn query streaming into ksqlDB backend feat: Integrate new API query streaming into ksqlDB backend Feb 8, 2020
@purplefox purplefox changed the title feat: Integrate new API query streaming into ksqlDB backend feat: New API - integrate push queries into backend Feb 8, 2020
@purplefox purplefox marked this pull request as ready for review February 9, 2020 12:38
@purplefox purplefox requested a review from a team as a code owner February 9, 2020 12:38
@purplefox purplefox force-pushed the server_queries branch 4 times, most recently from 64a3ee9 to 8f94874 Compare February 9, 2020 13:01
Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - since these PRs are adding pretty hefty chunks of code that will be critical in the future, I think it'll be good to get a second pair of eyes

Comment on lines 144 to 149
public String getName() {
return "tim";
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid "hacks" like this? I've seen these things eventually end up leaking to the user and causing some level of confusion (e.g. an error message saying "The supplied principal "tim" does not have permissions to access FOO")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it has to return something, would you prefer "almog" ? ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while I'd love to have it be "almog" 😉 if it has to be set, can it be "NO_PRINICPAL"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

@@ -55,6 +55,11 @@
public static final String CERT_PATH_DOC =
"Path to cert file";

public static final String WORKER_POOL_SIZE = propertyName("worker-pool-size");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: all of our other configs have words separated by .s can we follow that here too? (and all the above configs)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty confused by this convention.

It seems that dots are

a) used to create scoping for properties - this seems a very normal use of dots in properties to me. E.g. ksql.api.* contains all stuff related to the api. ksql.engine.* - contains all stuff related to engine. Seems very reasonable and expected.
b) also used to separate words! Seems really odd, and breaks the scoping rules in a. Why do we do this? Seems broken to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@purplefox I think a main reason for .s is to follow the environment variable naming pattern, which translates e.g. an env var named KSQL_WORKER_POOL_SIZE into worker.pool.size in server properties.

To get a hyphenated property name, the env var would have to be e.g. KSQL_WORKER___POOL___SIZE 🙃

the converter is called env_to_props:
https://github.com/confluentinc/confluent-docker-utils/blob/3427c198e83b5d65d91b580a6df589f5d4799c14/confluent/docker_utils/dub.py#L50

here's how it applies for docker builds:
https://github.com/confluentinc/ksql/blob/master/ksql-docker/src/include/docker/run#L32

{% set kr_props = env_to_props('KSQL_', '') -%}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ The existing convention for env var names is well-established and shared across Confluent components and their documentation. From the perspective of code it is indeed awkward that it breaks expectations of namespacing. However, considering end-user ergonomics, I believe we should keep the pattern in place and use dots for the prop names.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also might be just a byproduct of a bygone age... I know that LinkedIn had the same convention

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's weird... but ack

Comment on lines 25 to 27
String[] getColumnNames();

String[] getColumnTypes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use List<String> in this API? Arrays are rather brittle and there's not much of a benefit of using them if it's non-primitive anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change this.

log.error("Failed to close query", ar.cause());
}
});
super.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it safe to do this before we've stopped the query handler? (i.e. should we move this into executeBlocking?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see it as an issue.

Comment on lines +198 to +212
if (buildResult instanceof KStreamHolder<?>) {
kstream = ((KStreamHolder<?>) buildResult).getStream();
} else if (buildResult instanceof KTableHolder<?>) {
final KTable<?, GenericRow> ktable = ((KTableHolder<?>) buildResult).getTable();
kstream = ktable.toStream();
} else {
throw new IllegalStateException("Unexpected type built from exection plan");
}

kstream.foreach((k, row) -> {
if (row == null) {
return;
}
rowConsumer.accept(row);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems a little too hacked together for my liking - it's like making a modification to the physical/logical plan "outside" of the physical and logical planners. Not sure I have suggestions at the moment, but give it a thought

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's pretty much an exact copy and paste of the existing code for transient queries

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main difference is that this adds a forEach, which actually affects the execution plan/topology

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The foreach is from here https://github.com/confluentinc/ksql/blob/master/ksql-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java#L59

I've basically just simplified the pre-existing code into a single place. There's no new logic though.


import io.confluent.ksql.GenericRow;

public interface RowConsumer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just have Consumer<GenericRow>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Comment on lines +392 to +399
if (apiServer != null) {
apiServer.stop();
apiServer = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we try/catch these as well?

import org.junit.rules.RuleChain;

@Category({IntegrationTest.class})
public class NewApiTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above, can we avoid the word New in the code? Especially after we're done with the migration this won't make much sense!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a temporary name and will be renamed before KLIP-15 is complete. The only reason new is there is to distinguish it from the current rest api test.

QueryResponse queryResponse = new QueryResponse(buff.toString());
return queryResponse.rows.size();
} catch (Throwable t) {
return Integer.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any way to communicate this error so that if the test fails here we can figure out why?

Copy link
Contributor

@vcrfxia vcrfxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @purplefox ! LGTM with a bunch of questions inline, mostly just to improve my understanding.

@Override
public void subscribe(final Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber);
if (Vertx.currentContext() == ctx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this if-else block? Specifically:

  • when/why would this method be called from a different context than the one passed when creating the publisher?
  • why is having the if-else preferable to simply always making the call async (i.e., the contents of the "else" part)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The class doesn't control who is calling subscribe so we can't assume it's always called from the same context.
  2. If already on same context then it's safe to call directly which will be faster than doing it asynchronously.

this.closeHandler = Objects.requireNonNull(closeHandler);
this.id = new PushQueryId(UUID.randomUUID().toString());
server.registerQuery(this);
}

public void close() {
server.removeQuery(id);
querySubscriber.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we're not closing the subscriber anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The subscriber doesn't have a close method any more, it's not needed. Closing the publisher will cause an onComplete to be sent to the subscriber which will result in the response being ended.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to pass the subscriber into PushQueryHolder in that case? Looks like it's unused.

}

protected final void sendError(final Exception e) {
checkContext();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of asserting the context here, rather than in the other methods that call sendError()? Would the context here ever be different from the ones in those methods (doSubscribe() and doRequest())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a method that accesses internal state so seems like a sensible place to check the context.

public synchronized void accept(final GenericRow row) {
Objects.requireNonNull(row);

if (closed || complete || !checkLimit()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we don't check cancelled here?

}

@Test
public void shouldDeliverAfterSubscribe() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test offer anything beyond PublisherTestBase#shouldDeliverAllRequestingOneByOneLoadAfterSubscribe()? AFAICT they appear to be testing the same thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

@@ -23,11 +23,12 @@
private ErrorCodes() {
}

public static final int ERROR_CODE_MISSING_PARAM = 50001;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the decision to remove the 500 prefixes? These error codes are user-facing, right?

Copy link
Contributor Author

@purplefox purplefox Feb 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should tie our error codes to HTTP status codes, they seem orthogonal.
E.g. If I stream some inserts to the server, then I'll get a 200 OK response and the acks for the inserts will start coming up. Maybe the 100000th insert has some malformed JSON in which case the ack stream will contain an error "malformed JSON" and the response will be ended. The error here has got nothing to do with the HTTP status code (which was 200 in this case).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These error codes are exposed to users, though, right? How will a user understand the meaning of the error codes? (Or is that not the purpose of the error codes?)

@Override
protected PushQueryHandler createQuery(final String sql, final JsonObject properties,
final Context context, final WorkerExecutor workerExecutor, final RowConsumer rowConsumer) {
// Must be run on worker as all this stuff is slow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon the ignorance, but how do we know "this stuff is slow"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything taking more than a very small number of milliseconds will be slow for an event loop. I think we should err on the side of caution.

.withoutPlainListeners()
.withSaslSslListeners()
.withAclsEnabled(SUPER_USER.username)
.withAcl(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ACLs setup relevant to the tests in this file? If not, can we remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think they can be removed :)

} catch (Throwable t) {
return Integer.MAX_VALUE;
}
}, is(7));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this value coming from? Properties is empty and push queries default to using auto.offset.reset=latest so I'm surprised rows are being returned in this query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's set in TestKsqlRestApp. But we also need to set it in the query properties. Good catch!

private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final KsqlSecurityExtension securityExtension;
private final ServiceContextFactory theServiceContextFactory;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not simply serviceContextFactory? (I assume there's a reason for diverging from convention -- I'm just not seeing what it is.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's vestigial from when there was something else called serviceContextFactory

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not finished, but a few comments...

Comment on lines 166 to 171
public String[] getColumnNames() {
return colNamesFromSchema(queryMetadata.getLogicalSchema());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use List rather than naked arrays please?

protected boolean cancelled;

public BasePublisher(final Context ctx) {
this.ctx = ctx;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: validate params that will be stored in object state; ensuring object does not get into an invalid state.

Comment on lines 32 to 41
/**
* A query publisher that uses an internal blocking queue to store rows for delivery. It's currently
* necessary to use a blocking queue as Kafka Streams delivers message in a synchronous fashion with
* no back pressure. If the queue was not blocking then if the subscriber was slow the messages
* could build up on the queue eventually resulting in out of memory. The only mechanism we have to
* slow streams down is to block the thread. Kafka Streams uses dedicated streams per topology so
* this won't prevent the thread from doing useful work elsewhere but it does mean we can't have too
* many push queries in the server at any one time as we can end up with a lot of threads. Ideally
* Kafka Streams would use a non-blocking reactive model with back-pressure, e.g. using reactive
* streams. That way a small number of threads would be required to service all topologies and we
* wouldn't need to block.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to avoid such 'I wish Streams did it this way' style comments in the code. Such points are for discussions, not comments in code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 one this one

Comment on lines 56 to 57
private String[] columnNames;
private String[] columnTypes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use List not arrays please.

this.columnTypes = queryHandle.getColumnTypes();
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Don't expose mutable object state. It breaks encapsulation. Encapsulation is a pretty standard OO thing.

queryHandle.start();
}

private boolean checkLimit() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

checkContext();

int num = 0;
while (demand > 0 && !queue.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving to at request to unblock @purplefox and avoid merge hell with other PRs - I don't see anything fundamentally wrong but (as Tim agreed offline) I'd like the open comments to be addressed in follow-up PRs.

Please address the synchronization issue https://github.com/confluentinc/ksql/pull/4495/files/8f948742a0ce64ab808dd9e5352bba82a613252d#r377817481 before merging

Summarizing the comments that are important to me to be addressed in a follow-up (and the green tick is conditional on fixing or discussing further):

  • swapping the String[] with List<String>
  • addressing the protected state demand
  • removing "new" from the code base and coming up with better naming

public static final String WORKER_POOL_SIZE = propertyName("worker.pool.size");
public static final String WORKER_POOL_DOC =
"Max number of worker threads for executing blocking code";
public static final int DEFAULT_WORKER_POOL_SIZE = 100;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it make sense to have so many workers relative to the number of verticles? Is the expectation that multiple blocking tasks created by the same will be running simultaneously?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We choose the number of event loops to be about the same as the number of cores because event loops should always be live, therefore you shouldn't need any more to utilise all the cores.
Workers are often used to execute blocking tasks - often blocking on IO, e.g. waiting on the network or file system etc. In this case the threads can spend a lot of their time inactive so in order to utilise all cores we choose a much larger number of threads than number of cores.
In our case if we find our workers aren't blocking on IO much, just doing long lived CPU tasks then we can probably get away with fewer, although we probably still want a reasonable amount so we don't end up with very long lived tasks causing those behind them to queue too long.

@purplefox purplefox merged commit 055406e into confluentinc:master Feb 11, 2020
@purplefox purplefox deleted the server_queries branch February 11, 2020 21:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants