From fd2b1b2fad9fe2258b9cb29838f3b459eec0702f Mon Sep 17 00:00:00 2001 From: tomglliu Date: Mon, 29 Aug 2022 17:09:02 +0800 Subject: [PATCH 1/3] DN should notify NN when client requests a missing block --- .../hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 81b0d67c38238..f7cb474722c35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -243,8 +243,13 @@ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { @Override // FsDatasetSpi public LengthInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { - File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp()); FsVolumeSpi volume = null; + File meta = null; + try { + meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp()); + } catch (IOException e) { + throw new FileNotFoundException("BlockId " + b.getBlockName() + " is not valid."); + } if (meta == null || !meta.exists()) { return null; From 19d085a550635edc2d48bbe05bce6a4e8a4e6892 Mon Sep 17 00:00:00 2001 From: tomglliu Date: Mon, 29 Aug 2022 18:03:11 +0800 Subject: [PATCH 2/3] BPServiceActor to provide new thread to handle IBR --- .../hdfs/server/datanode/BPServiceActor.java | 25 +++++++++++++++++++ .../datanode/TestIncrementalBlockReports.java | 20 +++++++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index c977f6294b2ad..130c6934a5976 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -33,9 +33,13 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.client.BlockReportOptions; @@ -92,6 +96,8 @@ class BPServiceActor implements Runnable { volatile long lastCacheReport = 0; private final Scheduler scheduler; + private final Object sendIBRLock; + private final ExecutorService ibrExecutorService; Thread bpThread; DatanodeProtocolClientSideTranslatorPB bpNamenode; @@ -101,6 +107,7 @@ static enum RunningState { } private volatile RunningState runningState = RunningState.CONNECTING; + private volatile boolean sendImmediateIBR = false; private volatile boolean shouldServiceRun = true; private final DataNode dn; private final DNConf dnConf; @@ -135,6 +142,10 @@ static enum RunningState { dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval, dnConf.outliersReportIntervalMs); // get the value of maxDataLength. + sendIBRLock = new Object(); + ibrExecutorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ibr-executor-%d").build()); this.maxDataLength = dnConf.getMaxDataLength(); } @@ -349,6 +360,11 @@ private long generateUniqueBlockReportId() { * @throws IOException */ List blockReport(long fullBrLeaseId) throws IOException { + + synchronized (sendIBRLock) { + reportReceivedDeletedBlocks(); + } + final ArrayList cmds = new ArrayList(); // Flush any block information that precedes the block report. Otherwise @@ -569,6 +585,9 @@ void stop() { if (bpThread != null) { bpThread.interrupt(); } + if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) { + ibrExecutorService.shutdownNow(); + } } //This must be called only by blockPoolManager @@ -590,6 +609,9 @@ private synchronized void cleanUp() { IOUtils.cleanup(null, bpNamenode); IOUtils.cleanup(null, lifelineSender); bpos.shutdownActor(this); + if (!ibrExecutorService.isShutdown()) { + ibrExecutorService.shutdownNow(); + } } private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException { @@ -842,6 +864,8 @@ public void run() { } runningState = RunningState.RUNNING; + ibrExecutorService.submit(new IBRTaskHandler()); + if (initialRegistrationComplete != null) { initialRegistrationComplete.countDown(); } @@ -1275,4 +1299,5 @@ public long monotonicNow() { return Time.monotonicNow(); } } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java index 03553fed4f3a7..e2fc94dcd6468 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java @@ -42,6 +42,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.exceptions.base.MockitoAssertionError; /** * Verify that incremental block reports are generated in response to @@ -124,7 +125,7 @@ public void testReportBlockReceived() throws InterruptedException, IOException { // Sleep for a very short time, this is necessary since the IBR is // generated asynchronously. - Thread.sleep(2000); + Thread.sleep(1000); // Ensure that the received block was reported immediately. Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( @@ -166,13 +167,28 @@ public void testReportBlockDeleted() throws InterruptedException, IOException { // Trigger a heartbeat, this also triggers an IBR. DataNodeTestUtils.triggerHeartbeat(singletonDn); - Thread.sleep(2000); // Ensure that the deleted block is reported. Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( any(DatanodeRegistration.class), anyString(), any(StorageReceivedDeletedBlocks[].class)); + int retries = 0; + while (true) { + try { + Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + break; + } catch (MockitoAssertionError e) { + if (retries > 7) { + throw e; + } + retries++; + Thread.sleep(2000); + } + } } finally { cluster.shutdown(); From b4cb264f9dbb5877b6d134134765a5c4337fc115 Mon Sep 17 00:00:00 2001 From: tomglliu Date: Mon, 29 Aug 2022 19:29:35 +0800 Subject: [PATCH 3/3] DN should notify NN when client requests a missing block --- .../hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index f7cb474722c35..32789ed4d4c00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -243,7 +243,6 @@ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { @Override // FsDatasetSpi public LengthInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { - FsVolumeSpi volume = null; File meta = null; try { meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp()); @@ -255,6 +254,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) return null; } + FsVolumeSpi volume = null; try (AutoCloseableLock lock = datasetWriteLock.acquire()) { final ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo != null) {