Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,22 @@ public long getNumTimedOutPendingReconstructions() {
return pendingReconstruction.getNumTimedOuts();
}

/**
 * Used by metrics. Forwards the query to {@code pendingReconstruction}.
 *
 * @return The number of pending replicated blocks.
 */
public long getNumReplicatedPendingBlocks() {
  final long replicatedPending =
      pendingReconstruction.getNumReplicatedPendingBlocks();
  return replicatedPending;
}

/**
 * Used by metrics. Forwards the query to {@code pendingReconstruction}.
 *
 * @return The number of pending EC blocks.
 */
public long getNumEcPendingBlocks() {
  final long ecPending = pendingReconstruction.getNumEcPendingBlocks();
  return ecPending;
}

/** Used by metrics. */
public long getLowRedundancyBlocks() {
return neededReconstruction.getLowRedundancyBlocks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class PendingReconstructionBlocks {
Daemon timerThread = null;
private volatile boolean fsRunning = true;
// Running total of timed-out blocks: getTimedOutBlocks() adds the size of each
// batch it returns, and clear() resets it to zero.
private long timedOutCount = 0L;
// Number of non-striped blocks counted as pending. Incremented when
// increment() inserts a new entry, decremented by decrement()/remove(), and
// reduced by replicatedTimedOutNum when getTimedOutBlocks() drains the
// timed-out list.
private long replicatedPendingNum = 0L;
// Striped (EC) counterpart of replicatedPendingNum.
private long ecPendingNum = 0L;
// Number of non-striped blocks that pendingReconstructionCheck() has added to
// timedOutItems since the last getTimedOutBlocks() call, which subtracts this
// value from replicatedPendingNum and resets it to zero.
private long replicatedTimedOutNum = 0L;
// Striped (EC) counterpart of replicatedTimedOutNum.
private long ecTimedOutNum = 0L;

//
// It might take anywhere between 5 to 10 minutes before
Expand Down Expand Up @@ -86,6 +90,11 @@ void increment(BlockInfo block, DatanodeStorageInfo... targets) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found == null) {
pendingReconstructions.put(block, new PendingBlockInfo(targets));
if (block.isStriped()) {
ecPendingNum++;
} else {
replicatedPendingNum++;
}
} else {
found.incrementReplicas(targets);
found.setTimeStamp();
Expand All @@ -111,6 +120,11 @@ boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
if (found.getNumReplicas() <= 0) {
pendingReconstructions.remove(block);
removed = true;
if (block.isStriped()) {
ecPendingNum--;
} else {
replicatedPendingNum--;
}
}
}
}
Expand All @@ -126,6 +140,14 @@ boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
*/
PendingBlockInfo remove(BlockInfo block) {
  synchronized (pendingReconstructions) {
    // Remove in a single map operation and use the result to decide whether
    // a counter must be adjusted.
    final PendingBlockInfo removedInfo = pendingReconstructions.remove(block);
    if (removedInfo != null) {
      // increment() bumps the per-type counter for every new map entry,
      // regardless of how many targets it carries, so every entry that
      // leaves the map here must be subtracted again. Gating this on the
      // replica count would leak a count for entries created without
      // targets.
      if (block.isStriped()) {
        ecPendingNum--;
      } else {
        replicatedPendingNum--;
      }
    }
    // null when the block was not being tracked (same as before).
    return removedInfo;
  }
}
Expand All @@ -137,6 +159,10 @@ public void clear() {
timedOutItems.clear();
}
timedOutCount = 0L;
ecPendingNum = 0L;
replicatedPendingNum = 0L;
replicatedTimedOutNum = 0L;
ecTimedOutNum = 0L;
}
}

Expand Down Expand Up @@ -172,6 +198,32 @@ long getNumTimedOuts() {
}
}

/**
 * Used for metrics. The counter is read while holding the monitors of both
 * {@code pendingReconstructions} and {@code timedOutItems}, acquired in
 * that order.
 *
 * @return The number of pending replicated blocks.
 */
long getNumReplicatedPendingBlocks() {
  final long snapshot;
  synchronized (pendingReconstructions) {
    synchronized (timedOutItems) {
      snapshot = replicatedPendingNum;
    }
  }
  return snapshot;
}

/**
 * Used for metrics. The counter is read while holding the monitors of both
 * {@code pendingReconstructions} and {@code timedOutItems}, acquired in
 * that order.
 *
 * @return The number of pending EC blocks.
 */
long getNumEcPendingBlocks() {
  final long snapshot;
  synchronized (pendingReconstructions) {
    synchronized (timedOutItems) {
      snapshot = ecPendingNum;
    }
  }
  return snapshot;
}

/**
* Returns a list of blocks that have timed out their
* reconstruction requests. Returns null if no blocks have
Expand All @@ -187,6 +239,10 @@ BlockInfo[] getTimedOutBlocks() {
new BlockInfo[size]);
timedOutItems.clear();
timedOutCount += size;
ecPendingNum -= ecTimedOutNum;
replicatedPendingNum -= replicatedTimedOutNum;
ecTimedOutNum = 0L;
replicatedTimedOutNum = 0L;
return blockList;
}
}
Expand Down Expand Up @@ -279,6 +335,11 @@ void pendingReconstructionCheck() {
BlockInfo block = entry.getKey();
synchronized (timedOutItems) {
timedOutItems.add(block);
if (block.isStriped()) {
ecTimedOutNum++;
} else {
replicatedTimedOutNum++;
}
}
LOG.warn("PendingReconstructionMonitor timed out " + block);
NameNode.getNameNodeMetrics().incTimeoutReReplications();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4737,7 +4737,8 @@ public long getLastCheckpointTime() {
long[] getStats() {
final long[] stats = datanodeStatistics.getStats();
stats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX] =
getLowRedundancyBlocks();
getLowRedundancyBlocks() +
getPendingReplicatedBlocks() + getPendingEcBlocks();
stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] =
getCorruptReplicaBlocks();
stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] =
Expand All @@ -4758,10 +4759,10 @@ long[] getStats() {
* @see ClientProtocol#getReplicatedBlockStats()
*/
ReplicatedBlockStats getReplicatedBlockStats() {
// NOTE(review): this span comes from a diff view that shows both sides of the
// change without +/- markers. The next four lines look like the removed
// (pre-change) opening of the return statement — confirm against the diff.
return new ReplicatedBlockStats(getLowRedundancyReplicatedBlocks(),
getCorruptReplicatedBlocks(), getMissingReplicatedBlocks(),
getMissingReplicationOneBlocks(), getBytesInFutureReplicatedBlocks(),
getPendingDeletionReplicatedBlocks(),
// NOTE(review): presumably the added (post-change) form. Its first argument is
// getLowRedundancyReplicatedBlocks() plus getPendingReplicatedBlocks(); the
// remaining arguments are the same calls in the same order.
return new ReplicatedBlockStats(getLowRedundancyReplicatedBlocks() +
getPendingReplicatedBlocks(), getCorruptReplicatedBlocks(),
getMissingReplicatedBlocks(), getMissingReplicationOneBlocks(),
getBytesInFutureReplicatedBlocks(), getPendingDeletionReplicatedBlocks(),
getHighestPriorityLowRedundancyReplicatedBlocks());
}

Expand All @@ -4772,7 +4773,7 @@ ReplicatedBlockStats getReplicatedBlockStats() {
* @see ClientProtocol#getECBlockGroupStats()
*/
ECBlockGroupStats getECBlockGroupStats() {
return new ECBlockGroupStats(getLowRedundancyECBlockGroups(),
return new ECBlockGroupStats(getLowRedundancyECBlockGroups() + getPendingEcBlocks(),
getCorruptECBlockGroups(), getMissingECBlockGroups(),
getBytesInFutureECBlockGroups(), getPendingDeletionECBlocks(),
getHighestPriorityLowRedundancyECBlocks());
Expand Down Expand Up @@ -5362,6 +5363,24 @@ public long getPendingReconstructionBlocks() {
return blockManager.getPendingReconstructionBlocksCount();
}

/**
 * Reports the aggregated number of replicated blocks that are pending
 * reconstruction, as returned by the block manager.
 *
 * @return The number of pending replicated blocks.
 */
public long getPendingReplicatedBlocks() {
  final long pendingReplicated = blockManager.getNumReplicatedPendingBlocks();
  return pendingReplicated;
}

/**
 * Reports the aggregated number of EC blocks that are pending
 * reconstruction, as returned by the block manager.
 *
 * @return The number of pending EC blocks.
 */
public long getPendingEcBlocks() {
  final long pendingEc = blockManager.getNumEcPendingBlocks();
  return pendingEc;
}

/**
* Get aggregated count of all blocks with low redundancy.
* @deprecated - Use {@link #getLowRedundancyBlocks()} instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public void testPendingReconstruction() {
}
assertEquals("Size of pendingReconstruction ",
10, pendingReconstructions.size());
assertEquals(10L, pendingReconstructions.getNumReplicatedPendingBlocks());


//
Expand All @@ -107,22 +108,26 @@ public void testPendingReconstruction() {
pendingReconstructions.decrement(blk, storages[7]); // removes one replica
assertEquals("pendingReconstructions.getNumReplicas ",
7, pendingReconstructions.getNumReplicas(blk));
assertEquals(10L, pendingReconstructions.getNumReplicatedPendingBlocks());

//
// insert the same item twice should be counted as once
//
pendingReconstructions.increment(blk, storages[0]);
assertEquals("pendingReconstructions.getNumReplicas ",
7, pendingReconstructions.getNumReplicas(blk));
assertEquals(10L, pendingReconstructions.getNumReplicatedPendingBlocks());

for (int i = 0; i < 7; i++) {
// removes all replicas
pendingReconstructions.decrement(blk, storages[i]);
}
assertTrue(pendingReconstructions.size() == 9);
assertEquals(9L, pendingReconstructions.getNumReplicatedPendingBlocks());
pendingReconstructions.increment(blk,
DFSTestUtil.createDatanodeStorageInfos(8));
assertTrue(pendingReconstructions.size() == 10);
assertEquals(10L, pendingReconstructions.getNumReplicatedPendingBlocks());

//
// verify that the number of replicas returned
Expand Down Expand Up @@ -155,6 +160,7 @@ public void testPendingReconstruction() {
}
assertEquals(15, pendingReconstructions.size());
assertEquals(0L, pendingReconstructions.getNumTimedOuts());
assertEquals(15L, pendingReconstructions.getNumReplicatedPendingBlocks());

//
// Wait for everything to timeout.
Expand All @@ -175,11 +181,13 @@ public void testPendingReconstruction() {
//
assertEquals("Size of pendingReconstructions ", 0, pendingReconstructions.size());
assertEquals(15L, pendingReconstructions.getNumTimedOuts());
assertEquals(15L, pendingReconstructions.getNumReplicatedPendingBlocks());
Block[] timedOut = pendingReconstructions.getTimedOutBlocks();
assertNotNull(timedOut);
assertEquals(15, timedOut.length);
// Verify the number is not reset
assertEquals(15L, pendingReconstructions.getNumTimedOuts());
assertEquals(0L, pendingReconstructions.getNumReplicatedPendingBlocks());
for (Block block : timedOut) {
assertTrue(block.getBlockId() < 15);
}
Expand Down