Skip to content

Commit

Permalink
HDDS-8158. Replication Manager: Make all handlers send commands immed…
Browse files Browse the repository at this point in the history
…iately instead of returning commands (#4399)
  • Loading branch information
sodonnel committed Mar 16, 2023
1 parent dc03783 commit f093f71
Show file tree
Hide file tree
Showing 21 changed files with 438 additions and 420 deletions.
Expand Up @@ -17,30 +17,25 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;

/**
* Handles the EC Over replication processing and forming the respective SCM
* commands.
Expand All @@ -49,12 +44,12 @@ public class ECOverReplicationHandler extends AbstractOverReplicationHandler {
public static final Logger LOG =
LoggerFactory.getLogger(ECOverReplicationHandler.class);

private final NodeManager nodeManager;
private final ReplicationManager replicationManager;

public ECOverReplicationHandler(PlacementPolicy placementPolicy,
NodeManager nodeManager) {
ReplicationManager replicationManager) {
super(placementPolicy);
this.nodeManager = nodeManager;
this.replicationManager = replicationManager;

}

Expand All @@ -67,13 +62,13 @@ public ECOverReplicationHandler(PlacementPolicy placementPolicy,
* @param result - Health check result.
* @param remainingMaintenanceRedundancy - represents that how many nodes go
* into maintenance.
* @return Returns the key value pair of destination dn where the command gets
* executed and the command itself.
* @return The number of commands send.
*/
@Override
public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
public int processAndSendCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int remainingMaintenanceRedundancy) {
ContainerHealthResult result, int remainingMaintenanceRedundancy)
throws NotLeaderException {
ContainerInfo container = result.getContainerInfo();

// We are going to check for over replication, so we should filter out any
Expand All @@ -90,10 +85,14 @@ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
// second lookup of the NodeStatus
Set<ContainerReplica> healthyReplicas = replicas.stream()
.filter(r -> {
NodeStatus ns = ReplicationManager.getNodeStatus(
r.getDatanodeDetails(), nodeManager);
return ns.isHealthy() && ns.getOperationalState() ==
HddsProtos.NodeOperationalState.IN_SERVICE;
try {
NodeStatus ns = replicationManager.getNodeStatus(
r.getDatanodeDetails());
return ns.isHealthy() && ns.getOperationalState() ==
HddsProtos.NodeOperationalState.IN_SERVICE;
} catch (NodeNotFoundException e) {
return false;
}
})
.collect(Collectors.toSet());

Expand All @@ -104,13 +103,13 @@ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
LOG.info("The container {} state changed and it is no longer over"
+ " replication. Replica count: {}, healthy replica count: {}",
container.getContainerID(), replicas.size(), healthyReplicas.size());
return emptySet();
return 0;
}

if (!replicaCount.isOverReplicated(true)) {
LOG.info("The container {} with replicas {} will be corrected " +
"by the pending delete", container.getContainerID(), replicas);
return emptySet();
return 0;
}

List<Integer> overReplicatedIndexes =
Expand All @@ -120,7 +119,7 @@ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
LOG.warn("The container {} with replicas {} was found over replicated "
+ "by EcContainerReplicaCount, but there are no over replicated "
+ "indexes returned", container.getContainerID(), replicas);
return emptySet();
return 0;
}

final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
Expand All @@ -143,10 +142,10 @@ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
LOG.warn("The container {} is over replicated, but no replicas were "
+ "selected to remove by the placement policy. Replicas: {}",
container, replicas);
return emptySet();
return 0;
}

final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
int commandsSent = 0;
// As a sanity check, sum up the current counts of each replica index. When
// processing replicasToRemove, ensure that removing the replica would not
// drop the count of that index to zero.
Expand All @@ -164,16 +163,15 @@ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
continue;
}
replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(container.getContainerID(), true);
deleteCommand.setReplicaIndex(r.getReplicaIndex());
commands.add(Pair.of(r.getDatanodeDetails(), deleteCommand));
replicationManager.sendDeleteCommand(container, r.getReplicaIndex(),
r.getDatanodeDetails(), true);
commandsSent++;
}

if (commands.size() == 0) {
if (commandsSent == 0) {
LOG.warn("With the current state of available replicas {}, no" +
" commands were created to remove excess replicas.", replicas);
}
return commands;
return commandsSent;
}
}

0 comments on commit f093f71

Please sign in to comment.