From df0f31b9a0edec3955b14e5bf4bafbab54fce79d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 11 Apr 2023 18:33:38 +0200 Subject: [PATCH] Inject ThreadPool during PersistedState creation in CoordinatorTests. --- .../AtomicRegisterCoordinatorTests.java | 9 +++- .../coordination/CoordinatorTests.java | 14 +++--- .../AbstractCoordinatorTestCase.java | 47 +++++++++++++------ 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/AtomicRegisterCoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/AtomicRegisterCoordinatorTests.java index c2bfe6cbecc0a..bf733a362e492 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/AtomicRegisterCoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/AtomicRegisterCoordinatorTests.java @@ -343,7 +343,11 @@ public PreVoteCollector.Factory getPreVoteCollectorFactory() { } @Override - public CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode, BooleanSupplier disruptStorage) { + public CoordinationState.PersistedState createFreshPersistedState( + DiscoveryNode localNode, + BooleanSupplier disruptStorage, + ThreadPool threadPool + ) { return new AtomicRegisterPersistedState(localNode, sharedStore); } @@ -355,7 +359,8 @@ public CoordinationState.PersistedState createPersistedStateFromExistingState( Function adaptCurrentTerm, LongSupplier currentTimeInMillisSupplier, NamedWriteableRegistry namedWriteableRegistry, - BooleanSupplier disruptStorage + BooleanSupplier disruptStorage, + ThreadPool threadPool ) { return new AtomicRegisterPersistedState(newLocalNode, sharedStore); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index aeb1ed5d72675..afe601376b784 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1291,12 +1291,14 @@ public void testCannotJoinClusterWithDifferentUUID() { } final ClusterNode newNode = cluster1.new ClusterNode( - nextNodeIndex.getAndIncrement(), nodeInOtherCluster.getLocalNode(), n -> cluster1.createPersistedStateFromExistingState( - n, - nodeInOtherCluster.persistedState, - Function.identity(), - Function.identity() - ), nodeInOtherCluster.nodeSettings, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info") + nextNodeIndex.getAndIncrement(), nodeInOtherCluster.getLocalNode(), (node, threadPool) -> cluster1 + .createPersistedStateFromExistingState( + node, + nodeInOtherCluster.persistedState, + Function.identity(), + Function.identity(), + threadPool + ), nodeInOtherCluster.nodeSettings, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info") ); cluster1.clusterNodes.add(newNode); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index c05d99265cdc1..662bb4b345390 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -106,6 +106,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -899,15 +900,16 @@ private NamedWriteableRegistry getNamedWriteableRegistry() { ); } - CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode) { - return coordinatorStrategy.createFreshPersistedState(localNode, () -> disruptStorage); + CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode, ThreadPool threadPool) { + return coordinatorStrategy.createFreshPersistedState(localNode, () -> disruptStorage, threadPool); } CoordinationState.PersistedState createPersistedStateFromExistingState( DiscoveryNode newLocalNode, CoordinationState.PersistedState oldState, Function adaptGlobalMetadata, - Function adaptCurrentTerm + Function adaptCurrentTerm, + ThreadPool threadPool ) { return coordinatorStrategy.createPersistedStateFromExistingState( newLocalNode, @@ -916,7 +918,8 @@ CoordinationState.PersistedState createPersistedStateFromExistingState( adaptCurrentTerm, deterministicTaskQueue::getCurrentTimeMillis, getNamedWriteableRegistry(), - () -> disruptStorage + () -> disruptStorage, + threadPool ); } @@ -953,7 +956,7 @@ public class ClusterNode { ClusterNode( int nodeIndex, DiscoveryNode localNode, - Function persistedStateSupplier, + BiFunction persistedStateSupplier, Settings nodeSettings, NodeHealthService nodeHealthService ) { @@ -961,11 +964,12 @@ public class ClusterNode { this.nodeIndex = nodeIndex; this.localNode = localNode; this.nodeSettings = nodeSettings; - persistedState = persistedStateSupplier.apply(localNode); + final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode); + persistedState = persistedStateSupplier.apply(localNode, threadPool); assertTrue("must use a fresh PersistedState", openPersistedStates.add(persistedState)); boolean success = false; try { - DeterministicTaskQueue.onNodeLog(localNode, this::setUp).run(); + DeterministicTaskQueue.onNodeLog(localNode, () -> setUp(threadPool)).run(); success = true; } finally { if (success == false) { @@ -974,8 +978,7 @@ public class ClusterNode { } } - private void setUp() { - final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode); + private void setUp(ThreadPool threadPool) { clearableRecycler = new ClearableRecycler(recycler); mockTransport = new DisruptableMockTransport(localNode, deterministicTaskQueue) { @Override @@ -1189,7 +1192,13 @@ ClusterNode restartedNode( return new ClusterNode( nodeIndex, newLocalNode, - node -> createPersistedStateFromExistingState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm), + (node, threadPool) -> createPersistedStateFromExistingState( + newLocalNode, + persistedState, + adaptGlobalMetadata, + adaptCurrentTerm, + threadPool + ), settings, nodeHealthService ); @@ -1471,7 +1480,11 @@ CoordinationServices getCoordinationServices( CoordinationState.PersistedState persistedState ); - CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode, BooleanSupplier disruptStorage); + CoordinationState.PersistedState createFreshPersistedState( + DiscoveryNode localNode, + BooleanSupplier disruptStorage, + ThreadPool threadPool + ); CoordinationState.PersistedState createPersistedStateFromExistingState( DiscoveryNode newLocalNode, @@ -1480,7 +1493,8 @@ CoordinationState.PersistedState createPersistedStateFromExistingState( Function adaptCurrentTerm, LongSupplier currentTimeInMillisSupplier, NamedWriteableRegistry namedWriteableRegistry, - BooleanSupplier disruptStorage + BooleanSupplier disruptStorage, + ThreadPool threadPool ); } @@ -1536,7 +1550,11 @@ public PreVoteCollector.Factory getPreVoteCollectorFactory() { } @Override - public CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode, BooleanSupplier disruptStorage) { + public CoordinationState.PersistedState createFreshPersistedState( + DiscoveryNode localNode, + BooleanSupplier disruptStorage, + ThreadPool threadPool + ) { return new MockPersistedState(localNode, disruptStorage); } @@ -1548,7 +1566,8 @@ public CoordinationState.PersistedState createPersistedStateFromExistingState( Function adaptCurrentTerm, LongSupplier currentTimeInMillisSupplier, NamedWriteableRegistry namedWriteableRegistry, - BooleanSupplier disruptStorage + BooleanSupplier disruptStorage, + ThreadPool threadPool ) { assert oldState instanceof MockPersistedState : oldState.getClass(); return new MockPersistedState(