Skip to content

Commit

Permalink
HDDS-9439. Refactor sortDatanodes to OM
Browse files Browse the repository at this point in the history
add initial changes

reposition clusterMap initialization

use getTopology instead of refetch

reposition clusterMap initialization
  • Loading branch information
tanvipenumudy committed Oct 12, 2023
1 parent 5c9a292 commit 334640c
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ public NetworkTopologyImpl(ConfigurationSource conf) {
schemaManager.getCost(NetConstants.ROOT_LEVEL));
}

/**
 * Creates a network topology whose node schema is loaded from the given
 * schema file path instead of from configuration.
 *
 * @param schemaFile path to the network topology schema file
 */
public NetworkTopologyImpl(String schemaFile) {
  this.schemaManager = NodeSchemaManager.getInstance();
  this.schemaManager.init(schemaFile);
  this.maxLevel = schemaManager.getMaxLevel();
  this.factory = InnerNodeImpl.FACTORY;
  // Root of the topology tree, costed at the schema's root level.
  this.clusterTree = this.factory.newInnerNode(ROOT, null, null,
      NetConstants.ROOT_LEVEL,
      schemaManager.getCost(NetConstants.ROOT_LEVEL));
}

@VisibleForTesting
public NetworkTopologyImpl(NodeSchemaManager manager) {
schemaManager = manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ public void init(ConfigurationSource conf) {
}
}

/**
 * Initializes this schema manager from an explicit schema file path,
 * mirroring {@code init(ConfigurationSource)} but bypassing configuration
 * lookup.
 *
 * @param schemaFile path to the network topology schema file
 * @throws RuntimeException if the schema file cannot be loaded or parsed
 *         (wraps the underlying cause)
 */
public void init(String schemaFile) {
  try {
    NodeSchemaLoadResult loaded =
        NodeSchemaLoader.getInstance().loadSchemaFromFile(schemaFile);
    allSchema = loaded.getSchemaList();
    enforcePrefix = loaded.isEnforePrefix();
    maxLevel = allSchema.size();
  } catch (Throwable e) {
    // Broad catch keeps parity with the configuration-based init: any
    // failure to load the schema is fatal for topology construction.
    String msg = "Failed to load schema file:" + schemaFile
        + ", error: " + e.getMessage();
    LOG.error(msg, e);
    throw new RuntimeException(msg, e);
  }
}

@VisibleForTesting
public void init(NodeSchema[] schemas, boolean enforce) {
allSchema = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.utils.BackgroundService;
Expand Down Expand Up @@ -1802,8 +1803,7 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
LOG.warn("No datanodes in pipeline {}", pipeline.getId());
continue;
}
sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo,
uuidList);
sortedNodes = sortDatanodes(clientMachine, nodes);
if (sortedNodes != null) {
sortedPipelines.put(uuidSet, sortedNodes);
}
Expand All @@ -1817,24 +1817,30 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
}
}

private List<DatanodeDetails> sortDatanodes(String clientMachine,
List<DatanodeDetails> nodes, OmKeyInfo keyInfo, List<String> nodeList) {
List<DatanodeDetails> sortedNodes = null;
try {
sortedNodes = scmClient.getBlockClient()
.sortDatanodes(nodeList, clientMachine);
if (LOG.isDebugEnabled()) {
LOG.debug("Sorted datanodes {} for client {}, result: {}", nodes,
clientMachine, sortedNodes);
/**
 * Sorts the given datanodes by network-distance cost relative to the
 * client machine, nearest first, using the OM's cluster topology map.
 *
 * @param clientMachine client address used as the distance reference
 * @param nodes datanodes to sort; must be non-empty
 * @return the nodes reordered by ascending distance cost from the client
 */
public List<DatanodeDetails> sortDatanodes(String clientMachine,
    List<DatanodeDetails> nodes) {
  // If the client is co-located with one of the datanodes, use that node
  // as the distance reference; otherwise sort relative to a null client.
  DatanodeDetails client = null;
  List<DatanodeDetails> possibleClients =
      getClientNodesByAddress(clientMachine, nodes);
  if (!possibleClients.isEmpty()) {
    client = possibleClients.get(0);
  }
  List<? extends Node> sortedNodeList = ozoneManager.getClusterMap()
      .sortByDistanceCost(client, nodes, nodes.size());
  // sortByDistanceCost returns the generic Node supertype; narrow back to
  // DatanodeDetails for callers. Plain loop with a presized list instead
  // of building the result via a stream forEach side effect.
  List<DatanodeDetails> ret = new ArrayList<>(sortedNodeList.size());
  for (Node node : sortedNodeList) {
    ret.add((DatanodeDetails) node);
  }
  return ret;
}

private List<DatanodeDetails> getClientNodesByAddress(String clientMachine,
List<DatanodeDetails> nodes) {
List<DatanodeDetails> matchingNodes = new ArrayList<>();
for (DatanodeDetails node : nodes) {
if (node.getIpAddress().equals(clientMachine)) {
matchingNodes.add(node);
}
} catch (IOException e) {
LOG.warn("Unable to sort datanodes based on distance to client, "
+ " volume={}, bucket={}, key={}, client={}, datanodes={}, "
+ " exception={}",
keyInfo.getVolumeName(), keyInfo.getBucketName(),
keyInfo.getKeyName(), clientMachine, nodeList, e.getMessage());
}
return sortedNodes;
return matchingNodes;
}

private static List<String> toNodeUuid(Collection<DatanodeDetails> nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.client.ScmBlockLocationClient;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.server.OzoneAdmins;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
Expand Down Expand Up @@ -388,6 +390,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final OMStorage omStorage;
private final ScmBlockLocationProtocol scmBlockClient;
private final StorageContainerLocationProtocol scmContainerClient;
private NetworkTopology clusterMap;
private ObjectName omInfoBeanName;
private Timer metricsTimer;
private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
Expand Down Expand Up @@ -1147,6 +1150,10 @@ public void setScmBlockLocationClient(
this.scmBlockLocationClient = scmBlockLocationClient;
}

/**
 * Returns the cluster network topology map, built in {@code start()} from
 * SCM's topology information; used to sort datanodes by distance to a
 * client. NOTE(review): may be null before {@code start()} runs — confirm
 * callers only invoke this after startup.
 */
public NetworkTopology getClusterMap() {
  return clusterMap;
}

/**
* For testing purpose only. This allows testing token in integration test
* without fully setting up a working secure cluster.
Expand Down Expand Up @@ -1707,6 +1714,9 @@ public void start() throws IOException {
throw new UncheckedIOException(ex);
}

clusterMap = new NetworkTopologyImpl(
scmBlockLocationClient.getTopologyInformation());

try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void cleanup() throws Exception {
@Test
public void listMultipartUploadPartsWithZeroUpload() throws IOException {
//GIVEN
System.out.println(writeClient.refetchTopologyInformation());
createBucket(metadataManager, "vol1", "bucket1");

OmMultipartInfo omMultipartInfo =
Expand Down

0 comments on commit 334640c

Please sign in to comment.