HDFS-12043. Add counters for block re-replication. Contributed by Chen Liang.
arp7 committed Jun 30, 2017
1 parent a2f0cbd commit 6a9dc5f
Showing 4 changed files with 122 additions and 7 deletions.
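
The change wires three new counters through the re-replication pipeline: SuccessfulReReplications is bumped when a block report clears the last pending replica for a block, NumTimesReReplicationNotScheduled when scheduleReconstruction gives up without producing work, and TimeoutReReplications when a pending reconstruction outlives its deadline. All three land in the NameNodeActivity metrics record, so they are visible over JMX. Below is a minimal reader sketch, assuming it runs inside the NameNode JVM (or against its JMX connector) and that the record is exported, as is usual for metrics2, under Hadoop:service=NameNode,name=NameNodeActivity:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ReReplicationCounterReader {
  public static void main(String[] args) throws Exception {
    // The metrics2 system exposes each record as a platform MBean attribute set.
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName activity =
        new ObjectName("Hadoop:service=NameNode,name=NameNodeActivity");
    for (String counter : new String[] {"SuccessfulReReplications",
        "NumTimesReReplicationNotScheduled", "TimeoutReReplications"}) {
      System.out.println(counter + " = " + mbs.getAttribute(activity, counter));
    }
  }
}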
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1851,7 +1851,7 @@ boolean hasEnoughEffectiveReplicas(BlockInfo block,
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
}

private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
BlockReconstructionWork scheduleReconstruction(BlockInfo block,
int priority) {
// skip abandoned block or block reopened for append
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
@@ -1873,6 +1873,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
if(srcNodes == null || srcNodes.length == 0) {
// block can not be reconstructed from any node
LOG.debug("Block {} cannot be reconstructed from any node", block);
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
return null;
}

@@ -1885,6 +1886,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
neededReconstruction.remove(block, priority);
blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
" it has enough replicas", block);
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
return null;
}

@@ -1900,6 +1902,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
if (block.isStriped()) {
if (pendingNum > 0) {
// Wait for the previous reconstruction to finish.
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
return null;
}
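
Taken together, the three hunks above increment the same counter on every path where scheduleReconstruction returns null without scheduling work: no live source node, enough effective replicas already present, or a striped block whose earlier reconstruction is still in flight. A simplified sketch of the combined condition (names approximated; the real method interleaves these checks with other bookkeeping):

// Each "give up" path bumps one shared counter, so the metric counts
// scheduling attempts that produced no work, not the individual causes.
if (srcNodes == null || srcNodes.length == 0
    || hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)
    || (block.isStriped() && pendingNum > 0)) {
  NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
  return null;
}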

@@ -3727,8 +3730,8 @@ private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
* The given node is reporting that it received a certain block.
*/
@VisibleForTesting
void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
throws IOException {
public void addBlock(DatanodeStorageInfo storageInfo, Block block,
String delHint) throws IOException {
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Decrement number of blocks scheduled to this datanode.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
@@ -3751,7 +3754,9 @@ void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
BlockInfo storedBlock = getStoredBlock(block);
if (storedBlock != null &&
block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
pendingReconstruction.decrement(storedBlock, node);
if (pendingReconstruction.decrement(storedBlock, node)) {
NameNode.getNameNodeMetrics().incSuccessfulReReplications();
}
}
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java

@@ -30,6 +30,7 @@
import java.util.Map;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;

@@ -97,18 +98,22 @@ void increment(BlockInfo block, DatanodeDescriptor... targets) {
* for this block.
*
* @param dn The DataNode that finishes the reconstruction
* @return true if the block's pending replica count reached zero and it
*         was removed from the pending queue.
*/
void decrement(BlockInfo block, DatanodeDescriptor dn) {
boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
boolean removed = false;
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found != null) {
LOG.debug("Removing pending reconstruction for {}", block);
found.decrementReplicas(dn);
if (found.getNumReplicas() <= 0) {
pendingReconstructions.remove(block);
removed = true;
}
}
}
return removed;
}
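
The new return value lets BlockManager#addBlock count each completed re-replication exactly once: decrement() reports true only when the reporting DataNode was the last outstanding target and the block leaves the pending map. A hypothetical illustration of that contract (assuming a queue constructed with just a timeout and a block queued with two targets):

PendingReconstructionBlocks pending = new PendingReconstructionBlocks(timeout);
pending.increment(block, dn1, dn2);     // two replicas still in flight
assert !pending.decrement(block, dn1);  // one target remains pending -> false
assert pending.decrement(block, dn2);   // last target done -> removed, true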

/**
@@ -263,6 +268,7 @@ void pendingReconstructionCheck() {
timedOutItems.add(block);
}
LOG.warn("PendingReconstructionMonitor timed out " + block);
NameNode.getNameNodeMetrics().incTimeoutReReplications();
iter.remove();
}
}
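
This timeout path runs on the PendingReconstructionMonitor thread, so TimeoutReReplications moves asynchronously; the deadline is controlled by dfs.namenode.reconstruction.pending.timeout-sec. A short sketch of exercising it from a test, using the same MetricsAsserts helpers the new test below relies on (assumes a cluster where a queued reconstruction never completes):

Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1);
// ... start a MiniDFSCluster, queue a reconstruction that never finishes ...
MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
long timedOut = getLongCounter("TimeoutReReplications", rb);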
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java

@@ -58,6 +58,12 @@ public class NameNodeMetrics {
@Metric MutableCounterLong createSymlinkOps;
@Metric MutableCounterLong getLinkTargetOps;
@Metric MutableCounterLong filesInGetListingOps;
@Metric ("Number of successful re-replications")
MutableCounterLong successfulReReplications;
@Metric ("Number of times we failed to schedule a block re-replication.")
MutableCounterLong numTimesReReplicationNotScheduled;
@Metric("Number of timed out block re-replications")
MutableCounterLong timeoutReReplications;
@Metric("Number of allowSnapshot operations")
MutableCounterLong allowSnapshotOps;
@Metric("Number of disallowSnapshot operations")
@@ -300,6 +306,18 @@ public void incrTransactionsBatchedInSync(long count) {
transactionsBatchedInSync.incr(count);
}

public void incSuccessfulReReplications() {
successfulReReplications.incr();
}

public void incNumTimesReReplicationNotScheduled() {
numTimesReReplicationNotScheduled.incr();
}

public void incTimeoutReReplications() {
timeoutReReplications.incr();
}

public void addSync(long elapsed) {
syncs.add(elapsed);
for (MutableQuantiles q : syncsQuantiles) {
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java

@@ -17,13 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;

import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -44,6 +52,8 @@
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import org.mockito.Mockito;

@@ -178,7 +188,7 @@ public void testPendingReconstruction() {
public void testProcessPendingReconstructions() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
MiniDFSCluster cluster = null;
Block block;
BlockInfo blockInfo;
@@ -418,7 +428,7 @@ public void testPendingAndInvalidate() throws Exception {
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFS_REPLICATION_INTERVAL);
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
CONF.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
DFS_REPLICATION_INTERVAL);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
DATANODE_COUNT).build();
@@ -471,4 +481,80 @@ public void testPendingAndInvalidate() throws Exception {
cluster.shutdown();
}
}

/**
* Test the metric counters of the re-replication process.
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test(timeout = 300000)
public void testReplicationCounter() throws IOException,
InterruptedException, TimeoutException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
conf.setInt(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1);
MiniDFSCluster tmpCluster = new MiniDFSCluster.Builder(conf).numDataNodes(
DATANODE_COUNT).build();
tmpCluster.waitActive();
FSNamesystem fsn = tmpCluster.getNamesystem(0);
fsn.writeLock();

try {
BlockManager bm = fsn.getBlockManager();
BlocksMap blocksMap = bm.blocksMap;

// Create three blocks: blockInfo0 will re-replicate successfully,
// blockInfo1 will time out, and blockInfo2 will fail to be scheduled.
BlockCollection bc0 = Mockito.mock(BlockCollection.class);
BlockInfo blockInfo0 = new BlockInfoContiguous((short) 3);
blockInfo0.setBlockId(0);

BlockCollection bc1 = Mockito.mock(BlockCollection.class);
BlockInfo blockInfo1 = new BlockInfoContiguous((short) 3);
blockInfo1.setBlockId(1);

BlockCollection bc2 = Mockito.mock(BlockCollection.class);
BlockInfo blockInfo2 = new BlockInfoContiguous((short) 3);
blockInfo2.setBlockId(2);

blocksMap.addBlockCollection(blockInfo0, bc0);
blocksMap.addBlockCollection(blockInfo1, bc1);
blocksMap.addBlockCollection(blockInfo2, bc2);

PendingReconstructionBlocks pending = bm.pendingReconstruction;

MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
assertCounter("SuccessfulReReplications", 0L, rb);
assertCounter("NumTimesReReplicationNotScheduled", 0L, rb);
assertCounter("TimeoutReReplications", 0L, rb);

// add block0 and block1 to pending queue.
pending.increment(blockInfo0);
pending.increment(blockInfo1);

// Calling addBlock on block0 marks its re-replication successful;
// block1 gets no addBlock call, so its pending entry times out later.
DatanodeStorageInfo[] storageInfos =
DFSTestUtil.createDatanodeStorageInfos(1);
bm.addBlock(storageInfos[0], blockInfo0, null);

// Scheduling reconstruction for blockInfo2 fails because there is
// no source node to replicate from.
bm.scheduleReconstruction(blockInfo2, 0);

GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
return getLongCounter("SuccessfulReReplications", rb) == 1 &&
getLongCounter("NumTimesReReplicationNotScheduled", rb) == 1 &&
getLongCounter("TimeoutReReplications", rb) == 1;
}
}, 100, 60000);
} finally {
tmpCluster.shutdown();
fsn.writeUnlock();
}
}
}
