Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,11 @@ public PreVoteCollector.Factory getPreVoteCollectorFactory() {
}

@Override
public CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode, BooleanSupplier disruptStorage) {
public CoordinationState.PersistedState createFreshPersistedState(
DiscoveryNode localNode,
BooleanSupplier disruptStorage,
ThreadPool threadPool
) {
return new AtomicRegisterPersistedState(localNode, sharedStore);
}

Expand All @@ -355,7 +359,8 @@ public CoordinationState.PersistedState createPersistedStateFromExistingState(
Function<Long, Long> adaptCurrentTerm,
LongSupplier currentTimeInMillisSupplier,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier disruptStorage
BooleanSupplier disruptStorage,
ThreadPool threadPool
) {
return new AtomicRegisterPersistedState(newLocalNode, sharedStore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,12 +1291,14 @@ public void testCannotJoinClusterWithDifferentUUID() {
}

final ClusterNode newNode = cluster1.new ClusterNode(
nextNodeIndex.getAndIncrement(), nodeInOtherCluster.getLocalNode(), n -> cluster1.createPersistedStateFromExistingState(
n,
nodeInOtherCluster.persistedState,
Function.identity(),
Function.identity()
), nodeInOtherCluster.nodeSettings, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")
nextNodeIndex.getAndIncrement(), nodeInOtherCluster.getLocalNode(), (node, threadPool) -> cluster1
.createPersistedStateFromExistingState(
node,
nodeInOtherCluster.persistedState,
Function.identity(),
Function.identity(),
threadPool
), nodeInOtherCluster.nodeSettings, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")
);

cluster1.clusterNodes.add(newNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -899,15 +900,16 @@ private NamedWriteableRegistry getNamedWriteableRegistry() {
);
}

CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode) {
return coordinatorStrategy.createFreshPersistedState(localNode, () -> disruptStorage);
CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode, ThreadPool threadPool) {
return coordinatorStrategy.createFreshPersistedState(localNode, () -> disruptStorage, threadPool);
}

CoordinationState.PersistedState createPersistedStateFromExistingState(
DiscoveryNode newLocalNode,
CoordinationState.PersistedState oldState,
Function<Metadata, Metadata> adaptGlobalMetadata,
Function<Long, Long> adaptCurrentTerm
Function<Long, Long> adaptCurrentTerm,
ThreadPool threadPool
) {
return coordinatorStrategy.createPersistedStateFromExistingState(
newLocalNode,
Expand All @@ -916,7 +918,8 @@ CoordinationState.PersistedState createPersistedStateFromExistingState(
adaptCurrentTerm,
deterministicTaskQueue::getCurrentTimeMillis,
getNamedWriteableRegistry(),
() -> disruptStorage
() -> disruptStorage,
threadPool
);
}

Expand Down Expand Up @@ -953,19 +956,20 @@ public class ClusterNode {
ClusterNode(
int nodeIndex,
DiscoveryNode localNode,
Function<DiscoveryNode, CoordinationState.PersistedState> persistedStateSupplier,
BiFunction<DiscoveryNode, ThreadPool, CoordinationState.PersistedState> persistedStateSupplier,
Settings nodeSettings,
NodeHealthService nodeHealthService
) {
this.nodeHealthService = nodeHealthService;
this.nodeIndex = nodeIndex;
this.localNode = localNode;
this.nodeSettings = nodeSettings;
persistedState = persistedStateSupplier.apply(localNode);
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
persistedState = persistedStateSupplier.apply(localNode, threadPool);
assertTrue("must use a fresh PersistedState", openPersistedStates.add(persistedState));
boolean success = false;
try {
DeterministicTaskQueue.onNodeLog(localNode, this::setUp).run();
DeterministicTaskQueue.onNodeLog(localNode, () -> setUp(threadPool)).run();
success = true;
} finally {
if (success == false) {
Expand All @@ -974,8 +978,7 @@ public class ClusterNode {
}
}

private void setUp() {
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
private void setUp(ThreadPool threadPool) {
clearableRecycler = new ClearableRecycler(recycler);
mockTransport = new DisruptableMockTransport(localNode, deterministicTaskQueue) {
@Override
Expand Down Expand Up @@ -1189,7 +1192,13 @@ ClusterNode restartedNode(
return new ClusterNode(
nodeIndex,
newLocalNode,
node -> createPersistedStateFromExistingState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm),
(node, threadPool) -> createPersistedStateFromExistingState(
newLocalNode,
persistedState,
adaptGlobalMetadata,
adaptCurrentTerm,
threadPool
),
settings,
nodeHealthService
);
Expand Down Expand Up @@ -1471,7 +1480,11 @@ CoordinationServices getCoordinationServices(
CoordinationState.PersistedState persistedState
);

CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode, BooleanSupplier disruptStorage);
CoordinationState.PersistedState createFreshPersistedState(
DiscoveryNode localNode,
BooleanSupplier disruptStorage,
ThreadPool threadPool
);

CoordinationState.PersistedState createPersistedStateFromExistingState(
DiscoveryNode newLocalNode,
Expand All @@ -1480,7 +1493,8 @@ CoordinationState.PersistedState createPersistedStateFromExistingState(
Function<Long, Long> adaptCurrentTerm,
LongSupplier currentTimeInMillisSupplier,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier disruptStorage
BooleanSupplier disruptStorage,
ThreadPool threadPool
);
}

Expand Down Expand Up @@ -1536,7 +1550,11 @@ public PreVoteCollector.Factory getPreVoteCollectorFactory() {
}

@Override
public CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode, BooleanSupplier disruptStorage) {
public CoordinationState.PersistedState createFreshPersistedState(
DiscoveryNode localNode,
BooleanSupplier disruptStorage,
ThreadPool threadPool
) {
return new MockPersistedState(localNode, disruptStorage);
}

Expand All @@ -1548,7 +1566,8 @@ public CoordinationState.PersistedState createPersistedStateFromExistingState(
Function<Long, Long> adaptCurrentTerm,
LongSupplier currentTimeInMillisSupplier,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier disruptStorage
BooleanSupplier disruptStorage,
ThreadPool threadPool
) {
assert oldState instanceof MockPersistedState : oldState.getClass();
return new MockPersistedState(
Expand Down