Skip to content

Commit

Permalink
HDDS-8334. ReplicationManager: Add nodes to exclude list if they are …
Browse files Browse the repository at this point in the history
…overloaded (#4510)
  • Loading branch information
sodonnel committed Apr 1, 2023
1 parent 7b570df commit 3ecd87c
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -147,6 +148,16 @@ public class ReplicationManager implements SCMService {
*/
private LegacyReplicationManager legacyReplicationManager;

/**
 * Datanodes which have been excluded for replication commands due to the
 * number of commands queued on them. This can be used when generating
 * reconstruction commands to avoid nodes which are already overloaded. When
 * the datanode heartbeat is received, the node is removed from this
 * collection if the command count has dropped below the limit.
 *
 * The map is used as a set: only the keys carry meaning, and every entry
 * is stored with the same placeholder value.
 */
private final Map<DatanodeDetails, Integer> excludedNodes =
new ConcurrentHashMap<>();

/**
* SCMService related variables.
* After leaving safe mode, replicationMonitor needs to wait for a while
Expand Down Expand Up @@ -526,13 +537,13 @@ public void sendThrottledReplicationCommand(ContainerInfo containerInfo,
"available for replication of container " + containerID + " to " +
target);
}
// Put the least loaded source first
sourceWithCmds.sort(Comparator.comparingInt(Pair::getLeft));
DatanodeDetails source = selectAndOptionallyExcludeDatanode(
1, sourceWithCmds);

ReplicateContainerCommand cmd =
ReplicateContainerCommand.toTarget(containerID, target);
cmd.setReplicaIndex(replicaIndex);
sendDatanodeCommand(cmd, containerInfo, sourceWithCmds.get(0).getRight());
sendDatanodeCommand(cmd, containerInfo, source);
}

public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
Expand All @@ -545,10 +556,24 @@ public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
throw new CommandTargetOverloadedException("No target with capacity " +
"available for reconstruction of " + containerInfo.getContainerID());
}
// Put the least loaded target first
targetWithCmds.sort(Comparator.comparingInt(Pair::getLeft));
sendDatanodeCommand(command, containerInfo,
targetWithCmds.get(0).getRight());
DatanodeDetails target = selectAndOptionallyExcludeDatanode(
reconstructionCommandWeight, targetWithCmds);
sendDatanodeCommand(command, containerInfo, target);
}

/**
 * Chooses the candidate with the smallest command count and, when the
 * additional commands about to be sent would bring that count to or beyond
 * the datanode replication limit, records the datanode as excluded.
 * The passed list is sorted in place by ascending command count.
 *
 * @param additionalCmdCount The weight of the command(s) about to be sent
 *                           to the selected datanode.
 * @param datanodes Pairs of (current command count, datanode) to pick from.
 * @return The datanode with the lowest command count, or null if the list
 *         is empty.
 */
private DatanodeDetails selectAndOptionallyExcludeDatanode(
    int additionalCmdCount, List<Pair<Integer, DatanodeDetails>> datanodes) {
  if (!datanodes.isEmpty()) {
    // After sorting, the least loaded candidate is at the head of the list.
    datanodes.sort(Comparator.comparingInt(Pair::getLeft));
    Pair<Integer, DatanodeDetails> leastLoaded = datanodes.get(0);
    DatanodeDetails selected = leastLoaded.getRight();
    int projectedCount = leastLoaded.getLeft() + additionalCmdCount;
    if (projectedCount >= datanodeReplicationLimit) {
      addExcludedNode(selected);
    }
    return selected;
  }
  return null;
}

/**
Expand All @@ -567,18 +592,12 @@ public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
= new ArrayList<>();
for (DatanodeDetails dn : datanodes) {
try {
Map<Type, Integer> counts = nodeManager.getTotalDatanodeCommandCounts(
dn, Type.replicateContainerCommand,
Type.reconstructECContainersCommand);
int replicateCount = counts.get(Type.replicateContainerCommand);
int reconstructCount = counts.get(Type.reconstructECContainersCommand);
int totalCount = replicateCount
+ reconstructCount * reconstructionCommandWeight;
int totalCount = getDatanodeQueuedReplicationCount(dn);
if (totalCount >= datanodeReplicationLimit) {
LOG.debug("Datanode {} has reached the maximum number of queued " +
"commands, replication: {}, reconstruction: {} * {})",
dn, replicateCount, reconstructCount,
reconstructionCommandWeight);
"commands, replication + reconstruction * {}: {})",
dn, reconstructionCommandWeight, totalCount);
addExcludedNode(dn);
continue;
}
datanodeWithCommandCount.add(Pair.of(totalCount, dn));
Expand All @@ -590,6 +609,16 @@ public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
return datanodeWithCommandCount;
}

/**
 * Computes the weighted replication load for a datanode from the command
 * counts reported by the node manager: the replicate container command
 * count plus the reconstruction command count multiplied by
 * reconstructionCommandWeight.
 *
 * @param datanode The datanode to obtain the command counts for.
 * @return The weighted total of replication and reconstruction commands.
 * @throws NodeNotFoundException Propagated from the node manager lookup.
 */
private int getDatanodeQueuedReplicationCount(DatanodeDetails datanode)
    throws NodeNotFoundException {
  Map<Type, Integer> commandCounts =
      nodeManager.getTotalDatanodeCommandCounts(datanode,
          Type.replicateContainerCommand,
          Type.reconstructECContainersCommand);
  int replication = commandCounts.get(Type.replicateContainerCommand);
  int weightedReconstruction =
      commandCounts.get(Type.reconstructECContainersCommand)
          * reconstructionCommandWeight;
  return replication + weightedReconstruction;
}

/**
* Send a push replication command to the given source datanode, instructing
* it to copy the given container to the target. The command is sent as a low
Expand Down Expand Up @@ -804,15 +833,40 @@ public long getScmTerm() throws NotLeaderException {
* Notify ReplicationManager that the command counts on a datanode have been
* updated via a heartbeat received. This will allow RM to consider the node
* for container operations if it was previously excluded due to load.
* @param datanodeDetails The datanode for which the commands have been
* updated.
* @param datanode The datanode for which the commands have been updated.
*/
public void datanodeCommandCountUpdated(DatanodeDetails datanodeDetails) {
// For now this is a NOOP, as the plan is to use this notification in a
// future change to limit the number of commands scheduled against a DN by
// RM.
public void datanodeCommandCountUpdated(DatanodeDetails datanode) {
LOG.debug("Received a notification that the DN command count " +
"has been updated for {}", datanodeDetails);
"has been updated for {}", datanode);
// If there is an existing mapping, we may need to remove it
excludedNodes.computeIfPresent(datanode, (k, v) -> {
try {
if (getDatanodeQueuedReplicationCount(datanode)
< datanodeReplicationLimit) {
// Returning null removes the entry from the map
return null;
} else {
return 1;
}
} catch (NodeNotFoundException e) {
LOG.warn("Unable to find datanode {} in nodeManager. " +
"Should not happen.", datanode);
return null;
}
});
}

/**
 * Returns the set of datanodes that are currently excluded from being
 * targets for container replication due to queued commands.
 *
 * The returned set is a read-only view over the internal exclusion map: it
 * reflects later additions and removals, but attempts to modify it throw
 * {@link UnsupportedOperationException}, so callers cannot alter the
 * exclusion state by accident.
 *
 * @return Unmodifiable set of excluded DatanodeDetails.
 */
public Set<DatanodeDetails> getExcludedNodes() {
  // Fully qualified to avoid depending on an import this block cannot see.
  return java.util.Collections.unmodifiableSet(excludedNodes.keySet());
}

/**
 * Records the given datanode in the excluded nodes map.
 *
 * @param datanode The datanode to mark as excluded.
 */
private void addExcludedNode(DatanodeDetails datanode) {
  // Only the key matters; the value is a fixed placeholder.
  excludedNodes.put(datanode, 1);
}

protected void processContainer(ContainerInfo containerInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -957,7 +958,7 @@ public void testSendThrottledReconstructionCommand()
repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);

ReconstructECContainersCommand command = createReconstructionCommand(
container, new ArrayList<>(targetNodes.keySet()));
container, targetNodes.keySet().toArray(new DatanodeDetails[0]));

replicationManager.sendThrottledReconstructionCommand(container, command);

Expand All @@ -980,10 +981,6 @@ public void testSendThrottledReconstructionCommandThrowsWhenNoTargets()
int reconstructionCount = 2;
int replicationCount = limit - reconstructionCount * reconstructionWeight;

List<DatanodeDetails> targets = new ArrayList<>();
targets.add(MockDatanodeDetails.randomDatanodeDetails());
targets.add(MockDatanodeDetails.randomDatanodeDetails());

Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
eq(SCMCommandProto.Type.replicateContainerCommand),
eq(SCMCommandProto.Type.reconstructECContainersCommand)))
Expand All @@ -999,12 +996,13 @@ public void testSendThrottledReconstructionCommandThrowsWhenNoTargets()
ContainerInfo container = ReplicationTestUtil.createContainerInfo(
repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
ReconstructECContainersCommand command = createReconstructionCommand(
container, targets);
container, MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails());
replicationManager.sendThrottledReconstructionCommand(container, command);
}

private ReconstructECContainersCommand createReconstructionCommand(
ContainerInfo containerInfo, List<DatanodeDetails> targets) {
ContainerInfo containerInfo, DatanodeDetails... targets) {
List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> sources
= new ArrayList<>();
for (int i = 1; i <= 3; i++) {
Expand All @@ -1013,10 +1011,10 @@ private ReconstructECContainersCommand createReconstructionCommand(
MockDatanodeDetails.randomDatanodeDetails(), i));
}
byte[] missingIndexes = new byte[]{4, 5};

return new ReconstructECContainersCommand(
containerInfo.getContainerID(), sources,
targets, missingIndexes, (ECReplicationConfig) repConfig);
new ArrayList<>(Arrays.asList(targets)), missingIndexes,
(ECReplicationConfig) repConfig);
}

@Test
Expand Down Expand Up @@ -1050,6 +1048,76 @@ public void testCreateThrottledDeleteContainerCommandThrowsWhenNoSources()
replicationManager.sendThrottledDeleteCommand(container, 1, target, true);
}

/**
 * Verifies that datanodes are added to the excluded set when they are at,
 * or are pushed to, the replication limit, and that a command count update
 * removes them again only once their count is below the limit.
 */
@Test
public void testExcludedNodes() throws NodeNotFoundException,
    NotLeaderException, CommandTargetOverloadedException {
  int repLimit = replicationManager.getConfig().getDatanodeReplicationLimit();
  int reconstructionWeight = replicationManager.getConfig()
      .getReconstructionCommandWeight();
  ContainerInfo container = ReplicationTestUtil.createContainerInfo(
      repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
  Map<DatanodeDetails, Integer> commandCounts = new HashMap<>();
  DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
  DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails();
  DatanodeDetails dn3 = MockDatanodeDetails.randomDatanodeDetails();

  commandCounts.put(dn1, repLimit - 1);
  commandCounts.put(dn2, repLimit - reconstructionWeight);
  commandCounts.put(dn3, repLimit);

  // Report the configured replication count for each datanode and zero
  // reconstruction commands.
  Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
      eq(SCMCommandProto.Type.replicateContainerCommand),
      eq(SCMCommandProto.Type.reconstructECContainersCommand)))
      .thenAnswer(invocation -> {
        DatanodeDetails dn = invocation.getArgument(0);
        Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
        counts.put(SCMCommandProto.Type.replicateContainerCommand,
            commandCounts.get(dn));
        counts.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
        return counts;
      });

  replicationManager.sendThrottledReplicationCommand(container,
      new ArrayList<>(commandCounts.keySet()),
      MockDatanodeDetails.randomDatanodeDetails(), 1);

  Set<DatanodeDetails> excluded = replicationManager.getExcludedNodes();
  Assert.assertEquals(1, excluded.size());
  // dn 3 was at the limit already, so should be added when filtering the
  // nodes
  Assert.assertTrue(excluded.contains(dn3));

  // Trigger an update for dn3, but it should stay in the excluded list as its
  // count is still at the limit.
  replicationManager.datanodeCommandCountUpdated(dn3);
  Assert.assertEquals(1, replicationManager.getExcludedNodes().size());

  // now send a reconstruction command. It should be sent to dn2, which is
  // at the lowest count, but this command should push it to the limit and
  // cause it to be excluded.
  ReconstructECContainersCommand command = createReconstructionCommand(
      container, dn1, dn2);
  replicationManager.sendThrottledReconstructionCommand(container, command);
  excluded = replicationManager.getExcludedNodes();
  Assert.assertEquals(2, excluded.size());
  // dn 3 was already in the excluded list
  Assert.assertTrue(excluded.contains(dn3));
  // dn 2 reached the limit from the reconstruction command
  Assert.assertTrue(excluded.contains(dn2));

  // Update received for DN2, it should be cleared from the excluded list.
  replicationManager.datanodeCommandCountUpdated(dn2);
  excluded = replicationManager.getExcludedNodes();
  Assert.assertEquals(1, excluded.size());
  Assert.assertFalse(excluded.contains(dn2));

  // Finally, update received for DN1 - it is not excluded and should not
  // be added or cause any problems by not being there. Fetch the set again
  // so the assertions do not rely on the earlier reference being a live
  // view.
  replicationManager.datanodeCommandCountUpdated(dn1);
  excluded = replicationManager.getExcludedNodes();
  Assert.assertEquals(1, excluded.size());
  Assert.assertFalse(excluded.contains(dn1));
}

@SafeVarargs
private final Set<ContainerReplica> addReplicas(ContainerInfo container,
ContainerReplicaProto.State replicaState,
Expand Down

0 comments on commit 3ecd87c

Please sign in to comment.