Skip to content

Commit

Permalink
Add cluster shutdown
Browse files Browse the repository at this point in the history
Introduce AsyncAutoCloseable and have all driver components implement
it.
Note: user-provided components like LBP and AddressTranslator are not
closed yet.
  • Loading branch information
olim7t committed Apr 26, 2017
1 parent 8c6479a commit 658419d
Show file tree
Hide file tree
Showing 18 changed files with 579 additions and 208 deletions.
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2017-2017 DataStax Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.api.core;

import com.datastax.oss.driver.internal.core.util.concurrent.BlockingOperation;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import java.util.concurrent.CompletionStage;

/**
* An object that can be closed in an asynchronous, non-blocking manner.
*
* <p>For convenience, this extends the JDK's {@code AutoCloseable} in order to be usable in
* try-with-resource blocks (in that case, the <em>blocking</em> {@link #close()} will be used).
*/
public interface AsyncAutoCloseable extends AutoCloseable {

/**
* Returns a stage that will complete when {@link #close()} or {@link #forceCloseAsync()} is
* called, and the shutdown sequence completes.
*/
CompletionStage<Void> closeFuture();

/**
* Initiates an orderly shutdown: no new requests are accepted, but all pending requests are
* allowed to complete normally.
*
* @return a stage that will complete when the shutdown sequence is complete. Multiple calls to
* this method or {@link #forceCloseAsync()} always return the same instance.
*/
CompletionStage<Void> closeAsync();

/**
* Initiates a forced shutdown of this instance: no new requests are accepted, and all pending
* requests will complete with an exception.
*
* @return a stage that will complete when the shutdown sequence is complete. Multiple calls to
* this method or {@link #close()} always return the same instance.
*/
CompletionStage<Void> forceCloseAsync();

/**
* {@inheritDoc}
*
* <p>This method is implemented by calling {@link #closeAsync()} and blocking on the result. This
* should not be called on a driver thread.
*/
@Override
default void close() throws Exception {
BlockingOperation.checkNotDriverThread();
CompletableFutures.getUninterruptibly(closeAsync().toCompletableFuture());
}
}
Expand Up @@ -18,9 +18,10 @@
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import java.util.concurrent.CompletionStage;

/** An instance of the driver, that connects to a Cassandra cluster. */
public interface Cluster {
public interface Cluster extends AsyncAutoCloseable {
/** Returns a builder to create a new instance of the default implementation. */
static ClusterBuilder builder() {
return new ClusterBuilder();
Expand All @@ -44,5 +45,6 @@ static ClusterBuilder builder() {
*/
Metadata getMetadata();

/** Returns a context that provides access to all the policies used by this driver instance. */
DriverContext getContext();
}
Expand Up @@ -15,17 +15,24 @@
*/
package com.datastax.oss.driver.internal.core;

import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.Cluster;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
import com.datastax.oss.driver.internal.core.metadata.NodeStateManager;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.google.common.collect.ImmutableList;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,15 +73,38 @@ public DriverContext getContext() {
return context;
}

@Override
public CompletionStage<Void> closeFuture() {
return singleThreaded.closeFuture;
}

@Override
public CompletionStage<Void> closeAsync() {
RunOrSchedule.on(adminExecutor, singleThreaded::close);
return singleThreaded.closeFuture;
}

@Override
public CompletionStage<Void> forceCloseAsync() {
RunOrSchedule.on(adminExecutor, singleThreaded::forceClose);
return singleThreaded.closeFuture;
}

private class SingleThreaded {

private final InternalDriverContext context;
private final Set<InetSocketAddress> initialContactPoints;
private final NodeStateManager nodeStateManager;
private final CompletableFuture<Cluster> initFuture = new CompletableFuture<>();
private boolean initWasCalled;
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private boolean closeWasCalled;
private boolean forceCloseWasCalled;
private List<CompletionStage<Void>> childrenCloseFutures;

private SingleThreaded(InternalDriverContext context, Set<InetSocketAddress> contactPoints) {
this.context = context;
this.nodeStateManager = new NodeStateManager(context);
this.initialContactPoints = contactPoints;
}

Expand All @@ -85,8 +115,6 @@ private void init() {
}
initWasCalled = true;

new NodeStateManager(context);

// If any contact points were provided, store them in the metadata right away (the
// control connection will need them if it has to initialize)
MetadataManager metadataManager = context.metadataManager();
Expand Down Expand Up @@ -124,5 +152,97 @@ private void init() {
return null;
});
}

private void close() {
assert adminExecutor.inEventLoop();
if (closeWasCalled) {
return;
}
closeWasCalled = true;

LOG.debug("Closing {}", this);
childrenCloseFutures = new ArrayList<>();
for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
LOG.debug("Closing {}", closeable);
childrenCloseFutures.add(closeable.closeAsync());
}
CompletableFutures.whenAllDone(childrenCloseFutures)
.whenCompleteAsync(this::onChildrenClosed, adminExecutor);
}

private void forceClose() {
assert adminExecutor.inEventLoop();
if (forceCloseWasCalled) {
return;
}
forceCloseWasCalled = true;

LOG.debug("Force-closing {} (was {}closed before)", this, (closeWasCalled ? "" : "not "));

if (closeWasCalled) {
// childrenCloseFutures is already created, and onChildrenClosed has already been called
for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
LOG.debug("Force-closing {}", closeable);
closeable.forceCloseAsync();
}
} else {
closeWasCalled = true;
childrenCloseFutures = new ArrayList<>();
for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
LOG.debug("Force-closing {}", closeable);
childrenCloseFutures.add(closeable.forceCloseAsync());
}
CompletableFutures.whenAllDone(childrenCloseFutures)
.whenCompleteAsync(this::onChildrenClosed, adminExecutor);
}
}

private void onChildrenClosed(@SuppressWarnings("unused") Void ignored, Throwable error) {
assert adminExecutor.inEventLoop();
if (error != null) {
LOG.warn("Unexpected error while closing", error);
}
try {
for (CompletionStage<Void> future : childrenCloseFutures) {
warnIfFailed(future);
}
context
.nettyOptions()
.onClose()
.addListener(
f -> {
if (!f.isSuccess()) {
closeFuture.completeExceptionally(f.cause());
} else {
closeFuture.complete(null);
}
});
} catch (Throwable t) {
// Being paranoid here, but we don't want to risk swallowing an exception and leaving close
// hanging
LOG.warn("Unexpected error while closing", t);
}
}

private void warnIfFailed(CompletionStage<Void> stage) {
CompletableFuture<Void> future = stage.toCompletableFuture();
assert future.isDone();
if (future.isCompletedExceptionally()) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
// InterruptedException can't happen actually, but including it to make compiler happy
LOG.warn("Unexpected error while closing", e.getCause());
}
}
}

private List<AsyncAutoCloseable> internalComponentsToClose() {
return ImmutableList.of(
nodeStateManager,
metadataManager,
context.topologyMonitor(),
context.controlConnection());
}
}
}
Expand Up @@ -76,7 +76,7 @@ public void afterChannelInitialized(Channel channel) {
}

@Override
public Future<?> onShutdown() {
public Future<?> onClose() {
return ioEventLoopGroup.shutdownGracefully();
}
}
Expand Up @@ -79,5 +79,5 @@ public interface NettyOptions {
* that you have allocated elsewhere in this component, for example shut down custom event loop
* groups.
*/
Future<?> onShutdown();
Future<?> onClose();
}
Expand Up @@ -16,6 +16,7 @@
package com.datastax.oss.driver.internal.core.control;

import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.channel.ChannelEvent;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
Expand Down Expand Up @@ -57,7 +58,7 @@
* <p>If a custom {@link TopologyMonitor} is used, the control connection is used only for schema
* refreshes; if schema metadata is also disabled, the control connection never initializes.
*/
public class ControlConnection implements EventCallback {
public class ControlConnection implements EventCallback, AsyncAutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ControlConnection.class);

private final InternalDriverContext context;
Expand Down Expand Up @@ -103,8 +104,19 @@ public void reconnectNow() {
RunOrSchedule.on(adminExecutor, singleThreaded::reconnectNow);
}

/** Note: control queries are never critical, so there is no graceful close. */
public CompletionStage<Void> forceClose() {
@Override
public CompletionStage<Void> closeFuture() {
return singleThreaded.closeFuture;
}

@Override
public CompletionStage<Void> closeAsync() {
// Control queries are never critical, so there is no graceful close.
return forceCloseAsync();
}

@Override
public CompletionStage<Void> forceCloseAsync() {
RunOrSchedule.on(adminExecutor, singleThreaded::forceClose);
return singleThreaded.closeFuture;
}
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.control.ControlConnection;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.net.InetAddress;
Expand Down Expand Up @@ -54,23 +55,31 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
private final ControlConnection controlConnection;
private final AddressTranslator addressTranslator;
private final Duration timeout;
private final CompletableFuture<Void> closeFuture;

@VisibleForTesting volatile int port = -1;

public DefaultTopologyMonitor(InternalDriverContext context) {
this.controlConnection = context.controlConnection();
addressTranslator = context.addressTranslator();
this.addressTranslator = context.addressTranslator();
DriverConfigProfile config = context.config().defaultProfile();
this.timeout = config.getDuration(CoreDriverOption.CONTROL_CONNECTION_TIMEOUT);
this.closeFuture = new CompletableFuture<>();
}

@Override
public CompletionStage<Void> init() {
if (closeFuture.isDone()) {
return CompletableFutures.failedFuture(new IllegalStateException("closed"));
}
return controlConnection.init(true);
}

@Override
public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
if (closeFuture.isDone()) {
return CompletableFutures.failedFuture(new IllegalStateException("closed"));
}
LOG.debug("Refreshing info for {}", node);
DriverChannel channel = controlConnection.channel();
if (node.getConnectAddress().equals(channel.address())) {
Expand All @@ -93,6 +102,9 @@ public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {

@Override
public CompletionStage<Optional<NodeInfo>> getNewNodeInfo(InetSocketAddress connectAddress) {
if (closeFuture.isDone()) {
return CompletableFutures.failedFuture(new IllegalStateException("closed"));
}
LOG.debug("Fetching info for new node {}", connectAddress);
DriverChannel channel = controlConnection.channel();
return query(channel, "SELECT * FROM system.peers")
Expand All @@ -101,6 +113,9 @@ public CompletionStage<Optional<NodeInfo>> getNewNodeInfo(InetSocketAddress conn

@Override
public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
if (closeFuture.isDone()) {
return CompletableFutures.failedFuture(new IllegalStateException("closed"));
}
LOG.debug("Refreshing node list");
DriverChannel channel = controlConnection.channel();
savePort(channel);
Expand All @@ -120,6 +135,22 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
});
}

@Override
public CompletionStage<Void> closeFuture() {
return closeFuture;
}

@Override
public CompletionStage<Void> closeAsync() {
closeFuture.complete(null);
return closeFuture;
}

@Override
public CompletionStage<Void> forceCloseAsync() {
return closeAsync();
}

@VisibleForTesting
protected CompletionStage<AdminResult> query(
DriverChannel channel, String queryString, Map<String, Object> parameters) {
Expand Down

0 comments on commit 658419d

Please sign in to comment.