Skip to content

Commit

Permalink
Allow overriding the retry policy per-query
Browse files Browse the repository at this point in the history
  • Loading branch information
Sylvain Lebresne committed Dec 17, 2012
1 parent 1fd4a97 commit b27c524
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 22 deletions.
31 changes: 31 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/Query.java
Expand Up @@ -2,6 +2,8 @@

import java.nio.ByteBuffer;

import com.datastax.driver.core.policies.RetryPolicy;

/**
* An executable query.
* <p>
Expand All @@ -17,6 +19,8 @@ public abstract class Query {
private volatile ConsistencyLevel consistency;
private volatile boolean traceQuery;

private volatile RetryPolicy retryPolicy;

// We don't want to expose the constructor, because the code rely on this being only subclassed by Statement and BoundStatement
Query() {
this.consistency = ConsistencyLevel.ONE;
Expand Down Expand Up @@ -91,4 +95,31 @@ public boolean isTracing() {
* @return the routing key for this query or {@code null}.
*/
public abstract ByteBuffer getRoutingKey();

/**
* Sets the retry policy to use for this query.
* <p>
* The default retry policy, if this method is not called, is the one returned by
* {@link Policies#getRetryPolicy} in the cluster configuration. This
* method is thus only useful in case you want to punctually override the
* default policy for this request.
*
* @param policy the retry policy to use for this query.
* @return this {@code Query} object.
*/
public Query setRetryPolicy(RetryPolicy policy) {
this.retryPolicy = policy;
return this;
}

/**
* The retry policy sets for this query, if any.
*
* @return the retry policy sets specifically for this query or {@code null} if no query specific
* retry policy has been set through {@link #setRetryPolicy} (in which case
* the Cluster retry policy will apply if necessary).
*/
public RetryPolicy getRetryPolicy() {
return retryPolicy;
}
}
Expand Up @@ -42,6 +42,7 @@ class RetryingCallback implements Connection.ResponseCallback {
private final TimerContext timerContext;

private final Iterator<Host> queryPlan;
private final Query query;
private volatile Host current;
private volatile HostConnectionPool currentPool;

Expand All @@ -55,6 +56,7 @@ public RetryingCallback(Session.Manager manager, Connection.ResponseCallback cal
this.callback = callback;

this.queryPlan = manager.loadBalancer.newQueryPlan(query);
this.query = query;

this.timerContext = manager.configuration().isMetricsEnabled()
? metrics().getRequestsTimer().time()
Expand Down Expand Up @@ -186,7 +188,10 @@ public void onSet(Connection connection, Message.Response response) {
case ERROR:
ErrorMessage err = (ErrorMessage)response;
RetryPolicy.RetryDecision retry = null;
RetryPolicy retryPolicy = manager.configuration().getPolicies().getRetryPolicy();
RetryPolicy queryRetryPolicy = query.getRetryPolicy();
RetryPolicy retryPolicy = query.getRetryPolicy() == null
? manager.configuration().getPolicies().getRetryPolicy()
: query.getRetryPolicy();
switch (err.error.code()) {
case READ_TIMEOUT:
assert err.error instanceof ReadTimeoutException;
Expand All @@ -195,7 +200,7 @@ public void onSet(Connection connection, Message.Response response) {

ReadTimeoutException rte = (ReadTimeoutException)err.error;
ConsistencyLevel rcl = ConsistencyLevel.from(rte.consistency);
retry = retryPolicy.onReadTimeout(rcl, rte.blockFor, rte.received, rte.dataPresent, queryRetries);
retry = retryPolicy.onReadTimeout(query, rcl, rte.blockFor, rte.received, rte.dataPresent, queryRetries);
break;
case WRITE_TIMEOUT:
assert err.error instanceof WriteTimeoutException;
Expand All @@ -204,7 +209,7 @@ public void onSet(Connection connection, Message.Response response) {

WriteTimeoutException wte = (WriteTimeoutException)err.error;
ConsistencyLevel wcl = ConsistencyLevel.from(wte.consistency);
retry = retryPolicy.onWriteTimeout(wcl, WriteType.from(wte.writeType), wte.blockFor, wte.received, queryRetries);
retry = retryPolicy.onWriteTimeout(query, wcl, WriteType.from(wte.writeType), wte.blockFor, wte.received, queryRetries);
break;
case UNAVAILABLE:
assert err.error instanceof UnavailableException;
Expand All @@ -213,7 +218,7 @@ public void onSet(Connection connection, Message.Response response) {

UnavailableException ue = (UnavailableException)err.error;
ConsistencyLevel ucl = ConsistencyLevel.from(ue.consistency);
retry = retryPolicy.onUnavailable(ucl, ue.required, ue.alive, queryRetries);
retry = retryPolicy.onUnavailable(query, ucl, ue.required, ue.alive, queryRetries);
break;
case OVERLOADED:
// Try another node
Expand Down
Expand Up @@ -35,6 +35,7 @@ private DefaultRetryPolicy() {}
* timeout the dead replica will likely have been detected as dead and
* the retry has a high change of success.
*
* @param query the original query that timeouted.
* @param cl the original consistency level of the read that timeouted.
* @param requiredResponses the number of responses that were required to
* achieve the requested consistency level.
Expand All @@ -46,7 +47,7 @@ private DefaultRetryPolicy() {}
* @return {@code RetryDecision.retry(cl)} if no retry attempt has yet been tried and
* {@code receivedResponses >= requiredResponses && !dataRetrieved}, {@code RetryDecision.rethrow()} otherwise.
*/
public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
public RetryDecision onReadTimeout(Query query, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
if (nbRetry != 0)
return RetryDecision.rethrow();

Expand All @@ -66,6 +67,7 @@ public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, i
* nodes will likely have been detected as dead and the retry has thus a
* high change of success.
*
* @param query the original query that timeouted.
* @param cl the original consistency level of the write that timeouted.
* @param writeType the type of the write that timeouted.
* @param requiredAcks the number of acknowledgments that were required to
Expand All @@ -76,7 +78,7 @@ public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, i
* @return {@code RetryDecision.retry(cl)} if no retry attempt has yet been tried and
* {@code writeType == WriteType.BATCH_LOG}, {@code RetryDecision.rethrow()} otherwise.
*/
public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
public RetryDecision onWriteTimeout(Query query, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
if (nbRetry != 0)
return RetryDecision.rethrow();

Expand All @@ -91,6 +93,8 @@ public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, in
* This method never retries as a retry on an unavailable exception
* using the same consistency level has almost no change of success.
*
* @param query the original query for which the consistency level cannot
* be achieved.
* @param cl the original consistency level for the operation.
* @param requiredReplica the number of replica that should have been
* (known) alive for the operation to be attempted.
Expand All @@ -99,7 +103,7 @@ public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, in
* @param nbRetry the number of retry already performed for this operation.
* @return {@code RetryDecision.rethrow()}.
*/
public RetryDecision onUnavailable(ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
public RetryDecision onUnavailable(Query query, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
return RetryDecision.rethrow();
}
}
Expand Up @@ -76,6 +76,7 @@ else if (knownOk >= 1)
* retrieve, the operation is retried with the initial consistency
* level. Otherwise, an exception is thrown.
*
* @param query the original query that timeouted.
* @param cl the original consistency level of the read that timeouted.
* @param requiredResponses the number of responses that were required to
* achieve the requested consistency level.
Expand All @@ -86,7 +87,7 @@ else if (knownOk >= 1)
* @param nbRetry the number of retry already performed for this operation.
* @return a RetryDecision as defined above.
*/
public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
public RetryDecision onReadTimeout(Query query, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
if (nbRetry != 0)
return RetryDecision.rethrow();

Expand All @@ -111,6 +112,7 @@ public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, i
* if we know the write has been persisted on at least one replica, we
* ignore the exception. Otherwise, an exception is thrown.
*
* @param query the original query that timeouted.
* @param cl the original consistency level of the write that timeouted.
* @param writeType the type of the write that timeouted.
* @param requiredAcks the number of acknowledgments that were required to
Expand All @@ -120,7 +122,7 @@ public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, i
* @param nbRetry the number of retry already performed for this operation.
* @return a RetryDecision as defined above.
*/
public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
public RetryDecision onWriteTimeout(Query query, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
if (nbRetry != 0)
return RetryDecision.rethrow();

Expand Down Expand Up @@ -150,6 +152,8 @@ public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, in
* is know to be alive, the operation is retried at a lower consistency
* level.
*
* @param query the original query for which the consistency level cannot
* be achieved.
* @param cl the original consistency level for the operation.
* @param requiredReplica the number of replica that should have been
* (known) alive for the operation to be attempted.
Expand All @@ -158,7 +162,7 @@ public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, in
* @param nbRetry the number of retry already performed for this operation.
* @return a RetryDecision as defined above.
*/
public RetryDecision onUnavailable(ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
public RetryDecision onUnavailable(Query query, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
if (nbRetry != 0)
return RetryDecision.rethrow();

Expand Down
Expand Up @@ -17,6 +17,7 @@ private FallthroughRetryPolicy() {}
/**
* Defines whether to retry and at which consistency level on a read timeout.
*
* @param query the original query that timeouted.
* @param cl the original consistency level of the read that timeouted.
* @param requiredResponses the number of responses that were required to
* achieve the requested consistency level.
Expand All @@ -27,13 +28,14 @@ private FallthroughRetryPolicy() {}
* @param nbRetry the number of retry already performed for this operation.
* @return {@code RetryDecision.rethrow()}.
*/
public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
public RetryDecision onReadTimeout(Query query, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
return RetryDecision.rethrow();
}

/**
* Defines whether to retry and at which consistency level on a write timeout.
*
* @param query the original query that timeouted.
* @param cl the original consistency level of the write that timeouted.
* @param writeType the type of the write that timeouted.
* @param requiredAcks the number of acknowledgments that were required to
Expand All @@ -43,14 +45,16 @@ public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, i
* @param nbRetry the number of retry already performed for this operation.
* @return {@code RetryDecision.rethrow()}.
*/
public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
public RetryDecision onWriteTimeout(Query query, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
return RetryDecision.rethrow();
}

/**
* Defines whether to retry and at which consistency level on an
* unavailable exception.
*
* @param query the original query for which the consistency level cannot
* be achieved.
* @param cl the original consistency level for the operation.
* @param requiredReplica the number of replica that should have been
* (known) alive for the operation to be attempted.
Expand All @@ -59,7 +63,7 @@ public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, in
* @param nbRetry the number of retry already performed for this operation.
* @return {@code RetryDecision.rethrow()}.
*/
public RetryDecision onUnavailable(ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
public RetryDecision onUnavailable(Query query, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
return RetryDecision.rethrow();
}
}
Expand Up @@ -32,8 +32,8 @@ private static ConsistencyLevel cl(ConsistencyLevel cl, RetryDecision decision)
return decision.getRetryConsistencyLevel() == null ? cl : decision.getRetryConsistencyLevel();
}

public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
RetryDecision decision = policy.onReadTimeout(cl, requiredResponses, receivedResponses, dataRetrieved, nbRetry);
public RetryDecision onReadTimeout(Query query, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
RetryDecision decision = policy.onReadTimeout(query, cl, requiredResponses, receivedResponses, dataRetrieved, nbRetry);
switch (decision.getType()) {
case IGNORE:
String f1 = "Ignoring read timeout (initial consistency: %s, required responses: %d, received responses: %d, data retrieved: %b, retries: %d)";
Expand All @@ -47,8 +47,8 @@ public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, i
return decision;
}

public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
RetryDecision decision = policy.onWriteTimeout(cl, writeType, requiredAcks, receivedAcks, nbRetry);
public RetryDecision onWriteTimeout(Query query, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
RetryDecision decision = policy.onWriteTimeout(query, cl, writeType, requiredAcks, receivedAcks, nbRetry);
switch (decision.getType()) {
case IGNORE:
String f1 = "Ignoring write timeout (initial consistency: %s, write type: %s, required acknowledgments: %d, received acknowledgments: %d, retries: %d)";
Expand All @@ -62,8 +62,8 @@ public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, in
return decision;
}

public RetryDecision onUnavailable(ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
RetryDecision decision = policy.onUnavailable(cl, requiredReplica, aliveReplica, nbRetry);
public RetryDecision onUnavailable(Query query, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
RetryDecision decision = policy.onUnavailable(query, cl, requiredReplica, aliveReplica, nbRetry);
switch (decision.getType()) {
case IGNORE:
String f1 = "Ignoring unavailable exception (initial consistency: %s, required replica: %d, alive replica: %d, retries: %d)";
Expand Down
Expand Up @@ -95,6 +95,7 @@ public static RetryDecision ignore() {
* {@code false} (see
* {@link com.datastax.driver.core.exceptions.ReadTimeoutException#wasDataRetrieved}).
*
* @param query the original query that timeouted.
* @param cl the original consistency level of the read that timeouted.
* @param requiredResponses the number of responses that were required to
* achieve the requested consistency level.
Expand All @@ -107,11 +108,12 @@ public static RetryDecision ignore() {
* a {@link com.datastax.driver.core.exceptions.ReadTimeoutException} will
* be thrown for the operation.
*/
public RetryDecision onReadTimeout(ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry);
public RetryDecision onReadTimeout(Query query, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry);

/**
* Defines whether to retry and at which consistency level on a write timeout.
*
* @param query the original query that timeouted.
* @param cl the original consistency level of the write that timeouted.
* @param writeType the type of the write that timeouted.
* @param requiredAcks the number of acknowledgments that were required to
Expand All @@ -123,12 +125,14 @@ public static RetryDecision ignore() {
* a {@link com.datastax.driver.core.exceptions.WriteTimeoutException} will
* be thrown for the operation.
*/
public RetryDecision onWriteTimeout(ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry);
public RetryDecision onWriteTimeout(Query query, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry);

/**
* Defines whether to retry and at which consistency level on an
* unavailable exception.
*
* @param query the original query for which the consistency level cannot
* be achieved.
* @param cl the original consistency level for the operation.
* @param requiredReplica the number of replica that should have been
* (known) alive for the operation to be attempted.
Expand All @@ -139,5 +143,5 @@ public static RetryDecision ignore() {
* an {@link com.datastax.driver.core.exceptions.UnavailableException} will
* be thrown for the operation.
*/
public RetryDecision onUnavailable(ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry);
public RetryDecision onUnavailable(Query query, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry);
}

0 comments on commit b27c524

Please sign in to comment.