From fd45ac3ec90c653a5cb5c6c3d5ae66e8bdde6296 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Tue, 3 Oct 2017 11:55:14 -0400 Subject: [PATCH 1/2] Use ReentrantLock for delayed operation lock and avoid blocking --- .../coordinator/group/DelayedHeartbeat.scala | 4 - .../kafka/coordinator/group/DelayedJoin.scala | 4 - .../group/GroupMetadataManager.scala | 3 +- .../transaction/DelayedTxnMarker.scala | 4 - .../transaction/TransactionStateManager.scala | 6 +- .../scala/kafka/server/DelayedOperation.scala | 31 +++++-- .../scala/kafka/server/DelayedProduce.scala | 11 +-- .../scala/kafka/server/ReplicaManager.scala | 3 +- .../group/GroupCoordinatorTest.scala | 3 - .../group/GroupMetadataManagerTest.scala | 2 - .../TransactionStateManagerTest.scala | 2 - .../kafka/server/DelayedOperationTest.scala | 84 ++++++++++++++++++- .../unit/kafka/server/KafkaApisTest.scala | 3 - 13 files changed, 112 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala index 2cbdf306796ba..73d5d0f807823 100644 --- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala @@ -30,10 +30,6 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator, sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { - // overridden since tryComplete already synchronizes on the group. This makes it safe to - // call purgatory operations while holding the group lock. - override def safeTryComplete(): Boolean = tryComplete() - override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _) override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) override def onComplete() = coordinator.onCompleteHeartbeat() diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala index 6a81242da968c..5232287962ad6 100644 --- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala +++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala @@ -35,10 +35,6 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator, group: GroupMetadata, rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) { - // overridden since tryComplete already synchronizes on the group. This makes it safe to - // call purgatory operations while holding the group lock. - override def safeTryComplete(): Boolean = tryComplete() - override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _) override def onExpiration() = coordinator.onExpireJoin() override def onComplete() = coordinator.onCompleteJoin(group) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index c818b57461188..7519dc4f6d44e 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -243,8 +243,7 @@ class GroupMetadataManager(brokerId: Int, internalTopicsAllowed = true, isFromClient = false, entriesPerPartition = records, - responseCallback = callback, - delayedProduceLock = Some(group)) + responseCallback = callback) } /** diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala index 82c4a8c3a6381..bc0f1b71da4c2 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala @@ -28,10 +28,6 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata, completionCallback: Errors => Unit) extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) { - // overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to - // call purgatory operations while holding the group lock. - override def safeTryComplete(): Boolean = tryComplete() - override def tryComplete(): Boolean = { txnMetadata synchronized { if (txnMetadata.topicPartitions.isEmpty) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 394817c6a1adf..ad5d33b066376 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -196,8 +196,7 @@ class TransactionStateManager(brokerId: Int, internalTopicsAllowed = true, isFromClient = false, recordsPerPartition, - removeFromCacheCallback, - None + removeFromCacheCallback ) } @@ -601,8 +600,7 @@ class TransactionStateManager(brokerId: Int, internalTopicsAllowed = true, isFromClient = false, recordsPerPartition, - updateCacheCallback, - delayedProduceLock = Some(newMetadata)) + updateCacheCallback) trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log") } diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 899739559aa89..f055eab880f47 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -19,11 +19,11 @@ package kafka.server import java.util.concurrent._ import java.util.concurrent.atomic._ -import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup -import kafka.utils.CoreUtils.{inReadLock, inWriteLock} +import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} import kafka.utils._ import kafka.utils.timer._ @@ -46,6 +46,11 @@ import scala.collection.mutable.ListBuffer abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging { private val completed = new AtomicBoolean(false) + protected var lock: ReentrantLock = null + + def createLock(): Unit = { + lock = new ReentrantLock + } /* * Force completing the delayed operation, if not already completed. @@ -96,13 +101,18 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi def tryComplete(): Boolean /** - * Thread-safe variant of tryComplete(). This can be overridden if the operation provides its - * own synchronization. + * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired + * without blocking. */ def safeTryComplete(): Boolean = { - synchronized { - tryComplete() - } + if (lock.tryLock()) { + try { + tryComplete() + } finally { + lock.unlock() + } + } else + false } /* @@ -196,10 +206,15 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri // operation is unnecessarily added for watch. However, this is a less severe issue since the // expire reaper will clean it up periodically. - var isCompletedByMe = operation.safeTryComplete() + // At this point the only thread that can attempt this operation is this current thread + // Hence it is safe to tryComplete() without a lock + var isCompletedByMe = operation.tryComplete() if (isCompletedByMe) return true + // Create a lock before adding to watch since multiple threads may attempt to complete + operation.createLock() + var watchCreated = false for(key <- watchKeys) { // If the operation is already completed, stop adding it to the rest of the watcher list. diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 0d452ccd6cc3a..ebbd9ee3d9a16 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Meter import kafka.metrics.KafkaMetricsGroup import kafka.utils.Pool + import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -53,12 +54,9 @@ case class ProduceMetadata(produceRequiredAcks: Short, class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, - responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - lockOpt: Option[Object] = None) + responseCallback: Map[TopicPartition, PartitionResponse] => Unit) extends DelayedOperation(delayMs) { - val lock = lockOpt.getOrElse(this) - // first update the acks pending variable according to the error code produceMetadata.produceStatus.foreach { case (topicPartition, status) => if (status.responseStatus.error == Errors.NONE) { @@ -72,11 +70,6 @@ class DelayedProduce(delayMs: Long, trace("Initial partition status for %s is %s".format(topicPartition, status)) } - override def safeTryComplete(): Boolean = lock synchronized { - tryComplete() - } - - /** * The delayed produce operation can be completed if every partition * it produces to is satisfied by one of the following: diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3a4ecef79d8c5..98a4be1a8ec16 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -450,7 +450,6 @@ class ReplicaManager(val config: KafkaConfig, isFromClient: Boolean, entriesPerPartition: Map[TopicPartition, MemoryRecords], responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - delayedProduceLock: Option[Object] = None, processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds @@ -470,7 +469,7 @@ class ReplicaManager(val config: KafkaConfig, if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) - val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock) + val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed produce operation val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 85d72c3f59815..3fed45dfb3002 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -1367,7 +1367,6 @@ class GroupCoordinatorTest extends JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> @@ -1451,7 +1450,6 @@ class GroupCoordinatorTest extends JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( @@ -1481,7 +1479,6 @@ class GroupCoordinatorTest extends JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 4a509edda04ed..46a18785b6af7 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1306,7 +1306,6 @@ class GroupMetadataManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject()) ) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) @@ -1321,7 +1320,6 @@ class GroupMetadataManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index ed1636c04b6b8..0a2b6418000a4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -498,7 +498,6 @@ class TransactionStateManagerTest { EasyMock.eq(false), EasyMock.eq(recordsByPartition), EasyMock.capture(capturedArgument), - EasyMock.eq(None), EasyMock.anyObject() )).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { @@ -599,7 +598,6 @@ class TransactionStateManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject(), EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer(): Unit = capturedArgument.getValue.apply( diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index 82cf64239e56c..3e44f7cfe1fb6 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -17,6 +17,11 @@ package kafka.server +import java.util.concurrent.{Executors, Future} +import java.util.concurrent.locks.ReentrantLock + +import kafka.utils.CoreUtils.inLock + import org.apache.kafka.common.utils.Time import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -117,11 +122,88 @@ class DelayedOperationTest { assertEquals(Nil, cancelledOperations) } + @Test + def testDelayedOperationLock() { + val key = "key" + val executorService = Executors.newSingleThreadExecutor + try { + def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = { + (1 to count).map { _ => + val op = new MockDelayedOperation(100000L) + purgatory.tryCompleteElseWatch(op, Seq(key)) + assertFalse("Not completable", op.isCompleted) + op + } + } + def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = { + (1 to count).map { _ => + val op = new MockDelayedOperation(100000L) + op.completable = true + op + } + } - class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) { + def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = { + val future = executorService.submit(new Runnable { + def run() = fun + }) + if (shouldComplete) + future.get() + else + assertFalse("Should not have completed", future.isDone) + future + } + + def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = { + completableOps.foreach(op => op.completable = true) + val completed = purgatory.checkAndComplete(key) + assertEquals(expectedComplete.size, completed) + expectedComplete.foreach(op => assertTrue("Should have completed", op.isCompleted)) + val expectedNotComplete = completableOps.toSet -- expectedComplete + expectedNotComplete.foreach(op => assertFalse("Should not have completed", op.isCompleted)) + } + + // If locks are free all completable operations should complete + var ops = createDelayedOperations(2) + checkAndComplete(ops, ops) + + // Lock held by current thread, completable operations should complete + ops = createDelayedOperations(2) + inLock(ops(1).delayedOperationLock) { + checkAndComplete(ops, ops) + } + + // Lock held by another thread, should not block, only operations that can be + // locked without blocking on the current thread should complete + ops = createDelayedOperations(2) + runOnAnotherThread(ops(0).delayedOperationLock.lock(), true) + try { + checkAndComplete(ops, Seq(ops(1))) + } finally { + runOnAnotherThread(ops(0).delayedOperationLock.unlock(), true) + } + + // Immediately completable operations should complete without locking + ops = createCompletableOperations(2) + ops.foreach { op => + assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key))) + assertNull("Lock created unnecessarily", op.delayedOperationLock) + assertTrue("Should have completed", op.isCompleted) + } + + } finally { + executorService.shutdown() + } + } + + + class MockDelayedOperation(delayMs: Long) + extends DelayedOperation(delayMs) { var completable = false + def delayedOperationLock = lock + def awaitExpiration() { synchronized { wait() diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 508bc35721036..76ae35bc56dd7 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -180,7 +180,6 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.capture(responseCallback), - EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))) @@ -219,7 +218,6 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.capture(responseCallback), - EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))) @@ -250,7 +248,6 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.anyObject(), - EasyMock.anyObject(), EasyMock.anyObject())) EasyMock.replay(replicaManager) From 4bf65f3da2e032a5a134b90974e8c382b94268dd Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 4 Oct 2017 13:13:14 -0400 Subject: [PATCH 2/2] Address review comments --- .../scala/kafka/server/DelayedOperation.scala | 18 ++++++------------ .../kafka/server/DelayedOperationTest.scala | 9 +++------ 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index f055eab880f47..86bf1ff71a704 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -23,7 +23,7 @@ import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup -import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.utils.timer._ @@ -46,11 +46,8 @@ import scala.collection.mutable.ListBuffer abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging { private val completed = new AtomicBoolean(false) - protected var lock: ReentrantLock = null - - def createLock(): Unit = { - lock = new ReentrantLock - } + // Visible for testing + private[server] val lock: ReentrantLock = new ReentrantLock /* * Force completing the delayed operation, if not already completed. @@ -104,7 +101,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired * without blocking. */ - def safeTryComplete(): Boolean = { + private[server] def maybeTryComplete(): Boolean = { if (lock.tryLock()) { try { tryComplete() @@ -212,9 +209,6 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri if (isCompletedByMe) return true - // Create a lock before adding to watch since multiple threads may attempt to complete - operation.createLock() - var watchCreated = false for(key <- watchKeys) { // If the operation is already completed, stop adding it to the rest of the watcher list. @@ -228,7 +222,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri } } - isCompletedByMe = operation.safeTryComplete() + isCompletedByMe = operation.maybeTryComplete() if (isCompletedByMe) return true @@ -350,7 +344,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri if (curr.isCompleted) { // another thread has completed this operation, just remove it iter.remove() - } else if (curr.safeTryComplete()) { + } else if (curr.maybeTryComplete()) { iter.remove() completed += 1 } diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index 3e44f7cfe1fb6..fdfb582c1017c 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -170,25 +170,24 @@ class DelayedOperationTest { // Lock held by current thread, completable operations should complete ops = createDelayedOperations(2) - inLock(ops(1).delayedOperationLock) { + inLock(ops(1).lock) { checkAndComplete(ops, ops) } // Lock held by another thread, should not block, only operations that can be // locked without blocking on the current thread should complete ops = createDelayedOperations(2) - runOnAnotherThread(ops(0).delayedOperationLock.lock(), true) + runOnAnotherThread(ops(0).lock.lock(), true) try { checkAndComplete(ops, Seq(ops(1))) } finally { - runOnAnotherThread(ops(0).delayedOperationLock.unlock(), true) + runOnAnotherThread(ops(0).lock.unlock(), true) } // Immediately completable operations should complete without locking ops = createCompletableOperations(2) ops.foreach { op => assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key))) - assertNull("Lock created unnecessarily", op.delayedOperationLock) assertTrue("Should have completed", op.isCompleted) } @@ -202,8 +201,6 @@ class DelayedOperationTest { extends DelayedOperation(delayMs) { var completable = false - def delayedOperationLock = lock - def awaitExpiration() { synchronized { wait()