Skip to content

Commit

Permalink
HDFS-7056. Snapshot support for truncate. Contributed by Konstantin S…
Browse files Browse the repository at this point in the history
…hvachko and Plamen Jeliazkov.
  • Loading branch information
shvachko committed Jan 13, 2015
1 parent 7e9358f commit 08ac062
Show file tree
Hide file tree
Showing 39 changed files with 1,347 additions and 388 deletions.
2 changes: 2 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -20,6 +20,8 @@ Trunk (Unreleased)

HDFS-3107. Introduce truncate. (Plamen Jeliazkov via shv)

HDFS-7056. Snapshot support for truncate. (Plamen Jeliazkov and shv)

IMPROVEMENTS

HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
Expand Down
Expand Up @@ -537,7 +537,7 @@ public void rename2(String src, String dst, Options.Rename... options)
* @param src existing file
* @param newLength the target size
*
* @return true if and client does not need to wait for block recovery,
* @return true if client does not need to wait for block recovery,
* false if client needs to wait for block recovery.
*
* @throws AccessControlException If access is denied
Expand Down
Expand Up @@ -76,12 +76,12 @@ public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery(
final String storageID;
try {
storageID = impl.updateReplicaUnderRecovery(
PBHelper.convert(request.getBlock()),
request.getRecoveryId(), request.getNewLength());
PBHelper.convert(request.getBlock()), request.getRecoveryId(),
request.getNewBlockId(), request.getNewLength());
} catch (IOException e) {
throw new ServiceException(e);
}
return UpdateReplicaUnderRecoveryResponseProto.newBuilder()
.setStorageUuid(storageID).build();
}
}
}
Expand Up @@ -102,11 +102,12 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)

@Override
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newLength) throws IOException {
long recoveryId, long newBlockId, long newLength) throws IOException {
UpdateReplicaUnderRecoveryRequestProto req =
UpdateReplicaUnderRecoveryRequestProto.newBuilder()
.setBlock(PBHelper.convert(oldBlock))
.setNewLength(newLength).setRecoveryId(recoveryId).build();
.setNewLength(newLength).setNewBlockId(newBlockId)
.setRecoveryId(recoveryId).build();
try {
return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
).getStorageUuid();
Expand Down
Expand Up @@ -607,16 +607,19 @@ public static RecoveringBlockProto convert(RecoveringBlock b) {
return null;
}
LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
return RecoveringBlockProto.newBuilder().setBlock(lb)
.setNewGenStamp(b.getNewGenerationStamp())
.setTruncateFlag(b.getTruncateFlag()).build();
RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder();
builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
if(b.getNewBlock() != null)
builder.setTruncateBlock(PBHelper.convert(b.getNewBlock()));
return builder.build();
}

public static RecoveringBlock convert(RecoveringBlockProto b) {
ExtendedBlock block = convert(b.getBlock().getB());
DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
return new RecoveringBlock(block, locs, b.getNewGenStamp(),
b.getTruncateFlag());
return (b.hasTruncateBlock()) ?
new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) :
new RecoveringBlock(block, locs, b.getNewGenStamp());
}

public static DatanodeInfoProto.AdminState convert(
Expand Down
Expand Up @@ -54,6 +54,11 @@ public class BlockInfoUnderConstruction extends BlockInfo {
*/
private long blockRecoveryId = 0;

/**
* The block source to use in the event of copy-on-write truncate.
*/
private Block truncateBlock;

/**
* ReplicaUnderConstruction contains information about replicas while
* they are under construction.
Expand Down Expand Up @@ -229,6 +234,15 @@ public long getBlockRecoveryId() {
return blockRecoveryId;
}

/**
 * Get the truncate block currently recorded on this block.
 *
 * @return the block stored in the {@code truncateBlock} field; may be null
 */
public Block getTruncateBlock() {
return truncateBlock;
}

/**
 * Record the truncate block for this block.
 *
 * @param recoveryBlock the block to store in the {@code truncateBlock}
 *                      field; it is what {@link #getTruncateBlock()}
 *                      returns afterwards
 */
public void setTruncateBlock(Block recoveryBlock) {
this.truncateBlock = recoveryBlock;
}

/**
* Process the recorded replicas. When about to commit or finish the
* pipeline recovery sort out bad replicas.
Expand Down Expand Up @@ -273,11 +287,7 @@ void commitBlock(Block block) throws IOException {
* make it primary.
*/
public void initializeBlockRecovery(long recoveryId) {
initializeBlockRecovery(BlockUCState.UNDER_RECOVERY, recoveryId);
}

public void initializeBlockRecovery(BlockUCState s, long recoveryId) {
setBlockUCState(s);
setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId;
if (replicas.size() == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*"
Expand Down
Expand Up @@ -700,13 +700,14 @@ public BlockInfo forceCompleteBlock(final BlockCollection bc,
* The client is supposed to allocate a new block with the next call.
*
* @param bc file
* @param bytesToRemove num of bytes to remove from block
* @return the last block locations if the block is partial or null otherwise
*/
public LocatedBlock convertLastBlockToUnderConstruction(
BlockCollection bc) throws IOException {
BlockCollection bc, long bytesToRemove) throws IOException {
BlockInfo oldBlock = bc.getLastBlock();
if(oldBlock == null ||
bc.getPreferredBlockSize() == oldBlock.getNumBytes())
bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove)
return null;
assert oldBlock == getStoredBlock(oldBlock) :
"last block of the file is not in blocksMap";
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
Expand Down Expand Up @@ -1433,26 +1432,37 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
recoveryLocations.add(storages[i]);
}
}
// If we are performing a truncate recovery then set recovery fields
// to old block.
boolean truncateRecovery = b.getTruncateBlock() != null;
boolean copyOnTruncateRecovery = truncateRecovery &&
b.getTruncateBlock().getBlockId() != b.getBlockId();
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
new ExtendedBlock(blockPoolId, b);
// If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures.
DatanodeInfo[] recoveryInfos;
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != storages.length) {
LOG.info("Skipped stale nodes for recovery : " +
(storages.length - recoveryLocations.size()));
}
boolean isTruncate = b.getBlockUCState().equals(
HdfsServerConstants.BlockUCState.BEING_TRUNCATED);
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
b.getBlockRecoveryId(), isTruncate));
recoveryInfos =
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
DatanodeStorageInfo.toDatanodeInfos(storages),
b.getBlockRecoveryId()));
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
}
if(truncateRecovery) {
Block recoveryBlock = (copyOnTruncateRecovery) ? b :
b.getTruncateBlock();
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
recoveryBlock));
} else {
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
b.getBlockRecoveryId()));
}
}
return new DatanodeCommand[] { brCommand };
Expand Down
Expand Up @@ -299,13 +299,6 @@ static public enum BlockUCState {
* which synchronizes the existing replicas contents.
*/
UNDER_RECOVERY,
/**
* The block is being truncated.<br>
* When a file is truncated its last block may need to be truncated
* and needs to go through a recovery procedure,
* which synchronizes the existing replicas contents.
*/
BEING_TRUNCATED,
/**
* The block is committed.<br>
* The client reported that all bytes are written to data-nodes
Expand Down
Expand Up @@ -2530,14 +2530,16 @@ private static ReplicaRecoveryInfo callInitReplicaRecovery(
*/
@Override // InterDatanodeProtocol
public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
final long recoveryId, final long newLength) throws IOException {
final long recoveryId, final long newBlockId, final long newLength)
throws IOException {
final String storageID = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength);
recoveryId, newBlockId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
// block locations until the next block report.
ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
newBlock.setGenerationStamp(recoveryId);
newBlock.setBlockId(newBlockId);
newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, "", storageID);
return storageID;
Expand All @@ -2559,10 +2561,12 @@ static class BlockRecord {
this.rInfo = rInfo;
}

void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength
) throws IOException {
void updateReplicaUnderRecovery(String bpid, long recoveryId,
long newBlockId, long newLength)
throws IOException {
final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId,
newLength);
}

@Override
Expand Down Expand Up @@ -2644,8 +2648,12 @@ void syncBlock(RecoveringBlock rBlock,
final String bpid = block.getBlockPoolId();
DatanodeProtocolClientSideTranslatorPB nn =
getActiveNamenodeForBP(block.getBlockPoolId());

long recoveryId = rBlock.getNewGenerationStamp();
boolean isTruncateRecovery = rBlock.getNewBlock() != null;
long blockId = (isTruncateRecovery) ?
rBlock.getNewBlock().getBlockId() : block.getBlockId();

if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ "), syncList=" + syncList);
Expand Down Expand Up @@ -2679,7 +2687,7 @@ void syncBlock(RecoveringBlock rBlock,
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
-1, recoveryId);
switch(bestState) {
case FINALIZED:
Expand All @@ -2691,10 +2699,7 @@ void syncBlock(RecoveringBlock rBlock,
r.rInfo.getNumBytes() == finalizedLength)
participatingList.add(r);
}
if(rBlock.getTruncateFlag())
newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
else
newBlock.setNumBytes(finalizedLength);
newBlock.setNumBytes(finalizedLength);
break;
case RBW:
case RWR:
Expand All @@ -2706,21 +2711,21 @@ void syncBlock(RecoveringBlock rBlock,
participatingList.add(r);
}
}
if(rBlock.getTruncateFlag())
newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
else
newBlock.setNumBytes(minLength);
newBlock.setNumBytes(minLength);
break;
case RUR:
case TEMPORARY:
assert false : "bad replica state: " + bestState;
}
if(isTruncateRecovery)
newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes());

List<DatanodeID> failedList = new ArrayList<DatanodeID>();
final List<BlockRecord> successList = new ArrayList<BlockRecord>();
for(BlockRecord r : participatingList) {
try {
r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes());
r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
newBlock.getNumBytes());
successList.add(r);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
Expand Down
Expand Up @@ -418,7 +418,7 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock
* @return the ID of storage that stores the block
*/
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newLength) throws IOException;
long recoveryId, long newBlockId, long newLength) throws IOException;

/**
* add new block pool ID
Expand Down

0 comments on commit 08ac062

Please sign in to comment.