Skip to content

Commit

Permalink
HDFS-8391. NN should consider current EC tasks handling count from DN…
Browse files Browse the repository at this point in the history
… while assigning new tasks. Contributed by Uma Maheswara Rao G.
  • Loading branch information
umamaheswararao authored and Zhe Zhang committed May 26, 2015
1 parent bba15e0 commit c99c337
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
Expand Up @@ -212,3 +212,6 @@

HDFS-8364. Erasure coding: fix some minor bugs in EC CLI
(Walter Su via vinayakumarb)

HDFS-8391. NN should consider current EC tasks handling count from DN while
assigning new tasks. (umamahesh)
Expand Up @@ -1909,6 +1909,21 @@ void incrDatanodeNetworkErrors(String host) {
int getXmitsInProgress() {
return xmitsInProgress.get();
}

/**
* Increments the xmitsInProgress count. xmitsInProgress count represents the
* number of data replication/reconstruction tasks running currently.
*/
public void incrementXmitsInProgress() {
xmitsInProgress.getAndIncrement();
}

/**
* Decrements the xmitsInProgress count
*/
public void decrementXmitsInProgress() {
xmitsInProgress.getAndDecrement();
}

private void reportBadBlock(final BPOfferService bpos,
final ExtendedBlock block, final String msg) {
Expand Down Expand Up @@ -2128,7 +2143,7 @@ private class DataTransfer implements Runnable {
*/
@Override
public void run() {
xmitsInProgress.getAndIncrement();
incrementXmitsInProgress();
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
Expand Down Expand Up @@ -2207,7 +2222,7 @@ public void run() {
// check if there are any disk problem
checkDiskErrorAsync();
} finally {
xmitsInProgress.getAndDecrement();
decrementXmitsInProgress();
IOUtils.closeStream(blockSender);
IOUtils.closeStream(out);
IOUtils.closeStream(in);
Expand Down
Expand Up @@ -312,6 +312,7 @@ private long getBlockLen(ExtendedBlock blockGroup, int i) {

@Override
public void run() {
datanode.incrementXmitsInProgress();
try {
// Store the indices of successfully read source
// This will be updated after doing real read.
Expand Down Expand Up @@ -397,8 +398,9 @@ public void run() {
// Currently we don't check the acks for packets, this is similar as
// block replication.
} catch (Throwable e) {
LOG.warn("Failed to recover striped block: " + blockGroup);
LOG.warn("Failed to recover striped block: " + blockGroup, e);
} finally {
datanode.decrementXmitsInProgress();
// close block readers
for (StripedReader stripedReader : stripedReaders) {
closeBlockReader(stripedReader.blockReader);
Expand Down

0 comments on commit c99c337

Please sign in to comment.