@@ -25,12 +25,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
@@ -125,6 +124,27 @@
public final class XceiverServerRatis implements XceiverServerSpi {
private static final Logger LOG = LoggerFactory
.getLogger(XceiverServerRatis.class);

+  private static class ActivePipelineContext {
+    /** Whether the current datanode is the leader of the pipeline. */
+    private final boolean isPipelineLeader;
+    /** Whether a heartbeat carrying the pipeline close action has already been triggered. */
+    private final boolean isPendingClose;
+
+    ActivePipelineContext(boolean isPipelineLeader, boolean isPendingClose) {
+      this.isPipelineLeader = isPipelineLeader;
+      this.isPendingClose = isPendingClose;
+    }
+
+    public boolean isPipelineLeader() {
+      return isPipelineLeader;
+    }
+
+    public boolean isPendingClose() {
+      return isPendingClose;
+    }
+  }

private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
private static final List<Integer> DEFAULT_PRIORITY_LIST =
new ArrayList<>(
@@ -150,11 +170,8 @@ private static long nextCallId() {
private final ConfigurationSource conf;
// TODO: Remove the gids set when Ratis supports an api to query active
// pipelines
-  private final Set<RaftGroupId> raftGids = ConcurrentHashMap.newKeySet();
+  private final ConcurrentMap<RaftGroupId, ActivePipelineContext> activePipelines = new ConcurrentHashMap<>();
private final RaftPeerId raftPeerId;
-  // pipelines for which I am the leader
-  private final Map<RaftGroupId, Boolean> groupLeaderMap =
-      new ConcurrentHashMap<>();
// Timeout used while calling submitRequest directly.
private final long requestTimeout;
private final boolean shouldDeleteRatisLogDirectory;
@@ -726,11 +743,11 @@ private void handlePipelineFailure(RaftGroupId groupId,
}

triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.PIPELINE_FAILED, false);
+        ClosePipelineInfo.Reason.PIPELINE_FAILED);
Contributor:
I think this case is triggered very frequently. Please check that this does not overload SCM with heartbeats due to the continuous heartbeat triggering.

Contributor Author:

Yes, you are right. I saw that ContainerStateMachine#notifyFollowerSlowness gets triggered continuously as long as the follower remains unreachable from the leader. Thanks for catching it.

I think we can add deduplication logic to trigger the heartbeat only for the first triggerPipelineClose call for a particular pipeline. Maybe a concurrent set that stores the IDs of pipelines pending close by SCM; a pipeline ID would then be removed from the set in ClosePipelineCommandHandler when SCM sends back the pipeline close command.

Let me think about this further.
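A minimal sketch of that deduplication idea, assuming a hypothetical helper class (the names PendingPipelineCloseTracker, shouldTriggerHeartbeat, and onClosePipelineCommand are illustrative, not part of this PR):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ratis.protocol.RaftGroupId;

/** Hypothetical dedup helper: trigger a heartbeat only for the first close of a pipeline. */
final class PendingPipelineCloseTracker {
  private final Set<RaftGroupId> pendingClose = ConcurrentHashMap.newKeySet();

  /** Returns true only for the first close trigger of a given pipeline. */
  boolean shouldTriggerHeartbeat(RaftGroupId groupId) {
    // add() is atomic and returns false while a close is already in flight
    return pendingClose.add(groupId);
  }

  /** Would be called from ClosePipelineCommandHandler once SCM sends the close command. */
  void onClosePipelineCommand(RaftGroupId groupId) {
    pendingClose.remove(groupId);
  }
}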

Contributor:

Thanks @ivandika3 for the proposal. Just curious: when you say the failed pipeline ID could be removed from the set in ClosePipelineCommandHandler, do you mean the 'raftGids' concurrent set in XceiverServerRatis? (https://github.com/apache/ozone/blob/master/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java#L758-L760)

Contributor Author:

Thanks @DaveTeng0 for checking this out. I was previously thinking of another concurrent set, similar to raftGids, that would contain the RaftGroupIds of in-flight pipelines pending close, i.e. pipelines for which the datanode has triggered a heartbeat containing the close pipeline action to SCM but has not yet received the close pipeline command back from SCM.

The idea is to prevent excessive heartbeat triggers, since the ContainerStateMachine#notifyFollowerSlowness hook gets triggered on every one of the leader's follower health checks (see https://github.com/apache/ratis/blob/05db67929a5b06ce964eda6627d44cd153cc2bce/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java#L1285), which can happen on every heartbeat (< 150ms).

We can change raftGids to store extra information about whether the pipeline for each RaftGroupId is pending close, as sketched below.
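A condensed sketch of that set-to-map upgrade (the diff below is the actual change; this helper class and its method names are only illustrative):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.ratis.protocol.RaftGroupId;

/** Illustrative sketch: replace the raftGids set with a map that also tracks pending-close state. */
final class ActivePipelineTracker {
  private final ConcurrentMap<RaftGroupId, Boolean> pendingClose = new ConcurrentHashMap<>();

  void onGroupAdd(RaftGroupId gid) {
    pendingClose.put(gid, Boolean.FALSE); // active pipeline, no close triggered yet
  }

  /** Returns true only on the first transition to pending-close. */
  boolean markPendingClose(RaftGroupId gid) {
    // replace(key, expected, newValue) is atomic, so only one caller wins the transition
    return pendingClose.replace(gid, Boolean.FALSE, Boolean.TRUE);
  }

  void onGroupRemove(RaftGroupId gid) {
    pendingClose.remove(gid);
  }
}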

Contributor Author:

@sumitagrawl I have updated XceiverServerRatis to keep track of active pipelines and their relevant information (i.e. whether the pipeline is pending close and whether the current datanode is the leader of the pipeline).

I tested manually by shutting down one of the datanodes in an active pipeline. The leader datanode triggered the pipeline close immediately via the notifyFollowerSlowness hook, but subsequent pipeline close actions were only sent with the following heartbeats.

SCM pipeline action close log (separated by the 30s heartbeat interval) received from the pipeline leader DN:

2023-12-20 14:14:53,962 [scm1-EventQueue-PipelineActionsForPipelineActionHandler] INFO org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler: Received pipeline action CLOSE for PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a from datanode a3fcdd27-8244-4b7a-840c-037dac8c6337. Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 302697ms
2023-12-20 14:15:23,963 [scm1-EventQueue-PipelineActionsForPipelineActionHandler] INFO org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler: Received pipeline action CLOSE for PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a from datanode a3fcdd27-8244-4b7a-840c-037dac8c6337. Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 302697ms
2023-12-20 14:15:53,960 [scm1-EventQueue-PipelineActionsForPipelineActionHandler] INFO org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler: Received pipeline action CLOSE for PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a from datanode a3fcdd27-8244-4b7a-840c-037dac8c6337. Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 302697ms
2023-12-20 14:16:23,960 [scm1-EventQueue-PipelineActionsForPipelineActionHandler] INFO org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler: Received pipeline action CLOSE for PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a from datanode a3fcdd27-8244-4b7a-840c-037dac8c6337. Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 302697ms
2023-12-20 14:16:52,409 [scm1-EventQueue-PipelineActionsForPipelineActionHandler] INFO org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler: Received pipeline action CLOSE for PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a from datanode a3fcdd27-8244-4b7a-840c-037dac8c6337. Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 302697ms
2023-12-20 14:16:52,413 [scm1-EventQueue-PipelineActionsForPipelineActionHandler] INFO org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler: Received pipeline action CLOSE for PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a from datanode a3fcdd27-8244-4b7a-840c-037dac8c6337. Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 302697ms
2023-12-20 14:16:52,595 [scm1-EventQueue-PipelineActionsForPipelineActionHandler] INFO org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler: Received pipeline action CLOSE for PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a from datanode a3fcdd27-8244-4b7a-840c-037dac8c6337. Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 302697ms

The DN was restarted at 14:16:53, which may be why SCM received multiple heartbeats from the same DN around that time.

DN pipeline close log due to follower slowness (triggered multiple times within a single heartbeat interval):

2023-12-20 14:14:53,956 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 302697ms
2023-12-20 14:15:02,517 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 311262ms
2023-12-20 14:15:09,112 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 317858ms
2023-12-20 14:15:17,303 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 326049ms
2023-12-20 14:15:21,319 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 330065ms
2023-12-20 14:15:26,097 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 334843ms
2023-12-20 14:15:33,563 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 342308ms
2023-12-20 14:15:38,610 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 347356ms
2023-12-20 14:15:47,337 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 356082ms
2023-12-20 14:15:54,298 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 363044ms
2023-12-20 14:15:58,898 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 367643ms
2023-12-20 14:16:04,575 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 373321ms
2023-12-20 14:16:11,602 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 380348ms
2023-12-20 14:16:19,886 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 388632ms
2023-12-20 14:16:24,493 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 393239ms
2023-12-20 14:16:31,369 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 400114ms
2023-12-20 14:16:37,345 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 406091ms
2023-12-20 14:16:45,904 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 414649ms
2023-12-20 14:16:51,766 [a3fcdd27-8244-4b7a-840c-037dac8c6337@group-661AE7EEF57A->a2289e91-7fe6-49e8-ae84-fb33f39719d0-GrpcLogAppender-LogAppenderDaemon] ERROR org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis: pipeline Action CLOSE on pipeline PipelineID=38c214e0-9a5e-4b89-b114-661ae7eef57a.Reason : a3fcdd27-8244-4b7a-840c-037dac8c6337 has not seen follower/s a2289e91-7fe6-49e8-ae84-fb33f39719d0 for 420512ms

Contributor:

@ivandika3 The current code fixes the frequent retries, but we still need to retry after a certain time interval in case SCM is down or unable to handle the request. Maybe we need to send the action again with the heartbeat if the pipeline is still active.
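A hedged sketch of that follow-up idea, assuming a hypothetical minimum re-trigger interval (nothing like this exists in the current diff; the class name and interval value are illustrative):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.ratis.protocol.RaftGroupId;

/** Illustrative sketch: re-send the close action after a minimum interval in case SCM missed it. */
final class CloseActionRetryPolicy {
  private static final long MIN_RETRIGGER_INTERVAL_MS = 60_000; // assumed value

  private final ConcurrentMap<RaftGroupId, Long> lastTriggerTimeMs = new ConcurrentHashMap<>();

  /** Returns true if this pipeline's close action should be (re-)sent now. */
  boolean shouldTrigger(RaftGroupId gid) {
    long now = System.currentTimeMillis();
    Long prev = lastTriggerTimeMs.putIfAbsent(gid, now);
    if (prev == null) {
      return true; // first trigger for this pipeline
    }
    // Atomically claim the retry slot so concurrent callers don't both fire.
    return now - prev >= MIN_RETRIGGER_INTERVAL_MS && lastTriggerTimeMs.replace(gid, prev, now);
  }

  /** Stop retrying once SCM sends back the close pipeline command. */
  void onClosePipelineCommand(RaftGroupId gid) {
    lastTriggerTimeMs.remove(gid);
  }
}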

}

private void triggerPipelineClose(RaftGroupId groupId, String detail,
-      ClosePipelineInfo.Reason reasonCode, boolean triggerHB) {
+      ClosePipelineInfo.Reason reasonCode) {
PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
ClosePipelineInfo.Builder closePipelineInfo =
ClosePipelineInfo.newBuilder()
@@ -744,9 +761,12 @@
.build();
if (context != null) {
context.addPipelineActionIfAbsent(action);
-      // wait for the next HB timeout or right away?
-      if (triggerHB) {
+      if (!activePipelines.get(groupId).isPendingClose()) {
+        // If the pipeline close action has not been triggered before, trigger a heartbeat
+        // immediately to prevent SCM from allocating blocks on the failed pipeline.
        context.getParent().triggerHeartbeat();
+        activePipelines.computeIfPresent(groupId,
+            (key, value) -> new ActivePipelineContext(value.isPipelineLeader(), true));
}
}
LOG.error("pipeline Action {} on pipeline {}.Reason : {}",
@@ -756,7 +776,7 @@

@Override
public boolean isExist(HddsProtos.PipelineID pipelineId) {
-    return raftGids.contains(
+    return activePipelines.containsKey(
RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()));
}

@@ -780,9 +800,11 @@ public List<PipelineReport> getPipelineReport() {
for (RaftGroupId groupId : gids) {
HddsProtos.PipelineID pipelineID = PipelineID
.valueOf(groupId.getUuid()).getProtobuf();
+      boolean isLeader = activePipelines.getOrDefault(groupId,
+          new ActivePipelineContext(false, false)).isPipelineLeader();
reports.add(PipelineReport.newBuilder()
.setPipelineID(pipelineID)
-          .setIsLeader(groupLeaderMap.getOrDefault(groupId, Boolean.FALSE))
+          .setIsLeader(isLeader)
.setBytesWritten(calculatePipelineBytesWritten(pipelineID))
.build());
}
@@ -872,7 +894,7 @@ void handleApplyTransactionFailure(RaftGroupId groupId,
"Ratis Transaction failure in datanode " + dnId + " with role " + role
+ " .Triggering pipeline close action.";
triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true);
+        ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED);
}
/**
* The fact that the snapshot contents cannot be used to actually catch up
@@ -908,7 +930,7 @@ public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) {
: t.getMessage();

triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true);
+        ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED);
}

public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
@@ -920,13 +942,12 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
}

public void notifyGroupRemove(RaftGroupId gid) {
-    raftGids.remove(gid);
-    // Remove any entries for group leader map
-    groupLeaderMap.remove(gid);
+    // Remove the Group ID entry from the active pipeline map
+    activePipelines.remove(gid);
}

void notifyGroupAdd(RaftGroupId gid) {
-    raftGids.add(gid);
+    activePipelines.put(gid, new ActivePipelineContext(false, false));
sendPipelineReport();
}

@@ -936,7 +957,9 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
"leaderId: {}", groupMemberId.getGroupId(), raftPeerId1);
// Save the reported leader to be sent with the report to SCM
boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1);
-    groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup);
+    activePipelines.compute(groupMemberId.getGroupId(),
+        (key, value) -> value == null ? new ActivePipelineContext(leaderForGroup, false) :
+            new ActivePipelineContext(leaderForGroup, value.isPendingClose()));
if (context != null && leaderForGroup) {
// Publish new report from leader
sendPipelineReport();