KAFKA-14694: RPCProducerIdManager should not wait on new block (#13267)
RPCProducerIdManager initiates an async request to the controller to grab a block of producer IDs and then blocks waiting for a response from the controller.

This is done in the request handler threads while holding a global lock. This means that if many producers are requesting producer IDs and the controller is slow to respond, many threads can get stuck waiting for the lock.

This patch aims to:
* resolve the deadlock scenario mentioned above by not waiting for a new block and instead returning an error immediately (see the sketch after this list)
* remove synchronization usages in RpcProducerIdManager.generateProducerId()
* handle errors returned from generateProducerId() so that KafkaApis does not log unexpected errors
* confirm producers back off before retrying
* introduce backoff if the manager fails to process AllocateProducerIdsResponse
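The core of the fix is to hand out IDs from an atomically swapped block and fail fast with a retriable error instead of parking request handler threads. The sketch below is a simplified, hypothetical model of that pattern, not the actual Kafka API: `SimpleIdBlock`, `NonBlockingIdSource`, and the `Either`-based result are illustrative stand-ins for `ProducerIdsBlock`, `RPCProducerIdManager`, and `Try[Long]`.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}

// Hypothetical stand-in for ProducerIdsBlock: a contiguous ID range with lock-free claims.
final case class SimpleIdBlock(first: Long, len: Long) {
  private val next = new AtomicLong(first)
  // Claim the next ID, or None once the block is exhausted.
  def claimNextId(): Option[Long] = {
    val id = next.getAndIncrement()
    if (id < first + len) Some(id) else None
  }
}

// Hypothetical stand-in for the manager: never blocks, never holds a global lock.
final class NonBlockingIdSource(fetchBlockAsync: (SimpleIdBlock => Unit) => Unit) {
  private val current = new AtomicReference(SimpleIdBlock(0, 0)) // starts exhausted
  private val nextBlock = new AtomicReference[SimpleIdBlock](null)
  private val requestInFlight = new AtomicBoolean(false)

  def generateId(): Either[String, Long] =
    current.get.claimNextId() match {
      case Some(id) => Right(id)
      case None =>
        val block = nextBlock.getAndSet(null)
        if (block == null) {
          maybeRequestNextBlock()
          // Fail fast with a retriable error rather than blocking the caller's thread.
          Left("COORDINATOR_LOAD_IN_PROGRESS: no producer ID block available yet")
        } else {
          current.set(block)
          requestInFlight.set(false)
          generateId() // claim from the freshly installed block
        }
    }

  private def maybeRequestNextBlock(): Unit =
    if (nextBlock.get == null && requestInFlight.compareAndSet(false, true))
      fetchBlockAsync(block => nextBlock.set(block))
}
```

Callers treat the returned error as retriable and back off before retrying, which is what the new SenderTest case below asserts on the client side.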

Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
jeffkbkim committed Jun 22, 2023
1 parent 9c8aaa2 commit 1dbcb7d
Showing 12 changed files with 414 additions and 146 deletions.
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -3035,6 +3035,28 @@ public void testCustomErrorMessage() throws Exception {
verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1, errorMessage), errorMessage);
}

@Test
public void testSenderShouldRetryWithBackoffOnRetriableError() {
final long producerId = 343434L;
TransactionManager transactionManager = createTransactionManager();
setupWithTransactionState(transactionManager);
long start = time.milliseconds();

// first request is sent immediately
prepareAndReceiveInitProducerId(producerId, (short) -1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
long request1 = time.milliseconds();
assertEquals(start, request1);

// backoff before sending second request
prepareAndReceiveInitProducerId(producerId, (short) -1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
long request2 = time.milliseconds();
assertEquals(RETRY_BACKOFF_MS, request2 - request1);

// third request should also backoff
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
}

private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
@@ -3191,7 +3213,7 @@ private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors e
}

private TransactionManager createTransactionManager() {
-return new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions());
+return new TransactionManager(new LogContext(), null, 0, RETRY_BACKOFF_MS, new ApiVersions());
}

private void setupWithTransactionState(TransactionManager transactionManager) {
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -16,6 +16,7 @@
*/
package kafka.coordinator.transaction

import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit, NoRetry, RetryBackoffMs}
import kafka.server.{BrokerToControllerChannelManager, ControllerRequestCompletionHandler}
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
@@ -24,10 +25,11 @@ import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, AllocateProducerIdsResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.ProducerIdsBlock

-import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.util.{Failure, Success, Try}

/**
@@ -41,6 +43,9 @@ import scala.util.{Failure, Success, Try}
object ProducerIdManager {
// Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block
val PidPrefetchThreshold = 0.90
val IterationLimit = 3
val RetryBackoffMs = 50
val NoRetry = -1L

// Creates a ProducerIdGenerate that directly interfaces with ZooKeeper, IBP < 3.0-IV0
def zk(brokerId: Int, zkClient: KafkaZkClient): ZkProducerIdManager = {
@@ -49,16 +54,20 @@

// Creates a ProducerIdGenerate that uses AllocateProducerIds RPC, IBP >= 3.0-IV0
def rpc(brokerId: Int,
-brokerEpochSupplier: () => Long,
-controllerChannel: BrokerToControllerChannelManager,
-maxWaitMs: Int): RPCProducerIdManager = {
-new RPCProducerIdManager(brokerId, brokerEpochSupplier, controllerChannel, maxWaitMs)
+time: Time,
+brokerEpochSupplier: () => Long,
+controllerChannel: BrokerToControllerChannelManager): RPCProducerIdManager = {
+
+new RPCProducerIdManager(brokerId, time, brokerEpochSupplier, controllerChannel)
}
}

trait ProducerIdManager {
-def generateProducerId(): Long
+def generateProducerId(): Try[Long]
def shutdown() : Unit = {}

// For testing purposes
def hasValidBlock: Boolean
}

object ZkProducerIdManager {
@@ -103,8 +112,7 @@
}
}

-class ZkProducerIdManager(brokerId: Int,
-zkClient: KafkaZkClient) extends ProducerIdManager with Logging {
+class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends ProducerIdManager with Logging {

this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: "

@@ -123,73 +131,103 @@
}
}

-def generateProducerId(): Long = {
+def generateProducerId(): Try[Long] = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+allocateNewProducerIdBlock()
+} catch {
+case t: Throwable =>
+return Failure(t)
+}
nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
-nextProducerId - 1
+Success(nextProducerId - 1)
}
}

override def hasValidBlock: Boolean = {
this synchronized {
!currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
}
}
}

/**
* RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
* for producers to retry if it does not have an available producer id and is waiting on a new block.
*/
class RPCProducerIdManager(brokerId: Int,
time: Time,
brokerEpochSupplier: () => Long,
-controllerChannel: BrokerToControllerChannelManager,
-maxWaitMs: Int) extends ProducerIdManager with Logging {
+controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {

this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "

-private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+// Visible for testing
+private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
private val requestInFlight = new AtomicBoolean(false)
-private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-private var nextProducerId: Long = -1L
+private val backoffDeadlineMs = new AtomicLong(NoRetry)

-override def generateProducerId(): Long = {
-this synchronized {
-if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-} else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-maybeRequestNextBlock()
-}
-}
-
-// If we've exhausted the current block, grab the next block (waiting if necessary)
-if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-if (block == null) {
-// Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-// when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-} else {
-block match {
-case Success(nextBlock) =>
-currentProducerIdBlock = nextBlock
-nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
-}
-}
-}
-nextProducerId
-}
-}
+override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+}
+
+override def generateProducerId(): Try[Long] = {
+var result: Try[Long] = null
+var iteration = 0
+while (result == null) {
+currentProducerIdBlock.get.claimNextId().asScala match {
+case None =>
+// Check the next block if current block is full
+val block = nextProducerIdBlock.getAndSet(null)
+if (block == null) {
+// Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+// when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+maybeRequestNextBlock()
+result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
+} else {
+currentProducerIdBlock.set(block)
+requestInFlight.set(false)
+iteration = iteration + 1
+}
+
+case Some(nextProducerId) =>
+// Check if we need to prefetch the next block
+val prefetchTarget = currentProducerIdBlock.get.firstProducerId + (currentProducerIdBlock.get.size * ProducerIdManager.PidPrefetchThreshold).toLong
+if (nextProducerId == prefetchTarget) {
+maybeRequestNextBlock()
+}
+result = Success(nextProducerId)
+}
+if (iteration == IterationLimit) {
+result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
+}
+}
+result
+}


-private def maybeRequestNextBlock(): Unit = {
-if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false, true)) {
-sendRequest()
-}
-}
+// Visible for testing
+private[transaction] def maybeRequestNextBlock(): Unit = {
+val retryTimestamp = backoffDeadlineMs.get()
+if (retryTimestamp == NoRetry || time.milliseconds() >= retryTimestamp) {
+// Send a request only if we reached the retry deadline, or if no deadline was set.
+if (nextProducerIdBlock.get == null &&
+requestInFlight.compareAndSet(false, true)) {
+sendRequest()
+// Reset backoff after a successful send.
+backoffDeadlineMs.set(NoRetry)
+}
+}
+}

// Visible for testing
private[transaction] def sendRequest(): Unit = {
val message = new AllocateProducerIdsRequestData()
.setBrokerEpoch(brokerEpochSupplier.apply())
Expand All @@ -207,37 +245,40 @@ class RPCProducerIdManager(brokerId: Int,
})
}

// Visible for testing
private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = {
-requestInFlight.set(false)
val data = response.data
+var successfulResponse = false
Errors.forCode(data.errorCode()) match {
case Errors.NONE =>
debug(s"Got next producer ID block from controller $data")
// Do some sanity checks on the response
-if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
-nextProducerIdBlock.put(Failure(new KafkaException(
-s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")))
+if (data.producerIdStart() < currentProducerIdBlock.get.lastProducerId) {
+error(s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")
} else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
-nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID block includes invalid ID range: $data")))
+error(s"Producer ID block includes invalid ID range: $data")
} else {
-nextProducerIdBlock.put(
-Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen())))
+nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen()))
+successfulResponse = true
}
case Errors.STALE_BROKER_EPOCH =>
warn("Our broker epoch was stale, trying again.")
maybeRequestNextBlock()
warn("Our broker currentBlockCount was stale, trying again.")
case Errors.BROKER_ID_NOT_REGISTERED =>
warn("Our broker ID is not yet known by the controller, trying again.")
maybeRequestNextBlock()
case e: Errors =>
warn("Had an unknown error from the controller, giving up.")
nextProducerIdBlock.put(Failure(e.exception()))
error(s"Received an unexpected error code from the controller: $e")
}

if (!successfulResponse) {
// There is no need to compare and set because only one thread
// handles the AllocateProducerIds response.
backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
requestInFlight.set(false)
}
}

-private[transaction] def handleTimeout(): Unit = {
+private def handleTimeout(): Unit = {
warn("Timed out when requesting AllocateProducerIds from the controller.")
requestInFlight.set(false)
-maybeRequestNextBlock()
}
}
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
import org.apache.kafka.server.util.Scheduler

import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}

object TransactionCoordinator {

@@ -113,8 +114,12 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
if (transactionalId == null) {
// if the transactional id is null, then always blindly accept the request
// and return a new producerId from the producerId manager
-val producerId = producerIdManager.generateProducerId()
-responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
+producerIdManager.generateProducerId() match {
+case Success(producerId) =>
+responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
+case Failure(exception) =>
+responseCallback(initTransactionError(Errors.forException(exception)))
+}
} else if (transactionalId.isEmpty) {
// if transactional id is empty then return error as invalid request. This is
// to make TransactionCoordinator's behavior consistent with producer client
@@ -125,17 +130,22 @@
} else {
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap {
case None =>
-val producerId = producerIdManager.generateProducerId()
-val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
-producerId = producerId,
-lastProducerId = RecordBatch.NO_PRODUCER_ID,
-producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-txnTimeoutMs = transactionTimeoutMs,
-state = Empty,
-topicPartitions = collection.mutable.Set.empty[TopicPartition],
-txnLastUpdateTimestamp = time.milliseconds())
-txnManager.putTransactionStateIfNotExists(createdMetadata)
+producerIdManager.generateProducerId() match {
+case Success(producerId) =>
+val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
+producerId = producerId,
+lastProducerId = RecordBatch.NO_PRODUCER_ID,
+producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+txnTimeoutMs = transactionTimeoutMs,
+state = Empty,
+topicPartitions = collection.mutable.Set.empty[TopicPartition],
+txnLastUpdateTimestamp = time.milliseconds())
+txnManager.putTransactionStateIfNotExists(createdMetadata)
+
+case Failure(exception) =>
+Left(Errors.forException(exception))
+}

case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)
}
@@ -231,9 +241,14 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// If the epoch is exhausted and the expected epoch (if provided) matches it, generate a new producer ID
if (txnMetadata.isProducerEpochExhausted &&
expectedProducerIdAndEpoch.forall(_.epoch == txnMetadata.producerEpoch)) {
-val newProducerId = producerIdManager.generateProducerId()
-Right(txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds(),
-expectedProducerIdAndEpoch.isDefined))
+producerIdManager.generateProducerId() match {
+case Success(producerId) =>
+Right(txnMetadata.prepareProducerIdRotation(producerId, transactionTimeoutMs, time.milliseconds(),
+expectedProducerIdAndEpoch.isDefined))
+case Failure(exception) =>
+Left(Errors.forException(exception))
+}
} else {
txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, expectedProducerIdAndEpoch.map(_.epoch),
time.milliseconds())
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -295,9 +295,9 @@

val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
time,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
-clientToControllerChannelManager,
-config.requestTimeoutMs
+clientToControllerChannelManager
)

// Create transaction coordinator, but don't start it until we've started replica manager.
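Taken together, `generateProducerId()` now surfaces failures as values rather than blocking, so each caller decides between propagating the error and retrying. As a rough illustration of the caller-side contract the new SenderTest case verifies, here is a hedged sketch of a retry loop over the new `Try[Long]` API; `nextProducerIdWithRetry`, its parameters, and the fixed 50 ms backoff are hypothetical, not part of this patch:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: keep asking the manager for an ID, backing off between
// attempts, the way a producer backs off on COORDINATOR_LOAD_IN_PROGRESS.
def nextProducerIdWithRetry(generate: () => Try[Long],
                            maxAttempts: Int = 5,
                            backoffMs: Long = 50L): Try[Long] = {
  var attempt = 1
  var result = generate()
  while (result.isFailure && attempt < maxAttempts) {
    // The error is retriable: wait before the next attempt instead of spinning.
    Thread.sleep(backoffMs)
    attempt += 1
    result = generate()
  }
  result
}
```

In the real producer the wait is governed by the client's retry.backoff.ms setting, which is what the RETRY_BACKOFF_MS assertions in the SenderTest change above pin down.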
