diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index b76cb559d0..0231ead156 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1413,7 +1413,8 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), NOT_ } } return JavaUtils.allOf(futures).whenCompleteAsync( - (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)) + (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)), + serverExecutor ).thenApply(v -> { final AppendEntriesReplyProto reply; synchronized(this) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 96f7efbe10..ad4d988ab7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -115,7 +115,9 @@ public synchronized void close() { return; } isClosed = true; - map.entrySet().parallelStream().forEach(entry -> close(entry.getKey(), entry.getValue())); + ConcurrentUtils.parallelForEachAsync(map.entrySet(), + entry -> close(entry.getKey(), entry.getValue()), + executor); } private void close(RaftGroupId groupId, CompletableFuture future) { diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java index 4476f3ecf2..215e8408f3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java @@ -51,6 +51,8 @@ import java.nio.file.Path; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -85,6 +87,9 @@ public abstract class InstallSnapshotNotificationTests notifyInstallSnapshotFromLeader( RaftProtos.RoleInfoProto roleInfoProto, @@ -120,7 +125,7 @@ public CompletableFuture notifyInstallSnapshotFromLeader( return leaderSnapshotInfo.getTermIndex(); }; - return CompletableFuture.supplyAsync(supplier); + return CompletableFuture.supplyAsync(supplier, stateMachineExecutor); } @Override