KAFKA-8334 Make sure the thread which tries to complete delayed reque… #8657

Merged: 17 commits (Sep 9, 2020)
22 changes: 1 addition & 21 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -103,19 +103,7 @@ class DelayedOperations(topicPartition: TopicPartition,
fetch.checkAndComplete(TopicPartitionOperationKey(topicPartition))
}

def checkAndCompleteProduce(): Unit = {
produce.checkAndComplete(TopicPartitionOperationKey(topicPartition))
}

def checkAndCompleteDeleteRecords(): Unit = {
deleteRecords.checkAndComplete(TopicPartitionOperationKey(topicPartition))
}

def numDelayedDelete: Int = deleteRecords.numDelayed

def numDelayedFetch: Int = fetch.numDelayed

def numDelayedProduce: Int = produce.numDelayed
}

object Partition extends KafkaMetricsGroup {
@@ -1010,15 +998,7 @@ class Partition(val topicPartition: TopicPartition,
}
}

// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
tryCompleteDelayedRequests()
else {
// probably unblock some follower fetch requests since log end offset has been updated
delayedOperations.checkAndCompleteFetch()
}

info
info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
Contributor:

With this change, DelayedOperations.checkAndCompleteFetch() is only used in tests. I am wondering if it can be removed. It's fine if we want to do this in a follow-up PR.

Unrelated to this PR, DelayedOperations.checkAndCompleteProduce and DelayedOperations.checkAndCompleteDeleteRecords seem unused. We can probably remove them in a separate PR.

Contributor Author:

> With this change, DelayedOperations.checkAndCompleteFetch() is only used in tests. I am wondering if it can be removed. It's fine if we want to do this in a follow-up PR.

I will file a ticket after this PR is merged.

> Unrelated to this PR, DelayedOperations.checkAndCompleteProduce and DelayedOperations.checkAndCompleteDeleteRecords seem unused. We can probably remove them in a separate PR.

This is a small change. I will remove them in this PR.

}

def readRecords(fetchOffset: Long,
11 changes: 9 additions & 2 deletions core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -36,8 +36,15 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {

override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
override def onExpiration() = coordinator.onExpireJoin()
override def onComplete() = coordinator.onCompleteJoin(group)
override def onExpiration(): Unit = {
coordinator.onExpireJoin()
// try to complete delayed actions introduced by coordinator.onCompleteJoin
tryToCompleteDelayedAction()
}
override def onComplete(): Unit = coordinator.onCompleteJoin(group)

// TODO: remove this ugly chain after we move the action queue to handler thread
private def tryToCompleteDelayedAction(): Unit = coordinator.groupManager.replicaManager.tryCompleteActions()
}

/**
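To illustrate the contract this change establishes, here is a minimal sketch (assumed, not part of the PR; only DelayedOperation and ActionQueue come from the diff, the other names are hypothetical) of an operation whose onComplete() enqueues follow-up purgatory checks and whose onExpiration() therefore has to drain the action queue itself, since no request handler thread will do it on its behalf:

```scala
import kafka.server.{ActionQueue, DelayedOperation}

// Hypothetical subclass showing the onExpiration()/ActionQueue contract that
// DelayedJoin now follows.
abstract class QueueAwareOperation(delayMs: Long,
                                   actionQueue: ActionQueue) extends DelayedOperation(delayMs) {

  // completion logic that, like GroupCoordinator.onCompleteJoin, may enqueue
  // purgatory checks on the action queue instead of running them inline
  protected def doComplete(): Unit

  override def onComplete(): Unit = doComplete()

  override def onExpiration(): Unit = {
    // the expiration reaper holds no request-handler locks, so it is a safe
    // place to drain whatever doComplete() queued
    actionQueue.tryCompleteActions()
  }
}
```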
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -56,7 +56,7 @@ import scala.jdk.CollectionConverters._
class GroupMetadataManager(brokerId: Int,
interBrokerProtocolVersion: ApiVersion,
config: OffsetConfig,
replicaManager: ReplicaManager,
val replicaManager: ReplicaManager,
zkClient: KafkaZkClient,
time: Time,
metrics: Metrics) extends Logging with KafkaMetricsGroup {
13 changes: 12 additions & 1 deletion core/src/main/scala/kafka/log/Log.scala
@@ -68,6 +68,13 @@ object LogAppendInfo {
offsetsMonotonic = false, -1L, recordErrors, errorMessage)
}

sealed trait LeaderHwChange
object LeaderHwChange {
case object Increased extends LeaderHwChange
case object Same extends LeaderHwChange
case object None extends LeaderHwChange
}

/**
* Struct to hold various quantities we compute about each message set before appending to the log
*
@@ -85,6 +92,9 @@ object LogAppendInfo {
* @param validBytes The number of valid bytes
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
* @param lastOffsetOfFirstBatch The last offset of the first batch
* @param leaderHwChange Increased if the high watermark needs to be increased after appending the record.
* Same if the high watermark is unchanged. None is the default value and means the append failed
*
*/
case class LogAppendInfo(var firstOffset: Option[Long],
var lastOffset: Long,
Expand All @@ -100,7 +110,8 @@ case class LogAppendInfo(var firstOffset: Option[Long],
offsetsMonotonic: Boolean,
lastOffsetOfFirstBatch: Long,
recordErrors: Seq[RecordError] = List(),
errorMessage: String = null) {
errorMessage: String = null,
leaderHwChange: LeaderHwChange = LeaderHwChange.None) {
/**
* Get the first offset if it exists, else get the last offset of the first batch
* For magic versions 2 and newer, this method will return first offset. For magic versions
56 changes: 56 additions & 0 deletions core/src/main/scala/kafka/server/ActionQueue.scala
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import java.util.concurrent.ConcurrentLinkedQueue

import kafka.utils.Logging

/**
* This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
* produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
* we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
*/
class ActionQueue extends Logging {
private val queue = new ConcurrentLinkedQueue[() => Unit]()

/**
* add action to this queue.
* @param action action
*/
def add(action: () => Unit): Unit = queue.add(action)

/**
* try to complete all delayed actions
*/
def tryCompleteActions(): Unit = {
val maxToComplete = queue.size()
var count = 0
var done = false
while (!done && count < maxToComplete) {
try {
val action = queue.poll()
if (action == null) done = true
else action()
} catch {
case e: Throwable =>
error("failed to complete delayed actions", e)
} finally count += 1
}
}
}
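As a usage illustration (a sketch, not taken from the PR), the queue decouples the thread that discovers follow-up work, typically while holding partition or group locks, from the thread that can safely run it:

```scala
import kafka.server.ActionQueue

object ActionQueueExample extends App {
  val actionQueue = new ActionQueue

  // appending thread: only record the follow-up checks; do not run them while
  // partition or group locks may still be held
  actionQueue.add(() => println("checkAndComplete delayed fetch for topic-0"))
  actionQueue.add(() => println("checkAndComplete delayed produce for topic-0"))

  // handler/expiration thread: drain everything queued so far; a failing action
  // is logged and the drain continues with the remaining actions
  actionQueue.tryCompleteActions()
}
```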
118 changes: 51 additions & 67 deletions core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -41,13 +41,15 @@ import scala.collection.mutable.ListBuffer
* forceComplete().
*
* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
*
* Note that if you add a future delayed operation that calls ReplicaManager.appendRecords() in onComplete()
* like DelayedJoin, you must be aware that this operation's onExpiration() needs to call actionQueue.tryCompleteActions().
*/
abstract class DelayedOperation(override val delayMs: Long,
lockOpt: Option[Lock] = None)
extends TimerTask with Logging {

private val completed = new AtomicBoolean(false)
private val tryCompletePending = new AtomicBoolean(false)
// Visible for testing
private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

@@ -100,42 +102,24 @@ abstract class DelayedOperation(override val delayMs: Long,
def tryComplete(): Boolean

/**
* Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
* without blocking.
*
* If threadA acquires the lock and performs the check for completion before completion criteria is met
* and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
* yet released the lock, we need to ensure that completion is attempted again without blocking threadA
* or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
* of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
* every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
* the operation is actually completed.
* Thread-safe variant of tryComplete() that also calls an extra function if the first tryComplete returns false
* @param f the function to be executed if the first tryComplete returns false
* @return result of tryComplete
*/
private[server] def maybeTryComplete(): Boolean = {
var retry = false
var done = false
do {
if (lock.tryLock()) {
try {
tryCompletePending.set(false)
done = tryComplete()
} finally {
lock.unlock()
}
// While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
// `tryCompletePending`. In this case we should retry.
retry = tryCompletePending.get()
} else {
// Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
// acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
// Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
// released the lock and returned by the time the flag is set.
retry = !tryCompletePending.getAndSet(true)
}
} while (!isCompleted && retry)
done
private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
if (tryComplete()) true
else {
f
// last completion check
tryComplete()
}
}

/**
* Thread-safe variant of tryComplete()
*/
private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())

/*
* run() method defines a task that is executed on timeout
*/
@@ -219,38 +203,38 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
assert(watchKeys.nonEmpty, "The watch key list can't be empty")

// The cost of tryComplete() is typically proportional to the number of keys. Calling
// tryComplete() for each key is going to be expensive if there are many keys. Instead,
// we do the check in the following way. Call tryComplete(). If the operation is not completed,
// we just add the operation to all keys. Then we call tryComplete() again. At this time, if
// the operation is still not completed, we are guaranteed that it won't miss any future triggering
// event since the operation is already on the watcher list for all keys. This does mean that
// if the operation is completed (by another thread) between the two tryComplete() calls, the
// operation is unnecessarily added for watch. However, this is a less severe issue since the
// expire reaper will clean it up periodically.

// At this point the only thread that can attempt this operation is this current thread
// Hence it is safe to tryComplete() without a lock
var isCompletedByMe = operation.tryComplete()
if (isCompletedByMe)
return true

var watchCreated = false
for(key <- watchKeys) {
// If the operation is already completed, stop adding it to the rest of the watcher list.
if (operation.isCompleted)
return false
watchForOperation(key, operation)

if (!watchCreated) {
watchCreated = true
estimatedTotalOperations.incrementAndGet()
}
}

isCompletedByMe = operation.maybeTryComplete()
if (isCompletedByMe)
return true
// The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is
// going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse().
// If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At
// this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering
// event since the operation is already on the watcher list for all keys.
//
// ==============[story about lock]==============
// Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing
// the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and
// checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete()
// 1) thread_a holds readlock of stateLock from TransactionStateManager
// 2) thread_a is executing tryCompleteElseWatch()
// 3) thread_a adds op to watch list
// 4) thread_b requests the writelock of stateLock from TransactionStateManager (blocked by thread_a)
// 5) thread_c calls checkAndComplete() and holds lock of op
// 6) thread_c is waiting for the readlock of stateLock to complete op (blocked by thread_b)
// 7) thread_a is waiting for the lock of op to call the final tryComplete() (blocked by thread_c)
//
// Note that even with the current approach, deadlocks could still be introduced. For example,
// 1) thread_a calls tryCompleteElseWatch() and gets lock of op
// 2) thread_a adds op to watch list
// 3) thread_a calls op#tryComplete and tries to acquire lock_b
// 4) thread_b holds lock_b and calls checkAndComplete()
// 5) thread_b sees op from watch list
// 6) thread_b needs lock of op
// To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding
// any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
// holding an exclusive lock to make the call is often unnecessary.
if (operation.safeTryCompleteOrElse {
watchKeys.foreach(key => watchForOperation(key, operation))
if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
}) return true

// if it cannot be completed by now and hence is watched, add to the expire queue also
if (!operation.isCompleted) {
@@ -375,7 +359,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
if (curr.isCompleted) {
// another thread has completed this operation, just remove it
iter.remove()
} else if (curr.maybeTryComplete()) {
} else if (curr.safeTryComplete()) {
iter.remove()
completed += 1
}
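The check-register-recheck pattern described in the comment above can be reduced to a small self-contained sketch (assumed names, not the purgatory implementation): the operation's lock is held across the first check, the watch registration, and the final check, so a concurrent checkAndComplete() can never observe the operation registered but never rechecked.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantLock

// Toy stand-in for DelayedOperation: `satisfied` represents the completion criteria.
class SimpleOp(var satisfied: Boolean) {
  private val lock = new ReentrantLock()

  def tryComplete(): Boolean = satisfied

  // mirrors safeTryCompleteOrElse(): try, register via `orElse`, then try once
  // more, all while holding the operation's lock
  def safeTryCompleteOrElse(orElse: => Unit): Boolean = {
    lock.lock()
    try {
      if (tryComplete()) true
      else {
        orElse        // e.g. add the operation to every watch key
        tryComplete() // last completion check, still under the lock
      }
    } finally lock.unlock()
  }

  // mirrors checkAndComplete() -> safeTryComplete(): honors the same lock
  def safeTryComplete(): Boolean = {
    lock.lock()
    try tryComplete()
    finally lock.unlock()
  }
}

object PurgatorySketch extends App {
  private val watchers = new ConcurrentLinkedQueue[SimpleOp]()

  // mirrors tryCompleteElseWatch(): the watch registration happens inside the
  // locked check-register-recheck window
  def tryCompleteElseWatch(op: SimpleOp): Boolean =
    op.safeTryCompleteOrElse {
      watchers.add(op)
      ()
    }

  val op = new SimpleOp(satisfied = false)
  println(tryCompleteElseWatch(op))          // false: registered on the watch list
  op.satisfied = true
  println(watchers.peek().safeTryComplete()) // true: completed by the checking side
}
```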
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -186,6 +186,10 @@ class KafkaApis(val requestChannel: RequestChannel,
case e: FatalExitError => throw e
case e: Throwable => handleError(request, e)
} finally {
// try to complete delayed actions. In order to avoid conflicting locking, the actions to complete delayed requests
// are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
// expiration thread for certain delayed operations (e.g. DelayedJoin)
replicaManager.tryCompleteActions()
// The local completion time may be set while processing the request. Only record it if it's unset.
if (request.apiLocalCompleteTimeNanos < 0)
request.apiLocalCompleteTimeNanos = time.nanoseconds
31 changes: 31 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -558,10 +558,21 @@ class ReplicaManager(val config: KafkaConfig,
localLog(topicPartition).map(_.parentDir)
}

/**
* TODO: move this action queue to the handler thread so we can simplify concurrency handling
*/
private val actionQueue = new ActionQueue

def tryCompleteActions(): Unit = actionQueue.tryCompleteActions()

/**
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
* if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
*
* Note that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
* are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
* locks.
*/
def appendRecords(timeout: Long,
requiredAcks: Short,
@@ -585,6 +596,26 @@ class ReplicaManager(val config: KafkaConfig,
result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
}

actionQueue.add {
() =>
localProduceResults.foreach {
case (topicPartition, result) =>
val requestKey = TopicPartitionOperationKey(topicPartition)
result.info.leaderHwChange match {
case LeaderHwChange.Increased =>
// some delayed operations may be unblocked after HW changed
delayedProducePurgatory.checkAndComplete(requestKey)
delayedFetchPurgatory.checkAndComplete(requestKey)
delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
case LeaderHwChange.Same =>
// probably unblock some follower fetch requests since log end offset has been updated
delayedFetchPurgatory.checkAndComplete(requestKey)
case LeaderHwChange.None =>
// nothing
}
}
}

recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })

if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
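As a hedged illustration of that caller contract (the object and method names are assumptions; only ReplicaManager.tryCompleteActions() comes from this PR), a caller performs its appends, which merely enqueue the LeaderHwChange follow-ups shown above, and drains the queue in a finally block once every lock taken during handling has been released, mirroring what KafkaApis.handle() now does:

```scala
import kafka.server.ReplicaManager

object AppendCallerSketch {
  // Hypothetical caller: the append only enqueues the purgatory checks, so the
  // drain happens in the finally block, after all locks taken while handling
  // the request have been released.
  def handleProduceLikeRequest(replicaManager: ReplicaManager)(doAppend: => Unit): Unit = {
    try {
      doAppend // e.g. replicaManager.appendRecords(...) with the real arguments
    } finally {
      // without this, a delayed produce/fetch unblocked by the append would only
      // complete when the purgatory's expiration reaper eventually fires
      replicaManager.tryCompleteActions()
    }
  }
}
```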