Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public ReplicationManager(final ConfigurationSource conf,
this.ecMisReplicationCheckHandler =
new ECMisReplicationCheckHandler(ecContainerPlacement);
this.ratisReplicationCheckHandler =
new RatisReplicationCheckHandler(ratisContainerPlacement);
new RatisReplicationCheckHandler(ratisContainerPlacement, this);
this.nodeManager = nodeManager;
this.metrics = ReplicationManagerMetrics.create(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.container.replication.health;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
Expand All @@ -26,6 +27,9 @@
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
import org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerUtil;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,9 +56,12 @@ public class RatisReplicationCheckHandler extends AbstractCheck {
* should be replicated.
*/
private final PlacementPolicy ratisContainerPlacement;
private final ReplicationManager replicationManager;

public RatisReplicationCheckHandler(PlacementPolicy containerPlacement) {
public RatisReplicationCheckHandler(PlacementPolicy containerPlacement,
ReplicationManager replicationManager) {
this.ratisContainerPlacement = containerPlacement;
this.replicationManager = replicationManager;
}

@Override
Expand Down Expand Up @@ -190,6 +197,12 @@ public ContainerHealthResult checkHealth(ContainerCheckRequest request) {
return replicaCount.toUnderHealthResult();
}


if (replicaCount.isOverReplicated(false)) {
// If the container is over replicated without considering UNHEALTHY
// then we know for sure it is over replicated, so mark as such.
return replicaCount.toOverHealthResult();
}
/*
When checking for over replication, consider UNHEALTHY replicas. This means
that other than checking over replication of healthy replicas (such as 4
Expand All @@ -200,9 +213,37 @@ of UNHEALTHY replicas (such as 3 CLOSED and 1 UNHEALTHY replicas of a
RatisContainerReplicaCount consideringUnhealthy =
new RatisContainerReplicaCount(container, replicas, replicaPendingOps,
minReplicasForMaintenance, true);
boolean isOverReplicated = consideringUnhealthy.isOverReplicated(false);
if (isOverReplicated) {
return consideringUnhealthy.toOverHealthResult();

if (consideringUnhealthy.isOverReplicated(false)) {
if (container.getState() == HddsProtos.LifeCycleState.CLOSED) {
return consideringUnhealthy.toOverHealthResult();
} else if (container.getState()
== HddsProtos.LifeCycleState.QUASI_CLOSED) {
// If the container is quasi-closed and over replicated, we may have a
// case where the excess replica is an unhealthy one, but it has a
// unique origin and therefore should not be deleted. In this case,
// we should not mark the container as over replicated.
// We ignore pending deletes, as a container is still over replicated
// until the pending delete completes.
ContainerReplica toDelete = ReplicationManagerUtil
.selectUnhealthyReplicaForDelete(container, replicas, 0,
(dnd) -> {
try {
return replicationManager.getNodeStatus(dnd);
} catch (NodeNotFoundException e) {
return null;
}
});
if (toDelete != null) {
// There is at least one unhealthy replica that can be deleted, so
// return as over replicated.
return consideringUnhealthy.toOverHealthResult();
} else {
// Even though we have at least 4 replicas with some unhealthy, we
// can't delete any of them, so the container is not over replicated.
return new ContainerHealthResult.HealthyResult(container);
}
}
}

int requiredNodes = container.getReplicationConfig().getRequiredNodes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -50,6 +53,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
Expand All @@ -76,17 +80,23 @@ public class TestRatisReplicationCheckHandler {
private ReplicationQueue repQueue;
private ContainerCheckRequest.Builder requestBuilder;
private ReplicationManagerReport report;
private ReplicationManager replicationManager;
private int maintenanceRedundancy = 2;

@BeforeEach
public void setup() throws IOException {
public void setup() throws IOException, NodeNotFoundException {
containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.any(),
Mockito.anyInt()
)).thenAnswer(invocation ->
new ContainerPlacementStatusDefault(2, 2, 3));
healthCheck = new RatisReplicationCheckHandler(containerPlacementPolicy);

replicationManager = Mockito.mock(ReplicationManager.class);
Mockito.when(replicationManager.getNodeStatus(Mockito.any()))
.thenReturn(NodeStatus.inServiceHealthy());
healthCheck = new RatisReplicationCheckHandler(containerPlacementPolicy,
replicationManager);
repConfig = RatisReplicationConfig.getInstance(THREE);
repQueue = new ReplicationQueue();
report = new ReplicationManagerReport();
Expand Down Expand Up @@ -892,4 +902,87 @@ public void testWithQuasiClosedReplicasWithWrongSequenceID() {
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}

@Test
public void testExcessReplicasButNotOverReplicatedDuetoUniqueOrigins() {
final long sequenceID = 20;
final ContainerInfo container = ReplicationTestUtil.createContainerInfo(
repConfig, 1, HddsProtos.LifeCycleState.QUASI_CLOSED,
sequenceID);

final Set<ContainerReplica> replicas = new HashSet<>(2);
replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED, 1, 1,
MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails().getUuid(),
sequenceID - 1));
replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED, 1, 1,
MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails().getUuid(),
sequenceID - 1));
replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED, 1, 1,
MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails().getUuid(),
sequenceID - 1));

replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.UNHEALTHY, 1, 1,
MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails().getUuid(),
sequenceID));

requestBuilder.setContainerReplicas(replicas)
.setContainerInfo(container);

assertFalse(healthCheck.handle(requestBuilder.build()));
assertEquals(0, repQueue.underReplicatedQueueSize());
assertEquals(0, repQueue.overReplicatedQueueSize());
assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}

@Test
public void testExcessReplicasAndOverReplicatedDuetoNonUniqueOrigins() {
final long sequenceID = 20;
final ContainerInfo container = ReplicationTestUtil.createContainerInfo(
repConfig, 1, HddsProtos.LifeCycleState.QUASI_CLOSED,
sequenceID);

UUID origin = UUID.randomUUID();
final Set<ContainerReplica> replicas = new HashSet<>(2);
replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED, 1, 1,
MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails().getUuid(),
sequenceID - 1));
replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED, 1, 1,
MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails().getUuid(),
sequenceID - 1));
replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED, 1, 1,
MockDatanodeDetails.randomDatanodeDetails(), origin,
sequenceID - 1));

replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.UNHEALTHY, 1, 1,
MockDatanodeDetails.randomDatanodeDetails(), origin,
sequenceID - 1));

requestBuilder.setContainerReplicas(replicas)
.setContainerInfo(container);

assertTrue(healthCheck.handle(requestBuilder.build()));
assertEquals(0, repQueue.underReplicatedQueueSize());
assertEquals(1, repQueue.overReplicatedQueueSize());
assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}

}