Skip to content

Commit

Permalink
HDFS-9142. Separating Configuration object for namenode(s) in MiniDFS…
Browse files Browse the repository at this point in the history
…Cluster. (Siqi Li via mingma)
  • Loading branch information
mingmasplace committed Oct 9, 2015
1 parent c11fc8a commit de8efc6
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 32 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -1998,6 +1998,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9137. DeadLock between DataNode#refreshVolumes and HDFS-9137. DeadLock between DataNode#refreshVolumes and
BPOfferService#registrationSucceeded. (Uma Maheswara Rao G via yliu) BPOfferService#registrationSucceeded. (Uma Maheswara Rao G via yliu)


HDFS-9142. Separating Configuration object for namenode(s) in
MiniDFSCluster. (Siqi Li via mingma)

Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -40,6 +40,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
Expand Down Expand Up @@ -890,6 +891,44 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
format, operation, clusterId, nnCounter); format, operation, clusterId, nnCounter);
nnCounter += nameservice.getNNs().size(); nnCounter += nameservice.getNNs().size();
} }

for (NameNodeInfo nn : namenodes.values()) {
Configuration nnConf = nn.conf;
for (NameNodeInfo nnInfo : namenodes.values()) {
if (nn.equals(nnInfo)) {
continue;
}
copyKeys(conf, nnConf, nnInfo.nameserviceId, nnInfo.nnId);
}
}
}

private static void copyKeys(Configuration srcConf, Configuration destConf,
String nameserviceId, String nnId) {
String key = DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nameserviceId, nnId);
destConf.set(key, srcConf.get(key));

key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
nameserviceId, nnId);
String val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}

key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
nameserviceId, nnId);
val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}

key = DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
nameserviceId, nnId);
val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}
} }


/** /**
Expand Down Expand Up @@ -972,16 +1011,13 @@ private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCo
// create all the namenodes in the namespace // create all the namenodes in the namespace
nnIndex = nnCounter; nnIndex = nnCounter;
for (NNConf nn : nameservice.getNNs()) { for (NNConf nn : nameservice.getNNs()) {
initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs, Configuration hdfsConf = new Configuration(conf);
initNameNodeConf(hdfsConf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
enableManagedDfsDirsRedundancy, nnIndex++); enableManagedDfsDirsRedundancy, nnIndex++);
NameNodeInfo info = createNameNode(conf, false, operation, createNameNode(hdfsConf, false, operation,
clusterId, nsId, nn.getNnId()); clusterId, nsId, nn.getNnId());

// Record the last namenode uri // Record the last namenode uri
if (info != null && info.conf != null) { lastDefaultFileSystem = hdfsConf.get(FS_DEFAULT_NAME_KEY);
lastDefaultFileSystem =
info.conf.get(FS_DEFAULT_NAME_KEY);
}
} }
if (!federation && lastDefaultFileSystem != null) { if (!federation && lastDefaultFileSystem != null) {
// Set the default file system to the actual bind address of NN. // Set the default file system to the actual bind address of NN.
Expand Down Expand Up @@ -1198,50 +1234,39 @@ private static String[] createArgs(StartupOption operation) {
return args; return args;
} }


private NameNodeInfo createNameNode(Configuration conf, boolean format, StartupOption operation, private void createNameNode(Configuration hdfsConf, boolean format, StartupOption operation,
String clusterId, String nameserviceId, String nnId) throws IOException { String clusterId, String nameserviceId, String nnId) throws IOException {
// Format and clean out DataNode directories // Format and clean out DataNode directories
if (format) { if (format) {
DFSTestUtil.formatNameNode(conf); DFSTestUtil.formatNameNode(hdfsConf);
} }
if (operation == StartupOption.UPGRADE){ if (operation == StartupOption.UPGRADE){
operation.setClusterId(clusterId); operation.setClusterId(clusterId);
} }


// Start the NameNode after saving the default file system.
String originalDefaultFs = conf.get(FS_DEFAULT_NAME_KEY);
String[] args = createArgs(operation); String[] args = createArgs(operation);
NameNode nn = NameNode.createNameNode(args, conf); NameNode nn = NameNode.createNameNode(args, hdfsConf);
if (operation == StartupOption.RECOVER) { if (operation == StartupOption.RECOVER) {
return null; return;
} }

// After the NN has started, set back the bound ports into // After the NN has started, set back the bound ports into
// the conf // the conf
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nameserviceId, nnId), nn.getNameNodeAddressHostPortString()); nameserviceId, nnId), nn.getNameNodeAddressHostPortString());
if (nn.getHttpAddress() != null) { if (nn.getHttpAddress() != null) {
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpAddress())); nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpAddress()));
} }
if (nn.getHttpsAddress() != null) { if (nn.getHttpsAddress() != null) {
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY, hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpsAddress())); nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpsAddress()));
} }

copyKeys(hdfsConf, conf, nameserviceId, nnId);
DFSUtil.setGenericConf(conf, nameserviceId, nnId, DFSUtil.setGenericConf(hdfsConf, nameserviceId, nnId,
DFS_NAMENODE_HTTP_ADDRESS_KEY); DFS_NAMENODE_HTTP_ADDRESS_KEY);
NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId, NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId,
operation, new Configuration(conf)); operation, hdfsConf);
namenodes.put(nameserviceId, info); namenodes.put(nameserviceId, info);

// Restore the default fs name
if (originalDefaultFs == null) {
conf.set(FS_DEFAULT_NAME_KEY, "");
} else {
conf.set(FS_DEFAULT_NAME_KEY, originalDefaultFs);
}
return info;
} }


/** /**
Expand Down Expand Up @@ -2856,7 +2881,7 @@ private void checkSingleNameNode() {
* *
* @return newly started namenode * @return newly started namenode
*/ */
public NameNode addNameNode(Configuration conf, int namenodePort) public void addNameNode(Configuration conf, int namenodePort)
throws IOException { throws IOException {
if(!federation) if(!federation)
throw new IOException("cannot add namenode to non-federated cluster"); throw new IOException("cannot add namenode to non-federated cluster");
Expand All @@ -2875,7 +2900,7 @@ public NameNode addNameNode(Configuration conf, int namenodePort)
NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId); NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId);
int nnIndex = infos == null ? 0 : infos.length; int nnIndex = infos == null ? 0 : infos.length;
initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex); initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex);
NameNodeInfo info = createNameNode(conf, true, null, null, nameserviceId, nnId); createNameNode(conf, true, null, null, nameserviceId, nnId);


// Refresh datanodes with the newly started namenode // Refresh datanodes with the newly started namenode
for (DataNodeProperties dn : dataNodes) { for (DataNodeProperties dn : dataNodes) {
Expand All @@ -2885,7 +2910,6 @@ public NameNode addNameNode(Configuration conf, int namenodePort)


// Wait for new namenode to get registrations from all the datanodes // Wait for new namenode to get registrations from all the datanodes
waitActive(nnIndex); waitActive(nnIndex);
return info.nameNode;
} }


protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile, protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
Expand Down
Expand Up @@ -18,6 +18,7 @@


package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;


import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
Expand All @@ -28,6 +29,7 @@


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
Expand Down Expand Up @@ -182,4 +184,49 @@ public void testClusterNoStorageTypeSetForDatanodes() throws IOException {
MiniDFSCluster.shutdownCluster(cluster); MiniDFSCluster.shutdownCluster(cluster);
} }
} }

@Test
public void testSetUpFederatedCluster() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).nnTopology(
MiniDFSNNTopology.simpleHAFederatedTopology(2))
.numDataNodes(2)
.build();
try {
cluster.waitActive();
cluster.transitionToActive(1);
cluster.transitionToActive(3);
assertEquals("standby", cluster.getNamesystem(0).getHAState());
assertEquals("active", cluster.getNamesystem(1).getHAState());
assertEquals("standby", cluster.getNamesystem(2).getHAState());
assertEquals("active", cluster.getNamesystem(3).getHAState());

String ns0nn0 = conf.get(
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn0"));
String ns0nn1 = conf.get(
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn1"));
String ns1nn0 = conf.get(
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn0"));
String ns1nn1 = conf.get(
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn1"));

for(NameNodeInfo nnInfo : cluster.getNameNodeInfos()) {
assertEquals(ns0nn0, nnInfo.conf.get(
DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn0")));
assertEquals(ns0nn1, nnInfo.conf.get(
DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn1")));
assertEquals(ns1nn0, nnInfo.conf.get(
DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn0")));
assertEquals(ns1nn1, nnInfo.conf.get(
DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn1")));
}
} finally {
MiniDFSCluster.shutdownCluster(cluster);
}
}
} }

0 comments on commit de8efc6

Please sign in to comment.