Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ConcurrentHashMapV8 for lower memory overhead #6400

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6,312 changes: 6,312 additions & 0 deletions src/main/java/jsr166e/ConcurrentHashMapV8.java

Large diffs are not rendered by default.

753 changes: 753 additions & 0 deletions src/main/java/jsr166e/CountedCompleter.java

Large diffs are not rendered by default.

3,345 changes: 3,345 additions & 0 deletions src/main/java/jsr166e/ForkJoinPool.java

Large diffs are not rendered by default.

1,557 changes: 1,557 additions & 0 deletions src/main/java/jsr166e/ForkJoinTask.java

Large diffs are not rendered by default.

125 changes: 125 additions & 0 deletions src/main/java/jsr166e/ForkJoinWorkerThread.java
@@ -0,0 +1,125 @@
// Rev 1.5 from http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ForkJoinWorkerThread.java?view=co

/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package jsr166e;


/**
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
* This class is subclassable solely for the sake of adding
* functionality -- there are no overridable methods dealing with
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
* {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*
* This class just maintains links to its pool and WorkQueue. The
* pool field is set immediately upon construction, but the
* workQueue field is not set until a call to registerWorker
* completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the
* owning thread.
*/

final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

/**
* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}

/**
* Returns the pool hosting this thread.
*
* @return the pool
*/
public ForkJoinPool getPool() {
return pool;
}

/**
* Returns the unique index number of this thread in its pool.
* The returned value ranges from zero to the maximum number of
* threads (minus one) that may exist in the pool, and does not
* change during the lifetime of the thread. This method may be
* useful for applications that track status or collect results
* per-worker-thread rather than per-task.
*
* @return the index number
*/
public int getPoolIndex() {
return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
}

/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
* invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
}

/**
* Performs cleanup associated with termination of this worker
* thread. If you override this method, you must invoke
* {@code super.onTermination} at the end of the overridden method.
*
* @param exception the exception causing this thread to abort due
* to an unrecoverable error, or {@code null} if completed normally
*/
protected void onTermination(Throwable exception) {
}

/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
199 changes: 199 additions & 0 deletions src/main/java/jsr166e/ThreadLocalRandom.java
@@ -0,0 +1,199 @@
// Rev 1.2 from http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ThreadLocalRandom.java?revision=1.2

/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package jsr166e;

import java.util.Random;

/**
* A random number generator isolated to the current thread. Like the
* global {@link java.util.Random} generator used by the {@link
* java.lang.Math} class, a {@code ThreadLocalRandom} is initialized
* with an internally generated seed that may not otherwise be
* modified. When applicable, use of {@code ThreadLocalRandom} rather
* than shared {@code Random} objects in concurrent programs will
* typically encounter much less overhead and contention. Use of
* {@code ThreadLocalRandom} is particularly appropriate when multiple
* tasks (for example, each a {@link ForkJoinTask}) use random numbers
* in parallel in thread pools.
*
* <p>Usages of this class should typically be of the form:
* {@code ThreadLocalRandom.current().nextX(...)} (where
* {@code X} is {@code Int}, {@code Long}, etc).
* When all usages are of this form, it is never possible to
* accidently share a {@code ThreadLocalRandom} across multiple threads.
*
* <p>This class also provides additional commonly used bounded random
* generation methods.
*
* @since 1.7
* @author Doug Lea
*/
public class ThreadLocalRandom extends Random {
// same constants as Random, but must be redeclared because private
private static final long multiplier = 0x5DEECE66DL;
private static final long addend = 0xBL;
private static final long mask = (1L << 48) - 1;

/**
* The random seed. We can't use super.seed.
*/
private long rnd;

/**
* Initialization flag to permit calls to setSeed to succeed only
* while executing the Random constructor. We can't allow others
* since it would cause setting seed in one part of a program to
* unintentionally impact other usages by the thread.
*/
boolean initialized;

// Padding to help avoid memory contention among seed updates in
// different TLRs in the common case that they are located near
// each other.
private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

/**
* The actual ThreadLocal
*/
private static final ThreadLocal<ThreadLocalRandom> localRandom =
new ThreadLocal<ThreadLocalRandom>() {
protected ThreadLocalRandom initialValue() {
return new ThreadLocalRandom();
}
};


/**
* Constructor called only by localRandom.initialValue.
*/
ThreadLocalRandom() {
super();
initialized = true;
}

/**
* Returns the current thread's {@code ThreadLocalRandom}.
*
* @return the current thread's {@code ThreadLocalRandom}
*/
public static ThreadLocalRandom current() {
return localRandom.get();
}

/**
* Throws {@code UnsupportedOperationException}. Setting seeds in
* this generator is not supported.
*
* @throws UnsupportedOperationException always
*/
public void setSeed(long seed) {
if (initialized)
throw new UnsupportedOperationException();
rnd = (seed ^ multiplier) & mask;
}

protected int next(int bits) {
rnd = (rnd * multiplier + addend) & mask;
return (int) (rnd >>> (48-bits));
}

/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public int nextInt(int least, int bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextInt(bound - least) + least;
}

/**
* Returns a pseudorandom, uniformly distributed value
* between 0 (inclusive) and the specified value (exclusive).
*
* @param n the bound on the random number to be returned. Must be
* positive.
* @return the next value
* @throws IllegalArgumentException if n is not positive
*/
public long nextLong(long n) {
if (n <= 0)
throw new IllegalArgumentException("n must be positive");
// Divide n by two until small enough for nextInt. On each
// iteration (at most 31 of them but usually much less),
// randomly choose both whether to include high bit in result
// (offset) and whether to continue with the lower vs upper
// half (which makes a difference only if odd).
long offset = 0;
while (n >= Integer.MAX_VALUE) {
int bits = next(2);
long half = n >>> 1;
long nextn = ((bits & 2) == 0) ? half : n - half;
if ((bits & 1) == 0)
offset += n - nextn;
n = nextn;
}
return offset + nextInt((int) n);
}

/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public long nextLong(long least, long bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextLong(bound - least) + least;
}

/**
* Returns a pseudorandom, uniformly distributed {@code double} value
* between 0 (inclusive) and the specified value (exclusive).
*
* @param n the bound on the random number to be returned. Must be
* positive.
* @return the next value
* @throws IllegalArgumentException if n is not positive
*/
public double nextDouble(double n) {
if (n <= 0)
throw new IllegalArgumentException("n must be positive");
return nextDouble() * n;
}

/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public double nextDouble(double least, double bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextDouble() * (bound - least) + least;
}

private static final long serialVersionUID = -5851777807851030925L;
}
Expand Up @@ -21,7 +21,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexReader.ReaderClosedListener;
Expand All @@ -32,6 +32,7 @@
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;

Expand All @@ -46,7 +47,7 @@ public class Versions {
public static final long NOT_SET = -2L;

// TODO: is there somewhere else we can store these?
private static final ConcurrentHashMap<IndexReader,CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = new ConcurrentHashMap<>();
private static final ConcurrentMap<IndexReader,CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

// Evict this reader from lookupStates once it's closed:
private static final ReaderClosedListener removeLookupState = new ReaderClosedListener() {
Expand Down
Expand Up @@ -21,11 +21,15 @@

import com.google.common.collect.Sets;

import org.apache.lucene.util.Constants;

import java.util.Deque;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.*;

import jsr166e.ConcurrentHashMapV8;

/**
*
*/
Expand All @@ -43,11 +47,21 @@ public abstract class ConcurrentCollections {
* Creates a new CHM with an aggressive concurrency level, aimed at high concurrent update rate long living maps.
*/
public static <K, V> ConcurrentMap<K, V> newConcurrentMapWithAggressiveConcurrency() {
return new ConcurrentHashMap<>(16, 0.75f, aggressiveConcurrencyLevel);
if (Constants.JRE_IS_MINIMUM_JAVA8) {
// Just use JDK's impl when we are on Java8:
return new ConcurrentHashMap<>(16, 0.75f, aggressiveConcurrencyLevel);
} else {
return new ConcurrentHashMapV8<>(16, 0.75f, aggressiveConcurrencyLevel);
}
}

public static <K, V> ConcurrentMap<K, V> newConcurrentMap() {
return new ConcurrentHashMap<>();
if (Constants.JRE_IS_MINIMUM_JAVA8) {
// Just use JDK's impl when we are on Java8:
return new ConcurrentHashMap<>();
} else {
return new ConcurrentHashMapV8<>();
}
}

/**
Expand Down