Skip to content

Commit

Permalink
Revert "HDFS-10923. Make InstrumentedLock require ReentrantLock."
Browse files Browse the repository at this point in the history
This reverts commit c7ce6fd.
  • Loading branch information
arp7 committed Oct 1, 2016
1 parent c7ce6fd commit fe9ebe2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 65 deletions.
Expand Up @@ -33,7 +33,7 @@

/**
* This is a debugging class that can be used by callers to track
* whether a specific lock is being held for too long and periodically
* whether a specifc lock is being held for too long and periodically
* log a warning and stack trace, if so.
*
* The logged warnings are throttled so that logs are not spammed.
Expand All @@ -43,10 +43,9 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InstrumentedReentrantLock implements Lock {
public class InstrumentedLock implements Lock {

@VisibleForTesting
final ReentrantLock lock;
private final Lock lock;
private final Log logger;
private final String name;
private final Timer clock;
Expand All @@ -73,23 +72,20 @@ public class InstrumentedReentrantLock implements Lock {
* @param lockWarningThresholdMs the time threshold to view lock held
* time as being "too long"
*/
public InstrumentedReentrantLock(
String name, Log logger, long minLoggingGapMs,
public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
long lockWarningThresholdMs) {
this(name, logger, new ReentrantLock(),
minLoggingGapMs, lockWarningThresholdMs);
}

public InstrumentedReentrantLock(
String name, Log logger, ReentrantLock lock,
public InstrumentedLock(String name, Log logger, Lock lock,
long minLoggingGapMs, long lockWarningThresholdMs) {
this(name, logger, lock,
minLoggingGapMs, lockWarningThresholdMs, new Timer());
}

@VisibleForTesting
InstrumentedReentrantLock(
String name, Log logger, ReentrantLock lock,
InstrumentedLock(String name, Log logger, Lock lock,
long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
this.name = name;
this.lock = lock;
Expand All @@ -104,22 +100,18 @@ public InstrumentedReentrantLock(
@Override
public void lock() {
lock.lock();
if (lock.getHoldCount() == 1) {
lockAcquireTimestamp = clock.monotonicNow();
}
lockAcquireTimestamp = clock.monotonicNow();
}

@Override
public void lockInterruptibly() throws InterruptedException {
lock.lockInterruptibly();
if (lock.getHoldCount() == 1) {
lockAcquireTimestamp = clock.monotonicNow();
}
lockAcquireTimestamp = clock.monotonicNow();
}

@Override
public boolean tryLock() {
if (lock.tryLock() && lock.getHoldCount() == 1) {
if (lock.tryLock()) {
lockAcquireTimestamp = clock.monotonicNow();
return true;
}
Expand All @@ -128,7 +120,7 @@ public boolean tryLock() {

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (lock.tryLock(time, unit) && lock.getHoldCount() == 1) {
if (lock.tryLock(time, unit)) {
lockAcquireTimestamp = clock.monotonicNow();
return true;
}
Expand All @@ -137,13 +129,10 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

@Override
public void unlock() {
final boolean needReport = (lock.getHoldCount() == 1);
long localLockReleaseTime = clock.monotonicNow();
long localLockAcquireTime = lockAcquireTimestamp;
lock.unlock();
if (needReport) {
check(localLockAcquireTime, localLockReleaseTime);
}
check(localLockAcquireTime, localLockReleaseTime);
}

@Override
Expand Down Expand Up @@ -192,4 +181,5 @@ private void check(long acquireTime, long releaseTime) {
logWarning(lockHeldTime, suppressed);
}
}

}
Expand Up @@ -58,7 +58,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.InstrumentedReentrantLock;
import org.apache.hadoop.hdfs.InstrumentedLock;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
Expand Down Expand Up @@ -266,7 +266,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.datasetLock = new AutoCloseableLock(
new InstrumentedReentrantLock(getClass().getName(), LOG,
new InstrumentedLock(getClass().getName(), LOG,
conf.getTimeDuration(
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
Expand Down
Expand Up @@ -20,10 +20,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.Timer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -35,11 +34,11 @@
import static org.junit.Assert.*;

/**
* A test class for {@link InstrumentedReentrantLock}.
* A test class for InstrumentedLock.
*/
public class TestInstrumentedReentrantLock {
public class TestInstrumentedLock {

static final Log LOG = LogFactory.getLog(TestInstrumentedReentrantLock.class);
static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);

@Rule public TestName name = new TestName();

Expand All @@ -50,8 +49,7 @@ public class TestInstrumentedReentrantLock {
@Test(timeout=10000)
public void testMultipleThread() throws Exception {
String testname = name.getMethodName();
InstrumentedReentrantLock lock =
new InstrumentedReentrantLock(testname, LOG, 0, 300);
InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
lock.lock();
try {
Thread competingThread = new Thread() {
Expand All @@ -75,7 +73,7 @@ public void run() {
public void testTryWithResourceSyntax() throws Exception {
String testname = name.getMethodName();
final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
Lock lock = new InstrumentedReentrantLock(testname, LOG, 0, 300) {
Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
@Override
public void lock() {
super.lock();
Expand Down Expand Up @@ -112,66 +110,57 @@ public void run() {
@Test(timeout=10000)
public void testLockLongHoldingReport() throws Exception {
String testname = name.getMethodName();
FakeTimer mclock = new FakeTimer();
final int warningThreshold = 500;
final int minLoggingGap = warningThreshold * 10;
final AtomicLong time = new AtomicLong(0);
Timer mclock = new Timer() {
@Override
public long monotonicNow() {
return time.get();
}
};
Lock mlock = mock(Lock.class);

final AtomicLong wlogged = new AtomicLong(0);
final AtomicLong wsuppresed = new AtomicLong(0);
InstrumentedReentrantLock lock = new InstrumentedReentrantLock(
testname, LOG, new ReentrantLock(), minLoggingGap,
warningThreshold, mclock) {
InstrumentedLock lock = new InstrumentedLock(
testname, LOG, mlock, 2000, 300, mclock) {
@Override
void logWarning(long lockHeldTime, long suppressed) {
wlogged.incrementAndGet();
wsuppresed.set(suppressed);
}
};

// do not log warning when the lock held time is <= warningThreshold.
lock.lock();
mclock.advance(warningThreshold);
lock.unlock();
// do not log warning when the lock held time is short
lock.lock(); // t = 0
time.set(200);
lock.unlock(); // t = 200
assertEquals(0, wlogged.get());
assertEquals(0, wsuppresed.get());

// log a warning when the lock held time exceeds the threshold.
lock.lock();
mclock.advance(warningThreshold + 1);
assertEquals(1, lock.lock.getHoldCount());
lock.unlock();
lock.lock(); // t = 200
time.set(700);
lock.unlock(); // t = 700
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());

// despite the lock held time is greater than threshold
// suppress the log warning due to the logging gap
// (not recorded in wsuppressed until next log message)
lock.lock();
mclock.advance(warningThreshold + 1);
lock.unlock();
lock.lock(); // t = 700
time.set(1100);
lock.unlock(); // t = 1100
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());

// log a warning message when the lock held time is greater the threshold
// and the logging time gap is satisfied. Also should display suppressed
// previous warnings.
lock.lock();
mclock.advance(minLoggingGap + 1);
time.set(2400);
lock.lock(); // t = 2400
time.set(2800);
lock.unlock(); // t = 2800
assertEquals(2, wlogged.get());
assertEquals(1, wsuppresed.get());

// Ensure that nested acquisitions do not log.
wlogged.set(0);
wsuppresed.set(0);
lock.lock();
lock.lock();
mclock.advance(minLoggingGap + 1);
lock.unlock();
assertEquals(0, wlogged.get()); // No warnings on nested release.
assertEquals(0, wsuppresed.get());
lock.unlock();
assertEquals(1, wlogged.get()); // Last release immediately logs.
assertEquals(0, wsuppresed.get());
}

}

0 comments on commit fe9ebe2

Please sign in to comment.