From 0eb298fe6417e7f098355af35a348a6bfc659851 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 17 Apr 2013 14:28:38 -0700 Subject: [PATCH] use more aggressive concurrency levels for CHM - long running ones with high update rates - also expose a *system* property of es.useConcurrentHashMapV8 to use the new non blocking Java8 CHM impl --- .../concurrent/ConcurrentCollections.java | 42 +++++++++++++------ .../concurrent/ConcurrentHashMapLong.java | 4 +- .../index/engine/robin/RobinEngine.java | 2 +- .../elasticsearch/search/SearchService.java | 2 +- .../transport/TransportService.java | 2 +- 5 files changed, 35 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java index 2293f26a29760..dcdedb8ed61c2 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; +import jsr166e.ConcurrentHashMapV8; import jsr166y.LinkedTransferQueue; import org.elasticsearch.common.collect.MapBackedSet; @@ -34,28 +35,45 @@ */ public abstract class ConcurrentCollections { - private final static boolean useNonBlockingMap = Boolean.parseBoolean(System.getProperty("es.useNonBlockingMap", "false")); + private final static boolean useConcurrentHashMapV8 = Boolean.parseBoolean(System.getProperty("es.useConcurrentHashMapV8", "false")); private final static boolean useLinkedTransferQueue = Boolean.parseBoolean(System.getProperty("es.useLinkedTransferQueue", "false")); + static final int aggressiveConcurrencyLevel; + + static { + aggressiveConcurrencyLevel = Math.max(Runtime.getRuntime().availableProcessors() * 2, 16); + } + + /** + * Creates a new CHM with an aggressive concurrency level, aimed at high concurrent update rate long living maps. + */ + public static ConcurrentMap newConcurrentMapWithAggressiveConcurrency() { + if (useConcurrentHashMapV8) { + return new ConcurrentHashMapV8(16, 0.75f, aggressiveConcurrencyLevel); + } + return new ConcurrentHashMap(16, 0.75f, aggressiveConcurrencyLevel); + } + public static ConcurrentMap newConcurrentMap() { -// if (useNonBlockingMap) { -// return new NonBlockingHashMap(); -// } + if (useConcurrentHashMapV8) { + return new ConcurrentHashMapV8(); + } return new ConcurrentHashMap(); } + /** + * Creates a new CHM with an aggressive concurrency level, aimed at highly updateable long living maps. + */ + public static ConcurrentMapLong newConcurrentMapLongWithAggressiveConcurrency() { + return new ConcurrentHashMapLong(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()); + } + public static ConcurrentMapLong newConcurrentMapLong() { -// if (useNonBlockingMap) { -// return new NonBlockingHashMapLong(); -// } - return new ConcurrentHashMapLong(); + return new ConcurrentHashMapLong(ConcurrentCollections.newConcurrentMap()); } public static Set newConcurrentSet() { -// if (useNonBlockingMap) { -// return new NonBlockingHashSet(); -// } - return new MapBackedSet(new ConcurrentHashMap()); + return new MapBackedSet(ConcurrentCollections.newConcurrentMap()); } public static Queue newQueue() { diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentHashMapLong.java b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentHashMapLong.java index d8b72c77a1c9e..965bbe46cb833 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentHashMapLong.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentHashMapLong.java @@ -31,8 +31,8 @@ public class ConcurrentHashMapLong implements ConcurrentMapLong { private final ConcurrentMap map; - public ConcurrentHashMapLong() { - this.map = ConcurrentCollections.newConcurrentMap(); + public ConcurrentHashMapLong(ConcurrentMap map) { + this.map = map; } @Override diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 694ef31a35d37..3200b53442e49 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -179,7 +179,7 @@ public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, Threa this.codecService = codecService; this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, IndexWriterConfig.DEFAULT_MAX_THREAD_STATES); - this.versionMap = ConcurrentCollections.newConcurrentMap(); + this.versionMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough... for (int i = 0; i < dirtyLocks.length; i++) { dirtyLocks[i] = new Object(); diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index 4a45bffcc5ea0..bdc6e68d0905c 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -96,7 +96,7 @@ public class SearchService extends AbstractLifecycleComponent { private final CleanContextOnIndicesLifecycleListener indicesLifecycleListener = new CleanContextOnIndicesLifecycleListener(); - private final ConcurrentMapLong activeContexts = ConcurrentCollections.newConcurrentMapLong(); + private final ConcurrentMapLong activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); private final ImmutableMap elementParsers; diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index 718fd0faa8f53..410118b1771bc 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -54,7 +54,7 @@ public class TransportService extends AbstractLifecycleComponent serverHandlers = ImmutableMap.of(); final Object serverHandlersMutex = new Object(); - final ConcurrentMapLong clientHandlers = ConcurrentCollections.newConcurrentMapLong(); + final ConcurrentMapLong clientHandlers = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); final AtomicLong requestIds = new AtomicLong();