HDFS-3107. Introduce truncate. Contributed by Plamen Jeliazkov.
pjeli authored and shvachko committed Jan 13, 2015
1 parent c4cba61 commit 7e9358f
Showing 32 changed files with 1,091 additions and 193 deletions.
2 changes: 2 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -18,6 +18,8 @@ Trunk (Unreleased)


HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)


HDFS-3107. Introduce truncate. (Plamen Jeliazkov via shv)

IMPROVEMENTS


HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -1916,6 +1916,21 @@ public void rename(String src, String dst, Options.Rename... options)
SnapshotAccessControlException.class);
}
}

/**
* Truncate a file to an indicated size.
* See {@link ClientProtocol#truncate(String, long, String)}.
*/
public boolean truncate(String src, long newLength) throws IOException {
checkOpen();
try {
return namenode.truncate(src, newLength, clientName);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
UnresolvedPathException.class);
}
}

/**
* Delete file or directory.
* See {@link ClientProtocol#delete(String, boolean)}.
@@ -626,7 +626,20 @@ public Void next(final FileSystem fs, final Path p)
}.resolve(this, absDst);
}
}


/**
* Truncate the file in the indicated path to the indicated size.
* @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to
*
* @return true if the client does not need to wait for block recovery,
* false if the client needs to wait for block recovery.
*/
public boolean truncate(Path f, final long newLength) throws IOException {
statistics.incrementWriteOps(1);
return dfs.truncate(getPathName(f), newLength);
}
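
For context, a minimal usage sketch (not part of this commit) of the new DistributedFileSystem API; the configuration, file path, and target length below are assumptions for illustration only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class TruncateUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/user/test/data.log"); // hypothetical, already closed file

    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      // Truncate to 1 MB; true means no block recovery is required.
      boolean done = dfs.truncate(file, 1024L * 1024);
      if (!done) {
        // The new length falls inside the last block, so the NameNode has
        // scheduled block recovery; the file remains under construction
        // until that recovery finishes.
        System.out.println("Waiting for block recovery on " + file);
      }
    }
  }
}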

@Override
public boolean delete(Path f, final boolean recursive) throws IOException {
statistics.incrementWriteOps(1);
@@ -521,7 +521,37 @@ public void rename2(String src, String dst, Options.Rename... options)
FileAlreadyExistsException, FileNotFoundException,
NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
UnresolvedLinkException, SnapshotAccessControlException, IOException;


/**
* Truncate file src to new size.
* <ul>
* <li>Fails if src is a directory.
* <li>Fails if src does not exist.
* <li>Fails if src is not closed.
* <li>Fails if new size is greater than current size.
* </ul>
* <p>
* This implementation of truncate is purely a namespace operation if truncate
* occurs at a block boundary. Requires DataNode block recovery otherwise.
* <p>
* @param src existing file
* @param newLength the target size
*
* @return true if the client does not need to wait for block recovery,
* false if the client needs to wait for block recovery.
*
* @throws AccessControlException If access is denied
* @throws FileNotFoundException If file <code>src</code> is not found
* @throws SafeModeException truncate not allowed in safemode
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws SnapshotAccessControlException if path is in RO snapshot
* @throws IOException If an I/O error occurred
*/
@Idempotent
public boolean truncate(String src, long newLength, String clientName)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, SnapshotAccessControlException, IOException;
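
As a hedged illustration of this contract (not part of this commit), a JUnit-style fragment against a MiniDFSCluster might look as follows; the cluster variable, the file path, and the assumption that the file is closed and longer than one block are all hypothetical:

// Assumes org.junit.Assert.* static imports and a running MiniDFSCluster "cluster".
DistributedFileSystem dfs = cluster.getFileSystem();
Path p = new Path("/test/file");              // hypothetical closed file, > 1 block long
long len = dfs.getFileStatus(p).getLen();

// "Fails if new size is greater than current size": growing via truncate is rejected.
try {
  dfs.truncate(p, len + 1);
  fail("truncate beyond the current length should have failed");
} catch (IOException expected) {
  // expected
}

// Truncating exactly on a block boundary is a pure namespace operation,
// so the call returns true and no block recovery is needed.
long blockSize = dfs.getDefaultBlockSize(p);
assertTrue(dfs.truncate(p, blockSize));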

/**
* Delete the given file or directory from the file system.
* <p>
@@ -181,6 +181,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
@@ -584,6 +586,18 @@ public Rename2ResponseProto rename2(RpcController controller,
return VOID_RENAME2_RESPONSE;
}


@Override
public TruncateResponseProto truncate(RpcController controller,
TruncateRequestProto req) throws ServiceException {
try {
boolean result = server.truncate(req.getSrc(), req.getNewLength(),
req.getClientName());
return TruncateResponseProto.newBuilder().setResult(result).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}

@Override
public DeleteResponseProto delete(RpcController controller,
DeleteRequestProto req) throws ServiceException {
@@ -155,6 +155,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
@@ -301,6 +302,21 @@ public HdfsFileStatus create(String src, FsPermission masked,


}


@Override
public boolean truncate(String src, long newLength, String clientName)
throws IOException, UnresolvedLinkException {
TruncateRequestProto req = TruncateRequestProto.newBuilder()
.setSrc(src)
.setNewLength(newLength)
.setClientName(clientName)
.build();
try {
return rpcProxy.truncate(null, req).getResult();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}

@Override
public LastBlockWithStatus append(String src, String clientName)
throws AccessControlException, DSQuotaExceededException,
@@ -608,13 +608,15 @@ public static RecoveringBlockProto convert(RecoveringBlock b) {
}
LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
return RecoveringBlockProto.newBuilder().setBlock(lb)
.setNewGenStamp(b.getNewGenerationStamp())
.setTruncateFlag(b.getTruncateFlag()).build();
}

public static RecoveringBlock convert(RecoveringBlockProto b) {
ExtendedBlock block = convert(b.getBlock().getB());
DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
return new RecoveringBlock(block, locs, b.getNewGenStamp(),
b.getTruncateFlag());
}


public static DatanodeInfoProto.AdminState convert(
@@ -273,7 +273,11 @@ void commitBlock(Block block) throws IOException {
* make it primary.
*/
public void initializeBlockRecovery(long recoveryId) {
initializeBlockRecovery(BlockUCState.UNDER_RECOVERY, recoveryId);
}

public void initializeBlockRecovery(BlockUCState s, long recoveryId) {
setBlockUCState(s);
blockRecoveryId = recoveryId;
if (replicas.size() == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*"
@@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -1439,10 +1440,12 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
LOG.info("Skipped stale nodes for recovery : " +
(storages.length - recoveryLocations.size()));
}
boolean isTruncate = b.getBlockUCState().equals(
HdfsServerConstants.BlockUCState.BEING_TRUNCATED);
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
b.getBlockRecoveryId(), isTruncate));
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
@@ -240,7 +240,7 @@ public AddBlockResult addBlock(BlockInfo b) {
return result;
}

public boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);
if (b.removeStorage(this)) {
numBlocks--;
@@ -299,6 +299,13 @@ static public enum BlockUCState {
* which synchronizes the existing replicas contents.
*/
UNDER_RECOVERY,
/**
* The block is being truncated.<br>
* When a file is truncated its last block may need to be truncated
* and needs to go through a recovery procedure,
* which synchronizes the existing replicas contents.
*/
BEING_TRUNCATED,
/**
* The block is committed.<br>
* The client reported that all bytes are written to data-nodes
@@ -2691,7 +2691,10 @@ void syncBlock(RecoveringBlock rBlock,
r.rInfo.getNumBytes() == finalizedLength)
participatingList.add(r);
}
if(rBlock.getTruncateFlag())
newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
else
newBlock.setNumBytes(finalizedLength);
break;
case RBW:
case RWR:
@@ -2703,7 +2706,10 @@
participatingList.add(r);
}
}
if(rBlock.getTruncateFlag())
newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
else
newBlock.setNumBytes(minLength);
break;
case RUR:
case TEMPORARY:
@@ -1087,7 +1087,71 @@ void addYieldCount(long value) {
public INodeMap getINodeMap() {
return inodeMap;
}


/**
* Truncate implementation used by FSEditLogLoader when replaying the edit log.
* Unlike FSNamesystem.truncate, this will not schedule block recovery.
*/
void unprotectedTruncate(String src, String clientName, String clientMachine,
long newLength, long mtime)
throws UnresolvedLinkException, QuotaExceededException,
SnapshotAccessControlException, IOException {
INodesInPath iip = getINodesInPath(src, true);
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
boolean onBlockBoundary =
unprotectedTruncate(iip, newLength, collectedBlocks, mtime);

if(! onBlockBoundary) {
getFSNamesystem().prepareFileForWrite(src,
iip, clientName, clientMachine, false, false);
}
getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
}

boolean truncate(INodesInPath iip, long newLength,
BlocksMapUpdateInfo collectedBlocks,
long mtime)
throws IOException {
writeLock();
try {
return unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
} finally {
writeUnlock();
}
}

/**
* Truncate has the following properties:
* 1.) Any block deletions occur now.
* 2.) INode length is truncated now – clients can only read up to new length.
* 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY.
* 4.) NN will trigger DN truncation recovery and waits for DNs to report.
* 5.) File is considered UNDER_RECOVERY until truncation recovery completes.
* 6.) Soft and hard Lease expiration require truncation recovery to complete.
*
* @return true if on the block boundary or false if recovery is needed
*/
boolean unprotectedTruncate(INodesInPath iip, long newLength,
BlocksMapUpdateInfo collectedBlocks,
long mtime) throws IOException {
assert hasWriteLock();
INodeFile file = iip.getLastINode().asFile();
long oldDiskspace = file.diskspaceConsumed();
long remainingLength =
file.collectBlocksBeyondMax(newLength, collectedBlocks);
file.setModificationTime(mtime);
updateCount(iip, 0, file.diskspaceConsumed() - oldDiskspace, true);
// If on block boundary, then return
long lastBlockDelta = remainingLength - newLength;
if(lastBlockDelta == 0)
return true;
// Set new last block length
BlockInfo lastBlock = file.getLastBlock();
assert lastBlock.getNumBytes() - lastBlockDelta > 0 : "wrong block size";
lastBlock.setNumBytes(lastBlock.getNumBytes() - lastBlockDelta);
return false;
}
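
A hedged worked example of the boundary arithmetic above; the block size and file lengths are invented for illustration and are not taken from this commit:

public class TruncateBoundaryExample {
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;   // 128 MB blocks
    long fileLength = 3 * blockSize;       // three full blocks, 384 MB

    // Case 1: truncate to 256 MB, exactly on a block boundary.
    // collectBlocksBeyondMax() would drop the third block entirely, so the
    // remaining length is 256 MB, lastBlockDelta is 0, and the method
    // returns true: a pure namespace operation, no block recovery.
    long newLength = 2 * blockSize;
    long remainingLength = 2 * blockSize;
    System.out.println("boundary delta = " + (remainingLength - newLength)); // 0

    // Case 2: truncate to 300 MB, inside the last block.
    // No complete block lies beyond 300 MB, so the remaining length stays
    // 384 MB and lastBlockDelta is 84 MB: the last block shrinks from
    // 128 MB to 44 MB and the method returns false, so the caller must
    // schedule block recovery for that block.
    newLength = 300L * 1024 * 1024;
    remainingLength = fileLength;
    System.out.println("mid-block delta = " + (remainingLength - newLength));
  }
}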

/**
* This method is always called with writeLock of FSDirectory held.
*/
@@ -86,6 +86,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
@@ -896,6 +897,20 @@ void logDelete(String src, long timestamp, boolean toLogRpcIds) {
logRpcIds(op, toLogRpcIds);
logEdit(op);
}

/**
* Add truncate file record to edit log
*/
void logTruncate(String src, String clientName, String clientMachine,
long size, long timestamp) {
TruncateOp op = TruncateOp.getInstance(cache.get())
.setPath(src)
.setClientName(clientName)
.setClientMachine(clientMachine)
.setNewLength(size)
.setTimestamp(timestamp);
logEdit(op);
}


/**
* Add legacy block generation stamp record to edit log
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;


import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
import static org.apache.hadoop.util.Time.now;


@@ -853,6 +854,12 @@ fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
}
break;
}
case OP_TRUNCATE: {
TruncateOp truncateOp = (TruncateOp) op;
fsDir.unprotectedTruncate(truncateOp.src, truncateOp.clientName,
truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp);
break;
}
case OP_SET_STORAGE_POLICY: {
SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
