Skip to content

Commit

Permalink
Allow to control the number of processors sizes are based on
Browse files Browse the repository at this point in the history
Sometimes, one wants to just control the number of processors our different size base calculations for thread pools and network workers are based on, and not use the reported available processor by the OS. Add processors setting, where it can be controlled.

closes #3643
  • Loading branch information
kimchy committed Sep 6, 2013
1 parent 1e21abd commit 9099877
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
Expand Up @@ -33,12 +33,12 @@ public class EsExecutors {
/**
* Returns the number of processors available but at most <tt>32</tt>.
*/
public static int boundedNumberOfProcessors() {
public static int boundedNumberOfProcessors(Settings settings) {
/* This relates to issues where machines with large number of cores
* ie. >= 48 create too many threads and run into OOM see #3478
* We just use an 32 core upper-bound here to not stress the system
* too much with too many created threads */
return Math.min(32, Runtime.getRuntime().availableProcessors());
return settings.getAsInt("processors", Math.min(32, Runtime.getRuntime().availableProcessors()));
}

public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) {
Expand Down
Expand Up @@ -127,7 +127,7 @@ public NettyHttpServerTransport(Settings settings, NetworkService networkService
this.resetCookies = componentSettings.getAsBoolean("reset_cookies", settings.getAsBoolean("http.reset_cookies", false));
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2);
this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.blockingServer = settings.getAsBoolean("http.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("http.port", "9200-9300"));
this.bindHost = componentSettings.get("bind_host", settings.get("http.bind_host", settings.get("http.host")));
Expand Down
Expand Up @@ -185,7 +185,7 @@ public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, Threa
this.similarityService = similarityService;
this.codecService = codecService;
this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors() * 0.65)));
this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
this.versionMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Expand Up @@ -99,7 +99,7 @@ public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsS

Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP);

int availableProcessors = EsExecutors.boundedNumberOfProcessors();
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
defaultExecutorTypeSettings = ImmutableMap.<String, Settings>builder()
Expand Down Expand Up @@ -296,7 +296,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors());
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));

if (previousExecutorHolder != null) {
Expand Down Expand Up @@ -327,7 +327,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
} else if ("scaling".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors());
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
if (previousExecutorHolder != null) {
if ("scaling".equals(previousInfo.getType())) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
Expand Up @@ -152,8 +152,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem

private volatile BoundTransportAddress boundAddress;

private final KeyedLock<String> connectionLock = new KeyedLock<String >();
private final KeyedLock<String> connectionLock = new KeyedLock<String>();

// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
Expand All @@ -169,7 +169,7 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n
System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
}

this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2);
this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.bossCount = componentSettings.getAsInt("boss_count", 1);
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
Expand Down Expand Up @@ -591,7 +591,7 @@ public void connectToNode(DiscoveryNode node, boolean light) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
}
connectionLock.acquire(node.id());
try {
if (!lifecycle.started()) {
Expand Down Expand Up @@ -755,12 +755,12 @@ public void disconnectFromNode(DiscoveryNode node) {
if (nodeChannels != null) {
connectionLock.acquire(node.id());
try {
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
connectionLock.release(node.id());
}
Expand All @@ -774,7 +774,7 @@ private void disconnectFromNode(DiscoveryNode node, Channel channel, String reas
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)){ //might have been removed in the meanwhile, safety check
if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
assert !connectedNodes.containsKey(node);
} else {
try {
Expand Down

0 comments on commit 9099877

Please sign in to comment.