Skip to content

Commit

Permalink
Merge branch 'trunk' into YARN-11272
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Aug 22, 2022
2 parents 99d4b91 + eda4bb5 commit a1a870c
Show file tree
Hide file tree
Showing 16 changed files with 442 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ default int maxReadSizeForVectorReads() {
* As a result of the call, each range will have FileRange.setData(CompletableFuture)
* called with a future that when complete will have a ByteBuffer with the
* data from the file's range.
* <p>
* The position returned by getPos() after readVectored() is undefined.
* </p>
* <p>
* If a file is changed while the readVectored() operation is in progress, the output is
* undefined. Some ranges may have old data, some may have new and some may have both.
* </p>
* <p>
* While a readVectored() operation is in progress, normal read api calls may block.
* </p>
* @param ranges the byte ranges to read
* @param allocate the function to allocate ByteBuffer
* @throws IOException any IOE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,10 @@ String getPassword(Configuration conf, String alias, String defaultPass) {
*/
@Override
public synchronized void destroy() {
if (trustManager != null) {
if (fileMonitoringTimer != null) {
fileMonitoringTimer.cancel();
}
if (trustManager != null) {
trustManager = null;
keyManagers = null;
trustManagers = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,13 @@ Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for
allocating buffers such that even direct buffers are garbage collected when
they are no longer referenced.

The position returned by `getPos()` after `readVectored()` is undefined.

If a file is changed while the `readVectored()` operation is in progress, the output is
undefined. Some ranges may have old data, some may have new, and some may have both.

While a `readVectored()` operation is in progress, normal read api calls may block.

Note: Don't use direct buffers for reading from ChecksumFileSystem as that may
lead to memory fragmentation explained in HADOOP-18296.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ synchronized Journal getOrCreateJournal(String jid,
return journal;
}

@VisibleForTesting
public JournalNodeSyncer getJournalSyncer(String jid) {
return journalSyncersById.get(jid);
}

@VisibleForTesting
public boolean getJournalSyncerStatus(String jid) {
if (journalSyncersById.get(jid) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -39,6 +40,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,10 +52,10 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* A Journal Sync thread runs through the lifetime of the JN. It periodically
Expand Down Expand Up @@ -153,6 +155,9 @@ private boolean getOtherJournalNodeProxies() {
LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
}
}
// Check if there are any other JournalNodes before starting the sync. Although some proxies
// may be unresolved now, the act of attempting to sync will instigate resolution when the
// servers become available.
if (otherJNProxies.isEmpty()) {
LOG.error("Cannot sync as there is no other JN available for sync.");
return false;
Expand Down Expand Up @@ -310,12 +315,24 @@ private List<InetSocketAddress> getOtherJournalNodeAddrs() {
return null;
}

private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
@VisibleForTesting
protected List<InetSocketAddress> getJournalAddrList(String uriStr) throws
URISyntaxException,
IOException {
URI uri = new URI(uriStr);
return Util.getLoggerAddresses(uri,
new HashSet<>(Arrays.asList(jn.getBoundIpcAddress())), conf);

InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress();
Set<InetSocketAddress> excluded = Sets.newHashSet(boundIpcAddress);
List<InetSocketAddress> addrList = Util.getLoggerAddresses(uri, excluded, conf);

// Exclude the current JournalNode instance (a local address and the same port). If the address
// is bound to a local address on the same port, then remove it to handle scenarios where a
// wildcard address (e.g. "0.0.0.0") is used. We can't simply exclude all local addresses
// since we may be running multiple servers on the same host.
addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()
&& boundIpcAddress.getPort() == addr.getPort());

return addrList;
}

private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;

import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand All @@ -34,6 +36,7 @@
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
.getLogFile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -96,12 +99,45 @@ public void shutDownMiniCluster() throws IOException {
}
}

/**
* Test that the "self exclusion" works when there are multiple JournalNode instances running on
* the same server, but on different ports.
*/
@Test
public void testJournalNodeExcludesSelfMultilpePorts() throws URISyntaxException, IOException {
String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");

// Test: Get the Journal address list for the default configuration
List<InetSocketAddress> addrList = syncer.getJournalAddrList(uri);

// Verify: One of the addresses should be excluded so that the node isn't syncing with itself
assertEquals(2, addrList.size());
}

/**
* Test that the "self exclusion" works when there a host uses a wildcard address.
*/
@Test
public void testJournalNodeExcludesSelfWildCard() throws URISyntaxException, IOException {
String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");

// Test: Request the same Journal address list, but using the IPv4 "0.0.0.0" which is commonly
// used as a bind host.
String boundHostUri = uri.replaceAll("127.0.0.1", "0.0.0.0");
List<InetSocketAddress> boundHostAddrList = syncer.getJournalAddrList(boundHostUri);

// Verify: One of the address should be excluded so that the node isn't syncing with itself
assertEquals(2, boundHostAddrList.size());
}

@Test(timeout=30000)
public void testJournalNodeSync() throws Exception {

//As by default 3 journal nodes are started;
for(int i=0; i<3; i++) {
Assert.assertEquals(true,
assertEquals(true,
jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
}

Expand Down Expand Up @@ -386,13 +422,13 @@ public void testSyncDuringRollingUpgrade() throws Exception {
HdfsConstants.RollingUpgradeAction.PREPARE);

//query rolling upgrade
Assert.assertEquals(info, dfsActive.rollingUpgrade(
assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));

// Restart the Standby NN with rollingUpgrade option
dfsCluster.restartNameNode(standbyNNindex, true,
"-rollingUpgrade", "started");
Assert.assertEquals(info, dfsActive.rollingUpgrade(
assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));

// Do some edits and delete some edit logs
Expand Down Expand Up @@ -420,7 +456,7 @@ public void testSyncDuringRollingUpgrade() throws Exception {
// Restart the current standby NN (previously active)
dfsCluster.restartNameNode(standbyNNindex, true,
"-rollingUpgrade", "started");
Assert.assertEquals(info, dfsActive.rollingUpgrade(
assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));
dfsCluster.waitActive();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3920,6 +3920,13 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/";

public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY =
FEDERATION_PREFIX + "state-store.heartbeat.initial-delay";

// 30 secs
public static final int
DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY = 30;

public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3624,6 +3624,16 @@
<name>yarn.federation.enabled</name>
<value>false</value>
</property>
<property>
<description>
Initial delay for federation state-store heartbeat service. Value is followed by a unit
specifier: ns, us, ms, s, m, h, d for nanoseconds, microseconds, milliseconds, seconds,
minutes, hours, days respectively. Values should provide units,
but seconds are assumed
</description>
<name>yarn.federation.state-store.heartbeat.initial-delay</name>
<value>30s</value>
</property>
<property>
<description>
Machine list file to be loaded by the FederationSubCluster Resolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

return GetApplicationHomeSubClusterResponse.newInstance(
ApplicationHomeSubCluster.newInstance(appId, applications.get(appId)));
return GetApplicationHomeSubClusterResponse.newInstance(appId, applications.get(appId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,7 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return GetApplicationHomeSubClusterResponse
.newInstance(ApplicationHomeSubCluster
.newInstance(request.getApplicationId(), homeRM));
.newInstance(request.getApplicationId(), homeRM);
}

@Override
Expand Down
Loading

0 comments on commit a1a870c

Please sign in to comment.