Skip to content

Commit

Permalink
HDFS-6763. Initialize file system-wide quota once on transitioning to…
Browse files Browse the repository at this point in the history
… active. Contributed by Kihwal Lee
  • Loading branch information
kihwal committed Sep 10, 2015
1 parent 7b5b2c5 commit a40342b
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 141 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -914,6 +914,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8974. Convert docs in xdoc format to markdown. HDFS-8974. Convert docs in xdoc format to markdown.
(Masatake Iwasaki via aajisaka) (Masatake Iwasaki via aajisaka)


HDFS-6763. Initialize file system-wide quota once on transitioning to active
(kihwal)

OPTIMIZATIONS OPTIMIZATIONS


HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
Expand Down
Expand Up @@ -24,7 +24,6 @@


import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
Expand Down Expand Up @@ -94,9 +93,6 @@ static enum BNState {
super(conf); super(conf);
storage.setDisablePreUpgradableLayoutCheck(true); storage.setDisablePreUpgradableLayoutCheck(true);
bnState = BNState.DROP_UNTIL_NEXT_ROLL; bnState = BNState.DROP_UNTIL_NEXT_ROLL;
quotaInitThreads = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
} }


synchronized FSNamesystem getNamesystem() { synchronized FSNamesystem getNamesystem() {
Expand Down Expand Up @@ -222,9 +218,7 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
} }
lastAppliedTxId = logLoader.getLastAppliedTxId(); lastAppliedTxId = logLoader.getLastAppliedTxId();


FSImage.updateCountForQuota( getNamesystem().dir.updateCountForQuota();
getNamesystem().dir.getBlockStoragePolicySuite(),
getNamesystem().dir.rootDir, quotaInitThreads);
} finally { } finally {
backupInputStream.clear(); backupInputStream.clear();
} }
Expand Down
Expand Up @@ -57,8 +57,10 @@
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo.UpdatedReplicationInfo; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo.UpdatedReplicationInfo;
import org.apache.hadoop.hdfs.util.ByteArray; import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -68,6 +70,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -138,6 +142,7 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
private final long contentSleepMicroSec; private final long contentSleepMicroSec;
private final INodeMap inodeMap; // Synchronized by dirLock private final INodeMap inodeMap; // Synchronized by dirLock
private long yieldCount = 0; // keep track of lock yield count. private long yieldCount = 0; // keep track of lock yield count.
private int quotaInitThreads;


private final int inodeXAttrsLimit; //inode xattrs max limit private final int inodeXAttrsLimit; //inode xattrs max limit


Expand Down Expand Up @@ -312,6 +317,10 @@ public int getWriteHoldCount() {
namesystem = ns; namesystem = ns;
this.editLog = ns.getEditLog(); this.editLog = ns.getEditLog();
ezManager = new EncryptionZoneManager(this, conf); ezManager = new EncryptionZoneManager(this, conf);

this.quotaInitThreads = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
} }


FSNamesystem getFSNamesystem() { FSNamesystem getFSNamesystem() {
Expand Down Expand Up @@ -503,6 +512,125 @@ void updateReplicationFactor(Collection<UpdatedReplicationInfo> blocks) {
} }
} }


/**
* Update the count of each directory with quota in the namespace.
* A directory's count is defined as the total number inodes in the tree
* rooted at the directory.
*
* This is an update of existing state of the filesystem and does not
* throw QuotaExceededException.
*/
void updateCountForQuota(int initThreads) {
writeLock();
try {
int threads = (initThreads < 1) ? 1 : initThreads;
LOG.info("Initializing quota with " + threads + " thread(s)");
long start = Time.now();
QuotaCounts counts = new QuotaCounts.Builder().build();
ForkJoinPool p = new ForkJoinPool(threads);
RecursiveAction task = new InitQuotaTask(getBlockStoragePolicySuite(),
rootDir.getStoragePolicyID(), rootDir, counts);
p.execute(task);
task.join();
p.shutdown();
LOG.info("Quota initialization completed in " + (Time.now() - start) +
" milliseconds\n" + counts);
} finally {
writeUnlock();
}
}

void updateCountForQuota() {
updateCountForQuota(quotaInitThreads);
}

/**
* parallel initialization using fork-join.
*/
private static class InitQuotaTask extends RecursiveAction {
private final INodeDirectory dir;
private final QuotaCounts counts;
private final BlockStoragePolicySuite bsps;
private final byte blockStoragePolicyId;

public InitQuotaTask(BlockStoragePolicySuite bsps,
byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
this.dir = dir;
this.counts = counts;
this.bsps = bsps;
this.blockStoragePolicyId = blockStoragePolicyId;
}

public void compute() {
QuotaCounts myCounts = new QuotaCounts.Builder().build();
dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId,
myCounts);

ReadOnlyList<INode> children =
dir.getChildrenList(CURRENT_STATE_ID);

if (children.size() > 0) {
List<InitQuotaTask> subtasks = new ArrayList<InitQuotaTask>();
for (INode child : children) {
final byte childPolicyId =
child.getStoragePolicyIDForQuota(blockStoragePolicyId);
if (child.isDirectory()) {
subtasks.add(new InitQuotaTask(bsps, childPolicyId,
child.asDirectory(), myCounts));
} else {
// file or symlink. count using the local counts variable
myCounts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
CURRENT_STATE_ID));
}
}
// invoke and wait for completion
invokeAll(subtasks);
}

if (dir.isQuotaSet()) {
// check if quota is violated. It indicates a software bug.
final QuotaCounts q = dir.getQuotaCounts();

final long nsConsumed = myCounts.getNameSpace();
final long nsQuota = q.getNameSpace();
if (Quota.isViolated(nsQuota, nsConsumed)) {
LOG.warn("Namespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + nsQuota + " < consumed = " + nsConsumed);
}

final long ssConsumed = myCounts.getStorageSpace();
final long ssQuota = q.getStorageSpace();
if (Quota.isViolated(ssQuota, ssConsumed)) {
LOG.warn("Storagespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + ssQuota + " < consumed = " + ssConsumed);
}

final EnumCounters<StorageType> tsConsumed = myCounts.getTypeSpaces();
for (StorageType t : StorageType.getTypesSupportingQuota()) {
final long typeSpace = tsConsumed.get(t);
final long typeQuota = q.getTypeSpaces().get(t);
if (Quota.isViolated(typeQuota, typeSpace)) {
LOG.warn("Storage type quota violation in image for "
+ dir.getFullPathName()
+ " type = " + t.toString() + " quota = "
+ typeQuota + " < consumed " + typeSpace);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Setting quota for " + dir + "\n" + myCounts);
}
dir.getDirectoryWithQuotaFeature().setSpaceConsumed(nsConsumed,
ssConsumed, tsConsumed);
}

synchronized(counts) {
counts.add(myCounts);
}
}
}

/** Updates namespace, storagespace and typespaces consumed for all /** Updates namespace, storagespace and typespaces consumed for all
* directories until the parent directory of file represented by path. * directories until the parent directory of file represented by path.
* *
Expand Down
Expand Up @@ -27,8 +27,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
Expand All @@ -42,12 +40,10 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
Expand All @@ -61,7 +57,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
Expand All @@ -70,9 +65,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;


Expand Down Expand Up @@ -147,12 +140,7 @@ protected FSImage(Configuration conf,
storage.setRestoreFailedStorage(true); storage.setRestoreFailedStorage(true);
} }


this.quotaInitThreads = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);

this.editLog = new FSEditLog(conf, storage, editsDirs); this.editLog = new FSEditLog(conf, storage, editsDirs);

archivalManager = new NNStorageRetentionManager(conf, storage, editLog); archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
} }


Expand Down Expand Up @@ -853,125 +841,11 @@ private long loadEdits(Iterable<EditLogInputStream> editStreams,
} }
} finally { } finally {
FSEditLog.closeAllStreams(editStreams); FSEditLog.closeAllStreams(editStreams);
// update the counts
updateCountForQuota(target.getBlockManager().getStoragePolicySuite(),
target.dir.rootDir, quotaInitThreads);
} }
prog.endPhase(Phase.LOADING_EDITS); prog.endPhase(Phase.LOADING_EDITS);
return lastAppliedTxId - prevLastAppliedTxId; return lastAppliedTxId - prevLastAppliedTxId;
} }


/**
* Update the count of each directory with quota in the namespace.
* A directory's count is defined as the total number inodes in the tree
* rooted at the directory.
*
* This is an update of existing state of the filesystem and does not
* throw QuotaExceededException.
*/
static void updateCountForQuota(BlockStoragePolicySuite bsps,
INodeDirectory root, int threads) {
threads = (threads < 1) ? 1 : threads;
LOG.info("Initializing quota with " + threads + " thread(s)");
long start = Time.now();
QuotaCounts counts = new QuotaCounts.Builder().build();
ForkJoinPool p = new ForkJoinPool(threads);
RecursiveAction task = new InitQuotaTask(bsps, root.getStoragePolicyID(),
root, counts);
p.execute(task);
task.join();
p.shutdown();
LOG.info("Quota initialization completed in " + (Time.now() - start) +
" milliseconds\n" + counts);
}

/**
* parallel initialization using fork-join.
*/
private static class InitQuotaTask extends RecursiveAction {
private final INodeDirectory dir;
private final QuotaCounts counts;
private final BlockStoragePolicySuite bsps;
private final byte blockStoragePolicyId;

public InitQuotaTask(BlockStoragePolicySuite bsps,
byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
this.dir = dir;
this.counts = counts;
this.bsps = bsps;
this.blockStoragePolicyId = blockStoragePolicyId;
}

public void compute() {
QuotaCounts myCounts = new QuotaCounts.Builder().build();
dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId,
myCounts);

ReadOnlyList<INode> children =
dir.getChildrenList(Snapshot.CURRENT_STATE_ID);

if (children.size() > 0) {
List<InitQuotaTask> subtasks = new ArrayList<InitQuotaTask>();
for (INode child : children) {
final byte childPolicyId =
child.getStoragePolicyIDForQuota(blockStoragePolicyId);
if (child.isDirectory()) {
subtasks.add(new InitQuotaTask(bsps, childPolicyId,
child.asDirectory(), myCounts));
} else {
// file or symlink. count using the local counts variable
myCounts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
Snapshot.CURRENT_STATE_ID));
}
}
// invoke and wait for completion
invokeAll(subtasks);
}

if (dir.isQuotaSet()) {
// check if quota is violated. It indicates a software bug.
final QuotaCounts q = dir.getQuotaCounts();

final long nsConsumed = myCounts.getNameSpace();
final long nsQuota = q.getNameSpace();
if (Quota.isViolated(nsQuota, nsConsumed)) {
LOG.warn("Namespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + nsQuota + " < consumed = " + nsConsumed);
}

final long ssConsumed = myCounts.getStorageSpace();
final long ssQuota = q.getStorageSpace();
if (Quota.isViolated(ssQuota, ssConsumed)) {
LOG.warn("Storagespace quota violation in image for "
+ dir.getFullPathName()
+ " quota = " + ssQuota + " < consumed = " + ssConsumed);
}

final EnumCounters<StorageType> tsConsumed = myCounts.getTypeSpaces();
for (StorageType t : StorageType.getTypesSupportingQuota()) {
final long typeSpace = tsConsumed.get(t);
final long typeQuota = q.getTypeSpaces().get(t);
if (Quota.isViolated(typeQuota, typeSpace)) {
LOG.warn("Storage type quota violation in image for "
+ dir.getFullPathName()
+ " type = " + t.toString() + " quota = "
+ typeQuota + " < consumed " + typeSpace);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Setting quota for " + dir + "\n" + myCounts);
}
dir.getDirectoryWithQuotaFeature().setSpaceConsumed(nsConsumed,
ssConsumed, tsConsumed);
}

synchronized(counts) {
counts.add(myCounts);
}
}
}

/** /**
* Load the image namespace from the given image file, verifying * Load the image namespace from the given image file, verifying
* it against the MD5 sum stored in its associated .md5 file. * it against the MD5 sum stored in its associated .md5 file.
Expand Down
Expand Up @@ -1117,6 +1117,8 @@ void startActiveServices() throws IOException {
getFSImage().editLog.openForWrite(getEffectiveLayoutVersion()); getFSImage().editLog.openForWrite(getEffectiveLayoutVersion());
} }


// Initialize the quota.
dir.updateCountForQuota();
// Enable quota checks. // Enable quota checks.
dir.enableQuotaChecks(); dir.enableQuotaChecks();
if (haEnabled) { if (haEnabled) {
Expand Down

0 comments on commit a40342b

Please sign in to comment.