Skip to content

Commit

Permalink
HDFS-16598. Fix DataNode FsDatasetImpl lock issue without GS checks. (#…
Browse files Browse the repository at this point in the history
…4366). Contributed by ZanderXu.

Reviewed-by: Mingxiang Li <liaiphag0@gmail.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
  • Loading branch information
ZanderXu committed Jun 14, 2022
1 parent bebf03a commit d0715b1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,14 +906,20 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid)
return info;
}

String getStorageUuidForLock(ExtendedBlock b)
throws ReplicaNotFoundException {
return getReplicaInfo(b.getBlockPoolId(), b.getBlockId())
.getStorageUuid();
}

/**
* Returns handles to the block file and its metadata file
*/
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
Expand Down Expand Up @@ -1379,7 +1385,7 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
Expand Down Expand Up @@ -1562,7 +1568,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
Expand Down Expand Up @@ -1665,7 +1671,7 @@ public ReplicaHandler recoverRbw(
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
Expand Down Expand Up @@ -1697,7 +1703,7 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
Expand Down Expand Up @@ -1759,7 +1765,7 @@ public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
Expand Down Expand Up @@ -1957,7 +1963,7 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
ReplicaInfo finalizedReplicaInfo = null;
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
Expand Down Expand Up @@ -2041,7 +2047,7 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
b.getBlockPoolId(), getStorageUuidForLock(b))) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
Expand Down Expand Up @@ -2992,7 +2998,7 @@ public Replica updateReplicaUnderRecovery(
final long newlength) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
oldBlock.getBlockPoolId(), getReplicaInfo(oldBlock).getStorageUuid())) {
oldBlock.getBlockPoolId(), getStorageUuidForLock(oldBlock))) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,10 @@ private void testAppend(String bpid, FsDatasetSpi<?> dataSet,
Assert.fail("Should not have appended to a non-existent replica " +
blocks[NON_EXISTENT]);
} catch (ReplicaNotFoundException e) {
Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA +
blocks[NON_EXISTENT], e.getMessage());
String expectMessage = ReplicaNotFoundException.NON_EXISTENT_REPLICA
+ blocks[NON_EXISTENT].getBlockPoolId() + ":"
+ blocks[NON_EXISTENT].getBlockId();
Assert.assertEquals(expectMessage, e.getMessage());
}

newGS = blocks[FINALIZED].getGenerationStamp()+1;
Expand Down

0 comments on commit d0715b1

Please sign in to comment.