From 6df043a6bb17c5ca7a127f414dadb31e9106a078 Mon Sep 17 00:00:00 2001
From: Zinan Zhuang
Date: Wed, 10 Jan 2024 16:01:35 -0800
Subject: [PATCH] backporting HDFS-16064. Determine when to invalidate corrupt replica based on number of usable replicas

---
 .../server/blockmanagement/BlockManager.java |  17 +-
 .../apache/hadoop/hdfs/TestDecommission.java | 508 ++++++++++++++++++
 2 files changed, 520 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1f87972f8c9d7..658aaf44db9ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1463,8 +1463,12 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
         b.getReason(), b.getReasonCode());
 
     NumberReplicas numberOfReplicas = countNodes(b.getStored());
-    boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
-        expectedReplicas;
+    final int numUsableReplicas = numberOfReplicas.liveReplicas() +
+        numberOfReplicas.decommissioning() +
+        numberOfReplicas.liveEnteringMaintenanceReplicas();
+    boolean hasEnoughLiveReplicas = numUsableReplicas >=
+        expectedReplicas;
+
     boolean minReplicationSatisfied = numberOfReplicas.liveReplicas() >=
         minReplication;
     boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
@@ -1472,12 +1476,15 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
         expectedReplicas;
     boolean corruptedDuringWrite = minReplicationSatisfied &&
         b.isCorruptedDuringWrite();
-    // case 1: have enough number of live replicas
-    // case 2: corrupted replicas + live replicas > Replication factor
+    // case 1: have enough number of usable replicas
+    // case 2: corrupted replicas + usable replicas > Replication factor
     // case 3: Block is marked corrupt due to failure while writing. In this
     //         case genstamp will be different than that of valid block.
     // In all these cases we can delete the replica.
-    // In case of 3, rbw block will be deleted and valid block can be replicated
+    // In case 3, rbw block will be deleted and valid block can be replicated.
+    // Note that the NN only becomes aware of corrupt blocks when the block report
+    // is sent; this means that by default it can take up to 6 hours for a corrupt
+    // block to be invalidated, after which the valid block can be replicated.
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas ||
         corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index a3891b33a9d22..be5834c272ac8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -39,6 +39,8 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -1413,4 +1415,510 @@ public Boolean get() {
     cleanupFile(fileSys, file);
   }
+
+  /**
+   * Test DatanodeAdminManager logic to re-queue unhealthy decommissioning nodes
+   * which are blocking the decommissioning of healthy nodes.
+   * Force the tracked nodes set to be filled with nodes lost while decommissioning,
+   * then decommission healthy nodes & validate they are decommissioned eventually.
+   */
+  @Test(timeout = 120000)
+  public void testRequeueUnhealthyDecommissioningNodes() throws Exception {
+    // Create a MiniDFSCluster with 3 live datanodes in AdminState=NORMAL and
+    // 2 dead datanodes in AdminState=DECOMMISSION_INPROGRESS and a file
+    // with replication factor of 5.
+    final int numLiveNodes = 3;
+    final int numDeadNodes = 2;
+    final int numNodes = numLiveNodes + numDeadNodes;
+    final List<DatanodeDescriptor> liveNodes = new ArrayList<>();
+    final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps =
+        new HashMap<>();
+    final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+    final Path filePath = new Path("/tmp/test");
+    createClusterWithDeadNodesDecommissionInProgress(numLiveNodes, liveNodes, numDeadNodes,
+        deadNodeProps, decommissionedNodes, filePath);
+    final FSNamesystem namesystem = getCluster().getNamesystem();
+    final BlockManager blockManager = namesystem.getBlockManager();
+    final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+    final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
+
+    // Validate the 2 "dead" nodes are not removed from the tracked nodes set
+    // after several seconds of operation
+    final Duration checkDuration = Duration.ofSeconds(5);
+    Instant checkUntil = Instant.now().plus(checkDuration);
+    while (Instant.now().isBefore(checkUntil)) {
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      assertEquals(
+          "Unexpected number of decommissioning nodes queued in DatanodeAdminManager.",
+          0, decomManager.getNumPendingNodes());
+      assertEquals(
+          "Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.",
+          numDeadNodes, decomManager.getNumTrackedNodes());
+      assertTrue(
+          "Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.",
+          deadNodeProps.keySet().stream()
+              .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
+      Thread.sleep(500);
+    }
+
+    // Delete the file such that it's no longer a factor blocking decommissioning of live nodes
+    // which have block replicas for that file
+    getCluster().getFileSystem().delete(filePath, true);
+
+    // Start decommissioning 2 "live" datanodes
+    int numLiveDecommNodes = 2;
+    final List<DatanodeDescriptor> liveDecommNodes = liveNodes.subList(0, numLiveDecommNodes);
+    for (final DatanodeDescriptor liveNode : liveDecommNodes) {
+      takeNodeOutofService(0, liveNode.getDatanodeUuid(), 0, decommissionedNodes,
+          AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(liveNode);
+    }
+
+    // Write a new file such that there are under-replicated blocks preventing decommissioning
+    // of dead nodes
+    writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
+
+    // Validate that the live datanodes are put into the pending decommissioning queue
+    GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == numDeadNodes
+        && decomManager.getNumPendingNodes() == numLiveDecommNodes
+        && liveDecommNodes.stream().allMatch(
+            node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)),
+        500, 30000);
+    assertThat(liveDecommNodes)
+        .as("Check all live decommissioning nodes queued in DatanodeAdminManager")
+        .containsAll(decomManager.getPendingNodes());
+
+    // Run DatanodeAdminManager.Monitor, then validate the dead nodes are re-queued & the
+    // live nodes are decommissioned
+    if (this instanceof TestDecommissionWithBackoffMonitor) {
+      // For TestDecommissionWithBackoffMonitor a single tick/execution of the
+      // DatanodeAdminBackoffMonitor will re-queue the dead nodes, then call
+      // "processPendingNodes" to de-queue the live nodes & decommission them
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      assertEquals(
+          "DatanodeAdminBackoffMonitor did not re-queue dead decommissioning nodes as expected.",
+          2, decomManager.getNumPendingNodes());
+      assertEquals(
+          "DatanodeAdminBackoffMonitor did not re-queue dead decommissioning nodes as expected.",
+          0, decomManager.getNumTrackedNodes());
+    } else {
+      // For TestDecommission a single tick/execution of the DatanodeAdminDefaultMonitor
+      // will re-queue the dead nodes. A second tick is needed to de-queue the live nodes
+      // & decommission them
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      assertEquals(
+          "DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.",
+          4, decomManager.getNumPendingNodes());
+      assertEquals(
+          "DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.",
+          0, decomManager.getNumTrackedNodes());
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      assertEquals(
+          "DatanodeAdminDefaultMonitor did not decommission live nodes as expected.",
+          2, decomManager.getNumPendingNodes());
+      assertEquals(
+          "DatanodeAdminDefaultMonitor did not decommission live nodes as expected.",
+          0, decomManager.getNumTrackedNodes());
+    }
+    assertTrue("Live nodes not DECOMMISSIONED as expected.", liveDecommNodes.stream()
+        .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED)));
+    assertTrue("Dead nodes not DECOMMISSION_INPROGRESS as expected.",
+        deadNodeProps.keySet().stream()
+            .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
+    assertThat(deadNodeProps.keySet())
+        .as("Check all dead decommissioning nodes queued in DatanodeAdminManager")
+        .containsAll(decomManager.getPendingNodes());
+
+    // Validate the 2 "dead" nodes are not removed from the tracked nodes set
+    // after several seconds of operation
+    checkUntil = Instant.now().plus(checkDuration);
+    while (Instant.now().isBefore(checkUntil)) {
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      assertEquals(
+          "Unexpected number of decommissioning nodes queued in DatanodeAdminManager.",
+          0, decomManager.getNumPendingNodes());
+      assertEquals(
+          "Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.",
+          numDeadNodes, decomManager.getNumTrackedNodes());
+      assertTrue(
+          "Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.",
+          deadNodeProps.keySet().stream()
+              .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
+      Thread.sleep(500);
+    }
+
+    // Delete the file such that there are no more under-replicated blocks
+    // allowing the dead nodes to be decommissioned
+    getCluster().getFileSystem().delete(filePath, true);
+
+    // Validate the dead nodes are eventually decommissioned
+    GenericTestUtils.waitFor(() -> {
+      try {
+        BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      } catch (ExecutionException | InterruptedException e) {
+        LOG.warn("Exception running DatanodeAdminMonitor", e);
+        return false;
+      }
+      return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0
+          && deadNodeProps.keySet().stream().allMatch(
+              node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED));
+    }, 500, 30000);
+  }
+
+  /**
+   * Create a MiniDFSCluster with "numLiveNodes" live datanodes in AdminState=NORMAL and
+   * "numDeadNodes" dead datanodes in AdminState=DECOMMISSION_INPROGRESS. Create a file
+   * replicated to all datanodes.
+   *
+   * @param numLiveNodes - number of live nodes in cluster
+   * @param liveNodes - list which will be loaded with references to the live datanodes
+   * @param numDeadNodes - number of dead nodes in cluster
+   * @param deadNodeProps - map which will be loaded with references to the dead datanodes
+   * @param decommissionedNodes - list which will be loaded with references to decommissioning nodes
+   * @param filePath - path used to create HDFS file
+   */
+  private void createClusterWithDeadNodesDecommissionInProgress(final int numLiveNodes,
+      final List<DatanodeDescriptor> liveNodes, final int numDeadNodes,
+      final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps,
+      final ArrayList<DatanodeInfo> decommissionedNodes, final Path filePath) throws Exception {
+    assertTrue("Must have numLiveNode > 0", numLiveNodes > 0);
+    assertTrue("Must have numDeadNode > 0", numDeadNodes > 0);
+    int numNodes = numLiveNodes + numDeadNodes;
+
+    // Allow "numDeadNodes" datanodes to be decommissioned at a time
+    getConf()
+        .setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, numDeadNodes);
+    // Disable the normal monitor runs
+    getConf()
+        .setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, Integer.MAX_VALUE);
+
+    // Start cluster with "numNodes" datanodes
+    startCluster(1, numNodes);
+    final FSNamesystem namesystem = getCluster().getNamesystem();
+    final BlockManager blockManager = namesystem.getBlockManager();
+    final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+    final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
+    assertEquals(numNodes, getCluster().getDataNodes().size());
+    getCluster().waitActive();
+
+    // "numLiveNodes" datanodes will remain "live"
+    for (final DataNode node : getCluster().getDataNodes().subList(0, numLiveNodes)) {
+      liveNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
+    }
+    assertEquals(numLiveNodes, liveNodes.size());
+
+    // "numDeadNodes" datanodes will be "dead" while decommissioning
+    final List<DatanodeDescriptor> deadNodes =
+        getCluster().getDataNodes().subList(numLiveNodes, numNodes).stream()
+            .map(dn -> getDatanodeDesriptor(namesystem, dn.getDatanodeUuid()))
+            .collect(Collectors.toList());
+    assertEquals(numDeadNodes, deadNodes.size());
+
+    // Create file with block replicas on all nodes
+    writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
+
+    // Cause the "dead" nodes to be lost while in state decommissioning
+    // and fill the tracked nodes set with those "dead" nodes
+    for (final DatanodeDescriptor deadNode : deadNodes) {
+      // Start decommissioning the node; it will not be able to complete due to the
+      // under-replicated file
+      takeNodeOutofService(0, deadNode.getDatanodeUuid(), 0, decommissionedNodes,
+          AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(deadNode);
+
+      // Stop the datanode so that it is lost while decommissioning
+      MiniDFSCluster.DataNodeProperties dn = getCluster().stopDataNode(deadNode.getXferAddr());
+      deadNodeProps.put(deadNode, dn);
+      deadNode.setLastUpdate(213); // Set last heartbeat to be in the past
+    }
+    assertEquals(numDeadNodes, deadNodeProps.size());
+
+    // Wait for the decommissioning nodes to become dead & to be added to "pendingNodes"
+    GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
+        && decomManager.getNumPendingNodes() == numDeadNodes
+        && deadNodes.stream().allMatch(node ->
+            !BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node)
+            && !node.isAlive()), 500, 20000);
+  }
+
+  /*
+  This test reproduces a scenario where an under-replicated block on a decommissioning node
+  cannot be replicated to some datanodes because they have a corrupt replica of the block.
+  The test ensures that the corrupt replicas are eventually invalidated so that the
+  under-replicated block can be replicated to sufficient datanodes & the decommissioning
+  node can be decommissioned.
+  */
+  @Test(timeout = 60000)
+  public void testDeleteCorruptReplicaForUnderReplicatedBlock() throws Exception {
+    // Constants
+    final Path file = new Path("/test-file");
+    final int numDatanode = 3;
+    final short replicationFactor = 2;
+    final int numStoppedNodes = 2;
+    final int numDecommNodes = 1;
+    assertEquals(numDatanode, numStoppedNodes + numDecommNodes);
+
+    // Run monitor every 5 seconds to speed up decommissioning & make the test faster
+    final int datanodeAdminMonitorFixedRateSeconds = 5;
+    getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
+        datanodeAdminMonitorFixedRateSeconds);
+    // Set block report interval to 6 hours to avoid unexpected block reports.
+    // The default block report interval is different for a MiniDFSCluster
+    getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    // Run the BlockManager RedundancyMonitor every 3 seconds such that the Namenode
+    // sends under-replicated blocks for replication frequently
+    getConf().setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
+    // Ensure that the DataStreamer client will replace the bad datanode on append failure
+    getConf().set(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, "ALWAYS");
+    // Avoid having the DataStreamer client fail the append operation if datanode replacement fails
+    getConf()
+        .setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
+
+    // References to datanodes in the cluster
+    // - 2 datanodes will be stopped to generate corrupt block replicas & then
+    //   restarted later to validate the corrupt replicas are invalidated
+    // - 1 datanode will start decommissioning to make the block under-replicated
+    final List<DatanodeDescriptor> allNodes = new ArrayList<>();
+    final List<DatanodeDescriptor> stoppedNodes = new ArrayList<>();
+    final DatanodeDescriptor decommNode;
+
+    // Create MiniDFSCluster
+    startCluster(1, numDatanode);
+    getCluster().waitActive();
+    final FSNamesystem namesystem = getCluster().getNamesystem();
+    final BlockManager blockManager = namesystem.getBlockManager();
+    final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+    final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
+    final FileSystem fs = getCluster().getFileSystem();
+
+    // Get DatanodeDescriptors
+    for (final DataNode node : getCluster().getDataNodes()) {
+      allNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
+    }
+
+    // Create block with 2 FINALIZED replicas
+    // Note that:
+    // - calling hflush leaves block in state ReplicaBeingWritten
+    // - calling close leaves the block in state FINALIZED
+    // - amount of data is kept small because flush is not synchronous
+    LOG.info("Creating Initial Block with {} FINALIZED replicas", replicationFactor);
+    FSDataOutputStream out = fs.create(file, replicationFactor);
+    for (int i = 0; i < 512; i++) {
+      out.write(i);
+    }
+    out.close();
+
+    // Validate the block exists with expected number of replicas
+    assertEquals(1, blockManager.getTotalBlocks());
+    BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    List<String> replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(replicationFactor, replicasInBlock.size());
+
+    // Identify the DatanodeDescriptors associated with the 2 nodes with replicas.
+    // Each of the nodes with a replica will be stopped later to corrupt the replica
+    DatanodeDescriptor decommNodeTmp = null;
+    for (DatanodeDescriptor node : allNodes) {
+      if (replicasInBlock.contains(node.getName())) {
+        stoppedNodes.add(node);
+      } else {
+        decommNodeTmp = node;
+      }
+    }
+    assertEquals(numStoppedNodes, stoppedNodes.size());
+    assertNotNull(decommNodeTmp);
+    decommNode = decommNodeTmp;
+    final DatanodeDescriptor firstStoppedNode = stoppedNodes.get(0);
+    final DatanodeDescriptor secondStoppedNode = stoppedNodes.get(1);
+    LOG.info("Detected 2 nodes with replicas : {} , {}", firstStoppedNode.getXferAddr(),
+        secondStoppedNode.getXferAddr());
+    LOG.info("Detected 1 node without replica : {}", decommNode.getXferAddr());
+
+    // Stop firstStoppedNode & then append to the block pipeline such that the DataStreamer client:
+    // - detects firstStoppedNode as a bad link in the block pipeline
+    // - replaces the firstStoppedNode with decommNode in the block pipeline
+    // The result is that:
+    // - secondStoppedNode & decommNode have a live block replica
+    // - firstStoppedNode has a corrupt replica (corrupt because of old GenStamp)
+    LOG.info("Stopping first node with replica {}", firstStoppedNode.getXferAddr());
+    final List<MiniDFSCluster.DataNodeProperties> stoppedNodeProps = new ArrayList<>();
+    MiniDFSCluster.DataNodeProperties stoppedNodeProp =
+        getCluster().stopDataNode(firstStoppedNode.getXferAddr());
+    stoppedNodeProps.add(stoppedNodeProp);
+    firstStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past
+    // Wait for NN to detect the datanode as dead
+    GenericTestUtils.waitFor(
+        () -> 2 == datanodeManager.getNumLiveDataNodes() && 1 == datanodeManager
+            .getNumDeadDataNodes(), 500, 30000);
+    // Append to block pipeline
+    appendBlock(fs, file, 2);
+
+    // Stop secondStoppedNode & then append to the block pipeline such that the DataStreamer client:
+    // - detects secondStoppedNode as a bad link in the block pipeline
+    // - attempts to replace secondStoppedNode but cannot because there are no more live nodes
+    // - appends to the block pipeline containing just decommNode
+    // The result is that:
+    // - decommNode has a live block replica
+    // - firstStoppedNode & secondStoppedNode both have a corrupt replica
+    LOG.info("Stopping second node with replica {}", secondStoppedNode.getXferAddr());
+    stoppedNodeProp = getCluster().stopDataNode(secondStoppedNode.getXferAddr());
+    stoppedNodeProps.add(stoppedNodeProp);
+    secondStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past
+    // Wait for NN to detect the datanode as dead
+    GenericTestUtils.waitFor(() -> numDecommNodes == datanodeManager.getNumLiveDataNodes()
+        && numStoppedNodes == datanodeManager.getNumDeadDataNodes(), 500, 30000);
+    // Append to block pipeline
+    appendBlock(fs, file, 1);
+
+    // Validate block replica locations
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDecommNodes, replicasInBlock.size());
+    assertTrue(replicasInBlock.contains(decommNode.getName()));
+    LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 live replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+
+    LOG.info("Decommission node {} with the live replica", decommNode.getXferAddr());
+    final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+    takeNodeOutofService(0, decommNode.getDatanodeUuid(), 0, decommissionedNodes,
+        AdminStates.DECOMMISSION_INPROGRESS);
+
+    // Wait for the datanode to start decommissioning
+    try {
+      GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
+          && decomManager.getNumPendingNodes() == numDecommNodes && decommNode.getAdminState()
+              .equals(AdminStates.DECOMMISSION_INPROGRESS), 500, 30000);
+    } catch (Exception e) {
+      blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      String errMsg = String.format("Node %s failed to start decommissioning."
+          + " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]",
+          decommNode.getXferAddr(), decomManager.getNumTrackedNodes(),
+          decomManager.getNumPendingNodes(), decommNode.getAdminState(),
+          String.join(", ", replicasInBlock));
+      LOG.error(errMsg); // Do not log generic timeout exception
+      fail(errMsg);
+    }
+
+    // Validate block replica locations
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDecommNodes, replicasInBlock.size());
+    assertEquals(replicasInBlock.get(0), decommNode.getName());
+    LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 decommissioning replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+
+    // Restart the 2 stopped datanodes
+    LOG.info("Restarting stopped nodes {} , {}", firstStoppedNode.getXferAddr(),
+        secondStoppedNode.getXferAddr());
+    for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) {
+      assertTrue(getCluster().restartDataNode(stoppedNode));
+    }
+    for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) {
+      try {
+        getCluster().waitDatanodeFullyStarted(stoppedNode.getDatanode(), 30000);
+        LOG.info("Node {} Restarted", stoppedNode.getDatanode().getXferAddress());
+      } catch (Exception e) {
+        String errMsg = String.format("Node %s Failed to Restart within 30 seconds",
+            stoppedNode.getDatanode().getXferAddress());
+        LOG.error(errMsg); // Do not log generic timeout exception
+        fail(errMsg);
+      }
+    }
+
+    // Trigger block reports for the 2 restarted nodes to ensure their corrupt
+    // block replicas are identified by the namenode
+    for (MiniDFSCluster.DataNodeProperties dnProps : stoppedNodeProps) {
+      DataNodeTestUtils.triggerBlockReport(dnProps.getDatanode());
+    }
+
+    // Validate the datanode is eventually decommissioned.
+    // Some changes are needed to ensure replication/decommissioning occur in a timely manner:
+    // - if the namenode sends a DNA_TRANSFER before sending the DNA_INVALIDATEs then:
+    //   - the block will enter the pendingReconstruction queue
+    //   - this prevents the block from being sent for transfer again for some time
+    //   - solution is to call "clearQueues" so that DNA_TRANSFER is sent again after DNA_INVALIDATE
+    // - need to run the check less frequently than DatanodeAdminMonitor
+    //   such that in between "clearQueues" calls 2 things can occur:
+    //   - DatanodeAdminMonitor runs which sets the block as neededReplication
+    //   - datanode heartbeat is received which sends the DNA_TRANSFER to the node
+    final int checkEveryMillis = datanodeAdminMonitorFixedRateSeconds * 2 * 1000;
+    try {
+      GenericTestUtils.waitFor(() -> {
+        blockManager.clearQueues(); // Clear pendingReconstruction queue
+        return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0
+            && decommNode.getAdminState().equals(AdminStates.DECOMMISSIONED);
+      }, checkEveryMillis, 40000);
+    } catch (Exception e) {
+      blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      String errMsg = String.format("Node %s failed to complete decommissioning."
+          + " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]",
+          decommNode.getXferAddr(), decomManager.getNumTrackedNodes(),
+          decomManager.getNumPendingNodes(), decommNode.getAdminState(),
+          String.join(", ", replicasInBlock));
+      LOG.error(errMsg); // Do not log generic timeout exception
+      fail(errMsg);
+    }
+
+    // Validate block replica locations.
+    // Note that in order for decommissioning to complete the block must be
+    // replicated to both of the restarted datanodes; this implies that the
+    // corrupt replicas were invalidated on both of the restarted datanodes.
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDatanode, replicasInBlock.size());
+    assertTrue(replicasInBlock.contains(decommNode.getName()));
+    for (final DatanodeDescriptor node : stoppedNodes) {
+      assertTrue(replicasInBlock.contains(node.getName()));
+    }
+    LOG.info("Block now has 2 live replicas on [{} , {}] and 1 decommissioned replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+  }
+
+  void appendBlock(final FileSystem fs, final Path file, int expectedReplicas) throws IOException {
+    LOG.info("Appending to the block pipeline");
+    boolean failed = false;
+    Exception failedReason = null;
+    try {
+      FSDataOutputStream out = fs.append(file);
+      for (int i = 0; i < 512; i++) {
+        out.write(i);
+      }
+      out.close();
+    } catch (Exception e) {
+      failed = true;
+      failedReason = e;
+    } finally {
+      BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      List<String> replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      if (failed) {
+        String errMsg = String.format(
+            "Unexpected exception appending to the block pipeline."
+                + " nodesWithReplica=[%s]", String.join(", ", replicasInBlock));
+        LOG.error(errMsg, failedReason); // Do not swallow the exception
+        fail(errMsg);
+      } else if (expectedReplicas != replicasInBlock.size()) {
+        String errMsg = String.format("Expecting %d replicas in block pipeline,"
+            + " unexpectedly found %d replicas. nodesWithReplica=[%s]", expectedReplicas,
+            replicasInBlock.size(), String.join(", ", replicasInBlock));
+        LOG.error(errMsg);
+        fail(errMsg);
+      } else {
+        String infoMsg = String.format(
+            "Successfully appended block pipeline with %d replicas."
+                + " nodesWithReplica=[%s]",
+            replicasInBlock.size(), String.join(", ", replicasInBlock));
+        LOG.info(infoMsg);
+      }
+    }
+  }
 }