Skip to content

Commit

Permalink
[TACHYON-224] change thread model to TThreadPoolServer on Master and …
Browse files Browse the repository at this point in the history
…Worker
  • Loading branch information
SiyuanHe committed Feb 24, 2015
1 parent a73823d commit b660f3b
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 95 deletions.
9 changes: 9 additions & 0 deletions conf/tachyon-env.sh.template
Expand Up @@ -53,6 +53,13 @@ export TACHYON_UNDERFS_ADDRESS=$TACHYON_HOME/underfs
export TACHYON_WORKER_MEMORY_SIZE=1GB
export TACHYON_UNDERFS_HDFS_IMPL=org.apache.hadoop.hdfs.DistributedFileSystem

# The default max threads in both master and worker are unlimited and thus provides no protection against
# clients overwhelming the server. Users need to set this number according to their production environment
# These two settings represents the maximum number of client requests that a master/worker server may execute concurrently.
export TACHYON_WORKER_MAX_THREADS=2048
export TACHYON_MASTER_MAX_THREADS=2048


CONF_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

export TACHYON_JAVA_OPTS+="
Expand All @@ -71,6 +78,8 @@ export TACHYON_JAVA_OPTS+="
-Dtachyon.master.worker.timeout.ms=60000
-Dtachyon.master.hostname=$TACHYON_MASTER_ADDRESS
-Dtachyon.master.journal.folder=$TACHYON_HOME/journal/
-Dtachyon.master.maxworker.threads=$TACHYON_MASTER_MAX_THREADS
-Dtachyon.worker.maxworker.threads=$TACHYON_WORKER_MAX_THREADS
-Dorg.apache.jasper.compiler.disablejsr199=true
-Djava.net.preferIPv4Stack=true
"
Expand Down
14 changes: 6 additions & 8 deletions core/src/main/java/tachyon/Constants.java
Expand Up @@ -109,10 +109,8 @@ public class Constants {
public static final String MASTER_WEB_THREAD_COUNT = "tachyon.master.web.threads";
public static final String MASTER_TEMPORARY_FOLDER = "tachyon.master.temporary.folder";
public static final String MASTER_HEARTBEAT_INTERVAL_MS = "tachyon.master.heartbeat.interval.ms";
public static final String MASTER_SELECTOR_THREADS = "tachyon.master.selector.threads";
public static final String MASTER_QUEUE_SIZE_PER_SELECTOR =
"tachyon.master.queue.size.per.selector";
public static final String MASTER_SERVER_THREADS = "tachyon.master.server.threads";
public static final String MASTER_MAX_WORKER_THREADS = "tachyon.master.maxworker.threads";
public static final String MASTER_MIN_WORKER_THREADS = "tachyon.master.minworker.threads";
public static final String MASTER_WORKER_TIMEOUT_MS = "tachyon.master.worker.timeout.ms";
public static final String MASTER_WHITELIST = "tachyon.master.whitelist";
public static final String MASTER_KEYTAB_KEY = "tachyon.master.keytab.file";
Expand All @@ -126,11 +124,11 @@ public class Constants {
public static final String WORKER_HEARTBEAT_TIMEOUT_MS = "tachyon.worker.heartbeat.timeout.ms";
public static final String WORKER_TO_MASTER_HEARTBEAT_INTERVAL_MS =
"tachyon.worker.to.master.heartbeat.interval.ms";
public static final String WORKER_SELECTOR_THREADS = "tachyon.worker.selector.threads";
public static final String WORKER_QUEUE_SIZE_PER_SELECTOR =
"tachyon.worker.queue.size.per.selector";
public static final String WORKER_SERVER_THREADS = "tachyon.worker.server.threads";
public static final String WORKER_USER_TIMEOUT_MS = "tachyon.worker.user.timeout.ms";
public static final String WORKER_MAX_WORKER_THREADS = "tachyon.worker.maxworker.threads";
public static final String WORKER_MIN_WORKER_THREADS = "tachyon.worker.minworker.threads";



public static final String WORKER_CHECKPOINT_THREADS = "tachyon.worker.checkpoint.threads";
public static final String WORKER_PER_THREAD_CHECKPOINT_CAP_MB_SEC =
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/tachyon/conf/TachyonConf.java
Expand Up @@ -145,9 +145,9 @@ protected void loadDefault(boolean includeSystemProperties) {
defaultProps.setProperty(Constants.MASTER_HOSTNAME, NetworkUtils.getLocalHostName());
defaultProps.setProperty(Constants.WORKER_NETWORK_NETTY_CHANNEL,
ChannelType.defaultType().toString());
defaultProps.setProperty(Constants.WORKER_SERVER_THREADS,
defaultProps.setProperty(Constants.WORKER_MIN_WORKER_THREADS,
String.valueOf(Runtime.getRuntime().availableProcessors()));
defaultProps.setProperty(Constants.MASTER_SERVER_THREADS,
defaultProps.setProperty(Constants.MASTER_MIN_WORKER_THREADS,
String.valueOf(2 * Runtime.getRuntime().availableProcessors()));

InputStream defaultInputStream =
Expand Down
44 changes: 24 additions & 20 deletions core/src/main/java/tachyon/master/TachyonMaster.java
Expand Up @@ -20,9 +20,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,16 +68,15 @@ public static void main(String[] args) {
private MasterInfo mMasterInfo;
private InetSocketAddress mMasterAddress;
private UIWebServer mWebServer;
private TNonblockingServerSocket mServerTNonblockingServerSocket;
private TServerSocket mServerTServerSocket;
private TServer mMasterServiceServer;
private MasterServiceHandler mMasterServiceHandler;
private Journal mJournal;
private EditLogProcessor mEditLogProcessor;
private int mWebPort;

private int mSelectorThreads;
private int mAcceptQueueSizePerThread;
private int mWorkerThreads;
private int mMaxWorkerThread;
private int mMinWorkerThread;
private boolean mZookeeperMode = false;
private final ExecutorService mExecutorService = Executors.newFixedThreadPool(2,
ThreadFactoryUtils.daemon("heartbeat-master-%d"));
Expand All @@ -92,11 +95,6 @@ public TachyonMaster(TachyonConf tachyonConf) {
int port = mTachyonConf.getInt(Constants.MASTER_PORT, 0);
InetSocketAddress address = new InetSocketAddress(hostName, port);
int webPort = mTachyonConf.getInt(Constants.MASTER_WEB_PORT, 0);
int selectorThreads = mTachyonConf.getInt(Constants.MASTER_SELECTOR_THREADS, 3);
int acceptQueueSizePerThreads = mTachyonConf.getInt(Constants.MASTER_QUEUE_SIZE_PER_SELECTOR,
3000);
int workerThreads = mTachyonConf.getInt(Constants.MASTER_SERVER_THREADS,
2 * Runtime.getRuntime().availableProcessors());

TachyonConf.assertValidPort(address, mTachyonConf);
TachyonConf.assertValidPort(webPort, mTachyonConf);
Expand All @@ -105,18 +103,22 @@ public TachyonMaster(TachyonConf tachyonConf) {

mIsStarted = false;
mWebPort = webPort;
mSelectorThreads = selectorThreads;
mAcceptQueueSizePerThread = acceptQueueSizePerThreads;
mWorkerThreads = workerThreads;
mMinWorkerThread = mTachyonConf.getInt(Constants.MASTER_MIN_WORKER_THREADS,
Runtime.getRuntime().availableProcessors());

//Set max thread to max integer by default
//An property will be set/added in tachyon-env for users to specify a number that make sense in
//their production environment
mMaxWorkerThread = mTachyonConf.getInt(Constants.MASTER_MAX_WORKER_THREADS, Integer.MAX_VALUE);

try {
// Extract the port from the generated socket.
// When running tests, its great to use port '0' so the system will figure out what port to
// use (any random free port).
// In a production or any real deployment setup, port '0' should not be used as it will make
// deployment more complicated.
mServerTNonblockingServerSocket = new TNonblockingServerSocket(address);
mPort = NetworkUtils.getPort(mServerTNonblockingServerSocket);
mServerTServerSocket = new TServerSocket(address);
mPort = NetworkUtils.getPort(mServerTServerSocket);

mMasterAddress = new InetSocketAddress(NetworkUtils.getFqdnHost(address), mPort);
String journalFolder = mTachyonConf.get(Constants.MASTER_JOURNAL_FOLDER, "/journal/");
Expand Down Expand Up @@ -236,11 +238,13 @@ private void setup() throws IOException, TTransportException {
MasterService.Processor<MasterServiceHandler> masterServiceProcessor =
new MasterService.Processor<MasterServiceHandler>(mMasterServiceHandler);

mMasterServiceServer =
new TThreadedSelectorServer(new TThreadedSelectorServer.Args(
mServerTNonblockingServerSocket).processor(masterServiceProcessor)
.selectorThreads(mSelectorThreads).acceptQueueSizePerThread(mAcceptQueueSizePerThread)
.workerThreads(mWorkerThreads));
mMasterServiceServer = new TThreadPoolServer( new TThreadPoolServer.Args(
mServerTServerSocket)
.maxWorkerThreads(mMaxWorkerThread)
.minWorkerThreads(mMinWorkerThread)
.processor(masterServiceProcessor)
.transportFactory(new TFramedTransport.Factory())
.protocolFactory(new TBinaryProtocol.Factory()));

mIsStarted = true;
}
Expand Down Expand Up @@ -308,7 +312,7 @@ public void stop() throws Exception {
mWebServer.shutdownWebServer();
mMasterInfo.stop();
mMasterServiceServer.stop();
mServerTNonblockingServerSocket.close();
mServerTServerSocket.close();
mExecutorService.shutdown();
mIsStarted = false;
}
Expand Down
11 changes: 6 additions & 5 deletions core/src/main/java/tachyon/util/NetworkUtils.java
Expand Up @@ -26,6 +26,7 @@
import java.util.Enumeration;

import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -146,12 +147,12 @@ public static String getFqdnHost(NetAddress addr) throws UnknownHostException {

/**
* Gets the port for the underline socket. This function calls
* {@link #getSocket(org.apache.thrift.transport.TNonblockingServerSocket)}, so reflection will be
* {@link #getSocket(org.apache.thrift.transport.TServerSocket)}, so reflection will be
* used to get the port.
*
* @see #getSocket(org.apache.thrift.transport.TNonblockingServerSocket)
* @see #getSocket(org.apache.thrift.transport.TServerSocket)
*/
public static int getPort(TNonblockingServerSocket thriftSocket) {
public static int getPort(TServerSocket thriftSocket) {
return getSocket(thriftSocket).getLocalPort();
}

Expand All @@ -161,9 +162,9 @@ public static int getPort(TNonblockingServerSocket thriftSocket) {
*
* @throws java.lang.RuntimeException if reflection calls fail
*/
public static ServerSocket getSocket(final TNonblockingServerSocket thriftSocket) {
public static ServerSocket getSocket(final TServerSocket thriftSocket) {
try {
Field field = TNonblockingServerSocket.class.getDeclaredField("serverSocket_");
Field field = TServerSocket.class.getDeclaredField("serverSocket_");
field.setAccessible(true);
return (ServerSocket) field.get(thriftSocket);
} catch (NoSuchFieldException e) {
Expand Down
Expand Up @@ -110,15 +110,14 @@ public void doGet(HttpServletRequest request, HttpServletResponse response)
+ mTachyonConf.getInt(Constants.MASTER_RETRY_COUNT, 29));
request.setAttribute("tachyon.master.heartbeat.interval.ms", ""
+ mTachyonConf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL_MS, -1));
request.setAttribute("tachyon.master.selector.threads", ""
+ mTachyonConf.getInt(Constants.MASTER_SELECTOR_THREADS, -1));
request.setAttribute("tachyon.master.queue.size.per.selector", ""
+ mTachyonConf.getInt(Constants.MASTER_QUEUE_SIZE_PER_SELECTOR, -1));
request.setAttribute("tachyon.master.server.threads", ""
+ mTachyonConf.getInt(Constants.MASTER_SERVER_THREADS, -1));
request.setAttribute("tachyon.master.minworker.threads", ""
+ mTachyonConf.getInt(Constants.MASTER_MIN_WORKER_THREADS, -1));
request.setAttribute("tachyon.master.maxworker.threads", ""
+ mTachyonConf.getInt(Constants.MASTER_MAX_WORKER_THREADS, -1));
request.setAttribute("tachyon.master.worker.timeout.ms", ""
+ mTachyonConf.getInt(Constants.MASTER_WORKER_TIMEOUT_MS, -1));

getServletContext().getRequestDispatcher("/configuration.jsp").forward(request, response);
}
}

0 comments on commit b660f3b

Please sign in to comment.