Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.ec.reconstruction.validation";
public static final boolean DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE = false;

public static final String DFS_DISABLE_DATANODE_TOPOLOGY_SORT_KEY =
"dfs.disable.datanode.topology.sort";
public static final boolean DFS_DISABLE_DATANODE_TOPOLOGY_SORT_KEY_DEFAULT = false;


public static final String
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
"dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ public long getTotalECBlockGroups() {
return blocksMap.getECBlockGroups();
}

public boolean isTopologySortDisabled() {
return this.topologySortDisabled;
}

/**
* redundancyRecheckInterval is how often namenode checks for new
* reconstruction work.
Expand Down Expand Up @@ -458,12 +462,24 @@ public long getTotalECBlockGroups() {
/** Storages accessible from multiple DNs. */
private final ProvidedStorageMap providedStorageMap;

/**
* Enable/disable topology sorting for datanodes. Enable for colocated clusters
* while disable for compute/storage separate clusters since they won't be in
* the same host or rack.
*/
private final boolean topologySortDisabled;

public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
this.blockIdManager = new BlockIdManager(this);

this.topologySortDisabled = conf.getBoolean(
DFSConfigKeys.DFS_DISABLE_DATANODE_TOPOLOGY_SORT_KEY,
DFSConfigKeys.DFS_DISABLE_DATANODE_TOPOLOGY_SORT_KEY_DEFAULT);

blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
rescannedMisreplicatedBlocks =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ public class DatanodeManager {
final Configuration conf) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;

this.useDfsNetworkTopology = conf.getBoolean(
DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY,
DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT);
Expand Down Expand Up @@ -587,11 +586,14 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
*/
private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
Comparator<DatanodeInfo> comparator) {
// As it is possible for the separation of node manager and datanode,
// here we should get node but not datanode only .
// Resolving topology is expensive especially by calling external script.
// If datanode and nodemanager are not colocated, we can disable resolving topology
// since they will not locate on the same host and rack.
// Otherwise, it resolves nodemanager topology and compares/sorts the datanodes
// by comparing the distances between nodemanager and the datanodes.
boolean nonDatanodeReader = false;
Node client = getDatanodeByHost(targetHost);
if (client == null) {
if (client == null && !blockManager.isTopologySortDisabled()) {
nonDatanodeReader = true;
List<String> hosts = new ArrayList<>(1);
hosts.add(targetHost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ static DatanodeStorageInfo[] chooseTargetForNewBlock(
// If client locality is ignored, clientNode remains 'null' to indicate
if (!ignoreClientLocality) {
clientNode = bm.getDatanodeManager().getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
if (clientNode == null && !bm.isTopologySortDisabled()) {
clientNode = getClientNode(bm, r.clientMachine);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3081,7 +3081,7 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
readUnlock("getAdditionalDatanode");
}

if (clientnode == null) {
if (clientnode == null && !blockManager.isTopologySortDisabled()) {
clientnode = FSDirWriteFileOp.getClientNode(blockManager, clientMachine);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Random;
import java.util.Set;

import org.apache.hadoop.test.GenericTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -84,8 +85,14 @@ public class TestDatanodeManager {
final int NUM_ITERATIONS = 500;

private static DatanodeManager mockDatanodeManager(
FSNamesystem fsn, Configuration conf) throws IOException {
FSNamesystem fsn, Configuration conf) throws IOException {
return mockDatanodeManager(fsn, conf, false);
}

private static DatanodeManager mockDatanodeManager(
FSNamesystem fsn, Configuration conf, boolean topologySortDisabled) throws IOException {
BlockManager bm = Mockito.mock(BlockManager.class);
Mockito.when(bm.isTopologySortDisabled()).thenReturn(topologySortDisabled);
BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
Expand Down Expand Up @@ -310,6 +317,64 @@ public void testSortLocatedBlocks() throws IOException, URISyntaxException {
HelperFunction(null, 0);
}

/**
* This test tests the case of disabling topology sorting
*/
@Test
public void testDisableTopologySorting() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_DISABLE_DATANODE_TOPOLOGY_SORT_KEY, true);
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class);

GenericTestUtils.LogCapturer log = GenericTestUtils.LogCapturer.captureLogs(DatanodeManager.LOG);

FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
DatanodeManager dm = mockDatanodeManager(fsn, conf, true);
// register 5 datanodes, each with different storage ID and type
DatanodeInfo[] locs = new DatanodeInfo[5];
String[] storageIDs = new String[5];
StorageType[] storageTypes = new StorageType[]{
StorageType.ARCHIVE,
StorageType.DEFAULT,
StorageType.DISK,
StorageType.RAM_DISK,
StorageType.SSD
};
for (int i = 0; i < 5; i++) {
// register new datanode
String uuid = "UUID-" + i;
String ip = "IP-" + i;
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
Mockito.when(dr.getIpAddr()).thenReturn(ip);
Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
Mockito.when(dr.getXferPort()).thenReturn(9000);
Mockito.when(dr.getSoftwareVersion()).thenReturn("version1");
dm.registerDatanode(dr);

// get location and storage information
locs[i] = dm.getDatanode(uuid);
storageIDs[i] = "storageID-" + i;
}

// create LocatedBlock with above locations
ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
List<LocatedBlock> blocks = new ArrayList<>();
blocks.add(block);

final String targetIp = "random_ip";

// sort block locations
dm.sortLocatedBlocks(targetIp, blocks);

// No node topology resolution
assertFalse(log.getOutput().contains("Node Resolution failed"));
}

/**
* Execute a functional topology script and make sure that helper
* function works correctly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,35 @@
package org.apache.hadoop.hdfs.server.namenode;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.EnumSet;

import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -155,4 +169,48 @@ public void testAddBlockUC() throws Exception {
}
}
}

@Test
public void testAddBlockWithoutTopology() throws Exception {
final String src = "/testAddBlockWithoutTopology";

final FSNamesystem ns = cluster.getNamesystem();
final NamenodeProtocols nn = cluster.getNameNodeRpc();

// create file
HdfsFileStatus fileStatus = nn.create(src, FsPermission.getFileDefault(),
"clientName",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
true, (short) 1, 1024, null, null, null);

FieldUtils.writeField(ns.getBlockManager(), "topologySortDisabled", true, true);
DNSToSwitchMapping mockMapping = mock(DNSToSwitchMapping.class);
DNSToSwitchMapping origMapping = (DNSToSwitchMapping) FieldUtils.readField(
ns.getBlockManager().getDatanodeManager(),
"dnsToSwitchMapping",
true);
FieldUtils.writeField(ns.getBlockManager().getDatanodeManager(),
"dnsToSwitchMapping",
mockMapping,
true);

LocatedBlock locatedBlock = nn.addBlock(src, "clientName", null, null,
HdfsConstants.GRANDFATHER_INODE_ID, null, null);

assertEquals(1, locatedBlock.getLocations().length);

LocatedBlock additionalLocatedBlock = nn.getAdditionalDatanode(
src, fileStatus.getFileId(), locatedBlock.getBlock(),
locatedBlock.getLocations(), locatedBlock.getStorageIDs(),
new DatanodeInfo[0],2, "clientName");

assertEquals(3, additionalLocatedBlock.getLocations().length);

verify(mockMapping, never()).resolve(anyList());
FieldUtils.writeField(ns.getBlockManager().getDatanodeManager(),
"dnsToSwitchMapping",
origMapping,
true);
FieldUtils.writeField(ns.getBlockManager(), "topologySortDisabled", false, true);
}
}