Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-9343. Shift sortDatanodes logic to OM #5391

Merged
merged 29 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
df4c96d
HDDS-9389. Introduce new API and cache refresh for serving network to…
Dec 18, 2023
1d006b2
Fix findbugs
Dec 18, 2023
1780d45
Fix LOG.info typos
Dec 18, 2023
36127da
Modify clusterMap object based on the refetched clusterTree from SCM
Dec 21, 2023
c213528
Incorporate dfs.datanode.use.datanode.hostname while filtering matchi…
Dec 21, 2023
d63ca30
Merge remote-tracking branch 'origin' into HDDS-9343
Jan 9, 2024
eff32f1
Merge remote-tracking branch 'origin' into HDDS-9343
Jan 11, 2024
98f8560
Merge branch 'master' into HDDS-9389
Jan 17, 2024
8b33b7b
Merge remote-tracking branch 'origin' into HDDS-9389
Jan 23, 2024
b9741bc
import assertEquals
Jan 29, 2024
9dd266a
Fix TestOmContainerLocationCache
Jan 29, 2024
f623f9d
Fix TestBlockOutputStreamWithFailures
Jan 29, 2024
3d9c241
Address review comments
Feb 5, 2024
a1c6ff8
Include level, parent information, handle case of read from non-datan…
Feb 12, 2024
91e12c5
Refactor proto objects, address reviews
Feb 12, 2024
113d045
NetworkTopologyImpl#getDistanceCost: account for null objects, fix fa…
Feb 13, 2024
f4594a3
Change NodeType to NetworkNode
Feb 13, 2024
80d5a6e
Change NodeImpl to NodeTopology
Feb 13, 2024
fff4abe
Modify proto objects to optional and corresponding usages
Feb 13, 2024
140e3f9
Fix typo
Feb 13, 2024
5b9c577
Merge remote-tracking branch 'origin' into HDDS-9389
Feb 22, 2024
6bda4a7
Resolve node location for reads from non-datanodes hosts within OM
Feb 22, 2024
b62695f
Remove force refetching of clusterTree information
Feb 22, 2024
85e5c60
Merge remote-tracking branch 'origin' into HDDS-9389
Feb 26, 2024
c528224
Remove tests in TestKeyManagerUnit that reference ScmBlockLocationPro…
Feb 27, 2024
db5beab
Fix checkstyle
Feb 27, 2024
c3e04fd
Add tests for KeyManagerImpl#sortDatanodes
Feb 27, 2024
1d1d5d1
Fix findbugs
Feb 27, 2024
8fba5c9
Fix integration (ozone) test failure
Feb 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions hadoop-hdds/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-interface-server</artifactId>
</dependency>
tanvipenumudy marked this conversation as resolved.
Show resolved Hide resolved
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ExtendedDatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.net.NodeImpl;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -1020,4 +1022,18 @@ public String getBuildDate() {
public void setBuildDate(String date) {
this.buildDate = date;
}

public ScmBlockLocationProtocolProtos.NodeType toProtobuf(int clientVersion) {
ScmBlockLocationProtocolProtos.NodeType nodeType =
ScmBlockLocationProtocolProtos.NodeType.newBuilder()
.setDatanodeDetails(toProtoBuilder(clientVersion).build()).build();
return nodeType;
}

public static Node fromProtobuf(
ScmBlockLocationProtocolProtos.NodeType nodeType) {
return nodeType.hasDatanodeDetails()
? DatanodeDetails.getFromProtoBuf(nodeType.getDatanodeDetails())
: null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;

/**
* The interface defines an inner node in a network topology.
* An inner node represents network topology entities, such as data center,
Expand Down Expand Up @@ -89,4 +91,15 @@ N newInnerNode(String name, String location, InnerNode parent, int level,
*/
Node getLeaf(int leafIndex, List<String> excludedScopes,
Collection<Node> excludedNodes, int ancestorGen);

ScmBlockLocationProtocolProtos.NodeType toProtobuf(int clientVersion);

boolean equals(Object o);

int hashCode();

static InnerNode fromProtobuf(
ScmBlockLocationProtocolProtos.InnerNode innerNode) {
return InnerNodeImpl.fromProtobuf(innerNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,10 +48,10 @@ public InnerNodeImpl newInnerNode(String name, String location,
}
}

static final Factory FACTORY = new Factory();
public static final Factory FACTORY = new Factory();
// a map of node's network name to Node for quick search and keep
// the insert order
private final HashMap<String, Node> childrenMap =
private HashMap<String, Node> childrenMap =
new LinkedHashMap<String, Node>();
// number of descendant leaves under this node
private int numOfLeaves;
Expand All @@ -66,6 +67,69 @@ protected InnerNodeImpl(String name, String location, InnerNode parent,
super(name, location, parent, level, cost);
}

protected InnerNodeImpl(String name, String location, int cost,
HashMap<String, Node> childrenMap, int numOfLeaves) {
super(name, location, cost);
this.childrenMap = childrenMap;
this.numOfLeaves = numOfLeaves;
}

/**
* InnerNodeImpl Builder to help construct an InnerNodeImpl object from
* protobuf objects.
*/
public static class Builder {
private String name;
private String location;
private int cost;
private HashMap<String, Node> childrenMap = new LinkedHashMap<>();
private int numOfLeaves;

public Builder setName(String name) {
this.name = name;
return this;
}

public Builder setLocation(String location) {
this.location = location;
return this;
}

public Builder setCost(int cost) {
this.cost = cost;
return this;
}

public Builder setChildrenMap(HashMap<String, Node> childrenMap) {
this.childrenMap = childrenMap;
return this;
}

public Builder setChildrenMap(
List<ScmBlockLocationProtocolProtos.ChildrenMap> childrenMapList) {
HashMap<String, Node> newChildrenMap = new HashMap<>();
for (ScmBlockLocationProtocolProtos.ChildrenMap childrenMapProto :
childrenMapList) {
String networkName = childrenMapProto.getNetworkName();
ScmBlockLocationProtocolProtos.NodeType nodeType =
childrenMapProto.getNodeType();
Node node = Node.fromProtobuf(nodeType);
newChildrenMap.put(networkName, node);
}
this.childrenMap = newChildrenMap;
return this;
}

public Builder setNumOfLeaves(int numOfLeaves) {
this.numOfLeaves = numOfLeaves;
return this;
}

public InnerNodeImpl build() {
return new InnerNodeImpl(name, location, cost, childrenMap, numOfLeaves);
}
}

/** @return the number of children this node has */
private int getNumOfChildren() {
return childrenMap.size();
Expand Down Expand Up @@ -389,6 +453,59 @@ public Node getLeaf(int leafIndex, List<String> excludedScopes,
return null;
}

@Override
public ScmBlockLocationProtocolProtos.NodeType toProtobuf(int clientVersion) {

ScmBlockLocationProtocolProtos.InnerNode.Builder innerNode =
ScmBlockLocationProtocolProtos.InnerNode.newBuilder()
.setNumOfLeaves(numOfLeaves)
.setNodeImpl(
NodeImpl.toProtobuf(getNetworkName(), getNetworkLocation(),
getCost()));

if (childrenMap != null && !childrenMap.isEmpty()) {
for (Map.Entry<String, Node> entry : childrenMap.entrySet()) {
if (entry.getValue() != null) {
ScmBlockLocationProtocolProtos.ChildrenMap childrenMapProto =
ScmBlockLocationProtocolProtos.ChildrenMap.newBuilder()
.setNetworkName(entry.getKey())
.setNodeType(entry.getValue().toProtobuf(clientVersion))
.build();
innerNode.addChildrenMap(childrenMapProto);
}
}
}
innerNode.build();

ScmBlockLocationProtocolProtos.NodeType nodeType =
ScmBlockLocationProtocolProtos.NodeType.newBuilder()
.setInnerNode(innerNode).build();

return nodeType;
}

public static Node fromProtobuf(
ScmBlockLocationProtocolProtos.NodeType nodeType) {
return nodeType.hasInnerNode()
? InnerNodeImpl.fromProtobuf(nodeType.getInnerNode())
: null;
}

public static InnerNode fromProtobuf(
ScmBlockLocationProtocolProtos.InnerNode innerNode) {

ScmBlockLocationProtocolProtos.NodeImpl nodeImpl = innerNode.getNodeImpl();
InnerNodeImpl.Builder builder = new InnerNodeImpl.Builder()
.setName(nodeImpl.getName())
.setLocation(nodeImpl.getLocation())
.setCost(nodeImpl.getCost())
.setChildrenMap(innerNode.getChildrenMapList())
.setNumOfLeaves(innerNode.getNumOfLeaves());

InnerNode res = builder.build();
return res;
}

@Override
public boolean equals(Object to) {
if (to == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public InvalidTopologyException(String msg) {
*/
Node getNode(String loc);

InnerNode getClusterTree();

/**
* Given a string representation of a InnerNode, return its leaf nodes count.
* @param loc a path-like string representation of a InnerNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ public NetworkTopologyImpl(ConfigurationSource conf) {
schemaManager.getCost(NetConstants.ROOT_LEVEL));
}

public NetworkTopologyImpl(String schemaFile, InnerNode clusterTree) {
schemaManager = NodeSchemaManager.getInstance();
schemaManager.init(schemaFile);
maxLevel = schemaManager.getMaxLevel();
shuffleOperation = Collections::shuffle;
factory = InnerNodeImpl.FACTORY;
this.clusterTree = clusterTree;
}

@VisibleForTesting
public NetworkTopologyImpl(NodeSchemaManager manager,
Consumer<List<? extends Node>> shuffleOperation) {
Expand Down Expand Up @@ -306,6 +315,11 @@ public Node getNode(String loc) {
}
}

@Override
public InnerNode getClusterTree() {
tanvipenumudy marked this conversation as resolved.
Show resolved Hide resolved
return clusterTree;
}

/**
* Given a string representation of Node, return its leaf nodes count.
* @param loc a path-like string representation of Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdds.scm.net;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;

/**
* The interface defines a node in a network topology.
* A node may be a leave representing a data node or an inner
Expand Down Expand Up @@ -126,4 +129,21 @@ public interface Node {
* @return true if this node is under a specific scope
*/
boolean isDescendant(String nodePath);

ScmBlockLocationProtocolProtos.NodeType toProtobuf(int clientVersion);

static Node fromProtobuf(
ScmBlockLocationProtocolProtos.NodeType nodeTypeProto) {

if (nodeTypeProto.hasDatanodeDetails()) {
return DatanodeDetails.getFromProtoBuf(
nodeTypeProto.getDatanodeDetails());
} else if (nodeTypeProto.hasNodeImpl()) {
return NodeImpl.fromProtobuf(nodeTypeProto.getNodeImpl());
} else if (nodeTypeProto.hasInnerNode()) {
return InnerNode.fromProtobuf(nodeTypeProto.getInnerNode());
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.net;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;

import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR_STR;
Expand Down Expand Up @@ -229,6 +230,48 @@ public boolean isDescendant(String nodePath) {
NetUtils.addSuffix(nodePath));
}

@Override
public ScmBlockLocationProtocolProtos.NodeType toProtobuf(int clientVersion) {
ScmBlockLocationProtocolProtos.NodeImpl nodeImpl =
ScmBlockLocationProtocolProtos.NodeImpl.newBuilder()
.setName(name)
.setLocation(location)
.setCost(cost)
.build();

ScmBlockLocationProtocolProtos.NodeType nodeType =
ScmBlockLocationProtocolProtos.NodeType.newBuilder()
.setNodeImpl(nodeImpl).build();

return nodeType;
}

public static ScmBlockLocationProtocolProtos.NodeImpl toProtobuf(String name,
String location, int cost) {

ScmBlockLocationProtocolProtos.NodeImpl.Builder nodeImplBuilder =
ScmBlockLocationProtocolProtos.NodeImpl.newBuilder()
.setName(name)
.setLocation(location)
.setCost(cost);

ScmBlockLocationProtocolProtos.NodeImpl nodeImpl = nodeImplBuilder.build();
return nodeImpl;
}

public static Node fromProtobuf(
ScmBlockLocationProtocolProtos.NodeType nodeType) {
return nodeType.hasNodeImpl()
? InnerNodeImpl.fromProtobuf(nodeType.getNodeImpl())
: null;
}

public static NodeImpl fromProtobuf(
ScmBlockLocationProtocolProtos.NodeImpl nodeImpl) {
return new NodeImpl(nodeImpl.getName(), nodeImpl.getLocation(),
nodeImpl.getCost());
}

@Override
public boolean equals(Object to) {
if (to == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ public void init(ConfigurationSource conf) {
}
}

public void init(String schemaFile) {
tanvipenumudy marked this conversation as resolved.
Show resolved Hide resolved
NodeSchemaLoadResult result;
try {
result = NodeSchemaLoader.getInstance().loadSchemaFromFile(schemaFile);
allSchema = result.getSchemaList();
enforcePrefix = result.isEnforePrefix();
maxLevel = allSchema.size();
} catch (Throwable e) {
String msg = "Failed to load schema file:" + schemaFile
+ ", error: " + e.getMessage();
LOG.error(msg, e);
throw new RuntimeException(msg, e);
}
}

@VisibleForTesting
public void init(NodeSchema[] schemas, boolean enforce) {
allSchema = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,11 @@ public final class OzoneConfigKeys {
public static final String HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY =
"hdds.scmclient.failover.max.retry";

public static final String
OZONE_SCM_NETWORK_TOPOLOGY_CLUSTER_TREE_REFRESH_DURATION =
"ozone.scm.network.topology.cluster.tree.refresh.duration";
tanvipenumudy marked this conversation as resolved.
Show resolved Hide resolved
public static final String
OZONE_SCM_NETWORK_TOPOLOGY_CLUSTER_TREE_REFRESH_DURATION_DEFAULT = "1h";

/**
* There is no need to instantiate this class.
Expand Down
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3752,6 +3752,14 @@
<description>Wait duration before which close container
is send to DN.</description>
</property>
<property>
<name>ozone.scm.network.topology.cluster.tree.refresh.duration</name>
<value>1h</value>
<tag>SCM, OZONE, OM</tag>
<description>The duration at which we periodically fetch the updated network
topology cluster tree from SCM.
</description>
</property>
<property>
<name>ozone.scm.ha.ratis.server.snapshot.creation.gap</name>
<value>1024</value>
Expand Down