
Commit

HDFS-7768. Change fsck to support EC files. Contributed by Takanobu Asanuma
Tsz-Wo Nicholas Sze authored and Zhe Zhang committed May 26, 2015
1 parent 4ae32ab commit 0ed92e5
Showing 3 changed files with 238 additions and 45 deletions.
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt (2 additions, 0 deletions)
@@ -253,3 +253,5 @@

    HDFS-8441. Erasure Coding: make condition check earlier for setReplication.
    (waltersu4549)
+
+   HDFS-7768. Change fsck to support EC files. (Takanobu Asanuma via szetszwo)
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -74,6 +74,7 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
@@ -123,6 +124,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   private final int totalDatanodes;
   private final InetAddress remoteAddress;

+  private long totalDirs = 0L;
+  private long totalSymlinks = 0L;

   private String lostFound = null;
   private boolean lfInited = false;
   private boolean lfInitedOk = false;
@@ -356,13 +360,21 @@ public void fsck() {
             namenode.getNamesystem().getBlockManager().getStoragePolicies());
       }

-      Result res = new Result(conf);
+      Result replRes = new ReplicationResult(conf);
+      Result ecRes = new ErasureCodingResult(conf);

-      check(path, file, res);
+      check(path, file, replRes, ecRes);

-      out.println(res);
-      out.println(" Number of data-nodes:\t\t" + totalDatanodes);
+      out.print("\nStatus: ");
+      out.println(replRes.isHealthy() && ecRes.isHealthy() ? "HEALTHY" : "CORRUPT");
+      out.println(" Number of data-nodes:\t" + totalDatanodes);
       out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
+      out.println(" Total dirs:\t\t\t" + totalDirs);
+      out.println(" Total symlinks:\t\t" + totalSymlinks);
+      out.println("\nReplicated Blocks:");
+      out.println(replRes);
+      out.println("\nErasure Coded Block Groups:");
+      out.println(ecRes);

       if (this.showStoragePolcies) {
         out.print(storageTypeSummary.toString());
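Taken together, the new print calls above replace the single Result dump with a two-section report. A hypothetical excerpt of the reworked header (every value invented for illustration; the detail lines come from the two toString() methods later in this diff):

    Status: HEALTHY
     Number of data-nodes:	4
     Number of racks:		1
     Total dirs:			12
     Total symlinks:		0

    Replicated Blocks:
     Total size:	1048576 B
     Total files:	8
     ...

    Erasure Coded Block Groups:
     Total size:	0 B
     Total files:	0
     ...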
@@ -382,7 +394,7 @@ public void fsck() {
       // of file system and return appropriate code. Changing the output
       // string might break testcases. Also note this must be the last line
       // of the report.
-      if (res.isHealthy()) {
+      if (replRes.isHealthy() && ecRes.isHealthy()) {
         out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS);
       } else {
         out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS);
@@ -425,42 +437,49 @@ private void listCorruptFileBlocks() throws IOException {
   }

   @VisibleForTesting
-  void check(String parent, HdfsFileStatus file, Result res) throws IOException {
+  void check(String parent, HdfsFileStatus file, Result replRes, Result ecRes)
+      throws IOException {
     String path = file.getFullName(parent);
     if (file.isDir()) {
-      checkDir(path, res);
+      checkDir(path, replRes, ecRes);
       return;
     }
     if (file.isSymlink()) {
       if (showFiles) {
         out.println(path + " <symlink>");
       }
-      res.totalSymlinks++;
+      totalSymlinks++;
       return;
     }
     LocatedBlocks blocks = getBlockLocations(path, file);
     if (blocks == null) { // the file is deleted
       return;
     }
-    collectFileSummary(path, file, res, blocks);
-    collectBlocksSummary(parent, file, res, blocks);
+    final Result r = file.getReplication() == 0? ecRes: replRes;
+    collectFileSummary(path, file, r, blocks);
+    if (showprogress && (replRes.totalFiles + ecRes.totalFiles) % 100 == 0) {
+      out.println();
+      out.flush();
+    }
+    collectBlocksSummary(parent, file, r, blocks);
   }


-  private void checkDir(String path, Result res) throws IOException {
+  private void checkDir(String path, Result replRes, Result ecRes) throws IOException {
     if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
       String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
           + Path.SEPARATOR) + HdfsConstants.DOT_SNAPSHOT_DIR;
       HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
           snapshotPath);
-      check(snapshotPath, snapshotFileInfo, res);
+      check(snapshotPath, snapshotFileInfo, replRes, ecRes);
     }
     byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
     DirectoryListing thisListing;
     if (showFiles) {
       out.println(path + " <dir>");
     }
-    res.totalDirs++;
+    totalDirs++;
     do {
       assert lastReturnedName != null;
       thisListing = namenode.getRpcServer().getListing(
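The heart of the reworked check() above is one dispatch rule: striped (EC) files report a replication factor of 0 in their HdfsFileStatus, so their statistics accumulate in ecRes, while contiguous files keep feeding replRes. A minimal sketch of that rule as a standalone helper (the helper name is hypothetical, not part of the patch):

    // Hypothetical helper restating check()'s routing rule: EC (striped)
    // files carry replication == 0, contiguous files a positive factor.
    static Result chooseResult(HdfsFileStatus file, Result replRes, Result ecRes) {
      return file.getReplication() == 0 ? ecRes : replRes;
    }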
@@ -470,7 +489,7 @@ private void checkDir(String path, Result res) throws IOException {
       }
       HdfsFileStatus[] files = thisListing.getPartialListing();
       for (int i = 0; i < files.length; i++) {
-        check(path, files[i], res);
+        check(path, files[i], replRes, ecRes);
       }
       lastReturnedName = thisListing.getLastName();
     } while (thisListing.hasMore());
@@ -517,10 +536,6 @@ private void collectFileSummary(String path, HdfsFileStatus file, Result res,
     } else if (showprogress) {
       out.print('.');
     }
-    if ((showprogress) && res.totalFiles % 100 == 0) {
-      out.println();
-      out.flush();
-    }
   }

   private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
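The per-100-files progress flush deleted above is not dropped: the patch relocates it into check(), where the counter can span both result objects, as the earlier hunk shows:

    if (showprogress && (replRes.totalFiles + ecRes.totalFiles) % 100 == 0) {
      out.println();
      out.flush();
    }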
@@ -543,7 +558,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
           block.getLocalBlock());
       // count decommissionedReplicas / decommissioningReplicas
       NumberReplicas numberReplicas = bm.countNodes(storedBlock);
-      int decommissionedReplicas = numberReplicas.decommissioned();;
+      int decommissionedReplicas = numberReplicas.decommissioned();
       int decommissioningReplicas = numberReplicas.decommissioning();
       res.decommissionedReplicas += decommissionedReplicas;
       res.decommissioningReplicas += decommissioningReplicas;
@@ -555,7 +570,15 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
       res.totalReplicas += totalReplicasPerBlock;

       // count expected replicas
-      short targetFileReplication = file.getReplication();
+      short targetFileReplication;
+      if(file.getReplication() == 0) {
+        INode inode = namenode.getNamesystem().getFSDirectory().getINode(path);
+        INodesInPath iip = INodesInPath.fromINode(inode);
+        ECSchema ecSchema = namenode.getNamesystem().getFSDirectory().getECSchema(iip);
+        targetFileReplication = (short) (ecSchema.getNumDataUnits() + ecSchema.getNumParityUnits());
+      } else {
+        targetFileReplication = file.getReplication();
+      }
       res.numExpectedReplicas += targetFileReplication;

       // count under min repl'd blocks
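A worked example of the expected-replica computation above, assuming a hypothetical RS(6,3) schema (the schema in effect is whatever getECSchema(iip) resolves for the file):

    // ecSchema.getNumDataUnits()   == 6
    // ecSchema.getNumParityUnits() == 3
    // targetFileReplication == (short) (6 + 3) == 9 internal blocks per group
    // A contiguous file instead uses file.getReplication(), e.g. 3.

The same schema later seeds ErasureCodingResult's minReplication with getNumDataUnits() (6 here): a block group with fewer live internal blocks than that can no longer be reconstructed.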
@@ -981,7 +1004,7 @@ static class Result {
     long missingReplicas = 0L;
     long decommissionedReplicas = 0L;
     long decommissioningReplicas = 0L;
-    long numUnderMinReplicatedBlocks=0L;
+    long numUnderMinReplicatedBlocks = 0L;
     long numOverReplicatedBlocks = 0L;
     long numUnderReplicatedBlocks = 0L;
     long numMisReplicatedBlocks = 0L; // blocks that do not satisfy block placement policy
@@ -991,20 +1014,14 @@ static class Result {
     long totalOpenFilesBlocks = 0L;
     long totalFiles = 0L;
     long totalOpenFiles = 0L;
-    long totalDirs = 0L;
-    long totalSymlinks = 0L;
     long totalSize = 0L;
     long totalOpenFilesSize = 0L;
     long totalReplicas = 0L;

-    final short replication;
     final int minReplication;

-    Result(Configuration conf) {
-      this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
-          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-      this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
-          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+    Result(int minReplication) {
+      this.minReplication = minReplication;
     }

     /**
@@ -1032,19 +1049,28 @@ float getReplicationFactor() {
         return 0.0f;
       return (float) (totalReplicas) / (float) totalBlocks;
     }
+  }
+
+  @VisibleForTesting
+  static class ReplicationResult extends Result {
+    final short replication;
+
+    ReplicationResult(Configuration conf) {
+      super(conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT));
+      this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+    }

     @Override
     public String toString() {
       StringBuilder res = new StringBuilder();
-      res.append("Status: ").append((isHealthy() ? "HEALTHY" : "CORRUPT"))
-          .append("\n Total size:\t").append(totalSize).append(" B");
+      res.append(" Total size:\t").append(totalSize).append(" B");
       if (totalOpenFilesSize != 0) {
         res.append(" (Total open files size: ").append(totalOpenFilesSize)
             .append(" B)");
       }
-      res.append("\n Total dirs:\t").append(totalDirs).append(
-          "\n Total files:\t").append(totalFiles);
-      res.append("\n Total symlinks:\t\t").append(totalSymlinks);
+      res.append("\n Total files:\t").append(totalFiles);
       if (totalOpenFiles != 0) {
         res.append(" (Files currently being written: ").append(totalOpenFiles)
             .append(")");
@@ -1134,4 +1160,116 @@ public String toString() {
       return res.toString();
     }
   }
+
+  @VisibleForTesting
+  static class ErasureCodingResult extends Result {
+    final String ecSchema;
+
+    ErasureCodingResult(Configuration conf) {
+      this(ErasureCodingSchemaManager.getSystemDefaultSchema());
+    }
+
+    ErasureCodingResult(ECSchema ecSchema) {
+      super(ecSchema.getNumDataUnits());
+      this.ecSchema = ecSchema.getSchemaName();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder res = new StringBuilder();
+      res.append(" Total size:\t").append(totalSize).append(" B");
+      if (totalOpenFilesSize != 0) {
+        res.append(" (Total open files size: ").append(totalOpenFilesSize)
+            .append(" B)");
+      }
+      res.append("\n Total files:\t").append(totalFiles);
+      if (totalOpenFiles != 0) {
+        res.append(" (Files currently being written: ").append(totalOpenFiles)
+            .append(")");
+      }
+      res.append("\n Total block groups (validated):\t").append(totalBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (avg. block group size ").append((totalSize / totalBlocks))
+            .append(" B)");
+      }
+      if (totalOpenFilesBlocks != 0) {
+        res.append(" (Total open file block groups (not validated): ").append(
+            totalOpenFilesBlocks).append(")");
+      }
+      if (corruptFiles > 0 || numUnderMinReplicatedBlocks > 0) {
+        res.append("\n ********************************");
+        if(numUnderMinReplicatedBlocks>0){
+          res.append("\n UNRECOVERABLE BLOCK GROUPS:\t").append(numUnderMinReplicatedBlocks);
+          if(totalBlocks>0){
+            res.append(" (").append(
+                ((float) (numUnderMinReplicatedBlocks * 100) / (float) totalBlocks))
+                .append(" %)");
+          }
+          res.append("\n ").append("MIN REQUIRED EC BLOCK:\t")
+              .append(minReplication);
+        }
+        if(corruptFiles>0) {
+          res.append(
+              "\n CORRUPT FILES:\t").append(corruptFiles);
+          if (missingSize > 0) {
+            res.append("\n MISSING BLOCK GROUPS:\t").append(missingIds.size()).append(
+                "\n MISSING SIZE:\t\t").append(missingSize).append(" B");
+          }
+          if (corruptBlocks > 0) {
+            res.append("\n CORRUPT BLOCK GROUPS: \t").append(corruptBlocks).append(
+                "\n CORRUPT SIZE:\t\t").append(corruptSize).append(" B");
+          }
+        }
+        res.append("\n ********************************");
+      }
+      res.append("\n Minimally erasure-coded block groups:\t").append(
+          numMinReplicatedBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (").append(
+            ((float) (numMinReplicatedBlocks * 100) / (float) totalBlocks))
+            .append(" %)");
+      }
+      res.append("\n Over-erasure-coded block groups:\t")
+          .append(numOverReplicatedBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (").append(
+            ((float) (numOverReplicatedBlocks * 100) / (float) totalBlocks))
+            .append(" %)");
+      }
+      res.append("\n Under-erasure-coded block groups:\t").append(
+          numUnderReplicatedBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (").append(
+            ((float) (numUnderReplicatedBlocks * 100) / (float) totalBlocks))
+            .append(" %)");
+      }
+      res.append("\n Unsatisfactory placement block groups:\t\t")
+          .append(numMisReplicatedBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (").append(
+            ((float) (numMisReplicatedBlocks * 100) / (float) totalBlocks))
+            .append(" %)");
+      }
+      res.append("\n Default schema:\t").append(ecSchema)
+          .append("\n Average block group size:\t").append(
+          getReplicationFactor()).append("\n Missing block groups:\t\t").append(
+          missingIds.size()).append("\n Corrupt block groups:\t\t").append(
+          corruptBlocks).append("\n Missing ec-blocks:\t\t").append(
+          missingReplicas);
+      if (totalReplicas > 0) {
+        res.append(" (").append(
+            ((float) (missingReplicas * 100) / (float) numExpectedReplicas)).append(
+            " %)");
+      }
+      if (decommissionedReplicas > 0) {
+        res.append("\n Decommissioned ec-blocks:\t").append(
+            decommissionedReplicas);
+      }
+      if (decommissioningReplicas > 0) {
+        res.append("\n Decommissioning ec-blocks:\t").append(
+            decommissioningReplicas);
+      }
+      return res.toString();
+    }
+  }
 }
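For reference, a hypothetical rendering of the new "Erasure Coded Block Groups" section produced by the toString() above on a cluster holding no EC data (all values invented; the percentage clauses are skipped because totalBlocks == 0, and the schema name depends on the system default schema):

    Erasure Coded Block Groups:
     Total size:	0 B
     Total files:	0
     Total block groups (validated):	0
     Minimally erasure-coded block groups:	0
     Over-erasure-coded block groups:	0
     Under-erasure-coded block groups:	0
     Unsatisfactory placement block groups:		0
     Default schema:	RS-6-3
     Average block group size:	0.0
     Missing block groups:		0
     Corrupt block groups:		0
     Missing ec-blocks:		0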
