Skip to content

Commit

Permalink
Correct context for CancellableSOCache listener (#83021) (#83169)
Browse files Browse the repository at this point in the history
* Correct context for CancellableSOCache listener (#83021)

Today the `CancellableSingleObjectCache` completes its listeners in the
thread context of the `get()` call that actually computes the value
which will be the correct context only if no batching took place. With
this commit we make sure to complete each listener in the context in
which it was passed to the corresponding `get()` call.

* Fix changelog YAML

* Spotless
  • Loading branch information
DaveCTurner committed Jan 26, 2022
1 parent 8668691 commit 3b494e4
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 15 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/83021.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83021
summary: Correct context for CancellableSOCache listener
area: Features/Stats
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.CancellableSingleObjectCache;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
Expand Down Expand Up @@ -69,8 +70,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<
private final NodeService nodeService;
private final IndicesService indicesService;

private final MetadataStatsCache<MappingStats> mappingStatsCache = new MetadataStatsCache<>(MappingStats::of);
private final MetadataStatsCache<AnalysisStats> analysisStatsCache = new MetadataStatsCache<>(AnalysisStats::of);
private final MetadataStatsCache<MappingStats> mappingStatsCache;
private final MetadataStatsCache<AnalysisStats> analysisStatsCache;

@Inject
public TransportClusterStatsAction(
Expand All @@ -95,6 +96,8 @@ public TransportClusterStatsAction(
);
this.nodeService = nodeService;
this.indicesService = indicesService;
this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of);
this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
}

@Override
Expand Down Expand Up @@ -264,7 +267,8 @@ public void writeTo(StreamOutput out) throws IOException {
private static class MetadataStatsCache<T> extends CancellableSingleObjectCache<Metadata, Long, T> {
private final BiFunction<Metadata, Runnable, T> function;

MetadataStatsCache(BiFunction<Metadata, Runnable, T> function) {
MetadataStatsCache(ThreadContext threadContext, BiFunction<Metadata, Runnable, T> function) {
super(threadContext);
this.function = function;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.elasticsearch.common.util;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.TaskCancelledException;
Expand Down Expand Up @@ -41,8 +43,14 @@
*/
public abstract class CancellableSingleObjectCache<Input, Key, Value> {

private final ThreadContext threadContext;

private final AtomicReference<CachedItem> currentCachedItemRef = new AtomicReference<>();

protected CancellableSingleObjectCache(ThreadContext threadContext) {
this.threadContext = threadContext;
}

/**
* Compute a new value for the cache.
* <p>
Expand Down Expand Up @@ -220,7 +228,7 @@ boolean addListener(ActionListener<Value> listener, BooleanSupplier isCancelled)
ActionListener.completeWith(listener, () -> future.actionGet(0L));
} else {
// Refresh is still pending; it's not cancelled because there are still references.
future.addListener(listener);
future.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
final AtomicBoolean released = new AtomicBoolean();
cancellationChecks.add(() -> {
if (released.get() == false && isCancelled.getAsBoolean() && released.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -193,10 +195,11 @@ public void testExceptionCompletesListenersButIsNotCached() {
public void testConcurrentRefreshesAndCancellation() throws InterruptedException {
final ThreadPool threadPool = new TestThreadPool("test");
try {
final ThreadContext threadContext = threadPool.getThreadContext();
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<
String,
String,
Integer>() {
Integer>(threadContext) {
@Override
protected void refresh(
String s,
Expand All @@ -222,6 +225,7 @@ protected String getKey(String s) {
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch finishLatch = new CountDownLatch(count);
final BlockingQueue<Runnable> queue = ConcurrentCollections.newBlockingQueue();
final String contextHeader = "test-context-header";

for (int i = 0; i < count; i++) {
final boolean cancel = randomBoolean();
Expand All @@ -236,11 +240,14 @@ protected String getKey(String s) {
final StepListener<Integer> stepListener = new StepListener<>();
final AtomicBoolean isComplete = new AtomicBoolean();
final AtomicBoolean isCancelled = new AtomicBoolean();
testCache.get(
input,
isCancelled::get,
ActionListener.runBefore(stepListener, () -> assertTrue(isComplete.compareAndSet(false, true)))
);
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
final String contextValue = randomAlphaOfLength(10);
threadContext.putHeader(contextHeader, contextValue);
testCache.get(input, isCancelled::get, ActionListener.runBefore(stepListener, () -> {
assertTrue(isComplete.compareAndSet(false, true));
assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue));
}));
}

final Runnable next = queue.poll();
if (next != null) {
Expand Down Expand Up @@ -283,7 +290,7 @@ public void testConcurrentRefreshesWithFreshnessCheck() throws InterruptedExcept
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<
String,
String,
Integer>() {
Integer>(threadPool.getThreadContext()) {
@Override
protected void refresh(
String s,
Expand Down Expand Up @@ -386,10 +393,9 @@ public void run() {
}
};

final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<
String,
String,
Integer>() {
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<String, String, Integer>(
testThreadContext
) {
@Override
protected void refresh(
String s,
Expand Down Expand Up @@ -433,10 +439,16 @@ protected String getKey(String s) {
expectThrows(TaskCancelledException.class, () -> cancelledFuture.actionGet(0L));
}

private static final ThreadContext testThreadContext = new ThreadContext(Settings.EMPTY);

private static class TestCache extends CancellableSingleObjectCache<String, String, Integer> {

private final LinkedList<StepListener<Function<String, Integer>>> pendingRefreshes = new LinkedList<>();

private TestCache() {
super(testThreadContext);
}

@Override
protected void refresh(
String input,
Expand Down

0 comments on commit 3b494e4

Please sign in to comment.