HDFS-9070. Allow fsck display pending replica location information for being-written blocks. Contributed by Gao Rui.
Jing9 committed Oct 21, 2015
1 parent 59ce780 commit d806a5b
Showing 3 changed files with 172 additions and 57 deletions.
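The diff below makes fsck report the expected replica locations of a file's last, still-being-written block, via a new getReplicaInfo helper in NamenodeFsck that covers both complete and under-construction blocks. As a rough sketch of how the feature is driven, using the flags the updated tests pass to fsck (the path here is a placeholder, not from the commit):

    hdfs fsck /myDir/openFile -files -blocks -locations -openforwrite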
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1546,6 +1546,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy.
     (Brahma Reddy Battula via mingma)
 
+    HDFS-9070. Allow fsck display pending replica location information for
+    being-written blocks. (GAO Rui via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -122,6 +122,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   public static final String FAILURE_STATUS = "FAILED";
 
   private final NameNode namenode;
+  private final BlockManager blockManager;
   private final NetworkTopology networktopology;
   private final int totalDatanodes;
   private final InetAddress remoteAddress;
@@ -196,6 +197,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       int totalDatanodes, InetAddress remoteAddress) {
     this.conf = conf;
     this.namenode = namenode;
+    this.blockManager = namenode.getNamesystem().getBlockManager();
     this.networktopology = networktopology;
     this.out = out;
     this.totalDatanodes = totalDatanodes;
@@ -249,24 +251,23 @@ public void blockIdCK(String blockId) {
       return;
     }
 
-    BlockManager bm = namenode.getNamesystem().getBlockManager();
     try {
       //get blockInfo
       Block block = new Block(Block.getBlockId(blockId));
       //find which file this block belongs to
-      BlockInfo blockInfo = bm.getStoredBlock(block);
+      BlockInfo blockInfo = blockManager.getStoredBlock(block);
       if(blockInfo == null) {
         out.println("Block "+ blockId +" " + NONEXISTENT_STATUS);
         LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS);
         return;
       }
-      BlockCollection bc = bm.getBlockCollection(blockInfo);
+      BlockCollection bc = blockManager.getBlockCollection(blockInfo);
       INode iNode = (INode) bc;
-      NumberReplicas numberReplicas= bm.countNodes(blockInfo);
+      NumberReplicas numberReplicas= blockManager.countNodes(blockInfo);
       out.println("Block Id: " + blockId);
       out.println("Block belongs to: "+iNode.getFullPathName());
       out.println("No. of Expected Replica: " +
-          bm.getExpectedReplicaNum(blockInfo));
+          blockManager.getExpectedReplicaNum(blockInfo));
       out.println("No. of live Replica: " + numberReplicas.liveReplicas());
       out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
       out.println("No. of stale Replica: " +
@@ -279,8 +280,8 @@ public void blockIdCK(String blockId) {
           numberReplicas.corruptReplicas());
       //record datanodes that have corrupted block replica
       Collection<DatanodeDescriptor> corruptionRecord = null;
-      if (bm.getCorruptReplicas(block) != null) {
-        corruptionRecord = bm.getCorruptReplicas(block);
+      if (blockManager.getCorruptReplicas(block) != null) {
+        corruptionRecord = blockManager.getCorruptReplicas(block);
       }
 
       //report block replicas status on datanodes
@@ -289,8 +290,8 @@ public void blockIdCK(String blockId) {
         out.print("Block replica on datanode/rack: " + dn.getHostName() +
             dn.getNetworkLocation() + " ");
         if (corruptionRecord != null && corruptionRecord.contains(dn)) {
-          out.print(CORRUPT_STATUS+"\t ReasonCode: "+
-            bm.getCorruptReason(block,dn));
+          out.print(CORRUPT_STATUS + "\t ReasonCode: " +
+              blockManager.getCorruptReason(block, dn));
         } else if (dn.isDecommissioned() ){
           out.print(DECOMMISSIONED_STATUS);
         } else if (dn.isDecommissionInProgress()) {
@@ -546,8 +547,63 @@ private void collectFileSummary(String path, HdfsFileStatus file, Result res,
     }
   }
 
-  private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
-      LocatedBlocks blocks) throws IOException {
+  /**
+   * Display info of each replica for replication block.
+   * For striped block group, display info of each internal block.
+   */
+  private String getReplicaInfo(BlockInfo storedBlock) {
+    if (!(showLocations || showRacks || showReplicaDetails)) {
+      return "";
+    }
+    final boolean isComplete = storedBlock.isComplete();
+    DatanodeStorageInfo[] storages = isComplete ?
+        blockManager.getStorages(storedBlock) :
+        storedBlock.getUnderConstructionFeature().getExpectedStorageLocations();
+    StringBuilder sb = new StringBuilder(" [");
+
+    for (int i = 0; i < storages.length; i++) {
+      DatanodeStorageInfo storage = storages[i];
+      DatanodeDescriptor dnDesc = storage.getDatanodeDescriptor();
+      if (showRacks) {
+        sb.append(NodeBase.getPath(dnDesc));
+      } else {
+        sb.append(new DatanodeInfoWithStorage(dnDesc, storage.getStorageID(),
+            storage.getStorageType()));
+      }
+      if (showReplicaDetails) {
+        LightWeightHashSet<BlockInfo> blocksExcess =
+            blockManager.excessReplicateMap.get(dnDesc.getDatanodeUuid());
+        Collection<DatanodeDescriptor> corruptReplicas =
+            blockManager.getCorruptReplicas(storedBlock);
+        sb.append("(");
+        if (dnDesc.isDecommissioned()) {
+          sb.append("DECOMMISSIONED)");
+        } else if (dnDesc.isDecommissionInProgress()) {
+          sb.append("DECOMMISSIONING)");
+        } else if (corruptReplicas != null
+            && corruptReplicas.contains(dnDesc)) {
+          sb.append("CORRUPT)");
+        } else if (blocksExcess != null
+            && blocksExcess.contains(storedBlock)) {
+          sb.append("EXCESS)");
+        } else if (dnDesc.isStale(this.staleInterval)) {
+          sb.append("STALE_NODE)");
+        } else if (storage.areBlockContentsStale()) {
+          sb.append("STALE_BLOCK_CONTENT)");
+        } else {
+          sb.append("LIVE)");
+        }
+      }
+      if (i < storages.length - 1) {
+        sb.append(", ");
+      }
+    }
+    sb.append(']');
+    return sb.toString();
+  }
+
+  private void collectBlocksSummary(String parent, HdfsFileStatus file,
+      Result res, LocatedBlocks blocks) throws IOException {
     String path = file.getFullName(parent);
     boolean isOpen = blocks.isUnderConstruction();
     if (isOpen && !showOpenFiles) {
@@ -570,13 +626,12 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
         // it is under construction
         continue;
       }
-      BlockManager bm = namenode.getNamesystem().getBlockManager();
 
-      final BlockInfo storedBlock = bm.getStoredBlock(
+      final BlockInfo storedBlock = blockManager.getStoredBlock(
           block.getLocalBlock());
-      final int minReplication = bm.getMinStorageNum(storedBlock);
+      final int minReplication = blockManager.getMinStorageNum(storedBlock);
       // count decommissionedReplicas / decommissioningReplicas
-      NumberReplicas numberReplicas = bm.countNodes(storedBlock);
+      NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
       int decommissionedReplicas = numberReplicas.decommissioned();
       int decommissioningReplicas = numberReplicas.decommissioning();
       res.decommissionedReplicas += decommissionedReplicas;
@@ -663,7 +718,8 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
 
       // report
       String blkName = block.toString();
-      report.append(blockNumber + ". " + blkName + " len=" + block.getNumBytes());
+      report.append(blockNumber + ". " + blkName + " len=" +
+          block.getNumBytes());
       if (totalReplicasPerBlock == 0 && !isCorrupt) {
         // If the block is corrupted, it means all its available replicas are
         // corrupted. We don't mark it as missing given these available replicas
@@ -675,52 +731,34 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
         missize += block.getNumBytes();
       } else {
         report.append(" Live_repl=" + liveReplicas);
-        if (showLocations || showRacks || showReplicaDetails) {
-          StringBuilder sb = new StringBuilder("[");
-          DatanodeStorageInfo[] storages = bm.getStorages(storedBlock);
-          for (int i = 0; i < storages.length; i++) {
-            DatanodeStorageInfo storage = storages[i];
-            DatanodeDescriptor dnDesc = storage.getDatanodeDescriptor();
-            if (showRacks) {
-              sb.append(NodeBase.getPath(dnDesc));
-            } else {
-              sb.append(new DatanodeInfoWithStorage(dnDesc, storage.getStorageID(), storage
-                  .getStorageType()));
-            }
-            if (showReplicaDetails) {
-              LightWeightHashSet<BlockInfo> blocksExcess =
-                  bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
-              Collection<DatanodeDescriptor> corruptReplicas =
-                  bm.getCorruptReplicas(block.getLocalBlock());
-              sb.append("(");
-              if (dnDesc.isDecommissioned()) {
-                sb.append("DECOMMISSIONED)");
-              } else if (dnDesc.isDecommissionInProgress()) {
-                sb.append("DECOMMISSIONING)");
-              } else if (corruptReplicas != null && corruptReplicas.contains(dnDesc)) {
-                sb.append("CORRUPT)");
-              } else if (blocksExcess != null && blocksExcess.contains(block.getLocalBlock())) {
-                sb.append("EXCESS)");
-              } else if (dnDesc.isStale(this.staleInterval)) {
-                sb.append("STALE_NODE)");
-              } else if (storage.areBlockContentsStale()) {
-                sb.append("STALE_BLOCK_CONTENT)");
-              } else {
-                sb.append("LIVE)");
-              }
-            }
-            if (i < storages.length - 1) {
-              sb.append(", ");
-            }
-          }
-          sb.append(']');
-          report.append(" " + sb.toString());
+        String info = getReplicaInfo(storedBlock);
+        if (!info.isEmpty()){
+          report.append(" ").append(info);
         }
       }
       report.append('\n');
       blockNumber++;
     }
 
+    //display under construction block info.
+    if (!blocks.isLastBlockComplete() && lastBlock != null) {
+      ExtendedBlock block = lastBlock.getBlock();
+      String blkName = block.toString();
+      BlockInfo storedBlock = blockManager.getStoredBlock(
+          block.getLocalBlock());
+      DatanodeStorageInfo[] storages = storedBlock
+          .getUnderConstructionFeature().getExpectedStorageLocations();
+      report.append('\n');
+      report.append("Under Construction Block:\n");
+      report.append(blockNumber).append(". ").append(blkName);
+      report.append(" len=").append(block.getNumBytes());
+      report.append(" Expected_repl=" + storages.length);
+      String info=getReplicaInfo(storedBlock);
+      if (!info.isEmpty()){
+        report.append(" ").append(info);
+      }
+    }
+
     // count corrupt file & move or delete if necessary
     if ((missing > 0) || (corrupt > 0)) {
       if (!showFiles) {
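Judging from the appends in collectBlocksSummary above, checking an open file with the flags shown earlier would add a trailer of roughly the following shape to the fsck report. The block name, length, replica count, and datanode entries below are illustrative values, not output from a real cluster, and the (LIVE) state suffix appears only when -replicaDetails is also given:

    Under Construction Block:
    3. BP-...:blk_1073741830_1006 len=2097152 Expected_repl=3 [DatanodeInfoWithStorage[127.0.0.1:50010,DS-...,DISK](LIVE), DatanodeInfoWithStorage[127.0.0.1:50011,DS-...,DISK](LIVE), DatanodeInfoWithStorage[127.0.0.1:50012,DS-...,DISK](LIVE)]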
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -625,9 +625,11 @@ public void testFsckOpenFiles() throws Exception {
     assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
     assertFalse(outStr.contains("OPENFORWRITE"));
     // Use -openforwrite option to list open files
-    outStr = runFsck(conf, 0, true, topDir, "-openforwrite");
+    outStr = runFsck(conf, 0, true, topDir, "-files", "-blocks",
+        "-locations", "-openforwrite");
     System.out.println(outStr);
     assertTrue(outStr.contains("OPENFORWRITE"));
+    assertTrue(outStr.contains("Under Construction Block:"));
     assertTrue(outStr.contains("openFile"));
     // Close the file
     out.close();
@@ -636,6 +638,7 @@ public void testFsckOpenFiles() throws Exception {
     System.out.println(outStr);
     assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
     assertFalse(outStr.contains("OPENFORWRITE"));
+    assertFalse(outStr.contains("Under Construction Block:"));
     util.cleanup(fs, topDir);
     if (fs != null) {try{fs.close();} catch(Exception e){}}
     cluster.shutdown();
@@ -645,6 +648,77 @@ public void testFsckOpenFiles() throws Exception {
     }
   }
 
+  @Test
+  public void testFsckOpenECFiles() throws Exception {
+    DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsckECFile").
+        setNumFiles(4).build();
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    String outStr;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
+      cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
+      String topDir = "/myDir";
+      byte[] randomBytes = new byte[3000000];
+      int seed = 42;
+      new Random(seed).nextBytes(randomBytes);
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      util.createFiles(fs, topDir);
+
+      // Open a EC file for writing and do not close for now
+      Path openFile = new Path(topDir + "/openECFile");
+      FSDataOutputStream out = fs.create(openFile);
+      int writeCount = 0;
+      while (writeCount != 300) {
+        out.write(randomBytes);
+        writeCount++;
+      }
+
+      // We expect the filesystem to be HEALTHY and show one open file
+      outStr = runFsck(conf, 0, true, openFile.toString(), "-files",
+          "-blocks", "-openforwrite");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+      assertTrue(outStr.contains("OPENFORWRITE"));
+      assertTrue(outStr.contains("Live_repl=9"));
+      assertTrue(outStr.contains("Expected_repl=9"));
+
+      // Use -openforwrite option to list open files
+      outStr = runFsck(conf, 0, true, openFile.toString(), "-files", "-blocks",
+          "-locations", "-openforwrite", "-replicaDetails");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+      assertTrue(outStr.contains("OPENFORWRITE"));
+      assertTrue(outStr.contains("Live_repl=9"));
+      assertTrue(outStr.contains("Expected_repl=9"));
+      assertTrue(outStr.contains("Under Construction Block:"));
+
+      // Close the file
+      out.close();
+
+      // Now, fsck should show HEALTHY fs and should not show any open files
+      outStr = runFsck(conf, 0, true, openFile.toString(), "-files", "-blocks",
+          "-locations", "-racks", "-replicaDetails");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+      assertFalse(outStr.contains("OPENFORWRITE"));
+      assertFalse(outStr.contains("Under Construction Block:"));
+      assertFalse(outStr.contains("Expected_repl=9"));
+      assertTrue(outStr.contains("Live_repl=9"));
+      util.cleanup(fs, topDir);
+    } finally {
+      if (fs != null) {
+        try {
+          fs.close();
+        } catch (Exception e) {
+        }
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   @Test
   public void testCorruptBlock() throws Exception {
     Configuration conf = new HdfsConfiguration();
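To exercise the new assertions locally, the updated test class can be run on its own. This assumes the stock Maven setup of the Hadoop source tree rather than anything introduced by this commit:

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestFsck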
