From ed183d5b5f19bb23b8516da12a364be1c4e25125 Mon Sep 17 00:00:00 2001
From: Ahmed Hussein
Date: Mon, 12 Jul 2021 14:52:50 -0500
Subject: [PATCH] HDFS-15160. branch-2.10 ReplicaMap, Disk Balancer, Directory
 Scanner and various FsDatasetImpl methods should use datanode readlock.
 Contributed By Stephen O'Donnell and Ahmed Hussein

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../hdfs/server/datanode/BlockSender.java     |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java |   2 +-
 .../server/datanode/DirectoryScanner.java     |   2 +-
 .../datanode/fsdataset/FsDatasetSpi.java      |  10 +-
 .../fsdataset/impl/FsDatasetImpl.java         |  65 +++++----
 .../datanode/fsdataset/impl/ReplicaMap.java   |  31 +++--
 .../src/main/resources/hdfs-default.xml       |  13 ++
 .../fsdataset/impl/TestFsDatasetImpl.java     | 123 +++++++++++++++++-
 9 files changed, 210 insertions(+), 42 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 980ce9bd97840..ec02f9d46a086 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -455,6 +455,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_LOCK_FAIR_KEY =
       "dfs.datanode.lock.fair";
   public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
+      "dfs.datanode.lock.read.write.enabled";
+  public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
+      true;
   public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.datanode.lock-reporting-threshold-ms";
   public static final long
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index db19128043cbc..dba883a68151a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -252,7 +252,7 @@ class BlockSender implements java.io.Closeable {
       // the append write.
       ChunkChecksum chunkChecksum = null;
       final long replicaVisibleLength;
-      try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3788cd1847025..b3c61e594f65c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2910,7 +2910,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
     final BlockConstructionStage stage;
 
     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index d31e3772fb844..7010ee90bc8e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -605,7 +605,7 @@ private void scan() {
     Map<String, ScanInfo[]> diskReport = getDiskReport();
 
     // Hold FSDataset lock to prevent further changes to the block map
-    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
       for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
         String bpid = entry.getKey();
         ScanInfo[] blockpoolReport = entry.getValue();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index d29d7722618a0..a1e63f55bcf2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -658,12 +658,16 @@ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
   boolean isDeletingBlock(String bpid, long blockId);
 
   /**
-   * Acquire the lock of the dataset.
+   * Acquire the lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example modifying the genStamp of a block instance.
    */
   AutoCloseableLock acquireDatasetLock();
 
-  /***
-   * Acquire the read lock of the data set.
+  /**
+   * Acquire the read lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example modifying the genStamp of a block instance.
    * @return The AutoClosable read lock instance.
    */
   AutoCloseableLock acquireDatasetReadLock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 9977b96d6b810..b7dba2f8fca50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -125,7 +125,6 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -179,7 +178,7 @@ public StorageReport[] getStorageReports(String bpid)
 
   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +188,7 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       File blockfile = null;
       ReplicaInfo info = volumeMap.get(bpid, blkid);
 
@@ -210,7 +209,7 @@ public Block getStoredBlock(String bpid, long blkid)
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<? extends Replica> replicas = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
           EMPTY_SET : volumeMap.replicas(bpid));
     }
@@ -323,7 +322,20 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
         DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
         TimeUnit.MILLISECONDS));
     this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
-    this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    boolean enableRL = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
+    // The read lock can be disabled by the above config key. If it is disabled
+    // then we simply make both the read and write lock variables hold
+    // the write lock. All accesses to the lock are via these variables, so that
+    // effectively disables the read lock.
+    if (enableRL) {
+      LOG.info("The datanode lock is a read write lock");
+      this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    } else {
+      LOG.info("The datanode lock is an exclusive write lock");
+      this.datasetReadLock = this.datasetWriteLock;
+    }
     this.datasetWriteLockCondition = datasetWriteLock.newCondition();
 
     // The number of volumes required for operation is the total number
@@ -363,7 +375,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetRWLock);
+    volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -484,7 +496,8 @@ private void addVolume(Collection<StorageLocation> dataLocations,
     FsVolumeImpl fsVolume = new FsVolumeImpl(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
+    ReplicaMap tempVolumeMap =
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     activateVolume(tempVolumeMap, sd, storageType, ref);
@@ -519,7 +532,7 @@ public void addVolume(final StorageLocation location,
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
     final ReplicaMap tempVolumeMap =
-        new ReplicaMap(new ReentrantReadWriteLock());
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -825,7 +838,7 @@ File getBlockFile(String bpid, long blockId) throws IOException {
   public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -915,7 +928,7 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid)
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       final ReplicaInfo info = getReplicaInfo(b);
       final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
       FsVolumeReference ref = info.getVolume().obtainReference();
@@ -1037,7 +1050,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
     }
 
     FsVolumeReference volumeRef = null;
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
     }
     try {
@@ -1056,7 +1069,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
       newReplicaInfo.setNumBytes(blockFiles[1].length());
       // Finalize the copied files
       newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-      try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+      try(AutoCloseableLock lock = datasetReadLock.acquire()) {
         // Increment numBlocks here as this block moved without knowing to BPS
         FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
         volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@@ -1937,7 +1950,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
         new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = null;
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -2002,7 +2015,7 @@ public List<Long> getCacheReport(String bpid) {
    */
   @Override
   public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       final ArrayList<FinalizedReplica> finalized =
           new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2088,7 +2101,7 @@ File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     File f = null;
     ReplicaInfo info;
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       info = volumeMap.get(bpid, blockId);
       if (info != null) {
         f = info.getBlockFile();
@@ -2337,7 +2350,7 @@ public boolean isCached(String bpid, long blockId) {
 
   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       return getFile(block.getBlockPoolId(), blockId, false) != null;
     }
@@ -2632,7 +2645,7 @@ public ReplicaInfo getReplica(String bpid, long blockId) {
 
   @Override
   public String getReplicaString(String bpid, long blockId) {
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2879,7 +2892,7 @@ private File[] copyReplicaWithNewBlockIdAndGS(
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
       throws IOException {
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -3007,18 +3020,20 @@ public void deleteBlockPool(String bpid, boolean force)
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try(AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
         throw new ReplicaNotFoundException(block);
       }
-      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
-            "Replica generation stamp < block generation stamp, block="
-            + block + ", replica=" + replica);
-      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
-        block.setGenerationStamp(replica.getGenerationStamp());
+      synchronized (replica) {
+        if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+          throw new IOException(
+              "Replica generation stamp < block generation stamp, block="
+              + block + ", replica=" + replica);
+        } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+          block.setGenerationStamp(replica.getGenerationStamp());
+        }
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index adda9cf1e2d64..00d4c70f0ceee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -32,7 +32,6 @@
  * Maintains the replica map.
  */
 class ReplicaMap {
-  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
   private final AutoCloseableLock readLock;
   private final AutoCloseableLock writeLock;
@@ -41,18 +40,22 @@ class ReplicaMap {
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
 
-  ReplicaMap(ReadWriteLock lock) {
-    if (lock == null) {
+  ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
+    if (rLock == null || wLock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.rwLock = lock;
-    this.readLock = new AutoCloseableLock(rwLock.readLock());
-    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
+    this.readLock = rLock;
+    this.writeLock = wLock;
+  }
+
+  ReplicaMap(ReadWriteLock lock) {
+    this(new AutoCloseableLock(lock.readLock()),
+        new AutoCloseableLock(lock.writeLock()));
   }
 
   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);
     }
   }
@@ -97,7 +100,7 @@ ReplicaInfo get(String bpid, Block block) {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -217,7 +220,7 @@ ReplicaInfo remove(String bpid, long blockId) {
    */
   int size(String bpid) {
     LightWeightResizableGSet<Block, ReplicaInfo> m = null;
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -265,4 +268,14 @@ void cleanUpBlockPool(String bpid) {
   AutoCloseableLock getLock() {
     return writeLock;
   }
+
+  /**
+   * Get the lock object used for synchronizing the ReplicasMap for read only
+   * operations.
+   * @return The read lock object
+   */
+  AutoCloseableLock getReadLock() {
+    return readLock;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2cbd48675cd32..282f1b4acffeb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3046,6 +3046,19 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.lock.read.write.enabled</name>
+  <value>true</value>
+  <description>If this is true, the FsDataset lock will be a read write lock. If
+    it is false, all locks will be a write lock.
+    Enabling this should give better datanode throughput, as many read only
+    functions can run concurrently under the read lock, when they would
+    previously have required the exclusive write lock. As the feature is
+    experimental, this switch can be used to disable the shared read lock, and
+    cause all lock acquisitions to use the exclusive write lock.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.lock-reporting-threshold-ms</name>
   <value>300</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 0ee3196c22973..bb8f80f380678 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -20,6 +20,8 @@
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -55,6 +57,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -184,6 +187,122 @@ public void setUp() throws IOException {
     assertEquals(0, dataset.getNumFailedVolumes());
   }
 
+  @Test(timeout=10000)
+  public void testReadLockEnabledByDefault()
+      throws Exception {
+    final FsDatasetSpi ds = dataset;
+    final AtomicBoolean accessed = new AtomicBoolean(false);
+    final CountDownLatch latch = new CountDownLatch(1);
+    final CountDownLatch waiterLatch = new CountDownLatch(1);
+
+    Thread holder = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          latch.countDown();
+          // wait for the waiter thread to access the lock.
+          waiterLatch.await();
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    Thread waiter = new Thread() {
+      public void run() {
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          waiterLatch.countDown();
+          return;
+        }
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          accessed.getAndSet(true);
+          // signal the holder thread.
+          waiterLatch.countDown();
+        } catch (Exception e) {
+        }
+      }
+    };
+    waiter.start();
+    holder.start();
+    holder.join();
+    waiter.join();
+    // The holder thread is still holding the lock, but the waiter can still
+    // run as the lock is a shared read lock.
+    // Otherwise test will timeout with deadlock.
+    assertEquals(true, accessed.get());
+    holder.interrupt();
+  }
+
+  @Test(timeout=20000)
+  public void testReadLockCanBeDisabledByConfig()
+      throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    try {
+      final AtomicBoolean accessed = new AtomicBoolean(false);
+      final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch waiterLatch = new CountDownLatch(1);
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      final FsDatasetSpi ds = DataNodeTestUtils.getFSDataset(dn);
+
+
+      Thread holder = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            latch.countDown();
+            // wait for the waiter thread to access the lock.
+            waiterLatch.await();
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      Thread waiter = new Thread() {
+        public void run() {
+          try {
+            // Wait for holder to get ds read lock.
+            latch.await();
+          } catch (InterruptedException e) {
+            waiterLatch.countDown();
+            return;
+          }
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            accessed.getAndSet(true);
+            // signal the holder thread.
+            waiterLatch.countDown();
+          } catch (Exception e) {
+          }
+        }
+      };
+      waiter.start();
+      holder.start();
+      // Wait for some time to make sure we are in deadlock.
+      try {
+        GenericTestUtils.waitFor(
+            new Supplier<Boolean>() {
+              @Override
+              public Boolean get() {
+                return accessed.get();
+              }}, 100, 10000);
+        fail("Waiter thread should not execute.");
+      } catch (TimeoutException e) {
+      }
+      // Release waiterLatch to exit deadlock.
+      waiterLatch.countDown();
+      holder.join();
+      waiter.join();
+      // After releasing waiterLatch, the waiter
+      // thread will be able to execute.
+      assertTrue(accessed.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testAddVolumes() throws IOException {
     final int numNewVolumes = 3;
@@ -225,8 +344,8 @@ public void testAddVolumes() throws IOException {
 
   @Test
   public void testAddVolumeWithSameStorageUuid() throws IOException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
         .numDataNodes(1).build();
     try {
       cluster.waitActive();