From 96af68bd058aae80c180c81f4f53bc00b59d833b Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 1 Jun 2017 15:35:52 +0100 Subject: [PATCH 1/6] use loop rather than recursion --- .../kafka/common/InterBrokerSendThread.scala | 1 - .../transaction/DelayedTxnMarker.scala | 7 +-- .../TransactionMarkerChannelManager.scala | 56 ++++++++++++------- .../TransactionMarkerChannelManagerTest.scala | 23 +++++--- 4 files changed, 53 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala index ac142431af2bf..e8ac235d9227c 100644 --- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala +++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala @@ -45,7 +45,6 @@ class InterBrokerSendThread(name: String, for (request: RequestAndCompletionHandler <- requestsToSend) { val destination = Integer.toString(request.destination.id()) val completionHandler = request.handler - // TODO: Need to check inter broker protocol and error if new request is not supported val clientRequest = networkClient.newClientRequest(destination, request.request, now, diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala index 82c4a8c3a6381..74d5cd2df26ed 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala @@ -28,16 +28,11 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata, completionCallback: Errors => Unit) extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) { - // overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to - // call purgatory operations while holding the group lock. - override def safeTryComplete(): Boolean = tryComplete() - override def tryComplete(): Boolean = { - txnMetadata synchronized { if (txnMetadata.topicPartitions.isEmpty) forceComplete() else false - } + } override def onExpiration(): Unit = { diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 344863fe5243b..301bb765f74c4 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -18,7 +18,8 @@ package kafka.coordinator.transaction import java.util -import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingQueue} import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache} @@ -226,27 +227,42 @@ class TransactionMarkerChannelManager(config: KafkaConfig, case Right(Some(epochAndMetadata)) => if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) { debug(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId succeeded") - - // try to append to the transaction log - def retryAppendCallback(error: Errors): Unit = - error match { - case Errors.NONE => - trace(s"Completed transaction for $transactionalId with coordinator epoch $coordinatorEpoch, final state: state after commit: ${txnMetadata.state}") - - case Errors.NOT_COORDINATOR => - info(s"No longer the coordinator for transactionalId: $transactionalId while trying to append to transaction log, skip writing to transaction log") - - case Errors.COORDINATOR_NOT_AVAILABLE => - warn(s"Failed updating transaction state for $transactionalId when appending to transaction log due to ${error.exceptionName}. retrying") - - // retry appending - txnStateManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, retryAppendCallback) - - case errors: Errors => - throw new IllegalStateException(s"Unexpected error ${errors.exceptionName} while appending to transaction log for $transactionalId") + var retryBackoffMs = 100L // TODO: how much do we back-off? + val needsRetry = new AtomicBoolean(true) + @volatile + var appendLatch:CountDownLatch = null + + def retryAppendCallback(error: Errors): Unit = { + try { + error match { + case Errors.NONE => + trace(s"Completed transaction for $transactionalId with coordinator epoch $coordinatorEpoch, final state: state after commit: ${txnMetadata.state}") + needsRetry.set(false) + + case Errors.NOT_COORDINATOR => + info(s"No longer the coordinator for transactionalId: $transactionalId while trying to append to transaction log, skip writing to transaction log") + needsRetry.set(false) + + case Errors.COORDINATOR_NOT_AVAILABLE => + warn(s"Failed updating transaction state for $transactionalId when appending to transaction log due to ${error.exceptionName}. retrying") + + case errors: Errors => + needsRetry.set(false) + throw new IllegalStateException(s"Unexpected error ${errors.exceptionName} while appending to transaction log for $transactionalId") + } + } finally { + appendLatch.countDown() } + } - txnStateManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, retryAppendCallback) + while(needsRetry.get()) { + appendLatch = new CountDownLatch(1) + txnStateManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, retryAppendCallback) + appendLatch.await() + if (needsRetry.get()) { + time.sleep(retryBackoffMs) + } + } } else { info(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction markers " + s"has been sent to brokers. The cached metadata have been changed to $epochAndMetadata since preparing to send markers") diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 4015a4fe1b47d..128c17f033b89 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -20,10 +20,11 @@ import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache} import kafka.utils.timer.MockTimer import kafka.utils.TestUtils import org.apache.kafka.clients.NetworkClient +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest} import org.apache.kafka.common.utils.{MockTime, Utils} import org.apache.kafka.common.{Node, TopicPartition} -import org.easymock.EasyMock +import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.Assert._ import org.junit.Test @@ -82,8 +83,6 @@ class TransactionMarkerChannelManagerTest { EasyMock.expect(txnStateManager.getAndMaybeAddTransactionState(EasyMock.eq(transactionalId2), EasyMock.anyObject[Option[TransactionMetadata]]())) .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2)))) .anyTimes() - - EasyMock.replay(txnStateManager) } @Test @@ -106,7 +105,7 @@ class TransactionMarkerChannelManagerTest { EasyMock.anyObject()) ).andReturn(Some(broker2)).anyTimes() - EasyMock.replay(metadataCache) + EasyMock.replay(metadataCache, txnStateManager) channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds())) channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds())) @@ -148,7 +147,17 @@ class TransactionMarkerChannelManagerTest { EasyMock.anyObject()) ).andReturn(Some(broker2)).anyTimes() - EasyMock.replay(metadataCache) + val capturedCallback: Capture[Errors => Unit] = EasyMock.newCapture() + EasyMock.expect(txnStateManager.appendTransactionToLog( + EasyMock.eq(transactionalId2), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.capture(capturedCallback) + )).andAnswer(new IAnswer[Unit] { + override def answer(): Unit = capturedCallback.getValue.apply(Errors.NONE) + }) + + EasyMock.replay(metadataCache, txnStateManager) channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds())) channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds())) @@ -180,7 +189,7 @@ class TransactionMarkerChannelManagerTest { EasyMock.anyObject()) ).andReturn(Some(broker2)).anyTimes() - EasyMock.replay(metadataCache) + EasyMock.replay(metadataCache, txnStateManager) channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds())) channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds())) @@ -228,7 +237,7 @@ class TransactionMarkerChannelManagerTest { EasyMock.anyObject()) ).andReturn(Some(broker2)).anyTimes() - EasyMock.replay(metadataCache) + EasyMock.replay(metadataCache, txnStateManager) channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds())) channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds())) From 1a3d1693271d8c42433e73c849bf28af309d8695 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 2 Jun 2017 12:06:21 +0100 Subject: [PATCH 2/6] add retry queue for txn log appends --- .../transaction/DelayedTxnMarker.scala | 7 +- .../TransactionMarkerChannelManager.scala | 77 ++++++++++--------- .../TransactionMarkerChannelManagerTest.scala | 23 ++---- 3 files changed, 52 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala index 74d5cd2df26ed..82c4a8c3a6381 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala @@ -28,11 +28,16 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata, completionCallback: Errors => Unit) extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) { + // overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to + // call purgatory operations while holding the group lock. + override def safeTryComplete(): Boolean = tryComplete() + override def tryComplete(): Boolean = { + txnMetadata synchronized { if (txnMetadata.topicPartitions.isEmpty) forceComplete() else false - + } } override def onExpiration(): Unit = { diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 301bb765f74c4..b39ced99f8b8d 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -18,8 +18,7 @@ package kafka.coordinator.transaction import java.util -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingQueue} +import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache} @@ -83,7 +82,8 @@ object TransactionMarkerChannelManager { networkClient, txnStateManager, txnMarkerPurgatory, - time) + time + ) } } @@ -137,6 +137,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig, new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time) } + private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]() + def start(): Unit = { txnMarkerSendThread.start() } @@ -173,7 +175,16 @@ class TransactionMarkerChannelManager(config: KafkaConfig, trace(s"Added marker ${txnIdAndMarker.txnMarkerEntry} for transactional id ${txnIdAndMarker.txnId} to destination broker $brokerId") } + def retryLogAppends(): Unit = { + val txnLogAppendRetries: java.util.List[TxnLogAppend] = new util.ArrayList[TxnLogAppend]() + txnLogAppendRetryQueue.drainTo(txnLogAppendRetries) + debug(s"retrying: ${txnLogAppendRetries.size} transaction log appends") + txnLogAppendRetries.asScala.foreach { txnLogAppend => tryAppendToLog(txnLogAppend) } + } + + private[transaction] def drainQueuedTransactionMarkers(): Iterable[RequestAndCompletionHandler] = { + retryLogAppends() val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]() markersQueueForUnknownBroker.forEachTxnTopicPartition { case (_, queue) => queue.drainTo(txnIdAndMarkerEntries) @@ -227,42 +238,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig, case Right(Some(epochAndMetadata)) => if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) { debug(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId succeeded") - var retryBackoffMs = 100L // TODO: how much do we back-off? - val needsRetry = new AtomicBoolean(true) - @volatile - var appendLatch:CountDownLatch = null - - def retryAppendCallback(error: Errors): Unit = { - try { - error match { - case Errors.NONE => - trace(s"Completed transaction for $transactionalId with coordinator epoch $coordinatorEpoch, final state: state after commit: ${txnMetadata.state}") - needsRetry.set(false) - - case Errors.NOT_COORDINATOR => - info(s"No longer the coordinator for transactionalId: $transactionalId while trying to append to transaction log, skip writing to transaction log") - needsRetry.set(false) - - case Errors.COORDINATOR_NOT_AVAILABLE => - warn(s"Failed updating transaction state for $transactionalId when appending to transaction log due to ${error.exceptionName}. retrying") - - case errors: Errors => - needsRetry.set(false) - throw new IllegalStateException(s"Unexpected error ${errors.exceptionName} while appending to transaction log for $transactionalId") - } - } finally { - appendLatch.countDown() - } - } - while(needsRetry.get()) { - appendLatch = new CountDownLatch(1) - txnStateManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, retryAppendCallback) - appendLatch.await() - if (needsRetry.get()) { - time.sleep(retryBackoffMs) - } - } + tryAppendToLog(TxnLogAppend(transactionalId, coordinatorEpoch, txnMetadata, newMetadata)) } else { info(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction markers " + s"has been sent to brokers. The cached metadata have been changed to $epochAndMetadata since preparing to send markers") @@ -284,6 +261,28 @@ class TransactionMarkerChannelManager(config: KafkaConfig, addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet) } + private def tryAppendToLog(txnLogAppend: TxnLogAppend) = { + // try to append to the transaction log + def retryAppendCallback(error: Errors): Unit = + error match { + case Errors.NONE => + trace(s"Completed transaction for ${txnLogAppend.transactionalId} with coordinator epoch ${txnLogAppend.coordinatorEpoch}, final state: state after commit: ${txnLogAppend.txnMetadata.state}") + + case Errors.NOT_COORDINATOR => + info(s"No longer the coordinator for transactionalId: ${txnLogAppend.transactionalId} while trying to append to transaction log, skip writing to transaction log") + + case Errors.COORDINATOR_NOT_AVAILABLE => + warn(s"Failed updating transaction state for ${txnLogAppend.transactionalId} when appending to transaction log due to ${error.exceptionName}. retrying") + // enqueue for retry + txnLogAppendRetryQueue.add(txnLogAppend) + + case errors: Errors => + throw new IllegalStateException(s"Unexpected error ${errors.exceptionName} while appending to transaction log for ${txnLogAppend.transactionalId}") + } + + txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, retryAppendCallback) + } + def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, producerEpoch: Short, result: TransactionResult, coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = { @@ -367,3 +366,5 @@ class TransactionMarkerChannelManager(config: KafkaConfig, } case class TxnIdAndMarkerEntry(txnId: String, txnMarkerEntry: TxnMarkerEntry) + +case class TxnLogAppend(transactionalId: String, coordinatorEpoch: Int, txnMetadata: TransactionMetadata, newMetadata: TxnTransitMetadata) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 128c17f033b89..4015a4fe1b47d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -20,11 +20,10 @@ import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache} import kafka.utils.timer.MockTimer import kafka.utils.TestUtils import org.apache.kafka.clients.NetworkClient -import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest} import org.apache.kafka.common.utils.{MockTime, Utils} import org.apache.kafka.common.{Node, TopicPartition} -import org.easymock.{Capture, EasyMock, IAnswer} +import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test @@ -83,6 +82,8 @@ class TransactionMarkerChannelManagerTest { EasyMock.expect(txnStateManager.getAndMaybeAddTransactionState(EasyMock.eq(transactionalId2), EasyMock.anyObject[Option[TransactionMetadata]]())) .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2)))) .anyTimes() + + EasyMock.replay(txnStateManager) } @Test @@ -105,7 +106,7 @@ class TransactionMarkerChannelManagerTest { EasyMock.anyObject()) ).andReturn(Some(broker2)).anyTimes() - EasyMock.replay(metadataCache, txnStateManager) + EasyMock.replay(metadataCache) channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds())) channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds())) @@ -147,17 +148,7 @@ class TransactionMarkerChannelManagerTest { EasyMock.anyObject()) ).andReturn(Some(broker2)).anyTimes() - val capturedCallback: Capture[Errors => Unit] = EasyMock.newCapture() - EasyMock.expect(txnStateManager.appendTransactionToLog( - EasyMock.eq(transactionalId2), - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.capture(capturedCallback) - )).andAnswer(new IAnswer[Unit] { - override def answer(): Unit = capturedCallback.getValue.apply(Errors.NONE) - }) - - EasyMock.replay(metadataCache, txnStateManager) + EasyMock.replay(metadataCache) channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds())) channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds())) @@ -189,7 +180,7 @@ class TransactionMarkerChannelManagerTest { EasyMock.anyObject()) ).andReturn(Some(broker2)).anyTimes() - EasyMock.replay(metadataCache, txnStateManager) + EasyMock.replay(metadataCache) channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds())) channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds())) @@ -237,7 +228,7 @@ class TransactionMarkerChannelManagerTest { EasyMock.anyObject()) ).andReturn(Some(broker2)).anyTimes() - EasyMock.replay(metadataCache, txnStateManager) + EasyMock.replay(metadataCache) channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds())) channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds())) From 9fe3efd8aae79dd703e9e99e78584afcfc71b19d Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 2 Jun 2017 14:55:37 +0100 Subject: [PATCH 3/6] add metrics --- .../TransactionMarkerChannelManager.scala | 26 ++++++++++++++++--- .../TransactionMarkerChannelManagerTest.scala | 5 +++- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index b39ced99f8b8d..d8d4a8d3bee6c 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -25,7 +25,8 @@ import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache} import kafka.utils.Logging import org.apache.kafka.clients._ import org.apache.kafka.common.{Node, TopicPartition} -import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.metrics.{Metrics, Sensor} +import org.apache.kafka.common.metrics.stats.Total import org.apache.kafka.common.network._ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest} import org.apache.kafka.common.security.JaasContext @@ -82,7 +83,8 @@ object TransactionMarkerChannelManager { networkClient, txnStateManager, txnMarkerPurgatory, - time + time, + metrics ) } @@ -125,11 +127,18 @@ class TransactionMarkerChannelManager(config: KafkaConfig, networkClient: NetworkClient, txnStateManager: TransactionStateManager, txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker], - time: Time) extends Logging { + time: Time, + metrics:Metrics) extends Logging { + + private def addQueueLengthMetric(sensor: Sensor, namePrefix: String): Unit = { + sensor.add(metrics.metricName(s"$namePrefix-queue-length", "transaction-marker-channel-metrics"), new Total()) + } private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = concurrent.TrieMap.empty[Int, TxnMarkerQueue] private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode) + private val unknownBrokerQueueSensor = metrics.sensor("unknown-broker-queue") + addQueueLengthMetric(unknownBrokerQueueSensor, "unknown-broker") private val interBrokerListenerName: ListenerName = config.interBrokerListenerName @@ -139,6 +148,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig, private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]() + private val txnLogAppendRetryQueueSensor = metrics.sensor("txn-log-append-retry-queue") + addQueueLengthMetric(txnLogAppendRetryQueueSensor, "txn-log-append-retry") + def start(): Unit = { txnMarkerSendThread.start() } @@ -179,7 +191,10 @@ class TransactionMarkerChannelManager(config: KafkaConfig, val txnLogAppendRetries: java.util.List[TxnLogAppend] = new util.ArrayList[TxnLogAppend]() txnLogAppendRetryQueue.drainTo(txnLogAppendRetries) debug(s"retrying: ${txnLogAppendRetries.size} transaction log appends") - txnLogAppendRetries.asScala.foreach { txnLogAppend => tryAppendToLog(txnLogAppend) } + txnLogAppendRetries.asScala.foreach { txnLogAppend => + txnLogAppendRetryQueueSensor.record(-1) + tryAppendToLog(txnLogAppend) + } } @@ -191,6 +206,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig, } for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries.asScala) { + unknownBrokerQueueSensor.record(-1) val transactionalId = txnIdAndMarker.txnId val producerId = txnIdAndMarker.txnMarkerEntry.producerId val producerEpoch = txnIdAndMarker.txnMarkerEntry.producerEpoch @@ -273,6 +289,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig, case Errors.COORDINATOR_NOT_AVAILABLE => warn(s"Failed updating transaction state for ${txnLogAppend.transactionalId} when appending to transaction log due to ${error.exceptionName}. retrying") + txnLogAppendRetryQueueSensor.record(1) // enqueue for retry txnLogAppendRetryQueue.add(txnLogAppend) @@ -298,6 +315,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig, val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker) if (brokerNode == Node.noNode) { + unknownBrokerQueueSensor.record(1) // if the leader of the partition is known but node not available, put it into an unknown broker queue // and let the sender thread to look for its broker and migrate them later markersQueueForUnknownBroker.addMarkers(txnTopicPartition, txnIdAndMarker) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 4015a4fe1b47d..35699f73f5192 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -20,6 +20,7 @@ import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache} import kafka.utils.timer.MockTimer import kafka.utils.TestUtils import org.apache.kafka.clients.NetworkClient +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest} import org.apache.kafka.common.utils.{MockTime, Utils} import org.apache.kafka.common.{Node, TopicPartition} @@ -59,13 +60,15 @@ class TransactionMarkerChannelManagerTest { reaperEnabled = false) private val time = new MockTime + private val metrics = new Metrics() private val channelManager = new TransactionMarkerChannelManager( KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), metadataCache, networkClient, txnStateManager, txnMarkerPurgatory, - time) + time, + metrics) private val senderThread = channelManager.senderThread From d000866cb8aa0c5b463b65713f55cf1bfadceb8f Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 2 Jun 2017 18:14:51 +0100 Subject: [PATCH 4/6] catch throwable in InterBrokerSendThread and rethrow as FatalExitError --- .../kafka/common/InterBrokerSendThread.scala | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala index e8ac235d9227c..ea7aaf1e8ff60 100644 --- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala +++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala @@ -19,6 +19,7 @@ package kafka.common import kafka.utils.ShutdownableThread import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} import org.apache.kafka.common.Node +import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.requests.AbstractRequest import org.apache.kafka.common.utils.Time @@ -40,32 +41,39 @@ class InterBrokerSendThread(name: String, val now = time.milliseconds() var pollTimeout = Long.MaxValue - val requestsToSend: Iterable[RequestAndCompletionHandler] = requestGenerator() + try { + val requestsToSend: Iterable[RequestAndCompletionHandler] = requestGenerator() - for (request: RequestAndCompletionHandler <- requestsToSend) { - val destination = Integer.toString(request.destination.id()) - val completionHandler = request.handler - val clientRequest = networkClient.newClientRequest(destination, - request.request, - now, - true, - completionHandler) + for (request: RequestAndCompletionHandler <- requestsToSend) { + val destination = Integer.toString(request.destination.id()) + val completionHandler = request.handler + val clientRequest = networkClient.newClientRequest(destination, + request.request, + now, + true, + completionHandler) - if (networkClient.ready(request.destination, now)) { - networkClient.send(clientRequest, now) - } else { - val disConnectedResponse: ClientResponse = new ClientResponse(clientRequest.makeHeader(request.request.desiredOrLatestVersion()), - completionHandler, destination, - now /* createdTimeMs */, now /* receivedTimeMs */, true /* disconnected */, null /* versionMismatch */, null /* responseBody */) + if (networkClient.ready(request.destination, now)) { + networkClient.send(clientRequest, now) + } else { + val disConnectedResponse: ClientResponse = new ClientResponse(clientRequest.makeHeader(request.request.desiredOrLatestVersion()), + completionHandler, destination, + now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ , null /* versionMismatch */ , null /* responseBody */) - // poll timeout would be the minimum of connection delay if there are any dest yet to be reached; - // otherwise it is infinity - pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now)) + // poll timeout would be the minimum of connection delay if there are any dest yet to be reached; + // otherwise it is infinity + pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now)) - completionHandler.onComplete(disConnectedResponse) + completionHandler.onComplete(disConnectedResponse) + } } + networkClient.poll(pollTimeout, now) + } catch { + case e: FatalExitError => throw e + case t: Throwable => + error(s"unhandled exception caught in InterBrokerSendThread", t) + throw new FatalExitError() } - networkClient.poll(pollTimeout, now) } } From 50a8b73c03585d77f2689ab20288b9a488243578 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Sat, 3 Jun 2017 09:06:56 +0100 Subject: [PATCH 5/6] address comments --- .../kafka/common/InterBrokerSendThread.scala | 4 ++++ .../TransactionMarkerChannelManager.scala | 17 +++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala index ea7aaf1e8ff60..886e41c487353 100644 --- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala +++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala @@ -72,6 +72,10 @@ class InterBrokerSendThread(name: String, case e: FatalExitError => throw e case t: Throwable => error(s"unhandled exception caught in InterBrokerSendThread", t) + // rethrow any unhandled exceptions as FatalExitError so the JVM will be terminated + // as we will be in an unknown state with potentially some requests dropped and not + // being able to make progress. Known and expected Errors should have been appropriately + // dealt with already. throw new FatalExitError() } } diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index d8d4a8d3bee6c..f6b1dc56ad55e 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -18,6 +18,7 @@ package kafka.coordinator.transaction import java.util +import java.util.Collections import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} @@ -130,15 +131,19 @@ class TransactionMarkerChannelManager(config: KafkaConfig, time: Time, metrics:Metrics) extends Logging { - private def addQueueLengthMetric(sensor: Sensor, namePrefix: String): Unit = { - sensor.add(metrics.metricName(s"$namePrefix-queue-length", "transaction-marker-channel-metrics"), new Total()) + private def addQueueLengthMetric(sensor: Sensor, namePrefix: String, description: String): Unit = { + sensor.add(metrics.metricName(s"$namePrefix-queue-length", + "transaction-marker-channel-metrics", + description, + Collections.singletonMap("broker-id",config.brokerId.toString)), + new Total()) } private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = concurrent.TrieMap.empty[Int, TxnMarkerQueue] private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode) private val unknownBrokerQueueSensor = metrics.sensor("unknown-broker-queue") - addQueueLengthMetric(unknownBrokerQueueSensor, "unknown-broker") + addQueueLengthMetric(unknownBrokerQueueSensor, "unknown-broker", "the number of WriteTxnMarker requests with unknown brokers") private val interBrokerListenerName: ListenerName = config.interBrokerListenerName @@ -149,7 +154,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig, private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]() private val txnLogAppendRetryQueueSensor = metrics.sensor("txn-log-append-retry-queue") - addQueueLengthMetric(txnLogAppendRetryQueueSensor, "txn-log-append-retry") + addQueueLengthMetric(txnLogAppendRetryQueueSensor, "txn-log-append-retry", "the number of txn log appends that need to be retried") def start(): Unit = { txnMarkerSendThread.start() @@ -190,9 +195,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig, def retryLogAppends(): Unit = { val txnLogAppendRetries: java.util.List[TxnLogAppend] = new util.ArrayList[TxnLogAppend]() txnLogAppendRetryQueue.drainTo(txnLogAppendRetries) + txnLogAppendRetryQueueSensor.record(-txnLogAppendRetries.size()) debug(s"retrying: ${txnLogAppendRetries.size} transaction log appends") txnLogAppendRetries.asScala.foreach { txnLogAppend => - txnLogAppendRetryQueueSensor.record(-1) tryAppendToLog(txnLogAppend) } } @@ -205,8 +210,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig, queue.drainTo(txnIdAndMarkerEntries) } + unknownBrokerQueueSensor.record(-txnIdAndMarkerEntries.size()) for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries.asScala) { - unknownBrokerQueueSensor.record(-1) val transactionalId = txnIdAndMarker.txnId val producerId = txnIdAndMarker.txnMarkerEntry.producerId val producerEpoch = txnIdAndMarker.txnMarkerEntry.producerEpoch From c0cb21f089250e042ad0b9f9e67ac0b25cdb5813 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 5 Jun 2017 10:05:14 +0100 Subject: [PATCH 6/6] close metrics in test --- .../transaction/TransactionMarkerChannelManagerTest.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 35699f73f5192..a59927f7907b9 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.{MockTime, Utils} import org.apache.kafka.common.{Node, TopicPartition} import org.easymock.EasyMock import org.junit.Assert._ -import org.junit.Test +import org.junit.{After, Test} import scala.collection.mutable @@ -72,6 +72,11 @@ class TransactionMarkerChannelManagerTest { private val senderThread = channelManager.senderThread + @After + def after(): Unit = { + metrics.close() + } + private def mockCache(): Unit = { EasyMock.expect(txnStateManager.partitionFor(transactionalId1)) .andReturn(txnTopicPartition1)