Skip to content

Commit

Permalink
Optimize refreshAfterWrite
Browse files Browse the repository at this point in the history
When an entry is eligible for refresh, only one thread should block to
schedule it. Previously all readers would block on a map computeIfAbsent
operation to obtain the future. While this allows refreshes to be
linearizable, it adds a small and unnecessary synchronization point. The
change restores the non-blocking behavior of v2.x, while keeping the
improvements of v3's rewrite.

The write timestamp is CAS'd as a soft lock to allow subsequent readers
to skip attempting to refresh. The least significant bit is used as a
flag for locking, causing the timestamp to be off by 1ns from the ideal
value. (Thanks @Maaartinus for suggesting this idea in #282 (comment))

Also restored from v2 is to suppress and log exceptions if the cache
loader fails when producing the refresh future.

The inspections to obtain an entry's age were improved, such as not
resurrecting an expired entry.
  • Loading branch information
ben-manes committed Jul 14, 2021
1 parent b13fcf0 commit b5dbe2d
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 82 deletions.
Expand Up @@ -115,8 +115,8 @@ private void addAccessExpiration() {
.addMethod(newGetter(Strength.STRONG, TypeName.LONG, "accessTime", Visibility.PLAIN))
.addMethod(newSetter(TypeName.LONG, "accessTime", Visibility.PLAIN));
addVarHandle("accessTime", TypeName.get(long.class));
addTimeConstructorAssignment(context.constructorByKey, "accessTime");
addTimeConstructorAssignment(context.constructorByKeyRef, "accessTime");
addTimeConstructorAssignment(context.constructorByKey, "accessTime", "now");
addTimeConstructorAssignment(context.constructorByKeyRef, "accessTime", "now");
}

private void addWriteExpiration() {
Expand All @@ -127,8 +127,8 @@ private void addWriteExpiration() {
.addMethod(newGetter(Strength.STRONG, TypeName.LONG, "writeTime", Visibility.PLAIN))
.addMethod(newSetter(TypeName.LONG, "writeTime", Visibility.PLAIN));
addVarHandle("writeTime", TypeName.get(long.class));
addTimeConstructorAssignment(context.constructorByKey, "writeTime");
addTimeConstructorAssignment(context.constructorByKeyRef, "writeTime");
addTimeConstructorAssignment(context.constructorByKey, "writeTime", "now & ~1L");
addTimeConstructorAssignment(context.constructorByKeyRef, "writeTime", "now & ~1L");
}
}

Expand All @@ -147,7 +147,8 @@ private void addRefreshExpiration() {
}

/** Adds a long constructor assignment. */
private void addTimeConstructorAssignment(MethodSpec.Builder constructor, String field) {
constructor.addStatement("$L.set(this, $N)", varHandleName(field), "now");
private void addTimeConstructorAssignment(
MethodSpec.Builder constructor, String field, String value) {
constructor.addStatement("$L.set(this, $N)", varHandleName(field), value);
}
}
Expand Up @@ -55,7 +55,6 @@
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -1242,42 +1241,48 @@ void refreshIfNeeded(Node<K, V> node, long now) {
K key;
V oldValue;
long writeTime = node.getWriteTime();
long refreshWriteTime = writeTime | 1L;
Object keyReference = node.getKeyReference();
if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null)
&& ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
&& !refreshes().containsKey(keyReference)) {
&& !refreshes().containsKey(keyReference)
&& ((writeTime & 1L) == 0L) && node.casWriteTime(writeTime, refreshWriteTime)) {
long[] startTime = new long[1];
@SuppressWarnings({"unchecked", "rawtypes"})
CompletableFuture<? extends V>[] refreshFuture = new CompletableFuture[1];
refreshes().computeIfAbsent(keyReference, k -> {
try {
startTime[0] = statsTicker().read();
if (isAsync) {
@SuppressWarnings("unchecked")
CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
if (Async.isReady(future)) {
try {
refreshes().computeIfAbsent(keyReference, k -> {
try {
startTime[0] = statsTicker().read();
if (isAsync) {
@SuppressWarnings("unchecked")
CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
if (Async.isReady(future)) {
@SuppressWarnings("NullAway")
var refresh = cacheLoader.asyncReload(key, future.join(), executor);
refreshFuture[0] = refresh;
} else {
// no-op if load is pending
return future;
}
} else {
@SuppressWarnings("NullAway")
var refresh = cacheLoader.asyncReload(key, future.join(), executor);
var refresh = cacheLoader.asyncReload(key, oldValue, executor);
refreshFuture[0] = refresh;
} else {
// no-op if load is pending
return future;
}
} else {
@SuppressWarnings("NullAway")
var refresh = cacheLoader.asyncReload(key, oldValue, executor);
refreshFuture[0] = refresh;
return refreshFuture[0];
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.WARNING, "Exception thrown when submitting refresh task", e);
return null;
} catch (Throwable e) {
logger.log(Level.WARNING, "Exception thrown when submitting refresh task", e);
return null;
}
return refreshFuture[0];
} catch (RuntimeException e) {
throw e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException(e);
} catch (Exception e) {
throw new CompletionException(e);
}
});
});
} finally {
node.casWriteTime(refreshWriteTime, writeTime);
}

if (refreshFuture[0] != null) {
refreshFuture[0].whenComplete((newValue, error) -> {
Expand Down Expand Up @@ -1417,7 +1422,7 @@ void setVariableTime(Node<K, V> node, long expirationTime) {

void setWriteTime(Node<K, V> node, long now) {
if (expiresAfterWrite() || refreshAfterWrite()) {
node.setWriteTime(now);
node.setWriteTime(now & ~1L);
}
}

Expand Down Expand Up @@ -3631,14 +3636,14 @@ final class BoundedExpireAfterAccess implements FixedExpiration<K, V> {
requireNonNull(key);
requireNonNull(unit);
Object lookupKey = cache.nodeFactory.newLookupKey(key);
Node<?, ?> node = cache.data.get(lookupKey);
Node<K, V> node = cache.data.get(lookupKey);
if (node == null) {
return OptionalLong.empty();
}
long age = cache.expirationTicker().read() - node.getAccessTime();
return (age > cache.expiresAfterAccessNanos())
long now = cache.expirationTicker().read();
return cache.hasExpired(node, now)
? OptionalLong.empty()
: OptionalLong.of(unit.convert(age, TimeUnit.NANOSECONDS));
: OptionalLong.of(unit.convert(now - node.getAccessTime(), TimeUnit.NANOSECONDS));
}
@Override public long getExpiresAfter(TimeUnit unit) {
return unit.convert(cache.expiresAfterAccessNanos(), TimeUnit.NANOSECONDS);
Expand All @@ -3662,14 +3667,14 @@ final class BoundedExpireAfterWrite implements FixedExpiration<K, V> {
requireNonNull(key);
requireNonNull(unit);
Object lookupKey = cache.nodeFactory.newLookupKey(key);
Node<?, ?> node = cache.data.get(lookupKey);
Node<K, V> node = cache.data.get(lookupKey);
if (node == null) {
return OptionalLong.empty();
}
long age = cache.expirationTicker().read() - node.getWriteTime();
return (age > cache.expiresAfterWriteNanos())
long now = cache.expirationTicker().read();
return cache.hasExpired(node, now)
? OptionalLong.empty()
: OptionalLong.of(unit.convert(age, TimeUnit.NANOSECONDS));
: OptionalLong.of(unit.convert(now - node.getWriteTime(), TimeUnit.NANOSECONDS));
}
@Override public long getExpiresAfter(TimeUnit unit) {
return unit.convert(cache.expiresAfterWriteNanos(), TimeUnit.NANOSECONDS);
Expand All @@ -3693,14 +3698,14 @@ final class BoundedVarExpiration implements VarExpiration<K, V> {
requireNonNull(key);
requireNonNull(unit);
Object lookupKey = cache.nodeFactory.newLookupKey(key);
Node<?, ?> node = cache.data.get(lookupKey);
Node<K, V> node = cache.data.get(lookupKey);
if (node == null) {
return OptionalLong.empty();
}
long duration = node.getVariableTime() - cache.expirationTicker().read();
return (duration <= 0)
long now = cache.expirationTicker().read();
return cache.hasExpired(node, now)
? OptionalLong.empty()
: OptionalLong.of(unit.convert(duration, TimeUnit.NANOSECONDS));
: OptionalLong.of(unit.convert(node.getVariableTime() - now, TimeUnit.NANOSECONDS));
}
@Override public void setExpiresAfter(K key, long duration, TimeUnit unit) {
requireNonNull(key);
Expand All @@ -3713,6 +3718,9 @@ final class BoundedVarExpiration implements VarExpiration<K, V> {
long durationNanos = TimeUnit.NANOSECONDS.convert(duration, unit);
synchronized (node) {
now = cache.expirationTicker().read();
if (cache.hasExpired(node, now)) {
return;
}
node.setVariableTime(now + Math.min(durationNanos, MAXIMUM_EXPIRY));
}
cache.afterRead(node, now, /* recordHit */ false);
Expand Down Expand Up @@ -3816,14 +3824,14 @@ final class BoundedRefreshAfterWrite implements FixedRefresh<K, V> {
requireNonNull(key);
requireNonNull(unit);
Object lookupKey = cache.nodeFactory.newLookupKey(key);
Node<?, ?> node = cache.data.get(lookupKey);
Node<K, V> node = cache.data.get(lookupKey);
if (node == null) {
return OptionalLong.empty();
}
long age = cache.expirationTicker().read() - node.getWriteTime();
return (age > cache.refreshAfterWriteNanos())
long now = cache.expirationTicker().read();
return cache.hasExpired(node, now)
? OptionalLong.empty()
: OptionalLong.of(unit.convert(age, TimeUnit.NANOSECONDS));
: OptionalLong.of(unit.convert(now - node.getWriteTime(), TimeUnit.NANOSECONDS));
}
@Override public long getRefreshesAfter(TimeUnit unit) {
return unit.convert(cache.refreshAfterWriteNanos(), TimeUnit.NANOSECONDS);
Expand Down
Expand Up @@ -219,8 +219,6 @@ interface FixedExpiration<K extends Object, V extends Object> {
* An expiration policy uses the age to determine if an entry is fresh or stale by comparing it
* to the freshness lifetime. This is calculated as {@code fresh = freshnessLifetime > age}
* where {@code freshnessLifetime = expires - currentTime}.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param key the key for the entry being queried
* @param unit the unit that {@code age} is expressed in
Expand Down Expand Up @@ -251,8 +249,6 @@ default Optional<Duration> ageOf(K key) {
* to elapsing this time bound. An entry is considered fresh if its age is less than this
* duration, and stale otherwise. The expiration policy determines when the entry's age is
* reset.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param unit the unit that duration is expressed in
* @return the length of time after which an entry should be automatically removed
Expand All @@ -275,8 +271,6 @@ default Duration getExpiresAfter() {
/**
* Specifies that each entry should be automatically removed from the cache once a fixed
* duration has elapsed. The expiration policy determines when the entry's age is reset.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param duration the length of time after which an entry should be automatically removed
* @param unit the unit that {@code duration} is expressed in
Expand Down Expand Up @@ -334,8 +328,6 @@ interface VarExpiration<K extends Object, V extends Object> {
/**
* Returns the duration until the entry should be automatically removed. The expiration policy
* determines when the entry's duration is reset.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param key the key for the entry being queried
* @param unit the unit that {@code age} is expressed in
Expand Down Expand Up @@ -386,8 +378,6 @@ default void setExpiresAfter(K key, Duration duration) {
* already associated with a value. This method differs from {@link Map#putIfAbsent} by
* substituting the configured {@link Expiry} with the specified write duration, has no effect
* on the duration if the entry was present, and returns the success rather than a value.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param key the key with which the specified value is to be associated
* @param value value to be associated with the specified key
Expand Down Expand Up @@ -421,8 +411,6 @@ default void setExpiresAfter(K key, Duration duration) {
* contained a value associated with the {@code key}, the old value is replaced by the new
* {@code value}. This method differs from {@link Cache#put} by substituting the configured
* {@link Expiry} with the specified write duration.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param key the key with which the specified value is to be associated
* @param value value to be associated with the specified key
Expand Down Expand Up @@ -491,13 +479,11 @@ interface FixedRefresh<K extends Object, V extends Object> {

/**
* Returns the age of the entry based on the refresh policy. The entry's age is the cache's
* estimate of the amount of time since the entry's refresh time was last reset.
* estimate of the amount of time since the entry's refresh period was last reset.
* <p>
* An expiration policy uses the age to determine if an entry is fresh or stale by comparing it
* A refresh policy uses the age to determine if an entry is fresh or stale by comparing it
* to the freshness lifetime. This is calculated as {@code fresh = freshnessLifetime > age}
* where {@code freshnessLifetime = expires - currentTime}.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param key the key for the entry being queried
* @param unit the unit that {@code age} is expressed in
Expand All @@ -506,10 +492,10 @@ interface FixedRefresh<K extends Object, V extends Object> {
OptionalLong ageOf(K key, TimeUnit unit);

/**
* Returns the age of the entry based on the expiration policy. The entry's age is the cache's
* estimate of the amount of time since the entry's expiration was last reset.
* Returns the age of the entry based on the refresh policy. The entry's age is the cache's
* estimate of the amount of time since the entry's refresh period was last reset.
* <p>
* An expiration policy uses the age to determine if an entry is fresh or stale by comparing it
* A refresh policy uses the age to determine if an entry is fresh or stale by comparing it
* to the freshness lifetime. This is calculated as {@code fresh = freshnessLifetime > age}
* where {@code freshnessLifetime = expires - currentTime}.
*
Expand All @@ -528,8 +514,6 @@ default Optional<Duration> ageOf(K key) {
* to elapsing this time bound. An entry is considered fresh if its age is less than this
* duration, and stale otherwise. The refresh policy determines when the entry's age is
* reset.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param unit the unit that duration is expressed in
* @return the length of time after which an entry is eligible to be reloaded
Expand All @@ -552,8 +536,6 @@ default Duration getRefreshesAfter() {
/**
* Specifies that each entry should be eligible for reloading once a fixed duration has elapsed.
* The refresh policy determines when the entry's age is reset.
* <p>
* This method is scheduled for removal in version 3.0.0.
*
* @param duration the length of time after which an entry is eligible to be reloaded
* @param unit the unit that {@code duration} is expressed in
Expand Down
Expand Up @@ -632,7 +632,7 @@ public void evict_resurrect_expireAfterVar(Cache<Int, Int> cache, CacheContext c
await().untilTrue(started);
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
cache.policy().expireVariably().get().setExpiresAfter(key, Duration.ofDays(1));
node.setVariableTime(context.ticker().read() + TimeUnit.DAYS.toNanos(1));
}
await().untilTrue(done);

Expand Down
Expand Up @@ -286,6 +286,26 @@ public void ageOf_duration(CacheContext context,
assertThat(expireAfterAccess.ageOf(context.firstKey()), is(Optional.empty()));
}

@Test(dataProvider = "caches")
@CacheSpec(implementation = Implementation.Caffeine, expireAfterAccess = Expire.ONE_MINUTE)
public void ageOf_absent(CacheContext context,
@ExpireAfterAccess FixedExpiration<Int, Int> expireAfterAccess) {
assertThat(expireAfterAccess.ageOf(
context.absentKey(), TimeUnit.SECONDS).isPresent(), is(false));
}

@Test(dataProvider = "caches")
@CacheSpec(implementation = Implementation.Caffeine,
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = Expire.ONE_MINUTE, population = Population.EMPTY)
public void ageOf_expired(Cache<Int, Int> cache, CacheContext context,
@ExpireAfterAccess FixedExpiration<Int, Int> expireAfterAccess) {
cache.put(context.absentKey(), context.absentValue());
context.ticker().advance(2, TimeUnit.MINUTES);
assertThat(expireAfterAccess.ageOf(
context.absentKey(), TimeUnit.SECONDS).isPresent(), is(false));
}

/* --------------- Policy: oldest --------------- */

@CacheSpec(implementation = Implementation.Caffeine, expireAfterAccess = Expire.ONE_MINUTE)
Expand Down

0 comments on commit b5dbe2d

Please sign in to comment.