Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-8535. ReplicationManager: Unhealthy containers could block EC recovery in small clusters #4756

Merged
Merged 2 commits on May 25, 2023.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ public boolean isSufficientlyReplicated() {
return isSufficientlyReplicated(false);
}



@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand All @@ -43,6 +43,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -206,6 +207,17 @@ public int processAndSendCommands(
"OverReplication handler", container);
replicationManager.processOverReplicatedContainer(result);
}

/* If we get here, the scenario is:
1. Under replicated.
2. Not over replicated.
3. Placement Policy not able to find enough targets.
Check if there are some UNHEALTHY replicas. In a small cluster, these
UNHEALTHY replicas could block DNs that could otherwise be targets
for new EC replicas. Deleting an UNHEALTHY replica can make its host DN
available as a target.
*/
checkAndRemoveUnhealthyReplica(replicaCount, deletionInFlight);
// As we want to re-queue and try again later, we just re-throw
throw e;
}
Expand All @@ -224,8 +236,7 @@ public int processAndSendCommands(
private Map<Integer, Pair<ContainerReplica, NodeStatus>> filterSources(
Set<ContainerReplica> replicas, List<DatanodeDetails> deletionInFlight) {
return replicas.stream().filter(r -> r
.getState() == StorageContainerDatanodeProtocolProtos
.ContainerReplicaProto.State.CLOSED)
.getState() == State.CLOSED)
// Exclude stale and dead nodes. This is particularly important for
// maintenance nodes, as the replicas will remain present in the
// container manager, even when they go dead.
Expand Down Expand Up @@ -541,4 +552,101 @@ private static byte[] int2byte(List<Integer> src) {
}
return dst;
}

/**
* Deletes one UNHEALTHY replica so that its host datanode becomes available
* to host a healthy replica. This can be helpful if reconstruction or
* replication is blocked because DNs that follow the placement policy are
* not available as targets.
* @param replicaCount ECContainerReplicaCount object of this container
* @param deletionInFlight pending deletes of this container's replicas
*/
private void checkAndRemoveUnhealthyReplica(
ECContainerReplicaCount replicaCount,
List<DatanodeDetails> deletionInFlight) {
LOG.debug("Finding an UNHEALTHY replica of container {} to delete so its " +
"host datanode can be available for replication/reconstruction.",
replicaCount.getContainer());
if (!deletionInFlight.isEmpty()) {
LOG.debug("There are {} pending deletes. Completing them could " +
"free up nodes to fix under replication. Not deleting UNHEALTHY" +
" replicas in this iteration.", deletionInFlight.size());
return;
}

ContainerInfo container = replicaCount.getContainer();
// ensure that the container is recoverable
if (replicaCount.isUnrecoverable()) {
sodonnel marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn("Cannot recover container {}.", container);
return;
}

// don't consider replicas that aren't on IN_SERVICE and HEALTHY DNs
Set<Integer> closedReplicas = new HashSet<>();
Set<ContainerReplica> unhealthyReplicas = new HashSet<>();
for (ContainerReplica replica : replicaCount.getReplicas()) {
try {
NodeStatus nodeStatus =
replicationManager.getNodeStatus(replica.getDatanodeDetails());
if (!nodeStatus.isHealthy() || !nodeStatus.isInService()) {
continue;
}
} catch (NodeNotFoundException e) {
LOG.debug("Skipping replica {} when trying to unblock under " +
"replication handling.", replica, e);
continue;
}

if (replica.getState().equals(State.CLOSED)) {
// collect CLOSED replicas for later
closedReplicas.add(replica.getReplicaIndex());
} else if (replica.getState().equals(State.UNHEALTHY)) {
unhealthyReplicas.add(replica);
}
}

if (unhealthyReplicas.isEmpty()) {
LOG.debug("Container {} does not have any UNHEALTHY replicas.",
container.containerID());
return;
}

/*
If an index has both an UNHEALTHY and CLOSED replica, prefer deleting the
UNHEALTHY replica of this index and return. Otherwise, delete any UNHEALTHY
replica.
*/
for (ContainerReplica unhealthyReplica : unhealthyReplicas) {
if (closedReplicas.contains(unhealthyReplica.getReplicaIndex())) {
try {
replicationManager.sendThrottledDeleteCommand(
replicaCount.getContainer(), unhealthyReplica.getReplicaIndex(),
unhealthyReplica.getDatanodeDetails(), true);
return;
} catch (NotLeaderException | CommandTargetOverloadedException e) {
LOG.debug("Skipping sending a delete command for replica {} to " +
"Datanode {}.", unhealthyReplica,
unhealthyReplica.getDatanodeDetails());
}
}
}

/*
We didn't delete in the earlier loop - just delete any UNHEALTHY
replica now.
*/
for (ContainerReplica unhealthyReplica : unhealthyReplicas) {
try {
replicationManager.sendThrottledDeleteCommand(
replicaCount.getContainer(), unhealthyReplica.getReplicaIndex(),
unhealthyReplica.getDatanodeDetails(), true);
return;
} catch (NotLeaderException | CommandTargetOverloadedException e) {
LOG.debug("Skipping sending a delete command for replica {} to " +
"Datanode {}.", unhealthyReplica,
unhealthyReplica.getDatanodeDetails());
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
Expand Down Expand Up @@ -69,11 +70,13 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -471,7 +474,8 @@ public void testUnderRepSentToOverRepHandlerIfNoNewNodes()
Set<Pair<DatanodeDetails, SCMCommand<?>>> expectedDelete =
new HashSet<>();
expectedDelete.add(Pair.of(overRepReplica.getDatanodeDetails(),
createDeleteContainerCommand(container, overRepReplica)));
createDeleteContainerCommand(container,
overRepReplica.getReplicaIndex())));

Mockito.when(replicationManager.processOverReplicatedContainer(
underRep)).thenAnswer(invocationOnMock -> {
Expand All @@ -488,6 +492,120 @@ public void testUnderRepSentToOverRepHandlerIfNoNewNodes()
}
}

  /**
   * Tests that under replication handling tries to delete an UNHEALTHY
   * replica if no target datanodes are found. It should delete only
   * one UNHEALTHY replica so that the replica's host DN becomes available as a
   * target for reconstruction/replication of a healthy replica.
   */
  @Test
  public void testUnhealthyNodeDeletedIfNoTargetsFound()
      throws IOException {
    // A placement policy that never finds target nodes, forcing the handler
    // down the "no targets" path.
    PlacementPolicy noNodesPolicy = ReplicationTestUtil
        .getNoNodesTestPlacementPolicy(nodeManager, conf);

    // CLOSED replicas of index 5 on nodes that are leaving the cluster, used
    // to trigger the decommission and maintenance stages of the handler.
    ContainerReplica decomReplica =
        ReplicationTestUtil.createContainerReplica(container.containerID(),
            5, DECOMMISSIONING, CLOSED);
    ContainerReplica maintReplica =
        ReplicationTestUtil.createContainerReplica(container.containerID(),
            5, ENTERING_MAINTENANCE, CLOSED);

    // null -> index 5 missing entirely; the other two entries cover the
    // decommission and maintenance under replication scenarios.
    List<ContainerReplica> replicasToAdd = new ArrayList<>();
    replicasToAdd.add(null);
    replicasToAdd.add(decomReplica);
    replicasToAdd.add(maintReplica);

    ECUnderReplicationHandler ecURH =
        new ECUnderReplicationHandler(
            noNodesPolicy, conf, replicationManager);
    ContainerHealthResult.UnderReplicatedHealthResult underRep =
        new ContainerHealthResult.UnderReplicatedHealthResult(container,
            1, false, false, false);
    // UNHEALTHY replica of index 4 - index 4 also has a CLOSED replica below,
    // so this is the replica the handler should prefer to delete.
    ContainerReplica unhealthyReplica =
        ReplicationTestUtil.createContainerReplica(container.containerID(),
            4, IN_SERVICE, UNHEALTHY);

    /*
    The underRepHandler processes in stages. First missing indexes, then
    decommission and then maintenance. If a stage cannot find new nodes and
    there are no commands created yet, then we should either throw, or pass
    control to the over rep handler if the container is also over
    replicated, or try to delete an UNHEALTHY replica if one is present.
    In this loop we first have the container under replicated with a missing
    index, then with a decommissioning index, and finally with a maintenance
    index. In all cases, initially there are no UNHEALTHY replicas, so it
    should throw an exception. Then we add 2 UNHEALTHY replicas, so
    it should return the command to delete one.
    */
    for (ContainerReplica toAdd : replicasToAdd) {
      // Reset mock interaction counts between scenarios so verify() counts
      // only this iteration's calls.
      Mockito.clearInvocations(replicationManager);
      Set<ContainerReplica> existingReplicas = ReplicationTestUtil
          .createReplicas(Pair.of(IN_SERVICE, 5),
              Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
              Pair.of(IN_SERVICE, 4));
      if (toAdd != null) {
        existingReplicas.add(toAdd);
      }

      // should throw an SCMException indicating no targets were found
      Assert.assertThrows(SCMException.class,
          () -> ecURH.processAndSendCommands(existingReplicas,
              Collections.emptyList(), underRep, 2));
      // no UNHEALTHY replicas are present yet, so no delete may be sent
      Mockito.verify(replicationManager, times(0))
          .sendThrottledDeleteCommand(eq(container), anyInt(),
              any(DatanodeDetails.class), anyBoolean());

      /*
      Now, for the same container, also add an UNHEALTHY replica. The handler
      should catch the SCMException that says no targets were found and try
      to handle it by deleting the UNHEALTHY replica.
      */
      existingReplicas.add(unhealthyReplica);
      // second UNHEALTHY replica on index 1, which has no CLOSED counterpart
      existingReplicas.add(
          ReplicationTestUtil.createContainerReplica(container.containerID(),
              1, IN_SERVICE, UNHEALTHY));

      /*
      Mock such that when replication manager is called to send a delete
      command, we add the command to commandsSet and later use it for
      assertions.
      */
      commandsSent.clear();
      Mockito.doAnswer(invocation -> {
        commandsSent.add(Pair.of(invocation.getArgument(2),
            createDeleteContainerCommand(invocation.getArgument(0),
                invocation.getArgument(1))));
        return null;
      })
          .when(replicationManager)
          .sendThrottledDeleteCommand(Mockito.any(ContainerInfo.class),
              Mockito.anyInt(), Mockito.any(DatanodeDetails.class),
              Mockito.eq(true));

      // still under replicated with no targets, so the exception is re-thrown
      // even though a delete command was issued
      assertThrows(SCMException.class,
          () -> ecURH.processAndSendCommands(existingReplicas,
              Collections.emptyList(), underRep, 2));
      // exactly one delete, and it targets the index-4 UNHEALTHY replica
      // (the one whose index also has a CLOSED replica)
      Mockito.verify(replicationManager, times(1))
          .sendThrottledDeleteCommand(container,
              unhealthyReplica.getReplicaIndex(),
              unhealthyReplica.getDatanodeDetails(), true);
      Assertions.assertEquals(1, commandsSent.size());
      Pair<DatanodeDetails, SCMCommand<?>> command =
          commandsSent.iterator().next();
      Assertions.assertEquals(SCMCommandProto.Type.deleteContainerCommand,
          command.getValue().getType());
      DeleteContainerCommand deleteCommand =
          (DeleteContainerCommand) command.getValue();
      Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(),
          command.getKey());
      Assertions.assertEquals(container.containerID(),
          ContainerID.valueOf(deleteCommand.getContainerID()));
      Assertions.assertEquals(unhealthyReplica.getReplicaIndex(),
          deleteCommand.getReplicaIndex());
    }
  }

@Test
public void testPartialReconstructionIfNotEnoughNodes() {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
Expand Down Expand Up @@ -725,7 +843,8 @@ public void testUnderRepDueToDecomAndOverRep()

Set<Pair<DatanodeDetails, SCMCommand<?>>> expectedDelete = new HashSet<>();
expectedDelete.add(Pair.of(overRepReplica.getDatanodeDetails(),
createDeleteContainerCommand(container, overRepReplica)));
createDeleteContainerCommand(container,
overRepReplica.getReplicaIndex())));

Mockito.when(replicationManager.processOverReplicatedContainer(
underRep)).thenAnswer(invocationOnMock -> {
Expand Down Expand Up @@ -966,10 +1085,10 @@ public void testMaintenanceIndexCopiedWhenContainerUnRecoverable()
}

private DeleteContainerCommand createDeleteContainerCommand(
ContainerInfo containerInfo, ContainerReplica replica) {
ContainerInfo containerInfo, int replicaIndex) {
DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(containerInfo.getContainerID(), true);
deleteCommand.setReplicaIndex(replica.getReplicaIndex());
deleteCommand.setReplicaIndex(replicaIndex);
return deleteCommand;
}
}
Loading