Skip to content

Commit

Permalink
Fix stale notification when eviction raced with an update
Browse files Browse the repository at this point in the history
[SOLR-10141](https://issues.apache.org/jira/browse/SOLR-10141) (thanks Yonik!)

As an optimization, an update is allowed to bypass the hash map and
synchronize on the read entry directly. In this block it checks
liveliness, performs the mutation, and notifies the writer. This avoids
more expensive computations through the map.

Previously, an eviction was performed in a computation to remove the
entry and notify the writer, or resurrect. Inside the computation the
entry was not synchronized on and that was done only after it was
removed from the table. The removal listener was notified with the
value initially read at the start of this method.

This allowed an update to modify the value while (or after) the entry
was being removed from the hash table. This led to notifying the writer
and removal listener with the stale value. Because the writer must
be called exclusively with the mutation, this computation must use
a synchronized guard. Otherwise we might have preferred to re-read
the value when notifying the listener. This adds a slight penalty on
eviction (usually async) while allowing `put` to still be fast (but
may block a little more often).

`putSlow` was removed as it is no longer needed. It was a computation-
based write that was safe from this issue. But it was only used when the
new weight was zero, as that update race would cause an incorrect
eviction. Now that the primary path is safe from this race, it is
unnecessary.
  • Loading branch information
ben-manes committed Feb 18, 2017
1 parent c5cc2d4 commit 83b47d1
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import org.openjdk.jmh.annotations.Benchmark;
Expand All @@ -31,6 +29,7 @@
import org.openjdk.jmh.annotations.Threads;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;

Expand All @@ -43,8 +42,8 @@ public class ComputeBenchmark {
static final int MASK = SIZE - 1;
static final int ITEMS = SIZE / 3;
static final Integer COMPUTE_KEY = SIZE / 2;
static final Callable<Boolean> valueLoader = () -> Boolean.TRUE;
static final Function<Integer, Boolean> mappingFunction = any -> Boolean.TRUE;
static final CacheLoader<Integer, Boolean> cacheLoader = CacheLoader.from(key -> Boolean.TRUE);

@Param({"ConcurrentHashMap", "Caffeine", "Guava"})
String computeType;
Expand Down Expand Up @@ -101,14 +100,8 @@ private void setupCaffeine() {
}

private void setupGuava() {
com.google.common.cache.Cache<Integer, Boolean> cache =
CacheBuilder.newBuilder().concurrencyLevel(64).build();
benchmarkFunction = key -> {
try {
return cache.get(key, valueLoader);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
};
com.google.common.cache.LoadingCache<Integer, Boolean> cache =
CacheBuilder.newBuilder().concurrencyLevel(64).build(cacheLoader);
benchmarkFunction = cache::getUnchecked;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -721,43 +721,47 @@ boolean hasExpired(Node<K, V> node, long now) {
* @param now the current time, used only if expiring
*/
@GuardedBy("evictionLock")
@SuppressWarnings("PMD.CollapsibleIfStatements")
@SuppressWarnings({"PMD.CollapsibleIfStatements", "GuardedByChecker"})
void evictEntry(Node<K, V> node, RemovalCause cause, long now) {
K key = node.getKey();
V value = node.getValue();
@SuppressWarnings("unchecked")
V[] value = (V[]) new Object[1];
boolean[] removed = new boolean[1];
boolean[] resurrect = new boolean[1];
RemovalCause actualCause = (key == null) || (value == null) ? RemovalCause.COLLECTED : cause;
RemovalCause[] actualCause = new RemovalCause[1];

data.computeIfPresent(node.getKeyReference(), (k, n) -> {
if (n != node) {
return n;
}
if (actualCause == RemovalCause.EXPIRED) {
boolean expired = false;
if (expiresAfterAccess()) {
long expirationTime = now - expiresAfterAccessNanos();
expired |= n.getAccessTime() <= expirationTime;
}
if (expiresAfterWrite()) {
long expirationTime = now - expiresAfterWriteNanos();
expired |= n.getWriteTime() <= expirationTime;
}
if (!expired) {
resurrect[0] = true;
return n;
}
} else if (actualCause == RemovalCause.SIZE) {
int weight;
synchronized (node) {
weight = node.getWeight();
}
if (weight == 0) {
resurrect[0] = true;
return n;
synchronized (n) {
value[0] = n.getValue();

actualCause[0] = (key == null) || (value[0] == null) ? RemovalCause.COLLECTED : cause;
if (actualCause[0] == RemovalCause.EXPIRED) {
boolean expired = false;
if (expiresAfterAccess()) {
long expirationTime = now - expiresAfterAccessNanos();
expired |= n.getAccessTime() <= expirationTime;
}
if (expiresAfterWrite()) {
long expirationTime = now - expiresAfterWriteNanos();
expired |= n.getWriteTime() <= expirationTime;
}
if (!expired) {
resurrect[0] = true;
return n;
}
} else if (actualCause[0] == RemovalCause.SIZE) {
int weight = node.getWeight();
if (weight == 0) {
resurrect[0] = true;
return n;
}
}
writer.delete(key, value[0], actualCause[0]);
makeDead(n);
}
writer.delete(key, value, actualCause);
removed[0] = true;
return null;
});
Expand All @@ -770,7 +774,6 @@ void evictEntry(Node<K, V> node, RemovalCause cause, long now) {
// If the eviction fails due to a concurrent removal of the victim, that removal may cancel out
// the addition that triggered this eviction. The victim is eagerly unlinked before the removal
// task so that if an eviction is still required then a new victim will be chosen for removal.
makeDead(node);
if (node.inEden() && (evicts() || expiresAfterAccess())) {
accessOrderEdenDeque().remove(node);
} else if (evicts()) {
Expand All @@ -789,7 +792,7 @@ void evictEntry(Node<K, V> node, RemovalCause cause, long now) {
if (hasRemovalListener()) {
// Notify the listener only if the entry was evicted. This must be performed as the last
// step during eviction to safe guard against the executor rejecting the notification task.
notifyRemoval(key, value, actualCause);
notifyRemoval(key, value[0], actualCause[0]);
}
}
}
Expand Down Expand Up @@ -1481,50 +1484,36 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {

@Override
public V put(K key, V value) {
int weight = weigher.weigh(key, value);
return (weight > 0)
? putFast(key, value, weight, /* notifyWriter */ true, /* onlyIfAbsent */ false)
: putSlow(key, value, weight, /* notifyWriter */ true, /* onlyIfAbsent */ false);
return put(key, value, /* notifyWriter */ true, /* onlyIfAbsent */ false);
}

@Override
public V put(K key, V value, boolean notifyWriter) {
int weight = weigher.weigh(key, value);
return (weight > 0)
? putFast(key, value, weight, notifyWriter, /* onlyIfAbsent */ false)
: putSlow(key, value, weight, notifyWriter, /* onlyIfAbsent */ false);
return put(key, value, notifyWriter, /* onlyIfAbsent */ false);
}

@Override
public V putIfAbsent(K key, V value) {
int weight = weigher.weigh(key, value);
return (weight > 0)
? putFast(key, value, weight, /* notifyWriter */ true, /* onlyIfAbsent */ true)
: putSlow(key, value, weight, /* notifyWriter */ true, /* onlyIfAbsent */ true);
return put(key, value, /* notifyWriter */ true, /* onlyIfAbsent */ true);
}

/**
* Adds a node to the policy and the data store. If an existing node is found, then its value is
* updated if allowed.
*
* This implementation is optimized for writing values with a non-zero weight. A zero weight is
* incompatible due to the potential for the update to race with eviction, where the entry should
* no longer be eligible if the update was successful. This implementation is ~50% faster than
* {@link #putSlow} due to not incurring the penalty of a compute and lambda in the common case.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @param notifyWriter if the writer should be notified for an inserted or updated entry
* @param onlyIfAbsent a write is performed only if the key is not already associated with a value
* @return the prior value in or null if no mapping was found
*/
V putFast(K key, V value, int newWeight, boolean notifyWriter, boolean onlyIfAbsent) {
V put(K key, V value, boolean notifyWriter, boolean onlyIfAbsent) {
requireNonNull(key);
requireNonNull(value);
requireState(newWeight != 0);

Node<K, V> node = null;
long now = expirationTicker().read();
int newWeight = weigher.weigh(key, value);
for (;;) {
Node<K, V> prior = data.get(nodeFactory.newLookupKey(key));
if (prior == null) {
Expand Down Expand Up @@ -1606,100 +1595,6 @@ V putFast(K key, V value, int newWeight, boolean notifyWriter, boolean onlyIfAbs
}
}

/**
* Adds a node to the policy and the data store. If an existing node is found, then its value is
* updated if allowed.
*
* This implementation is strict by using a compute to block other writers to that entry. This
* guards against an eviction trying to discard an entry concurrently (and successfully) updated
* to have a zero weight. The penalty is 50% of the throughput when compared to {@link #putFast}.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @param notifyWriter if the writer should be notified for an inserted or updated entry
* @param onlyIfAbsent a write is performed only if the key is not already associated with a value
* @return the prior value or null if no mapping was found
*/
V putSlow(K key, V value, int newWeight, boolean notifyWriter, boolean onlyIfAbsent) {
requireNonNull(key);
requireNonNull(value);

@SuppressWarnings("unchecked")
K[] nodeKey = (K[]) new Object[1];
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
@SuppressWarnings({"unchecked", "rawtypes"})
RemovalCause[] cause = new RemovalCause[1];
long now = expirationTicker().read();

int[] oldWeight = new int[1];
Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
Node<K, V> node = data.compute(keyRef, (kr, n) -> {
if (n == null) {
if (notifyWriter) {
writer.write(key, value);
}
return nodeFactory.newNode(kr, value, valueReferenceQueue(), newWeight, now);
}

synchronized (n) {
nodeKey[0] = n.getKey();
oldValue[0] = n.getValue();
oldWeight[0] = n.getWeight();
if ((nodeKey[0] == null) || (oldValue[0] == null)) {
cause[0] = RemovalCause.COLLECTED;
} else if (hasExpired(n, now)) {
cause[0] = RemovalCause.EXPIRED;
}
if (cause[0] != null) {
writer.delete(nodeKey[0], oldValue[0], cause[0]);
} else if (onlyIfAbsent && (oldValue[0] != null)) {
return n;
}

if (value != oldValue[0]) {
if (cause[0] == null) {
cause[0] = RemovalCause.REPLACED;
}
if (notifyWriter) {
writer.write(key, value);
}
}

n.setValue(value, valueReferenceQueue());
n.setWeight(newWeight);
n.setAccessTime(now);
n.setWriteTime(now);
return n;
}
});

if (cause[0] != null) {
if (cause[0].wasEvicted()) {
statsCounter().recordEviction(oldWeight[0]);
}
if (hasRemovalListener()) {
notifyRemoval(nodeKey[0], oldValue[0], cause[0]);
}
}

if ((oldValue[0] == null) && (cause[0] == null)) {
afterWrite(node, new AddTask(node, newWeight), now);
} else if (onlyIfAbsent && (oldValue[0] != null) && (cause[0] == null)) {
afterRead(node, now, /* recordHit */ false);
} else {
int weightedDifference = newWeight - oldWeight[0];
if (expiresAfterWrite() || (oldValue[0] == null) || (weightedDifference != 0)
|| ((cause[0] != null) && (cause[0] != RemovalCause.REPLACED))) {
afterWrite(node, new UpdateTask(node, weightedDifference), now);
} else {
afterRead(node, now, /* recordHit */ false);
}
}

return (cause[0] == null) || (cause[0] == RemovalCause.REPLACED) ? oldValue[0] : null;
}

@Override
public V remove(Object key) {
return hasWriter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ public V put(K key, V value) {
return put(key, value, /* notifyWriter */ true);
}


@Override
public V put(K key, V value, boolean notifyWriter) {
requireNonNull(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ public void put_changeWeight(Cache<String, List<Integer>> cache,
@CacheSpec(implementation = Implementation.Caffeine, maximumSize = Maximum.FULL,
weigher = CacheWeigher.COLLECTION, population = Population.EMPTY,
keys = ReferenceType.STRONG, values = ReferenceType.STRONG)
@SuppressWarnings("FutureReturnValueIgnored")
public void put_asyncWeight(AsyncLoadingCache<Integer, List<Integer>> cache,
CacheContext context, Eviction<?, ?> eviction) {
AtomicBoolean ready = new AtomicBoolean();
Expand Down

0 comments on commit 83b47d1

Please sign in to comment.