Skip to content

Commit

Permalink
Pass AdminRequestHandler's custom payload at construction time
Browse files Browse the repository at this point in the history
This allows having a single start() method, which will simplify
throttling those requests.
  • Loading branch information
olim7t committed Mar 13, 2018
1 parent c6bdbb4 commit 7057301
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 11 deletions.
Expand Up @@ -62,7 +62,8 @@ public static AdminRequestHandler query(
if (!parameters.isEmpty()) {
debugString += " with parameters " + parameters;
}
return new AdminRequestHandler(channel, message, timeout, logPrefix, debugString);
return new AdminRequestHandler(
channel, message, Frame.NO_PAYLOAD, timeout, logPrefix, debugString);
}

public static AdminRequestHandler query(
Expand All @@ -72,6 +73,7 @@ public static AdminRequestHandler query(

private final DriverChannel channel;
private final Message message;
private final Map<String, ByteBuffer> customPayload;
private final Duration timeout;
private final String logPrefix;
private final String debugString;
Expand All @@ -83,21 +85,19 @@ public static AdminRequestHandler query(
public AdminRequestHandler(
DriverChannel channel,
Message message,
Map<String, ByteBuffer> customPayload,
Duration timeout,
String logPrefix,
String debugString) {
this.channel = channel;
this.message = message;
this.customPayload = customPayload;
this.timeout = timeout;
this.logPrefix = logPrefix;
this.debugString = debugString;
}

public CompletionStage<AdminResult> start() {
return start(Frame.NO_PAYLOAD);
}

public CompletionStage<AdminResult> start(Map<String, ByteBuffer> customPayload) {
LOG.debug("[{}] Executing {}", logPrefix, this);
channel.write(message, false, customPayload, this).addListener(this::onWriteComplete);
return result;
Expand Down Expand Up @@ -158,7 +158,12 @@ private AdminRequestHandler copy(ByteBuffer pagingState) {
QueryOptions newOptions =
buildQueryOptions(currentOptions.pageSize, currentOptions.namedValues, pagingState);
return new AdminRequestHandler(
channel, new Query(current.query, newOptions), timeout, logPrefix, debugString);
channel,
new Query(current.query, newOptions),
customPayload,
timeout,
logPrefix,
debugString);
}

private static QueryOptions buildQueryOptions(
Expand Down
Expand Up @@ -283,9 +283,11 @@ private CompletionStage<Void> prepareOnOtherNode(Node node) {
return CompletableFuture.completedFuture(null);
} else {
AdminRequestHandler handler =
new AdminRequestHandler(channel, message, timeout, logPrefix, message.toString());
new AdminRequestHandler(
channel, message, request.getCustomPayload(), timeout, logPrefix, message.toString());

return handler
.start(request.getCustomPayload())
.start()
.handle(
(result, error) -> {
if (error == null) {
Expand Down
Expand Up @@ -476,11 +476,12 @@ private void processErrorResponse(Error errorMessage) {
new AdminRequestHandler(
channel,
reprepareMessage,
repreparePayload.customPayload,
timeout,
logPrefix,
"Reprepare " + reprepareMessage.toString());
reprepareHandler
.start(repreparePayload.customPayload)
.start()
.handle(
(result, exception) -> {
if (exception != null) {
Expand Down
Expand Up @@ -232,7 +232,7 @@ private void startWorker() {
protected CompletionStage<AdminResult> queryAsync(
Message message, Map<String, ByteBuffer> customPayload, String debugString) {
AdminRequestHandler reprepareHandler =
new AdminRequestHandler(channel, message, timeout, logPrefix, debugString);
return reprepareHandler.start(customPayload);
new AdminRequestHandler(channel, message, customPayload, timeout, logPrefix, debugString);
return reprepareHandler.start();
}
}

0 comments on commit 7057301

Please sign in to comment.