HDDS-1574 Average out pipeline allocation on datanodes and add metrics/test (#291)

timmylicheng committed Jan 7, 2020
1 parent fced9c1 commit acc9640
Showing 14 changed files with 338 additions and 18 deletions.
@@ -58,6 +58,8 @@ public final class Pipeline {
private UUID leaderId;
// Timestamp for pipeline upon creation
private Long creationTimestamp;
// Only valid for Ratis THREE pipelines. No need to persist.
private int nodeIdsHash;

/**
* The immutable properties of the pipeline object are used in
@@ -73,6 +75,7 @@ private Pipeline(PipelineID id, ReplicationType type,
this.state = state;
this.nodeStatus = nodeStatus;
this.creationTimestamp = System.currentTimeMillis();
this.nodeIdsHash = 0;
}

/**
@@ -129,6 +132,14 @@ void setCreationTimestamp(Long creationTimestamp) {
this.creationTimestamp = creationTimestamp;
}

public int getNodeIdsHash() {
return nodeIdsHash;
}

void setNodeIdsHash(int nodeIdsHash) {
this.nodeIdsHash = nodeIdsHash;
}

/**
* Return the pipeline leader's UUID.
*
@@ -347,6 +358,7 @@ public static class Builder {
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;
private Long creationTimestamp = null;
private int nodeIdsHash = 0;

public Builder() {}

@@ -359,6 +371,7 @@ public Builder(Pipeline pipeline) {
this.nodesInOrder = pipeline.nodesInOrder.get();
this.leaderId = pipeline.getLeaderId();
this.creationTimestamp = pipeline.getCreationTimestamp();
this.nodeIdsHash = 0;
}

public Builder setId(PipelineID id1) {
@@ -397,6 +410,11 @@ public Builder setNodesInOrder(List<Integer> orders) {
return this;
}

public Builder setNodeIdsHash(int nodeIdsHash1) {
this.nodeIdsHash = nodeIdsHash1;
return this;
}

public Pipeline build() {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(type);
@@ -405,6 +423,7 @@ public Pipeline build() {
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
pipeline.setLeaderId(leaderId);
pipeline.setNodeIdsHash(nodeIdsHash);
// overwrite with original creationTimestamp
if (creationTimestamp != null) {
pipeline.setCreationTimestamp(creationTimestamp);
@@ -162,7 +162,7 @@ List<DatanodeDetails> filterViableNodes(
// filter nodes that meet the size and pipeline engagement criteria.
// Pipeline placement doesn't take remaining node space into account.
List<DatanodeDetails> healthyList = healthyNodes.stream()
.filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
.filter(d -> meetCriteria(d, nodesRequired))
.collect(Collectors.toList());

if (healthyList.size() < nodesRequired) {
@@ -308,6 +308,7 @@ public DatanodeDetails chooseNode(
}
// the pick is decided and it should be removed from candidates.
healthyNodes.remove(datanodeDetails);

return datanodeDetails;
}

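A note on the filterViableNodes hunk above: the removed limit(nodesRequired) call truncated the candidate pool to the first nodesRequired viable datanodes in iteration order, so chooseNode could never reach later, possibly less-loaded nodes. Dropping it lets the policy pick from every viable candidate, which is what allows allocation to average out across datanodes. A self-contained toy sketch of the difference (illustrative values only; LimitDemo is not part of the commit):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LimitDemo {
  public static void main(String[] args) {
    // Suppose nodesRequired = 3 and six datanodes pass the criteria.
    List<Integer> fullPool = Stream.of(1, 2, 3, 4, 5, 6)
        .filter(n -> n > 0)            // stand-in for meetCriteria(...)
        .collect(Collectors.toList());
    List<Integer> truncated = Stream.of(1, 2, 3, 4, 5, 6)
        .filter(n -> n > 0)
        .limit(3)                      // the removed call
        .collect(Collectors.toList());
    System.out.println(fullPool);      // [1, 2, 3, 4, 5, 6]: every candidate reachable
    System.out.println(truncated);     // [1, 2, 3]: later nodes could never be chosen
  }
}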
@@ -132,6 +132,13 @@ Pipeline openPipeline(PipelineID pipelineId) throws IOException {
pipeline = pipelineStateMap
.updatePipelineState(pipelineId, PipelineState.OPEN);
}
// Amend nodeIdsHash if needed.
if (pipeline.getType() == ReplicationType.RATIS &&
pipeline.getFactor() == ReplicationFactor.THREE &&
pipeline.getNodeIdsHash() == 0) {
pipeline.setNodeIdsHash(RatisPipelineUtils
.encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes()));
}
return pipeline;
}

@@ -157,6 +157,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
}

List<DatanodeDetails> dns;
int nodeIdHash = 0;

switch(factor) {
case ONE:
@@ -165,6 +166,7 @@
case THREE:
dns = placementPolicy.chooseDatanodes(null,
null, factor.getNumber(), 0);
nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(dns);
break;
default:
throw new IllegalStateException("Unknown factor: " + factor.name());
@@ -176,6 +178,7 @@
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(dns)
.setNodeIdsHash(nodeIdHash)
.build();

// Send command to datanodes to create pipeline
@@ -196,12 +199,17 @@
@Override
public Pipeline create(ReplicationFactor factor,
List<DatanodeDetails> nodes) {
int nodeIdHash = 0;
if (factor == ReplicationFactor.THREE) {
nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes);
}
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(PipelineState.ALLOCATED)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
.setNodeIdsHash(nodeIdHash)
.build();
}

@@ -18,9 +18,12 @@
package org.apache.hadoop.hdds.scm.pipeline;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -35,7 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Utility class for Ratis pipelines. Contains methods to create and destroy
* ratis pipelines.
@@ -100,4 +102,38 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
true, p.getId());
}
}

static int encodeNodeIdsOfFactorThreePipeline(List<DatanodeDetails> nodes) {
if (nodes.size() != HddsProtos.ReplicationFactor.THREE.getNumber()) {
return 0;
}
return nodes.get(0).getUuid().hashCode() ^
nodes.get(1).getUuid().hashCode() ^
nodes.get(2).getUuid().hashCode();
}

/**
* Returns the first existing pipeline which shares the same set of
* datanodes with the input pipeline.
* @param stateManager PipelineStateManager
* @param pipeline input pipeline
* @return first matched pipeline
*/
static Pipeline checkPipelineContainSameDatanodes(
PipelineStateManager stateManager, Pipeline pipeline) {
List<Pipeline> matchedPipelines = stateManager.getPipelines(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE)
.stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
(// For all OPEN or ALLOCATED pipelines
p.getPipelineState() == Pipeline.PipelineState.OPEN ||
p.getPipelineState() == Pipeline.PipelineState.ALLOCATED) &&
p.getNodeIdsHash() == pipeline.getNodeIdsHash())
.collect(Collectors.toList());
if (matchedPipelines.size() == 0) {
return null;
} else {
return matchedPipelines.stream().findFirst().get();
}
}
}
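Why XOR in encodeNodeIdsOfFactorThreePipeline above: XOR is commutative and associative, so the encoding is identical for any ordering of the same three datanode UUIDs, which is what lets checkPipelineContainSameDatanodes flag two pipelines over the same node set even when the nodes are listed in different orders. A minimal sketch (NodeIdsHashDemo is an illustrative name, not part of the commit):

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class NodeIdsHashDemo {
  // Mirrors the XOR encoding used by encodeNodeIdsOfFactorThreePipeline.
  static int encode(List<UUID> ids) {
    return ids.get(0).hashCode() ^ ids.get(1).hashCode() ^ ids.get(2).hashCode();
  }

  public static void main(String[] args) {
    UUID a = UUID.randomUUID();
    UUID b = UUID.randomUUID();
    UUID c = UUID.randomUUID();
    // Same three nodes in two different orders produce the same hash.
    System.out.println(encode(Arrays.asList(a, b, c))
        == encode(Arrays.asList(c, a, b))); // true
  }
}

Because XORed hash codes can collide across distinct node sets, the hash acts as a cheap duplicate signal rather than a strict set-equality check.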
@@ -128,6 +128,19 @@ public void setPipelineProvider(ReplicationType replicationType,
pipelineFactory.setProvider(replicationType, provider);
}

private int computeNodeIdHash(Pipeline pipeline) {
if (pipeline.getType() != ReplicationType.RATIS) {
return 0;
}

if (pipeline.getFactor() != ReplicationFactor.THREE) {
return 0;
}

return RatisPipelineUtils.
encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes());
}

private void initializePipelineState() throws IOException {
if (pipelineStore.isEmpty()) {
LOG.info("No pipeline exists in current db");
@@ -143,6 +156,7 @@ private void initializePipelineState() throws IOException {
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
Preconditions.checkNotNull(pipeline);
pipeline.setNodeIdsHash(computeNodeIdHash(pipeline));
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
}
@@ -163,6 +177,18 @@ public synchronized Pipeline createPipeline(ReplicationType type,
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
}
Pipeline overlapPipeline = RatisPipelineUtils
.checkPipelineContainSameDatanodes(stateManager, pipeline);
if (overlapPipeline != null) {
metrics.incNumPipelineContainSameDatanodes();
// TODO: remove this log once pipeline allocation is proven to be evenly distributed.
LOG.info("Pipeline: " + pipeline.getId().toString() +
" contains same datanodes as previous pipeline: " +
overlapPipeline.getId().toString() + " nodeIds: " +
pipeline.getNodes().get(0).getUuid().toString() +
", " + pipeline.getNodes().get(1).getUuid().toString() +
", " + pipeline.getNodes().get(2).getUuid().toString());
}
return pipeline;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
@@ -54,6 +54,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
private @Metric MutableCounterLong numPipelineDestroyFailed;
private @Metric MutableCounterLong numPipelineReportProcessed;
private @Metric MutableCounterLong numPipelineReportProcessingFailed;
private @Metric MutableCounterLong numPipelineContainSameDatanodes;
private Map<PipelineID, MutableCounterLong> numBlocksAllocated;

/** Private constructor. */
@@ -92,6 +93,7 @@ public void getMetrics(MetricsCollector collector, boolean all) {
numPipelineDestroyFailed.snapshot(recordBuilder, true);
numPipelineReportProcessed.snapshot(recordBuilder, true);
numPipelineReportProcessingFailed.snapshot(recordBuilder, true);
numPipelineContainSameDatanodes.snapshot(recordBuilder, true);
numBlocksAllocated
.forEach((pid, metric) -> metric.snapshot(recordBuilder, true));
}
@@ -176,4 +178,11 @@ void incNumPipelineReportProcessed() {
void incNumPipelineReportProcessingFailed() {
numPipelineReportProcessingFailed.incr();
}

/**
* Increments the count of pipelines that contain the same set of datanodes.
*/
void incNumPipelineContainSameDatanodes() {
numPipelineContainSameDatanodes.incr();
}
}
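For completeness, a hypothetical sketch of reading the new counter back in a test, assuming the source is registered under the name "SCMPipelineMetrics" and that Hadoop's MetricsAsserts test helpers are on the classpath (metrics2 capitalizes the field name to NumPipelineContainSameDatanodes):

import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;

public class PipelineOverlapMetricCheck {
  // Reads the counter after driving a pipeline creation that landed on
  // an already-used datanode set.
  static long overlappingPipelineCount() {
    MetricsRecordBuilder rb = getMetrics("SCMPipelineMetrics");
    return getLongCounter("NumPipelineContainSameDatanodes", rb);
  }
}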
@@ -17,7 +17,7 @@
package org.apache.hadoop.hdds.scm.container;

import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.net.NetConstants;
@@ -93,16 +93,17 @@ public class MockNodeManager implements NodeManager {
private NetworkTopology clusterMap;
private ConcurrentMap<String, Set<String>> dnsToUuidMap;

public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
public MockNodeManager(NetworkTopologyImpl clusterMap,
boolean initializeFakeNodes, int nodeCount) {
this.healthyNodes = new LinkedList<>();
this.staleNodes = new LinkedList<>();
this.deadNodes = new LinkedList<>();
this.nodeMetricMap = new HashMap<>();
this.node2PipelineMap = new Node2PipelineMap();
this.node2ContainerMap = new Node2ContainerMap();
this.dnsToUuidMap = new ConcurrentHashMap<>();
aggregateStat = new SCMNodeStat();
clusterMap = new NetworkTopologyImpl(new Configuration());
this.aggregateStat = new SCMNodeStat();
this.clusterMap = clusterMap;
if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails dd = MockDatanodeDetails.randomDatanodeDetails();
@@ -114,6 +115,11 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
this.commandMap = new HashMap<>();
}

public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
this(new NetworkTopologyImpl(new OzoneConfiguration()),
initializeFakeNodes, nodeCount);
}

/**
* Invoked from ctor to create some node Metrics.
*
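The MockNodeManager change above threads a caller-supplied NetworkTopology through the mock, so a test can share one topology instance with the code under test, while the old boolean/int constructor keeps its previous behavior by delegating. A hypothetical usage sketch (class and variable names are illustrative):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;

public class TopologySharingExample {
  public static void main(String[] args) {
    // One topology instance visible to both the mock and the test subject.
    NetworkTopologyImpl topology =
        new NetworkTopologyImpl(new OzoneConfiguration());
    MockNodeManager nodeManager = new MockNodeManager(topology, true, 10);
    // The mock and any placement policy under test can now resolve
    // datanodes against the same topology view.
  }
}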
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
@@ -67,13 +68,14 @@ public static void setUp() throws Exception {
.getTestDir(TestCloseContainerEventHandler.class.getSimpleName());
configuration
.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
configuration.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 16);
nodeManager = new MockNodeManager(true, 10);
eventQueue = new EventQueue();
pipelineManager =
new SCMPipelineManager(configuration, nodeManager, eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), configuration);
pipelineManager.getStateManager(), configuration, eventQueue);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
containerManager = new SCMContainerManager(configuration, pipelineManager);
@@ -91,6 +93,9 @@ public static void tearDown() throws Exception {
if (containerManager != null) {
containerManager.close();
}
if (pipelineManager != null) {
pipelineManager.close();
}
FileUtil.fullyDelete(testDir);
}

@@ -73,6 +73,8 @@ public Pipeline create(HddsProtos.ReplicationFactor factor)
.setType(initialPipeline.getType())
.setFactor(factor)
.setNodes(initialPipeline.getNodes())
.setNodeIdsHash(RatisPipelineUtils
.encodeNodeIdsOfFactorThreePipeline(initialPipeline.getNodes()))
.build();
}
}
@@ -91,6 +93,8 @@ public Pipeline create(HddsProtos.ReplicationFactor factor,
.setType(HddsProtos.ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
.setNodeIdsHash(RatisPipelineUtils
.encodeNodeIdsOfFactorThreePipeline(nodes))
.build();
}
}
(Diff truncated: the remaining changed files are not shown here.)
