wip - issue 1687
ben-manes committed May 28, 2024
1 parent b4cedbc commit 9d7850f
Showing 18 changed files with 532 additions and 117 deletions.
@@ -48,6 +48,7 @@

import com.github.benmanes.caffeine.cache.LocalAsyncCache.AsyncBulkCompleter.NullMapCompletionException;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.errorprone.annotations.CanIgnoreReturnValue;

/**
* This class provides a skeletal implementation of the {@link AsyncCache} interface to minimize the
@@ -144,9 +145,9 @@ default CompletableFuture<Map<K, V>> getAll(Iterable<? extends K> keys,
try {
var loader = mappingFunction.apply(
Collections.unmodifiableSet(proxies.keySet()), cache().executor());
return loader.whenComplete(completer).thenCompose(ignored -> composeResult(futures));
return loader.handle(completer).thenCompose(ignored -> composeResult(futures));
} catch (Throwable t) {
completer.accept(/* result */ null, t);
completer.apply(/* result */ null, t);
throw t;
}
}
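Note on the change above: switching from whenComplete with a BiConsumer to handle with a BiFunction changes who decides the downstream outcome. With whenComplete, a failed load keeps the loader's original exception even if the completer itself throws; with handle, the completer's return value or thrown exception becomes the result of the stage that getAll composes on. A minimal standalone sketch of that CompletableFuture behavior (illustrative only, not part of this commit; class and message names are made up):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public final class HandleVsWhenComplete {
  public static void main(String[] args) {
    CompletableFuture<String> loader =
        CompletableFuture.failedFuture(new RuntimeException("loader failed"));

    // whenComplete: when both the source stage and the callback fail, the source's
    // exception is propagated and the callback's exception is discarded
    CompletableFuture<String> viaWhenComplete =
        loader.whenComplete((result, error) -> { throw new IllegalStateException("completer failed"); });

    // handle: the callback's thrown exception (or returned value) decides the outcome
    // of the returned stage, so the completer controls what callers observe
    CompletableFuture<String> viaHandle =
        loader.handle((result, error) -> { throw new IllegalStateException("completer failed"); });

    System.out.println(causeOf(viaWhenComplete)); // java.lang.RuntimeException: loader failed
    System.out.println(causeOf(viaHandle));       // java.lang.IllegalStateException: completer failed
  }

  static Throwable causeOf(CompletableFuture<?> future) {
    try {
      future.join();
      throw new AssertionError("expected an exceptional completion");
    } catch (CompletionException e) {
      return e.getCause();
    }
  }
}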
@@ -214,9 +215,15 @@ default void handleCompletion(K key, CompletableFuture<? extends V> valueFuture,
@SuppressWarnings("unchecked")
var castedFuture = (CompletableFuture<V>) valueFuture;

// update the weight and expiration timestamps
cache().statsCounter().recordLoadSuccess(loadTime);
cache().replace(key, castedFuture, castedFuture, /* shouldDiscardRefresh */ false);
try {
// update the weight and expiration timestamps
cache().replace(key, castedFuture, castedFuture, /* shouldDiscardRefresh */ false);
cache().statsCounter().recordLoadSuccess(loadTime);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during asynchronous load", t);
cache().statsCounter().recordLoadFailure(loadTime);
cache().remove(key, valueFuture);
}
}
if (recordMiss) {
cache().statsCounter().recordMisses(1);
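Note on the change above: the write-back of a completed value now runs inside a try/catch, presumably because user-supplied policies (for example a weigher or variable expiry) execute inside cache().replace and can throw; in that case the exception is logged, the load is recorded as a failure, and the incomplete mapping is removed instead of leaving stale state behind. An illustrative configuration (not from this commit; key, value, and loader are made up) showing the kind of callback that can fail during the write-back:

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

public final class ThrowingWeigherExample {
  public static void main(String[] args) {
    AsyncLoadingCache<String, String> cache = Caffeine.newBuilder()
        .maximumWeight(1_000)
        .weigher((String key, String value) -> {
          // user code consulted once the completed future replaces the in-flight placeholder
          throw new IllegalStateException("weigher failure");
        })
        .buildAsync(key -> "value-" + key);

    // The loader itself succeeds; the failure occurs while the cache records the
    // completed value, which the patched handleCompletion logs, counts as a load
    // failure, and resolves by discarding the mapping.
    System.out.println(cache.get("a").join());
  }
}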
@@ -226,7 +233,7 @@ default void handleCompletion(K key, CompletableFuture<? extends V> valueFuture,

/** A function executed asynchronously after a bulk load completes. */
final class AsyncBulkCompleter<K, V>
implements BiConsumer<Map<? extends K, ? extends V>, Throwable> {
implements BiFunction<Map<? extends K, ? extends V>, Throwable, Map<? extends K, ? extends V>> {
private final LocalCache<K, CompletableFuture<V>> cache;
private final Map<K, CompletableFuture<V>> proxies;
private final long startTime;
@@ -239,49 +246,91 @@ final class AsyncBulkCompleter<K, V>
}

@Override
public void accept(@Nullable Map<? extends K, ? extends V> result, @Nullable Throwable error) {
@CanIgnoreReturnValue
public @Nullable Map<? extends K, ? extends V> apply(
@Nullable Map<? extends K, ? extends V> result, @Nullable Throwable error) {
long loadTime = cache.statsTicker().read() - startTime;

if (result == null) {
if (error == null) {
error = new NullMapCompletionException();
}
for (var entry : proxies.entrySet()) {
cache.remove(entry.getKey(), entry.getValue());
entry.getValue().obtrudeException(error);
}
cache.statsCounter().recordLoadFailure(loadTime);
if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) {
logger.log(Level.WARNING, "Exception thrown during asynchronous load", error);
}
handleFailure(error);
} else {
fillProxies(result);
addNewEntries(result);
error = handleSuccess(result);
}

if (error == null) {
cache.statsCounter().recordLoadSuccess(loadTime);
return result;
}

cache.statsCounter().recordLoadFailure(loadTime);
if (error instanceof RuntimeException) {
throw (RuntimeException) error;
} else if (error instanceof Error) {
throw (Error) error;
}
throw new CompletionException(error);
}

/** Populates the proxies with the computed result. */
private void fillProxies(Map<? extends K, ? extends V> result) {
proxies.forEach((key, future) -> {
V value = result.get(key);
private void handleFailure(Throwable error) {
for (var entry : proxies.entrySet()) {
cache.remove(entry.getKey(), entry.getValue());
entry.getValue().obtrudeException(error);
}
if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) {
logger.log(Level.WARNING, "Exception thrown during asynchronous load", error);
}
}

private @Nullable Throwable handleSuccess(Map<? extends K, ? extends V> result) {
Throwable error = null;

// Populates the proxies with the computed result
for (var entry : proxies.entrySet()) {
var key = entry.getKey();
var value = result.get(key);
var future = entry.getValue();
future.obtrudeValue(value);

if (value == null) {
cache.remove(key, future);
} else {
// update the weight and expiration timestamps
cache.replace(key, future, future);
try {
// update the weight and expiration timestamps
cache.replace(key, future, future);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during asynchronous load", t);
cache.remove(key, future);
if (error == null) {
error = t;
} else {
error.addSuppressed(t);
}
}
}
});
}
}

/** Adds to the cache any extra entries computed that were not requested. */
private void addNewEntries(Map<? extends K, ? extends V> result) {
result.forEach((key, value) -> {
// Adds to the cache any extra entries computed that were not requested
for (var entry : result.entrySet()) {
var key = entry.getKey();
var value = result.get(key);
if (!proxies.containsKey(key)) {
cache.put(key, CompletableFuture.completedFuture(value));
try {
cache.put(key, CompletableFuture.completedFuture(value));
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during asynchronous load", t);
if (error == null) {
error = t;
} else {
error.addSuppressed(t);
}
}
}
});
}

return error;
}

static final class NullMapCompletionException extends CompletionException {
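Note on the bulk completer above: as a BiFunction the completer now determines the outcome of the stage that getAll composes on, which is why apply is annotated with @CanIgnoreReturnValue and rethrows. handleSuccess collects any exception thrown while replacing a proxy or inserting an extra computed entry, keeps the first as the primary error and attaches later ones with addSuppressed, and apply then records a load failure and rethrows so the bulk future completes exceptionally. A standalone sketch (not from this commit) of how a caller would observe such an aggregated failure:

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public final class SuppressedFailureSketch {
  public static void main(String[] args) {
    // Mirrors the aggregation in handleSuccess: first failure is primary, later ones suppressed
    var primary = new IllegalStateException("write failed for key 1");
    primary.addSuppressed(new IllegalStateException("write failed for key 2"));

    CompletableFuture<Void> bulk = CompletableFuture.failedFuture(new CompletionException(primary));
    try {
      bulk.join();
    } catch (CompletionException e) {
      System.out.println(e.getCause().getMessage());                     // write failed for key 1
      System.out.println(Arrays.toString(e.getCause().getSuppressed())); // [... write failed for key 2]
    }
  }
}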
@@ -295,36 +295,42 @@ public CompletableFuture<Map<K, V>> refreshAll(Iterable<? extends K> keys) {
return;
}

boolean[] discard = new boolean[1];
var value = asyncCache.cache().compute(key, (ignored, currentValue) -> {
var successful = asyncCache.cache().refreshes().remove(keyReference, castedFuture);
if (successful && (currentValue == oldValueFuture[0])) {
if (currentValue == null) {
// If the entry is absent then discard the refresh and maybe notifying the listener
discard[0] = (newValue != null);
return null;
} else if ((currentValue == newValue) || (currentValue == castedFuture)) {
// If the reloaded value is the same instance then no-op
return currentValue;
} else if (newValue == Async.getIfReady((CompletableFuture<?>) currentValue)) {
// If the completed futures hold the same value instance then no-op
return currentValue;
try {
boolean[] discard = new boolean[1];
var value = asyncCache.cache().compute(key, (ignored, currentValue) -> {
var successful = asyncCache.cache().refreshes().remove(keyReference, castedFuture);
if (successful && (currentValue == oldValueFuture[0])) {
if (currentValue == null) {
// If the entry is absent then discard the refresh and maybe notifying the listener
discard[0] = (newValue != null);
return null;
} else if ((currentValue == newValue) || (currentValue == castedFuture)) {
// If the reloaded value is the same instance then no-op
return currentValue;
} else if (newValue == Async.getIfReady((CompletableFuture<?>) currentValue)) {
// If the completed futures hold the same value instance then no-op
return currentValue;
}
return (newValue == null) ? null : castedFuture;
}
return (newValue == null) ? null : castedFuture;
// Otherwise, a write invalidated the refresh so discard it and notify the listener
discard[0] = true;
return currentValue;
}, asyncCache.cache().expiry(), /* recordLoad */ false, /* recordLoadFailure */ true);

if (discard[0] && (newValue != null)) {
var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
asyncCache.cache().notifyRemoval(key, castedFuture, cause);
}
// Otherwise, a write invalidated the refresh so discard it and notify the listener
discard[0] = true;
return currentValue;
}, asyncCache.cache().expiry(), /* recordLoad */ false, /* recordLoadFailure */ true);

if (discard[0] && (newValue != null)) {
var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
asyncCache.cache().notifyRemoval(key, castedFuture, cause);
}
if (newValue == null) {
if (newValue == null) {
asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
} else {
asyncCache.cache().statsCounter().recordLoadSuccess(loadTime);
}
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during asynchronous load", t);
asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
} else {
asyncCache.cache().statsCounter().recordLoadSuccess(loadTime);
asyncCache.cache().remove(key, castedFuture);
}
});
}
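Note on the change above: finishing a refresh runs a compute against the map with the cache's expiry policy, so user code can throw here as well. The new try/catch logs the exception, records the reload as a failure, and removes the mapping if it was left holding the failed refresh's future, presumably so a later refresh for the key is not blocked by the partially applied one. An illustrative configuration (not from this commit) of the kind of user callback that can fail at this point:

import java.time.Duration;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;

public final class ThrowingExpiryExample {
  public static void main(String[] args) {
    AsyncLoadingCache<String, String> cache = Caffeine.newBuilder()
        .expireAfter(new Expiry<String, String>() {
          @Override public long expireAfterCreate(String key, String value, long currentTime) {
            return Duration.ofMinutes(5).toNanos();
          }
          @Override public long expireAfterUpdate(String key, String value,
              long currentTime, long currentDuration) {
            // user code evaluated while the reloaded value is written back
            throw new IllegalStateException("expiry failure");
          }
          @Override public long expireAfterRead(String key, String value,
              long currentTime, long currentDuration) {
            return currentDuration;
          }
        })
        .buildAsync(key -> key + "-reloaded");

    cache.synchronous().put("a", "seed");
    // The reload itself succeeds; applying it throws in the expiry, which the patched
    // block logs, records as a load failure, and cleans up rather than leaving the
    // stats and mapping inconsistent.
    cache.synchronous().refresh("a");
  }
}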
@@ -24,6 +24,7 @@
import static com.github.benmanes.caffeine.testing.CollectionSubject.assertThat;
import static com.github.benmanes.caffeine.testing.FutureSubject.assertThat;
import static com.github.benmanes.caffeine.testing.IntSubject.assertThat;
import static com.github.benmanes.caffeine.testing.LoggingEventSubject.assertLogs;
import static com.github.benmanes.caffeine.testing.MapSubject.assertThat;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -739,11 +740,11 @@ public void getAllBifunction_absent_failure(AsyncCache<Int, Int> cache, CacheCon
assertThat(future).failsWith(CompletionException.class)
.hasCauseThat().isInstanceOf(IllegalStateException.class);
assertThat(context).stats().hits(0).misses(context.absentKeys().size()).success(0).failures(1);

var event = Iterables.getOnlyElement(TestLoggerFactory.getLoggingEvents());
assertThat(event.getFormattedMessage()).isEqualTo("Exception thrown during asynchronous load");
assertThat(event.getThrowable().orElseThrow()).isInstanceOf(IllegalStateException.class);
assertThat(event.getLevel()).isEqualTo(WARN);
assertLogs()
.withLevel(WARN)
.withThrowable(IllegalStateException.class)
.withMessage("Exception thrown during asynchronous load")
.hasSize(1);
}

@CacheSpec
@@ -1010,7 +1011,7 @@ public void getAllBifunction_early_failure(AsyncCache<Int, Int> cache, CacheCont
} else {
assertThat(result.join()).containsExactlyEntriesIn(context.absent());
}
assertThat(TestLoggerFactory.getLoggingEvents()).isEmpty();
assertLogs().isEmpty();
}

/* --------------- put --------------- */
@@ -20,8 +20,8 @@
import static com.github.benmanes.caffeine.cache.testing.CacheContext.intern;
import static com.github.benmanes.caffeine.cache.testing.CacheContextSubject.assertThat;
import static com.github.benmanes.caffeine.cache.testing.CacheSubject.assertThat;
import static com.github.benmanes.caffeine.testing.LoggingEventSubject.assertLogs;
import static com.github.benmanes.caffeine.testing.MapSubject.assertThat;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
@@ -86,7 +86,6 @@
import com.github.benmanes.caffeine.cache.testing.CheckNoEvictions;
import com.github.benmanes.caffeine.cache.testing.CheckNoStats;
import com.github.benmanes.caffeine.testing.Int;
import com.github.valfirst.slf4jtest.TestLoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
@@ -845,11 +844,11 @@ public void cleanup(Cache<Int, Int> cache, CacheContext context) {
@CacheSpec(population = Population.SINGLETON, removalListener = Listener.REJECTING)
public void removalListener_error_log(Cache<Int, Int> cache, CacheContext context) {
cache.invalidateAll();

var event = Iterables.getOnlyElement(TestLoggerFactory.getLoggingEvents());
assertThat(event.getLevel()).isEqualTo(WARN);
assertThat(event.getFormattedMessage()).isEqualTo("Exception thrown by removal listener");
assertThat(event.getThrowable().orElseThrow()).isInstanceOf(RejectedExecutionException.class);
assertLogs()
.withMessage("Exception thrown by removal listener")
.withThrowable(RejectedExecutionException.class)
.withLevel(WARN)
.hasSize(1);
}

@CheckMaxLogLevel(ERROR)
@@ -859,13 +858,11 @@ public void removalListener_error_log(Cache<Int, Int> cache, CacheContext contex
removalListener = Listener.CONSUMING)
public void removalListener_submit_error_log(Cache<Int, Int> cache, CacheContext context) {
cache.invalidateAll();

var event = Iterables.getOnlyElement(TestLoggerFactory.getLoggingEvents().stream()
.filter(e -> e.getLevel() == ERROR)
.collect(toImmutableList()));
assertThat(event.getFormattedMessage()).isEqualTo(
"Exception thrown when submitting removal listener");
assertThat(event.getThrowable().orElseThrow()).isInstanceOf(RejectedExecutionException.class);
assertLogs()
.withMessage("Exception thrown when submitting removal listener")
.withThrowable(RejectedExecutionException.class)
.withLevel(ERROR)
.hasSize(1);
}

@CheckNoStats
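Note on the test changes in both files above: the hand-rolled inspection of TestLoggerFactory.getLoggingEvents() is replaced by a fluent assertLogs() subject (LoggingEventSubject, presumably introduced or extended elsewhere in this commit), so each check states the expected message, throwable type, level, and event count in a single chained assertion.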