diff --git a/extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java b/extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java index a92f6f4e70fd7..cc0ea14720eb6 100644 --- a/extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java +++ b/extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java @@ -5,6 +5,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -24,6 +25,10 @@ import io.quarkus.infinispan.client.runtime.InfinispanClientUtil; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; /** * This class is an internal Quarkus cache implementation using Infinispan. @@ -111,6 +116,8 @@ public Uni get(K key, Function valueLoader) { @Override public Uni getAsync(K key, Function> valueLoader) { + Context context = Vertx.currentContext(); + return Uni.createFrom().completionStage(CompletionStages.handleAndCompose(remoteCache.getAsync(key), (v1, ex1) -> { if (ex1 != null) { return CompletableFuture.failedFuture(ex1); @@ -145,7 +152,49 @@ public Uni getAsync(K key, Function> valueLoader) { } }); return resultAsync; - })); + })).emitOn(new Executor() { + // We need make sure we go back to the original context when the cache value is computed. + // Otherwise, we would always emit on the context having computed the value, which could + // break the duplicated context isolation. + @Override + public void execute(Runnable command) { + Context ctx = Vertx.currentContext(); + if (context == null) { + // We didn't capture a context + if (ctx == null) { + // We are not on a context => we can execute immediately. + command.run(); + } else { + // We are on a context. + // We cannot continue on the current context as we may share a duplicated context. + // We need a new one. Note that duplicate() does not duplicate the duplicated context, + // but the root context. + ((ContextInternal) ctx).duplicate() + .runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } else { + // We captured a context. + if (ctx == context) { + // We are on the same context => we can execute immediately + command.run(); + } else { + // 1) We are not on a context (ctx == null) => we need to switch to the captured context. + // 2) We are on a different context (ctx != null) => we need to switch to the captured context. + context.runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } + } + }); } @Override