Skip to content

Commit

Permalink
JAVA-1536: Add request throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
olim7t committed Mar 13, 2018
1 parent 7057301 commit f1527f8
Show file tree
Hide file tree
Showing 30 changed files with 1,837 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.0.0-alpha4 (in progress)

- [new feature] JAVA-1536: Add request throttling
- [improvement] JAVA-1772: Revisit multi-response callbacks
- [new feature] JAVA-1537: Add remaining socket options
- [bug] JAVA-1756: Propagate custom payload when preparing a statement
Expand Down
@@ -0,0 +1,35 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.api.core;

/**
* Thrown if the session uses a request throttler, and it didn't allow the current request to
* execute.
*
* <p>This can happen either when the session is overloaded, or at shutdown for requests that had
* been enqueued.
*/
public class RequestThrottlingException extends DriverException {

public RequestThrottlingException(String message) {
super(message, null, true);
}

@Override
public DriverException copy() {
return new RequestThrottlingException(getMessage());
}
}
Expand Up @@ -61,6 +61,11 @@ public enum DefaultDriverOption implements DriverOption {
REQUEST_TRACE_ATTEMPTS("request.trace.attempts", true),
REQUEST_TRACE_INTERVAL("request.trace.interval", true),
REQUEST_TRACE_CONSISTENCY("request.trace.consistency", true),
REQUEST_THROTTLER_CLASS("request.throttler.class", true),
REQUEST_THROTTLER_MAX_CONCURRENT_REQUESTS("request.throttler.max-concurrent-requests", false),
REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND("request.throttler.max-requests-per-second", false),
REQUEST_THROTTLER_MAX_QUEUE_SIZE("request.throttler.max-queue-size", false),
REQUEST_THROTTLER_DRAIN_INTERVAL("request.throttler.drain-interval", false),

CONTROL_CONNECTION_TIMEOUT("connection.control-connection.timeout", true),
CONTROL_CONNECTION_AGREEMENT_INTERVAL(
Expand Down Expand Up @@ -121,6 +126,9 @@ public enum DefaultDriverOption implements DriverOption {
METRICS_SESSION_CQL_REQUESTS_HIGHEST("metrics.session.cql-requests.highest-latency", false),
METRICS_SESSION_CQL_REQUESTS_DIGITS("metrics.session.cql-requests.significant-digits", false),
METRICS_SESSION_CQL_REQUESTS_INTERVAL("metrics.session.cql-requests.refresh-interval", false),
METRICS_SESSION_THROTTLING_HIGHEST("metrics.session.throttling.delay.highest-latency", false),
METRICS_SESSION_THROTTLING_DIGITS("metrics.session.throttling.delay.significant-digits", false),
METRICS_SESSION_THROTTLING_INTERVAL("metrics.session.throttling.delay.refresh-interval", false),
METRICS_NODE_CQL_MESSAGES_HIGHEST("metrics.node.cql-messages.highest-latency", false),
METRICS_NODE_CQL_MESSAGES_DIGITS("metrics.node.cql-messages.significant-digits", false),
METRICS_NODE_CQL_MESSAGES_INTERVAL("metrics.node.cql-messages.refresh-interval", false),
Expand Down
Expand Up @@ -23,6 +23,9 @@ public enum DefaultSessionMetric implements SessionMetric {
CONNECTED_NODES("connected-nodes"),
CQL_REQUESTS("cql-requests"),
CQL_CLIENT_TIMEOUTS("cql-client-timeouts"),
THROTTLING_DELAY("throttling.delay"),
THROTTLING_QUEUE_SIZE("throttling.queue-size"),
THROTTLING_ERRORS("throttling.errors"),
;

private static final Map<String, DefaultSessionMetric> BY_PATH = sortByPath();
Expand Down
Expand Up @@ -77,7 +77,7 @@ public static AdminRequestHandler query(
private final Duration timeout;
private final String logPrefix;
private final String debugString;
private final CompletableFuture<AdminResult> result = new CompletableFuture<>();
protected final CompletableFuture<AdminResult> result = new CompletableFuture<>();

// This is only ever accessed on the channel's event loop, so it doesn't need to be volatile
private ScheduledFuture<?> timeoutFuture;
Expand Down Expand Up @@ -110,12 +110,12 @@ private void onWriteComplete(Future<? super Void> future) {
channel.eventLoop().schedule(this::fireTimeout, timeout.toNanos(), TimeUnit.NANOSECONDS);
timeoutFuture.addListener(UncaughtExceptions::log);
} else {
result.completeExceptionally(future.cause());
setFinalError(future.cause());
}
}

private void fireTimeout() {
result.completeExceptionally(
setFinalError(
new DriverTimeoutException(String.format("%s timed out after %s", debugString, timeout)));
if (!channel.closeFuture().isDone()) {
channel.cancel(this);
Expand All @@ -127,7 +127,7 @@ public void onFailure(Throwable error) {
if (timeoutFuture != null) {
timeoutFuture.cancel(true);
}
result.completeExceptionally(error);
setFinalError(error);
}

@Override
Expand All @@ -141,16 +141,24 @@ public void onResponse(Frame responseFrame) {
Rows rows = (Rows) message;
ByteBuffer pagingState = rows.getMetadata().pagingState;
AdminRequestHandler nextHandler = (pagingState == null) ? null : this.copy(pagingState);
result.complete(new AdminResult(rows, nextHandler, channel.protocolVersion()));
setFinalResult(new AdminResult(rows, nextHandler, channel.protocolVersion()));
} else if (message instanceof Prepared) {
// Internal prepares are only "reprepare on up" types of queries, where we only care about
// success, not the actual result, so this is good enough:
result.complete(null);
setFinalResult(null);
} else {
result.completeExceptionally(new UnexpectedResponseException(debugString, message));
setFinalError(new UnexpectedResponseException(debugString, message));
}
}

protected boolean setFinalResult(AdminResult result) {
return this.result.complete(result);
}

protected boolean setFinalError(Throwable error) {
return result.completeExceptionally(error);
}

private AdminRequestHandler copy(ByteBuffer pagingState) {
assert message instanceof Query;
Query current = (Query) this.message;
Expand Down
@@ -0,0 +1,98 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.adminrequest;

import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.internal.core.session.throttling.Throttled;
import com.datastax.oss.protocol.internal.Message;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class ThrottledAdminRequestHandler extends AdminRequestHandler implements Throttled {

private final long startTimeNanos;
private final RequestThrottler throttler;
private final SessionMetricUpdater metricUpdater;

public ThrottledAdminRequestHandler(
DriverChannel channel,
Message message,
Map<String, ByteBuffer> customPayload,
Duration timeout,
RequestThrottler throttler,
SessionMetricUpdater metricUpdater,
String logPrefix,
String debugString) {
super(channel, message, customPayload, timeout, logPrefix, debugString);
this.startTimeNanos = System.nanoTime();
this.throttler = throttler;
this.metricUpdater = metricUpdater;
}

@Override
public CompletionStage<AdminResult> start() {
// Don't write request yet, wait for green light from throttler
throttler.register(this);
return result;
}

@Override
public void onThrottleReady(boolean wasDelayed) {
if (wasDelayed) {
metricUpdater.updateTimer(
DefaultSessionMetric.THROTTLING_DELAY,
System.nanoTime() - startTimeNanos,
TimeUnit.NANOSECONDS);
}
super.start();
}

@Override
public void onThrottleFailure(RequestThrottlingException error) {
metricUpdater.incrementCounter(DefaultSessionMetric.THROTTLING_ERRORS);
setFinalError(error);
}

@Override
protected boolean setFinalResult(AdminResult result) {
boolean wasSet = super.setFinalResult(result);
if (wasSet) {
throttler.signalSuccess(this);
}
return wasSet;
}

@Override
protected boolean setFinalError(Throwable error) {
boolean wasSet = super.setFinalError(error);
if (wasSet) {
if (error instanceof DriverTimeoutException) {
throttler.signalTimeout(this);
} else if (!(error instanceof RequestThrottlingException)) {
throttler.signalError(this, error);
}
}
return wasSet;
}
}
Expand Up @@ -59,6 +59,7 @@
import com.datastax.oss.driver.internal.core.servererrors.WriteTypeRegistry;
import com.datastax.oss.driver.internal.core.session.PoolManager;
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
import com.datastax.oss.driver.internal.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.internal.core.ssl.JdkSslHandlerFactory;
import com.datastax.oss.driver.internal.core.ssl.SslHandlerFactory;
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
Expand Down Expand Up @@ -163,6 +164,8 @@ public class DefaultDriverContext implements InternalDriverContext {
new LazyReference<>("metricRegistry", this::buildMetricRegistry, cycleDetector);
private final LazyReference<MetricUpdaterFactory> metricUpdaterFactoryRef =
new LazyReference<>("metricUpdaterFactory", this::buildMetricUpdaterFactory, cycleDetector);
private final LazyReference<RequestThrottler> requestThrottlerRef =
new LazyReference<>("requestThrottler", this::buildRequestThrottler, cycleDetector);

private final DriverConfig config;
private final DriverConfigLoader configLoader;
Expand Down Expand Up @@ -369,6 +372,17 @@ protected MetricUpdaterFactory buildMetricUpdaterFactory() {
return new DefaultMetricUpdaterFactory(this);
}

protected RequestThrottler buildRequestThrottler() {
return Reflection.buildFromConfig(
this, DefaultDriverOption.REQUEST_THROTTLER_CLASS, RequestThrottler.class)
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
"Missing request throttler, check your configuration (%s)",
DefaultDriverOption.REQUEST_THROTTLER_CLASS)));
}

@Override
public String sessionName() {
return sessionName;
Expand Down Expand Up @@ -539,6 +553,11 @@ public MetricUpdaterFactory metricUpdaterFactory() {
return metricUpdaterFactoryRef.get();
}

@Override
public RequestThrottler requestThrottler() {
return requestThrottlerRef.get();
}

@Override
public CodecRegistry codecRegistry() {
return codecRegistry;
Expand Down
Expand Up @@ -34,6 +34,7 @@
import com.datastax.oss.driver.internal.core.servererrors.WriteTypeRegistry;
import com.datastax.oss.driver.internal.core.session.PoolManager;
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
import com.datastax.oss.driver.internal.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.internal.core.ssl.SslHandlerFactory;
import com.datastax.oss.protocol.internal.Compressor;
import com.datastax.oss.protocol.internal.FrameCodec;
Expand Down Expand Up @@ -88,4 +89,6 @@ public interface InternalDriverContext extends DriverContext {
PoolManager poolManager();

MetricUpdaterFactory metricUpdaterFactory();

RequestThrottler requestThrottler();
}

0 comments on commit f1527f8

Please sign in to comment.