Skip to content

Commit

Permalink
JAVA-1917: Add ability to set node on statement (#1086)
Browse files Browse the repository at this point in the history
  • Loading branch information
GregBestland committed Aug 28, 2018
1 parent fe1407d commit fe1094a
Show file tree
Hide file tree
Showing 20 changed files with 440 additions and 67 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### 4.0.0-beta2 (in progress)

- [new feature] JAVA-1917: Add ability to set node on statement
- [improvement] JAVA-1916: Base TimestampCodec.parse on java.util.Date.
- [improvement] JAVA-1940: Clean up test resources when CCM integration tests finish
- [bug] JAVA-1938: Make CassandraSchemaQueries classes public
Expand Down
22 changes: 22 additions & 0 deletions core/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,28 @@
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-beta1",
"elementKind": "method",
"justification": "Renamed context getters for uniformity"
},
{
"code": "java.method.addedToInterface",
"new": "method com.datastax.oss.driver.api.core.metadata.Node com.datastax.oss.driver.api.core.session.Request::getNode()",
"package": "com.datastax.oss.driver.api.core.session",
"classQualifiedName": "com.datastax.oss.driver.api.core.session.Request",
"classSimpleName": "Request",
"methodName": "getNode",
"newArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-beta2-SNAPSHOT",
"elementKind": "method",
"justification": "Add ability to query specific nodes for virtual tables"
},
{
"code": "java.method.addedToInterface",
"new": "method T com.datastax.oss.driver.api.core.cql.Statement<T extends com.datastax.oss.driver.api.core.cql.Statement<T extends com.datastax.oss.driver.api.core.cql.Statement<T>>>::setNode(com.datastax.oss.driver.api.core.metadata.Node)",
"package": "com.datastax.oss.driver.api.core.cql",
"classQualifiedName": "com.datastax.oss.driver.api.core.cql.Statement",
"classSimpleName": "Statement",
"methodName": "setNode",
"newArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-beta2-SNAPSHOT",
"elementKind": "method",
"justification": "Add ability to query specific nodes for virtual tables"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@ default CompletionStage<AsyncResultSet> executeAsync(@NonNull String query) {
* <li>{@link Request#getRoutingKeyspace() boundStatement.getRoutingKeyspace()} is set from
* either {@link Request#getKeyspace() simpleStatement.getKeyspace()} (if it's not {@code
* null}), or {@code simpleStatement.getRoutingKeyspace()};
* <li>on the other hand, {@link Statement#getTimestamp() boundStatement.getTimestamp()} is
* <b>not</b> copied from the simple statement. It will be set to {@link Long#MIN_VALUE},
* meaning that the value will be assigned by the session's timestamp generator.
* <li>on the other hand, the following attributes are <b>not</b> propagated:
* <ul>
* <li>{@link Statement#getTimestamp() boundStatement.getTimestamp()} will be set to
* {@link Long#MIN_VALUE}, meaning that the value will be assigned by the session's
* timestamp generator.
* <li>{@link Statement#getNode() boundStatement.getNode()} will always be {@code null}.
* </ul>
* </ul>
*
* If you want to customize this behavior, you can write your own implementation of {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ static BatchStatement newInstance(@NonNull BatchType batchType) {
Integer.MIN_VALUE,
null,
null,
null,
null);
}

Expand Down Expand Up @@ -87,6 +88,7 @@ static BatchStatement newInstance(
Integer.MIN_VALUE,
null,
null,
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public BatchStatement build() {
pageSize,
consistencyLevel,
serialConsistencyLevel,
timeout);
timeout,
node);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public BoundStatementBuilder(@NonNull BoundStatement template) {
this.values = template.getValues().toArray(new ByteBuffer[this.variableDefinitions.size()]);
this.codecRegistry = template.codecRegistry();
this.protocolVersion = template.protocolVersion();
this.node = template.getNode();
}

@Override
Expand Down Expand Up @@ -167,6 +168,7 @@ public BoundStatement build() {
serialConsistencyLevel,
timeout,
codecRegistry,
protocolVersion);
protocolVersion,
node);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ static SimpleStatement newInstance(@NonNull String cqlQuery) {
Integer.MIN_VALUE,
null,
null,
null,
null);
}

Expand Down Expand Up @@ -107,6 +108,7 @@ static SimpleStatement newInstance(
Integer.MIN_VALUE,
null,
null,
null,
null);
}

Expand Down Expand Up @@ -134,6 +136,7 @@ static SimpleStatement newInstance(
Integer.MIN_VALUE,
null,
null,
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public SimpleStatement build() {
pageSize,
consistencyLevel,
serialConsistencyLevel,
timeout);
timeout,
node);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
Expand Down Expand Up @@ -93,6 +97,30 @@ public interface Statement<T extends Statement<T>> extends Request {
@NonNull
T setRoutingKeyspace(@Nullable CqlIdentifier newRoutingKeyspace);

/**
* Sets the {@link Node} that should handle this query.
*
* <p>In the general case, use of this method is <em>heavily discouraged</em> and should only be
* used in the following cases:
*
* <ol>
* <li>Querying node-local tables, such as tables in the {@code system} and {@code system_views}
* keyspaces.
* <li>Applying a series of schema changes, where it may be advantageous to execute schema
* changes in sequence on the same node.
* </ol>
*
* <p>Configuring a specific node causes the configured {@link LoadBalancingPolicy} to be
* completely bypassed. However, if the load balancing policy dictates that the node is at
* distance {@link NodeDistance#IGNORED} or there is no active connectivity to the node, the
* request will fail with a {@link NoNodeAvailableException}.
*
* @param node The node that should be used to handle executions of this statement or null to
* delegate to the configured load balancing policy.
*/
@NonNull
T setNode(@Nullable Node node);

/**
* Shortcut for {@link #setRoutingKeyspace(CqlIdentifier)
* setRoutingKeyspace(CqlIdentifier.fromCql(newRoutingKeyspaceName))}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -54,6 +55,7 @@ public abstract class StatementBuilder<T extends StatementBuilder<T, S>, S exten
@Nullable protected ConsistencyLevel consistencyLevel;
@Nullable protected ConsistencyLevel serialConsistencyLevel;
@Nullable protected Duration timeout;
@Nullable protected Node node;

protected StatementBuilder() {
// nothing to do
Expand All @@ -78,6 +80,7 @@ protected StatementBuilder(S template) {
this.consistencyLevel = template.getConsistencyLevel();
this.serialConsistencyLevel = template.getSerialConsistencyLevel();
this.timeout = template.getTimeout();
this.node = template.getNode();
}

/** @see Statement#setExecutionProfileName(String) */
Expand Down Expand Up @@ -199,6 +202,12 @@ public T withTimeout(@Nullable Duration timeout) {
return self;
}

/** @see Statement#setNode(Node) */
public T withNode(@Nullable Node node) {
this.node = node;
return self;
}

@NonNull
protected Map<String, ByteBuffer> buildCustomPayload() {
return (customPayloadBuilder == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
Expand Down Expand Up @@ -160,4 +161,8 @@ public interface Request {
*/
@Nullable
Duration getTimeout();

/** @return The node configured on this statement, or null if none is configured. */
@Nullable
Node getNode();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.ProtocolConstants;
Expand Down Expand Up @@ -153,10 +154,15 @@ protected CqlRequestHandlerBase(
? config.getDefaultProfile()
: config.getProfile(profileName);
}
this.queryPlan =
context
.getLoadBalancingPolicyWrapper()
.newQueryPlan(statement, executionProfile.getName(), session);
if (this.statement.getNode() != null) {
this.queryPlan = new QueryPlan(this.statement.getNode());

} else {
this.queryPlan =
context
.getLoadBalancingPolicyWrapper()
.newQueryPlan(statement, executionProfile.getName(), session);
}
this.retryPolicy = context.getRetryPolicy(executionProfile.getName());
this.speculativeExecutionPolicy =
context.getSpeculativeExecutionPolicy(executionProfile.getName());
Expand Down
Loading

0 comments on commit fe1094a

Please sign in to comment.