HDFS-6899: Merging r1619970 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619975 13f79535-47bb-0310-9956-ffa450edef68
arp7 committed Aug 23, 2014
1 parent 9fa622f commit a3491bc9e161648918b1a3a6e321681f7fe23c3c
@@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6758. block writer should pass the expected block size to
DataXceiverServer. (Arpit Agarwal)
+ HDFS-6899. Allow changing MiniDFSCluster volumes per DN and capacity
+ per volume. (Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
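
The HDFS-6899 entry above corresponds to the MiniDFSCluster and FsVolumeImpl changes in the hunks that follow. As a rough sketch of what the new builder options enable (hypothetical test code, not part of this commit; `conf` is assumed to be an HdfsConfiguration), a test could request a custom number of storage directories per DataNode and cap each one:

    final long CAP = 2L * 1024 * 1024 * 1024;             // 2 GB per volume (arbitrary value)
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(3)
        .storagesPerDatanode(2)                           // new builder option in this change
        .storageCapacities(new long[]{CAP, CAP})          // new: one entry per storage dir, reused for every DN
        .build();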
@@ -29,6 +29,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
@@ -49,7 +50,8 @@
* It uses the {@link FsDatasetImpl} object for synchronization.
*/
@InterfaceAudience.Private
-class FsVolumeImpl implements FsVolumeSpi {
+@VisibleForTesting
+public class FsVolumeImpl implements FsVolumeSpi {
private final FsDatasetImpl dataset;
private final String storageID;
private final StorageType storageType;
@@ -58,6 +60,12 @@
private final File currentDir; // <StorageDirectory>/current
private final DF usage;
private final long reserved;
+
+ // Capacity configured. This is useful when we want to
+ // limit the visible capacity for tests. If negative, then we just
+ // query from the filesystem.
+ protected long configuredCapacity;
+
/**
* Per-volume worker pool that processes new blocks to cache.
* The maximum number of workers per volume is bounded (configurable via
@@ -77,20 +85,26 @@
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
this.storageType = storageType;
+ this.configuredCapacity = -1;
+ cacheExecutor = initializeCacheExecutor(parent);
+ }
+
+ protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
final int maxNumThreads = dataset.datanode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
- DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT
- );
+ DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);
+
ThreadFactory workerFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
.build();
- cacheExecutor = new ThreadPoolExecutor(
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, maxNumThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
workerFactory);
- cacheExecutor.allowCoreThreadTimeOut(true);
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
}
File getCurrentDir() {
@@ -129,9 +143,24 @@ long getBlockPoolUsed(String bpid) throws IOException {
* reserved capacity.
* @return the unreserved number of bytes left in this filesystem. May be zero.
*/
- long getCapacity() {
- long remaining = usage.getCapacity() - reserved;
- return remaining > 0 ? remaining : 0;
+ @VisibleForTesting
+ public long getCapacity() {
+ if (configuredCapacity < 0) {
+ long remaining = usage.getCapacity() - reserved;
+ return remaining > 0 ? remaining : 0;
+ }
+
+ return configuredCapacity;
+ }
+
+ /**
+ * This function MUST NOT be used outside of tests.
+ *
+ * @param capacity the capacity, in bytes, that this volume will report
+ */
+ @VisibleForTesting
+ public void setCapacityForTesting(long capacity) {
+ this.configuredCapacity = capacity;
}
@Override
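
getCapacity() now honors configuredCapacity whenever it is non-negative, and setCapacityForTesting() is the test-only hook that sets it. A minimal sketch of using the hook directly on a running MiniDFSCluster (hypothetical test code; volume access mirrors the MiniDFSCluster changes later in this diff):

    DataNode dn = cluster.getDataNodes().get(0);
    FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0);
    volume.setCapacityForTesting(1024L * 1024 * 1024);    // report exactly 1 GB from now on
    assert volume.getCapacity() == 1024L * 1024 * 1024;   // no longer derived from DF usage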
@@ -91,7 +91,9 @@
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -132,11 +134,15 @@
public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
- // Changing this value may break some tests that assume it is 2.
- public static final int DIRS_PER_DATANODE = 2;
+ // Changing this default may break some tests that assume it is 2.
+ private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
static { DefaultMetricsSystem.setMiniClusterMode(true); }
+ public int getStoragesPerDatanode() {
+ return storagesPerDatanode;
+ }
+
/**
* Class to construct instances of MiniDFSClusters with specific options.
*/
@@ -146,6 +152,8 @@
private final Configuration conf;
private int numDataNodes = 1;
private StorageType[][] storageTypes = null;
+ private StorageType[] storageTypes1D = null;
+ private int storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
private boolean format = true;
private boolean manageNameDfsDirs = true;
private boolean manageNameDfsSharedDirs = true;
@@ -156,6 +164,8 @@
private String[] racks = null;
private String [] hosts = null;
private long [] simulatedCapacities = null;
+ private long [][] storageCapacities = null;
+ private long [] storageCapacities1D = null;
private String clusterId = null;
private boolean waitSafeMode = true;
private boolean setupHostsFile = false;
@@ -193,17 +203,21 @@ public Builder numDataNodes(int val) {
return this;
}
+ /**
+ * Default: DEFAULT_STORAGES_PER_DATANODE
+ */
+ public Builder storagesPerDatanode(int numStorages) {
+ this.storagesPerDatanode = numStorages;
+ return this;
+ }
+
/**
* Set the same storage type configuration for each datanode.
* If storageTypes is uninitialized or passed null then
* StorageType.DEFAULT is used.
*/
public Builder storageTypes(StorageType[] types) {
- assert types.length == DIRS_PER_DATANODE;
- this.storageTypes = new StorageType[numDataNodes][types.length];
- for (int i = 0; i < numDataNodes; ++i) {
- this.storageTypes[i] = types;
- }
+ this.storageTypes1D = types;
return this;
}
@@ -217,6 +231,26 @@ public Builder storageTypes(StorageType[][] types) {
return this;
}
+ /**
+ * Set the same storage capacity configuration for each datanode.
+ * If storageCapacities is uninitialized or passed null then
+ * capacity is limited by available disk space.
+ */
+ public Builder storageCapacities(long[] capacities) {
+ this.storageCapacities1D = capacities;
+ return this;
+ }
+
+ /**
+ * Set custom storage capacity configuration for each datanode.
+ * If storageCapacities is uninitialized or passed null then
+ * capacity is limited by available disk space.
+ */
+ public Builder storageCapacities(long[][] capacities) {
+ this.storageCapacities = capacities;
+ return this;
+ }
+
/**
* Default: true
*/
@@ -290,6 +324,11 @@ public Builder hosts(String[] val) {
}
/**
+ * Use SimulatedFSDataset and limit the capacity of each DN per
+ * the values passed in val.
+ *
+ * For limiting the capacity of volumes with real storage, see
+ * {@link FsVolumeImpl#setCapacityForTesting}
* Default: null
*/
public Builder simulatedCapacities(long[] val) {
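
The updated javadoc above distinguishes the two capacity-limiting paths: simulatedCapacities switches the DataNodes to SimulatedFSDataset, while the new storageCapacities option applies to real volumes via FsVolumeImpl#setCapacityForTesting. For comparison, a sketch of the pre-existing simulated path (hypothetical test code):

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(2)
        .simulatedCapacities(new long[]{1L << 30, 1L << 30})  // one entry per DN; uses SimulatedFSDataset
        .build();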
@@ -392,7 +431,28 @@ protected MiniDFSCluster(Builder builder) throws IOException {
LOG.info("starting cluster: numNameNodes=" + numNameNodes
+ ", numDataNodes=" + builder.numDataNodes);
nameNodes = new NameNodeInfo[numNameNodes];
+ this.storagesPerDatanode = builder.storagesPerDatanode;
+
+ // Duplicate the storageType setting for each DN.
+ if (builder.storageTypes == null && builder.storageTypes1D != null) {
+ assert builder.storageTypes1D.length == storagesPerDatanode;
+ builder.storageTypes = new StorageType[builder.numDataNodes][storagesPerDatanode];
+ for (int i = 0; i < builder.numDataNodes; ++i) {
+ builder.storageTypes[i] = builder.storageTypes1D;
+ }
+ }
+
+ // Duplicate the storageCapacity setting for each DN.
+ if (builder.storageCapacities == null && builder.storageCapacities1D != null) {
+ assert builder.storageCapacities1D.length == storagesPerDatanode;
+ builder.storageCapacities = new long[builder.numDataNodes][storagesPerDatanode];
+
+ for (int i = 0; i < builder.numDataNodes; ++i) {
+ builder.storageCapacities[i] = builder.storageCapacities1D;
+ }
+ }
+
initMiniDFSCluster(builder.conf,
builder.numDataNodes,
builder.storageTypes,
@@ -405,6 +465,7 @@ protected MiniDFSCluster(Builder builder) throws IOException {
builder.dnOption,
builder.racks,
builder.hosts,
+ builder.storageCapacities,
builder.simulatedCapacities,
builder.clusterId,
builder.waitSafeMode,
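
The duplication logic added to the constructor expands the 1D builder settings into the per-DataNode form consumed by startDataNodes. An illustration with hypothetical values, equivalent to the loop above:

    int numDataNodes = 3;
    int storagesPerDatanode = 2;
    long[] storageCapacities1D = {1L << 30, 2L << 30};      // one entry per storage dir
    long[][] storageCapacities = new long[numDataNodes][storagesPerDatanode];
    for (int i = 0; i < numDataNodes; ++i) {
      storageCapacities[i] = storageCapacities1D;           // every DN shares the same per-volume capacities
    }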
@@ -447,6 +508,7 @@ public void setDnArgs(String ... args) {
private boolean waitSafeMode = true;
private boolean federation;
private boolean checkExitOnShutdown = true;
+ protected final int storagesPerDatanode;
/**
* A unique instance identifier for the cluster. This
@@ -485,6 +547,7 @@ public void setStartOpt(StartupOption startOpt) {
*/
public MiniDFSCluster() {
nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
+ storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
synchronized (MiniDFSCluster.class) {
instanceId = instanceCount++;
}
@@ -659,11 +722,12 @@ public MiniDFSCluster(int nameNodePort,
String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
+ this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
initMiniDFSCluster(conf, numDataNodes, null, format,
- manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
- operation, null, racks, hosts,
- simulatedCapacities, null, true, false,
- MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
+ manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
+ operation, null, racks, hosts,
+ null, simulatedCapacities, null, true, false,
+ MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
}
private void initMiniDFSCluster(
@@ -672,7 +736,8 @@ private void initMiniDFSCluster(
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
boolean manageDataDfsDirs, StartupOption startOpt,
StartupOption dnStartOpt, String[] racks,
- String[] hosts, long[] simulatedCapacities, String clusterId,
+ String[] hosts,
+ long[][] storageCapacities, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
boolean checkDataNodeAddrConfig,
@@ -746,7 +811,7 @@ private void initMiniDFSCluster(
// Start the DataNodes
startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs,
dnStartOpt != null ? dnStartOpt : startOpt,
- racks, hosts, simulatedCapacities, setupHostsFile,
+ racks, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
waitClusterUp();
//make sure ProxyUsers uses the latest conf
@@ -1121,8 +1186,8 @@ public void waitClusterUp() throws IOException {
String makeDataNodeDirs(int dnIndex, StorageType[] storageTypes) throws IOException {
StringBuilder sb = new StringBuilder();
- assert storageTypes == null || storageTypes.length == DIRS_PER_DATANODE;
- for (int j = 0; j < DIRS_PER_DATANODE; ++j) {
+ assert storageTypes == null || storageTypes.length == storagesPerDatanode;
+ for (int j = 0; j < storagesPerDatanode; ++j) {
File dir = getInstanceStorageDir(dnIndex, j);
dir.mkdirs();
if (!dir.isDirectory()) {
@@ -1198,7 +1263,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities,
boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
- simulatedCapacities, setupHostsFile, false, false, null);
+ null, simulatedCapacities, setupHostsFile, false, false, null);
}
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
@@ -1208,7 +1273,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
- simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
+ null, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
}
/**
@@ -1242,12 +1307,15 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts,
+ long[][] storageCapacities,
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
+ assert storageCapacities == null || simulatedCapacities == null;
assert storageTypes == null || storageTypes.length == numDataNodes;
+ assert storageCapacities == null || storageCapacities.length == numDataNodes;
if (operation == StartupOption.RECOVER) {
return;
@@ -1300,7 +1368,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
operation != StartupOption.ROLLBACK) ?
null : new String[] {operation.getName()};
-
+ DataNode[] dns = new DataNode[numDataNodes];
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf);
if (dnConfOverlays != null) {
@@ -1391,10 +1459,24 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
dn.runDatanodeDaemon();
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs,
secureResources, dn.getIpcPort()));
+ dns[i - curDatanodesNum] = dn;
}
curDatanodesNum += numDataNodes;
this.numDataNodes += numDataNodes;
waitActive();
+
+ if (storageCapacities != null) {
+ for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
+ List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
+ assert storageCapacities[i].length == storagesPerDatanode;
+ assert volumes.size() == storagesPerDatanode;
+
+ for (int j = 0; j < volumes.size(); ++j) {
+ FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
+ volume.setCapacityForTesting(storageCapacities[i][j]);
+ }
+ }
+ }
}