Skip to content

Commit

Permalink
HDFS-7957. Truncate should verify quota before making changes. Contri…
Browse files Browse the repository at this point in the history
…buted by Jing Zhao.
  • Loading branch information
Jing9 committed Mar 20, 2015
1 parent a6a5aae commit d368d36
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 18 deletions.
2 changes: 2 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,8 @@ Release 2.7.0 - UNRELEASED

HDFS-7930. commitBlockSynchronization() does not remove locations. (yliu)

HDFS-7957. Truncate should verify quota before making changes. (jing9)

BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS

HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.security.AccessControlException;
Expand Down Expand Up @@ -677,7 +678,7 @@ void updateCount(INodesInPath iip, long nsDelta, long ssDelta, short oldRep,
* @param checkQuota if true then check if quota is exceeded
* @throws QuotaExceededException if the new count violates any quota limit
*/
void updateCount(INodesInPath iip, int numOfINodes,
void updateCount(INodesInPath iip, int numOfINodes,
QuotaCounts counts, boolean checkQuota)
throws QuotaExceededException {
assert hasWriteLock();
Expand Down Expand Up @@ -1050,7 +1051,7 @@ void unprotectedTruncate(String src, String clientName, String clientMachine,
INodeFile file = iip.getLastINode().asFile();
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
boolean onBlockBoundary =
unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
unprotectedTruncate(iip, newLength, collectedBlocks, mtime, null);

if(! onBlockBoundary) {
BlockInfoContiguous oldBlock = file.getLastBlock();
Expand All @@ -1073,11 +1074,11 @@ void unprotectedTruncate(String src, String clientName, String clientMachine,

boolean truncate(INodesInPath iip, long newLength,
BlocksMapUpdateInfo collectedBlocks,
long mtime)
long mtime, QuotaCounts delta)
throws IOException {
writeLock();
try {
return unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
return unprotectedTruncate(iip, newLength, collectedBlocks, mtime, delta);
} finally {
writeUnlock();
}
Expand All @@ -1097,22 +1098,49 @@ boolean truncate(INodesInPath iip, long newLength,
*/
boolean unprotectedTruncate(INodesInPath iip, long newLength,
BlocksMapUpdateInfo collectedBlocks,
long mtime) throws IOException {
long mtime, QuotaCounts delta) throws IOException {
assert hasWriteLock();
INodeFile file = iip.getLastINode().asFile();
int latestSnapshot = iip.getLatestSnapshotId();
file.recordModification(latestSnapshot, true);
long oldDiskspaceNoRep = file.storagespaceConsumedNoReplication();

verifyQuotaForTruncate(iip, file, newLength, delta);

long remainingLength =
file.collectBlocksBeyondMax(newLength, collectedBlocks);
file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks);
file.setModificationTime(mtime);
updateCount(iip, 0, file.storagespaceConsumedNoReplication() - oldDiskspaceNoRep,
file.getBlockReplication(), true);
// return whether on a block boundary
return (remainingLength - newLength) == 0;
}

private void verifyQuotaForTruncate(INodesInPath iip, INodeFile file,
long newLength, QuotaCounts delta) throws QuotaExceededException {
if (!getFSNamesystem().isImageLoaded() || shouldSkipQuotaChecks()) {
// Do not check quota if edit log is still being processed
return;
}
final long diff = file.computeQuotaDeltaForTruncate(newLength);
final short repl = file.getBlockReplication();
delta.addStorageSpace(diff * repl);
final BlockStoragePolicy policy = getBlockStoragePolicySuite()
.getPolicy(file.getStoragePolicyID());
List<StorageType> types = policy.chooseStorageTypes(repl);
for (StorageType t : types) {
if (t.supportTypeQuota()) {
delta.addTypeSpace(t, diff);
}
}
if (diff > 0) {
readLock();
try {
verifyQuota(iip, iip.length() - 1, delta, null);
} finally {
readUnlock();
}
}
}

/**
* This method is always called with writeLock of FSDirectory held.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1995,16 +1995,26 @@ boolean truncateInternal(String src, long newLength,
", truncate size: " + newLength + ".");
}
// Perform INodeFile truncation.
boolean onBlockBoundary = dir.truncate(iip, newLength,
toRemoveBlocks, mtime);
final QuotaCounts delta = new QuotaCounts.Builder().build();
boolean onBlockBoundary = dir.truncate(iip, newLength, toRemoveBlocks,
mtime, delta);
Block truncateBlock = null;
if(! onBlockBoundary) {
if(!onBlockBoundary) {
// Open file for write, but don't log into edits
long lastBlockDelta = file.computeFileSize() - newLength;
assert lastBlockDelta > 0 : "delta is 0 only if on block bounday";
truncateBlock = prepareFileForTruncate(iip, clientName, clientMachine,
lastBlockDelta, null);
}

// update the quota: use the preferred block size for UC block
dir.writeLock();
try {
dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
} finally {
dir.writeUnlock();
}

getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime,
truncateBlock);
return onBlockBoundary;
Expand Down Expand Up @@ -2073,13 +2083,10 @@ Block prepareFileForTruncate(INodesInPath iip,
+ truncatedBlockUC.getTruncateBlock().getNumBytes()
+ " block=" + truncatedBlockUC);
}
if(shouldRecoverNow)
if (shouldRecoverNow) {
truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
}

// update the quota: use the preferred block size for UC block
final long diff =
file.getPreferredBlockSize() - truncatedBlockUC.getNumBytes();
dir.updateSpaceConsumed(iip, 0, diff, file.getBlockReplication());
return newBlock;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -804,6 +805,43 @@ public long collectBlocksBeyondMax(final long max,
return size;
}

/**
* compute the quota usage change for a truncate op
* @param newLength the length for truncation
* @return the quota usage delta (not considering replication factor)
*/
long computeQuotaDeltaForTruncate(final long newLength) {
final BlockInfoContiguous[] blocks = getBlocks();
if (blocks == null || blocks.length == 0) {
return 0;
}

int n = 0;
long size = 0;
for (; n < blocks.length && newLength > size; n++) {
size += blocks[n].getNumBytes();
}
final boolean onBoundary = size == newLength;

long truncateSize = 0;
for (int i = (onBoundary ? n : n - 1); i < blocks.length; i++) {
truncateSize += blocks[i].getNumBytes();
}

FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
if (sf != null) {
FileDiff diff = sf.getDiffs().getLast();
BlockInfoContiguous[] sblocks = diff != null ? diff.getBlocks() : null;
if (sblocks != null) {
for (int i = (onBoundary ? n : n-1); i < blocks.length
&& i < sblocks.length && blocks[i].equals(sblocks[i]); i++) {
truncateSize -= blocks[i].getNumBytes();
}
}
}
return onBoundary ? -truncateSize : (getPreferredBlockSize() - truncateSize);
}

void truncateBlocksTo(int n) {
final BlockInfoContiguous[] newBlocks;
if (n == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ public void testAppendOverStorageQuota() throws Exception {
INodeFile inode = fsdir.getINode(file.toString()).asFile();
Assert.assertNotNull(inode);
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
Assert.assertNull("should not have a lease", cluster.getNamesystem()
.getLeaseManager().getLeaseByPath(file.toString()));
Assert.assertNull("should not have a lease", cluster.getNamesystem().getLeaseManager().getLeaseByPath(file.toString()));
// make sure the quota usage is unchanged
final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
.getSpaceConsumed().getStorageSpace();
Expand Down Expand Up @@ -269,4 +268,44 @@ public void testAppendOverTypeQuota() throws Exception {
dfs.recoverLease(file);
cluster.restartNameNodes();
}

/**
* Test truncate over quota does not mark file as UC or create a lease
*/
@Test (timeout=60000)
public void testTruncateOverQuota() throws Exception {
final Path dir = new Path("/TestTruncateOverquota");
final Path file = new Path(dir, "file");

// create partial block file
dfs.mkdirs(dir);
DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);

// lower quota to cause exception when appending to partial block
dfs.setQuota(dir, Long.MAX_VALUE - 1, 1);
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
.asDirectory();
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
.getSpaceConsumed().getStorageSpace();
try {
dfs.truncate(file, BLOCKSIZE / 2 - 1);
Assert.fail("truncate didn't fail");
} catch (RemoteException e) {
assertTrue(e.getClassName().contains("DSQuotaExceededException"));
}

// check that the file exists, isn't UC, and has no dangling lease
INodeFile inode = fsdir.getINode(file.toString()).asFile();
Assert.assertNotNull(inode);
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
Assert.assertNull("should not have a lease", cluster.getNamesystem()
.getLeaseManager().getLeaseByPath(file.toString()));
// make sure the quota usage is unchanged
final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
.getSpaceConsumed().getStorageSpace();
assertEquals(spaceUsed, newSpaceUsed);
// make sure edits aren't corrupted
dfs.recoverLease(file);
cluster.restartNameNodes();
}
}

0 comments on commit d368d36

Please sign in to comment.