Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1916,23 +1916,29 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
b.getReasonCode(), b.getStored().isStriped());

NumberReplicas numberOfReplicas = countNodes(b.getStored());
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
final int numUsableReplicas = numberOfReplicas.liveReplicas() +
numberOfReplicas.decommissioning() +
numberOfReplicas.liveEnteringMaintenanceReplicas();
boolean hasEnoughLiveReplicas = numUsableReplicas >=
expectedRedundancies;

boolean minReplicationSatisfied = hasMinStorage(b.getStored(),
numberOfReplicas.liveReplicas());
numUsableReplicas);

boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedRedundancies;
boolean corruptedDuringWrite = minReplicationSatisfied &&
b.isCorruptedDuringWrite();
// case 1: have enough number of live replicas
// case 2: corrupted replicas + live replicas > Replication factor
// case 1: have enough number of usable replicas
// case 2: corrupted replicas + usable replicas > Replication factor
// case 3: Block is marked corrupt due to failure while writing. In this
// case genstamp will be different than that of valid block.
// In all these cases we can delete the replica.
// In case of 3, rbw block will be deleted and valid block can be replicated
// In case 3, rbw block will be deleted and valid block can be replicated.
// Note NN only becomes aware of corrupt blocks when the block report is sent,
// this means that by default it can take up to 6 hours for a corrupt block to
// be invalidated, after which the valid block can be replicated.
if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
|| corruptedDuringWrite) {
if (b.getStored().isStriped()) {
Expand Down Expand Up @@ -3656,7 +3662,7 @@ private Block addStoredBlock(final BlockInfo block,
". blockMap has {} but corrupt replicas map has {}",
storedBlock, numCorruptNodes, corruptReplicasCount);
}
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileRedundancy)) {
if ((corruptReplicasCount > 0) && (numUsableReplicas >= fileRedundancy)) {
invalidateCorruptReplicas(storedBlock, reportedBlock, num);
}
return storedBlock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.Block;
Expand Down Expand Up @@ -1902,4 +1903,285 @@ private void createClusterWithDeadNodesDecommissionInProgress(final int numLiveN
!BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node)
&& !node.isAlive()), 500, 20000);
}

/*
This test reproduces a scenario where an under-replicated block on a decommissioning node
cannot be replicated to some datanodes because they have a corrupt replica of the block.
The test ensures that the corrupt replicas are eventually invalidated so that the
under-replicated block can be replicated to sufficient datanodes & the decommissioning
node can be decommissioned.
*/
@Test(timeout = 60000)
public void testDeleteCorruptReplicaForUnderReplicatedBlock() throws Exception {
// Constants
final Path file = new Path("/test-file");
final int numDatanode = 3;
final short replicationFactor = 2;
final int numStoppedNodes = 2;
final int numDecommNodes = 1;
assertEquals(numDatanode, numStoppedNodes + numDecommNodes);

// Run monitor every 5 seconds to speed up decommissioning & make the test faster
final int datanodeAdminMonitorFixedRateSeconds = 5;
getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
datanodeAdminMonitorFixedRateSeconds);
// Set block report interval to 6 hours to avoid unexpected block reports.
// The default block report interval is different for a MiniDFSCluster
getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
// Run the BlockManager RedundancyMonitor every 3 seconds such that the Namenode
// sends under-replication blocks for replication frequently
getConf().setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
// Ensure that the DataStreamer client will replace the bad datanode on append failure
getConf().set(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, "ALWAYS");
// Avoid having the DataStreamer client fail the append operation if datanode replacement fails
getConf()
.setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);

// References to datanodes in the cluster
// - 2 datanode will be stopped to generate corrupt block replicas & then
// restarted later to validate the corrupt replicas are invalidated
// - 1 datanode will start decommissioning to make the block under replicated
final List<DatanodeDescriptor> allNodes = new ArrayList<>();
final List<DatanodeDescriptor> stoppedNodes = new ArrayList<>();
final DatanodeDescriptor decommNode;

// Create MiniDFSCluster
startCluster(1, numDatanode);
getCluster().waitActive();
final FSNamesystem namesystem = getCluster().getNamesystem();
final BlockManager blockManager = namesystem.getBlockManager();
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
final FileSystem fs = getCluster().getFileSystem();

// Get DatanodeDescriptors
for (final DataNode node : getCluster().getDataNodes()) {
allNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
}

// Create block with 2 FINALIZED replicas
// Note that:
// - calling hflush leaves block in state ReplicaBeingWritten
// - calling close leaves the block in state FINALIZED
// - amount of data is kept small because flush is not synchronous
LOG.info("Creating Initial Block with {} FINALIZED replicas", replicationFactor);
FSDataOutputStream out = fs.create(file, replicationFactor);
for (int i = 0; i < 512; i++) {
out.write(i);
}
out.close();

// Validate the block exists with expected number of replicas
assertEquals(1, blockManager.getTotalBlocks());
BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0);
assertEquals(1, blocksInFile.length);
List<String> replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
assertEquals(replicationFactor, replicasInBlock.size());

// Identify the DatanodeDescriptors associated with the 2 nodes with replicas.
// Each of nodes with a replica will be stopped later to corrupt the replica
DatanodeDescriptor decommNodeTmp = null;
for (DatanodeDescriptor node : allNodes) {
if (replicasInBlock.contains(node.getName())) {
stoppedNodes.add(node);
} else {
decommNodeTmp = node;
}
}
assertEquals(numStoppedNodes, stoppedNodes.size());
assertNotNull(decommNodeTmp);
decommNode = decommNodeTmp;
final DatanodeDescriptor firstStoppedNode = stoppedNodes.get(0);
final DatanodeDescriptor secondStoppedNode = stoppedNodes.get(1);
LOG.info("Detected 2 nodes with replicas : {} , {}", firstStoppedNode.getXferAddr(),
secondStoppedNode.getXferAddr());
LOG.info("Detected 1 node without replica : {}", decommNode.getXferAddr());

// Stop firstStoppedNode & the append to the block pipeline such that DataStreamer client:
// - detects firstStoppedNode as bad link in block pipeline
// - replaces the firstStoppedNode with decommNode in block pipeline
// The result is that:
// - secondStoppedNode & decommNode have a live block replica
// - firstStoppedNode has a corrupt replica (corrupt because of old GenStamp)
LOG.info("Stopping first node with replica {}", firstStoppedNode.getXferAddr());
final List<MiniDFSCluster.DataNodeProperties> stoppedNodeProps = new ArrayList<>();
MiniDFSCluster.DataNodeProperties stoppedNodeProp =
getCluster().stopDataNode(firstStoppedNode.getXferAddr());
stoppedNodeProps.add(stoppedNodeProp);
firstStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past
// Wait for NN to detect the datanode as dead
GenericTestUtils.waitFor(
() -> 2 == datanodeManager.getNumLiveDataNodes() && 1 == datanodeManager
.getNumDeadDataNodes(), 500, 30000);
// Append to block pipeline
appendBlock(fs, file, 2);

// Stop secondStoppedNode & the append to the block pipeline such that DataStreamer client:
// - detects secondStoppedNode as bad link in block pipeline
// - attempts to replace secondStoppedNode but cannot because there are no more live nodes
// - appends to the block pipeline containing just decommNode
// The result is that:
// - decommNode has a live block replica
// - firstStoppedNode & secondStoppedNode both have a corrupt replica
LOG.info("Stopping second node with replica {}", secondStoppedNode.getXferAddr());
stoppedNodeProp = getCluster().stopDataNode(secondStoppedNode.getXferAddr());
stoppedNodeProps.add(stoppedNodeProp);
secondStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past
// Wait for NN to detect the datanode as dead
GenericTestUtils.waitFor(() -> numDecommNodes == datanodeManager.getNumLiveDataNodes()
&& numStoppedNodes == datanodeManager.getNumDeadDataNodes(), 500, 30000);
// Append to block pipeline
appendBlock(fs, file, 1);

// Validate block replica locations
blocksInFile = fs.getFileBlockLocations(file, 0, 0);
assertEquals(1, blocksInFile.length);
replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
assertEquals(numDecommNodes, replicasInBlock.size());
assertTrue(replicasInBlock.contains(decommNode.getName()));
LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 live replica on {}",
firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());

LOG.info("Decommission node {} with the live replica", decommNode.getXferAddr());
final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
takeNodeOutofService(0, decommNode.getDatanodeUuid(), 0, decommissionedNodes,
AdminStates.DECOMMISSION_INPROGRESS);

// Wait for the datanode to start decommissioning
try {
GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
&& decomManager.getNumPendingNodes() == numDecommNodes && decommNode.getAdminState()
.equals(AdminStates.DECOMMISSION_INPROGRESS), 500, 30000);
} catch (Exception e) {
blocksInFile = fs.getFileBlockLocations(file, 0, 0);
assertEquals(1, blocksInFile.length);
replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
String errMsg = String.format("Node %s failed to start decommissioning."
+ " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]",
decommNode.getXferAddr(), decomManager.getNumTrackedNodes(),
decomManager.getNumPendingNodes(), decommNode.getAdminState(),
String.join(", ", replicasInBlock));
LOG.error(errMsg); // Do not log generic timeout exception
fail(errMsg);
}

// Validate block replica locations
blocksInFile = fs.getFileBlockLocations(file, 0, 0);
assertEquals(1, blocksInFile.length);
replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
assertEquals(numDecommNodes, replicasInBlock.size());
assertEquals(replicasInBlock.get(0), decommNode.getName());
LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 decommissioning replica on {}",
firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());

// Restart the 2 stopped datanodes
LOG.info("Restarting stopped nodes {} , {}", firstStoppedNode.getXferAddr(),
secondStoppedNode.getXferAddr());
for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) {
assertTrue(getCluster().restartDataNode(stoppedNode));
}
for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) {
try {
getCluster().waitDatanodeFullyStarted(stoppedNode.getDatanode(), 30000);
LOG.info("Node {} Restarted", stoppedNode.getDatanode().getXferAddress());
} catch (Exception e) {
String errMsg = String.format("Node %s Failed to Restart within 30 seconds",
stoppedNode.getDatanode().getXferAddress());
LOG.error(errMsg); // Do not log generic timeout exception
fail(errMsg);
}
}

// Trigger block reports for the 2 restarted nodes to ensure their corrupt
// block replicas are identified by the namenode
for (MiniDFSCluster.DataNodeProperties dnProps : stoppedNodeProps) {
DataNodeTestUtils.triggerBlockReport(dnProps.getDatanode());
}

// Validate the datanode is eventually decommissioned
// Some changes are needed to ensure replication/decommissioning occur in a timely manner:
// - if the namenode sends a DNA_TRANSFER before sending the DNA_INVALIDATE's then:
// - the block will enter the pendingReconstruction queue
// - this prevent the block from being sent for transfer again for some time
// - solution is to call "clearQueues" so that DNA_TRANSFER is sent again after DNA_INVALIDATE
// - need to run the check less frequently than DatanodeAdminMonitor
// such that in between "clearQueues" calls 2 things can occur:
// - DatanodeAdminMonitor runs which sets the block as neededReplication
// - datanode heartbeat is received which sends the DNA_TRANSFER to the node
final int checkEveryMillis = datanodeAdminMonitorFixedRateSeconds * 2 * 1000;
try {
GenericTestUtils.waitFor(() -> {
blockManager.clearQueues(); // Clear pendingReconstruction queue
return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0
&& decommNode.getAdminState().equals(AdminStates.DECOMMISSIONED);
}, checkEveryMillis, 40000);
} catch (Exception e) {
blocksInFile = fs.getFileBlockLocations(file, 0, 0);
assertEquals(1, blocksInFile.length);
replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
String errMsg = String.format("Node %s failed to complete decommissioning."
+ " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]",
decommNode.getXferAddr(), decomManager.getNumTrackedNodes(),
decomManager.getNumPendingNodes(), decommNode.getAdminState(),
String.join(", ", replicasInBlock));
LOG.error(errMsg); // Do not log generic timeout exception
fail(errMsg);
}

// Validate block replica locations.
// Note that in order for decommissioning to complete the block must be
// replicated to both of the restarted datanodes; this implies that the
// corrupt replicas were invalidated on both of the restarted datanodes.
blocksInFile = fs.getFileBlockLocations(file, 0, 0);
assertEquals(1, blocksInFile.length);
replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
assertEquals(numDatanode, replicasInBlock.size());
assertTrue(replicasInBlock.contains(decommNode.getName()));
for (final DatanodeDescriptor node : stoppedNodes) {
assertTrue(replicasInBlock.contains(node.getName()));
}
LOG.info("Block now has 2 live replicas on [{} , {}] and 1 decommissioned replica on {}",
firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
}

void appendBlock(final FileSystem fs, final Path file, int expectedReplicas) throws IOException {
LOG.info("Appending to the block pipeline");
boolean failed = false;
Exception failedReason = null;
try {
FSDataOutputStream out = fs.append(file);
for (int i = 0; i < 512; i++) {
out.write(i);
}
out.close();
} catch (Exception e) {
failed = true;
failedReason = e;
} finally {
BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0);
assertEquals(1, blocksInFile.length);
List<String> replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
if (failed) {
String errMsg = String.format(
"Unexpected exception appending to the block pipeline."
+ " nodesWithReplica=[%s]", String.join(", ", replicasInBlock));
LOG.error(errMsg, failedReason); // Do not swallow the exception
fail(errMsg);
} else if (expectedReplicas != replicasInBlock.size()) {
String errMsg = String.format("Expecting %d replicas in block pipeline,"
+ " unexpectedly found %d replicas. nodesWithReplica=[%s]", expectedReplicas,
replicasInBlock.size(), String.join(", ", replicasInBlock));
LOG.error(errMsg);
fail(errMsg);
} else {
String infoMsg = String.format(
"Successfully appended block pipeline with %d replicas."
+ " nodesWithReplica=[%s]",
replicasInBlock.size(), String.join(", ", replicasInBlock));
LOG.info(infoMsg);
}
}
}
}