diff --git a/conf/tachyon-env.sh.template b/conf/tachyon-env.sh.template index ee49485e1008..3febfd1e5a94 100755 --- a/conf/tachyon-env.sh.template +++ b/conf/tachyon-env.sh.template @@ -53,6 +53,13 @@ export TACHYON_UNDERFS_ADDRESS=$TACHYON_HOME/underfs export TACHYON_WORKER_MEMORY_SIZE=1GB export TACHYON_UNDERFS_HDFS_IMPL=org.apache.hadoop.hdfs.DistributedFileSystem +# The default max threads in both master and worker are unlimited and thus provides no protection against +# clients overwhelming the server. Users need to set this number according to their production environment +# These two settings represents the maximum number of client requests that a master/worker server may execute concurrently. +export TACHYON_WORKER_MAX_THREADS=2048 +export TACHYON_MASTER_MAX_THREADS=2048 + + CONF_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" export TACHYON_JAVA_OPTS+=" @@ -71,6 +78,8 @@ export TACHYON_JAVA_OPTS+=" -Dtachyon.master.worker.timeout.ms=60000 -Dtachyon.master.hostname=$TACHYON_MASTER_ADDRESS -Dtachyon.master.journal.folder=$TACHYON_HOME/journal/ + -Dtachyon.master.maxworker.threads=$TACHYON_MASTER_MAX_THREADS + -Dtachyon.worker.maxworker.threads=$TACHYON_WORKER_MAX_THREADS -Dorg.apache.jasper.compiler.disablejsr199=true -Djava.net.preferIPv4Stack=true " diff --git a/core/src/main/java/tachyon/Constants.java b/core/src/main/java/tachyon/Constants.java index 56a23c786011..4ffe52666b5c 100644 --- a/core/src/main/java/tachyon/Constants.java +++ b/core/src/main/java/tachyon/Constants.java @@ -109,10 +109,8 @@ public class Constants { public static final String MASTER_WEB_THREAD_COUNT = "tachyon.master.web.threads"; public static final String MASTER_TEMPORARY_FOLDER = "tachyon.master.temporary.folder"; public static final String MASTER_HEARTBEAT_INTERVAL_MS = "tachyon.master.heartbeat.interval.ms"; - public static final String MASTER_SELECTOR_THREADS = "tachyon.master.selector.threads"; - public static final String MASTER_QUEUE_SIZE_PER_SELECTOR = - "tachyon.master.queue.size.per.selector"; - public static final String MASTER_SERVER_THREADS = "tachyon.master.server.threads"; + public static final String MASTER_MAX_WORKER_THREADS = "tachyon.master.maxworker.threads"; + public static final String MASTER_MIN_WORKER_THREADS = "tachyon.master.minworker.threads"; public static final String MASTER_WORKER_TIMEOUT_MS = "tachyon.master.worker.timeout.ms"; public static final String MASTER_WHITELIST = "tachyon.master.whitelist"; public static final String MASTER_KEYTAB_KEY = "tachyon.master.keytab.file"; @@ -126,11 +124,11 @@ public class Constants { public static final String WORKER_HEARTBEAT_TIMEOUT_MS = "tachyon.worker.heartbeat.timeout.ms"; public static final String WORKER_TO_MASTER_HEARTBEAT_INTERVAL_MS = "tachyon.worker.to.master.heartbeat.interval.ms"; - public static final String WORKER_SELECTOR_THREADS = "tachyon.worker.selector.threads"; - public static final String WORKER_QUEUE_SIZE_PER_SELECTOR = - "tachyon.worker.queue.size.per.selector"; - public static final String WORKER_SERVER_THREADS = "tachyon.worker.server.threads"; public static final String WORKER_USER_TIMEOUT_MS = "tachyon.worker.user.timeout.ms"; + public static final String WORKER_MAX_WORKER_THREADS = "tachyon.worker.maxworker.threads"; + public static final String WORKER_MIN_WORKER_THREADS = "tachyon.worker.minworker.threads"; + + public static final String WORKER_CHECKPOINT_THREADS = "tachyon.worker.checkpoint.threads"; public static final String WORKER_PER_THREAD_CHECKPOINT_CAP_MB_SEC = diff --git a/core/src/main/java/tachyon/conf/TachyonConf.java b/core/src/main/java/tachyon/conf/TachyonConf.java index 4bd2a8322d5f..72a8c1d17aad 100644 --- a/core/src/main/java/tachyon/conf/TachyonConf.java +++ b/core/src/main/java/tachyon/conf/TachyonConf.java @@ -145,9 +145,9 @@ protected void loadDefault(boolean includeSystemProperties) { defaultProps.setProperty(Constants.MASTER_HOSTNAME, NetworkUtils.getLocalHostName()); defaultProps.setProperty(Constants.WORKER_NETWORK_NETTY_CHANNEL, ChannelType.defaultType().toString()); - defaultProps.setProperty(Constants.WORKER_SERVER_THREADS, + defaultProps.setProperty(Constants.WORKER_MIN_WORKER_THREADS, String.valueOf(Runtime.getRuntime().availableProcessors())); - defaultProps.setProperty(Constants.MASTER_SERVER_THREADS, + defaultProps.setProperty(Constants.MASTER_MIN_WORKER_THREADS, String.valueOf(2 * Runtime.getRuntime().availableProcessors())); InputStream defaultInputStream = diff --git a/core/src/main/java/tachyon/master/TachyonMaster.java b/core/src/main/java/tachyon/master/TachyonMaster.java index 412718292c44..a983e1f2cbfd 100644 --- a/core/src/main/java/tachyon/master/TachyonMaster.java +++ b/core/src/main/java/tachyon/master/TachyonMaster.java @@ -20,9 +20,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,16 +68,15 @@ public static void main(String[] args) { private MasterInfo mMasterInfo; private InetSocketAddress mMasterAddress; private UIWebServer mWebServer; - private TNonblockingServerSocket mServerTNonblockingServerSocket; + private TServerSocket mServerTServerSocket; private TServer mMasterServiceServer; private MasterServiceHandler mMasterServiceHandler; private Journal mJournal; private EditLogProcessor mEditLogProcessor; private int mWebPort; - private int mSelectorThreads; - private int mAcceptQueueSizePerThread; - private int mWorkerThreads; + private int mMaxWorkerThread; + private int mMinWorkerThread; private boolean mZookeeperMode = false; private final ExecutorService mExecutorService = Executors.newFixedThreadPool(2, ThreadFactoryUtils.daemon("heartbeat-master-%d")); @@ -92,11 +95,6 @@ public TachyonMaster(TachyonConf tachyonConf) { int port = mTachyonConf.getInt(Constants.MASTER_PORT, 0); InetSocketAddress address = new InetSocketAddress(hostName, port); int webPort = mTachyonConf.getInt(Constants.MASTER_WEB_PORT, 0); - int selectorThreads = mTachyonConf.getInt(Constants.MASTER_SELECTOR_THREADS, 3); - int acceptQueueSizePerThreads = mTachyonConf.getInt(Constants.MASTER_QUEUE_SIZE_PER_SELECTOR, - 3000); - int workerThreads = mTachyonConf.getInt(Constants.MASTER_SERVER_THREADS, - 2 * Runtime.getRuntime().availableProcessors()); TachyonConf.assertValidPort(address, mTachyonConf); TachyonConf.assertValidPort(webPort, mTachyonConf); @@ -105,9 +103,13 @@ public TachyonMaster(TachyonConf tachyonConf) { mIsStarted = false; mWebPort = webPort; - mSelectorThreads = selectorThreads; - mAcceptQueueSizePerThread = acceptQueueSizePerThreads; - mWorkerThreads = workerThreads; + mMinWorkerThread = mTachyonConf.getInt(Constants.MASTER_MIN_WORKER_THREADS, + Runtime.getRuntime().availableProcessors()); + + //Set max thread to max integer by default + //An property will be set/added in tachyon-env for users to specify a number that make sense in + //their production environment + mMaxWorkerThread = mTachyonConf.getInt(Constants.MASTER_MAX_WORKER_THREADS, Integer.MAX_VALUE); try { // Extract the port from the generated socket. @@ -115,8 +117,8 @@ public TachyonMaster(TachyonConf tachyonConf) { // use (any random free port). // In a production or any real deployment setup, port '0' should not be used as it will make // deployment more complicated. - mServerTNonblockingServerSocket = new TNonblockingServerSocket(address); - mPort = NetworkUtils.getPort(mServerTNonblockingServerSocket); + mServerTServerSocket = new TServerSocket(address); + mPort = NetworkUtils.getPort(mServerTServerSocket); mMasterAddress = new InetSocketAddress(NetworkUtils.getFqdnHost(address), mPort); String journalFolder = mTachyonConf.get(Constants.MASTER_JOURNAL_FOLDER, "/journal/"); @@ -236,11 +238,13 @@ private void setup() throws IOException, TTransportException { MasterService.Processor masterServiceProcessor = new MasterService.Processor(mMasterServiceHandler); - mMasterServiceServer = - new TThreadedSelectorServer(new TThreadedSelectorServer.Args( - mServerTNonblockingServerSocket).processor(masterServiceProcessor) - .selectorThreads(mSelectorThreads).acceptQueueSizePerThread(mAcceptQueueSizePerThread) - .workerThreads(mWorkerThreads)); + mMasterServiceServer = new TThreadPoolServer( new TThreadPoolServer.Args( + mServerTServerSocket) + .maxWorkerThreads(mMaxWorkerThread) + .minWorkerThreads(mMinWorkerThread) + .processor(masterServiceProcessor) + .transportFactory(new TFramedTransport.Factory()) + .protocolFactory(new TBinaryProtocol.Factory())); mIsStarted = true; } @@ -308,7 +312,7 @@ public void stop() throws Exception { mWebServer.shutdownWebServer(); mMasterInfo.stop(); mMasterServiceServer.stop(); - mServerTNonblockingServerSocket.close(); + mServerTServerSocket.close(); mExecutorService.shutdown(); mIsStarted = false; } diff --git a/core/src/main/java/tachyon/util/NetworkUtils.java b/core/src/main/java/tachyon/util/NetworkUtils.java index 284ae0c2b1e4..85c2c6af3631 100644 --- a/core/src/main/java/tachyon/util/NetworkUtils.java +++ b/core/src/main/java/tachyon/util/NetworkUtils.java @@ -26,6 +26,7 @@ import java.util.Enumeration; import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TServerSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,12 +147,12 @@ public static String getFqdnHost(NetAddress addr) throws UnknownHostException { /** * Gets the port for the underline socket. This function calls - * {@link #getSocket(org.apache.thrift.transport.TNonblockingServerSocket)}, so reflection will be + * {@link #getSocket(org.apache.thrift.transport.TServerSocket)}, so reflection will be * used to get the port. * - * @see #getSocket(org.apache.thrift.transport.TNonblockingServerSocket) + * @see #getSocket(org.apache.thrift.transport.TServerSocket) */ - public static int getPort(TNonblockingServerSocket thriftSocket) { + public static int getPort(TServerSocket thriftSocket) { return getSocket(thriftSocket).getLocalPort(); } @@ -161,9 +162,9 @@ public static int getPort(TNonblockingServerSocket thriftSocket) { * * @throws java.lang.RuntimeException if reflection calls fail */ - public static ServerSocket getSocket(final TNonblockingServerSocket thriftSocket) { + public static ServerSocket getSocket(final TServerSocket thriftSocket) { try { - Field field = TNonblockingServerSocket.class.getDeclaredField("serverSocket_"); + Field field = TServerSocket.class.getDeclaredField("serverSocket_"); field.setAccessible(true); return (ServerSocket) field.get(thriftSocket); } catch (NoSuchFieldException e) { diff --git a/core/src/main/java/tachyon/web/WebInterfaceConfigurationServlet.java b/core/src/main/java/tachyon/web/WebInterfaceConfigurationServlet.java index 15bacdfab8e0..7f097913a3cb 100644 --- a/core/src/main/java/tachyon/web/WebInterfaceConfigurationServlet.java +++ b/core/src/main/java/tachyon/web/WebInterfaceConfigurationServlet.java @@ -110,15 +110,14 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) + mTachyonConf.getInt(Constants.MASTER_RETRY_COUNT, 29)); request.setAttribute("tachyon.master.heartbeat.interval.ms", "" + mTachyonConf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL_MS, -1)); - request.setAttribute("tachyon.master.selector.threads", "" - + mTachyonConf.getInt(Constants.MASTER_SELECTOR_THREADS, -1)); - request.setAttribute("tachyon.master.queue.size.per.selector", "" - + mTachyonConf.getInt(Constants.MASTER_QUEUE_SIZE_PER_SELECTOR, -1)); - request.setAttribute("tachyon.master.server.threads", "" - + mTachyonConf.getInt(Constants.MASTER_SERVER_THREADS, -1)); + request.setAttribute("tachyon.master.minworker.threads", "" + + mTachyonConf.getInt(Constants.MASTER_MIN_WORKER_THREADS, -1)); + request.setAttribute("tachyon.master.maxworker.threads", "" + + mTachyonConf.getInt(Constants.MASTER_MAX_WORKER_THREADS, -1)); request.setAttribute("tachyon.master.worker.timeout.ms", "" + mTachyonConf.getInt(Constants.MASTER_WORKER_TIMEOUT_MS, -1)); getServletContext().getRequestDispatcher("/configuration.jsp").forward(request, response); } } + diff --git a/core/src/main/java/tachyon/worker/TachyonWorker.java b/core/src/main/java/tachyon/worker/TachyonWorker.java index f93f8d8d7699..33432f0c032d 100644 --- a/core/src/main/java/tachyon/worker/TachyonWorker.java +++ b/core/src/main/java/tachyon/worker/TachyonWorker.java @@ -22,9 +22,13 @@ import java.util.concurrent.Executors; import com.google.common.primitives.Ints; +import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,17 +62,16 @@ public class TachyonWorker implements Runnable { * @param masterAddress The TachyonMaster's address * @param workerAddress This TachyonWorker's address * @param dataPort This TachyonWorker's data server's port - * @param selectorThreads The number of selector threads of the worker's thrift server - * @param acceptQueueSizePerThreads The accept queue size per thread of the worker's thrift server - * @param workerThreads The number of threads of the worker's thrift server + * @param minWorkerThreads The min number of worker threads used in TThreadPoolServer + * @param maxWorkerThreads The max number of worker threads used in TThreadPoolServer * @param tachyonConf The instance of {@link tachyon.conf.TachyonConf} to used by Worker. * @return The new TachyonWorker */ public static synchronized TachyonWorker createWorker(InetSocketAddress masterAddress, - InetSocketAddress workerAddress, int dataPort, int selectorThreads, - int acceptQueueSizePerThreads, int workerThreads, TachyonConf tachyonConf) { - return new TachyonWorker(masterAddress, workerAddress, dataPort, selectorThreads, - acceptQueueSizePerThreads, workerThreads, tachyonConf); + InetSocketAddress workerAddress, int dataPort, int minWorkerThreads, + int maxWorkerThreads, TachyonConf tachyonConf) { + return new TachyonWorker(masterAddress, workerAddress, dataPort, minWorkerThreads, + maxWorkerThreads, tachyonConf); } /** @@ -77,21 +80,19 @@ public static synchronized TachyonWorker createWorker(InetSocketAddress masterAd * @param masterAddress The TachyonMaster's address. e.g., localhost:19998 * @param workerAddress This TachyonWorker's address. e.g., localhost:29998 * @param dataPort This TachyonWorker's data server's port - * @param selectorThreads The number of selector threads of the worker's thrift server - * @param acceptQueueSizePerThreads The accept queue size per thread of the worker's thrift server - * @param workerThreads The number of threads of the worker's thrift server + * @param minWorkerThreads The min number of worker threads used in TThreadPoolServer + * @param maxWorkerThreads The max number of worker threads used in TThreadPoolServer * @param tachyonConf The instance of {@link tachyon.conf.TachyonConf} to used by Worker. * @return The new TachyonWorker */ public static synchronized TachyonWorker createWorker(String masterAddress, String workerAddress, - int dataPort, int selectorThreads, int acceptQueueSizePerThreads, int workerThreads, - TachyonConf tachyonConf) { + int dataPort, int minWorkerThreads, int maxWorkerThreads, TachyonConf tachyonConf) { String[] address = masterAddress.split(":"); InetSocketAddress master = new InetSocketAddress(address[0], Integer.parseInt(address[1])); address = workerAddress.split(":"); InetSocketAddress worker = new InetSocketAddress(address[0], Integer.parseInt(address[1])); - return new TachyonWorker(master, worker, dataPort, selectorThreads, acceptQueueSizePerThreads, - workerThreads, tachyonConf); + return new TachyonWorker(master, worker, dataPort, minWorkerThreads, maxWorkerThreads, + tachyonConf); } /** @@ -107,14 +108,18 @@ public static synchronized TachyonWorker createWorker(TachyonConf tachyonConf) { int workerPort = tachyonConf.getInt(Constants.WORKER_PORT, Constants.DEFAULT_WORKER_PORT); int dataPort = tachyonConf.getInt(Constants.WORKER_DATA_PORT, Constants.DEFAULT_WORKER_DATA_SERVER_PORT); - int selectorThreads = tachyonConf.getInt(Constants.WORKER_SELECTOR_THREADS, 3); - int qSizePerSelector = tachyonConf.getInt(Constants.WORKER_QUEUE_SIZE_PER_SELECTOR, 3000); - int serverThreads = tachyonConf.getInt(Constants.WORKER_SERVER_THREADS, + int minWorkerThreads = tachyonConf.getInt(Constants.WORKER_MIN_WORKER_THREADS, Runtime.getRuntime().availableProcessors()); + //Set max thread to max integer by default + //An property will be set/added in tachyon-env for users to specify a number that make sense in + //their production environment + int maxWorkerThreads = tachyonConf.getInt(Constants.WORKER_MAX_WORKER_THREADS, + Integer.MAX_VALUE); + return new TachyonWorker(new InetSocketAddress(masterHostname, masterPort), - new InetSocketAddress(workerHostName, workerPort), dataPort, selectorThreads, - qSizePerSelector, serverThreads, tachyonConf); + new InetSocketAddress(workerHostName, workerPort), dataPort, minWorkerThreads, + maxWorkerThreads, tachyonConf); } @@ -162,7 +167,7 @@ public static void main(String[] args) throws UnknownHostException { private final NetAddress mWorkerAddress; private TServer mServer; - private TNonblockingServerSocket mServerTNonblockingServerSocket; + private TServerSocket mServerTServerSocket; private final WorkerStorage mWorkerStorage; private final WorkerServiceHandler mWorkerServiceHandler; @@ -183,14 +188,12 @@ public static void main(String[] args) throws UnknownHostException { * @param masterAddress The TachyonMaster's address. * @param workerAddress This TachyonWorker's address. * @param dataPort This TachyonWorker's data server's port - * @param selectorThreads The number of selector threads of the worker's thrift server - * @param acceptQueueSizePerThreads The accept queue size per thread of the worker's thrift server - * @param workerThreads The number of threads of the worker's thrift server + * @param minWorkerThreads The min number of worker threads used in TThreadPoolServer + * @param maxWorkerThreads The max number of worker threads used in TThreadPoolServer * @param tachyonConf The {@link TachyonConf} instance for configuration properties */ private TachyonWorker(InetSocketAddress masterAddress, InetSocketAddress workerAddress, - int dataPort, int selectorThreads, int acceptQueueSizePerThreads, int workerThreads, - TachyonConf tachyonConf) { + int dataPort, int minWorkerThreads, int maxWorkerThreads, TachyonConf tachyonConf) { TachyonConf.assertValidPort(masterAddress, tachyonConf); TachyonConf.assertValidPort(workerAddress, tachyonConf); TachyonConf.assertValidPort(dataPort, tachyonConf); @@ -220,13 +223,17 @@ private TachyonWorker(InetSocketAddress masterAddress, InetSocketAddress workerA WorkerService.Processor processor = new WorkerService.Processor(mWorkerServiceHandler); - mServerTNonblockingServerSocket = new TNonblockingServerSocket(workerAddress); - mPort = NetworkUtils.getPort(mServerTNonblockingServerSocket); - mServer = - new TThreadedSelectorServer(new TThreadedSelectorServer.Args( - mServerTNonblockingServerSocket).processor(processor) - .selectorThreads(selectorThreads).acceptQueueSizePerThread(acceptQueueSizePerThreads) - .workerThreads(workerThreads)); + mServerTServerSocket = new TServerSocket(workerAddress); + mPort = NetworkUtils.getPort(mServerTServerSocket); + + mServer = new TThreadPoolServer( new TThreadPoolServer.Args( + mServerTServerSocket) + .minWorkerThreads(minWorkerThreads) + .maxWorkerThreads(maxWorkerThreads) + .processor(processor) + .transportFactory(new TFramedTransport.Factory()) + .protocolFactory(new TBinaryProtocol.Factory())); + } catch (TTransportException e) { LOG.error(e.getMessage(), e); throw Throwables.propagate(e); @@ -375,12 +382,12 @@ public void stop() throws IOException, InterruptedException { mWorkerStorage.stop(); mDataServer.close(); mServer.stop(); - mServerTNonblockingServerSocket.close(); + mServerTServerSocket.close(); mExecutorService.shutdown(); while (!mDataServer.isClosed() || mServer.isServing() || mHeartbeatThread.isAlive()) { // TODO The reason to stop and close again is due to some issues in Thrift. mServer.stop(); - mServerTNonblockingServerSocket.close(); + mServerTServerSocket.close(); CommonUtils.sleepMs(null, 100); } mHeartbeatThread.join(); diff --git a/core/src/test/java/tachyon/conf/TachyonConfTest.java b/core/src/test/java/tachyon/conf/TachyonConfTest.java index eedfae890268..0bc849541dc4 100644 --- a/core/src/test/java/tachyon/conf/TachyonConfTest.java +++ b/core/src/test/java/tachyon/conf/TachyonConfTest.java @@ -137,8 +137,8 @@ public void testMasterDefault() { intValue = sDefaultTachyonConf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL_MS, 0); Assert.assertTrue(intValue == Constants.SECOND_MS); - intValue = sDefaultTachyonConf.getInt(Constants.MASTER_SELECTOR_THREADS, 0); - Assert.assertTrue(intValue == 3); + intValue = sDefaultTachyonConf.getInt(Constants.MASTER_MIN_WORKER_THREADS, 0); + Assert.assertTrue(intValue == Runtime.getRuntime().availableProcessors()); intValue = sDefaultTachyonConf.getInt(Constants.MASTER_WORKER_TIMEOUT_MS, 0); Assert.assertTrue(intValue == 10 * Constants.SECOND_MS); @@ -178,13 +178,7 @@ public void testWorkerDefault() { intValue = sDefaultTachyonConf.getInt(Constants.WORKER_TO_MASTER_HEARTBEAT_INTERVAL_MS, 0); Assert.assertTrue(intValue == Constants.SECOND_MS); - intValue = sDefaultTachyonConf.getInt(Constants.WORKER_SELECTOR_THREADS, 0); - Assert.assertTrue(intValue == 3); - - intValue = sDefaultTachyonConf.getInt(Constants.WORKER_QUEUE_SIZE_PER_SELECTOR, 0); - Assert.assertTrue(intValue == 3000); - - intValue = sDefaultTachyonConf.getInt(Constants.WORKER_SERVER_THREADS, -1); + intValue = sDefaultTachyonConf.getInt(Constants.WORKER_MIN_WORKER_THREADS, 0); Assert.assertTrue(intValue == Runtime.getRuntime().availableProcessors()); intValue = sDefaultTachyonConf.getInt(Constants.WORKER_USER_TIMEOUT_MS, 0); diff --git a/core/src/test/java/tachyon/master/LocalTachyonCluster.java b/core/src/test/java/tachyon/master/LocalTachyonCluster.java index 8be733feae4b..c34522cb2d56 100644 --- a/core/src/test/java/tachyon/master/LocalTachyonCluster.java +++ b/core/src/test/java/tachyon/master/LocalTachyonCluster.java @@ -173,8 +173,8 @@ public void start() throws IOException { // Lower the number of threads that the cluster will spin off. // default thread overhead is too much. - mMasterConf.set(Constants.MASTER_SELECTOR_THREADS, "1"); - mMasterConf.set(Constants.MASTER_SERVER_THREADS, "2"); + mMasterConf.set(Constants.MASTER_MIN_WORKER_THREADS, "1"); + mMasterConf.set(Constants.MASTER_MAX_WORKER_THREADS, "100"); mMasterConf.set(Constants.MASTER_WEB_THREAD_COUNT, "1"); // re-build the dir to set permission to 777 @@ -197,8 +197,8 @@ public void start() throws IOException { mWorkerConf.set(Constants.WORKER_DATA_FOLDER, mWorkerDataFolder); mWorkerConf.set(Constants.WORKER_MEMORY_SIZE, Long.toString(mWorkerCapacityBytes)); mWorkerConf.set(Constants.WORKER_TO_MASTER_HEARTBEAT_INTERVAL_MS, "15"); - mWorkerConf.set(Constants.WORKER_SELECTOR_THREADS, Integer.toString(1)); - mWorkerConf.set(Constants.WORKER_SERVER_THREADS, Integer.toString(2)); + mWorkerConf.set(Constants.WORKER_MIN_WORKER_THREADS, Integer.toString(1)); + mWorkerConf.set(Constants.WORKER_MAX_WORKER_THREADS, Integer.toString(100)); mWorkerConf.set(Constants.WORKER_NETTY_WORKER_THREADS, Integer.toString(2)); mWorkerConf.set("tachyon.worker.hierarchystore.level0.alias", "MEM"); @@ -219,7 +219,7 @@ public void start() throws IOException { mWorker = TachyonWorker.createWorker(new InetSocketAddress(mLocalhostName, getMasterPort()), - new InetSocketAddress(mLocalhostName, 0), 0, 1, 1, 1, mWorkerConf); + new InetSocketAddress(mLocalhostName, 0), 0, 1, 100, mWorkerConf); Runnable runWorker = new Runnable() { @Override public void run() { diff --git a/core/src/test/java/tachyon/master/LocalTachyonClusterMultiMaster.java b/core/src/test/java/tachyon/master/LocalTachyonClusterMultiMaster.java index ae3a6645ad41..ebb902163e90 100644 --- a/core/src/test/java/tachyon/master/LocalTachyonClusterMultiMaster.java +++ b/core/src/test/java/tachyon/master/LocalTachyonClusterMultiMaster.java @@ -199,7 +199,7 @@ public void start() throws IOException { mWorker = TachyonWorker.createWorker( CommonUtils.parseInetSocketAddress(mCuratorServer.getConnectString()), - new InetSocketAddress(mLocalhostName, 0), 0, 1, 1, 1, mWorkerConf); + new InetSocketAddress(mLocalhostName, 0), 0, 1, 100, mWorkerConf); Runnable runWorker = new Runnable() { @Override public void run() { diff --git a/core/src/test/java/tachyon/master/LocalTachyonMaster.java b/core/src/test/java/tachyon/master/LocalTachyonMaster.java index b3b6d35474b1..2648a1f36ce4 100644 --- a/core/src/test/java/tachyon/master/LocalTachyonMaster.java +++ b/core/src/test/java/tachyon/master/LocalTachyonMaster.java @@ -90,11 +90,9 @@ private LocalTachyonMaster(final String tachyonHome, TachyonConf tachyonConf) th tachyonConf.set(Constants.MASTER_PORT, "0"); tachyonConf.set(Constants.MASTER_WEB_PORT, "0"); - // Lower the number of threads that the cluster will spin off. - // default thread overhead is too much. - tachyonConf.set(Constants.MASTER_SELECTOR_THREADS, "1"); - tachyonConf.set(Constants.MASTER_QUEUE_SIZE_PER_SELECTOR, "1"); - tachyonConf.set(Constants.MASTER_SERVER_THREADS, "1"); + tachyonConf.set(Constants.MASTER_MIN_WORKER_THREADS, "1"); + tachyonConf.set(Constants.MASTER_MAX_WORKER_THREADS, "100"); + tachyonConf.set(Constants.MASTER_WEB_THREAD_COUNT, "9"); tachyonConf.set(Constants.WEB_RESOURCES, System.getProperty("user.dir") + "/src/main/webapp");