diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 247de6b411885..c3dbda84abd4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -338,6 +338,15 @@ public long getTotalECBlockGroups() {
   /** Block report thread for handling async reports. */
   private final BlockReportProcessingThread blockReportThread;
 
+  /**
+   * UUIDs of the DataNodes whose full block report (FBR) is currently being
+   * processed. While a DataNode is in this set, no new block report lease is
+   * handed out to it. The set is read on the heartbeat path and updated on
+   * the block report path, so it is wrapped to be thread-safe.
+   */
+  private final Set<String> fullBRDatanodeIds =
+      Collections.synchronizedSet(new HashSet<>());
+
   /**
    * Store blocks {@literal ->} datanodedescriptor(s) map of corrupt replicas.
    */
@@ -773,6 +782,7 @@ public void close() {
     datanodeManager.close();
     pendingReconstruction.stop();
     blocksMap.close();
+    fullBRDatanodeIds.clear();
   }
 
   /** @return the datanodeManager */
@@ -2603,6 +2613,14 @@ public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
       LOG.warn("Failed to find datanode {}", nodeReg);
       return 0;
     }
+
+    // Do not hand out a new lease while a full block report from the same
+    // DataNode is still being processed.
+    if (containFBRDatanode(nodeReg)) {
+      LOG.warn("The FBR data of {} already exists on the NameNode.", nodeReg);
+      return 0;
+    }
+
     // Request a new block report lease. The BlockReportLeaseManager has
     // its own internal locking.
     long leaseId = blockReportLeaseManager.requestLease(node);
@@ -5283,6 +5301,46 @@ public <T> T runBlockOp(final Callable<T> action)
     }
   }
 
+  /**
+   * Records that a full block report (FBR) from the given DataNode is being
+   * processed. Until {@link #removeFBRDatanode(String)} is called for the
+   * same id, {@link #requestBlockReportLeaseId(DatanodeRegistration)} does
+   * not hand out a new lease to that DataNode.
+   *
+   * @param datanodeId UUID of the reporting DataNode.
+   * @throws IOException if {@code datanodeId} is null or empty.
+   */
+  public void addFBRDatanode(String datanodeId) throws IOException {
+    if (datanodeId == null || datanodeId.isEmpty()) {
+      String msg = "Full BR cannot make DataNode empty.";
+      throw new IOException(msg);
+    }
+    fullBRDatanodeIds.add(datanodeId);
+  }
+
+  /**
+   * Records that processing of the full block report from the given DataNode
+   * has finished. A null or empty id is ignored.
+   *
+   * @param datanodeId UUID of the reporting DataNode.
+   */
+  public void removeFBRDatanode(String datanodeId) {
+    if (datanodeId != null && !datanodeId.isEmpty()) {
+      fullBRDatanodeIds.remove(datanodeId);
+    }
+  }
+
+  /**
+   * Tells whether a full block report from the given DataNode is currently
+   * recorded as being processed.
+   *
+   * @param nodeReg registration of the DataNode to check.
+   * @return true if the DataNode's UUID is in the in-progress set.
+   */
+  public boolean containFBRDatanode(DatanodeRegistration nodeReg) {
+    return fullBRDatanodeIds.contains(nodeReg.getDatanodeUuid());
+  }
+
   /**
    * Notification of a successful block recovery.
    * @param block for which the recovery succeeded
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 90819c28ffc3f..5ae6cc62f3609 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1618,17 +1618,26 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
     boolean noStaleStorages = false;
     try {
       if (bm.checkBlockReportLease(context, nodeReg)) {
-        for (int r = 0; r < reports.length; r++) {
-          final BlockListAsLongs blocks = reports[r].getBlocks();
-          //
-          // BlockManager.processReport accumulates information of prior calls
-          // for the same node and storage, so the value returned by the last
-          // call of this loop is the final updated value for noStaleStorage.
-          //
-          final int index = r;
-          noStaleStorages = bm.runBlockOp(() ->
-            bm.processReport(nodeReg, reports[index].getStorage(),
-                blocks, context));
+        // Flag this DataNode as having a full block report in flight so that
+        // no new lease is handed out to it until processing has finished.
+        final String dnUuid = nodeReg.getDatanodeUuid();
+        bm.addFBRDatanode(dnUuid);
+        try {
+          for (int r = 0; r < reports.length; r++) {
+            final BlockListAsLongs blocks = reports[r].getBlocks();
+            //
+            // BlockManager.processReport accumulates information of prior
+            // calls for the same node and storage, so the value returned by
+            // the last call of this loop is the final updated value for
+            // noStaleStorage.
+            //
+            final int index = r;
+            noStaleStorages = bm.runBlockOp(() ->
+                bm.processReport(nodeReg, reports[index].getStorage(),
+                    blocks, context));
+          }
+        } finally {
+          bm.removeFBRDatanode(dnUuid);
         }
       }
     } catch (UnregisteredNodeException une) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
index 40408b1924413..13090d5d3a115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
@@ -137,6 +137,52 @@ public void testCheckBlockReportLease() throws Exception {
     }
   }
 
+  /**
+   * Verifies that the NameNode refuses to hand out a full block report lease
+   * to a DataNode while one of its full block reports is being processed, and
+   * hands one out again once the processing has finished.
+   */
+  @Test
+  public void testNoBlockReportLeaseWhileFBRInProgress() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
+      cluster.waitActive();
+
+      FSNamesystem fsn = cluster.getNamesystem();
+      BlockManager blockManager = fsn.getBlockManager();
+      BlockManager spyBlockManager = spy(blockManager);
+      fsn.setBlockManagerForTesting(spyBlockManager);
+      String poolId = fsn.getBlockPoolId();
+      NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
+
+      // Test based on one DataNode report to Namenode
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
+      StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);
+
+      // No FBR in flight: the heartbeat is granted a lease.
+      HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
+          dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+      assertTrue(hbResponse.getFullBlockReportLeaseId() != 0);
+
+      // FBR in flight: the lease request is rejected.
+      spyBlockManager.addFBRDatanode(dnRegistration.getDatanodeUuid());
+      hbResponse = rpcServer.sendHeartbeat(
+          dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+      assertTrue(hbResponse.getFullBlockReportLeaseId() == 0);
+
+      // FBR finished: a lease is granted again.
+      spyBlockManager.removeFBRDatanode(dnRegistration.getDatanodeUuid());
+      hbResponse = rpcServer.sendHeartbeat(
+          dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+      assertTrue(hbResponse.getFullBlockReportLeaseId() != 0);
+    }
+  }
+
   private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
       int numBlocks) {
     int longsPerBlock = 3;