Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-3179 Pipeline placement based on Topology does not have fallback #678

Merged
merged 3 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private DatanodeDetails(String uuid, String ipAddress, String hostName,
this.certSerialId = certSerialId;
}

protected DatanodeDetails(DatanodeDetails datanodeDetails) {
public DatanodeDetails(DatanodeDetails datanodeDetails) {
super(datanodeDetails.getHostName(), datanodeDetails.getNetworkLocation(),
datanodeDetails.getCost());
this.uuid = datanodeDetails.uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
try {
pipeline = stateManager.getPipeline(pid);
} catch (PipelineNotFoundException e) {
LOG.error("Pipeline not found in pipeline state manager during" +
" pipeline creation. PipelineID: " + pid +
" exception: " + e.getMessage());
LOG.debug("Pipeline not found in pipeline state manager during" +
" pipeline creation. PipelineID: {}", pid, e);
continue;
}
if (pipeline != null &&
Expand Down Expand Up @@ -282,26 +281,32 @@ public List<DatanodeDetails> getResultSet(
LOG.debug("Second node chosen: {}", nextNode);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
"rack based on rack awareness.");
}
LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
"rack based on rack awareness. anchor: {}", anchor);
}

// Then choose nodes close to anchor based on network topology
int nodesToFind = nodesRequired - results.size();
for (int x = 0; x < nodesToFind; x++) {
// Pick remaining nodes based on the existence of rack awareness.
DatanodeDetails pick = rackAwareness
? chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, exclude)
: fallBackPickNodes(healthyNodes, exclude);
DatanodeDetails pick = null;
if (rackAwareness) {
pick = chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
}
// fall back protection
if (pick == null) {
pick = fallBackPickNodes(healthyNodes, exclude);
if (rackAwareness) {
LOG.debug("Failed to choose node based on topology. Fallback " +
"picks node as: {}", pick);
}
}

if (pick != null) {
results.add(pick);
exclude.add(pick);
if (LOG.isDebugEnabled()) {
LOG.debug("Remaining node chosen: {}", pick);
}
LOG.debug("Remaining node chosen: {}", pick);
}
}

Expand Down Expand Up @@ -414,6 +419,10 @@ protected DatanodeDetails chooseNodeFromNetworkTopology(
Node pick = networkTopology.chooseRandom(
anchor.getNetworkLocation(), excluded);
DatanodeDetails pickedNode = (DatanodeDetails) pick;
if (pickedNode == null) {
LOG.debug("Pick node is null, excluded nodes {}, anchor {}.",
excluded, anchor);
}
timmylicheng marked this conversation as resolved.
Show resolved Hide resolved
return pickedNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class MockNodeManager implements NodeManager {
private ConcurrentMap<String, Set<String>> dnsToUuidMap;

public MockNodeManager(NetworkTopologyImpl clusterMap,
List<DatanodeDetails> nodes,
boolean initializeFakeNodes, int nodeCount) {
this.healthyNodes = new LinkedList<>();
this.staleNodes = new LinkedList<>();
Expand All @@ -104,6 +105,13 @@ public MockNodeManager(NetworkTopologyImpl clusterMap,
this.dnsToUuidMap = new ConcurrentHashMap<>();
this.aggregateStat = new SCMNodeStat();
this.clusterMap = clusterMap;
if (!nodes.isEmpty()) {
for (int x = 0; x < nodes.size(); x++) {
DatanodeDetails node = nodes.get(x);
register(node, null, null);
populateNodeMetric(node, x);
}
}
if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails dd = MockDatanodeDetails.randomDatanodeDetails();
Expand All @@ -116,7 +124,7 @@ public MockNodeManager(NetworkTopologyImpl clusterMap,
}

public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
this(new NetworkTopologyImpl(new OzoneConfiguration()),
this(new NetworkTopologyImpl(new OzoneConfiguration()), new ArrayList<>(),
initializeFakeNodes, nodeCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
timmylicheng marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
Expand All @@ -35,6 +36,9 @@
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;

/**
* Test for PipelinePlacementPolicy.
Expand All @@ -43,36 +47,98 @@ public class TestPipelinePlacementPolicy {
private MockNodeManager nodeManager;
private OzoneConfiguration conf;
private PipelinePlacementPolicy placementPolicy;
private NetworkTopologyImpl cluster;
private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;

private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>();
private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>();

@Before
public void init() throws Exception {
nodeManager = new MockNodeManager(true,
PIPELINE_PLACEMENT_MAX_NODES_COUNT);
cluster = initTopology();
// start with nodes with rack awareness.
nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(),
false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
conf = new OzoneConfiguration();
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
placementPolicy = new PipelinePlacementPolicy(
nodeManager, new PipelineStateManager(), conf);
}

private NetworkTopologyImpl initTopology() {
NodeSchema[] schemas = new NodeSchema[]
{ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
NetworkTopologyImpl topology =
new NetworkTopologyImpl(NodeSchemaManager.getInstance());
return topology;
}

private List<DatanodeDetails> getNodesWithRackAwareness() {
List<DatanodeDetails> datanodes = new ArrayList<>();
for (Node node : NODES) {
DatanodeDetails datanode = overwriteLocationInNode(
getNodesWithoutRackAwareness(), node);
nodesWithRackAwareness.add(datanode);
datanodes.add(datanode);
}
return datanodes;
}

private DatanodeDetails getNodesWithoutRackAwareness() {
DatanodeDetails node = MockDatanodeDetails.randomDatanodeDetails();
nodesWithOutRackAwareness.add(node);
return node;
}

@Test
public void testChooseNodeBasedOnNetworkTopology() {
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
public void testChooseNodeBasedOnNetworkTopology() throws SCMException {
DatanodeDetails anchor = placementPolicy.chooseNode(nodesWithRackAwareness);
// anchor should be removed from healthyNodes after being chosen.
Assert.assertFalse(healthyNodes.contains(anchor));
Assert.assertFalse(nodesWithRackAwareness.contains(anchor));

List<DatanodeDetails> excludedNodes =
new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
excludedNodes.add(anchor);
DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
Assert.assertFalse(excludedNodes.contains(nextNode));
// nextNode should not be the same as anchor.
// next node should not be the same as anchor.
Assert.assertTrue(anchor.getUuid() != nextNode.getUuid());
// next node should be on the same rack based on topology.
Assert.assertEquals(anchor.getNetworkLocation(),
nextNode.getNetworkLocation());
}

@Test
public void testChooseNodeWithSingleNodeRack() throws SCMException {
// There is only one node on 3 racks altogether.
List<DatanodeDetails> datanodes = new ArrayList<>();
for (Node node : SINGLE_NODE_RACK) {
DatanodeDetails datanode = overwriteLocationInNode(
MockDatanodeDetails.randomDatanodeDetails(), node);
datanodes.add(datanode);
}
MockNodeManager localNodeManager = new MockNodeManager(initTopology(),
datanodes, false, datanodes.size());
PipelinePlacementPolicy localPlacementPolicy = new PipelinePlacementPolicy(
localNodeManager, new PipelineStateManager(), conf);
int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
List<DatanodeDetails> results = localPlacementPolicy.chooseDatanodes(
new ArrayList<>(datanodes.size()),
new ArrayList<>(datanodes.size()),
nodesRequired, 0);

Assert.assertEquals(nodesRequired, results.size());
// 3 nodes should be on different racks.
Assert.assertNotEquals(results.get(0).getNetworkLocation(),
results.get(1).getNetworkLocation());
Assert.assertNotEquals(results.get(0).getNetworkLocation(),
results.get(2).getNetworkLocation());
Assert.assertNotEquals(results.get(1).getNetworkLocation(),
results.get(2).getNetworkLocation());
}

@Test
public void testChooseNodeBasedOnRackAwareness() {
List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
Expand All @@ -84,8 +150,9 @@ public void testChooseNodeBasedOnRackAwareness() {
healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
topologyWithDifRacks, anchor);
Assert.assertNotNull(nextNode);
Assert.assertFalse(anchor.getNetworkLocation().equals(
nextNode.getNetworkLocation()));
// next node should be on a different rack.
Assert.assertNotEquals(anchor.getNetworkLocation(),
nextNode.getNetworkLocation());
}

@Test
Expand Down Expand Up @@ -115,25 +182,25 @@ public void testFallBackPickNodes() {

@Test
public void testRackAwarenessNotEnabledWithFallBack() throws SCMException{
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
DatanodeDetails randomNode = placementPolicy.chooseNode(healthyNodes);
DatanodeDetails anchor = placementPolicy
.chooseNode(nodesWithOutRackAwareness);
DatanodeDetails randomNode = placementPolicy
.chooseNode(nodesWithOutRackAwareness);
// rack awareness is not enabled.
Assert.assertTrue(anchor.getNetworkLocation().equals(
randomNode.getNetworkLocation()));

NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
topology, anchor);
nodesWithOutRackAwareness, new ArrayList<>(
PIPELINE_PLACEMENT_MAX_NODES_COUNT), topology, anchor);
// RackAwareness should not be able to choose any node.
Assert.assertNull(nextNode);

// PlacementPolicy should still be able to pick a set of 3 nodes.
int numOfNodes = HddsProtos.ReplicationFactor.THREE.getNumber();
List<DatanodeDetails> results = placementPolicy
.getResultSet(numOfNodes, healthyNodes);
.getResultSet(numOfNodes, nodesWithOutRackAwareness);

Assert.assertEquals(numOfNodes, results.size());
// All nodes are on same rack.
Expand All @@ -146,14 +213,20 @@ public void testRackAwarenessNotEnabledWithFallBack() throws SCMException{
private final static Node[] NODES = new NodeImpl[] {
new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h3", "/r2", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h4", "/r2", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h5", "/r3", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h6", "/r3", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h7", "/r4", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h8", "/r4", NetConstants.NODE_COST_DEFAULT),
};

// 3 racks with single node.
private final static Node[] SINGLE_NODE_RACK = new NodeImpl[] {
new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h2", "/r2", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h3", "/r3", NetConstants.NODE_COST_DEFAULT)
};

private NetworkTopology createNetworkTopologyOnDifRacks() {
NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
Expand All @@ -163,20 +236,26 @@ private NetworkTopology createNetworkTopologyOnDifRacks() {
return topology;
}

private DatanodeDetails overwriteLocationInNode(
DatanodeDetails datanode, Node node) {
DatanodeDetails result = DatanodeDetails.newBuilder()
.setUuid(datanode.getUuidString())
.setHostName(datanode.getHostName())
.setIpAddress(datanode.getIpAddress())
.addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
.addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
.addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
.setNetworkLocation(node.getNetworkLocation()).build();
return result;
}

private List<DatanodeDetails> overWriteLocationInNodes(
List<DatanodeDetails> datanodes) {
List<DatanodeDetails> results = new ArrayList<>(datanodes.size());
for (int i = 0; i < datanodes.size(); i++) {
DatanodeDetails datanode = datanodes.get(i);
DatanodeDetails result = DatanodeDetails.newBuilder()
.setUuid(datanode.getUuidString())
.setHostName(datanode.getHostName())
.setIpAddress(datanode.getIpAddress())
.addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
.addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
.addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
.setNetworkLocation(NODES[i].getNetworkLocation()).build();
results.add(result);
DatanodeDetails datanode = overwriteLocationInNode(
datanodes.get(i), NODES[i]);
results.add(datanode);
}
return results;
}
Expand Down