Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
use more aggressive concurrency levels for CHM
- long running ones with high update rates
- also expose a *system* property of es.useConcurrentHashMapV8 to use the new non blocking Java8 CHM impl
  • Loading branch information
kimchy committed Apr 17, 2013
1 parent 271305d commit 0eb298f
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 17 deletions.
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.common.util.concurrent;

import jsr166e.ConcurrentHashMapV8;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.collect.MapBackedSet;

Expand All @@ -34,28 +35,45 @@
*/
public abstract class ConcurrentCollections {

private final static boolean useNonBlockingMap = Boolean.parseBoolean(System.getProperty("es.useNonBlockingMap", "false"));
private final static boolean useConcurrentHashMapV8 = Boolean.parseBoolean(System.getProperty("es.useConcurrentHashMapV8", "false"));
private final static boolean useLinkedTransferQueue = Boolean.parseBoolean(System.getProperty("es.useLinkedTransferQueue", "false"));

static final int aggressiveConcurrencyLevel;

static {
aggressiveConcurrencyLevel = Math.max(Runtime.getRuntime().availableProcessors() * 2, 16);
}

/**
* Creates a new CHM with an aggressive concurrency level, aimed at high concurrent update rate long living maps.
*/
public static <K, V> ConcurrentMap<K, V> newConcurrentMapWithAggressiveConcurrency() {
if (useConcurrentHashMapV8) {
return new ConcurrentHashMapV8<K, V>(16, 0.75f, aggressiveConcurrencyLevel);
}
return new ConcurrentHashMap<K, V>(16, 0.75f, aggressiveConcurrencyLevel);
}

public static <K, V> ConcurrentMap<K, V> newConcurrentMap() {
// if (useNonBlockingMap) {
// return new NonBlockingHashMap<K, V>();
// }
if (useConcurrentHashMapV8) {
return new ConcurrentHashMapV8<K, V>();
}
return new ConcurrentHashMap<K, V>();
}

/**
* Creates a new CHM with an aggressive concurrency level, aimed at highly updateable long living maps.
*/
public static <V> ConcurrentMapLong<V> newConcurrentMapLongWithAggressiveConcurrency() {
return new ConcurrentHashMapLong<V>(ConcurrentCollections.<Long, V>newConcurrentMapWithAggressiveConcurrency());
}

public static <V> ConcurrentMapLong<V> newConcurrentMapLong() {
// if (useNonBlockingMap) {
// return new NonBlockingHashMapLong<V>();
// }
return new ConcurrentHashMapLong<V>();
return new ConcurrentHashMapLong<V>(ConcurrentCollections.<Long, V>newConcurrentMap());
}

public static <V> Set<V> newConcurrentSet() {
// if (useNonBlockingMap) {
// return new NonBlockingHashSet<V>();
// }
return new MapBackedSet<V>(new ConcurrentHashMap<V, Boolean>());
return new MapBackedSet<V>(ConcurrentCollections.<V, Boolean>newConcurrentMap());
}

public static <T> Queue<T> newQueue() {
Expand Down
Expand Up @@ -31,8 +31,8 @@ public class ConcurrentHashMapLong<T> implements ConcurrentMapLong<T> {

private final ConcurrentMap<Long, T> map;

public ConcurrentHashMapLong() {
this.map = ConcurrentCollections.newConcurrentMap();
public ConcurrentHashMapLong(ConcurrentMap<Long, T> map) {
this.map = map;
}

@Override
Expand Down
Expand Up @@ -179,7 +179,7 @@ public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, Threa
this.codecService = codecService;

this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
this.versionMap = ConcurrentCollections.newConcurrentMap();
this.versionMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/search/SearchService.java
Expand Up @@ -96,7 +96,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {

private final CleanContextOnIndicesLifecycleListener indicesLifecycleListener = new CleanContextOnIndicesLifecycleListener();

private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLong();
private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();

private final ImmutableMap<String, SearchParseElement> elementParsers;

Expand Down
Expand Up @@ -54,7 +54,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
volatile ImmutableMap<String, TransportRequestHandler> serverHandlers = ImmutableMap.of();
final Object serverHandlersMutex = new Object();

final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentCollections.newConcurrentMapLong();
final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();

final AtomicLong requestIds = new AtomicLong();

Expand Down

0 comments on commit 0eb298f

Please sign in to comment.