diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c977f6294b2ad..130c6934a5976 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -33,9 +33,13 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -92,6 +96,8 @@ class BPServiceActor implements Runnable {
   volatile long lastCacheReport = 0;
   private final Scheduler scheduler;
 
+  private final Object sendIBRLock;
+  private final ExecutorService ibrExecutorService;
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
 
@@ -101,6 +107,7 @@ static enum RunningState {
   }
 
   private volatile RunningState runningState = RunningState.CONNECTING;
+  private volatile boolean sendImmediateIBR = false;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
   private final DNConf dnConf;
@@ -135,6 +142,10 @@ static enum RunningState {
         dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
         dnConf.outliersReportIntervalMs);
     // get the value of maxDataLength.
+    sendIBRLock = new Object();
+    ibrExecutorService = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("ibr-executor-%d").build());
     this.maxDataLength = dnConf.getMaxDataLength();
   }
 
@@ -349,6 +360,11 @@ private long generateUniqueBlockReportId() {
    * @throws IOException
    */
   List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
+
+    synchronized (sendIBRLock) {
+      reportReceivedDeletedBlocks();
+    }
+
     final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
 
     // Flush any block information that precedes the block report. Otherwise
@@ -569,6 +585,9 @@ void stop() {
     if (bpThread != null) {
       bpThread.interrupt();
     }
+    if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) {
+      ibrExecutorService.shutdownNow();
+    }
   }
 
   //This must be called only by blockPoolManager
@@ -590,6 +609,9 @@ private synchronized void cleanUp() {
     IOUtils.cleanup(null, bpNamenode);
     IOUtils.cleanup(null, lifelineSender);
     bpos.shutdownActor(this);
+    if (!ibrExecutorService.isShutdown()) {
+      ibrExecutorService.shutdownNow();
+    }
   }
 
   private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
@@ -842,6 +864,8 @@ public void run() {
       }
 
       runningState = RunningState.RUNNING;
+      ibrExecutorService.submit(new IBRTaskHandler());
+
       if (initialRegistrationComplete != null) {
         initialRegistrationComplete.countDown();
       }
@@ -1275,4 +1299,5 @@ public long monotonicNow() {
       return Time.monotonicNow();
     }
   }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 81b0d67c38238..32789ed4d4c00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -243,13 +243,18 @@ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
 
   @Override // FsDatasetSpi
   public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
-    File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
-    FsVolumeSpi volume = null;
+    File meta = null;
+    try {
+      meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
+    } catch (IOException e) {
+      throw new FileNotFoundException("BlockId " + b.getBlockName() + " is not valid.");
+    }
     if (meta == null || !meta.exists()) {
       return null;
     }
+    FsVolumeSpi volume = null;
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final ReplicaInfo replicaInfo = getReplicaInfo(b);
       if (replicaInfo != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
index 03553fed4f3a7..e2fc94dcd6468 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
@@ -42,6 +42,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.exceptions.base.MockitoAssertionError;
 
 /**
  * Verify that incremental block reports are generated in response to
@@ -124,7 +125,7 @@ public void testReportBlockReceived() throws InterruptedException, IOException {
 
       // Sleep for a very short time, this is necessary since the IBR is
       // generated asynchronously.
-      Thread.sleep(2000);
+      Thread.sleep(1000);
 
       // Ensure that the received block was reported immediately.
       Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
@@ -166,13 +167,28 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {
 
       // Trigger a heartbeat, this also triggers an IBR.
       DataNodeTestUtils.triggerHeartbeat(singletonDn);
-      Thread.sleep(2000);
 
       // Ensure that the deleted block is reported.
       Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
          any(DatanodeRegistration.class),
          anyString(),
          any(StorageReceivedDeletedBlocks[].class));
+      int retries = 0;
+      while (true) {
+        try {
+          Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted(
+              any(DatanodeRegistration.class),
+              anyString(),
+              any(StorageReceivedDeletedBlocks[].class));
+          break;
+        } catch (MockitoAssertionError e) {
+          if (retries > 7) {
+            throw e;
+          }
+          retries++;
+          Thread.sleep(2000);
+        }
+      }
 
     } finally {
       cluster.shutdown();
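
Note (not part of the patch): the last test hunk replaces a fixed Thread.sleep() with a retry loop around
Mockito.verify(), since the IBR is now sent from a separate executor thread and may arrive at any point
within the retry window. Below is a minimal, self-contained sketch of that retry-until-verified pattern,
extracted into a reusable helper. The class name RetryVerify, the helper name retryVerify, and the
retry/interval parameters are illustrative assumptions, not code from the patch.

import org.mockito.exceptions.base.MockitoAssertionError;

public final class RetryVerify {
  private RetryVerify() {
  }

  /**
   * Re-runs a Mockito verification until it passes or the retry budget is
   * exhausted, then rethrows the last failure. Useful when the interaction
   * under test happens on another thread (e.g. an asynchronous IBR sender).
   *
   * @param verification wraps the assertion, e.g.
   *        () -> Mockito.verify(spy, atLeastOnce()).someMethod(...)
   * @param maxRetries   how many times to re-check before giving up
   * @param sleepMillis  pause between checks
   */
  public static void retryVerify(Runnable verification, int maxRetries,
      long sleepMillis) throws InterruptedException {
    int retries = 0;
    while (true) {
      try {
        // MockitoAssertionError is unchecked, so a plain Runnable suffices.
        verification.run();
        return; // Verification passed: the async call has been observed.
      } catch (MockitoAssertionError e) {
        if (retries >= maxRetries) {
          throw e; // Budget exhausted: surface the last assertion failure.
        }
        retries++;
        Thread.sleep(sleepMillis);
      }
    }
  }
}

With such a helper, the loop in testReportBlockDeleted could be written as, for example,
retryVerify(() -> Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted(
    any(DatanodeRegistration.class), anyString(),
    any(StorageReceivedDeletedBlocks[].class)), 7, 2000);
Hadoop's own GenericTestUtils.waitFor() provides a similar poll-until-true utility for test code.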