diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/AsyncAutoCloseable.java b/core/src/main/java/com/datastax/oss/driver/api/core/AsyncAutoCloseable.java
new file mode 100644
index 00000000000..b104c27e371
--- /dev/null
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/AsyncAutoCloseable.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2017-2017 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.api.core;
+
+import com.datastax.oss.driver.internal.core.util.concurrent.BlockingOperation;
+import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An object that can be closed in an asynchronous, non-blocking manner.
+ *
+ *
+ * <p>For convenience, this extends the JDK's {@code AutoCloseable} in order to be usable in
+ * try-with-resource blocks (in that case, the blocking {@link #close()} will be used).
+ */
+public interface AsyncAutoCloseable extends AutoCloseable {
+
+ /**
+ * Returns a stage that will complete when {@link #close()} or {@link #forceCloseAsync()} is
+ * called, and the shutdown sequence completes.
+ */
+ CompletionStage<Void> closeFuture();
+
+ /**
+ * Initiates an orderly shutdown: no new requests are accepted, but all pending requests are
+ * allowed to complete normally.
+ *
+ * @return a stage that will complete when the shutdown sequence is complete. Multiple calls to
+ * this method or {@link #forceCloseAsync()} always return the same instance.
+ */
+ CompletionStage<Void> closeAsync();
+
+ /**
+ * Initiates a forced shutdown of this instance: no new requests are accepted, and all pending
+ * requests will complete with an exception.
+ *
+ * @return a stage that will complete when the shutdown sequence is complete. Multiple calls to
+ * this method or {@link #close()} always return the same instance.
+ */
+ CompletionStage<Void> forceCloseAsync();
+
+ /**
+ * {@inheritDoc}
+ *
+ * This method is implemented by calling {@link #closeAsync()} and blocking on the result. This
+ * should not be called on a driver thread.
+ */
+ @Override
+ default void close() throws Exception {
+ BlockingOperation.checkNotDriverThread();
+ CompletableFutures.getUninterruptibly(closeAsync().toCompletableFuture());
+ }
+}
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java b/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java
index 737a6a4b202..1ab6904aa54 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java
@@ -18,9 +18,10 @@
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
+import java.util.concurrent.CompletionStage;
/** An instance of the driver, that connects to a Cassandra cluster. */
-public interface Cluster {
+public interface Cluster extends AsyncAutoCloseable {
/** Returns a builder to create a new instance of the default implementation. */
static ClusterBuilder builder() {
return new ClusterBuilder();
@@ -44,5 +45,6 @@ static ClusterBuilder builder() {
*/
Metadata getMetadata();
+ /** Returns a context that provides access to all the policies used by this driver instance. */
DriverContext getContext();
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java b/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java
index 0f9eb9da933..5ad704cec6f 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java
@@ -15,17 +15,24 @@
*/
package com.datastax.oss.driver.internal.core;
+import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.Cluster;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
import com.datastax.oss.driver.internal.core.metadata.NodeStateManager;
+import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
+import com.google.common.collect.ImmutableList;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,15 +73,38 @@ public DriverContext getContext() {
return context;
}
+ @Override
+ public CompletionStage<Void> closeFuture() {
+ return singleThreaded.closeFuture;
+ }
+
+ @Override
+ public CompletionStage<Void> closeAsync() {
+ RunOrSchedule.on(adminExecutor, singleThreaded::close);
+ return singleThreaded.closeFuture;
+ }
+
+ @Override
+ public CompletionStage<Void> forceCloseAsync() {
+ RunOrSchedule.on(adminExecutor, singleThreaded::forceClose);
+ return singleThreaded.closeFuture;
+ }
+
private class SingleThreaded {
private final InternalDriverContext context;
private final Set<InetSocketAddress> initialContactPoints;
+ private final NodeStateManager nodeStateManager;
private final CompletableFuture<Void> initFuture = new CompletableFuture<>();
private boolean initWasCalled;
+ private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ private boolean closeWasCalled;
+ private boolean forceCloseWasCalled;
+ private List<CompletionStage<Void>> childrenCloseFutures;
private SingleThreaded(InternalDriverContext context, Set<InetSocketAddress> contactPoints) {
this.context = context;
+ this.nodeStateManager = new NodeStateManager(context);
this.initialContactPoints = contactPoints;
}
@@ -85,8 +115,6 @@ private void init() {
}
initWasCalled = true;
- new NodeStateManager(context);
-
// If any contact points were provided, store them in the metadata right away (the
// control connection will need them if it has to initialize)
MetadataManager metadataManager = context.metadataManager();
@@ -124,5 +152,97 @@ private void init() {
return null;
});
}
+
+ private void close() {
+ assert adminExecutor.inEventLoop();
+ if (closeWasCalled) {
+ return;
+ }
+ closeWasCalled = true;
+
+ LOG.debug("Closing {}", this);
+ childrenCloseFutures = new ArrayList<>();
+ for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
+ LOG.debug("Closing {}", closeable);
+ childrenCloseFutures.add(closeable.closeAsync());
+ }
+ CompletableFutures.whenAllDone(childrenCloseFutures)
+ .whenCompleteAsync(this::onChildrenClosed, adminExecutor);
+ }
+
+ private void forceClose() {
+ assert adminExecutor.inEventLoop();
+ if (forceCloseWasCalled) {
+ return;
+ }
+ forceCloseWasCalled = true;
+
+ LOG.debug("Force-closing {} (was {}closed before)", this, (closeWasCalled ? "" : "not "));
+
+ if (closeWasCalled) {
+ // childrenCloseFutures is already created, and onChildrenClosed has already been called
+ for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
+ LOG.debug("Force-closing {}", closeable);
+ closeable.forceCloseAsync();
+ }
+ } else {
+ closeWasCalled = true;
+ childrenCloseFutures = new ArrayList<>();
+ for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
+ LOG.debug("Force-closing {}", closeable);
+ childrenCloseFutures.add(closeable.forceCloseAsync());
+ }
+ CompletableFutures.whenAllDone(childrenCloseFutures)
+ .whenCompleteAsync(this::onChildrenClosed, adminExecutor);
+ }
+ }
+
+ private void onChildrenClosed(@SuppressWarnings("unused") Void ignored, Throwable error) {
+ assert adminExecutor.inEventLoop();
+ if (error != null) {
+ LOG.warn("Unexpected error while closing", error);
+ }
+ try {
+ for (CompletionStage<Void> future : childrenCloseFutures) {
+ warnIfFailed(future);
+ }
+ context
+ .nettyOptions()
+ .onClose()
+ .addListener(
+ f -> {
+ if (!f.isSuccess()) {
+ closeFuture.completeExceptionally(f.cause());
+ } else {
+ closeFuture.complete(null);
+ }
+ });
+ } catch (Throwable t) {
+ // Being paranoid here, but we don't want to risk swallowing an exception and leaving close
+ // hanging
+ LOG.warn("Unexpected error while closing", t);
+ }
+ }
+
+ private void warnIfFailed(CompletionStage<Void> stage) {
+ CompletableFuture<Void> future = stage.toCompletableFuture();
+ assert future.isDone();
+ if (future.isCompletedExceptionally()) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ // InterruptedException can't happen actually, but including it to make compiler happy
+ LOG.warn("Unexpected error while closing", e.getCause());
+ }
+ }
+ }
+
+ private List<AsyncAutoCloseable> internalComponentsToClose() {
+ return ImmutableList.of(
+ nodeStateManager,
+ metadataManager,
+ context.topologyMonitor(),
+ context.controlConnection());
+ }
}
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultNettyOptions.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultNettyOptions.java
index c3bf98c5f92..86060848c4a 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultNettyOptions.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultNettyOptions.java
@@ -76,7 +76,7 @@ public void afterChannelInitialized(Channel channel) {
}
@Override
- public Future<?> onShutdown() {
+ public Future<?> onClose() {
return ioEventLoopGroup.shutdownGracefully();
}
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/NettyOptions.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/NettyOptions.java
index 5186ae2244d..532741c20bd 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/NettyOptions.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/NettyOptions.java
@@ -79,5 +79,5 @@ public interface NettyOptions {
* that you have allocated elsewhere in this component, for example shut down custom event loop
* groups.
*/
- Future<?> onShutdown();
+ Future<?> onClose();
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
index 6ade97ee58e..728c8621dc9 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
@@ -16,6 +16,7 @@
package com.datastax.oss.driver.internal.core.control;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
+import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.channel.ChannelEvent;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
@@ -57,7 +58,7 @@
* If a custom {@link TopologyMonitor} is used, the control connection is used only for schema
* refreshes; if schema metadata is also disabled, the control connection never initializes.
*/
-public class ControlConnection implements EventCallback {
+public class ControlConnection implements EventCallback, AsyncAutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ControlConnection.class);
private final InternalDriverContext context;
@@ -103,8 +104,19 @@ public void reconnectNow() {
RunOrSchedule.on(adminExecutor, singleThreaded::reconnectNow);
}
- /** Note: control queries are never critical, so there is no graceful close. */
- public CompletionStage<Void> forceClose() {
+ @Override
+ public CompletionStage<Void> closeFuture() {
+ return singleThreaded.closeFuture;
+ }
+
+ @Override
+ public CompletionStage<Void> closeAsync() {
+ // Control queries are never critical, so there is no graceful close.
+ return forceCloseAsync();
+ }
+
+ @Override
+ public CompletionStage<Void> forceCloseAsync() {
RunOrSchedule.on(adminExecutor, singleThreaded::forceClose);
return singleThreaded.closeFuture;
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
index 6e5695d975b..edf16bc94b1 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
@@ -24,6 +24,7 @@
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.control.ControlConnection;
+import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.net.InetAddress;
@@ -54,23 +55,31 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
private final ControlConnection controlConnection;
private final AddressTranslator addressTranslator;
private final Duration timeout;
+ private final CompletableFuture<Void> closeFuture;
@VisibleForTesting volatile int port = -1;
public DefaultTopologyMonitor(InternalDriverContext context) {
this.controlConnection = context.controlConnection();
- addressTranslator = context.addressTranslator();
+ this.addressTranslator = context.addressTranslator();
DriverConfigProfile config = context.config().defaultProfile();
this.timeout = config.getDuration(CoreDriverOption.CONTROL_CONNECTION_TIMEOUT);
+ this.closeFuture = new CompletableFuture<>();
}
@Override
public CompletionStage<Void> init() {
+ if (closeFuture.isDone()) {
+ return CompletableFutures.failedFuture(new IllegalStateException("closed"));
+ }
return controlConnection.init(true);
}
@Override
public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
+ if (closeFuture.isDone()) {
+ return CompletableFutures.failedFuture(new IllegalStateException("closed"));
+ }
LOG.debug("Refreshing info for {}", node);
DriverChannel channel = controlConnection.channel();
if (node.getConnectAddress().equals(channel.address())) {
@@ -93,6 +102,9 @@ public CompletionStage> refreshNode(Node node) {
@Override
public CompletionStage<Optional<NodeInfo>> getNewNodeInfo(InetSocketAddress connectAddress) {
+ if (closeFuture.isDone()) {
+ return CompletableFutures.failedFuture(new IllegalStateException("closed"));
+ }
LOG.debug("Fetching info for new node {}", connectAddress);
DriverChannel channel = controlConnection.channel();
return query(channel, "SELECT * FROM system.peers")
@@ -101,6 +113,9 @@ public CompletionStage> getNewNodeInfo(InetSocketAddress conn
@Override
public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
+ if (closeFuture.isDone()) {
+ return CompletableFutures.failedFuture(new IllegalStateException("closed"));
+ }
LOG.debug("Refreshing node list");
DriverChannel channel = controlConnection.channel();
savePort(channel);
@@ -120,6 +135,22 @@ public CompletionStage> refreshNodeList() {
});
}
+ @Override
+ public CompletionStage<Void> closeFuture() {
+ return closeFuture;
+ }
+
+ @Override
+ public CompletionStage<Void> closeAsync() {
+ closeFuture.complete(null);
+ return closeFuture;
+ }
+
+ @Override
+ public CompletionStage<Void> forceCloseAsync() {
+ return closeAsync();
+ }
+
@VisibleForTesting
protected CompletionStage<AdminResult> query(
DriverChannel channel, String queryString, Map<String, Object> parameters) {
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
index 061163bc75e..ed34d13c736 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
@@ -15,6 +15,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;
+import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
@@ -31,7 +32,7 @@
import org.slf4j.LoggerFactory;
/** Holds the immutable instance of the {@link Metadata}, and handles requests to update it. */
-public class MetadataManager {
+public class MetadataManager implements AsyncAutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MetadataManager.class);
private final InternalDriverContext context;
@@ -42,7 +43,7 @@ public class MetadataManager {
public MetadataManager(InternalDriverContext context) {
this.context = context;
this.adminExecutor = context.nettyOptions().adminEventExecutorGroup().next();
- this.singleThreaded = new SingleThreaded(context);
+ this.singleThreaded = new SingleThreaded();
this.metadata = DefaultMetadata.EMPTY;
}
@@ -115,14 +116,27 @@ public void refreshSchema(
// TODO refresh schema metadata
}
- // TODO user-controlled refresh, shutdown?
+ // TODO user-controlled refresh?
- private class SingleThreaded {
- private final InternalDriverContext context;
+ @Override
+ public CompletionStage<Void> closeFuture() {
+ return singleThreaded.closeFuture;
+ }
- private SingleThreaded(InternalDriverContext context) {
- this.context = context;
- }
+ @Override
+ public CompletionStage<Void> closeAsync() {
+ RunOrSchedule.on(adminExecutor, singleThreaded::close);
+ return singleThreaded.closeFuture;
+ }
+
+ @Override
+ public CompletionStage<Void> forceCloseAsync() {
+ return this.closeAsync();
+ }
+
+ private class SingleThreaded {
+ private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ private boolean closeWasCalled;
private void initNodes(
Set<InetSocketAddress> addresses, CompletableFuture<Void> initNodesFuture) {
@@ -162,15 +176,25 @@ private void addNode(InetSocketAddress address, Optional maybeInfo) {
private void removeNode(InetSocketAddress address) {
refresh(new RemoveNodeRefresh(metadata, address));
}
+
+ private void close() {
+ if (closeWasCalled) {
+ return;
+ }
+ closeWasCalled = true;
+ closeFuture.complete(null);
+ }
}
@VisibleForTesting
Void refresh(MetadataRefresh refresh) {
assert adminExecutor.inEventLoop();
- refresh.compute();
- metadata = refresh.newMetadata;
- for (Object event : refresh.events) {
- context.eventBus().fire(event);
+ if (!singleThreaded.closeWasCalled) {
+ refresh.compute();
+ metadata = refresh.newMetadata;
+ for (Object event : refresh.events) {
+ context.eventBus().fire(event);
+ }
}
return null;
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java
index 6fcd86cd6ca..840eda1a9d2 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java
@@ -15,6 +15,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;
+import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.config.CoreDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
import com.datastax.oss.driver.api.core.metadata.NodeState;
@@ -29,6 +30,8 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,184 +41,228 @@
*
* See {@link NodeState} and {@link TopologyEvent} for a description of the state change rules.
*/
-public class NodeStateManager {
+public class NodeStateManager implements AsyncAutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NodeStateManager.class);
private final EventExecutor adminExecutor;
- private final MetadataManager metadataManager;
- private final EventBus eventBus;
- private final Debouncer<TopologyEvent, Collection<TopologyEvent>> topologyEventDebouncer;
+ private final SingleThreaded singleThreaded;
public NodeStateManager(InternalDriverContext context) {
this.adminExecutor = context.nettyOptions().adminEventExecutorGroup().next();
- this.metadataManager = context.metadataManager();
-
- DriverConfigProfile config = context.config().defaultProfile();
- this.topologyEventDebouncer =
- new Debouncer<>(
- adminExecutor,
- this::coalesceTopologyEvents,
- this::flushTopologyEvents,
- config.getDuration(CoreDriverOption.METADATA_TOPOLOGY_WINDOW),
- config.getInt(CoreDriverOption.METADATA_TOPOLOGY_MAX_EVENTS));
-
- this.eventBus = context.eventBus();
- this.eventBus.register(
- ChannelEvent.class, RunOrSchedule.on(adminExecutor, this::onChannelEvent));
- this.eventBus.register(
- TopologyEvent.class, RunOrSchedule.on(adminExecutor, this::onTopologyEvent));
- // Note: this component exists for the whole life of the driver instance, so don't worry about
- // unregistering the listeners.
+ this.singleThreaded = new SingleThreaded(context);
}
- private void onChannelEvent(ChannelEvent event) {
- assert adminExecutor.inEventLoop();
- LOG.debug("Processing {}", event);
- @SuppressWarnings("SuspiciousMethodCalls")
- DefaultNode node = (DefaultNode) metadataManager.getMetadata().getNodes().get(event.address);
- assert node != null;
- switch (event.type) {
- case OPENED:
- node.openConnections += 1;
- if (node.state == NodeState.DOWN || node.state == NodeState.UNKNOWN) {
- setState(node, NodeState.UP, "a new connection was opened to it");
- }
- break;
- case CLOSED:
- node.openConnections -= 1;
- break;
- case RECONNECTION_STARTED:
- node.reconnections += 1;
- if (node.openConnections == 0) {
- setState(node, NodeState.DOWN, "it has no connections and started reconnecting");
- }
- break;
- case RECONNECTION_STOPPED:
- node.reconnections -= 1;
- break;
- }
+ @Override
+ public CompletionStage<Void> closeFuture() {
+ return singleThreaded.closeFuture;
}
- private void onDebouncedTopologyEvent(TopologyEvent event) {
- assert adminExecutor.inEventLoop();
- LOG.debug("Processing {}", event);
- DefaultNode node = (DefaultNode) metadataManager.getMetadata().getNodes().get(event.address);
- switch (event.type) {
- case SUGGEST_UP:
- if (node == null) {
- LOG.debug("Received UP event for unknown node {}, adding it", event.address);
- metadataManager.addNode(event.address);
- } else if (node.state == NodeState.FORCED_DOWN) {
- LOG.debug("Not setting {} UP because it is FORCED_DOWN", node);
- } else {
- setState(node, NodeState.UP, "an UP topology event was received");
- }
- break;
- case SUGGEST_DOWN:
- if (node == null) {
- LOG.debug("Received DOWN event for unknown node {}, ignoring it", event.address);
- } else if (node.openConnections > 0) {
- LOG.debug("Not setting {} DOWN because it still has active connections", node);
- } else if (node.state == NodeState.FORCED_DOWN) {
- LOG.debug("Not setting {} DOWN because it is FORCED_DOWN", node);
- } else {
- setState(node, NodeState.DOWN, "a DOWN topology event was received");
- }
- break;
- case FORCE_UP:
- if (node == null) {
- LOG.debug("Received FORCE_UP event for unknown node {}, adding it", event.address);
- metadataManager.addNode(event.address);
- } else {
- setState(node, NodeState.UP, "a FORCE_UP topology event was received");
- }
- break;
- case FORCE_DOWN:
- if (node == null) {
- LOG.debug("Received FORCE_DOWN event for unknown node {}, ignoring it", event.address);
- } else {
- setState(node, NodeState.FORCED_DOWN, "a FORCE_DOWN topology event was received");
- }
- break;
- case SUGGEST_ADDED:
- if (node != null) {
- LOG.debug(
- "Received ADDED event for {} but it is already in our metadata, ignoring", node);
- } else {
- metadataManager.addNode(event.address);
- }
- break;
- case SUGGEST_REMOVED:
- if (node == null) {
- LOG.debug(
- "Received REMOVED event for {} but it is not in our metadata, ignoring",
- event.address);
- } else {
- metadataManager.removeNode(event.address);
- }
- break;
- }
+ @Override
+ public CompletionStage<Void> closeAsync() {
+ RunOrSchedule.on(adminExecutor, singleThreaded::close);
+ return singleThreaded.closeFuture;
}
- // Called by the event bus, needs debouncing
- private void onTopologyEvent(TopologyEvent event) {
- assert adminExecutor.inEventLoop();
- topologyEventDebouncer.receive(event);
+ @Override
+ public CompletionStage<Void> forceCloseAsync() {
+ return closeAsync();
}
- // Called to process debounced events before flushing
- private Collection<TopologyEvent> coalesceTopologyEvents(List<TopologyEvent> events) {
- assert adminExecutor.inEventLoop();
- Collection result;
- if (events.size() == 1) {
- result = events;
- } else {
- // Keep the last FORCE* event for each node, or if there is none the last normal event
- Map<InetSocketAddress, TopologyEvent> last = Maps.newHashMapWithExpectedSize(events.size());
- for (TopologyEvent event : events) {
- if (event.isForceEvent()
- || !last.containsKey(event.address)
- || !last.get(event.address).isForceEvent()) {
- last.put(event.address, event);
- }
+ private class SingleThreaded {
+
+ private final MetadataManager metadataManager;
+ private final EventBus eventBus;
+ private final Debouncer<TopologyEvent, Collection<TopologyEvent>> topologyEventDebouncer;
+ private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ private boolean closeWasCalled;
+
+ private SingleThreaded(InternalDriverContext context) {
+ this.metadataManager = context.metadataManager();
+
+ DriverConfigProfile config = context.config().defaultProfile();
+ this.topologyEventDebouncer =
+ new Debouncer<>(
+ adminExecutor,
+ this::coalesceTopologyEvents,
+ this::flushTopologyEvents,
+ config.getDuration(CoreDriverOption.METADATA_TOPOLOGY_WINDOW),
+ config.getInt(CoreDriverOption.METADATA_TOPOLOGY_MAX_EVENTS));
+
+ this.eventBus = context.eventBus();
+ this.eventBus.register(
+ ChannelEvent.class, RunOrSchedule.on(adminExecutor, this::onChannelEvent));
+ this.eventBus.register(
+ TopologyEvent.class, RunOrSchedule.on(adminExecutor, this::onTopologyEvent));
+ // Note: this component exists for the whole life of the driver instance, so don't worry about
+ // unregistering the listeners.
+
+ }
+
+ private void onChannelEvent(ChannelEvent event) {
+ assert adminExecutor.inEventLoop();
+ if (closeWasCalled) {
+ return;
+ }
+ LOG.debug("Processing {}", event);
+ @SuppressWarnings("SuspiciousMethodCalls")
+ DefaultNode node = (DefaultNode) metadataManager.getMetadata().getNodes().get(event.address);
+ assert node != null;
+ switch (event.type) {
+ case OPENED:
+ node.openConnections += 1;
+ if (node.state == NodeState.DOWN || node.state == NodeState.UNKNOWN) {
+ setState(node, NodeState.UP, "a new connection was opened to it");
+ }
+ break;
+ case CLOSED:
+ node.openConnections -= 1;
+ break;
+ case RECONNECTION_STARTED:
+ node.reconnections += 1;
+ if (node.openConnections == 0) {
+ setState(node, NodeState.DOWN, "it has no connections and started reconnecting");
+ }
+ break;
+ case RECONNECTION_STOPPED:
+ node.reconnections -= 1;
+ break;
}
- result = last.values();
}
- LOG.debug("Coalesced topology events: {} => {}", events, result);
- return result;
- }
- // Called when the debouncer flushes
- private void flushTopologyEvents(Collection<TopologyEvent> events) {
- assert adminExecutor.inEventLoop();
- for (TopologyEvent event : events) {
- onDebouncedTopologyEvent(event);
+ private void onDebouncedTopologyEvent(TopologyEvent event) {
+ assert adminExecutor.inEventLoop();
+ if (closeWasCalled) {
+ return;
+ }
+ LOG.debug("Processing {}", event);
+ DefaultNode node = (DefaultNode) metadataManager.getMetadata().getNodes().get(event.address);
+ switch (event.type) {
+ case SUGGEST_UP:
+ if (node == null) {
+ LOG.debug("Received UP event for unknown node {}, adding it", event.address);
+ metadataManager.addNode(event.address);
+ } else if (node.state == NodeState.FORCED_DOWN) {
+ LOG.debug("Not setting {} UP because it is FORCED_DOWN", node);
+ } else {
+ setState(node, NodeState.UP, "an UP topology event was received");
+ }
+ break;
+ case SUGGEST_DOWN:
+ if (node == null) {
+ LOG.debug("Received DOWN event for unknown node {}, ignoring it", event.address);
+ } else if (node.openConnections > 0) {
+ LOG.debug("Not setting {} DOWN because it still has active connections", node);
+ } else if (node.state == NodeState.FORCED_DOWN) {
+ LOG.debug("Not setting {} DOWN because it is FORCED_DOWN", node);
+ } else {
+ setState(node, NodeState.DOWN, "a DOWN topology event was received");
+ }
+ break;
+ case FORCE_UP:
+ if (node == null) {
+ LOG.debug("Received FORCE_UP event for unknown node {}, adding it", event.address);
+ metadataManager.addNode(event.address);
+ } else {
+ setState(node, NodeState.UP, "a FORCE_UP topology event was received");
+ }
+ break;
+ case FORCE_DOWN:
+ if (node == null) {
+ LOG.debug("Received FORCE_DOWN event for unknown node {}, ignoring it", event.address);
+ } else {
+ setState(node, NodeState.FORCED_DOWN, "a FORCE_DOWN topology event was received");
+ }
+ break;
+ case SUGGEST_ADDED:
+ if (node != null) {
+ LOG.debug(
+ "Received ADDED event for {} but it is already in our metadata, ignoring", node);
+ } else {
+ metadataManager.addNode(event.address);
+ }
+ break;
+ case SUGGEST_REMOVED:
+ if (node == null) {
+ LOG.debug(
+ "Received REMOVED event for {} but it is not in our metadata, ignoring",
+ event.address);
+ } else {
+ metadataManager.removeNode(event.address);
+ }
+ break;
+ }
+ }
+
+ // Called by the event bus, needs debouncing
+ private void onTopologyEvent(TopologyEvent event) {
+ assert adminExecutor.inEventLoop();
+ topologyEventDebouncer.receive(event);
}
- }
- private void setState(DefaultNode node, NodeState newState, String reason) {
- NodeState oldState = node.state;
- if (oldState != newState) {
- LOG.debug("Transitioning {} {}=>{} (because {})", node, oldState, newState, reason);
- node.state = newState;
- if (newState != NodeState.UP) {
- // Fire the event immediately
- eventBus.fire(NodeStateEvent.changed(oldState, newState, node));
+ // Called to process debounced events before flushing
+ private Collection<TopologyEvent> coalesceTopologyEvents(List<TopologyEvent> events) {
+ assert adminExecutor.inEventLoop();
+ Collection result;
+ if (events.size() == 1) {
+ result = events;
} else {
- // Refresh the node first (but still fire event if that fails)
- metadataManager
- .refreshNode(node)
- .whenComplete(
- (success, error) -> {
- try {
- if (error != null) {
- LOG.debug("Error while refreshing info for " + node, error);
+ // Keep the last FORCE* event for each node, or if there is none the last normal event
+ Map<InetSocketAddress, TopologyEvent> last = Maps.newHashMapWithExpectedSize(events.size());
+ for (TopologyEvent event : events) {
+ if (event.isForceEvent()
+ || !last.containsKey(event.address)
+ || !last.get(event.address).isForceEvent()) {
+ last.put(event.address, event);
+ }
+ }
+ result = last.values();
+ }
+ LOG.debug("Coalesced topology events: {} => {}", events, result);
+ return result;
+ }
+
+ // Called when the debouncer flushes
+ private void flushTopologyEvents(Collection<TopologyEvent> events) {
+ assert adminExecutor.inEventLoop();
+ for (TopologyEvent event : events) {
+ onDebouncedTopologyEvent(event);
+ }
+ }
+
+ private void close() {
+ assert adminExecutor.inEventLoop();
+ if (closeWasCalled) {
+ return;
+ }
+ closeWasCalled = true;
+ topologyEventDebouncer.stop();
+ closeFuture.complete(null);
+ }
+
+ private void setState(DefaultNode node, NodeState newState, String reason) {
+ NodeState oldState = node.state;
+ if (oldState != newState) {
+ LOG.debug("Transitioning {} {}=>{} (because {})", node, oldState, newState, reason);
+ node.state = newState;
+ if (newState != NodeState.UP) {
+ // Fire the event immediately
+ eventBus.fire(NodeStateEvent.changed(oldState, newState, node));
+ } else {
+ // Refresh the node first (but still fire event if that fails)
+ metadataManager
+ .refreshNode(node)
+ .whenComplete(
+ (success, error) -> {
+ try {
+ if (error != null) {
+ LOG.debug("Error while refreshing info for " + node, error);
+ }
+ eventBus.fire(NodeStateEvent.changed(oldState, newState, node));
+ } catch (Throwable t) {
+ LOG.warn("Unexpected exception", t);
}
- eventBus.fire(NodeStateEvent.changed(oldState, newState, node));
- } catch (Throwable t) {
- LOG.warn("Unexpected exception", t);
- }
- });
+ });
+ }
}
}
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java
index b8a80cb478b..e999734a249 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java
@@ -15,6 +15,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;
+import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.Cluster;
import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
import com.datastax.oss.driver.api.core.metadata.Node;
@@ -37,7 +38,7 @@
* refreshes are done with queries to system tables. If you prefer to rely on an external monitoring
* tool, this can be completely overridden.
*/
-public interface TopologyMonitor {
+public interface TopologyMonitor extends AsyncAutoCloseable {
/**
* Triggers the initialization of the monitor.
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java
index d359a0333af..5d504a82e43 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java
@@ -15,6 +15,7 @@
*/
package com.datastax.oss.driver.internal.core.pool;
+import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.internal.core.channel.ChannelEvent;
import com.datastax.oss.driver.internal.core.channel.ChannelFactory;
@@ -53,7 +54,7 @@
* If one or more channels go down, a reconnection process starts in order to replace them; it
* runs until the channel count is back to its intended target.
*/
-public class ChannelPool {
+public class ChannelPool implements AsyncAutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ChannelPool.class);
/**
@@ -120,26 +121,20 @@ public CompletionStage setKeyspace(CqlIdentifier newKeyspaceName) {
return RunOrSchedule.on(adminExecutor, () -> singleThreaded.setKeyspace(newKeyspaceName));
}
- /**
- * Closes the pool gracefully: subsequent calls to {@link #next()} will fail, but the pool's
- * channels will be closed gracefully (allowing pending requests to complete).
- */
- public CompletionStage<ChannelPool> close() {
- RunOrSchedule.on(adminExecutor, singleThreaded::close);
+ @Override
+ public CompletionStage<Void> closeFuture() {
return singleThreaded.closeFuture;
}
- /**
- * Closes the pool forcefully: subsequent calls to {@link #next()} will fail, and the pool's
- * channels will be closed forcefully (aborting pending requests).
- */
- public CompletionStage<ChannelPool> forceClose() {
- RunOrSchedule.on(adminExecutor, singleThreaded::forceClose);
+ @Override
+ public CompletionStage<Void> closeAsync() {
+ RunOrSchedule.on(adminExecutor, singleThreaded::close);
return singleThreaded.closeFuture;
}
- /** Does not close the pool, but returns a completion stage that will complete when it does. */
- public CompletionStage<ChannelPool> closeFuture() {
+ @Override
+ public CompletionStage<Void> forceCloseAsync() {
+ RunOrSchedule.on(adminExecutor, singleThreaded::forceClose);
return singleThreaded.closeFuture;
}
@@ -156,7 +151,7 @@ private class SingleThreaded {
private int wantedCount;
private CompletableFuture<ChannelPool> connectFuture = new CompletableFuture<>();
private boolean isConnecting;
- private CompletableFuture<ChannelPool> closeFuture = new CompletableFuture<>();
+ private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private boolean isClosing;
private CompletableFuture setKeyspaceFuture;
@@ -366,7 +361,7 @@ private void close() {
eventBus.fire(ChannelEvent.channelClosed(address));
return channel.close();
},
- () -> closeFuture.complete(ChannelPool.this),
+ () -> closeFuture.complete(null),
(channel, error) ->
LOG.warn(ChannelPool.this + " error closing channel " + channel, error));
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java
index 6c8ae599a7d..091bdd17c9f 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java
@@ -50,6 +50,7 @@ public class Debouncer {
private List<T> currentBatch = new ArrayList<>();
private ScheduledFuture<?> nextFlush;
+ private boolean stopped;
/**
* Creates a new instance.
@@ -77,6 +78,9 @@ public Debouncer(
/** This must be called on eventExecutor too. */
public void receive(T element) {
assert adminExecutor.inEventLoop();
+ if (stopped) {
+ return;
+ }
if (window.isZero() || maxEvents == 1) {
LOG.debug(
"Received {}, flushing immediately (window = {}, maxEvents = {})",
@@ -123,4 +127,16 @@ private void cancelNextFlush() {
}
}
}
+
+ /**
+ * Stop debouncing: the next flush is cancelled, and all pending and future events will be
+ * ignored.
+ */
+ public void stop() {
+ assert adminExecutor.inEventLoop();
+ if (!stopped) {
+ stopped = true;
+ cancelNextFlush();
+ }
+ }
}
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java
index 48ce8ce602f..f855a99f945 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java
@@ -33,7 +33,7 @@ public class ControlConnectionTest extends ControlConnectionTestBase {
@Test
public void should_close_successfully_if_it_was_never_init() {
// When
- CompletionStage closeFuture = controlConnection.forceClose();
+ CompletionStage<Void> closeFuture = controlConnection.forceCloseAsync();
// Then
assertThat(closeFuture).isSuccess();
@@ -261,7 +261,7 @@ public void should_not_force_reconnection_if_closed() {
factoryHelper.waitForCall(ADDRESS1);
waitForPendingAdminTasks();
assertThat(initFuture).isSuccess();
- CompletionStage closeFuture = controlConnection.forceClose();
+ CompletionStage<Void> closeFuture = controlConnection.forceCloseAsync();
assertThat(closeFuture).isSuccess();
// When
@@ -287,7 +287,7 @@ public void should_close_channel_when_closing() {
assertThat(initFuture).isSuccess();
// When
- CompletionStage closeFuture = controlConnection.forceClose();
+ CompletionStage<Void> closeFuture = controlConnection.forceCloseAsync();
waitForPendingAdminTasks();
// Then
@@ -331,7 +331,7 @@ public void should_close_channel_if_closed_during_reconnection() {
// When
// the control connection gets closed before channel2 initialization is complete
- controlConnection.forceClose();
+ controlConnection.forceCloseAsync();
waitForPendingAdminTasks();
channel2Future.complete(channel2);
waitForPendingAdminTasks();
@@ -377,7 +377,7 @@ public void should_handle_channel_failure_if_closed_during_reconnection() {
// When
// the control connection gets closed before channel1 initialization fails
- controlConnection.forceClose();
+ controlConnection.forceCloseAsync();
channel1Future.completeExceptionally(new Exception("mock failure"));
waitForPendingAdminTasks();
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
index 15159e7e6e8..e1c4dc9443c 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
@@ -224,6 +224,19 @@ public void should_refresh_node_list_from_local_and_peers() {
});
}
+ @Test
+ public void should_stop_executing_queries_once_closed() throws Exception {
+ // Given
+ topologyMonitor.close();
+
+ // When
+ CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
+
+ // Then
+ assertThat(futureInfos)
+ .isFailed(error -> assertThat(error).isInstanceOf(IllegalStateException.class));
+ }
+
/** Mocks the query execution logic. */
private static class TestTopologyMonitor extends DefaultTopologyMonitor {
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java
index 041a7918b4a..1e11b66caee 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java
@@ -69,6 +69,11 @@ public void setup() {
metadataManager = new TestMetadataManager(context);
}
+ @AfterMethod
+ public void teardown() {
+ adminEventLoopGroup.shutdownGracefully(100, 200, TimeUnit.MILLISECONDS);
+ }
+
@Test
public void should_add_contact_points() {
// When
@@ -200,11 +205,6 @@ public void should_remove_node() {
assertThat(refresh.toRemove).isEqualTo(ADDRESS1);
}
- @AfterMethod
- public void teardown() {
- adminEventLoopGroup.shutdownGracefully(100, 200, TimeUnit.MILLISECONDS);
- }
-
private class TestMetadataManager extends MetadataManager {
private List refreshes = new CopyOnWriteArrayList<>();
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManagerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManagerTest.java
index f885fbc5eb6..3ed41f5774e 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManagerTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManagerTest.java
@@ -493,6 +493,19 @@ public void should_keep_node_up_if_reconnection_starts_with_some_connections() {
Mockito.verify(eventBus, never()).fire(any(NodeStateEvent.class));
}
+ @Test
+ public void should_ignore_events_when_closed() throws Exception {
+ NodeStateManager manager = new NodeStateManager(context);
+ assertThat(node1.reconnections).isEqualTo(0);
+
+ manager.close();
+
+ eventBus.fire(ChannelEvent.reconnectionStarted(node1.getConnectAddress()));
+ waitForPendingAdminTasks();
+
+ assertThat(node1.reconnections).isEqualTo(0);
+ }
+
// Wait for all the tasks on the pool's admin executor to complete.
private void waitForPendingAdminTasks() {
// This works because the event loop group is single-threaded
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java
index f99efa513fe..03e6dca692d 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java
@@ -589,7 +589,7 @@ public void should_close_all_channels_when_closed() throws Exception {
Mockito.verify(reconnectionSchedule).nextDelay();
factoryHelper.waitForCalls(ADDRESS, 1);
- CompletionStage<ChannelPool> closeFuture = pool.close();
+ CompletionStage<Void> closeFuture = pool.closeAsync();
waitForPendingAdminTasks();
// The two original channels were closed normally
@@ -651,7 +651,7 @@ public void should_force_close_all_channels_when_force_closed() throws Exception
Mockito.verify(reconnectionSchedule).nextDelay();
factoryHelper.waitForCalls(ADDRESS, 1);
- CompletionStage<ChannelPool> closeFuture = pool.forceClose();
+ CompletionStage<Void> closeFuture = pool.forceCloseAsync();
waitForPendingAdminTasks();
// The two original channels were force-closed
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/util/concurrent/DebouncerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/util/concurrent/DebouncerTest.java
index 338a741cd93..73582c8e4aa 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/util/concurrent/DebouncerTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/util/concurrent/DebouncerTest.java
@@ -162,4 +162,36 @@ public void should_force_flush_after_max_events() {
Mockito.verify(scheduledFuture, times(9)).cancel(true);
assertThat(results).containsExactly("0,1,2,3,4,5,6,7,8,9");
}
+
+ @Test
+ public void should_cancel_next_flush_when_stopped() {
+ Debouncer debouncer =
+ new Debouncer<>(
+ adminExecutor, this::coalesce, this::flush, DEFAULT_WINDOW, DEFAULT_MAX_EVENTS);
+
+ debouncer.receive(1);
+ Mockito.verify(adminExecutor)
+ .schedule(
+ Mockito.any(Runnable.class),
+ Mockito.eq(DEFAULT_WINDOW.toNanos()),
+ Mockito.eq(TimeUnit.NANOSECONDS));
+
+ debouncer.stop();
+ Mockito.verify(scheduledFuture).cancel(true);
+ }
+
+ @Test
+ public void should_ignore_new_events_when_flushed() {
+ Debouncer debouncer =
+ new Debouncer<>(
+ adminExecutor, this::coalesce, this::flush, DEFAULT_WINDOW, DEFAULT_MAX_EVENTS);
+ debouncer.stop();
+
+ debouncer.receive(1);
+ Mockito.verify(adminExecutor, never())
+ .schedule(
+ Mockito.any(Runnable.class),
+ Mockito.eq(DEFAULT_WINDOW.toNanos()),
+ Mockito.eq(TimeUnit.NANOSECONDS));
+ }
}