Skip to content

Commit

Permalink
HDFS-9255. Consolidate block recovery related implementation into a single class. Contributed by Walter Su.
Browse files Browse the repository at this point in the history

Change-Id: I7a1c03f50123d79ac0a78c981d9721617e3229d1
  • Loading branch information
zhe-thoughts committed Oct 28, 2015
1 parent a04b169 commit e287e7d
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 313 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -1596,6 +1596,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9311. Support optional offload of NameNode HA service health checks to
a separate RPC server. (cnauroth)

HDFS-9255. Consolidate block recovery related implementation into a single
class. (Walter Su via zhz)

OPTIMIZATIONS

HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
Expand All @@ -49,13 +50,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
Expand Down Expand Up @@ -367,11 +363,16 @@ public static RecoveringBlockProto convert(RecoveringBlock b) {
}

/**
 * Converts a {@code RecoveringBlockProto} wire message into its
 * {@code RecoveringBlock} counterpart.
 *
 * <p>The embedded located-block proto is converted once via
 * {@code PBHelperClient.convertLocatedBlockProto}, and the resulting block and
 * location array are used to build the recovering block. If the proto carries
 * a truncate block, recovery targets that (truncated) block; otherwise the
 * recovery is identified by the new generation stamp.
 *
 * @param b the protobuf message to convert
 * @return the equivalent {@code RecoveringBlock}
 */
public static RecoveringBlock convert(RecoveringBlockProto b) {
  LocatedBlock lb = PBHelperClient.convertLocatedBlockProto(b.getBlock());
  RecoveringBlock rBlock;
  if (b.hasTruncateBlock()) {
    // Truncate recovery: recover to the shorter truncate block.
    rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
        PBHelperClient.convert(b.getTruncateBlock()));
  } else {
    // Regular recovery: recover to the new generation stamp.
    rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
        b.getNewGenStamp());
  }
  return rBlock;
}

public static ReplicaState convert(ReplicaStateProto state) {
Expand Down
Expand Up @@ -1390,15 +1390,17 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
// in block recovery.
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
}
RecoveringBlock rBlock;
if(truncateRecovery) {
Block recoveryBlock = (copyOnTruncateRecovery) ? b :
uc.getTruncateBlock();
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
recoveryBlock));
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
recoveryBlock);
} else {
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
uc.getBlockRecoveryId()));
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
uc.getBlockRecoveryId());
}
brCommand.add(rBlock);
}
return new DatanodeCommand[] { brCommand };
}
Expand Down
Expand Up @@ -700,7 +700,8 @@ assert getBlockPoolId().equals(bp) :
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
String who = "NameNode at " + actor.getNNSocketAddress();
dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
dn.getBlockRecoveryWorker().recoverBlocks(who,
((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
Expand Down

0 comments on commit e287e7d

Please sign in to comment.