From 658419da1725e4632a159d25a33d93f9a2f41a36 Mon Sep 17 00:00:00 2001 From: olim7t Date: Wed, 26 Apr 2017 08:57:00 -0700 Subject: [PATCH] Add cluster shutdown Introduce AsyncAutoCloseable and have all driver components implement it. Note: user-provided components like LBP and AddressTranslator are not closed yet. --- .../driver/api/core/AsyncAutoCloseable.java | 65 ++++ .../datastax/oss/driver/api/core/Cluster.java | 4 +- .../driver/internal/core/DefaultCluster.java | 124 +++++- .../core/context/DefaultNettyOptions.java | 2 +- .../internal/core/context/NettyOptions.java | 2 +- .../core/control/ControlConnection.java | 18 +- .../core/metadata/DefaultTopologyMonitor.java | 33 +- .../core/metadata/MetadataManager.java | 48 ++- .../core/metadata/NodeStateManager.java | 361 ++++++++++-------- .../core/metadata/TopologyMonitor.java | 3 +- .../internal/core/pool/ChannelPool.java | 29 +- .../core/util/concurrent/Debouncer.java | 16 + .../core/control/ControlConnectionTest.java | 10 +- .../metadata/DefaultTopologyMonitorTest.java | 13 + .../core/metadata/MetadataManagerTest.java | 10 +- .../core/metadata/NodeStateManagerTest.java | 13 + .../internal/core/pool/ChannelPoolTest.java | 4 +- .../core/util/concurrent/DebouncerTest.java | 32 ++ 18 files changed, 579 insertions(+), 208 deletions(-) create mode 100644 core/src/main/java/com/datastax/oss/driver/api/core/AsyncAutoCloseable.java diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/AsyncAutoCloseable.java b/core/src/main/java/com/datastax/oss/driver/api/core/AsyncAutoCloseable.java new file mode 100644 index 00000000000..b104c27e371 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/AsyncAutoCloseable.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2017-2017 DataStax Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.api.core; + +import com.datastax.oss.driver.internal.core.util.concurrent.BlockingOperation; +import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; +import java.util.concurrent.CompletionStage; + +/** + * An object that can be closed in an asynchronous, non-blocking manner. + * + *

For convenience, this extends the JDK's {@code AutoCloseable} in order to be usable in + * try-with-resource blocks (in that case, the blocking {@link #close()} will be used). + */ +public interface AsyncAutoCloseable extends AutoCloseable { + + /** + * Returns a stage that will complete when {@link #close()} or {@link #forceCloseAsync()} is + * called, and the shutdown sequence completes. + */ + CompletionStage<Void> closeFuture(); + + /** + * Initiates an orderly shutdown: no new requests are accepted, but all pending requests are + * allowed to complete normally. + * + * @return a stage that will complete when the shutdown sequence is complete. Multiple calls to + * this method or {@link #forceCloseAsync()} always return the same instance. + */ + CompletionStage<Void> closeAsync(); + + /** + * Initiates a forced shutdown of this instance: no new requests are accepted, and all pending + * requests will complete with an exception. + * + * @return a stage that will complete when the shutdown sequence is complete. Multiple calls to + * this method or {@link #closeAsync()} always return the same instance. + */ + CompletionStage<Void> forceCloseAsync(); + + /** + * {@inheritDoc} + * + *

This method is implemented by calling {@link #closeAsync()} and blocking on the result. This + * should not be called on a driver thread. + */ + @Override + default void close() throws Exception { + BlockingOperation.checkNotDriverThread(); + CompletableFutures.getUninterruptibly(closeAsync().toCompletableFuture()); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java b/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java index 737a6a4b202..1ab6904aa54 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java @@ -18,9 +18,10 @@ import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import java.util.concurrent.CompletionStage; /** An instance of the driver, that connects to a Cassandra cluster. */ -public interface Cluster { +public interface Cluster extends AsyncAutoCloseable { /** Returns a builder to create a new instance of the default implementation. */ static ClusterBuilder builder() { return new ClusterBuilder(); @@ -44,5 +45,6 @@ static ClusterBuilder builder() { */ Metadata getMetadata(); + /** Returns a context that provides access to all the policies used by this driver instance. 
*/ DriverContext getContext(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java b/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java index 0f9eb9da933..5ad704cec6f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java @@ -15,17 +15,24 @@ */ package com.datastax.oss.driver.internal.core; +import com.datastax.oss.driver.api.core.AsyncAutoCloseable; import com.datastax.oss.driver.api.core.Cluster; import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.metadata.MetadataManager; import com.datastax.oss.driver.internal.core.metadata.NodeStateManager; +import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule; +import com.google.common.collect.ImmutableList; import io.netty.util.concurrent.EventExecutor; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,15 +73,38 @@ public DriverContext getContext() { return context; } + @Override + public CompletionStage closeFuture() { + return singleThreaded.closeFuture; + } + + @Override + public CompletionStage closeAsync() { + RunOrSchedule.on(adminExecutor, singleThreaded::close); + return singleThreaded.closeFuture; + } + + @Override + public CompletionStage forceCloseAsync() { + RunOrSchedule.on(adminExecutor, singleThreaded::forceClose); + return singleThreaded.closeFuture; + } + private class 
SingleThreaded { private final InternalDriverContext context; private final Set initialContactPoints; + private final NodeStateManager nodeStateManager; private final CompletableFuture initFuture = new CompletableFuture<>(); private boolean initWasCalled; + private final CompletableFuture closeFuture = new CompletableFuture<>(); + private boolean closeWasCalled; + private boolean forceCloseWasCalled; + private List> childrenCloseFutures; private SingleThreaded(InternalDriverContext context, Set contactPoints) { this.context = context; + this.nodeStateManager = new NodeStateManager(context); this.initialContactPoints = contactPoints; } @@ -85,8 +115,6 @@ private void init() { } initWasCalled = true; - new NodeStateManager(context); - // If any contact points were provided, store them in the metadata right away (the // control connection will need them if it has to initialize) MetadataManager metadataManager = context.metadataManager(); @@ -124,5 +152,97 @@ private void init() { return null; }); } + + private void close() { + assert adminExecutor.inEventLoop(); + if (closeWasCalled) { + return; + } + closeWasCalled = true; + + LOG.debug("Closing {}", this); + childrenCloseFutures = new ArrayList<>(); + for (AsyncAutoCloseable closeable : internalComponentsToClose()) { + LOG.debug("Closing {}", closeable); + childrenCloseFutures.add(closeable.closeAsync()); + } + CompletableFutures.whenAllDone(childrenCloseFutures) + .whenCompleteAsync(this::onChildrenClosed, adminExecutor); + } + + private void forceClose() { + assert adminExecutor.inEventLoop(); + if (forceCloseWasCalled) { + return; + } + forceCloseWasCalled = true; + + LOG.debug("Force-closing {} (was {}closed before)", this, (closeWasCalled ? 
"" : "not ")); + + if (closeWasCalled) { + // childrenCloseFutures is already created, and onChildrenClosed has already been called + for (AsyncAutoCloseable closeable : internalComponentsToClose()) { + LOG.debug("Force-closing {}", closeable); + closeable.forceCloseAsync(); + } + } else { + closeWasCalled = true; + childrenCloseFutures = new ArrayList<>(); + for (AsyncAutoCloseable closeable : internalComponentsToClose()) { + LOG.debug("Force-closing {}", closeable); + childrenCloseFutures.add(closeable.forceCloseAsync()); + } + CompletableFutures.whenAllDone(childrenCloseFutures) + .whenCompleteAsync(this::onChildrenClosed, adminExecutor); + } + } + + private void onChildrenClosed(@SuppressWarnings("unused") Void ignored, Throwable error) { + assert adminExecutor.inEventLoop(); + if (error != null) { + LOG.warn("Unexpected error while closing", error); + } + try { + for (CompletionStage future : childrenCloseFutures) { + warnIfFailed(future); + } + context + .nettyOptions() + .onClose() + .addListener( + f -> { + if (!f.isSuccess()) { + closeFuture.completeExceptionally(f.cause()); + } else { + closeFuture.complete(null); + } + }); + } catch (Throwable t) { + // Being paranoid here, but we don't want to risk swallowing an exception and leaving close + // hanging + LOG.warn("Unexpected error while closing", t); + } + } + + private void warnIfFailed(CompletionStage stage) { + CompletableFuture future = stage.toCompletableFuture(); + assert future.isDone(); + if (future.isCompletedExceptionally()) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + // InterruptedException can't happen actually, but including it to make compiler happy + LOG.warn("Unexpected error while closing", e.getCause()); + } + } + } + + private List internalComponentsToClose() { + return ImmutableList.of( + nodeStateManager, + metadataManager, + context.topologyMonitor(), + context.controlConnection()); + } } } diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultNettyOptions.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultNettyOptions.java index c3bf98c5f92..86060848c4a 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultNettyOptions.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultNettyOptions.java @@ -76,7 +76,7 @@ public void afterChannelInitialized(Channel channel) { } @Override - public Future onShutdown() { + public Future onClose() { return ioEventLoopGroup.shutdownGracefully(); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/NettyOptions.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/NettyOptions.java index 5186ae2244d..532741c20bd 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/NettyOptions.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/NettyOptions.java @@ -79,5 +79,5 @@ public interface NettyOptions { * that you have allocated elsewhere in this component, for example shut down custom event loop * groups. 
*/ - Future onShutdown(); + Future onClose(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 6ade97ee58e..728c8621dc9 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -16,6 +16,7 @@ package com.datastax.oss.driver.internal.core.control; import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.AsyncAutoCloseable; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.internal.core.channel.ChannelEvent; import com.datastax.oss.driver.internal.core.channel.DriverChannel; @@ -57,7 +58,7 @@ *

If a custom {@link TopologyMonitor} is used, the control connection is used only for schema * refreshes; if schema metadata is also disabled, the control connection never initializes. */ -public class ControlConnection implements EventCallback { +public class ControlConnection implements EventCallback, AsyncAutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ControlConnection.class); private final InternalDriverContext context; @@ -103,8 +104,19 @@ public void reconnectNow() { RunOrSchedule.on(adminExecutor, singleThreaded::reconnectNow); } - /** Note: control queries are never critical, so there is no graceful close. */ - public CompletionStage forceClose() { + @Override + public CompletionStage closeFuture() { + return singleThreaded.closeFuture; + } + + @Override + public CompletionStage closeAsync() { + // Control queries are never critical, so there is no graceful close. + return forceCloseAsync(); + } + + @Override + public CompletionStage forceCloseAsync() { RunOrSchedule.on(adminExecutor, singleThreaded::forceClose); return singleThreaded.closeFuture; } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java index 6e5695d975b..edf16bc94b1 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java @@ -24,6 +24,7 @@ import com.datastax.oss.driver.internal.core.channel.DriverChannel; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.control.ControlConnection; +import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import java.net.InetAddress; @@ -54,23 
+55,31 @@ public class DefaultTopologyMonitor implements TopologyMonitor { private final ControlConnection controlConnection; private final AddressTranslator addressTranslator; private final Duration timeout; + private final CompletableFuture closeFuture; @VisibleForTesting volatile int port = -1; public DefaultTopologyMonitor(InternalDriverContext context) { this.controlConnection = context.controlConnection(); - addressTranslator = context.addressTranslator(); + this.addressTranslator = context.addressTranslator(); DriverConfigProfile config = context.config().defaultProfile(); this.timeout = config.getDuration(CoreDriverOption.CONTROL_CONNECTION_TIMEOUT); + this.closeFuture = new CompletableFuture<>(); } @Override public CompletionStage init() { + if (closeFuture.isDone()) { + return CompletableFutures.failedFuture(new IllegalStateException("closed")); + } return controlConnection.init(true); } @Override public CompletionStage> refreshNode(Node node) { + if (closeFuture.isDone()) { + return CompletableFutures.failedFuture(new IllegalStateException("closed")); + } LOG.debug("Refreshing info for {}", node); DriverChannel channel = controlConnection.channel(); if (node.getConnectAddress().equals(channel.address())) { @@ -93,6 +102,9 @@ public CompletionStage> refreshNode(Node node) { @Override public CompletionStage> getNewNodeInfo(InetSocketAddress connectAddress) { + if (closeFuture.isDone()) { + return CompletableFutures.failedFuture(new IllegalStateException("closed")); + } LOG.debug("Fetching info for new node {}", connectAddress); DriverChannel channel = controlConnection.channel(); return query(channel, "SELECT * FROM system.peers") @@ -101,6 +113,9 @@ public CompletionStage> getNewNodeInfo(InetSocketAddress conn @Override public CompletionStage> refreshNodeList() { + if (closeFuture.isDone()) { + return CompletableFutures.failedFuture(new IllegalStateException("closed")); + } LOG.debug("Refreshing node list"); DriverChannel channel = 
controlConnection.channel(); savePort(channel); @@ -120,6 +135,22 @@ public CompletionStage> refreshNodeList() { }); } + @Override + public CompletionStage closeFuture() { + return closeFuture; + } + + @Override + public CompletionStage closeAsync() { + closeFuture.complete(null); + return closeFuture; + } + + @Override + public CompletionStage forceCloseAsync() { + return closeAsync(); + } + @VisibleForTesting protected CompletionStage query( DriverChannel channel, String queryString, Map parameters) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index 061163bc75e..ed34d13c736 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -15,6 +15,7 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.AsyncAutoCloseable; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; @@ -31,7 +32,7 @@ import org.slf4j.LoggerFactory; /** Holds the immutable instance of the {@link Metadata}, and handles requests to update it. 
*/ -public class MetadataManager { +public class MetadataManager implements AsyncAutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MetadataManager.class); private final InternalDriverContext context; @@ -42,7 +43,7 @@ public class MetadataManager { public MetadataManager(InternalDriverContext context) { this.context = context; this.adminExecutor = context.nettyOptions().adminEventExecutorGroup().next(); - this.singleThreaded = new SingleThreaded(context); + this.singleThreaded = new SingleThreaded(); this.metadata = DefaultMetadata.EMPTY; } @@ -115,14 +116,27 @@ public void refreshSchema( // TODO refresh schema metadata } - // TODO user-controlled refresh, shutdown? + // TODO user-controlled refresh? - private class SingleThreaded { - private final InternalDriverContext context; + @Override + public CompletionStage closeFuture() { + return singleThreaded.closeFuture; + } - private SingleThreaded(InternalDriverContext context) { - this.context = context; - } + @Override + public CompletionStage closeAsync() { + RunOrSchedule.on(adminExecutor, singleThreaded::close); + return singleThreaded.closeFuture; + } + + @Override + public CompletionStage forceCloseAsync() { + return this.closeAsync(); + } + + private class SingleThreaded { + private final CompletableFuture closeFuture = new CompletableFuture<>(); + private boolean closeWasCalled; private void initNodes( Set addresses, CompletableFuture initNodesFuture) { @@ -162,15 +176,25 @@ private void addNode(InetSocketAddress address, Optional maybeInfo) { private void removeNode(InetSocketAddress address) { refresh(new RemoveNodeRefresh(metadata, address)); } + + private void close() { + if (closeWasCalled) { + return; + } + closeWasCalled = true; + closeFuture.complete(null); + } } @VisibleForTesting Void refresh(MetadataRefresh refresh) { assert adminExecutor.inEventLoop(); - refresh.compute(); - metadata = refresh.newMetadata; - for (Object event : refresh.events) { - 
context.eventBus().fire(event); + if (!singleThreaded.closeWasCalled) { + refresh.compute(); + metadata = refresh.newMetadata; + for (Object event : refresh.events) { + context.eventBus().fire(event); + } } return null; } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java index 6fcd86cd6ca..840eda1a9d2 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java @@ -15,6 +15,7 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.AsyncAutoCloseable; import com.datastax.oss.driver.api.core.config.CoreDriverOption; import com.datastax.oss.driver.api.core.config.DriverConfigProfile; import com.datastax.oss.driver.api.core.metadata.NodeState; @@ -29,6 +30,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,184 +41,228 @@ * *

See {@link NodeState} and {@link TopologyEvent} for a description of the state change rules. */ -public class NodeStateManager { +public class NodeStateManager implements AsyncAutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NodeStateManager.class); private final EventExecutor adminExecutor; - private final MetadataManager metadataManager; - private final EventBus eventBus; - private final Debouncer> topologyEventDebouncer; + private final SingleThreaded singleThreaded; public NodeStateManager(InternalDriverContext context) { this.adminExecutor = context.nettyOptions().adminEventExecutorGroup().next(); - this.metadataManager = context.metadataManager(); - - DriverConfigProfile config = context.config().defaultProfile(); - this.topologyEventDebouncer = - new Debouncer<>( - adminExecutor, - this::coalesceTopologyEvents, - this::flushTopologyEvents, - config.getDuration(CoreDriverOption.METADATA_TOPOLOGY_WINDOW), - config.getInt(CoreDriverOption.METADATA_TOPOLOGY_MAX_EVENTS)); - - this.eventBus = context.eventBus(); - this.eventBus.register( - ChannelEvent.class, RunOrSchedule.on(adminExecutor, this::onChannelEvent)); - this.eventBus.register( - TopologyEvent.class, RunOrSchedule.on(adminExecutor, this::onTopologyEvent)); - // Note: this component exists for the whole life of the driver instance, so don't worry about - // unregistering the listeners. 
+ this.singleThreaded = new SingleThreaded(context); } - private void onChannelEvent(ChannelEvent event) { - assert adminExecutor.inEventLoop(); - LOG.debug("Processing {}", event); - @SuppressWarnings("SuspiciousMethodCalls") - DefaultNode node = (DefaultNode) metadataManager.getMetadata().getNodes().get(event.address); - assert node != null; - switch (event.type) { - case OPENED: - node.openConnections += 1; - if (node.state == NodeState.DOWN || node.state == NodeState.UNKNOWN) { - setState(node, NodeState.UP, "a new connection was opened to it"); - } - break; - case CLOSED: - node.openConnections -= 1; - break; - case RECONNECTION_STARTED: - node.reconnections += 1; - if (node.openConnections == 0) { - setState(node, NodeState.DOWN, "it has no connections and started reconnecting"); - } - break; - case RECONNECTION_STOPPED: - node.reconnections -= 1; - break; - } + @Override + public CompletionStage closeFuture() { + return singleThreaded.closeFuture; } - private void onDebouncedTopologyEvent(TopologyEvent event) { - assert adminExecutor.inEventLoop(); - LOG.debug("Processing {}", event); - DefaultNode node = (DefaultNode) metadataManager.getMetadata().getNodes().get(event.address); - switch (event.type) { - case SUGGEST_UP: - if (node == null) { - LOG.debug("Received UP event for unknown node {}, adding it", event.address); - metadataManager.addNode(event.address); - } else if (node.state == NodeState.FORCED_DOWN) { - LOG.debug("Not setting {} UP because it is FORCED_DOWN", node); - } else { - setState(node, NodeState.UP, "an UP topology event was received"); - } - break; - case SUGGEST_DOWN: - if (node == null) { - LOG.debug("Received DOWN event for unknown node {}, ignoring it", event.address); - } else if (node.openConnections > 0) { - LOG.debug("Not setting {} DOWN because it still has active connections", node); - } else if (node.state == NodeState.FORCED_DOWN) { - LOG.debug("Not setting {} DOWN because it is FORCED_DOWN", node); - } else { - 
setState(node, NodeState.DOWN, "a DOWN topology event was received"); - } - break; - case FORCE_UP: - if (node == null) { - LOG.debug("Received FORCE_UP event for unknown node {}, adding it", event.address); - metadataManager.addNode(event.address); - } else { - setState(node, NodeState.UP, "a FORCE_UP topology event was received"); - } - break; - case FORCE_DOWN: - if (node == null) { - LOG.debug("Received FORCE_DOWN event for unknown node {}, ignoring it", event.address); - } else { - setState(node, NodeState.FORCED_DOWN, "a FORCE_DOWN topology event was received"); - } - break; - case SUGGEST_ADDED: - if (node != null) { - LOG.debug( - "Received ADDED event for {} but it is already in our metadata, ignoring", node); - } else { - metadataManager.addNode(event.address); - } - break; - case SUGGEST_REMOVED: - if (node == null) { - LOG.debug( - "Received REMOVED event for {} but it is not in our metadata, ignoring", - event.address); - } else { - metadataManager.removeNode(event.address); - } - break; - } + @Override + public CompletionStage closeAsync() { + RunOrSchedule.on(adminExecutor, singleThreaded::close); + return singleThreaded.closeFuture; } - // Called by the event bus, needs debouncing - private void onTopologyEvent(TopologyEvent event) { - assert adminExecutor.inEventLoop(); - topologyEventDebouncer.receive(event); + @Override + public CompletionStage forceCloseAsync() { + return closeAsync(); } - // Called to process debounced events before flushing - private Collection coalesceTopologyEvents(List events) { - assert adminExecutor.inEventLoop(); - Collection result; - if (events.size() == 1) { - result = events; - } else { - // Keep the last FORCE* event for each node, or if there is none the last normal event - Map last = Maps.newHashMapWithExpectedSize(events.size()); - for (TopologyEvent event : events) { - if (event.isForceEvent() - || !last.containsKey(event.address) - || !last.get(event.address).isForceEvent()) { - last.put(event.address, event); 
- } + private class SingleThreaded { + + private final MetadataManager metadataManager; + private final EventBus eventBus; + private final Debouncer> topologyEventDebouncer; + private final CompletableFuture closeFuture = new CompletableFuture<>(); + private boolean closeWasCalled; + + private SingleThreaded(InternalDriverContext context) { + this.metadataManager = context.metadataManager(); + + DriverConfigProfile config = context.config().defaultProfile(); + this.topologyEventDebouncer = + new Debouncer<>( + adminExecutor, + this::coalesceTopologyEvents, + this::flushTopologyEvents, + config.getDuration(CoreDriverOption.METADATA_TOPOLOGY_WINDOW), + config.getInt(CoreDriverOption.METADATA_TOPOLOGY_MAX_EVENTS)); + + this.eventBus = context.eventBus(); + this.eventBus.register( + ChannelEvent.class, RunOrSchedule.on(adminExecutor, this::onChannelEvent)); + this.eventBus.register( + TopologyEvent.class, RunOrSchedule.on(adminExecutor, this::onTopologyEvent)); + // Note: this component exists for the whole life of the driver instance, so don't worry about + // unregistering the listeners. 
+ + } + + private void onChannelEvent(ChannelEvent event) { + assert adminExecutor.inEventLoop(); + if (closeWasCalled) { + return; + } + LOG.debug("Processing {}", event); + @SuppressWarnings("SuspiciousMethodCalls") + DefaultNode node = (DefaultNode) metadataManager.getMetadata().getNodes().get(event.address); + assert node != null; + switch (event.type) { + case OPENED: + node.openConnections += 1; + if (node.state == NodeState.DOWN || node.state == NodeState.UNKNOWN) { + setState(node, NodeState.UP, "a new connection was opened to it"); + } + break; + case CLOSED: + node.openConnections -= 1; + break; + case RECONNECTION_STARTED: + node.reconnections += 1; + if (node.openConnections == 0) { + setState(node, NodeState.DOWN, "it has no connections and started reconnecting"); + } + break; + case RECONNECTION_STOPPED: + node.reconnections -= 1; + break; } - result = last.values(); } - LOG.debug("Coalesced topology events: {} => {}", events, result); - return result; - } - // Called when the debouncer flushes - private void flushTopologyEvents(Collection events) { - assert adminExecutor.inEventLoop(); - for (TopologyEvent event : events) { - onDebouncedTopologyEvent(event); + private void onDebouncedTopologyEvent(TopologyEvent event) { + assert adminExecutor.inEventLoop(); + if (closeWasCalled) { + return; + } + LOG.debug("Processing {}", event); + DefaultNode node = (DefaultNode) metadataManager.getMetadata().getNodes().get(event.address); + switch (event.type) { + case SUGGEST_UP: + if (node == null) { + LOG.debug("Received UP event for unknown node {}, adding it", event.address); + metadataManager.addNode(event.address); + } else if (node.state == NodeState.FORCED_DOWN) { + LOG.debug("Not setting {} UP because it is FORCED_DOWN", node); + } else { + setState(node, NodeState.UP, "an UP topology event was received"); + } + break; + case SUGGEST_DOWN: + if (node == null) { + LOG.debug("Received DOWN event for unknown node {}, ignoring it", event.address); + } else 
if (node.openConnections > 0) { + LOG.debug("Not setting {} DOWN because it still has active connections", node); + } else if (node.state == NodeState.FORCED_DOWN) { + LOG.debug("Not setting {} DOWN because it is FORCED_DOWN", node); + } else { + setState(node, NodeState.DOWN, "a DOWN topology event was received"); + } + break; + case FORCE_UP: + if (node == null) { + LOG.debug("Received FORCE_UP event for unknown node {}, adding it", event.address); + metadataManager.addNode(event.address); + } else { + setState(node, NodeState.UP, "a FORCE_UP topology event was received"); + } + break; + case FORCE_DOWN: + if (node == null) { + LOG.debug("Received FORCE_DOWN event for unknown node {}, ignoring it", event.address); + } else { + setState(node, NodeState.FORCED_DOWN, "a FORCE_DOWN topology event was received"); + } + break; + case SUGGEST_ADDED: + if (node != null) { + LOG.debug( + "Received ADDED event for {} but it is already in our metadata, ignoring", node); + } else { + metadataManager.addNode(event.address); + } + break; + case SUGGEST_REMOVED: + if (node == null) { + LOG.debug( + "Received REMOVED event for {} but it is not in our metadata, ignoring", + event.address); + } else { + metadataManager.removeNode(event.address); + } + break; + } + } + + // Called by the event bus, needs debouncing + private void onTopologyEvent(TopologyEvent event) { + assert adminExecutor.inEventLoop(); + topologyEventDebouncer.receive(event); } - } - private void setState(DefaultNode node, NodeState newState, String reason) { - NodeState oldState = node.state; - if (oldState != newState) { - LOG.debug("Transitioning {} {}=>{} (because {})", node, oldState, newState, reason); - node.state = newState; - if (newState != NodeState.UP) { - // Fire the event immediately - eventBus.fire(NodeStateEvent.changed(oldState, newState, node)); + // Called to process debounced events before flushing + private Collection coalesceTopologyEvents(List events) { + assert 
adminExecutor.inEventLoop(); + Collection result; + if (events.size() == 1) { + result = events; } else { - // Refresh the node first (but still fire event if that fails) - metadataManager - .refreshNode(node) - .whenComplete( - (success, error) -> { - try { - if (error != null) { - LOG.debug("Error while refreshing info for " + node, error); + // Keep the last FORCE* event for each node, or if there is none the last normal event + Map last = Maps.newHashMapWithExpectedSize(events.size()); + for (TopologyEvent event : events) { + if (event.isForceEvent() + || !last.containsKey(event.address) + || !last.get(event.address).isForceEvent()) { + last.put(event.address, event); + } + } + result = last.values(); + } + LOG.debug("Coalesced topology events: {} => {}", events, result); + return result; + } + + // Called when the debouncer flushes + private void flushTopologyEvents(Collection events) { + assert adminExecutor.inEventLoop(); + for (TopologyEvent event : events) { + onDebouncedTopologyEvent(event); + } + } + + private void close() { + assert adminExecutor.inEventLoop(); + if (closeWasCalled) { + return; + } + closeWasCalled = true; + topologyEventDebouncer.stop(); + closeFuture.complete(null); + } + + private void setState(DefaultNode node, NodeState newState, String reason) { + NodeState oldState = node.state; + if (oldState != newState) { + LOG.debug("Transitioning {} {}=>{} (because {})", node, oldState, newState, reason); + node.state = newState; + if (newState != NodeState.UP) { + // Fire the event immediately + eventBus.fire(NodeStateEvent.changed(oldState, newState, node)); + } else { + // Refresh the node first (but still fire event if that fails) + metadataManager + .refreshNode(node) + .whenComplete( + (success, error) -> { + try { + if (error != null) { + LOG.debug("Error while refreshing info for " + node, error); + } + eventBus.fire(NodeStateEvent.changed(oldState, newState, node)); + } catch (Throwable t) { + LOG.warn("Unexpected exception", t); } 
- eventBus.fire(NodeStateEvent.changed(oldState, newState, node)); - } catch (Throwable t) { - LOG.warn("Unexpected exception", t); - } - }); + }); + } } } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java index b8a80cb478b..e999734a249 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java @@ -15,6 +15,7 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.AsyncAutoCloseable; import com.datastax.oss.driver.api.core.Cluster; import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator; import com.datastax.oss.driver.api.core.metadata.Node; @@ -37,7 +38,7 @@ * refreshes are done with queries to system tables. If you prefer to rely on an external monitoring * tool, this can be completely overridden. */ -public interface TopologyMonitor { +public interface TopologyMonitor extends AsyncAutoCloseable { /** * Triggers the initialization of the monitor. diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java index d359a0333af..5d504a82e43 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java @@ -15,6 +15,7 @@ */ package com.datastax.oss.driver.internal.core.pool; +import com.datastax.oss.driver.api.core.AsyncAutoCloseable; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.internal.core.channel.ChannelEvent; import com.datastax.oss.driver.internal.core.channel.ChannelFactory; @@ -53,7 +54,7 @@ *

If one or more channels go down, a reconnection process starts in order to replace them; it * runs until the channel count is back to its intended target. */ -public class ChannelPool { +public class ChannelPool implements AsyncAutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ChannelPool.class); /** @@ -120,26 +121,20 @@ public CompletionStage setKeyspace(CqlIdentifier newKeyspaceName) { return RunOrSchedule.on(adminExecutor, () -> singleThreaded.setKeyspace(newKeyspaceName)); } - /** - * Closes the pool gracefully: subsequent calls to {@link #next()} will fail, but the pool's - * channels will be closed gracefully (allowing pending requests to complete). - */ - public CompletionStage close() { - RunOrSchedule.on(adminExecutor, singleThreaded::close); + @Override + public CompletionStage closeFuture() { return singleThreaded.closeFuture; } - /** - * Closes the pool forcefully: subsequent calls to {@link #next()} will fail, and the pool's - * channels will be closed forcefully (aborting pending requests). - */ - public CompletionStage forceClose() { - RunOrSchedule.on(adminExecutor, singleThreaded::forceClose); + @Override + public CompletionStage closeAsync() { + RunOrSchedule.on(adminExecutor, singleThreaded::close); return singleThreaded.closeFuture; } - /** Does not close the pool, but returns a completion stage that will complete when it does. 
*/ - public CompletionStage closeFuture() { + @Override + public CompletionStage forceCloseAsync() { + RunOrSchedule.on(adminExecutor, singleThreaded::forceClose); return singleThreaded.closeFuture; } @@ -156,7 +151,7 @@ private class SingleThreaded { private int wantedCount; private CompletableFuture connectFuture = new CompletableFuture<>(); private boolean isConnecting; - private CompletableFuture closeFuture = new CompletableFuture<>(); + private CompletableFuture closeFuture = new CompletableFuture<>(); private boolean isClosing; private CompletableFuture setKeyspaceFuture; @@ -366,7 +361,7 @@ private void close() { eventBus.fire(ChannelEvent.channelClosed(address)); return channel.close(); }, - () -> closeFuture.complete(ChannelPool.this), + () -> closeFuture.complete(null), (channel, error) -> LOG.warn(ChannelPool.this + " error closing channel " + channel, error)); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java index 6c8ae599a7d..091bdd17c9f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java @@ -50,6 +50,7 @@ public class Debouncer { private List currentBatch = new ArrayList<>(); private ScheduledFuture nextFlush; + private boolean stopped; /** * Creates a new instance. @@ -77,6 +78,9 @@ public Debouncer( /** This must be called on eventExecutor too. */ public void receive(T element) { assert adminExecutor.inEventLoop(); + if (stopped) { + return; + } if (window.isZero() || maxEvents == 1) { LOG.debug( "Received {}, flushing immediately (window = {}, maxEvents = {})", @@ -123,4 +127,16 @@ private void cancelNextFlush() { } } } + + /** + * Stop debouncing: the next flush is cancelled, and all pending and future events will be + * ignored. 
+ */ + public void stop() { + assert adminExecutor.inEventLoop(); + if (!stopped) { + stopped = true; + cancelNextFlush(); + } + } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java index 48ce8ce602f..f855a99f945 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java @@ -33,7 +33,7 @@ public class ControlConnectionTest extends ControlConnectionTestBase { @Test public void should_close_successfully_if_it_was_never_init() { // When - CompletionStage closeFuture = controlConnection.forceClose(); + CompletionStage closeFuture = controlConnection.forceCloseAsync(); // Then assertThat(closeFuture).isSuccess(); @@ -261,7 +261,7 @@ public void should_not_force_reconnection_if_closed() { factoryHelper.waitForCall(ADDRESS1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); - CompletionStage closeFuture = controlConnection.forceClose(); + CompletionStage closeFuture = controlConnection.forceCloseAsync(); assertThat(closeFuture).isSuccess(); // When @@ -287,7 +287,7 @@ public void should_close_channel_when_closing() { assertThat(initFuture).isSuccess(); // When - CompletionStage closeFuture = controlConnection.forceClose(); + CompletionStage closeFuture = controlConnection.forceCloseAsync(); waitForPendingAdminTasks(); // Then @@ -331,7 +331,7 @@ public void should_close_channel_if_closed_during_reconnection() { // When // the control connection gets closed before channel2 initialization is complete - controlConnection.forceClose(); + controlConnection.forceCloseAsync(); waitForPendingAdminTasks(); channel2Future.complete(channel2); waitForPendingAdminTasks(); @@ -377,7 +377,7 @@ public void should_handle_channel_failure_if_closed_during_reconnection() { // When // the 
control connection gets closed before channel1 initialization fails - controlConnection.forceClose(); + controlConnection.forceCloseAsync(); channel1Future.completeExceptionally(new Exception("mock failure")); waitForPendingAdminTasks(); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java index 15159e7e6e8..e1c4dc9443c 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java @@ -224,6 +224,19 @@ public void should_refresh_node_list_from_local_and_peers() { }); } + @Test + public void should_stop_executing_queries_once_closed() throws Exception { + // Given + topologyMonitor.close(); + + // When + CompletionStage> futureInfos = topologyMonitor.refreshNodeList(); + + // Then + assertThat(futureInfos) + .isFailed(error -> assertThat(error).isInstanceOf(IllegalStateException.class)); + } + /** Mocks the query execution logic. 
*/ private static class TestTopologyMonitor extends DefaultTopologyMonitor { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java index 041a7918b4a..1e11b66caee 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java @@ -69,6 +69,11 @@ public void setup() { metadataManager = new TestMetadataManager(context); } + @AfterMethod + public void teardown() { + adminEventLoopGroup.shutdownGracefully(100, 200, TimeUnit.MILLISECONDS); + } + @Test public void should_add_contact_points() { // When @@ -200,11 +205,6 @@ public void should_remove_node() { assertThat(refresh.toRemove).isEqualTo(ADDRESS1); } - @AfterMethod - public void teardown() { - adminEventLoopGroup.shutdownGracefully(100, 200, TimeUnit.MILLISECONDS); - } - private class TestMetadataManager extends MetadataManager { private List refreshes = new CopyOnWriteArrayList<>(); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManagerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManagerTest.java index f885fbc5eb6..3ed41f5774e 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManagerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManagerTest.java @@ -493,6 +493,19 @@ public void should_keep_node_up_if_reconnection_starts_with_some_connections() { Mockito.verify(eventBus, never()).fire(any(NodeStateEvent.class)); } + @Test + public void should_ignore_events_when_closed() throws Exception { + NodeStateManager manager = new NodeStateManager(context); + assertThat(node1.reconnections).isEqualTo(0); + + manager.close(); + + 
eventBus.fire(ChannelEvent.reconnectionStarted(node1.getConnectAddress())); + waitForPendingAdminTasks(); + + assertThat(node1.reconnections).isEqualTo(0); + } + // Wait for all the tasks on the pool's admin executor to complete. private void waitForPendingAdminTasks() { // This works because the event loop group is single-threaded diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java index f99efa513fe..03e6dca692d 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java @@ -589,7 +589,7 @@ public void should_close_all_channels_when_closed() throws Exception { Mockito.verify(reconnectionSchedule).nextDelay(); factoryHelper.waitForCalls(ADDRESS, 1); - CompletionStage closeFuture = pool.close(); + CompletionStage closeFuture = pool.closeAsync(); waitForPendingAdminTasks(); // The two original channels were closed normally @@ -651,7 +651,7 @@ public void should_force_close_all_channels_when_force_closed() throws Exception Mockito.verify(reconnectionSchedule).nextDelay(); factoryHelper.waitForCalls(ADDRESS, 1); - CompletionStage closeFuture = pool.forceClose(); + CompletionStage closeFuture = pool.forceCloseAsync(); waitForPendingAdminTasks(); // The two original channels were force-closed diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/util/concurrent/DebouncerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/util/concurrent/DebouncerTest.java index 338a741cd93..73582c8e4aa 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/util/concurrent/DebouncerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/util/concurrent/DebouncerTest.java @@ -162,4 +162,36 @@ public void should_force_flush_after_max_events() { Mockito.verify(scheduledFuture, 
times(9)).cancel(true);
     assertThat(results).containsExactly("0,1,2,3,4,5,6,7,8,9");
   }
+
+  @Test
+  public void should_cancel_next_flush_when_stopped() {
+    Debouncer debouncer =
+        new Debouncer<>(
+            adminExecutor, this::coalesce, this::flush, DEFAULT_WINDOW, DEFAULT_MAX_EVENTS);
+
+    debouncer.receive(1);
+    Mockito.verify(adminExecutor)
+        .schedule(
+            Mockito.any(Runnable.class),
+            Mockito.eq(DEFAULT_WINDOW.toNanos()),
+            Mockito.eq(TimeUnit.NANOSECONDS));
+
+    debouncer.stop();
+    Mockito.verify(scheduledFuture).cancel(true);
+  }
+
+  @Test
+  public void should_ignore_new_events_when_stopped() {
+    Debouncer debouncer =
+        new Debouncer<>(
+            adminExecutor, this::coalesce, this::flush, DEFAULT_WINDOW, DEFAULT_MAX_EVENTS);
+    debouncer.stop();
+
+    debouncer.receive(1);
+    Mockito.verify(adminExecutor, never())
+        .schedule(
+            Mockito.any(Runnable.class),
+            Mockito.eq(DEFAULT_WINDOW.toNanos()),
+            Mockito.eq(TimeUnit.NANOSECONDS));
+  }
 }