Skip to content

Commit

Permalink
HDFS-8865. Improve quota initialization performance. Contributed by K…
Browse files Browse the repository at this point in the history
…ihwal Lee.
  • Loading branch information
kihwal committed Aug 28, 2015
1 parent beb65c9 commit b6ceee9
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 57 deletions.
2 changes: 2 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -853,6 +853,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf.
(Mingliang Liu via wheat9)

HDFS-8865. Improve quota initialization performance. (kihwal)

OPTIMIZATIONS

HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
Expand Down
Expand Up @@ -214,6 +214,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {

public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;
public static final String DFS_NAMENODE_QUOTA_INIT_THREADS_KEY = "dfs.namenode.quota.init-threads";
public static final int DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT = 4;

public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD = "dfs.namenode.edit.log.autoroll.multiplier.threshold";
public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
Expand Down Expand Up @@ -82,6 +83,8 @@ static enum BNState {

private FSNamesystem namesystem;

private int quotaInitThreads;

/**
* Construct a backup image.
* @param conf Configuration
Expand All @@ -91,6 +94,9 @@ static enum BNState {
super(conf);
storage.setDisablePreUpgradableLayoutCheck(true);
bnState = BNState.DROP_UNTIL_NEXT_ROLL;
quotaInitThreads = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
}

synchronized FSNamesystem getNamesystem() {
Expand Down Expand Up @@ -218,7 +224,7 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)

FSImage.updateCountForQuota(
getNamesystem().dir.getBlockStoragePolicySuite(),
getNamesystem().dir.rootDir); // inefficient!
getNamesystem().dir.rootDir, quotaInitThreads);
} finally {
backupInputStream.clear();
}
Expand Down
Expand Up @@ -27,6 +27,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -70,6 +72,7 @@
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.Time;

Expand Down Expand Up @@ -100,6 +103,7 @@ public class FSImage implements Closeable {
final private Configuration conf;

protected NNStorageRetentionManager archivalManager;
private int quotaInitThreads;

/* Used to make sure there are no concurrent checkpoints for a given txid
* The checkpoint here could be one of the following operations.
Expand Down Expand Up @@ -143,6 +147,10 @@ protected FSImage(Configuration conf,
storage.setRestoreFailedStorage(true);
}

this.quotaInitThreads = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);

this.editLog = new FSEditLog(conf, storage, editsDirs);

archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
Expand Down Expand Up @@ -847,7 +855,7 @@ private long loadEdits(Iterable<EditLogInputStream> editStreams,
FSEditLog.closeAllStreams(editStreams);
// update the counts
updateCountForQuota(target.getBlockManager().getStoragePolicySuite(),
target.dir.rootDir);
target.dir.rootDir, quotaInitThreads);
}
prog.endPhase(Phase.LOADING_EDITS);
return lastAppliedTxId - prevLastAppliedTxId;
Expand All @@ -862,65 +870,104 @@ private long loadEdits(Iterable<EditLogInputStream> editStreams,
* throw QuotaExceededException.
*/
static void updateCountForQuota(BlockStoragePolicySuite bsps,
INodeDirectory root) {
updateCountForQuotaRecursively(bsps, root.getStoragePolicyID(), root,
new QuotaCounts.Builder().build());
}

private static void updateCountForQuotaRecursively(BlockStoragePolicySuite bsps,
byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
final long parentNamespace = counts.getNameSpace();
final long parentStoragespace = counts.getStorageSpace();
final EnumCounters<StorageType> parentTypeSpaces = counts.getTypeSpaces();

dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId, counts);

for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
final byte childPolicyId = child.getStoragePolicyIDForQuota(blockStoragePolicyId);
if (child.isDirectory()) {
updateCountForQuotaRecursively(bsps, childPolicyId,
child.asDirectory(), counts);
} else {
// file or symlink: count here to reduce recursive calls.
counts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
Snapshot.CURRENT_STATE_ID));
}
}

if (dir.isQuotaSet()) {
// check if quota is violated. It indicates a software bug.
final QuotaCounts q = dir.getQuotaCounts();

final long namespace = counts.getNameSpace() - parentNamespace;
final long nsQuota = q.getNameSpace();
if (Quota.isViolated(nsQuota, namespace)) {
LOG.warn("Namespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + nsQuota + " < consumed = " + namespace);
}
INodeDirectory root, int threads) {
threads = (threads < 1) ? 1 : threads;
LOG.info("Initializing quota with " + threads + " thread(s)");
long start = Time.now();
QuotaCounts counts = new QuotaCounts.Builder().build();
ForkJoinPool p = new ForkJoinPool(threads);
RecursiveAction task = new InitQuotaTask(bsps, root.getStoragePolicyID(),
root, counts);
p.execute(task);
task.join();
LOG.info("Quota initialization completed in " + (Time.now() - start) +
" milliseconds\n" + counts);
}

final long ssConsumed = counts.getStorageSpace() - parentStoragespace;
final long ssQuota = q.getStorageSpace();
if (Quota.isViolated(ssQuota, ssConsumed)) {
LOG.warn("Storagespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + ssQuota + " < consumed = " + ssConsumed);
/**
* parallel initialization using fork-join.
*/
private static class InitQuotaTask extends RecursiveAction {
private final INodeDirectory dir;
private final QuotaCounts counts;
private final BlockStoragePolicySuite bsps;
private final byte blockStoragePolicyId;

public InitQuotaTask(BlockStoragePolicySuite bsps,
byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
this.dir = dir;
this.counts = counts;
this.bsps = bsps;
this.blockStoragePolicyId = blockStoragePolicyId;
}

public void compute() {
QuotaCounts myCounts = new QuotaCounts.Builder().build();
dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId,
myCounts);

ReadOnlyList<INode> children =
dir.getChildrenList(Snapshot.CURRENT_STATE_ID);

if (children.size() > 0) {
List<InitQuotaTask> subtasks = new ArrayList<InitQuotaTask>();
for (INode child : children) {
final byte childPolicyId =
child.getStoragePolicyIDForQuota(blockStoragePolicyId);
if (child.isDirectory()) {
subtasks.add(new InitQuotaTask(bsps, childPolicyId,
child.asDirectory(), myCounts));
} else {
// file or symlink. count using the local counts variable
myCounts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
Snapshot.CURRENT_STATE_ID));
}
}
// invoke and wait for completion
invokeAll(subtasks);
}

final EnumCounters<StorageType> typeSpaces = counts.getTypeSpaces();
for (StorageType t : StorageType.getTypesSupportingQuota()) {
final long typeSpace = typeSpaces.get(t) - parentTypeSpaces.get(t);
final long typeQuota = q.getTypeSpaces().get(t);
if (Quota.isViolated(typeQuota, typeSpace)) {
LOG.warn("Storage type quota violation in image for "
if (dir.isQuotaSet()) {
// check if quota is violated. It indicates a software bug.
final QuotaCounts q = dir.getQuotaCounts();

final long nsConsumed = myCounts.getNameSpace();
final long nsQuota = q.getNameSpace();
if (Quota.isViolated(nsQuota, nsConsumed)) {
LOG.warn("Namespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + nsQuota + " < consumed = " + nsConsumed);
}

final long ssConsumed = myCounts.getStorageSpace();
final long ssQuota = q.getStorageSpace();
if (Quota.isViolated(ssQuota, ssConsumed)) {
LOG.warn("Storagespace quota violation in image for "
+ dir.getFullPathName()
+ " type = " + t.toString() + " quota = "
+ typeQuota + " < consumed " + typeSpace);
+ " quota = " + ssQuota + " < consumed = " + ssConsumed);
}

final EnumCounters<StorageType> tsConsumed = myCounts.getTypeSpaces();
for (StorageType t : StorageType.getTypesSupportingQuota()) {
final long typeSpace = tsConsumed.get(t);
final long typeQuota = q.getTypeSpaces().get(t);
if (Quota.isViolated(typeQuota, typeSpace)) {
LOG.warn("Storage type quota violation in image for "
+ dir.getFullPathName()
+ " type = " + t.toString() + " quota = "
+ typeQuota + " < consumed " + typeSpace);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Setting quota for " + dir + "\n" + myCounts);
}
dir.getDirectoryWithQuotaFeature().setSpaceConsumed(nsConsumed,
ssConsumed, tsConsumed);
}

dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, ssConsumed,
typeSpaces);
synchronized(counts) {
counts.add(myCounts);
}
}
}

Expand Down
Expand Up @@ -159,6 +159,13 @@ public boolean anyTypeSpaceCountGreaterOrEqual(long val) {
return tsCounts.anyGreaterOrEqual(val);
}

@Override
public String toString() {
return "name space=" + getNameSpace() +
"\nstorage space=" + getStorageSpace() +
"\nstorage types=" + getTypeSpaces();
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
Expand All @@ -176,4 +183,5 @@ public int hashCode() {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}
}

}
Expand Up @@ -2402,4 +2402,14 @@
</description>
</property>

<property>
<name>dfs.namenode.quota.init-threads</name>
<value>4</value>
<description>
The number of concurrent threads to be used in quota initialization. The
speed of quota initialization also affects the namenode fail-over latency.
If the size of name space is big, try increasing this.
</description>
</property>

</configuration>
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertTrue;

import java.util.EnumSet;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
Expand All @@ -36,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -310,4 +312,66 @@ public void testTruncateOverQuota() throws Exception {
dfs.recoverLease(file);
cluster.restartNameNodes();
}

/**
* Check whether the quota is initialized correctly.
*/
@Test
public void testQuotaInitialization() throws Exception {
final int size = 500;
Path testDir = new Path("/testDir");
long expectedSize = 3 * BLOCKSIZE + BLOCKSIZE/2;
dfs.mkdirs(testDir);
dfs.setQuota(testDir, size*4, expectedSize*size*2);

Path[] testDirs = new Path[size];
for (int i = 0; i < size; i++) {
testDirs[i] = new Path(testDir, "sub" + i);
dfs.mkdirs(testDirs[i]);
dfs.setQuota(testDirs[i], 100, 1000000);
DFSTestUtil.createFile(dfs, new Path(testDirs[i], "a"), expectedSize,
(short)1, 1L);
}

// Directly access the name system to obtain the current cached usage.
INodeDirectory root = fsdir.getRoot();
HashMap<String, Long> nsMap = new HashMap<String, Long>();
HashMap<String, Long> dsMap = new HashMap<String, Long>();
scanDirsWithQuota(root, nsMap, dsMap, false);

FSImage.updateCountForQuota(
fsdir.getBlockManager().getStoragePolicySuite(), root, 1);
scanDirsWithQuota(root, nsMap, dsMap, true);

FSImage.updateCountForQuota(
fsdir.getBlockManager().getStoragePolicySuite(), root, 2);
scanDirsWithQuota(root, nsMap, dsMap, true);

FSImage.updateCountForQuota(
fsdir.getBlockManager().getStoragePolicySuite(), root, 4);
scanDirsWithQuota(root, nsMap, dsMap, true);
}

private void scanDirsWithQuota(INodeDirectory dir,
HashMap<String, Long> nsMap,
HashMap<String, Long> dsMap, boolean verify) {
if (dir.isQuotaSet()) {
// get the current consumption
QuotaCounts q = dir.getDirectoryWithQuotaFeature().getSpaceConsumed();
String name = dir.getFullPathName();
if (verify) {
assertEquals(nsMap.get(name).longValue(), q.getNameSpace());
assertEquals(dsMap.get(name).longValue(), q.getStorageSpace());
} else {
nsMap.put(name, Long.valueOf(q.getNameSpace()));
dsMap.put(name, Long.valueOf(q.getStorageSpace()));
}
}

for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
if (child instanceof INodeDirectory) {
scanDirsWithQuota((INodeDirectory)child, nsMap, dsMap, verify);
}
}
}
}
Expand Up @@ -159,7 +159,7 @@ private void loadFSImageFromTempFile(File imageFile) throws IOException {
try {
loader.load(imageFile, false);
FSImage.updateCountForQuota(fsn.getBlockManager().getStoragePolicySuite(),
INodeDirectory.valueOf(fsn.getFSDirectory().getINode("/"), "/"));
INodeDirectory.valueOf(fsn.getFSDirectory().getINode("/"), "/"), 4);
} finally {
fsn.getFSDirectory().writeUnlock();
fsn.writeUnlock();
Expand Down Expand Up @@ -509,4 +509,4 @@ public void testSaveLoadImageAfterSnapshotDeletion()
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
}
}
}

0 comments on commit b6ceee9

Please sign in to comment.