Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
Expand Down Expand Up @@ -60,11 +56,9 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
private int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
private final double healthyPipelinesPercent;
private final Set<PipelineID> processedPipelineIDs = new HashSet<>();
private final PipelineManager pipelineManager;
private final int minHealthyPipelines;
private final SCMContext scmContext;
private final Set<PipelineID> unProcessedPipelineSet = new HashSet<>();
private final NodeManager nodeManager;

HealthyPipelineSafeModeRule(EventQueue eventQueue,
Expand Down Expand Up @@ -123,81 +117,62 @@ protected synchronized boolean validate() {
LOG.info("All SCM pipelines are closed due to ongoing upgrade " +
"finalization. Bypassing healthy pipeline safemode rule.");
return true;
}
// Query PipelineManager directly for healthy pipeline count
List<Pipeline> openPipelines = pipelineManager.getPipelines(
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN);

LOG.info("Found {} open RATIS/THREE pipelines", openPipelines.size());
currentHealthyPipelineCount = (int) openPipelines.stream()
.filter(this::isPipelineHealthy)
.count();
getSafeModeMetrics().setNumCurrentHealthyPipelines(currentHealthyPipelineCount);
boolean isValid = currentHealthyPipelineCount >= healthyPipelineThresholdCount;
if (scmInSafeMode()) {
LOG.info("SCM in safe mode. Healthy pipelines: {}, threshold: {}, rule satisfied: {}",
currentHealthyPipelineCount, healthyPipelineThresholdCount, isValid);
} else {
return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
LOG.debug("SCM not in safe mode. Healthy pipelines: {}, threshold: {}",
currentHealthyPipelineCount, healthyPipelineThresholdCount);
}
return isValid;
}

@Override
protected synchronized void process(Pipeline pipeline) {
Objects.requireNonNull(pipeline, "pipeline == null");

// When SCM is in safe mode for long time, already registered
// datanode can send pipeline report again, or SCMPipelineManager will
// create new pipelines.

// Only handle RATIS + 3-replica pipelines.
if (pipeline.getType() != HddsProtos.ReplicationType.RATIS ||
((RatisReplicationConfig) pipeline.getReplicationConfig()).getReplicationFactor() !=
HddsProtos.ReplicationFactor.THREE) {
Logger safeModeManagerLog = SCMSafeModeManager.getLogger();
if (safeModeManagerLog.isDebugEnabled()) {
safeModeManagerLog.debug("Skipping pipeline safemode report processing as Replication type isn't RATIS " +
"or replication factor isn't 3.");
}
return;
}

// Skip already processed ones.
if (processedPipelineIDs.contains(pipeline.getId())) {
LOG.info("Skipping pipeline safemode report processing check as pipeline: {} is already recorded.",
pipeline.getId());
return;
private boolean isPipelineHealthy(Pipeline pipeline) {
// Verify pipeline has all 3 nodes
List<DatanodeDetails> nodes = pipeline.getNodes();
if (nodes.size() != 3) {
LOG.info("Pipeline {} is not healthy: has {} nodes instead of 3",
pipeline.getId(), nodes.size());
return false;
}

List<DatanodeDetails> pipelineDns = pipeline.getNodes();
if (pipelineDns.size() != 3) {
LOG.warn("Only {} DNs reported this pipeline: {}, all 3 DNs should report the pipeline", pipelineDns.size(),
pipeline.getId());
return;
}

Map<DatanodeDetails, String> badDnsWithReasons = new LinkedHashMap<>();

for (DatanodeDetails dn : pipelineDns) {

// Verify all nodes are healthy
for (DatanodeDetails dn : nodes) {
try {
NodeStatus status = nodeManager.getNodeStatus(dn);
if (!status.equals(NodeStatus.inServiceHealthy())) {
String reason = String.format("Health: %s, Operational State: %s",
status.getHealth(), status.getOperationalState());
badDnsWithReasons.put(dn, reason);
LOG.info("Pipeline {} is not healthy: DN {} has status - Health: {}, Operational State: {}",
pipeline.getId(), dn.getUuidString(), status.getHealth(), status.getOperationalState());
return false;
}
} catch (NodeNotFoundException e) {
badDnsWithReasons.put(dn, "DN not registered with SCM");
LOG.warn("Pipeline {} is not healthy: DN {} not found in node manager",
pipeline.getId(), dn.getUuidString());
return false;
}
}
return true;
}

if (!badDnsWithReasons.isEmpty()) {
String badDnSummary = badDnsWithReasons.entrySet().stream()
.map(entry -> String.format("DN %s: %s", entry.getKey().getID(), entry.getValue()))
.collect(Collectors.joining("; "));
LOG.warn("Below DNs reported by Pipeline: {} are either in bad health or un-registered with SCMs. Details: {}",
pipeline.getId(), badDnSummary);
return;
}

getSafeModeMetrics().incCurrentHealthyPipelinesCount();
currentHealthyPipelineCount++;
processedPipelineIDs.add(pipeline.getId());
unProcessedPipelineSet.remove(pipeline.getId());

if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
"SCM in safe mode. Healthy pipelines reported count is {}, " +
"required healthy pipeline reported count is {}",
currentHealthyPipelineCount, getHealthyPipelineThresholdCount());

}
@Override
protected synchronized void process(Pipeline pipeline) {
// No longer processing events, validation is done directly via validate()
// This method is still called when OPEN_PIPELINE events arrive, but we
// validate in validate() method instead
LOG.debug("Received OPEN_PIPELINE event for pipeline {}, validation will happen in validate() method",
pipeline.getId());
}

@Override
Expand All @@ -212,13 +187,12 @@ public synchronized void refresh(boolean forceRefresh) {
}

private synchronized void initializeRule(boolean refresh) {
unProcessedPipelineSet.addAll(pipelineManager.getPipelines(
RatisReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN).stream().map(Pipeline::getId)
.collect(Collectors.toSet()));

int pipelineCount = unProcessedPipelineSet.size();
// Get current open pipeline count from PipelineManager
List<Pipeline> openPipelines = pipelineManager.getPipelines(
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN);

int pipelineCount = openPipelines.size();

healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
(int) Math.ceil(healthyPipelinesPercent * pipelineCount));
Expand All @@ -239,8 +213,7 @@ private synchronized void initializeRule(boolean refresh) {

@Override
protected synchronized void cleanup() {
processedPipelineIDs.clear();
unProcessedPipelineSet.clear();
// No tracking state to clean up since we query PipelineManager directly
}

@VisibleForTesting
Expand All @@ -265,13 +238,20 @@ public String getStatusText() {

private synchronized String updateStatusTextWithSamplePipelines(
String status) {
Set<PipelineID> samplePipelines =
unProcessedPipelineSet.stream().limit(SAMPLE_PIPELINE_DISPLAY_LIMIT)
.collect(Collectors.toSet());

if (!samplePipelines.isEmpty()) {
// Get sample pipelines that don't satisfy the healthy criteria
List<Pipeline> openPipelines = pipelineManager.getPipelines(
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN);

Set<PipelineID> unhealthyPipelines = openPipelines.stream()
.filter(p -> !isPipelineHealthy(p))
.map(Pipeline::getId)
.limit(SAMPLE_PIPELINE_DISPLAY_LIMIT)
.collect(Collectors.toSet());

if (!unhealthyPipelines.isEmpty()) {
String samplePipelineText =
"Sample pipelines not satisfying the criteria : " + samplePipelines;
"Sample pipelines not satisfying the criteria : " + unhealthyPipelines;
status = status.concat("\n").concat(samplePipelineText);
}
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class SafeModeMetrics {

// Pipeline metrics for safemode
private @Metric MutableGaugeLong numHealthyPipelinesThreshold;
private @Metric MutableCounterLong currentHealthyPipelinesCount;
private @Metric MutableGaugeLong currentHealthyPipelinesCount;
private @Metric MutableGaugeLong
numPipelinesWithAtleastOneReplicaReportedThreshold;
private @Metric MutableCounterLong
Expand All @@ -68,8 +68,8 @@ public void setNumHealthyPipelinesThreshold(long val) {
this.numHealthyPipelinesThreshold.set(val);
}

public void incCurrentHealthyPipelinesCount() {
this.currentHealthyPipelinesCount.incr();
public void setNumCurrentHealthyPipelines(long val) {
this.currentHealthyPipelinesCount.set(val);
}

public void setNumPipelinesWithAtleastOneReplicaReportedThreshold(long val) {
Expand Down Expand Up @@ -117,7 +117,7 @@ MutableGaugeLong getNumHealthyPipelinesThreshold() {
return numHealthyPipelinesThreshold;
}

MutableCounterLong getCurrentHealthyPipelinesCount() {
MutableGaugeLong getCurrentHealthyPipelinesCount() {
return currentHealthyPipelinesCount;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
HealthyPipelineSafeModeRule healthyPipelineSafeModeRule = SafeModeRuleFactory.getInstance()
.getSafeModeRule(HealthyPipelineSafeModeRule.class);

// No datanodes have sent pipelinereport from datanode
assertFalse(healthyPipelineSafeModeRule.validate());

// Fire pipeline report from all datanodes in first pipeline, as here we
// have 3 pipelines, 10% is 0.3, when doing ceil it is 1. So, we should
// validate should return true after fire pipeline event
Expand Down Expand Up @@ -274,18 +271,7 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines()
HealthyPipelineSafeModeRule healthyPipelineSafeModeRule = SafeModeRuleFactory.getInstance()
.getSafeModeRule(HealthyPipelineSafeModeRule.class);

// No pipeline event have sent to SCMSafemodeManager
assertFalse(healthyPipelineSafeModeRule.validate());

// fire event with pipeline create status with ratis type and factor 1
// pipeline, validate() should return false
firePipelineEvent(pipeline1, eventQueue);

assertFalse(healthyPipelineSafeModeRule.validate());

firePipelineEvent(pipeline2, eventQueue);
firePipelineEvent(pipeline3, eventQueue);

//No need of pipeline events.
GenericTestUtils.waitFor(() -> healthyPipelineSafeModeRule.validate(),
1000, 5000);

Expand Down Expand Up @@ -357,7 +343,7 @@ public void testPipelineIgnoredWhenDnIsUnhealthy() throws Exception {

// Wait for log message indicating the pipeline's DN is in bad health.
GenericTestUtils.waitFor(
() -> logCapturer.getOutput().contains("are either in bad health or un-registered with SCMs"),
() -> logCapturer.getOutput().contains("is not healthy"),
100, 5000);

// Ensure the rule is NOT satisfied due to unhealthy DN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,10 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck(

assertTrue(scmSafeModeManager.getInSafeMode());
assertEquals(1, scmSafeModeManager.getSafeModeMetrics().getScmInSafeMode().value());
if (healthyPipelinePercent > 0) {
validateRuleStatus("HealthyPipelineSafeModeRule",
"healthy Ratis/THREE pipelines");
}
// Note: HealthyPipelineSafeModeRule may already be satisfied at this point
// since it validates by directly querying the pipeline manager, and all
// test pipelines are created as healthy. We only check if it's unsatisfied
// when the rule can't be immediately met.
validateRuleStatus("OneReplicaPipelineSafeModeRule",
"reported Ratis/THREE pipelines with at least one datanode");

Expand All @@ -345,21 +345,24 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck(
// Because even if no pipelines are there, and threshold we set to zero,
// we shall a get an event when datanode is registered. In that case,
// validate will return true, and add this to validatedRules.
if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
if (Math.max(healthyPipelineThresholdCount, oneReplicaThresholdCount) == 0 && !pipelines.isEmpty()) {
firePipelineEvent(pipelineManager, pipelines.get(0));
}

for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
Math.min(oneReplicaThresholdCount, pipelines.size())); i++) {
firePipelineEvent(pipelineManager, pipelines.get(i));
// HealthyPipelineSafeModeRule now validates by directly querying the pipeline manager,
// so the healthy pipeline count is already at maximum. We only need to fire pipeline
// events for OneReplicaPipelineSafeModeRule.
// The healthy pipeline count should already be at the maximum (all pipelines are healthy)
int actualHealthyPipelines = pipelines.size();
assertEquals(actualHealthyPipelines,
scmSafeModeManager.getSafeModeMetrics()
.getCurrentHealthyPipelinesCount().value());

if (i < healthyPipelineThresholdCount) {
checkHealthy(i + 1);
assertEquals(i + 1,
scmSafeModeManager.getSafeModeMetrics()
.getCurrentHealthyPipelinesCount().value());
}
// Fire events for OneReplicaPipelineSafeModeRule
for (int i = 0; i < pipelines.size(); i++) {
firePipelineEvent(pipelineManager, pipelines.get(i));

// Only check one replica count if we haven't exceeded the threshold yet
if (i < oneReplicaThresholdCount) {
checkOpen(i + 1);
assertEquals(i + 1,
Expand All @@ -368,20 +371,34 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck(
}
}

assertEquals(healthyPipelineThresholdCount,
// Verify final healthy pipeline count (unchanged since start)
assertEquals(actualHealthyPipelines,
scmSafeModeManager.getSafeModeMetrics()
.getCurrentHealthyPipelinesCount().value());

assertEquals(oneReplicaThresholdCount,
// Verify one replica count
int expectedOneReplicaCount = Math.min(oneReplicaThresholdCount, pipelines.size());
assertEquals(expectedOneReplicaCount,
scmSafeModeManager.getSafeModeMetrics()
.getCurrentPipelinesWithAtleastOneReplicaCount().value());


GenericTestUtils.waitFor(() -> !scmSafeModeManager.getInSafeMode(),
100, 1000 * 5);
GenericTestUtils.waitFor(() ->
scmSafeModeManager.getSafeModeMetrics().getScmInSafeMode().value() == 0,
100, 1000 * 5);
// Safe mode should only exit if we have enough pipelines to satisfy all thresholds
// If either threshold is higher than available pipelines, safe mode will remain active
if (healthyPipelineThresholdCount <= pipelines.size() &&
oneReplicaThresholdCount <= pipelines.size()) {
GenericTestUtils.waitFor(() -> !scmSafeModeManager.getInSafeMode(),
100, 1000 * 5);
GenericTestUtils.waitFor(() ->
scmSafeModeManager.getSafeModeMetrics().getScmInSafeMode().value() == 0,
100, 1000 * 5);
} else {
// Verify safe mode remains active when insufficient pipelines exist
assertTrue(scmSafeModeManager.getInSafeMode(),
"Safe mode should remain active when insufficient pipelines exist. " +
"Required healthy: " + healthyPipelineThresholdCount + ", " +
"Required oneReplica: " + oneReplicaThresholdCount + ", " +
"Available: " + pipelines.size());
}
}

/**
Expand All @@ -400,13 +417,6 @@ private void validateRuleStatus(String safeModeRule, String stringToMatch) {
}
}

private void checkHealthy(int expectedCount) throws Exception {
final HealthyPipelineSafeModeRule pipelineRule = SafeModeRuleFactory.getInstance()
.getSafeModeRule(HealthyPipelineSafeModeRule.class);
GenericTestUtils.waitFor(() -> pipelineRule.getCurrentHealthyPipelineCount() == expectedCount,
100, 5000);
}

private void checkOpen(int expectedCount) throws Exception {
final OneReplicaPipelineSafeModeRule pipelineRule = SafeModeRuleFactory.getInstance()
.getSafeModeRule(OneReplicaPipelineSafeModeRule.class);
Expand Down