DFSConfigKeys.java
@@ -455,6 +455,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_LOCK_FAIR_KEY =
"dfs.datanode.lock.fair";
public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
"dfs.datanode.lock.read.write.enabled";
public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
true;
public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.datanode.lock-reporting-threshold-ms";
public static final long
BlockSender.java
@@ -252,7 +252,7 @@ class BlockSender implements java.io.Closeable {
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}
DataNode.java
@@ -2910,7 +2910,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final BlockConstructionStage stage;

//get replica information
try(AutoCloseableLock lock = data.acquireDatasetLock()) {
try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
DirectoryScanner.java
@@ -605,7 +605,7 @@ private void scan() {
Map<String, ScanInfo[]> diskReport = getDiskReport();

// Hold FSDataset lock to prevent further changes to the block map
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
try(AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
String bpid = entry.getKey();
ScanInfo[] blockpoolReport = entry.getValue();
FsDatasetSpi.java
@@ -658,12 +658,16 @@ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
boolean isDeletingBlock(String bpid, long blockId);

/**
* Acquire the lock of the dataset.
* Acquire the lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example, modifying the genStamp of a block instance.
*/
AutoCloseableLock acquireDatasetLock();

/***
* Acquire the read lock of the data set.
/**
* Acquire the read lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example, modifying the genStamp of a block instance.
* @return The AutoCloseable read lock instance.
*/
AutoCloseableLock acquireDatasetReadLock();
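
Below is a minimal sketch (not part of this patch) of how callers are expected to use the two acquire methods, mirroring the call sites changed elsewhere in this diff; the wrapper class and method names are invented for illustration only.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.AutoCloseableLock;

public class DatasetLockUsageSketch {
  /** Read-only access: the shared read lock is enough to consult the volume map. */
  static Block lookup(FsDatasetSpi<?> data, String bpid, long blockId)
      throws IOException {
    try (AutoCloseableLock lock = data.acquireDatasetReadLock()) {
      return data.getStoredBlock(bpid, blockId);
    }
  }

  /** Structural changes to the volume map still take the exclusive write lock. */
  static void mutate(FsDatasetSpi<?> data) {
    try (AutoCloseableLock lock = data.acquireDatasetLock()) {
      // ... add, remove or move replicas here ...
    }
  }
}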
FsDatasetImpl.java
@@ -125,7 +125,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**************************************************
* FSDataset manages a set of data blocks. Each block
@@ -179,7 +178,7 @@ public StorageReport[] getStorageReports(String bpid)

@Override
public FsVolumeImpl getVolume(final ExtendedBlock b) {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +188,7 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) {
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
File blockfile = null;

ReplicaInfo info = volumeMap.get(bpid, blkid);
@@ -210,7 +209,7 @@ public Block getStoredBlock(String bpid, long blkid)
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<? extends Replica> replicas = null;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
EMPTY_SET : volumeMap.replicas(bpid));
}
@@ -323,7 +322,20 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
TimeUnit.MILLISECONDS));
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
boolean enableRL = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
// The read lock can be disabled by the above config key. If it is disabled,
// then we simply make both the read and write lock variables hold
// the write lock. All accesses to the lock are via these variables, so this
// effectively disables the read lock.
if (enableRL) {
LOG.info("The datanode lock is a read write lock");
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
} else {
LOG.info("The datanode lock is an exclusive write lock");
this.datasetReadLock = this.datasetWriteLock;
}
this.datasetWriteLockCondition = datasetWriteLock.newCondition();

// The number of volumes required for operation is the total number
@@ -363,7 +375,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
}

storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(datasetRWLock);
volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);

@SuppressWarnings("unchecked")
@@ -484,7 +496,8 @@ private void addVolume(Collection<StorageLocation> dataLocations,
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
ReplicaMap tempVolumeMap =
new ReplicaMap(datasetReadLock, datasetWriteLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);

activateVolume(tempVolumeMap, sd, storageType, ref);
@@ -519,7 +532,7 @@ public void addVolume(final StorageLocation location,
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
final ReplicaMap tempVolumeMap =
new ReplicaMap(new ReentrantReadWriteLock());
new ReplicaMap(datasetReadLock, datasetWriteLock);
ArrayList<IOException> exceptions = Lists.newArrayList();

for (final NamespaceInfo nsInfo : nsInfos) {
@@ -825,7 +838,7 @@ File getBlockFile(String bpid, long blockId) throws IOException {
public InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
ReplicaInfo info;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
}

@@ -915,7 +928,7 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid)
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
final ReplicaInfo info = getReplicaInfo(b);
final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
FsVolumeReference ref = info.getVolume().obtainReference();
@@ -1037,7 +1050,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
}

FsVolumeReference volumeRef = null;
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
}
try {
@@ -1056,7 +1069,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
newReplicaInfo.setNumBytes(blockFiles[1].length());
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@@ -1937,7 +1950,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
new HashMap<String, BlockListAsLongs.Builder>();

List<FsVolumeImpl> curVolumes = null;
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -2002,7 +2015,7 @@ public List<Long> getCacheReport(String bpid) {
*/
@Override
public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
final ArrayList<FinalizedReplica> finalized =
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2088,7 +2101,7 @@ File validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
File f = null;
ReplicaInfo info;
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
info = volumeMap.get(bpid, blockId);
if (info != null) {
f = info.getBlockFile();
@@ -2337,7 +2350,7 @@ public boolean isCached(String bpid, long blockId) {

@Override // FsDatasetSpi
public boolean contains(final ExtendedBlock block) {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
final long blockId = block.getLocalBlock().getBlockId();
return getFile(block.getBlockPoolId(), blockId, false) != null;
}
@@ -2632,7 +2645,7 @@ public ReplicaInfo getReplica(String bpid, long blockId) {

@Override
public String getReplicaString(String bpid, long blockId) {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica r = volumeMap.get(bpid, blockId);
return r == null ? "null" : r.toString();
}
@@ -2879,7 +2892,7 @@ private File[] copyReplicaWithNewBlockIdAndGS(
@Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -3007,18 +3020,20 @@ public void deleteBlockPool(String bpid, boolean force)
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
try(AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"Replica generation stamp < block generation stamp, block="
+ block + ", replica=" + replica);
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
block.setGenerationStamp(replica.getGenerationStamp());
synchronized (replica) {
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"Replica generation stamp < block generation stamp, block="
+ block + ", replica=" + replica);
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
block.setGenerationStamp(replica.getGenerationStamp());
}
}
}

ReplicaMap.java
@@ -32,7 +32,6 @@
* Maintains the replica map.
*/
class ReplicaMap {
private final ReadWriteLock rwLock;
// Lock object to synchronize this instance.
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
@@ -41,18 +40,22 @@ class ReplicaMap {
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();

ReplicaMap(ReadWriteLock lock) {
if (lock == null) {
ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
if (rLock == null || wLock == null) {
throw new HadoopIllegalArgumentException(
"Lock to synchronize on cannot be null");
}
this.rwLock = lock;
this.readLock = new AutoCloseableLock(rwLock.readLock());
this.writeLock = new AutoCloseableLock(rwLock.writeLock());
this.readLock = rLock;
this.writeLock = wLock;
}

ReplicaMap(ReadWriteLock lock) {
this(new AutoCloseableLock(lock.readLock()),
new AutoCloseableLock(lock.writeLock()));
}

String[] getBlockPoolList() {
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
return map.keySet().toArray(new String[map.keySet().size()]);
}
}
@@ -97,7 +100,7 @@ ReplicaInfo get(String bpid, Block block) {
*/
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.get(new Block(blockId)) : null;
}
@@ -217,7 +220,7 @@ ReplicaInfo remove(String bpid, long blockId) {
*/
int size(String bpid) {
LightWeightResizableGSet<Block, ReplicaInfo> m = null;
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
m = map.get(bpid);
return m != null ? m.size() : 0;
}
@@ -265,4 +268,14 @@ void cleanUpBlockPool(String bpid) {
AutoCloseableLock getLock() {
return writeLock;
}

/**
* Get the lock object used for synchronizing the ReplicaMap for read-only
* operations.
* @return The read lock object.
*/
AutoCloseableLock getReadLock() {
return readLock;
}

}
hdfs-default.xml
@@ -3046,6 +3046,19 @@
</description>
</property>

<property>
<name>dfs.datanode.lock.read.write.enabled</name>
<value>true</value>
<description>If this is true, the FsDataset lock will be a read-write lock. If
it is false, all locks will be exclusive write locks.
Enabling this should give better datanode throughput, as many read-only
functions can run concurrently under the read lock, when they would
previously have required the exclusive write lock. As the feature is
experimental, this switch can be used to disable the shared read lock and
make all lock acquisitions use the exclusive write lock.
</description>
</property>
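
As a usage note (not part of the patch), an operator or test could switch back to the old exclusive-lock behaviour either by setting the property above to false in hdfs-site.xml, or programmatically as in this hypothetical snippet; the class name is invented for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DisableReadLockExample {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Revert to the previous behaviour: every dataset lock acquisition uses
    // the exclusive write lock, effectively disabling the shared read lock.
    conf.setBoolean(
        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
    // A DataNode started with this configuration logs
    // "The datanode lock is an exclusive write lock" from FsDatasetImpl.
  }
}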

<property>
<name>dfs.datanode.lock-reporting-threshold-ms</name>
<value>300</value>