Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assert not same executor when completing future #108934

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4f1a539
Assert not same executor when completing future
henningandersen May 23, 2024
d84d275
Get proper debug info out of assertion.
henningandersen May 23, 2024
39450b7
Remove dummy assertion.
henningandersen May 23, 2024
ef2a6a4
cs
henningandersen May 23, 2024
a7b6a85
comments.
henningandersen May 23, 2024
41c54e1
nasty hack to see more issues.
henningandersen May 23, 2024
7722a9e
Add and use UnsafePlainActionFuture
henningandersen May 24, 2024
1508a6f
CCR unsafety
henningandersen May 24, 2024
43a477b
Completion stats and searchable snapshot
henningandersen May 24, 2024
4cc0c59
Disruption IT
henningandersen May 24, 2024
7896110
Transport test cases
henningandersen May 24, 2024
199676e
Transport test case plus corrupted blob store IT
henningandersen May 24, 2024
2e031fb
spotless
henningandersen May 24, 2024
8de5f11
right executor
henningandersen May 24, 2024
6554796
ml
henningandersen May 24, 2024
356a846
security
henningandersen May 24, 2024
a6466a2
Merge remote-tracking branch 'origin/main' into assert_not_same_execu…
henningandersen May 25, 2024
aec8017
Simplifiy
henningandersen May 25, 2024
c6f9c32
ccr double encounter
henningandersen May 25, 2024
ce01fd9
silly mistake
henningandersen May 26, 2024
7205c23
inference runner
henningandersen May 27, 2024
ac36c97
Mark deprecated.
henningandersen May 27, 2024
1edb078
Stateless prewarming
henningandersen May 27, 2024
1c1d48a
spotless
henningandersen May 27, 2024
bedbda8
Disable assertion for merge.
henningandersen May 28, 2024
0992112
Merge remote-tracking branch 'origin/main' into assert_not_same_execu…
henningandersen May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
Expand All @@ -26,6 +27,7 @@
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -542,7 +544,7 @@ public void testRejoinWhileBeingRemoved() {
});

final ClusterService dataClusterService = internalCluster().getInstance(ClusterService.class, dataNode);
final PlainActionFuture<Void> failedLeader = new PlainActionFuture<>() {
final PlainActionFuture<Void> failedLeader = new UnsafePlainActionFuture<>(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME) {
@Override
protected boolean blockingAllowed() {
// we're deliberately blocking the cluster applier on the master until the data node starts to rejoin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
assertThat(
PlainActionFuture.get(
f -> threadPool.generic()
// any other executor than generic and management
f -> threadPool.executor(ThreadPool.Names.SNAPSHOT)
.execute(
ActionRunnable.supply(
f,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
package org.elasticsearch.action.support;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.CheckedConsumer;
Expand All @@ -37,6 +39,7 @@ public void onResponse(@Nullable T result) {

@Override
public void onFailure(Exception e) {
assert assertCompleteAllowed();
if (sync.setException(Objects.requireNonNull(e))) {
done(false);
}
Expand Down Expand Up @@ -113,6 +116,7 @@ public boolean isCancelled() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
assert assertCompleteAllowed();
if (sync.cancel() == false) {
return false;
}
Expand All @@ -130,6 +134,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
* @return true if the state was successfully changed.
*/
protected final boolean set(@Nullable T value) {
assert assertCompleteAllowed();
boolean result = sync.set(value);
if (result) {
done(true);
Expand Down Expand Up @@ -399,4 +404,27 @@ public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T
e.accept(fut);
return fut.actionGet(timeout, unit);
}

private boolean assertCompleteAllowed() {
Thread waiter = sync.getFirstQueuedThread();
// todo: reenable assertion once downstream code is updated
assert true || waiter == null || allowedExecutors(waiter, Thread.currentThread())
: "cannot complete future on thread "
+ Thread.currentThread()
+ " with waiter on thread "
+ waiter
+ ", could deadlock if pool was full\n"
+ ExceptionsHelper.formatStackTrace(waiter.getStackTrace());
return true;
}

// only used in assertions
boolean allowedExecutors(Thread thread1, Thread thread2) {
// this should only be used to validate thread interactions, like not waiting for a future completed on the same
// executor, hence calling it with the same thread indicates a bug in the assertion using this.
assert thread1 != thread2 : "only call this for different threads";
String thread1Name = EsExecutors.executorName(thread1);
String thread2Name = EsExecutors.executorName(thread2);
return thread1Name == null || thread2Name == null || thread1Name.equals(thread2Name) == false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.support;

import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.CheckedConsumer;

import java.util.Objects;

/**
* An unsafe future. You should not need to use this for new code, rather you should be able to convert that code to be async
* or use a clear hierarchy of thread pool executors around the future.
*
* This future is unsafe, since it allows notifying the future on the same thread pool executor that it is being waited on. This
* is a common deadlock scenario, since all threads may be waiting and thus no thread may be able to complete the future.
*/
@Deprecated(forRemoval = true)
public class UnsafePlainActionFuture<T> extends PlainActionFuture<T> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted to add this unsafe variant that is then used in all places where I found conflicts, since fixing them all would make this PR size unmanageable. Using this we can now fix them one by one, spreading out the load too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 can we mark this as @Deprecated(forRemoval = true) so that IDEs highlight its usages?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a task/issue/ticket on fixing them one-by-one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one by one is too granular. I'll probably create area level ones (perhaps a bit more granular, for instance the one in AbstractClient needs a separate one).


private final String unsafeExecutor;
private final String unsafeExecutor2;

public UnsafePlainActionFuture(String unsafeExecutor) {
this(unsafeExecutor, null);
}

public UnsafePlainActionFuture(String unsafeExecutor, String unsafeExecutor2) {
Objects.requireNonNull(unsafeExecutor);
this.unsafeExecutor = unsafeExecutor;
this.unsafeExecutor2 = unsafeExecutor2;
}

@Override
boolean allowedExecutors(Thread thread1, Thread thread2) {
return super.allowedExecutors(thread1, thread2)
|| unsafeExecutor.equals(EsExecutors.executorName(thread1))
|| unsafeExecutor2 == null
|| unsafeExecutor2.equals(EsExecutors.executorName(thread1));
}

public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T>, E> e, String allowedExecutor) throws E {
PlainActionFuture<T> fut = new UnsafePlainActionFuture<>(allowedExecutor);
e.accept(fut);
return fut.actionGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.action.termvectors.MultiTermVectorsAction;
import org.elasticsearch.action.termvectors.MultiTermVectorsRequest;
import org.elasticsearch.action.termvectors.MultiTermVectorsRequestBuilder;
Expand Down Expand Up @@ -410,7 +411,13 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
* on the result before it goes out of scope.
* @param <R> reference counted result type
*/
private static class RefCountedFuture<R extends RefCounted> extends PlainActionFuture<R> {
// todo: the use of UnsafePlainActionFuture here is quite broad, we should find a better way to be more specific
// (unless making all usages safe is easy).
private static class RefCountedFuture<R extends RefCounted> extends UnsafePlainActionFuture<R> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yikes. I suspect we can/should move all usages of this into tests, and do the ref-counting (and asyncification) properly in prod code. But I see that's not a small change. Ok for now...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I also pondered on this for a short while, but decided to defer this issue for now. I think I agree to move to all prod client interactions being async now. If it is too big we would need to stop notifying on generic thread pool (one more pool maybe, hopefully as an interim step). We can discuss more when we tackle it.


private RefCountedFuture() {
super(ThreadPool.Names.GENERIC);
}

@Override
public final void onResponse(R result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ public static String executorName(String threadName) {
return threadName.substring(executorNameStart + 1, executorNameEnd);
}

public static String executorName(Thread thread) {
return executorName(thread.getName());
}

public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
return daemonThreadFactory(threadName(settings, namePrefix));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import org.apache.lucene.search.suggest.document.CompletionTerms;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -42,7 +44,7 @@ public CompletionStatsCache(Supplier<Engine.Searcher> searcherSupplier) {
}

public CompletionStats get(String... fieldNamePatterns) {
final PlainActionFuture<CompletionStats> newFuture = new PlainActionFuture<>();
final PlainActionFuture<CompletionStats> newFuture = new UnsafePlainActionFuture<>(ThreadPool.Names.MANAGEMENT);
final PlainActionFuture<CompletionStats> oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture);

if (oldFuture != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;

import java.io.Closeable;
Expand Down Expand Up @@ -1956,7 +1958,7 @@ private boolean drainForClose() {

logger.debug("drainForClose(): draining ops");
releaseEnsureOpenRef.close();
final var future = new PlainActionFuture<Void>() {
final var future = new UnsafePlainActionFuture<Void>(ThreadPool.Names.GENERIC) {
@Override
protected boolean blockingAllowed() {
// TODO remove this blocking, or at least do it elsewhere, see https://github.com/elastic/elasticsearch/issues/89821
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
Expand Down Expand Up @@ -869,7 +870,7 @@ protected final void recoverUnstartedReplica(
routingTable
);
try {
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
PlainActionFuture<RecoveryResponse> future = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC);
recovery.recoverToTarget(future);
future.actionGet();
recoveryTarget.markAsDone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.VersionInformation;
Expand Down Expand Up @@ -960,7 +961,7 @@ public void onFailure(Exception e) {
protected void doRun() throws Exception {
safeAwait(go);
for (int iter = 0; iter < 10; iter++) {
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC);
final String info = sender + "_B_" + iter;
serviceB.sendRequest(
nodeA,
Expand Down Expand Up @@ -996,7 +997,7 @@ public void onFailure(Exception e) {
protected void doRun() throws Exception {
go.await();
for (int iter = 0; iter < 10; iter++) {
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC);
final String info = sender + "_" + iter;
final DiscoveryNode node = nodeB; // capture now
try {
Expand Down Expand Up @@ -3464,7 +3465,7 @@ public static void connectToNode(TransportService service, DiscoveryNode node) t
* @param connectionProfile the connection profile to use when connecting to this node
*/
public static void connectToNode(TransportService service, DiscoveryNode node, ConnectionProfile connectionProfile) {
PlainActionFuture.get(fut -> service.connectToNode(node, connectionProfile, fut.map(x -> null)));
UnsafePlainActionFuture.get(fut -> service.connectToNode(node, connectionProfile, fut.map(x -> null)), ThreadPool.Names.GENERIC);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.blobcache.BlobCacheMetrics;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.blobcache.common.ByteRange;
Expand All @@ -36,6 +37,7 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.monitor.fs.FsProbe;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -1136,7 +1138,9 @@ private int readMultiRegions(
int startRegion,
int endRegion
) throws InterruptedException, ExecutionException {
final PlainActionFuture<Void> readsComplete = new PlainActionFuture<>();
final PlainActionFuture<Void> readsComplete = new UnsafePlainActionFuture<>(
BlobStoreRepository.STATELESS_SHARD_PREWARMING_THREAD_NAME
);
final AtomicInteger bytesRead = new AtomicInteger();
try (var listeners = new RefCountingListener(1, readsComplete)) {
for (int region = startRegion; region <= endRegion; region++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -599,7 +600,11 @@ private void updateMappings(
Client followerClient,
Index followerIndex
) {
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();
// todo: this could manifest in production and seems we could make this async easily.
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new UnsafePlainActionFuture<>(
Ccr.CCR_THREAD_POOL_NAME,
ThreadPool.Names.GENERIC
);
final long startTimeInNanos = System.nanoTime();
final Supplier<TimeValue> timeout = () -> {
final long elapsedInNanos = System.nanoTime() - startTimeInNanos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.action.support.replication.PostWriteRefresh;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
Expand Down Expand Up @@ -802,7 +803,7 @@ class CcrAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardO

@Override
protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest request, ActionListener<PrimaryResult> listener) {
final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
final PlainActionFuture<Releasable> permitFuture = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC);
primary.acquirePrimaryOperationPermit(permitFuture, EsExecutors.DIRECT_EXECUTOR_SERVICE);
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> ccrResult;
final var threadpool = mock(ThreadPool.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DestinationIndex;
import org.elasticsearch.xpack.ml.dataframe.stats.DataCountsTracker;
import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
Expand Down Expand Up @@ -100,7 +102,9 @@ public void run(String modelId) {

LOGGER.info("[{}] Started inference on test data against model [{}]", config.getId(), modelId);
try {
PlainActionFuture<LocalModel> localModelPlainActionFuture = new PlainActionFuture<>();
PlainActionFuture<LocalModel> localModelPlainActionFuture = new UnsafePlainActionFuture<>(
MachineLearning.UTILITY_THREAD_POOL_NAME
);
modelLoadingService.getModelForInternalInference(modelId, localModelPlainActionFuture);
InferenceState inferenceState = restoreInferenceState();
dataCountsTracker.setTestDocsCount(inferenceState.processedTestDocsCount);
Expand Down