Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 committed Nov 16, 2022
2 parents 61f9165 + 142df24 commit a23e080
Showing 8 changed files with 52 additions and 2 deletions.
@@ -2166,6 +2166,13 @@ The switch to turn S3A auditing on or off.
<description>The AbstractFileSystem for gs: uris.</description>
</property>

<property>
<name>fs.azure.enable.readahead</name>
<value>false</value>
<description>Disable readahead/prefetching in AbfsInputStream.
See HADOOP-18521</description>
</property>

<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
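
For jobs that still rely on prefetching, the new key can simply be flipped back to true. Below is a minimal, illustrative sketch (not part of this commit; the abfs URI and file path are placeholders):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadaheadOptIn {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Opt back in to AbfsInputStream readahead, which this change disables by default.
    conf.setBoolean("fs.azure.enable.readahead", true);
    FileSystem fs = FileSystem.get(
        URI.create("abfs://container@account.dfs.core.windows.net/"), conf);
    try (FSDataInputStream in = fs.open(new Path("/data/sample.bin"))) {
      byte[] buffer = new byte[8 * 1024];
      int read = in.read(buffer); // sequential reads are what readahead targets
      System.out.println("read " + read + " bytes");
    }
  }
}
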
@@ -912,6 +912,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
fsNamesys.getFSImage().updateStorageVersion();
fsNamesys.getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
NameNodeFile.IMAGE);
fsNamesys.setNeedRollbackFsImage(false);
break;
}
case OP_ADD_CACHE_DIRECTIVE: {
@@ -33,6 +33,9 @@
import javax.management.ReflectionException;
import javax.management.openmbean.CompositeDataSupport;

import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -720,6 +723,39 @@ static void queryForPreparation(DistributedFileSystem dfs) throws IOException,
}
}

@Test
public void testEditLogTailerRollingUpgrade() throws IOException, InterruptedException {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);

HAUtil.setAllowStandbyReads(conf, true);

MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
cluster.waitActive();

cluster.transitionToActive(0);

NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
try {
// Rolling upgrade PREPARE should trigger creation of a rollback image in the StandbyCheckpointer
nn1.getRpcServer().rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
Assert.assertTrue(nn2.getNamesystem().isNeedRollbackFsImage());

// Rolling upgrade FINALIZE should clear the rollback image flag in the StandbyCheckpointer
nn1.getRpcServer().rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
Assert.assertFalse(nn2.getNamesystem().isNeedRollbackFsImage());
} finally {
cluster.shutdown();
}
}

/**
* In non-HA setup, after rolling upgrade prepare, the Secondary NN should
* still be able to do checkpoint
@@ -331,6 +331,8 @@ void buildPackage()
LOG.info("Compressing tarball");
try (TarArchiveOutputStream out = new TarArchiveOutputStream(
targetStream)) {
// Workaround for the commons-compress issue present since 1.21: COMPRESS-587
out.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_STAR);
for (String fullPath : filteredInputFiles) {
LOG.info("Adding " + fullPath);
File file = new File(fullPath);
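
The same workaround can be exercised outside the build. Below is a minimal standalone sketch (class and file names are invented for illustration, not taken from this commit): setting a big-number mode before writing entries avoids failures when an entry's numeric metadata does not fit the traditional octal tar header fields, and STAR is the mode the change above selects.

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;

public class TarBigNumberModeExample {
  public static void main(String[] args) throws Exception {
    File input = File.createTempFile("payload", ".bin");   // placeholder payload
    Files.write(input.toPath(), new byte[]{1, 2, 3});
    File tarball = File.createTempFile("package", ".tar"); // placeholder output

    try (OutputStream targetStream = new FileOutputStream(tarball);
         TarArchiveOutputStream out = new TarArchiveOutputStream(targetStream)) {
      // Same call as in the change above: tolerate values that exceed the
      // standard tar header fields by using the STAR extension.
      out.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_STAR);

      TarArchiveEntry entry = new TarArchiveEntry(input, input.getName());
      out.putArchiveEntry(entry);
      Files.copy(input.toPath(), out);
      out.closeArchiveEntry();
    }
  }
}
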
@@ -106,7 +106,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;

-public static final boolean DEFAULT_ENABLE_READAHEAD = true;
+public static final boolean DEFAULT_ENABLE_READAHEAD = false;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

@@ -35,7 +35,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean tolerateOobAppends;

-private boolean isReadAheadEnabled = true;
+private boolean isReadAheadEnabled = false;

private boolean alwaysReadBufferSize;

@@ -34,6 +34,7 @@

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
@@ -68,6 +69,7 @@ protected Configuration createConfiguration() {
protected AbstractFSContract createContract(final Configuration conf) {
conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true);
return new AbfsFileSystemContract(conf, isSecure);
}

@@ -106,6 +106,7 @@ private AbfsClient getMockAbfsClient() {
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
mockAbfsClient,
@@ -131,6 +132,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
boolean alwaysReadBufferSize,
int readAheadBlockSize) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
abfsClient,
