Skip to content

Commit

Permalink
HDFS-14260. Replace synchronized method in BlockReceiver with atomic …
Browse files Browse the repository at this point in the history
…value. Contributed by BELUGA BEHR.
  • Loading branch information
Inigo Goiri committed Feb 11, 2019
1 parent 73b67b2 commit 0ceb1b7
Showing 1 changed file with 23 additions and 17 deletions.
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Checksum;

import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -142,7 +143,7 @@ class BlockReceiver implements Closeable {
private long maxWriteToDiskMs = 0;

private boolean pinning;
private long lastSentTime;
private final AtomicLong lastSentTime = new AtomicLong(0L);
private long maxSendIdleTime;

BlockReceiver(final ExtendedBlock block, final StorageType storageType,
Expand Down Expand Up @@ -182,7 +183,7 @@ class BlockReceiver implements Closeable {
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;

this.pinning = pinning;
this.lastSentTime = Time.monotonicNow();
this.lastSentTime.set(Time.monotonicNow());
// Downstream will timeout in readTimeout on receiving the next packet.
// If there is no data traffic, a heartbeat packet is sent at
// the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
Expand Down Expand Up @@ -379,23 +380,28 @@ public void close() throws IOException {
}
}

synchronized void setLastSentTime(long sentTime) {
lastSentTime = sentTime;
}

/**
* It can return false if
* - upstream did not send packet for a long time
* - a packet was received but got stuck in local disk I/O.
* - a packet was received but got stuck on send to mirror.
* Check if a packet was sent within an acceptable period of time.
*
* Some example of when this method may return false:
* <ul>
* <li>Upstream did not send packet for a long time</li>
* <li>Packet was received but got stuck in local disk I/O</li>
* <li>Packet was received but got stuck on send to mirror</li>
* </ul>
*
* @return true if packet was sent within an acceptable period of time;
* otherwise false.
*/
synchronized boolean packetSentInTime() {
long diff = Time.monotonicNow() - lastSentTime;
if (diff > maxSendIdleTime) {
LOG.info("A packet was last sent " + diff + " milliseconds ago.");
return false;
boolean packetSentInTime() {
final long diff = Time.monotonicNow() - this.lastSentTime.get();
final boolean allowedIdleTime = (diff <= this.maxSendIdleTime);
LOG.debug("A packet was last sent {}ms ago.", diff);
if (!allowedIdleTime) {
LOG.warn("A packet was last sent {}ms ago. Maximum idle time: {}ms.",
diff, this.maxSendIdleTime);
}
return true;
return allowedIdleTime;
}

/**
Expand Down Expand Up @@ -589,7 +595,7 @@ private int receivePacket() throws IOException {
packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
long now = Time.monotonicNow();
setLastSentTime(now);
this.lastSentTime.set(now);
long duration = now - begin;
DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
mirrorAddr,
Expand Down

0 comments on commit 0ceb1b7

Please sign in to comment.