Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRPC #37

Merged
merged 15 commits into from
Oct 18, 2017
Merged

DRPC #37

merged 15 commits into from
Oct 18, 2017

Conversation

akshaisarma
Copy link
Member

This builds off the initial DRPC work from @ShriramKumar and redoes it without replicating the Storm code. Feel free to take a look but I'll wire this up with an actual instance to test first.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.3%) to 99.624% when pulling 69787d5 on akshaisarma:drpc into 0e3b1ca on yahoo:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.3%) to 99.624% when pulling 689252d on akshaisarma:drpc into 0e3b1ca on yahoo:master.


Number connectTimeout = config.getRequiredConfigAs(DRPCConfig.DRPC_HTTP_CONNECT_TIMEOUT_MS, Number.class);
Number retryLimit = config.getRequiredConfigAs(DRPCConfig.DRPC_HTTP_CONNECT_RETRY_LIMIT, Number.class);
List<String> urls = (List<String>) config.getRequiredConfigAs(DRPCConfig.DRPC_SERVERS, List.class);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This local var name is the same as line 38 and make it a little bit hard to read. Can you change to a different name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

bullet.topology.join.bolt.cpu.load: 100.0
bullet.topology.join.bolt.memory.on.heap.load: 512.0
bullet.topology.join.bolt.memory.off.heap.load: 160.0
bullet.topology.join.bolt.parallelism: 20
bullet.topology.join.bolt.parallelism: 2
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The numbers are changed a lot. Is there any reason for it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For defaults, these were too high relative to the filter bolt parallelisms, so I dropped it. This parallelism is sufficient for hundreds of simultaneous queries.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.3%) to 99.625% when pulling 030cd82 on akshaisarma:drpc into 0e3b1ca on yahoo:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.3%) to 99.626% when pulling 285503b on akshaisarma:drpc into 0e3b1ca on yahoo:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.3%) to 99.626% when pulling e3b8ec0 on akshaisarma:drpc into 0e3b1ca on yahoo:master.

}

@Override
public void nextTuple() {
PubSubMessage message = null;
try {
message = subscriber.receive();
} catch (PubSubException e) {
} catch (Exception e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What else exception does it throw? I checked the interface definition, only PubSubException thrown.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To catch RuntimeExceptions. Basically if anything bad happens for any reason, we don't want to restart the spout (ex. bad JSON in PubSubMessage throws a GSON RuntimeException)

@@ -51,8 +71,9 @@ public void nextTuple() {
log.error(e.getMessage());
}
if (message != null) {
// TODO: No need for two streams. Just send a unified Query stream. JoinBolt needs to not do that join.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"JoinBolt need not do that join."

Also, maybe call message.getId() just once and save to value to avoid calling the function 4 times?

// The path that queries must be POSTed to. This generally is "drpc".
public static final String DRPC_HTTP_PATH = PREFIX + "http.path";


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you want two blank lines here?

}
return IntStream.range(0, n).mapToObj(i -> new DRPCQuerySubscriber(config, maxUncommittedMessages))
.collect(Collectors.toList());

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete empty line.

@Override
public void close() {
log.warn("Failing all pending requests: {}", emittedIDs);
emittedIDs.values().forEach(spout::fail);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed, messages are not removed from emittedIDs when a message is ack'ed... Override commit() and do that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@coveralls
Copy link

Coverage Status

Coverage increased (+0.3%) to 99.628% when pulling 323ffb9 on akshaisarma:drpc into 0e3b1ca on yahoo:master.

@akshaisarma
Copy link
Member Author

I tested this out on a Storm cluster with a custom build of bullet-service using the DRPC pubsub. Looks to be working.

@akshaisarma akshaisarma merged commit 307ff90 into bullet-db:master Oct 18, 2017
@akshaisarma akshaisarma deleted the drpc branch October 18, 2017 21:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants