Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into MAPREDUCE-7390
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Nov 10, 2022
2 parents fa77c6f + b398a7b commit 72f6940
Show file tree
Hide file tree
Showing 21 changed files with 289 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1393,8 +1393,7 @@ private class Listener extends Thread {
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
//Could be an ephemeral port
this.listenPort = acceptChannel.socket().getLocalPort();
Thread.currentThread().setName("Listener at " +
bindAddress + "/" + this.listenPort);
LOG.info("Listener at {}:{}", bindAddress, this.listenPort);
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
if (all || changed()) {
numSamples += intervalStat.numSamples();
builder.addCounter(numInfo, numSamples)
.addGauge(avgInfo, lastStat().mean());
.addGauge(avgInfo, intervalStat.mean());
if (extended) {
builder.addGauge(stdevInfo, lastStat().stddev())
.addGauge(iMinInfo, lastStat().min())
.addGauge(iMaxInfo, lastStat().max())
builder.addGauge(stdevInfo, intervalStat.stddev())
.addGauge(iMinInfo, intervalStat.min())
.addGauge(iMaxInfo, intervalStat.max())
.addGauge(minInfo, minMax.min())
.addGauge(maxInfo, minMax.max())
.addGauge(iNumInfo, lastStat().numSamples());
.addGauge(iNumInfo, intervalStat.numSamples());
}
if (changed()) {
if (numSamples > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,27 @@ private static void snapshotMutableRatesWithAggregation(
}
}

/**
* MutableStat should output 0 instead of the previous state when there is no change.
*/
@Test public void testMutableWithoutChanged() {
MetricsRecordBuilder builderWithChange = mockMetricsRecordBuilder();
MetricsRecordBuilder builderWithoutChange = mockMetricsRecordBuilder();
MetricsRegistry registry = new MetricsRegistry("test");
MutableStat stat = registry.newStat("Test", "Test", "Ops", "Val", true);
stat.add(1000, 1000);
stat.add(1000, 2000);
registry.snapshot(builderWithChange, true);

assertCounter("TestNumOps", 2000L, builderWithChange);
assertGauge("TestINumOps", 2000L, builderWithChange);
assertGauge("TestAvgVal", 1.5, builderWithChange);

registry.snapshot(builderWithoutChange, true);
assertGauge("TestINumOps", 0L, builderWithoutChange);
assertGauge("TestAvgVal", 0.0, builderWithoutChange);
}

@Test
public void testDuplicateMetrics() {
MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,6 @@ public static ClientProtocol createProxyWithAlignmentContext(
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (alignmentContext == null) {
alignmentContext = new ClientGSIContext();
}
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
Expand Down Expand Up @@ -233,6 +234,20 @@ public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf);
}

public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException {
Configuration observerReadConf = new Configuration(conf);
observerReadConf.set(DFS_NAMESERVICES,
observerReadConf.get(DFS_NAMESERVICES)+ ",router-service");
observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
getFileSystemURI().toString());
observerReadConf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + "router-service", ObserverReadProxyProvider.class.getName());
DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service");

return DistributedFileSystem.get(observerReadConf);
}

public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
Expand All @@ -41,15 +42,40 @@
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.junit.After;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;

public class TestObserverWithRouter {

public class TestObserverWithRouter {
private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
private MiniRouterDFSCluster cluster;
private RouterContext routerContext;
private FileSystem fileSystem;

public void startUpCluster(int numberOfObserver) throws Exception {
startUpCluster(numberOfObserver, null);
@BeforeEach
void init(TestInfo info) throws Exception {
if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) {
return;
}
startUpCluster(2, null);
}

@AfterEach
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}

routerContext = null;

if (fileSystem != null) {
fileSystem.close();
fileSystem = null;
}
}

public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
Expand Down Expand Up @@ -95,31 +121,39 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
cluster.installMockLocations();

cluster.waitActiveNamespaces();
routerContext = cluster.getRandomRouter();
fileSystem = routerContext.getFileSystemWithObserverReadsEnabled();
}

@After
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
@Test
public void testObserverRead() throws Exception {
internalTestObserverRead();
}

/**
* Tests that without adding config to use ObserverProxyProvider, the client shouldn't
* have reads served by Observers.
* Fixes regression in HDFS-13522.
*/
@Test
public void testObserverRead() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();
public void testReadWithoutObserverClientConfigurations() throws Exception {
fileSystem.close();
fileSystem = routerContext.getFileSystem();
assertThrows(AssertionError.class, this::internalTestObserverRead);
}

public void internalTestObserverRead()
throws Exception {
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");
// Send Create call to active
// Send create call
fileSystem.create(path).close();

// Send read request to observer
// Send read request
fileSystem.open(path).close();

long rpcCountForActive = routerContext.getRouter().getRpcServer()
Expand All @@ -131,21 +165,19 @@ public void testObserverRead() throws Exception {
.getRPCMetrics().getObserverProxyOps();
// getBlockLocations should be sent to observer
assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
fileSystem.close();
}

@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
Configuration confOverrides = new Configuration(false);
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
startUpCluster(1, confOverrides);
RouterContext routerContext = cluster.getRandomRouter();
startUpCluster(2, confOverrides);
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");
// Send Create call to active
fileSystem.create(path).close();
Expand All @@ -161,22 +193,19 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception
long rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
fileSystem.close();
}

@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
// Disable observer reads using per-nameservice override
Configuration confOverrides = new Configuration(false);
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
startUpCluster(1, confOverrides);
startUpCluster(2, confOverrides);

RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");
fileSystem.create(path).close();
fileSystem.open(path).close();
fileSystem.close();

long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
Expand All @@ -190,17 +219,15 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception

@Test
public void testReadWhenObserverIsDown() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();

// Stop observer NN
int nnIndex = stopObserver(1);

assertNotEquals("No observer found", 3, nnIndex);
nnIndex = stopObserver(1);
assertNotEquals("No observer found", 4, nnIndex);

// Send read request
fileSystem.open(path).close();
Expand All @@ -215,14 +242,10 @@ public void testReadWhenObserverIsDown() throws Exception {
.getRPCMetrics().getObserverProxyOps();
assertEquals("No call should send to observer", 0,
rpcCountForObserver);
fileSystem.close();
}

@Test
public void testMultipleObserver() throws Exception {
startUpCluster(2);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();
Expand Down Expand Up @@ -267,7 +290,6 @@ public void testMultipleObserver() throws Exception {
.getRpcServer().getRPCMetrics().getObserverProxyOps();
assertEquals("No call should send to observer",
expectedObserverRpc, rpcCountForObserver);
fileSystem.close();
}

private int stopObserver(int num) {
Expand All @@ -288,9 +310,9 @@ private int stopObserver(int num) {
// test router observer with multiple to know which observer NN received
// requests
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testMultipleObserverRouter() throws Exception {
StateStoreDFSCluster innerCluster;
RouterContext routerContext;
MembershipNamenodeResolver resolver;

String ns0;
Expand Down Expand Up @@ -356,14 +378,12 @@ public void testMultipleObserverRouter() throws Exception {
namespaceInfo0.get(1).getNamenodeId());
assertEquals(namespaceInfo1.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);

innerCluster.shutdown();
}

@Test
public void testUnavailableObserverNN() throws Exception {
startUpCluster(2);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();

stopObserver(2);

Path path = new Path("/testFile");
Expand Down Expand Up @@ -397,12 +417,10 @@ public void testUnavailableObserverNN() throws Exception {
assertTrue("There must be unavailable namenodes", hasUnavailable);
}



@Test
public void testRouterMsync() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();

FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");

// Send Create call to active
Expand All @@ -420,6 +438,5 @@ public void testRouterMsync() throws Exception {
// 2 msync calls should be sent. One to each active namenode in the two namespaces.
assertEquals("Four calls should be sent to active", 4,
rpcCountForActive);
fileSystem.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
private boolean trackLatency;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD,
DefaultValue = DEFAULT_ENABLE_READAHEAD)
private boolean enabledReadAhead;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
Expand Down Expand Up @@ -915,6 +920,15 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
}
}

public boolean isReadAheadEnabled() {
return this.enabledReadAhead;
}

@VisibleForTesting
void setReadAheadEnabled(final boolean enabledReadAhead) {
this.enabledReadAhead = enabledReadAhead;
}

public int getReadAheadRange() {
return this.readAheadRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";

/**
* Enable or disable readahead buffer in AbfsInputStream.
* Value: {@value}.
*/
public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";

/** Setting this true will make the driver use it's own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** Server side encryption key */
Expand Down

0 comments on commit 72f6940

Please sign in to comment.