Commit b592061

HDFS-10909. De-duplicate code in ErasureCodingWorker#initializeStripedReadThreadPool and DFSClient#initThreadsNumForStripedReads. (Manoj Govindassamy via lei)

Lei Xu committed Nov 2, 2016
1 parent 9b0c17f · commit b592061
Showing 3 changed files with 61 additions and 42 deletions.

DFSClient.java

@@ -2800,37 +2800,17 @@ public void rejectedExecution(Runnable runnable,
   /**
    * Create thread pool for parallel reading in striped layout,
    * STRIPED_READ_THREAD_POOL, if it does not already exist.
-   * @param num Number of threads for striped reads thread pool.
+   * @param numThreads Number of threads for striped reads thread pool.
    */
-  private void initThreadsNumForStripedReads(int num) {
-    assert num > 0;
+  private void initThreadsNumForStripedReads(int numThreads) {
+    assert numThreads > 0;
     if (STRIPED_READ_THREAD_POOL != null) {
       return;
     }
     synchronized (DFSClient.class) {
       if (STRIPED_READ_THREAD_POOL == null) {
-        STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
-            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-            new Daemon.DaemonFactory() {
-              private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-              @Override
-              public Thread newThread(Runnable r) {
-                Thread t = super.newThread(r);
-                t.setName("stripedRead-" + threadIndex.getAndIncrement());
-                return t;
-              }
-            },
-            new ThreadPoolExecutor.CallerRunsPolicy() {
-              @Override
-              public void rejectedExecution(Runnable runnable,
-                  ThreadPoolExecutor e) {
-                LOG.info("Execution for striped reading rejected, "
-                    + "Executing in current thread");
-                // will run in the current thread
-                super.rejectedExecution(runnable, e);
-              }
-            });
+        STRIPED_READ_THREAD_POOL = DFSUtilClient.getThreadPoolExecutor(1,
+            numThreads, 60, "StripedRead-", true);
         STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
       }
     }
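
For readers skimming the hunk: the call site keeps its double-checked lazy initialization and only the pool construction moves into the helper. Below is a standalone sketch of that initialization pattern, with hypothetical names (PoolHolder, getPool) and a volatile field for the safe publication the canonical pattern requires:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Sketch only: PoolHolder/getPool are illustrative names, not from
    // the patch; DFSClient applies the same check-lock-recheck shape to
    // its static STRIPED_READ_THREAD_POOL field.
    public class PoolHolder {
      private static volatile ExecutorService pool;

      static ExecutorService getPool(int numThreads) {
        // Cheap first check: skip the lock entirely once the pool exists.
        if (pool == null) {
          synchronized (PoolHolder.class) {
            // Re-check under the lock: another thread may have created
            // the pool between the first check and acquiring the monitor.
            if (pool == null) {
              pool = Executors.newFixedThreadPool(numThreads);
            }
          }
        }
        return pool;
      }
    }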

DFSUtilClient.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;

 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.SignedBytes;
 import org.apache.hadoop.conf.Configuration;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.KMSUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -81,6 +83,10 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;

 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
@@ -776,4 +782,48 @@ private static boolean getClientDataTransferTcpNoDelay(Configuration conf) {
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
   }
+
+  /**
+   * Utility to create a {@link ThreadPoolExecutor}.
+   *
+   * @param corePoolSize - min threads in the pool, even if idle
+   * @param maxPoolSize - max threads in the pool
+   * @param keepAliveTimeSecs - max seconds beyond which excess idle threads
+   *          will be terminated
+   * @param threadNamePrefix - name prefix for the pool threads
+   * @param runRejectedExec - when true, rejected tasks from
+   *          ThreadPoolExecutor are run in the context of calling thread
+   * @return ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
+      int maxPoolSize, long keepAliveTimeSecs, String threadNamePrefix,
+      boolean runRejectedExec) {
+    Preconditions.checkArgument(corePoolSize > 0);
+    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
+        maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName(threadNamePrefix + threadIndex.getAndIncrement());
+            return t;
+          }
+        });
+    if (runRejectedExec) {
+      threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor
+          .CallerRunsPolicy() {
+        @Override
+        public void rejectedExecution(Runnable runnable,
+            ThreadPoolExecutor e) {
+          LOG.info(threadNamePrefix + " task is rejected by " +
+              "ThreadPoolExecutor. Executing it in current thread.");
+          // will run in the current thread
+          super.rejectedExecution(runnable, e);
+        }
+      });
+    }
+    return threadPoolExecutor;
+  }
 }
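
The new helper hands back a pool of daemon threads named with the given prefix, backed by a SynchronousQueue (direct hand-off, no task buffering), optionally with a logging CallerRunsPolicy. A hypothetical driver exercising it; only getThreadPoolExecutor itself comes from this patch, and running this assumes the Hadoop HDFS client jars on the classpath:

    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hdfs.DFSUtilClient;

    // Illustrative only: class name, pool sizes, and task count are
    // made up for the demo.
    public class PoolDemo {
      public static void main(String[] args) throws InterruptedException {
        // 1 core thread, at most 4 threads, 60s idle timeout; daemon
        // threads named "Demo-0", "Demo-1", ...; rejected tasks run in
        // the submitting thread because runRejectedExec is true.
        ThreadPoolExecutor pool =
            DFSUtilClient.getThreadPoolExecutor(1, 4, 60, "Demo-", true);
        pool.allowCoreThreadTimeOut(true);

        for (int i = 0; i < 8; i++) {
          final int id = i;
          // With a SynchronousQueue there is no queueing: once 4 threads
          // are busy, CallerRunsPolicy executes the task right here.
          pool.execute(() -> System.out.println(
              Thread.currentThread().getName() + " ran task " + id));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
      }
    }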

ErasureCodingWorker.java

@@ -20,13 +20,13 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;

 import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -89,22 +89,11 @@ public void rejectedExecution(Runnable runnable,
     stripedReadPool.allowCoreThreadTimeOut(true);
   }

-  private void initializeStripedBlkReconstructionThreadPool(int num) {
-    LOG.debug("Using striped block reconstruction; pool threads={}", num);
-    stripedReconstructionPool = new ThreadPoolExecutor(2, num, 60,
-        TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        new Daemon.DaemonFactory() {
-          private final AtomicInteger threadIdx = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            Thread t = super.newThread(r);
-            t.setName("stripedBlockReconstruction-"
-                + threadIdx.getAndIncrement());
-            return t;
-          }
-        });
+  private void initializeStripedBlkReconstructionThreadPool(int numThreads) {
+    LOG.debug("Using striped block reconstruction; pool threads={}",
+        numThreads);
+    stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(2,
+        numThreads, 60, "StripedBlockReconstruction-", false);
     stripedReconstructionPool.allowCoreThreadTimeOut(true);
   }
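
One behavioral wrinkle worth flagging in this hunk: the old reconstruction pool used an unbounded LinkedBlockingQueue, so submissions never failed (and the pool never actually grew past its 2 core threads, since ThreadPoolExecutor only adds threads beyond core size when the queue is full). The shared helper uses a SynchronousQueue, and with runRejectedExec set to false it keeps the default AbortPolicy, so the pool now scales up to numThreads and then rejects. A hypothetical probe of that behavior (class name and pool sizes made up for illustration):

    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.hadoop.hdfs.DFSUtilClient;

    public class RejectionProbe {
      public static void main(String[] args) {
        // runRejectedExec=false: no CallerRunsPolicy is installed, so the
        // default AbortPolicy throws once all workers are busy.
        ThreadPoolExecutor pool =
            DFSUtilClient.getThreadPoolExecutor(2, 2, 60, "Probe-", false);
        try {
          for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
              try {
                Thread.sleep(1000); // keep a worker occupied
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
          }
        } catch (RejectedExecutionException e) {
          // Expected for the third task: both workers are busy and the
          // SynchronousQueue has no idle thread to hand off to.
          System.out.println("rejected: " + e);
        } finally {
          pool.shutdown();
        }
      }
    }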
