From fa7625e6c3c52b8f7eff66bacfc0ddef3719630c Mon Sep 17 00:00:00 2001 From: olim7t Date: Wed, 4 Oct 2017 17:42:54 -0700 Subject: [PATCH] JAVA-1638: Check schema agreement --- changelog/README.md | 1 + .../datastax/oss/driver/api/core/Cluster.java | 37 ++ .../api/core/config/CoreDriverOption.java | 7 +- .../driver/api/core/cql/ExecutionInfo.java | 20 ++ .../driver/internal/core/ClusterWrapper.java | 5 + .../driver/internal/core/DefaultCluster.java | 5 + .../core/cql/CqlRequestHandlerBase.java | 31 +- .../core/cql/DefaultExecutionInfo.java | 10 +- .../core/metadata/DefaultTopologyMonitor.java | 11 + .../core/metadata/MetadataManager.java | 3 +- .../core/metadata/SchemaAgreementChecker.java | 210 ++++++++++++ .../core/metadata/TopologyMonitor.java | 8 + .../internal/core/session/DefaultSession.java | 2 +- .../internal/core/session/ReprepareOnUp.java | 68 ++-- .../driver/internal/core/util/NanoTime.java | 28 +- core/src/main/resources/reference.conf | 38 ++- .../metadata/SchemaAgreementCheckerTest.java | 320 ++++++++++++++++++ .../core/session/ReprepareOnUpTest.java | 44 ++- 18 files changed, 786 insertions(+), 62 deletions(-) create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java create mode 100644 core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementCheckerTest.java diff --git a/changelog/README.md b/changelog/README.md index 17311a49960..7acb688002a 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -4,6 +4,7 @@ ### 4.0.0-alpha2 (in progress) +- [new feature] JAVA-1638: Check schema agreement - [new feature] JAVA-1494: Implement Snappy and LZ4 compression - [new feature] JAVA-1514: Port Uuids utility class - [new feature] JAVA-1520: Add node state listeners diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java b/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java index 0ef7fa8f045..9309ad3aa35 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java @@ -19,6 +19,7 @@ import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; import com.datastax.oss.driver.api.core.metadata.NodeStateListener; import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener; import com.datastax.oss.driver.api.core.session.CqlSession; @@ -117,6 +118,42 @@ default Metadata refreshSchema() { return CompletableFutures.getUninterruptibly(refreshSchemaAsync()); } + /** + * Checks if all nodes in the cluster agree on a common schema version. + * + *

Due to the distributed nature of Cassandra, schema changes made on one node might not be + * immediately visible to others. Under certain circumstances, the driver waits until all nodes + * agree on a common schema version (namely: before a schema refresh, before repreparing all + * queries on a newly up node, and before completing a successful schema-altering query). To do + * so, it queries system tables to find out the schema version of all nodes that are currently + * {@link NodeState#UP UP}. If all the versions match, the check succeeds, otherwise it is retried + * periodically, until a given timeout (specified in the configuration). + * + *

A schema agreement failure is not fatal, but it might produce unexpected results (for + * example, getting an "unconfigured table" error for a table that you created right before, just + * because the two queries went to different coordinators). + * + *

Note that schema agreement never succeeds in a mixed-version cluster (it would be + * challenging because the way the schema version is computed varies across server versions); the + * assumption is that schema updates are unlikely to happen during a rolling upgrade anyway. + * + * @return a future that completes with {@code true} if the nodes agree, or {@code false} if the + * timeout fired. + * @see CoreDriverOption#CONTROL_CONNECTION_AGREEMENT_INTERVAL + * @see CoreDriverOption#CONTROL_CONNECTION_AGREEMENT_TIMEOUT + */ + CompletionStage checkSchemaAgreementAsync(); + + /** + * Convenience method to call {@link #checkSchemaAgreementAsync()} and block for the result. + * + *

This must not be called on a driver thread. + */ + default boolean checkSchemaAgreement() { + BlockingOperation.checkNotDriverThread(); + return CompletableFutures.getUninterruptibly(checkSchemaAgreementAsync()); + } + /** Returns a context that provides access to all the policies used by this driver instance. */ DriverContext getContext(); diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java index 5c0be813342..1d785c843ec 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java @@ -51,7 +51,12 @@ public enum CoreDriverOption implements DriverOption { RELATIVE_SPECULATIVE_EXECUTION_DELAY("delay", false), CONTROL_CONNECTION_TIMEOUT("connection.control-connection.timeout", true), - CONTROL_CONNECTION_PAGE_SIZE("connection.control-connection.page-size", true), + CONTROL_CONNECTION_AGREEMENT_INTERVAL( + "connection.control-connection.schema-agreement.interval", true), + CONTROL_CONNECTION_AGREEMENT_TIMEOUT( + "connection.control-connection.schema-agreement.timeout", true), + CONTROL_CONNECTION_AGREEMENT_WARN( + "connection.control-connection.schema-agreement.warn-on-failure", true), COALESCER_ROOT("connection.coalescer", true), RELATIVE_COALESCER_MAX_RUNS("max-runs-with-no-work", false), diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java b/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java index 40702dc804f..9f7395a10d3 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java @@ -15,7 +15,9 @@ */ package com.datastax.oss.driver.api.core.cql; +import com.datastax.oss.driver.api.core.Cluster; import com.datastax.oss.driver.api.core.CoreProtocolVersion; +import com.datastax.oss.driver.api.core.config.CoreDriverOption; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy; import java.nio.ByteBuffer; @@ -92,4 +94,22 @@ public interface ExecutionInfo { * versions, this map will always be empty. */ Map getIncomingPayload(); + + /** + * Whether the cluster reached schema agreement after the execution of this query. + * + *

After a successful schema-altering query (ex: creating a table), the driver will check if + * the cluster's nodes agree on the new schema version. If not, it will keep retrying a few times + * (the retry delay and timeout are set through the configuration). + * + *

If this method returns {@code false}, clients can call {@link + * Cluster#checkSchemaAgreement()} later to perform the check manually. + * + *

Schema agreement is only checked for schema-altering queries. For other query types, this + * method will always return {@code true}. + * + * @see CoreDriverOption#CONTROL_CONNECTION_AGREEMENT_INTERVAL + * @see CoreDriverOption#CONTROL_CONNECTION_AGREEMENT_TIMEOUT + */ + boolean isSchemaInAgreement(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ClusterWrapper.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ClusterWrapper.java index d8f2813cc13..acb713f9ad8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/ClusterWrapper.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ClusterWrapper.java @@ -61,6 +61,11 @@ public CompletionStage refreshSchemaAsync() { return delegate.refreshSchemaAsync(); } + @Override + public CompletionStage checkSchemaAgreementAsync() { + return delegate.checkSchemaAgreementAsync(); + } + @Override public DriverContext getContext() { return delegate.getContext(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java b/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java index b41657a71f2..a60725c553d 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java @@ -107,6 +107,11 @@ public CompletionStage refreshSchemaAsync() { return metadataManager.refreshSchema(null, true, true); } + @Override + public CompletionStage checkSchemaAgreementAsync() { + return context.topologyMonitor().checkSchemaAgreement(); + } + @Override public DriverContext getContext() { return context; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerBase.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerBase.java index 0341f7bc875..cd43a4e8b09 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerBase.java @@ -291,9 +291,13 @@ private void cancelScheduledTasks() { } private void setFinalResult( - Result resultMessage, Frame responseFrame, NodeResponseCallback callback) { + Result resultMessage, + Frame responseFrame, + boolean schemaInAgreement, + NodeResponseCallback callback) { try { - ExecutionInfo executionInfo = buildExecutionInfo(callback, resultMessage, responseFrame); + ExecutionInfo executionInfo = + buildExecutionInfo(callback, resultMessage, responseFrame, schemaInAgreement); AsyncResultSet resultSet = Conversions.toResultSet(resultMessage, executionInfo, session, context); if (result.complete(resultSet)) { @@ -310,7 +314,10 @@ private void setFinalResult( } private ExecutionInfo buildExecutionInfo( - NodeResponseCallback callback, Result resultMessage, Frame responseFrame) { + NodeResponseCallback callback, + Result resultMessage, + Frame responseFrame, + boolean schemaInAgreement) { ByteBuffer pagingState = (resultMessage instanceof Rows) ? ((Rows) resultMessage).getMetadata().pagingState : null; return new DefaultExecutionInfo( @@ -320,7 +327,8 @@ private ExecutionInfo buildExecutionInfo( callback.execution, errors, pagingState, - responseFrame); + responseFrame, + schemaInAgreement); } private void setFinalError(Throwable error) { @@ -394,16 +402,19 @@ public void onResponse(Frame responseFrame) { try { Message responseMessage = responseFrame.message; if (responseMessage instanceof SchemaChange) { - // TODO schema agreement, and chain setFinalResult to the result SchemaChange schemaChange = (SchemaChange) responseMessage; context - .metadataManager() - .refreshSchema(schemaChange.keyspace, false, false) + .topologyMonitor() + .checkSchemaAgreement() + .thenCombine( + context.metadataManager().refreshSchema(schemaChange.keyspace, false, false), + (schemaInAgreement, metadata) -> schemaInAgreement) .whenComplete( - ((metadata, error) -> setFinalResult(schemaChange, responseFrame, this))); + ((schemaInAgreement, error) -> + setFinalResult(schemaChange, responseFrame, schemaInAgreement, this))); } else if (responseMessage instanceof Result) { LOG.debug("[{}] Got result, completing", logPrefix); - setFinalResult((Result) responseMessage, responseFrame, this); + setFinalResult((Result) responseMessage, responseFrame, true, this); } else if (responseMessage instanceof Error) { LOG.debug("[{}] Got error response, processing", logPrefix); processErrorResponse((Error) responseMessage); @@ -518,7 +529,7 @@ private void processRetryDecision(RetryDecision decision, Throwable error) { setFinalError(error); break; case IGNORE: - setFinalResult(Void.INSTANCE, null, this); + setFinalResult(Void.INSTANCE, null, true, this); break; } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java index 4c2ecebe328..278afc6ac49 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java @@ -36,6 +36,7 @@ public class DefaultExecutionInfo implements ExecutionInfo { private final UUID tracingId; private final List warnings; private final Map customPayload; + private final boolean schemaInAgreement; public DefaultExecutionInfo( Statement statement, @@ -44,7 +45,8 @@ public DefaultExecutionInfo( int successfulExecutionIndex, List> errors, ByteBuffer pagingState, - Frame frame) { + Frame frame, + boolean schemaInAgreement) { this.statement = statement; this.coordinator = coordinator; this.speculativeExecutionCount = speculativeExecutionCount; @@ -56,6 +58,7 @@ public DefaultExecutionInfo( // Note: the collections returned by the protocol layer are already unmodifiable this.warnings = (frame == null) ? Collections.emptyList() : frame.warnings; this.customPayload = (frame == null) ? Collections.emptyMap() : frame.customPayload; + this.schemaInAgreement = schemaInAgreement; } @Override @@ -99,4 +102,9 @@ public List getWarnings() { public Map getIncomingPayload() { return customPayload; } + + @Override + public boolean isSchemaInAgreement() { + return schemaInAgreement; + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java index 52af8a1a833..a827b292607 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java @@ -54,6 +54,7 @@ public class DefaultTopologyMonitor implements TopologyMonitor { private static final int INFINITE_PAGE_SIZE = -1; private final String logPrefix; + private final InternalDriverContext context; private final ControlConnection controlConnection; private final AddressTranslator addressTranslator; private final Duration timeout; @@ -63,6 +64,7 @@ public class DefaultTopologyMonitor implements TopologyMonitor { public DefaultTopologyMonitor(InternalDriverContext context) { this.logPrefix = context.clusterName(); + this.context = context; this.controlConnection = context.controlConnection(); this.addressTranslator = context.addressTranslator(); DriverConfigProfile config = context.config().getDefaultProfile(); @@ -146,6 +148,15 @@ public CompletionStage> refreshNodeList() { }); } + @Override + public CompletionStage checkSchemaAgreement() { + if (closeFuture.isDone()) { + return CompletableFuture.completedFuture(true); + } + DriverChannel channel = controlConnection.channel(); + return new SchemaAgreementChecker(channel, context, port, logPrefix).run(); + } + @Override public CompletionStage closeFuture() { return closeFuture; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index ad746fe773b..26bc267c356 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -350,8 +350,9 @@ private void startSchemaRequest(CompletableFuture future) { currentSchemaRefresh = future; LOG.debug("[{}] Starting schema refresh", logPrefix); maybeInitControlConnection() + .thenCompose(v -> context.topologyMonitor().checkSchemaAgreement()) // 1. Query system tables - .thenCompose(v -> schemaQueriesFactory.newInstance(future).execute()) + .thenCompose(b -> schemaQueriesFactory.newInstance(future).execute()) // 2. Parse the rows into metadata objects, put them in a MetadataRefresh // 3. Apply the MetadataRefresh .thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java new file mode 100644 index 00000000000..3b50cf378ea --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2017-2017 DataStax Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.config.CoreDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigProfile; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; +import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler; +import com.datastax.oss.driver.internal.core.adminrequest.AdminResult; +import com.datastax.oss.driver.internal.core.adminrequest.AdminRow; +import com.datastax.oss.driver.internal.core.channel.DriverChannel; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import com.datastax.oss.driver.internal.core.util.NanoTime; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.time.Duration; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SchemaAgreementChecker { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaAgreementChecker.class); + private static final int INFINITE_PAGE_SIZE = -1; + @VisibleForTesting static final InetAddress BIND_ALL_ADDRESS; + + static { + try { + BIND_ALL_ADDRESS = InetAddress.getByAddress(new byte[4]); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + private final DriverChannel channel; + private final InternalDriverContext context; + private final int port; + private final String logPrefix; + private final Duration queryTimeout; + private final long intervalNs; + private final long timeoutNs; + private final boolean warnOnFailure; + private final long start; + private final CompletableFuture result = new CompletableFuture<>(); + + SchemaAgreementChecker( + DriverChannel channel, InternalDriverContext context, int port, String logPrefix) { + this.channel = channel; + this.context = context; + this.port = port; + this.logPrefix = logPrefix; + DriverConfigProfile config = context.config().getDefaultProfile(); + this.queryTimeout = config.getDuration(CoreDriverOption.CONTROL_CONNECTION_TIMEOUT); + this.intervalNs = + config.getDuration(CoreDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL).toNanos(); + this.timeoutNs = + config.getDuration(CoreDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT).toNanos(); + this.warnOnFailure = config.getBoolean(CoreDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN); + this.start = System.nanoTime(); + } + + public CompletionStage run() { + LOG.debug("[{}] Checking schema agreement", logPrefix); + if (timeoutNs == 0) { + result.complete(false); + } else { + sendQueries(); + } + return result; + } + + private void sendQueries() { + long elapsedNs = System.nanoTime() - start; + if (elapsedNs > timeoutNs) { + String message = + String.format( + "[%s] Schema agreement not reached after %s", logPrefix, NanoTime.format(elapsedNs)); + if (warnOnFailure) { + LOG.warn(message); + } else { + LOG.debug(message); + } + result.complete(false); + } else { + CompletionStage localQuery = + query("SELECT schema_version FROM system.local WHERE key='local'"); + CompletionStage peersQuery = + query("SELECT peer, rpc_address, schema_version FROM system.peers"); + + localQuery + .thenCombine(peersQuery, this::extractSchemaVersions) + .whenComplete(this::completeOrReschedule); + } + } + + private Set extractSchemaVersions(AdminResult controlNodeResult, AdminResult peersResult) { + ImmutableSet.Builder uuids = ImmutableSet.builder(); + + UUID uuid; + Iterator iterator = controlNodeResult.iterator(); + if (iterator.hasNext() && (uuid = iterator.next().getUuid("schema_version")) != null) { + uuids.add(uuid); + } + + Map nodes = context.metadataManager().getMetadata().getNodes(); + for (AdminRow peerRow : peersResult) { + InetSocketAddress connectAddress = getConnectAddress(peerRow); + Node node = nodes.get(connectAddress); + if (node == null || node.getState() != NodeState.UP) { + continue; + } + uuid = peerRow.getUuid("schema_version"); + if (uuid != null) { + uuids.add(uuid); + } + } + return uuids.build(); + } + + private InetSocketAddress getConnectAddress(AdminRow peerRow) { + // This is actually broadcast_address + InetAddress broadcastAddress = peerRow.getInetAddress("peer"); + // The address we are looking for (this corresponds to broadcast_rpc_address in the peer's + // cassandra yaml file; if this setting if unset, it defaults to the value for rpc_address or + // rpc_interface + InetAddress rpcAddress = peerRow.getInetAddress("rpc_address"); + + if (rpcAddress == null) { + LOG.warn( + "[{}] Found corrupted row with null rpc_address in system.peers (peer = {}), " + + "excluding from schema agreement", + logPrefix, + broadcastAddress); + return null; + } else if (rpcAddress.equals(BIND_ALL_ADDRESS)) { + LOG.warn( + "[{}] Found peer with 0.0.0.0 as rpc_address in system.peers, using peer ({}) instead", + logPrefix, + broadcastAddress); + rpcAddress = broadcastAddress; + } + return context.addressTranslator().translate(new InetSocketAddress(rpcAddress, port)); + } + + private void completeOrReschedule(Set uuids, Throwable error) { + if (error != null) { + LOG.debug( + "[{}] Error while checking schema agreement, completing now (false)", logPrefix, error); + result.complete(false); + } else if (uuids.size() == 1) { + LOG.debug( + "[{}] Schema agreement reached ({}), completing", logPrefix, uuids.iterator().next()); + result.complete(true); + } else { + LOG.debug( + "[{}] Schema agreement not reached yet ({}), rescheduling in {}", + logPrefix, + uuids, + NanoTime.format(intervalNs)); + channel + .eventLoop() + .schedule(this::sendQueries, intervalNs, TimeUnit.NANOSECONDS) + .addListener( + f -> { + if (!f.isSuccess()) { + LOG.debug( + "[{}] Error while rescheduling schema agreement, completing now (false)", + logPrefix, + f.cause()); + } + }); + } + } + + @VisibleForTesting + protected CompletionStage query(String queryString) { + return AdminRequestHandler.query( + channel, + queryString, + Collections.emptyMap(), + queryTimeout, + INFINITE_PAGE_SIZE, + logPrefix) + .start(); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java index b41024eb362..d882d96f38e 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java @@ -85,4 +85,12 @@ public interface TopologyMonitor extends AsyncAutoCloseable { * always be returned in a single message (no paging). */ CompletionStage> refreshNodeList(); + + /** + * Checks whether the nodes in the cluster agree on a common schema version. + * + *

This should typically be implemented with a few retries and a timeout, as the schema can + * take a while to replicate across nodes. + */ + CompletionStage checkSchemaAgreement(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java index 25f44b88313..bc7ff91dc2f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java @@ -443,7 +443,7 @@ private void reprepareStatements(ChannelPool pool) { logPrefix + "|" + pool.getNode().getConnectAddress(), pool, repreparePayloads, - config, + context, () -> RunOrSchedule.on(adminExecutor, () -> onPoolReady(pool))) .start(); } else { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUp.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUp.java index beae064e39e..42a8f35137f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUp.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUp.java @@ -21,7 +21,9 @@ import com.datastax.oss.driver.internal.core.adminrequest.AdminResult; import com.datastax.oss.driver.internal.core.adminrequest.AdminRow; import com.datastax.oss.driver.internal.core.channel.DriverChannel; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.cql.CqlRequestHandlerBase; +import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor; import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule; import com.datastax.oss.protocol.internal.Message; @@ -61,6 +63,7 @@ class ReprepareOnUp { private final String logPrefix; private final DriverChannel channel; private final Map repreparePayloads; + private final TopologyMonitor topologyMonitor; private final Runnable whenPrepared; private final boolean checkSystemTable; private final int maxStatements; @@ -77,14 +80,16 @@ class ReprepareOnUp { String logPrefix, ChannelPool pool, Map repreparePayloads, - DriverConfig config, + InternalDriverContext context, Runnable whenPrepared) { this.logPrefix = logPrefix; this.channel = pool.next(); this.repreparePayloads = repreparePayloads; + this.topologyMonitor = context.topologyMonitor(); this.whenPrepared = whenPrepared; + DriverConfig config = context.config(); this.checkSystemTable = config.getDefaultProfile().getBoolean(CoreDriverOption.REPREPARE_CHECK_SYSTEM_TABLE); this.timeout = config.getDefaultProfile().getDuration(CoreDriverOption.REPREPARE_TIMEOUT); @@ -103,28 +108,45 @@ void start() { LOG.debug("[{}] No channel available to reprepare, done", logPrefix); whenPrepared.run(); } else { - if (LOG.isDebugEnabled()) { // check because ConcurrentMap.size is not a constant operation - LOG.debug( - "[{}] {} statements to reprepare on newly added/up node", - logPrefix, - repreparePayloads.size()); - } - if (checkSystemTable) { - LOG.debug("[{}] Checking which statements the server knows about", logPrefix); - queryAsync(QUERY_SERVER_IDS, Collections.emptyMap(), "QUERY system.prepared_statements") - .whenComplete(this::gatherServerIds); - } else { - LOG.debug( - "[{}] {} is disabled, repreparing directly", - logPrefix, - CoreDriverOption.REPREPARE_CHECK_SYSTEM_TABLE.getPath()); - RunOrSchedule.on( - channel.eventLoop(), - () -> { - serverKnownIds = Collections.emptySet(); - gatherPayloadsToReprepare(); - }); - } + topologyMonitor + .checkSchemaAgreement() + .whenComplete( + (agreed, error) -> { + if (error != null) { + LOG.debug( + "[{}] Error while checking schema agreement, proceeding anyway", + logPrefix, + error); + } else if (!agreed) { + LOG.debug("[{}] Did not reach schema agreement, proceeding anyway", logPrefix); + } + // Check log level because ConcurrentMap.size is not a constant operation + if (LOG.isDebugEnabled()) { + LOG.debug( + "[{}] {} statements to reprepare on newly added/up node", + logPrefix, + repreparePayloads.size()); + } + if (checkSystemTable) { + LOG.debug("[{}] Checking which statements the server knows about", logPrefix); + queryAsync( + QUERY_SERVER_IDS, + Collections.emptyMap(), + "QUERY system.prepared_statements") + .whenComplete(this::gatherServerIds); + } else { + LOG.debug( + "[{}] {} is disabled, repreparing directly", + logPrefix, + CoreDriverOption.REPREPARE_CHECK_SYSTEM_TABLE.getPath()); + RunOrSchedule.on( + channel.eventLoop(), + () -> { + serverKnownIds = Collections.emptySet(); + gatherPayloadsToReprepare(); + }); + } + }); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/NanoTime.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/NanoTime.java index be25faefa33..e7d290118a6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/util/NanoTime.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/NanoTime.java @@ -25,19 +25,23 @@ public class NanoTime { /** Formats a duration in the best unit (truncating the fractional part). */ public static String formatTimeSince(long startTimeNs) { - long delta = System.nanoTime() - startTimeNs; - if (delta >= ONE_HOUR) { - return (delta / ONE_HOUR) + " h"; - } else if (delta >= ONE_MINUTE) { - return (delta / ONE_MINUTE) + " mn"; - } else if (delta >= ONE_SECOND) { - return (delta / ONE_SECOND) + " s"; - } else if (delta >= ONE_MILLISECOND) { - return (delta / ONE_MILLISECOND) + " ms"; - } else if (delta >= ONE_MICROSECOND) { - return (delta / ONE_MICROSECOND) + " us"; + return format(System.nanoTime() - startTimeNs); + } + + /** Formats a duration in the best unit (truncating the fractional part). */ + public static String format(long elapsedNs) { + if (elapsedNs >= ONE_HOUR) { + return (elapsedNs / ONE_HOUR) + " h"; + } else if (elapsedNs >= ONE_MINUTE) { + return (elapsedNs / ONE_MINUTE) + " mn"; + } else if (elapsedNs >= ONE_SECOND) { + return (elapsedNs / ONE_SECOND) + " s"; + } else if (elapsedNs >= ONE_MILLISECOND) { + return (elapsedNs / ONE_MILLISECOND) + " ms"; + } else if (elapsedNs >= ONE_MICROSECOND) { + return (elapsedNs / ONE_MICROSECOND) + " us"; } else { - return delta + " ns"; + return elapsedNs + " ns"; } } } diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 07bf97ad278..21831bd34ca 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -165,9 +165,41 @@ datastax-java-driver { # How long the driver waits for responses to control queries (e.g. fetching the list of # nodes, refreshing the schema). timeout = 500 milliseconds - # The page size used for control queries. If a query returns more than this number of - # results, it will be fetched in multiple requests. - page-size = 5000 + + # Due to the distributed nature of Cassandra, schema changes made on one node might not be + # immediately visible to others. Under certain circumstances, the driver waits until all nodes + # agree on a common schema version (namely: before a schema refresh, before repreparing all + # queries on a newly up node, and before completing a successful schema-altering query). + # To do so, it queries system tables to find out the schema version of all nodes that are + # currently UP. If all the versions match, the check succeeds, otherwise it is retried + # periodically, until a given timeout. + # + # A schema agreement failure is not fatal, but it might produce unexpected results (for + # example, getting an "unconfigured table" error for a table that you created right before, + # just because the two queries went to different coordinators). + # + # Note that schema agreement never succeeds in a mixed-version cluster (it would be + # challenging because the way the schema version is computed varies across server versions); + # the assumption is that schema updates are unlikely to happen during a rolling upgrade + # anyway. + schema-agreement { + # The interval between each attempt. + # This option can be changed at runtime, the new value will be used for checks issued after + # the change. + interval = 200 milliseconds + + # The timeout after which schema agreement fails. + # If this is set to 0, schema agreement is skipped and will always fail. + # This option can be changed at runtime, the new value will be used for checks issued after + # the change. + timeout = 10 seconds + + # Whether to log a warning if schema agreement fails. + # You might want to change this if you've set the timeout to 0. + # This option can be changed at runtime, the new value will be used for checks issued after + # the change. + warn-on-failure = true + } } # The component that coalesces writes on the connections. diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementCheckerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementCheckerTest.java new file mode 100644 index 00000000000..7158b4b3102 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementCheckerTest.java @@ -0,0 +1,320 @@ +/* + * Copyright (C) 2017-2017 DataStax Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator; +import com.datastax.oss.driver.api.core.addresstranslation.PassThroughAddressTranslator; +import com.datastax.oss.driver.api.core.config.CoreDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfig; +import com.datastax.oss.driver.api.core.config.DriverConfigProfile; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; +import com.datastax.oss.driver.internal.core.adminrequest.AdminResult; +import com.datastax.oss.driver.internal.core.adminrequest.AdminRow; +import com.datastax.oss.driver.internal.core.channel.DriverChannel; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; +import io.netty.channel.EventLoop; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import static com.datastax.oss.driver.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.never; + +@RunWith(MockitoJUnitRunner.class) +public class SchemaAgreementCheckerTest { + + private static final InetSocketAddress ADDRESS1 = new InetSocketAddress("127.0.0.1", 9042); + private static final InetSocketAddress ADDRESS2 = new InetSocketAddress("127.0.0.2", 9042); + private static final UUID VERSION1 = UUID.randomUUID(); + private static final UUID VERSION2 = UUID.randomUUID(); + + @Mock private InternalDriverContext context; + @Mock private DriverConfig config; + @Mock private DriverConfigProfile defaultConfig; + @Mock private DriverChannel channel; + @Mock private EventLoop eventLoop; + @Mock private MetadataManager metadataManager; + @Mock private Metadata metadata; + @Mock private DefaultNode node1; + @Mock private DefaultNode node2; + private AddressTranslator addressTranslator; + + @Before + public void setup() { + Mockito.when(defaultConfig.getDuration(CoreDriverOption.CONTROL_CONNECTION_TIMEOUT)) + .thenReturn(Duration.ofSeconds(1)); + Mockito.when(defaultConfig.getDuration(CoreDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL)) + .thenReturn(Duration.ofMillis(200)); + Mockito.when(defaultConfig.getDuration(CoreDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT)) + .thenReturn(Duration.ofSeconds(10)); + Mockito.when(defaultConfig.getBoolean(CoreDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN)) + .thenReturn(true); + Mockito.when(config.getDefaultProfile()).thenReturn(defaultConfig); + Mockito.when(context.config()).thenReturn(config); + + addressTranslator = + Mockito.spy( + new PassThroughAddressTranslator(context, CoreDriverOption.ADDRESS_TRANSLATOR_ROOT)); + Mockito.when(context.addressTranslator()).thenReturn(addressTranslator); + + Map nodes = ImmutableMap.of(ADDRESS1, node1, ADDRESS2, node2); + Mockito.when(metadata.getNodes()).thenReturn(nodes); + Mockito.when(metadataManager.getMetadata()).thenReturn(metadata); + Mockito.when(context.metadataManager()).thenReturn(metadataManager); + + Mockito.when(node2.getState()).thenReturn(NodeState.UP); + + Mockito.when(eventLoop.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer( + invocation -> { // Ignore delay and run immediately: + Runnable task = invocation.getArgument(0); + task.run(); + return null; + }); + Mockito.when(channel.eventLoop()).thenReturn(eventLoop); + } + + @Test + public void should_skip_if_timeout_is_zero() { + // Given + Mockito.when(defaultConfig.getDuration(CoreDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT)) + .thenReturn(Duration.ZERO); + TestSchemaAgreementChecker checker = new TestSchemaAgreementChecker(channel, context); + + // When + CompletionStage future = checker.run(); + + // Then + assertThat(future).isSuccess(b -> assertThat(b).isFalse()); + } + + @Test + public void should_succeed_if_only_one_node() { + // Given + TestSchemaAgreementChecker checker = new TestSchemaAgreementChecker(channel, context); + checker.stubQueries( + new StubbedQuery( + "SELECT schema_version FROM system.local WHERE key='local'", + mockResult(mockRow(null, null, VERSION1))), + new StubbedQuery( + "SELECT peer, rpc_address, schema_version FROM system.peers", mockResult(/*empty*/ ))); + + // When + CompletionStage future = checker.run(); + + // Then + assertThat(future).isSuccess(b -> assertThat(b).isTrue()); + } + + @Test + public void should_succeed_if_versions_match_on_first_try() { + // Given + TestSchemaAgreementChecker checker = new TestSchemaAgreementChecker(channel, context); + checker.stubQueries( + new StubbedQuery( + "SELECT schema_version FROM system.local WHERE key='local'", + mockResult(mockRow(null, null, VERSION1))), + new StubbedQuery( + "SELECT peer, rpc_address, schema_version FROM system.peers", + mockResult(mockRow(null, ADDRESS2.getAddress(), VERSION1)))); + + // When + CompletionStage future = checker.run(); + + // Then + assertThat(future).isSuccess(b -> assertThat(b).isTrue()); + Mockito.verify(addressTranslator).translate(ADDRESS2); + } + + @Test + public void should_ignore_down_peers() { + // Given + TestSchemaAgreementChecker checker = new TestSchemaAgreementChecker(channel, context); + Mockito.when(node2.getState()).thenReturn(NodeState.DOWN); + checker.stubQueries( + new StubbedQuery( + "SELECT schema_version FROM system.local WHERE key='local'", + mockResult(mockRow(null, null, VERSION1))), + new StubbedQuery( + "SELECT peer, rpc_address, schema_version FROM system.peers", + mockResult(mockRow(null, ADDRESS2.getAddress(), VERSION2)))); + + // When + CompletionStage future = checker.run(); + + // Then + assertThat(future).isSuccess(b -> assertThat(b).isTrue()); + Mockito.verify(addressTranslator).translate(ADDRESS2); + } + + @Test + public void should_ignore_malformed_rows() { + // Given + TestSchemaAgreementChecker checker = new TestSchemaAgreementChecker(channel, context); + checker.stubQueries( + new StubbedQuery( + "SELECT schema_version FROM system.local WHERE key='local'", + mockResult(mockRow(null, null, VERSION1))), + new StubbedQuery( + "SELECT peer, rpc_address, schema_version FROM system.peers", + mockResult(mockRow(null, null, VERSION2)))); + + // When + CompletionStage future = checker.run(); + + // Then + assertThat(future).isSuccess(b -> assertThat(b).isTrue()); + Mockito.verify(addressTranslator, never()).translate(ADDRESS2); + } + + @Test + public void should_use_peer_if_rpc_address_is_0_0_0_0() { + // Given + TestSchemaAgreementChecker checker = new TestSchemaAgreementChecker(channel, context); + Mockito.when(node2.getState()).thenReturn(NodeState.DOWN); + checker.stubQueries( + new StubbedQuery( + "SELECT schema_version FROM system.local WHERE key='local'", + mockResult(mockRow(null, null, VERSION1))), + new StubbedQuery( + "SELECT peer, rpc_address, schema_version FROM system.peers", + mockResult( + mockRow( + ADDRESS2.getAddress(), SchemaAgreementChecker.BIND_ALL_ADDRESS, VERSION2)))); + + // When + CompletionStage future = checker.run(); + + // Then + assertThat(future).isSuccess(b -> assertThat(b).isTrue()); + Mockito.verify(addressTranslator).translate(ADDRESS2); + } + + @Test + public void should_reschedule_if_versions_do_not_match_on_first_try() { + // Given + TestSchemaAgreementChecker checker = new TestSchemaAgreementChecker(channel, context); + checker.stubQueries( + // First round + new StubbedQuery( + "SELECT schema_version FROM system.local WHERE key='local'", + mockResult(mockRow(null, null, VERSION1))), + new StubbedQuery( + "SELECT peer, rpc_address, schema_version FROM system.peers", + mockResult(mockRow(null, ADDRESS2.getAddress(), VERSION2))), + + // Second round + new StubbedQuery( + "SELECT schema_version FROM system.local WHERE key='local'", + mockResult(mockRow(null, null, VERSION1))), + new StubbedQuery( + "SELECT peer, rpc_address, schema_version FROM system.peers", + mockResult(mockRow(null, ADDRESS2.getAddress(), VERSION1)))); + + // When + CompletionStage future = checker.run(); + + // Then + assertThat(future).isSuccess(b -> assertThat(b).isTrue()); + } + + @Test + public void should_fail_if_versions_do_not_match_after_timeout() { + // Given + Mockito.when(defaultConfig.getDuration(CoreDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT)) + .thenReturn(Duration.ofNanos(10)); + TestSchemaAgreementChecker checker = new TestSchemaAgreementChecker(channel, context); + checker.stubQueries( + new StubbedQuery( + "SELECT schema_version FROM system.local WHERE key='local'", + mockResult(mockRow(null, null, VERSION1))), + new StubbedQuery( + "SELECT peer, rpc_address, schema_version FROM system.peers", + mockResult(mockRow(null, ADDRESS2.getAddress(), VERSION1)))); + + // When + CompletionStage future = checker.run(); + + // Then + assertThat(future).isSuccess(b -> assertThat(b).isFalse()); + } + + /** Extend to mock the query execution logic. */ + private static class TestSchemaAgreementChecker extends SchemaAgreementChecker { + + private final Queue queries = new LinkedList<>(); + + TestSchemaAgreementChecker(DriverChannel channel, InternalDriverContext context) { + super(channel, context, 9042, "test"); + } + + private void stubQueries(StubbedQuery... queries) { + this.queries.addAll(Arrays.asList(queries)); + } + + @Override + protected CompletionStage query(String queryString) { + StubbedQuery nextQuery = queries.poll(); + assertThat(nextQuery).isNotNull(); + assertThat(nextQuery.queryString).isEqualTo(queryString); + return CompletableFuture.completedFuture(nextQuery.result); + } + } + + private static class StubbedQuery { + private final String queryString; + private final AdminResult result; + + private StubbedQuery(String queryString, AdminResult result) { + this.queryString = queryString; + this.result = result; + } + } + + private AdminRow mockRow(InetAddress peer, InetAddress rpcAddress, UUID uuid) { + AdminRow row = Mockito.mock(AdminRow.class); + Mockito.when(row.getInetAddress("peer")).thenReturn(peer); + Mockito.when(row.getInetAddress("rpc_address")).thenReturn(rpcAddress); + Mockito.when(row.getUuid("schema_version")).thenReturn(uuid); + return row; + } + + private AdminResult mockResult(AdminRow... rows) { + AdminResult result = Mockito.mock(AdminResult.class); + Mockito.when(result.iterator()).thenReturn(Iterators.forArray(rows)); + return result; + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUpTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUpTest.java index 3e882963a8f..d1b0ecd7888 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUpTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUpTest.java @@ -21,7 +21,10 @@ import com.datastax.oss.driver.api.core.config.DriverConfigProfile; import com.datastax.oss.driver.internal.core.adminrequest.AdminResult; import com.datastax.oss.driver.internal.core.channel.DriverChannel; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor; import com.datastax.oss.driver.internal.core.pool.ChannelPool; +import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.protocol.internal.Message; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.request.Prepare; @@ -56,8 +59,10 @@ public class ReprepareOnUpTest { @Mock private ChannelPool pool; @Mock private DriverChannel channel; @Mock private EventLoop eventLoop; + @Mock private InternalDriverContext context; @Mock private DriverConfig config; @Mock private DriverConfigProfile defaultConfigProfile; + @Mock private TopologyMonitor topologyMonitor; private Runnable whenPrepared; private CompletionStage done; @@ -78,6 +83,11 @@ public void setup() { .thenReturn(0); Mockito.when(defaultConfigProfile.getInt(CoreDriverOption.REPREPARE_MAX_PARALLELISM)) .thenReturn(100); + Mockito.when(context.config()).thenReturn(config); + + Mockito.when(topologyMonitor.checkSchemaAgreement()) + .thenReturn(CompletableFuture.completedFuture(true)); + Mockito.when(context.topologyMonitor()).thenReturn(topologyMonitor); done = new CompletableFuture<>(); whenPrepared = () -> ((CompletableFuture) done).complete(null); @@ -87,7 +97,7 @@ public void setup() { public void should_complete_immediately_if_no_prepared_statements() { // Given MockReprepareOnUp reprepareOnUp = - new MockReprepareOnUp("test", pool, getMockPayloads(/*none*/ ), config, whenPrepared); + new MockReprepareOnUp("test", pool, getMockPayloads(/*none*/ ), context, whenPrepared); // When reprepareOnUp.start(); @@ -101,7 +111,7 @@ public void should_complete_immediately_if_pool_empty() { // Given Mockito.when(pool.next()).thenReturn(null); MockReprepareOnUp reprepareOnUp = - new MockReprepareOnUp("test", pool, getMockPayloads('a'), config, whenPrepared); + new MockReprepareOnUp("test", pool, getMockPayloads('a'), context, whenPrepared); // When reprepareOnUp.start(); @@ -114,7 +124,7 @@ public void should_complete_immediately_if_pool_empty() { public void should_reprepare_all_if_system_table_query_fails() { MockReprepareOnUp reprepareOnUp = new MockReprepareOnUp( - "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared); + "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), context, whenPrepared); reprepareOnUp.start(); @@ -138,7 +148,7 @@ public void should_reprepare_all_if_system_table_query_fails() { public void should_reprepare_all_if_system_table_empty() { MockReprepareOnUp reprepareOnUp = new MockReprepareOnUp( - "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared); + "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), context, whenPrepared); reprepareOnUp.start(); @@ -167,7 +177,7 @@ public void should_reprepare_all_if_system_query_disabled() { MockReprepareOnUp reprepareOnUp = new MockReprepareOnUp( - "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared); + "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), context, whenPrepared); reprepareOnUp.start(); @@ -186,7 +196,7 @@ public void should_reprepare_all_if_system_query_disabled() { public void should_not_reprepare_already_known_statements() { MockReprepareOnUp reprepareOnUp = new MockReprepareOnUp( - "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared); + "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), context, whenPrepared); reprepareOnUp.start(); @@ -208,6 +218,20 @@ public void should_not_reprepare_already_known_statements() { assertThat(done).isSuccess(v -> assertThat(reprepareOnUp.queries).isEmpty()); } + @Test + public void should_proceed_if_schema_agreement_not_reached() { + Mockito.when(topologyMonitor.checkSchemaAgreement()) + .thenReturn(CompletableFuture.completedFuture(false)); + should_not_reprepare_already_known_statements(); + } + + @Test + public void should_proceed_if_schema_agreement_fails() { + Mockito.when(topologyMonitor.checkSchemaAgreement()) + .thenReturn(CompletableFutures.failedFuture(new RuntimeException("test"))); + should_not_reprepare_already_known_statements(); + } + @Test public void should_limit_number_of_statements_to_reprepare() { Mockito.when(defaultConfigProfile.getInt(CoreDriverOption.REPREPARE_MAX_STATEMENTS)) @@ -215,7 +239,7 @@ public void should_limit_number_of_statements_to_reprepare() { MockReprepareOnUp reprepareOnUp = new MockReprepareOnUp( - "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared); + "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), context, whenPrepared); reprepareOnUp.start(); @@ -244,7 +268,7 @@ public void should_limit_number_of_statements_reprepared_in_parallel() { MockReprepareOnUp reprepareOnUp = new MockReprepareOnUp( - "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared); + "test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), context, whenPrepared); reprepareOnUp.start(); @@ -298,9 +322,9 @@ private static class MockReprepareOnUp extends ReprepareOnUp { String logPrefix, ChannelPool pool, Map repreparePayloads, - DriverConfig config, + InternalDriverContext context, Runnable whenPrepared) { - super(logPrefix, pool, repreparePayloads, config, whenPrepared); + super(logPrefix, pool, repreparePayloads, context, whenPrepared); } @Override