Commit 31f1171

HDFS-8495. Consolidate append() related implementation into a single class. Contributed by Rakesh R.

Haohui Mai committed Jul 22, 2015
1 parent 393fe71 commit 31f1171
Showing 7 changed files with 304 additions and 229 deletions.
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (3 additions, 0 deletions)

@@ -737,6 +737,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8721. Add a metric for number of encryption zones.
(Rakesh R via cnauroth)

+ HDFS-8495. Consolidate append() related implementation into a single class.
+ (Rakesh R via wheat9)

OPTIMIZATIONS

HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java (new file, 261 additions)

@@ -0,0 +1,261 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;

import com.google.common.base.Preconditions;

/**
* Helper class to perform append operation.
*/
final class FSDirAppendOp {

/**
* Private constructor for preventing FSDirAppendOp object creation.
* Static-only class.
*/
private FSDirAppendOp() {}

/**
* Append to an existing file.
* <p>
*
* The method returns the last block of the file if this is a partial block,
* which can still be used for writing more data. The client uses the
* returned block locations to form the data pipeline for this block.<br>
* The {@link LocatedBlock} will be null if the last block is full.
* The client then allocates a new block with the next call using
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#addBlock}.
* <p>
*
* For a description of the parameters and exceptions thrown, see
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#append}.
*
* @param fsn namespace
* @param srcArg path name
* @param pc permission checker to check fs permission
* @param holder client name
* @param clientMachine client machine info
* @param newBlock whether the data is appended to a new block
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @return the last block with status
*/
static LastBlockWithStatus appendFile(final FSNamesystem fsn,
final String srcArg, final FSPermissionChecker pc, final String holder,
final String clientMachine, final boolean newBlock,
final boolean logRetryCache) throws IOException {
assert fsn.hasWriteLock();
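// Callers hold the FSNamesystem write lock (asserted above); the
// FSDirectory lock taken below guards the inode tree updates.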

final byte[][] pathComponents = FSDirectory
.getPathComponentsForReservedPath(srcArg);
final LocatedBlock lb;
final FSDirectory fsd = fsn.getFSDirectory();
final String src;
fsd.writeLock();
try {
src = fsd.resolvePath(pc, srcArg, pathComponents);
final INodesInPath iip = fsd.getINodesInPath4Write(src);
// Verify that the destination does not exist as a directory already
final INode inode = iip.getLastINode();
final String path = iip.getPath();
if (inode != null && inode.isDirectory()) {
throw new FileAlreadyExistsException("Cannot append to directory "
+ path + "; already exists as a directory.");
}
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}

if (inode == null) {
throw new FileNotFoundException(
"Failed to append to non-existent file " + path + " for client "
+ clientMachine);
}
final INodeFile file = INodeFile.valueOf(inode, path, true);
BlockManager blockManager = fsd.getBlockManager();
final BlockStoragePolicy lpPolicy = blockManager
.getStoragePolicy("LAZY_PERSIST");
if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
throw new UnsupportedOperationException(
"Cannot append to lazy persist file " + path);
}
// Opening an existing file for append - may need to recover lease.
fsn.recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, path, holder,
clientMachine, false);

final BlockInfo lastBlock = file.getLastBlock();
// Check that the block has at least minimum replication.
if (lastBlock != null && lastBlock.isComplete()
&& !blockManager.isSufficientlyReplicated(lastBlock)) {
throw new IOException("append: lastBlock=" + lastBlock + " of src="
+ path + " is not sufficiently replicated yet.");
}
lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
true, logRetryCache);
} catch (IOException ie) {
NameNode.stateChangeLog
.warn("DIR* NameSystem.append: " + ie.getMessage());
throw ie;
} finally {
fsd.writeUnlock();
}

HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, src, false,
FSDirectory.isReservedRawName(srcArg), true);
if (lb != null) {
NameNode.stateChangeLog.debug(
"DIR* NameSystem.appendFile: file {} for {} at {} block {} block"
+ " size {}", srcArg, holder, clientMachine, lb.getBlock(), lb
.getBlock().getNumBytes());
}
return new LastBlockWithStatus(lb, stat);
}

/**
* Convert the current inode to under construction and recreate the
* in-memory lease record.
*
* @param fsn namespace
* @param iip inodes in the path containing the file
* @param leaseHolder identifier of the lease holder on this file
* @param clientMachine identifier of the client machine
* @param newBlock whether the data is appended to a new block
* @param writeToEditLog whether to persist this change to the edit log
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @return the last block locations if the block is partial or null otherwise
* @throws IOException
*/
static LocatedBlock prepareFileForAppend(final FSNamesystem fsn,
final INodesInPath iip, final String leaseHolder,
final String clientMachine, final boolean newBlock,
final boolean writeToEditLog, final boolean logRetryCache)
throws IOException {
assert fsn.hasWriteLock();

final INodeFile file = iip.getLastINode().asFile();
final QuotaCounts delta = verifyQuotaForUCBlock(fsn, file, iip);

file.recordModification(iip.getLatestSnapshotId());
file.toUnderConstruction(leaseHolder, clientMachine);

fsn.getLeaseManager().addLease(
file.getFileUnderConstructionFeature().getClientName(), file.getId());

LocatedBlock ret = null;
if (!newBlock) {
FSDirectory fsd = fsn.getFSDirectory();
ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
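// Null when the file has no last block or that block is already full;
// the client then allocates a fresh block via ClientProtocol#addBlock.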
if (ret != null && delta != null) {
Preconditions.checkState(delta.getStorageSpace() >= 0, "appending to"
+ " a block with size larger than the preferred block size");
fsd.writeLock();
try {
fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
} finally {
fsd.writeUnlock();
}
}
} else {
BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null) {
ExtendedBlock blk = new ExtendedBlock(fsn.getBlockPoolId(), lastBlock);
ret = new LocatedBlock(blk, new DatanodeInfo[0]);
}
}

if (writeToEditLog) {
final String path = iip.getPath();
if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK,
fsn.getEffectiveLayoutVersion())) {
fsn.getEditLog().logAppendFile(path, file, newBlock, logRetryCache);
} else {
fsn.getEditLog().logOpenFile(path, file, false, logRetryCache);
}
}
return ret;
}

/**
* Verify quota when using the preferred block size for UC block. This is
* usually used by append and truncate.
*
* @throws QuotaExceededException when violating the storage quota
* @return expected quota usage update. null means no change or no need to
* update quota usage later
*/
private static QuotaCounts verifyQuotaForUCBlock(FSNamesystem fsn,
INodeFile file, INodesInPath iip) throws QuotaExceededException {
FSDirectory fsd = fsn.getFSDirectory();
if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
// Do not check quota if editlog is still being processed
return null;
}
if (file.getLastBlock() != null) {
final QuotaCounts delta = computeQuotaDeltaForUCBlock(fsn, file);
fsd.readLock();
try {
FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
return delta;
} finally {
fsd.readUnlock();
}
}
return null;
}

/** Compute quota change for converting a complete block to a UC block. */
private static QuotaCounts computeQuotaDeltaForUCBlock(FSNamesystem fsn,
INodeFile file) {
final QuotaCounts delta = new QuotaCounts.Builder().build();
final BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null) {
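// Illustration: with a 128 MB preferred block size, a 100 MB last block,
// and replication 3, the delta reserves (128 - 100) MB * 3 = 84 MB of
// storage space until the appended block is completed again.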
final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
final short repl = file.getPreferredBlockReplication();
delta.addStorageSpace(diff * repl);
final BlockStoragePolicy policy = fsn.getFSDirectory()
.getBlockStoragePolicySuite().getPolicy(file.getStoragePolicyID());
List<StorageType> types = policy.chooseStorageTypes(repl);
for (StorageType t : types) {
if (t.supportTypeQuota()) {
delta.addTypeSpace(t, diff);
}
}
}
return delta;
}
}
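
For context, a minimal client-side sketch of the code path this class serves (not part of the commit; the file path is hypothetical, and it assumes fs.defaultFS points at an HDFS cluster whose NameNode includes this patch). A plain FileSystem#append reaches FSDirAppendOp.appendFile with newBlock == false, while passing CreateFlag.NEW_BLOCK through DistributedFileSystem#append corresponds to newBlock == true.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class AppendExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/tmp/append-demo.txt"); // hypothetical path

    // Plain append: reuses a partial last block if one exists; the NameNode
    // returns its locations so the client can rebuild the write pipeline.
    try (FSDataOutputStream out = fs.append(file)) {
      out.writeBytes("appended to the last partial block\n");
    }

    // NEW_BLOCK append (requires Feature.APPEND_NEW_BLOCK in the layout
    // version): the appended data always starts in a fresh block.
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      try (FSDataOutputStream out = dfs.append(file,
          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
        out.writeBytes("appended to a new block\n");
      }
    }
  }
}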
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -508,7 +508,7 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
final long fileSize = !inSnapshot && isUc ?
fileNode.computeFileSizeNotIncludingLastUcBlock() : size;

- loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
+ loc = fsd.getBlockManager().createLocatedBlocks(
fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
inSnapshot, feInfo);
if (loc == null) {
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java

@@ -30,6 +30,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -79,11 +80,11 @@ static TruncateResult truncate(final FSNamesystem fsn, final String srcArg,
try {
src = fsd.resolvePath(pc, srcArg, pathComponents);
iip = fsd.getINodesInPath4Write(src, true);
- if (fsn.isPermissionEnabled()) {
+ if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
INodeFile file = INodeFile.valueOf(iip.getLastINode(), src);
- final BlockStoragePolicy lpPolicy = fsn.getBlockManager()
+ final BlockStoragePolicy lpPolicy = fsd.getBlockManager()
.getStoragePolicy("LAZY_PERSIST");

if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
@@ -178,7 +179,7 @@ static void unprotectedTruncate(final FSNamesystem fsn, final String src,
"Should be the same block.";
if (oldBlock.getBlockId() != tBlk.getBlockId()
&& !file.isBlockInLatestSnapshot(oldBlock)) {
- fsn.getBlockManager().removeBlockFromMap(oldBlock);
+ fsd.getBlockManager().removeBlockFromMap(oldBlock);
}
}
assert onBlockBoundary == (truncateBlock == null) :
@@ -223,16 +224,16 @@ static Block prepareFileForTruncate(FSNamesystem fsn, INodesInPath iip,
}

BlockInfoUnderConstruction truncatedBlockUC;
+ BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
if (shouldCopyOnTruncate) {
// Add new truncateBlock into blocksMap and
// use oldBlock as a source for copy-on-truncate recovery
truncatedBlockUC = new BlockInfoUnderConstructionContiguous(newBlock,
file.getPreferredBlockReplication());
truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
truncatedBlockUC.setTruncateBlock(oldBlock);
- file.setLastBlock(truncatedBlockUC,
- fsn.getBlockManager().getStorages(oldBlock));
- fsn.getBlockManager().addBlockCollection(truncatedBlockUC, file);
+ file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
+ blockManager.addBlockCollection(truncatedBlockUC, file);

NameNode.stateChangeLog.debug(
"BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
@@ -241,8 +242,7 @@ static Block prepareFileForTruncate(FSNamesystem fsn, INodesInPath iip,
truncatedBlockUC.getTruncateBlock());
} else {
// Use new generation stamp for in-place truncate recovery
- fsn.getBlockManager().convertLastBlockToUnderConstruction(file,
- lastBlockDelta);
+ blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
oldBlock = file.getLastBlock();
assert !oldBlock.isComplete() : "oldBlock should be under construction";
truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock;
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java

@@ -206,8 +206,8 @@ static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
DatanodeStorageInfo[] locs, long offset) throws IOException {
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
locs, offset, false);
- fsn.getBlockManager().setBlockToken(lBlk,
- BlockTokenIdentifier.AccessMode.WRITE);
+ fsn.getFSDirectory().getBlockManager()
+ .setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE);
return lBlk;
}

@@ -426,7 +426,7 @@ static HdfsFileStatus startFile(
fsd.setFileEncryptionInfo(src, feInfo);
newNode = fsd.getInode(newNode.getId()).asFile();
}
- setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
+ setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip,
isLazyPersist);
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -392,7 +392,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
- LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+ LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
addCloseOp.clientName, addCloseOp.clientMachine, false, false,
false);
// add the op into retry cache if necessary
@@ -466,7 +466,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
INodesInPath iip = fsDir.getINodesInPath4Write(path);
INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
if (!file.isUnderConstruction()) {
- LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+ LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
false, false);
// add the op into retry cache if necessary