From 524a45a37dce483c0ae4183634097752b7caa7f9 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 4 Dec 2023 17:05:35 +0800 Subject: [PATCH 1/3] HDDS-9823. Pipeline failure should trigger heartbeat immediately --- .../server/ratis/XceiverServerRatis.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 0f8f7d4eccdf..f6a526506fa2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -703,11 +703,11 @@ private void handlePipelineFailure(RaftGroupId groupId, } triggerPipelineClose(groupId, msg, - ClosePipelineInfo.Reason.PIPELINE_FAILED, false); + ClosePipelineInfo.Reason.PIPELINE_FAILED); } private void triggerPipelineClose(RaftGroupId groupId, String detail, - ClosePipelineInfo.Reason reasonCode, boolean triggerHB) { + ClosePipelineInfo.Reason reasonCode) { PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); ClosePipelineInfo.Builder closePipelineInfo = ClosePipelineInfo.newBuilder() @@ -721,10 +721,9 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail, .build(); if (context != null) { context.addPipelineActionIfAbsent(action); - // wait for the next HB timeout or right away? 
- if (triggerHB) { - context.getParent().triggerHeartbeat(); - } + // need to trigger pipeline close immediately to prevent user using + // the failed pipeline + context.getParent().triggerHeartbeat(); } LOG.error("pipeline Action {} on pipeline {}.Reason : {}", action.getAction(), pipelineID, @@ -849,7 +848,7 @@ void handleApplyTransactionFailure(RaftGroupId groupId, "Ratis Transaction failure in datanode " + dnId + " with role " + role + " .Triggering pipeline close action."; triggerPipelineClose(groupId, msg, - ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true); + ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED); } /** * The fact that the snapshot contents cannot be used to actually catch up @@ -885,7 +884,7 @@ public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) { : t.getMessage(); triggerPipelineClose(groupId, msg, - ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true); + ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED); } public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException { From c073da7aa19d18c38c4fe0c9b808599eeb0f58a2 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 4 Dec 2023 17:09:50 +0800 Subject: [PATCH 2/3] Update comment --- .../common/transport/server/ratis/XceiverServerRatis.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index f6a526506fa2..4376e48de7f6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -721,8 +721,8 @@ private void triggerPipelineClose(RaftGroupId 
groupId, String detail, .build(); if (context != null) { context.addPipelineActionIfAbsent(action); - // need to trigger pipeline close immediately to prevent user using - // the failed pipeline + // need to trigger pipeline close immediately to prevent SCM to allocate + // blocks on the failed pipeline context.getParent().triggerHeartbeat(); } LOG.error("pipeline Action {} on pipeline {}.Reason : {}", From 417893cd2677ec7b2938d41d127957f4ee39d9ca Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 19 Dec 2023 18:40:36 +0800 Subject: [PATCH 3/3] Create an active pipeline map to keep track of context --- .../server/ratis/XceiverServerRatis.java | 56 +++++++++++++------ 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index d3b09b66183c..53d5e13b05cd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -25,12 +25,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; @@ -125,6 +124,27 @@ public final class XceiverServerRatis implements XceiverServerSpi { private static final Logger LOG = LoggerFactory .getLogger(XceiverServerRatis.class); + + private static class 
ActivePipelineContext { + /** The current datanode is the current leader of the pipeline. */ + private final boolean isPipelineLeader; + /** The heartbeat containing pipeline close action has been triggered. */ + private final boolean isPendingClose; + + ActivePipelineContext(boolean isPipelineLeader, boolean isPendingClose) { + this.isPipelineLeader = isPipelineLeader; + this.isPendingClose = isPendingClose; + } + + public boolean isPipelineLeader() { + return isPipelineLeader; + } + + public boolean isPendingClose() { + return isPendingClose; + } + } + private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); private static final List DEFAULT_PRIORITY_LIST = new ArrayList<>( @@ -150,11 +170,8 @@ private static long nextCallId() { private final ConfigurationSource conf; // TODO: Remove the gids set when Ratis supports an api to query active // pipelines - private final Set<RaftGroupId> raftGids = ConcurrentHashMap.newKeySet(); + private final ConcurrentMap<RaftGroupId, ActivePipelineContext> activePipelines = new ConcurrentHashMap<>(); private final RaftPeerId raftPeerId; - // pipelines for which I am the leader - private final Map<RaftGroupId, Boolean> groupLeaderMap = - new ConcurrentHashMap<>(); // Timeout used while calling submitRequest directly. 
private final long requestTimeout; private final boolean shouldDeleteRatisLogDirectory; @@ -744,9 +761,13 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail, .build(); if (context != null) { context.addPipelineActionIfAbsent(action); - // need to trigger pipeline close immediately to prevent SCM to allocate - // blocks on the failed pipeline - context.getParent().triggerHeartbeat(); + if (!activePipelines.get(groupId).isPendingClose()) { + // if pipeline close action has not been triggered before, we need to trigger pipeline close immediately to + // prevent SCM from allocating blocks on the failed pipeline + context.getParent().triggerHeartbeat(); + activePipelines.computeIfPresent(groupId, + (key, value) -> new ActivePipelineContext(value.isPipelineLeader(), true)); + } } LOG.error("pipeline Action {} on pipeline {}.Reason : {}", action.getAction(), pipelineID, @@ -755,7 +776,7 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail, @Override public boolean isExist(HddsProtos.PipelineID pipelineId) { - return raftGids.contains( + return activePipelines.containsKey( RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId())); } @@ -779,9 +800,11 @@ public List getPipelineReport() { for (RaftGroupId groupId : gids) { HddsProtos.PipelineID pipelineID = PipelineID .valueOf(groupId.getUuid()).getProtobuf(); + boolean isLeader = activePipelines.getOrDefault(groupId, + new ActivePipelineContext(false, false)).isPipelineLeader(); reports.add(PipelineReport.newBuilder() .setPipelineID(pipelineID) - .setIsLeader(groupLeaderMap.getOrDefault(groupId, Boolean.FALSE)) + .setIsLeader(isLeader) .setBytesWritten(calculatePipelineBytesWritten(pipelineID)) .build()); } @@ -919,13 +942,12 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException { } public void notifyGroupRemove(RaftGroupId gid) { - raftGids.remove(gid); - // Remove any entries for group leader map - groupLeaderMap.remove(gid); + // Remove Group ID entry 
from the active pipeline map + activePipelines.remove(gid); } void notifyGroupAdd(RaftGroupId gid) { - raftGids.add(gid); + activePipelines.put(gid, new ActivePipelineContext(false, false)); sendPipelineReport(); } @@ -935,7 +957,9 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, "leaderId: {}", groupMemberId.getGroupId(), raftPeerId1); // Save the reported leader to be sent with the report to SCM boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1); - groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup); + activePipelines.compute(groupMemberId.getGroupId(), + (key, value) -> value == null ? new ActivePipelineContext(leaderForGroup, false) : + new ActivePipelineContext(leaderForGroup, value.isPendingClose())); if (context != null && leaderForGroup) { // Publish new report from leader sendPipelineReport();