Skip to content

Commit

Permalink
JAVA-1638: Check schema agreement
Browse files Browse the repository at this point in the history
  • Loading branch information
olim7t committed Oct 13, 2017
1 parent 21d7a7a commit fa7625e
Show file tree
Hide file tree
Showing 18 changed files with 786 additions and 62 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.0.0-alpha2 (in progress)

- [new feature] JAVA-1638: Check schema agreement
- [new feature] JAVA-1494: Implement Snappy and LZ4 compression
- [new feature] JAVA-1514: Port Uuids utility class
- [new feature] JAVA-1520: Add node state listeners
Expand Down
37 changes: 37 additions & 0 deletions core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java
Expand Up @@ -19,6 +19,7 @@
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
import com.datastax.oss.driver.api.core.session.CqlSession;
Expand Down Expand Up @@ -117,6 +118,42 @@ default Metadata refreshSchema() {
return CompletableFutures.getUninterruptibly(refreshSchemaAsync());
}

/**
* Checks if all nodes in the cluster agree on a common schema version.
*
* <p>Due to the distributed nature of Cassandra, schema changes made on one node might not be
* immediately visible to others. Under certain circumstances, the driver waits until all nodes
* agree on a common schema version (namely: before a schema refresh, before repreparing all
* queries on a newly up node, and before completing a successful schema-altering query). To do
* so, it queries system tables to find out the schema version of all nodes that are currently
* {@link NodeState#UP UP}. If all the versions match, the check succeeds, otherwise it is retried
* periodically, until a given timeout (specified in the configuration).
*
* <p>A schema agreement failure is not fatal, but it might produce unexpected results (for
* example, getting an "unconfigured table" error for a table that you created right before, just
* because the two queries went to different coordinators).
*
* <p>Note that schema agreement never succeeds in a mixed-version cluster (it would be
* challenging because the way the schema version is computed varies across server versions); the
* assumption is that schema updates are unlikely to happen during a rolling upgrade anyway.
*
* @return a future that completes with {@code true} if the nodes agree, or {@code false} if the
* timeout fired.
* @see CoreDriverOption#CONTROL_CONNECTION_AGREEMENT_INTERVAL
* @see CoreDriverOption#CONTROL_CONNECTION_AGREEMENT_TIMEOUT
*/
CompletionStage<Boolean> checkSchemaAgreementAsync();

/**
* Convenience method to call {@link #checkSchemaAgreementAsync()} and block for the result.
*
* <p>This must not be called on a driver thread.
*/
default boolean checkSchemaAgreement() {
BlockingOperation.checkNotDriverThread();
return CompletableFutures.getUninterruptibly(checkSchemaAgreementAsync());
}

/** Returns a context that provides access to all the policies used by this driver instance. */
DriverContext getContext();

Expand Down
Expand Up @@ -51,7 +51,12 @@ public enum CoreDriverOption implements DriverOption {
RELATIVE_SPECULATIVE_EXECUTION_DELAY("delay", false),

CONTROL_CONNECTION_TIMEOUT("connection.control-connection.timeout", true),
CONTROL_CONNECTION_PAGE_SIZE("connection.control-connection.page-size", true),
CONTROL_CONNECTION_AGREEMENT_INTERVAL(
"connection.control-connection.schema-agreement.interval", true),
CONTROL_CONNECTION_AGREEMENT_TIMEOUT(
"connection.control-connection.schema-agreement.timeout", true),
CONTROL_CONNECTION_AGREEMENT_WARN(
"connection.control-connection.schema-agreement.warn-on-failure", true),

COALESCER_ROOT("connection.coalescer", true),
RELATIVE_COALESCER_MAX_RUNS("max-runs-with-no-work", false),
Expand Down
Expand Up @@ -15,7 +15,9 @@
*/
package com.datastax.oss.driver.api.core.cql;

import com.datastax.oss.driver.api.core.Cluster;
import com.datastax.oss.driver.api.core.CoreProtocolVersion;
import com.datastax.oss.driver.api.core.config.CoreDriverOption;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -92,4 +94,22 @@ public interface ExecutionInfo {
* versions, this map will always be empty.
*/
Map<String, ByteBuffer> getIncomingPayload();

/**
* Whether the cluster reached schema agreement after the execution of this query.
*
* <p>After a successful schema-altering query (ex: creating a table), the driver will check if
* the cluster's nodes agree on the new schema version. If not, it will keep retrying a few times
* (the retry delay and timeout are set through the configuration).
*
* <p>If this method returns {@code false}, clients can call {@link
* Cluster#checkSchemaAgreement()} later to perform the check manually.
*
* <p>Schema agreement is only checked for schema-altering queries. For other query types, this
* method will always return {@code true}.
*
* @see CoreDriverOption#CONTROL_CONNECTION_AGREEMENT_INTERVAL
* @see CoreDriverOption#CONTROL_CONNECTION_AGREEMENT_TIMEOUT
*/
boolean isSchemaInAgreement();
}
Expand Up @@ -61,6 +61,11 @@ public CompletionStage<Metadata> refreshSchemaAsync() {
return delegate.refreshSchemaAsync();
}

@Override
public CompletionStage<Boolean> checkSchemaAgreementAsync() {
return delegate.checkSchemaAgreementAsync();
}

@Override
public DriverContext getContext() {
return delegate.getContext();
Expand Down
Expand Up @@ -107,6 +107,11 @@ public CompletionStage<Metadata> refreshSchemaAsync() {
return metadataManager.refreshSchema(null, true, true);
}

@Override
public CompletionStage<Boolean> checkSchemaAgreementAsync() {
return context.topologyMonitor().checkSchemaAgreement();
}

@Override
public DriverContext getContext() {
return context;
Expand Down
Expand Up @@ -291,9 +291,13 @@ private void cancelScheduledTasks() {
}

private void setFinalResult(
Result resultMessage, Frame responseFrame, NodeResponseCallback callback) {
Result resultMessage,
Frame responseFrame,
boolean schemaInAgreement,
NodeResponseCallback callback) {
try {
ExecutionInfo executionInfo = buildExecutionInfo(callback, resultMessage, responseFrame);
ExecutionInfo executionInfo =
buildExecutionInfo(callback, resultMessage, responseFrame, schemaInAgreement);
AsyncResultSet resultSet =
Conversions.toResultSet(resultMessage, executionInfo, session, context);
if (result.complete(resultSet)) {
Expand All @@ -310,7 +314,10 @@ private void setFinalResult(
}

private ExecutionInfo buildExecutionInfo(
NodeResponseCallback callback, Result resultMessage, Frame responseFrame) {
NodeResponseCallback callback,
Result resultMessage,
Frame responseFrame,
boolean schemaInAgreement) {
ByteBuffer pagingState =
(resultMessage instanceof Rows) ? ((Rows) resultMessage).getMetadata().pagingState : null;
return new DefaultExecutionInfo(
Expand All @@ -320,7 +327,8 @@ private ExecutionInfo buildExecutionInfo(
callback.execution,
errors,
pagingState,
responseFrame);
responseFrame,
schemaInAgreement);
}

private void setFinalError(Throwable error) {
Expand Down Expand Up @@ -394,16 +402,19 @@ public void onResponse(Frame responseFrame) {
try {
Message responseMessage = responseFrame.message;
if (responseMessage instanceof SchemaChange) {
// TODO schema agreement, and chain setFinalResult to the result
SchemaChange schemaChange = (SchemaChange) responseMessage;
context
.metadataManager()
.refreshSchema(schemaChange.keyspace, false, false)
.topologyMonitor()
.checkSchemaAgreement()
.thenCombine(
context.metadataManager().refreshSchema(schemaChange.keyspace, false, false),
(schemaInAgreement, metadata) -> schemaInAgreement)
.whenComplete(
((metadata, error) -> setFinalResult(schemaChange, responseFrame, this)));
((schemaInAgreement, error) ->
setFinalResult(schemaChange, responseFrame, schemaInAgreement, this)));
} else if (responseMessage instanceof Result) {
LOG.debug("[{}] Got result, completing", logPrefix);
setFinalResult((Result) responseMessage, responseFrame, this);
setFinalResult((Result) responseMessage, responseFrame, true, this);
} else if (responseMessage instanceof Error) {
LOG.debug("[{}] Got error response, processing", logPrefix);
processErrorResponse((Error) responseMessage);
Expand Down Expand Up @@ -518,7 +529,7 @@ private void processRetryDecision(RetryDecision decision, Throwable error) {
setFinalError(error);
break;
case IGNORE:
setFinalResult(Void.INSTANCE, null, this);
setFinalResult(Void.INSTANCE, null, true, this);
break;
}
}
Expand Down
Expand Up @@ -36,6 +36,7 @@ public class DefaultExecutionInfo implements ExecutionInfo {
private final UUID tracingId;
private final List<String> warnings;
private final Map<String, ByteBuffer> customPayload;
private final boolean schemaInAgreement;

public DefaultExecutionInfo(
Statement<?> statement,
Expand All @@ -44,7 +45,8 @@ public DefaultExecutionInfo(
int successfulExecutionIndex,
List<Map.Entry<Node, Throwable>> errors,
ByteBuffer pagingState,
Frame frame) {
Frame frame,
boolean schemaInAgreement) {
this.statement = statement;
this.coordinator = coordinator;
this.speculativeExecutionCount = speculativeExecutionCount;
Expand All @@ -56,6 +58,7 @@ public DefaultExecutionInfo(
// Note: the collections returned by the protocol layer are already unmodifiable
this.warnings = (frame == null) ? Collections.emptyList() : frame.warnings;
this.customPayload = (frame == null) ? Collections.emptyMap() : frame.customPayload;
this.schemaInAgreement = schemaInAgreement;
}

@Override
Expand Down Expand Up @@ -99,4 +102,9 @@ public List<String> getWarnings() {
public Map<String, ByteBuffer> getIncomingPayload() {
return customPayload;
}

@Override
public boolean isSchemaInAgreement() {
return schemaInAgreement;
}
}
Expand Up @@ -54,6 +54,7 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
private static final int INFINITE_PAGE_SIZE = -1;

private final String logPrefix;
private final InternalDriverContext context;
private final ControlConnection controlConnection;
private final AddressTranslator addressTranslator;
private final Duration timeout;
Expand All @@ -63,6 +64,7 @@ public class DefaultTopologyMonitor implements TopologyMonitor {

public DefaultTopologyMonitor(InternalDriverContext context) {
this.logPrefix = context.clusterName();
this.context = context;
this.controlConnection = context.controlConnection();
this.addressTranslator = context.addressTranslator();
DriverConfigProfile config = context.config().getDefaultProfile();
Expand Down Expand Up @@ -146,6 +148,15 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
});
}

@Override
public CompletionStage<Boolean> checkSchemaAgreement() {
if (closeFuture.isDone()) {
return CompletableFuture.completedFuture(true);
}
DriverChannel channel = controlConnection.channel();
return new SchemaAgreementChecker(channel, context, port, logPrefix).run();
}

@Override
public CompletionStage<Void> closeFuture() {
return closeFuture;
Expand Down
Expand Up @@ -350,8 +350,9 @@ private void startSchemaRequest(CompletableFuture<Metadata> future) {
currentSchemaRefresh = future;
LOG.debug("[{}] Starting schema refresh", logPrefix);
maybeInitControlConnection()
.thenCompose(v -> context.topologyMonitor().checkSchemaAgreement())
// 1. Query system tables
.thenCompose(v -> schemaQueriesFactory.newInstance(future).execute())
.thenCompose(b -> schemaQueriesFactory.newInstance(future).execute())
// 2. Parse the rows into metadata objects, put them in a MetadataRefresh
// 3. Apply the MetadataRefresh
.thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor)
Expand Down

0 comments on commit fa7625e

Please sign in to comment.