Skip to content

Commit

Permalink
HDFS-16261 Configurable grace period around deletion of invalidated b…
Browse files Browse the repository at this point in the history
…locks
  • Loading branch information
bbeaudreault committed Oct 25, 2021
1 parent e12cd0c commit 2b9883b
Show file tree
Hide file tree
Showing 12 changed files with 440 additions and 82 deletions.
Expand Up @@ -473,6 +473,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY = "dfs.namenode.startup.delay.block.deletion.sec";
public static final long DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT = 0L;

/** Grace period for replaced blocks, before being sent to DataNodes for invalidation */
public static final String DFS_NAMENODE_BLOCK_REPLACE_GRACE_PERIOD_MS_KEY = "dfs.namenode.block.replace.grace.period.ms";
public static final long DFS_NAMENODE_BLOCK_REPLACE_GRACE_PERIOD_MS_DEFAULT = 0;

/** Block deletion increment. */
public static final String DFS_NAMENODE_BLOCK_DELETION_INCREMENT_KEY =
"dfs.namenode.block.deletion.increment";
Expand Down
Expand Up @@ -195,6 +195,7 @@ public class BlockManager implements BlockStatsMXBean {
private boolean initializedReplQueues;

private final long startupDelayBlockDeletionInMs;
private final long blockReplaceGracePeriodInMs;
private final BlockReportLeaseManager blockReportLeaseManager;
private ObjectName mxBeanName;

Expand Down Expand Up @@ -471,6 +472,9 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
startupDelayBlockDeletionInMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
blockReplaceGracePeriodInMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_BLOCK_REPLACE_GRACE_PERIOD_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCK_REPLACE_GRACE_PERIOD_MS_DEFAULT);
invalidateBlocks = new InvalidateBlocks(
datanodeManager.getBlockInvalidateLimit(),
startupDelayBlockDeletionInMs,
Expand Down Expand Up @@ -1720,11 +1724,11 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
* Adds block to list of blocks which will be invalidated on specified
* datanode and log the operation
*/
void addToInvalidates(final Block block, final DatanodeInfo datanode) {
void addToInvalidates(final Block block, final DatanodeInfo datanode, final long gracePeriodMs) {
if (!isPopulatingReplQueues()) {
return;
}
invalidateBlocks.add(block, datanode, true);
invalidateBlocks.addWithGracePeriod(block, datanode, gracePeriodMs,true);
}

/**
Expand Down Expand Up @@ -1820,7 +1824,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
if (b.getStored().isDeleted()) {
blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b);
addToInvalidates(b.getCorrupted(), node);
addToInvalidates(b.getCorrupted(), node, 0);
return;
}
short expectedRedundancies =
Expand Down Expand Up @@ -1903,7 +1907,7 @@ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
} else {
// we already checked the number of replicas in the caller of this
// function and know there are enough live replicas, so we can delete it.
addToInvalidates(b.getCorrupted(), dn);
addToInvalidates(b.getCorrupted(), dn, 0);
removeStoredBlock(b.getStored(), node);
blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
b, dn);
Expand Down Expand Up @@ -2913,7 +2917,7 @@ Collection<Block> processReport(
"reported.", maxNumBlocksToLog, numBlocksLogged);
}
for (Block b : toInvalidate) {
addToInvalidates(b, node);
addToInvalidates(b, node, 0);
}
for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, storageInfo, node);
Expand Down Expand Up @@ -3460,7 +3464,7 @@ && hasMinStorage(storedBlock, numCurrentReplica)) {
private Block addStoredBlock(final BlockInfo block,
final Block reportedBlock,
DatanodeStorageInfo storageInfo,
DatanodeDescriptor delNodeHint,
ReplicaDeleteHint delNodeHint,
boolean logEveryBlock)
throws IOException {
assert block != null && namesystem.hasWriteLock();
Expand Down Expand Up @@ -3876,11 +3880,14 @@ public void setReplication(
*/
private void processExtraRedundancyBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
ReplicaDeleteHint delNodeHint) {
assert namesystem.hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
if (delNodeHint == null) {
delNodeHint = ReplicaDeleteHint.NONE;
} else if (addedNode == delNodeHint.getDatanode()) {
delNodeHint = delNodeHint.withNullDataNode();
}

Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
Expand Down Expand Up @@ -3913,7 +3920,7 @@ private void chooseExcessRedundancies(
final Collection<DatanodeStorageInfo> nonExcess,
BlockInfo storedBlock, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
ReplicaDeleteHint delNodeHint) {
assert namesystem.hasWriteLock();
// first form a rack to datanodes map and
BlockCollection bc = getBlockCollection(storedBlock);
Expand Down Expand Up @@ -3946,13 +3953,14 @@ private void chooseExcessRedundancies(
private void chooseExcessRedundancyContiguous(
final Collection<DatanodeStorageInfo> nonExcess, BlockInfo storedBlock,
short replication, DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) {
ReplicaDeleteHint delNodeHint, List<StorageType> excessTypes) {

BlockPlacementPolicy replicator = placementPolicies.getPolicy(CONTIGUOUS);
List<DatanodeStorageInfo> replicasToDelete = replicator
.chooseReplicasToDelete(nonExcess, nonExcess, replication, excessTypes,
addedNode, delNodeHint);
addedNode, delNodeHint.getDatanode());
for (DatanodeStorageInfo chosenReplica : replicasToDelete) {
processChosenExcessRedundancy(nonExcess, chosenReplica, storedBlock);
processChosenExcessRedundancy(nonExcess, chosenReplica, delNodeHint.getGracePeriod(), storedBlock);
}
}

Expand All @@ -3968,7 +3976,7 @@ private void chooseExcessRedundancyContiguous(
private void chooseExcessRedundancyStriped(BlockCollection bc,
final Collection<DatanodeStorageInfo> nonExcess,
BlockInfo storedBlock,
DatanodeDescriptor delNodeHint) {
ReplicaDeleteHint delNodeHint) {
assert storedBlock instanceof BlockInfoStriped;
BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
short groupSize = sblk.getTotalBlockNum();
Expand All @@ -3987,13 +3995,13 @@ private void chooseExcessRedundancyStriped(BlockCollection bc,
storage2index.put(storage, index);
}

// use delHint only if delHint is duplicated
// use delNodeHint only if delNodeHint is duplicated
final DatanodeStorageInfo delStorageHint =
DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint.getDatanode());
if (delStorageHint != null) {
Integer index = storage2index.get(delStorageHint);
if (index != null && duplicated.get(index)) {
processChosenExcessRedundancy(nonExcess, delStorageHint, storedBlock);
processChosenExcessRedundancy(nonExcess, delStorageHint, delNodeHint.getGracePeriod(), storedBlock);
}
}

Expand Down Expand Up @@ -4025,7 +4033,7 @@ private void chooseExcessRedundancyStriped(BlockCollection bc,
.chooseReplicasToDelete(nonExcess, candidates, (short) 1,
excessTypes, null, null);
for (DatanodeStorageInfo chosen : replicasToDelete) {
processChosenExcessRedundancy(nonExcess, chosen, storedBlock);
processChosenExcessRedundancy(nonExcess, chosen, delNodeHint.getGracePeriod(), storedBlock);
candidates.remove(chosen);
}
}
Expand All @@ -4035,7 +4043,8 @@ private void chooseExcessRedundancyStriped(BlockCollection bc,

private void processChosenExcessRedundancy(
final Collection<DatanodeStorageInfo> nonExcess,
final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
final DatanodeStorageInfo chosen,
long gracePeriodMs, BlockInfo storedBlock) {
nonExcess.remove(chosen);
excessRedundancyMap.add(chosen.getDatanodeDescriptor(), storedBlock);
//
Expand All @@ -4048,9 +4057,10 @@ private void processChosenExcessRedundancy(
// upon giving instructions to the datanodes.
//
final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor(), gracePeriodMs);
blockLog.debug("BLOCK* chooseExcessRedundancies: "
+ "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
+ "({}, {}) is added to invalidated blocks set with grace period {}", chosen, storedBlock,
gracePeriodMs);
}

private void removeStoredBlock(DatanodeStorageInfo storageInfo, Block block,
Expand Down Expand Up @@ -4160,20 +4170,22 @@ private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
*/
@VisibleForTesting
public void addBlock(DatanodeStorageInfo storageInfo, Block block,
String delHint) throws IOException {
String delHint, long delGracePeriod) throws IOException {
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Decrement number of blocks scheduled to this datanode.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
// RECEIVED_BLOCK), we currently also decrease the approximate number.
node.decrementBlocksScheduled(storageInfo.getStorageType());

// get the deletion hint node
DatanodeDescriptor delHintNode = null;
ReplicaDeleteHint delHintNode = null;
if (delHint != null && delHint.length() != 0) {
delHintNode = datanodeManager.getDatanode(delHint);
if (delHintNode == null) {
DatanodeDescriptor datanode = datanodeManager.getDatanode(delHint);
if (datanode == null) {
blockLog.warn("BLOCK* blockReceived: {} is expected to be removed " +
"from an unrecorded node {}", block, delHint);
} else {
delHintNode = new ReplicaDeleteHint(datanode, delGracePeriod);
}
}

Expand All @@ -4199,7 +4211,7 @@ public void addBlock(DatanodeStorageInfo storageInfo, Block block,
*/
private boolean processAndHandleReportedBlock(
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
ReplicaState reportedState, ReplicaDeleteHint delHintNode)
throws IOException {
// blockReceived reports a finalized block
Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Expand Down Expand Up @@ -4243,7 +4255,7 @@ private boolean processAndHandleReportedBlock(
for (Block b : toInvalidate) {
blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
"belong to any file", b, node, b.getNumBytes());
addToInvalidates(b, node);
addToInvalidates(b, node, 0);
}
for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, storageInfo, node);
Expand Down Expand Up @@ -4304,7 +4316,7 @@ private void processIncrementalBlockReport(final DatanodeDescriptor node,
deleted++;
break;
case RECEIVED_BLOCK:
addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints(), blockReplaceGracePeriodInMs);
received++;
break;
case RECEIVING_BLOCK:
Expand Down

0 comments on commit 2b9883b

Please sign in to comment.