Skip to content

Commit

Permalink
HDFS-12498. Journal Syncer is not started in Federated + HA cluster. …
Browse files Browse the repository at this point in the history
…Contributed by Bharat Viswanadham.
  • Loading branch information
arp7 committed Nov 11, 2017
1 parent 1d6f8be commit 6d201f7
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 12 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;


import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
Expand All @@ -51,6 +52,8 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;


/** /**
Expand Down Expand Up @@ -263,25 +266,63 @@ private void syncWithJournalAtIndex(int index) {
} }


private List<InetSocketAddress> getOtherJournalNodeAddrs() { private List<InetSocketAddress> getOtherJournalNodeAddrs() {
URI uri = null; String uriStr = "";
try { try {
String uriStr = conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY); uriStr = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);

if (uriStr == null || uriStr.isEmpty()) {
if (nameServiceId != null) {
uriStr = conf.getTrimmed(DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + "." + nameServiceId);
}
}

if (uriStr == null || uriStr.isEmpty()) { if (uriStr == null || uriStr.isEmpty()) {
LOG.warn("Could not construct Shared Edits Uri"); HashSet<String> sharedEditsUri = Sets.newHashSet();
if (nameServiceId != null) {
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(
conf, nameServiceId);
for (String nnId : nnIds) {
String suffix = nameServiceId + "." + nnId;
uriStr = conf.getTrimmed(DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + "." + suffix);
sharedEditsUri.add(uriStr);
}
if (sharedEditsUri.size() > 1) {
uriStr = null;
LOG.error("The conf property " + DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly, " +
"it has been configured with different journalnode values " +
sharedEditsUri.toString() + " for a" +
" single nameserviceId" + nameServiceId);
}
}
}

if (uriStr == null || uriStr.isEmpty()) {
LOG.error("Could not construct Shared Edits Uri");
return null; return null;
} else {
return getJournalAddrList(uriStr);
} }
uri = new URI(uriStr);
return Util.getLoggerAddresses(uri,
Sets.newHashSet(jn.getBoundIpcAddress()));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
LOG.error("The conf property " + DFSConfigKeys LOG.error("The conf property " + DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly."); .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly.");
} catch (IOException e) { } catch (IOException e) {
LOG.error("Could not parse JournalNode addresses: " + uri); LOG.error("Could not parse JournalNode addresses: " + uriStr);
} }
return null; return null;
} }


private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
URISyntaxException,
IOException {
URI uri = new URI(uriStr);
return Util.getLoggerAddresses(uri,
Sets.newHashSet(jn.getBoundIpcAddress()));
}

private JournalIdProto convertJournalId(String journalId) { private JournalIdProto convertJournalId(String journalId) {
return QJournalProtocolProtos.JournalIdProto.newBuilder() return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(journalId) .setIdentifier(journalId)
Expand Down
Expand Up @@ -106,6 +106,24 @@ public void setup() throws Exception {
"testJournalNodeSyncerNotStartWhenSyncEnabled")) { "testJournalNodeSyncerNotStartWhenSyncEnabled")) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
"qjournal://jn0:9900;jn1:9901"); "qjournal://jn0:9900;jn1:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncwithFederationTypeConfigWithNameServiceId")) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1",
"qjournal://journalnode0:9900;journalnode0:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncwithFederationTypeConfigWithNamenodeId")) {
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1",
"qjournal://journalnode0:9900;journalnode1:9901");
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
"qjournal://journalnode0:9900;journalnode1:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncwithFederationTypeIncorrectConfigWithNamenodeId")) {
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1",
"qjournal://journalnode0:9900;journalnode1:9901");
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
"qjournal://journalnode0:9902;journalnode1:9903");
} }
jn = new JournalNode(); jn = new JournalNode();
jn.setConf(conf); jn.setConf(conf);
Expand Down Expand Up @@ -387,7 +405,7 @@ public void testJournalNodeStartupFailsCleanly() {


@Test @Test
public void testJournalNodeSyncerNotStartWhenSyncDisabled() public void testJournalNodeSyncerNotStartWhenSyncDisabled()
throws IOException{ throws IOException {
//JournalSyncer will not be started, as journalsync is not enabled //JournalSyncer will not be started, as journalsync is not enabled
conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, false); conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, false);
jn.getOrCreateJournal(journalId); jn.getOrCreateJournal(journalId);
Expand All @@ -408,7 +426,7 @@ public void testJournalNodeSyncerNotStartWhenSyncDisabled()


@Test @Test
public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI() public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI()
throws IOException{ throws IOException {
//JournalSyncer will not be started, //JournalSyncer will not be started,
// as shared edits hostnames are not resolved // as shared edits hostnames are not resolved
jn.getOrCreateJournal(journalId); jn.getOrCreateJournal(journalId);
Expand All @@ -431,7 +449,7 @@ public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI()


@Test @Test
public void testJournalNodeSyncerNotStartWhenSyncEnabled() public void testJournalNodeSyncerNotStartWhenSyncEnabled()
throws IOException{ throws IOException {
//JournalSyncer will not be started, //JournalSyncer will not be started,
// as shared edits hostnames are not resolved // as shared edits hostnames are not resolved
jn.getOrCreateJournal(journalId); jn.getOrCreateJournal(journalId);
Expand All @@ -452,9 +470,84 @@ public void testJournalNodeSyncerNotStartWhenSyncEnabled()


} }


private void setupStaticHostResolution(int nameServiceIdCount,
@Test
public void testJournalNodeSyncwithFederationTypeConfigWithNameServiceId()
throws IOException {
//JournalSyncer will not be started, as nameserviceId passed is null,
// but configured shared edits dir is appended with nameserviceId
setupStaticHostResolution(2, "journalnode");
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

//Trying by passing nameserviceId and resolve hostnames
// now IstriedJournalSyncerStartWithnsId should be set
// and also journalnode syncer will also be started

jn.getOrCreateJournal(journalId, "ns1");
Assert.assertEquals(true,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}

@Test
public void testJournalNodeSyncwithFederationTypeConfigWithNamenodeId()
throws IOException {
//JournalSyncer will not be started, as nameserviceId passed is null,
// but configured shared edits dir is appended with nameserviceId +
// namenodeId
setupStaticHostResolution(2, "journalnode");
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

//Trying by passing nameserviceId and resolve hostnames
// now IstriedJournalSyncerStartWithnsId should be set
// and also journalnode syncer will also be started

jn.getOrCreateJournal(journalId, "ns1");
Assert.assertEquals(true,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}

@Test
public void
testJournalNodeSyncwithFederationTypeIncorrectConfigWithNamenodeId()
throws IOException {
//JournalSyncer will not be started, as nameserviceId passed is null,
// but configured shared edits dir is appended with nameserviceId +
// namenodeId
setupStaticHostResolution(2, "journalnode");
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());

//Trying by passing nameserviceId and resolve hostnames
// now IstriedJournalSyncerStartWithnsId should be set
// and journalnode syncer will not be started
// as for each nnId, different shared Edits dir value is configured

jn.getOrCreateJournal(journalId, "ns1");
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}


private void setupStaticHostResolution(int journalNodeCount,
String hostname) { String hostname) {
for (int i = 0; i < nameServiceIdCount; i++) { for (int i = 0; i < journalNodeCount; i++) {
NetUtils.addStaticResolution(hostname + i, NetUtils.addStaticResolution(hostname + i,
"localhost"); "localhost");
} }
Expand Down

0 comments on commit 6d201f7

Please sign in to comment.