[ALLUXIO-3181] Close client sockets during stepdown (#7230)
* Close client sockets during stepdown

* Add license

* Fix findbugs

* Switch to scheduled executor
aaudiber authored and calvinjia committed May 8, 2018
1 parent 62a639b commit 46fb050
Showing 4 changed files with 145 additions and 13 deletions.
10 changes: 10 additions & 0 deletions core/common/src/main/java/alluxio/PropertyKey.java
@@ -900,6 +900,14 @@ public String toString() {
          + "href=\"#configure-multihomed-networks\">multi-homed networks</a>.")
          .setScope(Scope.MASTER)
          .build();
+  public static final PropertyKey MASTER_CLIENT_SOCKET_CLEANUP_INTERVAL =
+      new Builder(Name.MASTER_CLIENT_SOCKET_CLEANUP_INTERVAL)
+          .setDefaultValue("10min")
+          .setDescription("Interval for removing closed client sockets from internal tracking.")
+          .setIsHidden(true)
+          .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
+          .setScope(Scope.MASTER)
+          .build();
  public static final PropertyKey MASTER_CONNECTION_TIMEOUT_MS =
      new Builder(Name.MASTER_CONNECTION_TIMEOUT_MS)
          .setAlias(new String[]{"alluxio.master.connection.timeout.ms"})
@@ -2987,6 +2995,8 @@ public static final class Name {
    public static final String MASTER_AUDIT_LOGGING_QUEUE_CAPACITY =
        "alluxio.master.audit.logging.queue.capacity";
    public static final String MASTER_BIND_HOST = "alluxio.master.bind.host";
+    public static final String MASTER_CLIENT_SOCKET_CLEANUP_INTERVAL =
+        "alluxio.master.client.socket.cleanup.interval";
    public static final String MASTER_CONNECTION_TIMEOUT_MS =
        "alluxio.master.connection.timeout";
    public static final String MASTER_FILE_ASYNC_PERSIST_HANDLER =
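Side note (not part of this commit): because the new key is registered with setIsHidden(true), it will not show up in the generated configuration documentation, but it can still be overridden like any other Alluxio property. A minimal sketch of overriding and resolving it, using a test class name chosen here for illustration:

import alluxio.Configuration;
import alluxio.PropertyKey;

public class CleanupIntervalSketch {
  public static void main(String[] args) {
    // Override the hidden cleanup interval (the commit's default is "10min").
    Configuration.set(PropertyKey.MASTER_CLIENT_SOCKET_CLEANUP_INTERVAL, "1min");
    // Resolve it to milliseconds, the same way SocketTrackingTServerSocket does.
    long intervalMs = Configuration.getMs(PropertyKey.MASTER_CLIENT_SOCKET_CLEANUP_INTERVAL);
    System.out.println("Client socket cleanup interval: " + intervalMs + "ms");
  }
}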
@@ -17,6 +17,7 @@
import alluxio.RuntimeConstants;
import alluxio.master.journal.JournalSystem;
import alluxio.master.journal.JournalSystem.Mode;
+import alluxio.master.thrift.SocketTrackingTServerSocket;
import alluxio.metrics.MetricsSystem;
import alluxio.metrics.sink.MetricsServlet;
import alluxio.metrics.sink.PrometheusMetricsServlet;
@@ -40,7 +41,6 @@
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadPoolServer.Args;
-import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
@@ -72,7 +72,7 @@ public class AlluxioMasterProcess implements MasterProcess {
  private final int mPort;

  /** The socket for thrift rpc server. */
-  private TServerSocket mTServerSocket;
+  private SocketTrackingTServerSocket mRpcServerSocket;

  /** The transport provider to create thrift server transport. */
  private final TransportProvider mTransportProvider;
@@ -139,9 +139,10 @@ public class AlluxioMasterProcess implements MasterProcess {
    }

    mTransportProvider = TransportProvider.Factory.create();
-    mTServerSocket = new TServerSocket(NetworkAddressUtils.getBindAddress(ServiceType.MASTER_RPC),
+    mRpcServerSocket = new SocketTrackingTServerSocket(
+        NetworkAddressUtils.getBindAddress(ServiceType.MASTER_RPC),
        (int) Configuration.getMs(PropertyKey.MASTER_CONNECTION_TIMEOUT_MS));
-    mPort = NetworkAddressUtils.getThriftPort(mTServerSocket);
+    mPort = NetworkAddressUtils.getThriftPort(mRpcServerSocket);
    // reset master rpc port
    Configuration.set(PropertyKey.MASTER_RPC_PORT, Integer.toString(mPort));
    mRpcBindAddress = NetworkAddressUtils.getBindAddress(ServiceType.MASTER_RPC);
@@ -367,16 +368,19 @@ protected void startServingRPCServer() {
    }

    try {
-      if (mTServerSocket != null) {
-        mTServerSocket.close();
+      if (mRpcServerSocket != null) {
+        mRpcServerSocket.close();
      }
-      mTServerSocket = new TServerSocket(mRpcBindAddress,
+      // The socket tracking socket will close all client sockets when the server socket is closed.
+      // This is necessary so that clients don't receive spurious errors during failover. The master
+      // will close this socket before resetting its state during stepdown.
+      mRpcServerSocket = new SocketTrackingTServerSocket(mRpcBindAddress,
          (int) Configuration.getMs(PropertyKey.MASTER_CONNECTION_TIMEOUT_MS));
    } catch (TTransportException e) {
      throw new RuntimeException(e);
    }
    // create master thrift service with the multiplexed processor.
-    Args args = new TThreadPoolServer.Args(mTServerSocket).maxWorkerThreads(mMaxWorkerThreads)
+    Args args = new TThreadPoolServer.Args(mRpcServerSocket).maxWorkerThreads(mMaxWorkerThreads)
        .minWorkerThreads(mMinWorkerThreads).processor(processor).transportFactory(transportFactory)
        .protocolFactory(new TBinaryProtocol.Factory(true, true));

@@ -398,9 +402,9 @@ protected void stopServing() throws Exception {
      mThriftServer.stop();
      mThriftServer = null;
    }
-    if (mTServerSocket != null) {
-      mTServerSocket.close();
-      mTServerSocket = null;
+    if (mRpcServerSocket != null) {
+      mRpcServerSocket.close();
+      mRpcServerSocket = null;
    }
    if (mJvmPauseMonitor != null) {
      mJvmPauseMonitor.stop();
@@ -93,9 +93,12 @@ public void start() throws Exception {
    waitForReady();
    LOG.info("Primary started");
    mLeaderSelector.waitForState(State.SECONDARY);
-    // Put the journal in secondary mode ASAP to avoid interfering with the new primary.
-    mJournalSystem.setMode(Mode.SECONDARY);
    stopServing();
+    // Put the journal in secondary mode ASAP to avoid interfering with the new primary. This must
+    // happen after stopServing because downgrading the journal system will reset master state,
+    // which could cause NPEs for outstanding RPC threads. We need to first close all client
+    // sockets in stopServing so that clients don't see NPEs.
+    mJournalSystem.setMode(Mode.SECONDARY);
    mServingThread.join(mServingThreadTimeoutMs);
    if (mServingThread.isAlive()) {
      LOG.error(
@@ -0,0 +1,115 @@
/*
 * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
 * (the "License"). You may not use this work except in compliance with the License, which is
 * available at www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied, as more fully set forth in the License.
 *
 * See the NOTICE file distributed with this work for information regarding copyright ownership.
 */

package alluxio.master.thrift;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.util.ThreadFactoryUtils;

import com.google.common.io.Closer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Extension of TServerSocket which tracks all accepted sockets and closes them when the server
 * socket is closed.
 */
public class SocketTrackingTServerSocket extends TServerSocket {
  private static final Logger LOG = LoggerFactory.getLogger(SocketTrackingTServerSocket.class);
  private static final long CLEANUP_INTERVAL_MS =
      Configuration.getMs(PropertyKey.MASTER_CLIENT_SOCKET_CLEANUP_INTERVAL);

  private final Set<Socket> mSockets = ConcurrentHashMap.newKeySet();
  private final ScheduledExecutorService mExecutor;

  /**
   * @param bindAddr bind address for the socket
   * @param clientTimeout timeout for client sockets from accept
   */
  public SocketTrackingTServerSocket(InetSocketAddress bindAddr, int clientTimeout)
      throws TTransportException {
    super(bindAddr, clientTimeout);
    mExecutor = Executors
        .newSingleThreadScheduledExecutor(ThreadFactoryUtils.build("socket-closer-thread", true));
    mExecutor.scheduleAtFixedRate(this::removeClosedSockets, CLEANUP_INTERVAL_MS,
        CLEANUP_INTERVAL_MS, TimeUnit.MILLISECONDS);
  }

  @Override
  public TSocket acceptImpl() throws TTransportException {
    TSocket socket = super.acceptImpl();
    mSockets.add(socket.getSocket());
    return socket;
  }

  @Override
  public void close() {
    super.close();
    try {
      closeClientSockets();
    } catch (IOException e) {
      LOG.error("Could not close client sockets", e);
    }
    mExecutor.shutdownNow();
    try {
      if (!mExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
        LOG.warn("Failed to stop socket cleanup thread.");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
  }

  /**
   * Closes all socket connections that have been accepted by this server socket.
   */
  private void closeClientSockets() throws IOException {
    Closer closer = Closer.create();
    int count = 0;
    for (Socket s : mSockets) {
      if (!s.isClosed()) {
        closer.register(s);
        count++;
      }
    }
    closer.close();
    LOG.info("Closed {} client sockets", count);
  }

  /**
   * Periodically clean up any closed sockets.
   */
  private void removeClosedSockets() {
    // This is best-effort, and may not remove sockets added to the mSockets set after the
    // iterator was created. Those sockets will be checked on the next sweep.
    for (Iterator<Socket> it = mSockets.iterator(); it.hasNext();) {
      Socket s = it.next();
      if (s.isClosed()) {
        it.remove();
      }
    }
  }
}
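For illustration only (not part of the commit): a minimal sketch of how the tracking socket is meant to be used, mirroring the AlluxioMasterProcess changes above. The class name, bind address, port, and timeout below are placeholder assumptions:

import alluxio.master.thrift.SocketTrackingTServerSocket;

import org.apache.thrift.transport.TTransportException;

import java.net.InetSocketAddress;

public class StepdownSketch {
  public static void main(String[] args) throws TTransportException {
    // Create the tracking socket where a plain TServerSocket used to be created
    // (address and client timeout are placeholders).
    SocketTrackingTServerSocket serverSocket =
        new SocketTrackingTServerSocket(new InetSocketAddress("0.0.0.0", 19998), 0);
    // ... hand serverSocket to new TThreadPoolServer.Args(serverSocket) and serve RPCs ...

    // On stepdown, closing the server socket also closes every accepted client socket,
    // so clients fail over cleanly instead of hitting a master that is resetting its state.
    serverSocket.close();
  }
}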
