
Background refreshes not happening #2 #1478

Open
tjackowiak opened this issue Jan 27, 2024 · 5 comments
@tjackowiak

Hi,
we are experiencing an issue identical to the one described in #120. On some instances, Caffeine stops refreshing entries when the computation of the new value is slow.

I was able to prepare code to reproduce the issue.

import com.github.benmanes.caffeine.cache.Caffeine
import java.time.Duration
import java.util.concurrent.Executors
import kotlin.concurrent.thread

fun main() {
    var i = 0

    val cache = Caffeine.newBuilder()
        .refreshAfterWrite(Duration.ofSeconds(1))
        .maximumSize(1)
        .executor(Executors.newSingleThreadExecutor())
        .buildAsync { _: String ->
            // Simulate a load that takes longer than the refreshAfterWrite duration
            println("loading value")
            Thread.sleep(5_000)
            i++
        }

    thread {
        while (true) {
            thread {
                val value = cache.get("key").join()
                println("${Thread.currentThread().name} ->  $value")
            }
            Thread.sleep(1_000)
        }
    }
}

I would expect that one second after calculating the value, a new computation begins since the previous result becomes outdated. However, if the computation time exceeds the refreshAfterWrite duration, this never occurs.

The issue is not triggered when using LoadingCache instead of AsyncLoadingCache, or when using a thread pool with two threads.
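For comparison, here is a minimal sketch of the synchronous LoadingCache variant that does not get stuck (the timings are scaled down and the loader body is a stand-in for a slow external call, not code from this issue):

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;

public final class SyncVariant {
  public static void main(String[] args) {
    // A synchronous LoadingCache whose load outlasts refreshAfterWrite;
    // unlike the async variant, subsequent refreshes keep being scheduled.
    LoadingCache<String, Integer> cache = Caffeine.newBuilder()
        .refreshAfterWrite(Duration.ofMillis(100))
        .maximumSize(1)
        .build(key -> {
          Thread.sleep(300); // stand-in for a slow external call
          return 1;
        });
    System.out.println(cache.get("key")); // blocks for the initial load
  }
}
```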

I'm using Caffeine 3.1.8.

Is this a bug, or is it something I have misunderstood?

@ben-manes
Owner

Thanks! I think that I understand what may be happening.

When the entry is being loaded and the refresh duration passes since the mapping's insertion, attempts to reload should no-op. Currently that is done by storing the in-flight load's future into the refresh map, a side mapping that tracks in-flight reload operations. However, when the load completes, that future is not removed from the mapping until a write or eviction occurs that invalidates it (since CacheLoader#reload can be based on the old value). Unfortunately, it looks like I forgot to remove the future from the refresh map in this case, which causes further refreshes to not be scheduled because the cache thinks one is already in-flight.

Inserting the in-flight load as a pseudo refresh to cheaply disable refreshing was probably not a good approach. That map was later exposed as Cache.policy().refreshes(), so it would mislead anyone inspecting it. I think instead it should guard entering the scheduling path by checking whether a load is in-flight and, if by a race it does enter (e.g. the value was replaced), then no-op upon detection. The patch below passes your test case, so I kicked off a CI run to see if anything fails before I iterate on cleaning up the warnings and adding test coverage.

diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java
index 22e37cbb..2eec6078 100644
--- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java
+++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java
@@ -1316,6 +1316,7 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef
     ConcurrentMap<Object, CompletableFuture<?>> refreshes;
     if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null)
         && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
+        && !(isAsync && !Async.isReady((CompletableFuture<V>) oldValue))
         && ((writeTime & 1L) == 0L) && !(refreshes = refreshes()).containsKey(keyReference)
         && node.isAlive() && node.casWriteTime(writeTime, refreshWriteTime)) {
       long[] startTime = new long[1];
@@ -1334,7 +1335,7 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef
                 refreshFuture[0] = requireNonNull(refresh, "Null future");
               } else {
                 // no-op if load is pending
-                return future;
+                return null;
               }
             } else {
               @SuppressWarnings("NullAway")

In the meantime, refreshAfterWrite is intended to be used with expiration. If you are not combining them, that might add to your frustration; we likely should have asserted that in Guava (which Caffeine then mirrors). The intent was for popular entries to be reloaded optimistically to avoid a latency spike when they expire and threads block waiting for a fresh load, e.g. an oauth token used on every user request. That pairing masks this bug because when the entry does expire, the refresh mapping is removed and nothing gets stuck.
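The recommended pairing might look like the following sketch (the durations and fetchToken are illustrative, not taken from this issue):

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;

public final class TokenCacheSketch {
  public static void main(String[] args) {
    // expireAfterWrite bounds how stale an entry may get; the shorter
    // refreshAfterWrite reloads popular entries in the background before
    // they expire, so readers rarely block on a fresh load.
    LoadingCache<String, String> tokens = Caffeine.newBuilder()
        .expireAfterWrite(Duration.ofMinutes(10))
        .refreshAfterWrite(Duration.ofMinutes(5))
        .build(TokenCacheSketch::fetchToken);
    System.out.println(tokens.get("user-1"));
  }

  private static String fetchToken(String user) {
    return "token-for-" + user; // placeholder for the external token service
  }
}
```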

@tjackowiak
Author

Thank you for the quick verification!

> In the meantime, refreshAfterWrite is intended to be used with expiration.

In my case, low response latency is more important than data freshness. The reload time depends on an external service that I have no influence on, so we did not set any expiration time and rely on the fact that frequent requests will force asynchronous reloading of the old value. Good to know that adding this setting also solves the problem!

@ben-manes
Owner

In that case you can hack around it for the moment by doing a no-op write after the load completes. It's not pretty, but it will clear the entry from the mapping of in-flight refreshes while keeping linearizability.

Here is a bit of test code showing that, from when I was debugging the issue:

Workaround
public final class Issue1478Test {
  volatile AsyncCache<String, Integer> c;

  @Test
  public void issue1478() {
    var counter = new AtomicInteger();
    var executorService = Executors.newSingleThreadExecutor();
    var cache = Caffeine.newBuilder()
        .maximumSize(1)
        .executor(executorService)
        .refreshAfterWrite(Duration.ofSeconds(1))
        .buildAsync(new CacheLoader<String, Integer>() {
          @Override public Integer load(String key) {
            System.out.println(Thread.currentThread().getName() + " -> start loading");
            Uninterruptibles.sleepUninterruptibly(Duration.ofSeconds(5));
            System.out.println(Thread.currentThread().getName() + " -> end loading");
            return counter.incrementAndGet();
          }
          @Override public Integer reload(String key, Integer oldValue) {
            System.out.println(Thread.currentThread().getName() + " -> start reloading");
            Uninterruptibles.sleepUninterruptibly(Duration.ofSeconds(5));
            System.out.println(Thread.currentThread().getName() + " -> end reloading");
            return counter.incrementAndGet();
          }
          @Override public CompletableFuture<? extends Integer> asyncLoad(
              String key, Executor executor) throws Exception {
            var future = CacheLoader.super.asyncLoad(key, executor);
            future.thenAcceptAsync(value -> 
                c.synchronous().asMap().replace(key, value, value), executor);
            return future;
          }
        });
    c = cache;

    //cache.synchronous().put("key", 0);
    while (true) {
      var thread = new Thread(() -> {
        var value = cache.get("key").join();
        System.out.println(Thread.currentThread().getName() + " -> " + value);
      });
      thread.start();
      //Uninterruptibles.joinUninterruptibly(thread);
      Uninterruptibles.sleepUninterruptibly(Duration.ofSeconds(1));
    }
  }
}

I'll try to get a fix out soon since this is a bug and I don't like to sit on those. There are two other issues open that are low priority quality-of-life asks that I'd like to wrap up into a release. If I'm lucky then I can get them all sorted out and release, else I'll cut this early (all depending on your timeframe). I might need a week or so depending on how far I get this weekend, work, etc.

@ben-manes
Owner

Sorry, I just haven't had the time and mental focus to work on this project. I'll keep chipping away, and I might cut a quick release with just this fix while trying to catch up with everything.

@ben-manes
Owner

I think the original fix is correct, but this path is also entered unexpectedly because the write time was not adjusted far into the future. When the future is first loaded we have this ugly hack,

// Ensure that in-flight async computation cannot expire (reset on a completion callback)
if (isComputingAsync(node)) {
  synchronized (node) {
    if (!Async.isReady((CompletableFuture<?>) node.getValue())) {
      long expirationTime = expirationTicker().read() + ASYNC_EXPIRY;
      setVariableTime(node, expirationTime);
      setAccessTime(node, expirationTime);
      setWriteTime(node, expirationTime);
    }
  }
}
but it is not present on an update, where the write time would be set explicitly to the current time. I am unsure if this is needed, as hasExpired(node) also checks whether the entry is async loading, but that check is not present in evictEntry for removal. So I see a few possibilities to investigate.

  1. Try removing the AddTask overrides, which are applied after the write.
  2. If needed, move them into the write operations for insert or update so they are set atomically.
  3. See if eviction might accidentally remove an expired async task. Regardless, it should have a check just to be safe.
  4. Update refreshIfNeeded as above, using isComputingAsync for the same logic as that new line.
  5. Write oodles of tests.
Toy unit test
  @Test(dataProvider = "caches")
  @CacheSpec(population = Population.EMPTY, loader = Loader.ASYNC_INCOMPLETE,
      refreshAfterWrite = Expire.ONE_MINUTE, expireAfterWrite = Expire.FOREVER)
  public void refreshIfNeeded_slowLoad(AsyncLoadingCache<Int, Int> cache, CacheContext context) {
    cache.put(context.absentKey(), new CompletableFuture<Int>());

    cache.synchronous().asMap().put(context.absentKey(), context.absentKey());

    context.ticker().advance(Duration.ofHours(1));
    cache.put(context.absentKey(), new CompletableFuture<Int>());

    context.ticker().advance(Duration.ofHours(1));
    var future = cache.getIfPresent(context.absentKey());
    assertThat(future).isNotDone();

    assertThat(cache.synchronous().policy().refreshes()).isEmpty();

    future.complete(context.absentKey().negate());
    assertThat(cache.synchronous().policy().refreshes()).isEmpty();
  }
