Skip to content
Browse files

HDFS-1295. Port to yahoo-merge branch. (mattf)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/yahoo-merge@1134449 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 8f3a282 commit 97f02dbcdf202979987cbb9b27bdd46d0e068094 Matthew J. Foley committed Jun 10, 2011
View
3 CHANGES.txt
@@ -378,6 +378,9 @@ Trunk (unreleased changes)
HDFS-1905. Improve namenode -format command by not making -clusterId
parameter mandatory. (Bharath Mundlapudi via suresh)
+ HDFS-1295. Improve namenode restart times by short-circuiting the
+ first block reports from datanodes. (Matt Foley via suresh)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
View
29 src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -909,27 +909,30 @@ void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
* @throws IOException
*/
DatanodeCommand blockReport() throws IOException {
- // send block report
+ // send block report if timer has expired.
DatanodeCommand cmd = null;
long startTime = now();
if (startTime - lastBlockReport > blockReportInterval) {
- //
- // Send latest block report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- long brStartTime = now();
+
+ // Create block report
+ long brCreateStartTime = now();
BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
+
+ // Send block report
+ long brSendStartTime = now();
cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
.getBlockListAsLongs());
- long brTime = now() - brStartTime;
- metrics.addBlockReport(brTime);
- LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
- " blocks got processed in " + brTime + " msecs");
- //
+
+ // Log the block report processing stats from Datanode perspective
+ long brSendCost = now() - brSendStartTime;
+ long brCreateCost = brSendStartTime - brCreateStartTime;
+ metrics.addBlockReport(brSendCost);
+ LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+ + " blocks took " + brCreateCost + " msec to generate and "
+ + brSendCost + " msecs for RPC and NN processing");
+
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
- //
if (resetBlockReportTime) {
lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
resetBlockReportTime = false;
View
399 src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
@@ -35,12 +35,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
-import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
@@ -759,8 +760,8 @@ int computeReplicationWork(int blocksToProcess) throws IOException {
}
// Go through all blocks that need replications.
- BlockIterator neededReplicationsIterator = neededReplications
- .iterator();
+ UnderReplicatedBlocks.BlockIterator neededReplicationsIterator =
+ neededReplications.iterator();
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
@@ -1057,26 +1058,57 @@ void processPendingReplications() {
}
/**
+ * StatefulBlockInfo is used to build the "toUC" list, which is a list of
+ * updates to the information about under-construction blocks.
+ * Besides the block in question, it provides the ReplicaState
+ * reported by the datanode in the block report.
+ */
+ private static class StatefulBlockInfo {
+ final BlockInfoUnderConstruction storedBlock;
+ final ReplicaState reportedState;
+
+ StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
+ ReplicaState reportedState) {
+ this.storedBlock = storedBlock;
+ this.reportedState = reportedState;
+ }
+ }
+
+ /**
* The given node is reporting all its blocks. Use this info to
- * update the (machine-->blocklist) and (block-->machinelist) tables.
+ * update the (datanode-->blocklist) and (block-->nodelist) tables.
*/
public void processReport(DatanodeDescriptor node,
BlockListAsLongs report) throws IOException {
- //
+
+ boolean isFirstBlockReport = (node.numBlocks() == 0);
+ if (isFirstBlockReport) {
+ // Initial block reports can be processed a lot more efficiently than
+ // ordinary block reports. This shortens NN restart times.
+ processFirstBlockReport(node, report);
+ return;
+ }
+
+ // Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
//
- Collection<Block> toAdd = new LinkedList<Block>();
+ Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toRemove = new LinkedList<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
- node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
+ Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+ // Process the blocks on each queue
+ for (StatefulBlockInfo b : toUC) {
+ addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+ }
for (Block b : toRemove) {
removeStoredBlock(b, node);
}
- for (Block b : toAdd) {
- addStoredBlock(b, node, null);
+ for (BlockInfo b : toAdd) {
+ addStoredBlock(b, node, null, true);
}
for (Block b : toInvalidate) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
@@ -1090,16 +1122,286 @@ public void processReport(DatanodeDescriptor node,
}
/**
+ * processFirstBlockReport is intended only for processing "initial" block
+ * reports, the first block report received from a DN after it registers.
+ * It just adds all the valid replicas to the datanode, without calculating
+ * a toRemove list (since there won't be any). It also silently discards
+ * any invalid blocks, thereby deferring their processing until
+ * the next block report.
+ * @param node - DatanodeDescriptor of the node that sent the report
+ * @param report - the initial block report, to be processed
+ * @throws IOException
+ */
+ void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report)
+ throws IOException {
+ if (report == null) return;
+ assert (namesystem.hasWriteLock());
+ assert (node.numBlocks() == 0);
+ BlockReportIterator itBR = report.getBlockReportIterator();
+
+ while(itBR.hasNext()) {
+ Block iblk = itBR.next();
+ ReplicaState reportedState = itBR.getCurrentReplicaState();
+ BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
+ // If block does not belong to any file, we are done.
+ if (storedBlock == null) continue;
+
+ // If block is corrupt, mark it and continue to next block.
+ BlockUCState ucState = storedBlock.getBlockUCState();
+ if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
+ markBlockAsCorrupt(storedBlock, node);
+ continue;
+ }
+
+ // If block is under construction, add this replica to its list
+ if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+ ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+ node, iblk, reportedState);
+ //and fall through to next clause
+ }
+ //add replica if appropriate
+ if (reportedState == ReplicaState.FINALIZED) {
+ addStoredBlockImmediate(storedBlock, node);
+ }
+ }
+ }
+
+ void reportDiff(DatanodeDescriptor dn,
+ BlockListAsLongs newReport,
+ Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
+ Collection<Block> toRemove, // remove from DatanodeDescriptor
+ Collection<Block> toInvalidate, // should be removed from DN
+ Collection<BlockInfo> toCorrupt, // add to corrupt replicas
+ Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+ // place a delimiter in the list which separates blocks
+ // that have been reported from those that have not
+ BlockInfo delimiter = new BlockInfo(new Block(), 1);
+ boolean added = dn.addBlock(delimiter);
+ assert added : "Delimiting block cannot be present in the node";
+ if(newReport == null)
+ newReport = new BlockListAsLongs();
+ // scan the report and process newly reported blocks
+ BlockReportIterator itBR = newReport.getBlockReportIterator();
+ while(itBR.hasNext()) {
+ Block iblk = itBR.next();
+ ReplicaState iState = itBR.getCurrentReplicaState();
+ BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+ toAdd, toInvalidate, toCorrupt, toUC);
+ // move block to the head of the list
+ if(storedBlock != null && storedBlock.findDatanode(dn) >= 0)
+ dn.moveBlockToHead(storedBlock);
+ }
+ // collect blocks that have not been reported
+ // all of them are next to the delimiter
+ Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+ delimiter.getNext(0), dn);
+ while(it.hasNext())
+ toRemove.add(it.next());
+ dn.removeBlock(delimiter);
+ }
+
+ /**
+ * Process a block replica reported by the data-node.
+ * No side effects except adding to the passed-in Collections.
+ *
+ * <ol>
+ * <li>If the block is not known to the system (not in blocksMap) then the
+ * data-node should be notified to invalidate this block.</li>
+ * <li>If the reported replica is valid that is has the same generation stamp
+ * and length as recorded on the name-node, then the replica location should
+ * be added to the name-node.</li>
+ * <li>If the reported replica is not valid, then it is marked as corrupt,
+ * which triggers replication of the existing valid replicas.
+ * Corrupt replicas are removed from the system when the block
+ * is fully replicated.</li>
+ * <li>If the reported replica is for a block currently marked "under
+ * construction" in the NN, then it should be added to the
+ * BlockInfoUnderConstruction's list of replicas.</li>
+ * </ol>
+ *
+ * @param dn descriptor for the datanode that made the report
+ * @param block reported block replica
+ * @param reportedState reported replica state
+ * @param toAdd add to DatanodeDescriptor
+ * @param toInvalidate missing blocks (not in the blocks map)
+ * should be removed from the data-node
+ * @param toCorrupt replicas with unexpected length or generation stamp;
+ * add to corrupt replicas
+ * @param toUC replicas of blocks currently under construction
+ * @return
+ */
+ BlockInfo processReportedBlock(DatanodeDescriptor dn,
+ Block block, ReplicaState reportedState,
+ Collection<BlockInfo> toAdd,
+ Collection<Block> toInvalidate,
+ Collection<BlockInfo> toCorrupt,
+ Collection<StatefulBlockInfo> toUC) {
+
+ if(FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug("Reported block " + block
+ + " on " + dn.getName() + " size " + block.getNumBytes()
+ + " replicaState = " + reportedState);
+ }
+
+ // find block by blockId
+ BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ if(storedBlock == null) {
+ // If blocksMap does not contain reported block id,
+ // the replica should be removed from the data-node.
+ toInvalidate.add(new Block(block));
+ return null;
+ }
+ BlockUCState ucState = storedBlock.getBlockUCState();
+
+ // Block is on the NN
+ if(FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug("In memory blockUCState = " + ucState);
+ }
+
+ // Ignore replicas already scheduled to be removed from the DN
+ if(belongsToInvalidates(dn.getStorageID(), block)) {
+ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+ + " in recentInvalidatesSet should not appear in DN " + dn;
+ return storedBlock;
+ }
+
+ if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
+ toCorrupt.add(storedBlock);
+ return storedBlock;
+ }
+
+ if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+ toUC.add(new StatefulBlockInfo(
+ (BlockInfoUnderConstruction)storedBlock, reportedState));
+ return storedBlock;
+ }
+
+ //add replica if appropriate
+ if (reportedState == ReplicaState.FINALIZED
+ && storedBlock.findDatanode(dn) < 0) {
+ toAdd.add(storedBlock);
+ }
+ return storedBlock;
+ }
+
+ /*
+ * The next two methods test the various cases under which we must conclude
+ * the replica is corrupt, or under construction. These are laid out
+ * as switch statements, on the theory that it is easier to understand
+ * the combinatorics of reportedState and ucState that way. It should be
+ * at least as efficient as boolean expressions.
+ */
+ private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState,
+ BlockInfo storedBlock, BlockUCState ucState,
+ DatanodeDescriptor dn) {
+ switch(reportedState) {
+ case FINALIZED:
+ switch(ucState) {
+ case COMPLETE:
+ case COMMITTED:
+ return (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()
+ || storedBlock.getNumBytes() != iblk.getNumBytes());
+ default:
+ return false;
+ }
+ case RBW:
+ case RWR:
+ return storedBlock.isComplete();
+ case RUR: // should not be reported
+ case TEMPORARY: // should not be reported
+ default:
+ FSNamesystem.LOG.warn("Unexpected replica state " + reportedState
+ + " for block: " + storedBlock +
+ " on " + dn.getName() + " size " + storedBlock.getNumBytes());
+ return true;
+ }
+ }
+
+ private boolean isBlockUnderConstruction(BlockInfo storedBlock,
+ BlockUCState ucState, ReplicaState reportedState) {
+ switch(reportedState) {
+ case FINALIZED:
+ switch(ucState) {
+ case UNDER_CONSTRUCTION:
+ case UNDER_RECOVERY:
+ return true;
+ default:
+ return false;
+ }
+ case RBW:
+ case RWR:
+ return (!storedBlock.isComplete());
+ case RUR: // should not be reported
+ case TEMPORARY: // should not be reported
+ default:
+ return false;
+ }
+ }
+
+ void addStoredBlockUnderConstruction(
+ BlockInfoUnderConstruction block,
+ DatanodeDescriptor node,
+ ReplicaState reportedState)
+ throws IOException {
+ block.addReplicaIfNotPresent(node, block, reportedState);
+ if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
+ addStoredBlock(block, node, null, true);
+ }
+ }
+
+ /**
+ * Faster version of {@link addStoredBlock()}, intended for use with
+ * initial block report at startup. If not in startup safe mode, will
+ * call standard addStoredBlock().
+ * Assumes this method is called "immediately" so there is no need to
+ * refresh the storedBlock from blocksMap.
+ * Doesn't handle underReplication/overReplication, or worry about
+ * pendingReplications or corruptReplicas, because it's in startup safe mode.
+ * Doesn't log every block, because there are typically millions of them.
+ * @throws IOException
+ */
+ private void addStoredBlockImmediate(BlockInfo storedBlock,
+ DatanodeDescriptor node)
+ throws IOException {
+ assert (storedBlock != null && namesystem.hasWriteLock());
+ if (!namesystem.isInStartupSafeMode()) {
+ addStoredBlock(storedBlock, node, null, false);
+ return;
+ }
+
+ // just add it
+ node.addBlock(storedBlock);
+
+ // Now check for completion of blocks and safe block count
+ int numCurrentReplica = countLiveNodes(storedBlock);
+ if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
+ && numCurrentReplica >= minReplication)
+ storedBlock = completeBlock(storedBlock.getINode(), storedBlock);
+
+ // check whether safe replication is reached for the block
+ // only complete blocks are counted towards that
+ if(storedBlock.isComplete())
+ namesystem.incrementSafeBlockCount(numCurrentReplica);
+ }
+
+ /**
* Modify (block-->datanode) map. Remove block from set of
* needed replications if this takes care of the problem.
* @return the block that is stored in blockMap.
*/
- private Block addStoredBlock(final Block block,
+ private Block addStoredBlock(final BlockInfo block,
DatanodeDescriptor node,
- DatanodeDescriptor delNodeHint)
+ DatanodeDescriptor delNodeHint,
+ boolean logEveryBlock)
throws IOException {
- assert (namesystem.hasWriteLock());
- BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ assert (block != null && namesystem.hasWriteLock());
+ BlockInfo storedBlock;
+ if (block instanceof BlockInfoUnderConstruction) {
+ //refresh our copy in case the block got completed in another thread
+ storedBlock = blocksMap.getStoredBlock(block);
+ } else {
+ storedBlock = block;
+ }
if (storedBlock == null || storedBlock.getINode() == null) {
// If this block does not belong to anyfile, then we are done.
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
@@ -1115,29 +1417,25 @@ private Block addStoredBlock(final Block block,
INodeFile fileINode = storedBlock.getINode();
assert fileINode != null : "Block must belong to a file";
- // add block to the data-node
+ // add block to the datanode
boolean added = node.addBlock(storedBlock);
- int curReplicaDelta = 0;
+ int curReplicaDelta;
if (added) {
curReplicaDelta = 1;
- //
- // At startup time, because too many new blocks come in
- // they take up lots of space in the log file.
- // So, we log only when namenode is out of safemode.
- //
- if (!namesystem.isInSafeMode()) {
+ if (logEveryBlock) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+ "blockMap updated: " + node.getName() + " is added to " +
storedBlock + " size " + storedBlock.getNumBytes());
}
} else {
+ curReplicaDelta = 0;
NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
+ "Redundant addStoredBlock request received for " + storedBlock
+ " on " + node.getName() + " size " + storedBlock.getNumBytes());
}
- // filter out containingNodes that are marked for decommission.
+ // Now check for completion of blocks and safe block count
NumberReplicas num = countNodes(storedBlock);
int numLiveReplicas = num.liveReplicas();
int numCurrentReplica = numLiveReplicas
@@ -1149,18 +1447,19 @@ private Block addStoredBlock(final Block block,
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
+ // Is no-op if not in safe mode.
if(storedBlock.isComplete())
namesystem.incrementSafeBlockCount(numCurrentReplica);
- // if file is under construction, then check whether the block
- // can be completed
+ // if file is under construction, then done for now
if (fileINode.isUnderConstruction()) {
return storedBlock;
}
- // do not handle mis-replicated blocks during startup
- if (namesystem.isInSafeMode())
+ // do not try to handle over/under-replicated blocks during safe mode
+ if (namesystem.isInSafeMode()) {
return storedBlock;
+ }
// handle underReplication/overReplication
short fileReplication = fileINode.getReplication();
@@ -1395,18 +1694,22 @@ void addBlock(DatanodeDescriptor node, Block block, String delHint)
pendingReplications.remove(block);
// blockReceived reports a finalized block
- Collection<Block> toAdd = new LinkedList<Block>();
+ Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
- node.processReportedBlock(this, block, ReplicaState.FINALIZED,
- toAdd, toInvalidate, toCorrupt);
- // the block is only in one of the lists
+ Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ processReportedBlock(node, block, ReplicaState.FINALIZED,
+ toAdd, toInvalidate, toCorrupt, toUC);
+ // the block is only in one of the to-do lists
// if it is in none then data-node already has it
- assert toAdd.size() + toInvalidate.size() <= 1 :
- "The block should be only in one of the lists.";
+ assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
+ : "The block should be only in one of the lists.";
- for (Block b : toAdd) {
- addStoredBlock(b, node, delHintNode);
+ for (StatefulBlockInfo b : toUC) {
+ addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+ }
+ for (BlockInfo b : toAdd) {
+ addStoredBlock(b, node, delHintNode, true);
}
for (Block b : toInvalidate) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
@@ -1448,6 +1751,32 @@ NumberReplicas countNodes(Block b) {
return new NumberReplicas(live, count, corrupt, excess);
}
+ /**
+ * Simpler, faster form of {@link countNodes()} that only returns the number
+ * of live nodes. If in startup safemode (or its 30-sec extension period),
+ * then it gains speed by ignoring issues of excess replicas or nodes
+ * that are decommissioned or in process of becoming decommissioned.
+ * If not in startup, then it calls {@link countNodes()} instead.
+ *
+ * @param b - the block being tested
+ * @return count of live nodes for this block
+ */
+ int countLiveNodes(BlockInfo b) {
+ if (!namesystem.isInStartupSafeMode()) {
+ return countNodes(b).liveReplicas();
+ }
+ // else proceed with fast case
+ int live = 0;
+ Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
+ Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
+ while (nodeIter.hasNext()) {
+ DatanodeDescriptor node = nodeIter.next();
+ if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
+ live++;
+ }
+ return live;
+ }
+
private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
NumberReplicas num) {
int curReplicas = num.liveReplicas();
@@ -1783,7 +2112,7 @@ int getCapacity() {
/**
* Return an iterator over the set of blocks for which there are no replicas.
*/
- BlockIterator getCorruptReplicaBlockIterator() {
+ UnderReplicatedBlocks.BlockIterator getCorruptReplicaBlockIterator() {
return neededReplications
.iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
View
140 src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
@@ -24,11 +24,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.io.WritableUtils;
@@ -280,7 +277,7 @@ void updateHeartbeat(long capacity, long dfsUsed, long remaining,
/**
* Iterates over the list of blocks belonging to the datanode.
*/
- static private class BlockIterator implements Iterator<BlockInfo> {
+ static class BlockIterator implements Iterator<BlockInfo> {
private BlockInfo current;
private DatanodeDescriptor node;
@@ -414,141 +411,6 @@ int getNumberOfBlocksToBeInvalidated() {
return blockarray;
}
- void reportDiff(BlockManager blockManager,
- BlockListAsLongs newReport,
- Collection<Block> toAdd, // add to DatanodeDescriptor
- Collection<Block> toRemove, // remove from DatanodeDescriptor
- Collection<Block> toInvalidate, // should be removed from DN
- Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
- // place a delimiter in the list which separates blocks
- // that have been reported from those that have not
- BlockInfo delimiter = new BlockInfo(new Block(), 1);
- boolean added = this.addBlock(delimiter);
- assert added : "Delimiting block cannot be present in the node";
- if(newReport == null)
- newReport = new BlockListAsLongs();
- // scan the report and process newly reported blocks
- BlockReportIterator itBR = newReport.getBlockReportIterator();
- while(itBR.hasNext()) {
- Block iblk = itBR.next();
- ReplicaState iState = itBR.getCurrentReplicaState();
- BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
- toAdd, toInvalidate, toCorrupt);
- // move block to the head of the list
- if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
- this.moveBlockToHead(storedBlock);
- }
- // collect blocks that have not been reported
- // all of them are next to the delimiter
- Iterator<? extends Block> it = new BlockIterator(delimiter.getNext(0),this);
- while(it.hasNext())
- toRemove.add(it.next());
- this.removeBlock(delimiter);
- }
-
- /**
- * Process a block replica reported by the data-node.
- *
- * <ol>
- * <li>If the block is not known to the system (not in blocksMap) then the
- * data-node should be notified to invalidate this block.</li>
- * <li>If the reported replica is valid that is has the same generation stamp
- * and length as recorded on the name-node, then the replica location is
- * added to the name-node.</li>
- * <li>If the reported replica is not valid, then it is marked as corrupt,
- * which triggers replication of the existing valid replicas.
- * Corrupt replicas are removed from the system when the block
- * is fully replicated.</li>
- * </ol>
- *
- * @param blockManager
- * @param block reported block replica
- * @param rState reported replica state
- * @param toAdd add to DatanodeDescriptor
- * @param toInvalidate missing blocks (not in the blocks map)
- * should be removed from the data-node
- * @param toCorrupt replicas with unexpected length or generation stamp;
- * add to corrupt replicas
- * @return
- */
- BlockInfo processReportedBlock(
- BlockManager blockManager,
- Block block, // reported block replica
- ReplicaState rState, // reported replica state
- Collection<Block> toAdd, // add to DatanodeDescriptor
- Collection<Block> toInvalidate, // should be removed from DN
- Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
- if(FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug("Reported block " + block
- + " on " + getName() + " size " + block.getNumBytes()
- + " replicaState = " + rState);
- }
-
- // find block by blockId
- BlockInfo storedBlock = blockManager.blocksMap.getStoredBlock(block);
- if(storedBlock == null) {
- // If blocksMap does not contain reported block id,
- // the replica should be removed from the data-node.
- toInvalidate.add(new Block(block));
- return null;
- }
-
- if(FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug("In memory blockUCState = " +
- storedBlock.getBlockUCState());
- }
-
- // Ignore replicas already scheduled to be removed from the DN
- if(blockManager.belongsToInvalidates(getStorageID(), block)) {
- assert storedBlock.findDatanode(this) < 0 : "Block " + block
- + " in recentInvalidatesSet should not appear in DN " + this;
- return storedBlock;
- }
-
- // Block is on the DN
- boolean isCorrupt = false;
- switch(rState) {
- case FINALIZED:
- switch(storedBlock.getBlockUCState()) {
- case COMPLETE:
- case COMMITTED:
- if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
- || storedBlock.getNumBytes() != block.getNumBytes())
- isCorrupt = true;
- break;
- case UNDER_CONSTRUCTION:
- case UNDER_RECOVERY:
- ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
- this, block, rState);
- }
- if(!isCorrupt && storedBlock.findDatanode(this) < 0)
- if (storedBlock.getNumBytes() != block.getNumBytes()) {
- toAdd.add(new Block(block));
- } else {
- toAdd.add(storedBlock);
- }
- break;
- case RBW:
- case RWR:
- if(!storedBlock.isComplete())
- ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
- this, block, rState);
- else
- isCorrupt = true;
- break;
- case RUR: // should not be reported
- case TEMPORARY: // should not be reported
- default:
- FSNamesystem.LOG.warn("Unexpected replica state " + rState
- + " for block: " + storedBlock +
- " on " + getName() + " size " + storedBlock.getNumBytes());
- break;
- }
- if(isCorrupt)
- toCorrupt.add(storedBlock);
- return storedBlock;
- }
-
/** Serialization for FSEditLog */
void readFieldsFromFSEditLog(DataInput in) throws IOException {
this.name = DeprecatedUTF8.readString(in);
View
26 src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3138,18 +3138,14 @@ void heartbeatCheck() {
*/
public void processReport(DatanodeID nodeID, String poolId,
BlockListAsLongs newReport) throws IOException {
-
+ long startTime, endTime;
+
writeLock();
+ startTime = now(); //after acquiring write lock
try {
- long startTime = now();
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
- + "from " + nodeID.getName()+" " +
- newReport.getNumberOfBlocks()+" blocks");
- }
DatanodeDescriptor node = getDatanode(nodeID);
if (node == null || !node.isAlive) {
- throw new IOException("ProcessReport from dead or unregisterted node: "
+ throw new IOException("ProcessReport from dead or unregistered node: "
+ nodeID.getName());
}
// To minimize startup time, we discard any second (or later) block reports
@@ -3162,10 +3158,16 @@ public void processReport(DatanodeID nodeID, String poolId,
}
blockManager.processReport(node, newReport);
- NameNode.getNameNodeMetrics().addBlockReport((int) (now() - startTime));
} finally {
+ endTime = now();
writeUnlock();
}
+
+ // Log the block report processing stats from Namenode perspective
+ NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: from "
+ + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+ + ", processing time: " + (endTime - startTime) + " msecs");
}
/**
@@ -3932,7 +3934,13 @@ synchronized void leave(boolean checkForUpgrades) {
}
}
// verify blocks replications
+ long startTimeMisReplicatedScan = now();
blockManager.processMisReplicatedBlocks();
+ NameNode.stateChangeLog.info("STATE* Replication Queue initialization "
+ + "scan for invalid, over- and under-replicated blocks "
+ + "completed in " + (now() - startTimeMisReplicatedScan)
+ + " msec");
+
long timeInSafemode = now() - systemStart;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
+ timeInSafemode/1000 + " secs.");

0 comments on commit 97f02db

Please sign in to comment.
Something went wrong with that request. Please try again.