KAFKA-5202: Handle topic deletion while trying to send txn markers #3130
Conversation
ping @hachikuji @apurvam @dguy
@@ -208,6 +209,10 @@ class TransactionCoordinator(brokerId: Int,
    if (transactionalId == null || transactionalId.isEmpty) {
      responseCallback(Errors.INVALID_REQUEST)
    } else {
      // if there is any partitions unknown in the metadata cache, return immediately to client
      if (partitions.exists(tp => !metadataCache.contains(tp.topic, tp.partition)))
        responseCallback(Errors.UNKNOWN_TOPIC_OR_PARTITION)
Mentioned offline, but leaving it here as well: we should send this as a per-partition error, only for the partitions which are missing.
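For illustration, a minimal sketch of what a per-partition response could look like, using a hypothetical `MetadataCacheLike` stand-in for the broker's metadata cache rather than the real coordinator callback signature:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors

object PerPartitionErrorsSketch {
  // Hypothetical stand-in for the broker's MetadataCache; only the contains check is modeled.
  trait MetadataCacheLike {
    def contains(topic: String, partitionId: Int): Boolean
  }

  // Report UNKNOWN_TOPIC_OR_PARTITION only for the partitions missing from the cache,
  // instead of failing the whole AddPartitionsToTxn request with a single error.
  def partitionErrors(partitions: Set[TopicPartition],
                      metadataCache: MetadataCacheLike): Map[TopicPartition, Errors] = {
    val (known, missing) = partitions.partition(tp => metadataCache.contains(tp.topic, tp.partition))
    missing.map(tp => tp -> Errors.UNKNOWN_TOPIC_OR_PARTITION).toMap ++
      known.map(tp => tp -> Errors.NONE).toMap
  }
}
```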
Thinking about this a bit more, the behavior of the client will be tricky. Essentially, the send will block until it's expired, and yet the AddPartitions will keep retrying indefinitely. And because of https://issues.apache.org/jira/browse/KAFKA-5251, the client will keep retrying the AddPartitions even if the user aborts. If we want to keep this behavior, I think we may want to fix KAFKA-5251 as well. This way the user can at least abort the transaction properly after realizing that they are trying to send to deleted partitions.
> If we want to keep this behavior, I think we may want to fix KAFKA-5251 as well.

I think KAFKA-5251 is an optimization, while what we want to fix here is the client behavior: if the topic partition gets deleted while the producer has an ongoing transaction, the producer should be able to detect this and proceed instead of retrying, right?
You are right. In other words, the problem is that UNKNOWN_TOPIC_OR_PARTITION is a retriable error right now, so the client will keep retrying. We need to add logic to distinguish between cases where a broker is bounced and doesn't have its metadata yet and cases where the topic is truly deleted. There is no such logic in the client today, so it will retry indefinitely when a topic is deleted.
// TODO: instead of retry until succeed, we can first put it into an unknown broker queue and let the sender thread to look for its broker and migrate them
while (brokerNode.isEmpty) {
  brokerNode = metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
Why can't we simply do metadataCache.contains(topicPartition.topic) to check if the topic exists?
Not sure I follow... we need to get the broker node in order to put it into the corresponding queue, right?
Hmm. I guess I need to sync face to face. But from the code it seems like if there is no leader for any single topic partition, then the operation for the entire transactionalId in the purgatory will be marked as completed? If this is true, then I have two questions:

- Would we enter this case if there are no live replicas for a particular partition? Or will metadataCache.getPartitionLeaderEndpoint only return no brokers if the topic is deleted?
- Assuming that no leader means the partition is deleted, would the current logic mark the operation in purgatory as successful if even one partition in the transaction was deleted?

As I said, we can discuss this face to face: it may be more efficient as I am not the most familiar with a bunch of this stuff.
- Good question about getPartitionLeaderEndpoint: I will use a separate queue for brokers that are known but not available yet (see the sketch below).
- That is intentional: if one of the partitions is deleted but others have successfully written the markers, we should still treat this transaction as completed, since the append of the PrepareXX txn log entry means that the txn has to be CompleteXX, even if some data partitions are deleted.
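A rough sketch of the separate-queue idea being described, using hypothetical stand-in types (`PendingMarker`, a `leaderFor` lookup) in place of the real `TxnMarkerQueue` and `MetadataCache`; the actual implementation differs in its details:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.concurrent.TrieMap

object UnknownBrokerQueueSketch {
  // Hypothetical marker entry; the real code carries the full TxnMarkerEntry.
  case class PendingMarker(transactionalId: String, topicPartition: TopicPartition)

  class MarkerQueues(leaderFor: TopicPartition => Option[Node]) {
    val queueForUnknownBroker = new ConcurrentLinkedQueue[PendingMarker]()
    val queuePerBroker = TrieMap.empty[Int, ConcurrentLinkedQueue[PendingMarker]]

    // Called by the sender thread on each drain: re-resolve the leader endpoint and
    // either migrate the marker to that broker's queue, keep it for the next drain,
    // or drop it if the partition no longer has a leader (likely deleted).
    def drainUnknown(): Unit = {
      val drained = Iterator.continually(queueForUnknownBroker.poll()).takeWhile(_ != null).toList
      drained.foreach { marker =>
        leaderFor(marker.topicPartition) match {
          case Some(node) if node != Node.noNode =>
            queuePerBroker
              .getOrElseUpdate(node.id, new ConcurrentLinkedQueue[PendingMarker]())
              .add(marker)
          case Some(_) =>
            queueForUnknownBroker.add(marker) // leader known but node not reachable yet
          case None => // no leader in the cache: topic was likely deleted, skip the marker
        }
      }
    }
  }
}
```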
  brokerNode.get
}
case None =>
  // if the leader of the partition is unknown, skip sending the txn marker since
@guozhangwang I guess we are assuming deletion, as we would never have made it this far if the partition wasn't previously in the cache?
The bottom line is that metadataCache's cache memory structure is "incremental": it will not remove entries once they are added, unless the tp is marked as deleted in the metadata update request. In KafkaApis we have already checked the cache and filtered out any partitions whose info is not in the cache, so if the partition is gone later in the path here, it means it was there before and has since been deleted.
I guess it is possible that the topic is deleted and recreated before we notice that it is gone. In that case, we might write the marker to the new topic. That seems fine since there would be no transactional data from the producer.
By the way, is it assumed that metadata must be updated before we can become the leader of any partition? Is there no scenario where we could see an uninitialized or stale metadata cache?
When there is a leader change, the controller will always send the "update metadata request", wait for its response, and then send the "leader and isr request" in a second round trip, so I think the answer should be yes.

> That seems fine since there would be no transactional data from the producer.

Yes, as I mentioned in the description of the PR above.
Yeah, saw your comment after I posted the review. Glad we came to the same conclusion!
Thanks for the patch. Left mostly minor comments. The only thing I wasn't too sure about is how safe it is to depend on the contents of the metadata cache. Also, if checking the cache is reliable, we could also check it when loading the transaction metadata on partition immigration before sending any markers.
@@ -156,6 +155,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
  }

  // visible for testing
  private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker
nit: maybe you could just give markersQueueForUnknownBroker private scope in the transaction package? Also, queueForUnknownBroker is a bit easier on the eyes than markersQueueForUnknownBroker, so maybe we could remove the accessor and rename the variable?
Hmm, I felt the variable names markersQueueForUnknownBroker and markersQueuePerBroker are okay, and the point of the two functions queueForUnknownBroker and queueForBroker() is exactly to separate the actual private variables from the test-only functionality.
Hiding the field doesn't seem to have much value if we expose it directly through an accessor anyway and it's annoying to need two names for the same thing. If it only needs to be accessed from a test case, I wouldn't worry too much about violating encapsulation because we can just change the test case if the implementation changes. That said, it's just a nit, so feel free to ignore.
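To make the two options concrete, a tiny sketch with a plain mutable Queue standing in for the real TxnMarkerQueue (names here are hypothetical):

```scala
package kafka.coordinator.transaction {

  class ChannelManagerSketch {
    // Option A (as in the PR): keep the field private and expose a test-only accessor.
    private val markersQueueForUnknownBroker = new scala.collection.mutable.Queue[String]()
    // visible for testing
    private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker

    // Option B (the suggestion): make the field itself package-private, so tests in the
    // same package can read it directly and only one name is needed.
    private[transaction] val markersQueueForUnknownBrokerB = new scala.collection.mutable.Queue[String]()
  }
}
```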
@@ -128,10 +128,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,

  private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = concurrent.TrieMap.empty[Int, TxnMarkerQueue]

  private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
  private val markersQueueForUnknownBroker: TxnMarkerQueue = new TxnMarkerQueue(Node.noNode)
nit: type parameter on the lhs seems redundant.
IntelliJ states the opposite :) Anyways, I know Ismael is a strong advocate against such redundancy, so I will change it back.
@@ -16,6 +16,7 @@
 */
package kafka.coordinator.transaction

import kafka.server.MetadataCache
Seems we don't use this anywhere?
ack.
@@ -160,16 +160,20 @@ class MetadataCache(brokerId: Int) extends Logging {
    }
  }

  // if the leader is not known, return None;
  // if the leader is known and corresponding node is available, return Some(node)
  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
    inReadLock(partitionMetadataLock) {
      cache.get(topic).flatMap(_.get(partitionId)) match {
nit: seems this could be a map instead of a match?
Yes! ack.
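For instance, the nit amounts to something like this (a generic sketch over Option, not the actual MetadataCache code):

```scala
object OptionMatchVsMap {
  // With match: explicit Some/None cases.
  def endpointWithMatch[A, B](stateOpt: Option[A], buildNode: A => B): Option[B] =
    stateOpt match {
      case Some(state) => Some(buildNode(state))
      case None        => None
    }

  // With map: the same behavior in one expression.
  def endpointWithMap[A, B](stateOpt: Option[A], buildNode: A => B): Option[B] =
    stateOpt.map(buildNode)
}
```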
@@ -235,6 +239,8 @@ class MetadataCache(brokerId: Int) extends Logging {
    }
  }

  def contains(topic: String, partitionId: Int): Boolean = getPartitionInfo(topic, partitionId).isDefined
nit: could this accept TopicPartition instead? Seems like that's what we usually have in KafkaApis.
ack.
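A sketch of the suggested overload, with a simplified hypothetical cache (the real getPartitionInfo returns partition state, not AnyRef):

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical simplified cache: `infos` stands in for the broker's partition-state map.
class SimpleMetadataCache(infos: Map[(String, Int), AnyRef]) {
  def getPartitionInfo(topic: String, partitionId: Int): Option[AnyRef] =
    infos.get((topic, partitionId))

  def contains(topic: String, partitionId: Int): Boolean =
    getPartitionInfo(topic, partitionId).isDefined

  // The suggested convenience overload, matching what KafkaApis usually has in hand.
  def contains(tp: TopicPartition): Boolean = contains(tp.topic, tp.partition)
}
```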
@@ -254,31 +272,65 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                          result: TransactionResult, coordinatorEpoch: Int,
                          topicPartitions: immutable.Set[TopicPartition]): Unit = {
    val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
    val partitionsByDestination: immutable.Map[Node, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
    var brokerNode: Option[Node] = None
    val partitionsByDestination: immutable.Map[Option[Node], immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
nit: keying with an Option is a little odd. I wonder if it would be clearer to partition this into two separate collections, partitionsByDestination and partitionsWithUnknownDestinations?
I agree it is a bit odd, though with the above approach we would need to traverse the set twice with metadataCache.getPartitionLeaderEndpoint, once for partition and once for groupBy. Since this code is part of a sort-of critical path, I'm not sure which one is more efficient.
Yeah, not too sure how much difference it would make, but we can leave it as is.
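The two shapes under discussion, sketched with a hypothetical `leaderFor` lookup in place of `metadataCache.getPartitionLeaderEndpoint`:

```scala
import org.apache.kafka.common.{Node, TopicPartition}

object DestinationGrouping {
  // One pass over the partitions, but the map is keyed by Option[Node].
  def groupWithOptionKey(partitions: Set[TopicPartition],
                         leaderFor: TopicPartition => Option[Node]): Map[Option[Node], Set[TopicPartition]] =
    partitions.groupBy(leaderFor)

  // Two separate collections, at the cost of looking up each leader twice.
  def groupIntoTwoCollections(partitions: Set[TopicPartition],
                              leaderFor: TopicPartition => Option[Node])
      : (Map[Node, Set[TopicPartition]], Set[TopicPartition]) = {
    val (known, unknownDestinations) = partitions.partition(tp => leaderFor(tp).isDefined)
    val byDestination = known.groupBy(tp => leaderFor(tp).get)
    (byDestination, unknownDestinations)
  }
}
```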
  val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
  val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)

  if (brokerNode.eq(Node.noNode)) {
nit: I understand we can use reference equality, but is it necessary? Seems a bit brittle to depend on noNode returning the same object.
Again, this is for efficiency; I can change it to equals if you feel it is not necessary.
No strong opinion either way, but I'd favor the normal == check because the performance difference is almost certainly negligible.
ack.
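For reference, a small sketch of the difference being discussed, assuming kafka-clients on the classpath; it constructs a Node equal in value to Node.noNode but as a distinct object:

```scala
import org.apache.kafka.common.Node

object NoNodeEquality extends App {
  // A Node equal in value to Node.noNode but constructed separately.
  val separate = new Node(-1, "", -1)

  println(separate.eq(Node.noNode)) // false: reference equality fails for a distinct object
  println(separate == Node.noNode)  // true: == delegates to Node.equals, so value equality suffices
}
```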
case None =>
  // if the leader of the partition is unknown, skip sending the txn marker since
  // the partition is likely to be deleted already
  info(s"Couldn't find leader endpoint for partitions $topicPartitions while trying to send transaction markers for " +
nit: Maybe we could move this to after the coordinator epoch check? Seems like it might be misleading if we end up cancelling the operation anyway.
ack.
case Right(Some(epochAndMetadata)) =>
  if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
    info(s"The cached metadata have been changed to $epochAndMetadata since preparing to send markers; cancel sending markers to its partition leaders")
Maybe worth mentioning the old and new coordinator epoch values?
ack.
…into K5202-handle-topic-deletion
…into K5202-handle-topic-deletion
LGTM
Tests passing locally. Merging to trunk and 0.11.0.
Here is the sketch of this proposal:

1. When it is time to send the txn markers, only look for the leader node of the partition once instead of retrying, and if that information is not available, it means the partition has very likely been removed, since it was in the cache before. In this case, we just remove the partition from the metadata object and skip putting it into the corresponding queue, and if all partitions' leader brokers are unavailable, complete this delayed operation to proceed to write the complete txn log entry.

2. If the leader id is known from the cache but the corresponding node object with the listener name is not available, it means that the leader is likely unavailable right now. Put it into a separate queue and let the sender thread retry fetching its metadata again each time upon draining the queue.

One caveat of this approach is the delete-and-recreate case, and the argument is that since all the messages are deleted anyway when deleting the topic-partition, it does not matter whether the markers are on the log partitions or not.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Apurva Mehta <apurva@confluent.io>, Damian Guy <damian.guy@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #3130 from guozhangwang/K5202-handle-topic-deletion

(cherry picked from commit 80223b1)

Signed-off-by: Jason Gustafson <jason@confluent.io>
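As a rough illustration of rules (1) and (2) above, here is a compact sketch of the routing decision, with hypothetical stand-in types; the real TransactionMarkerChannelManager differs in many details:

```scala
import org.apache.kafka.common.{Node, TopicPartition}

object MarkerRoutingSketch {
  sealed trait Destination
  case class KnownBroker(node: Node) extends Destination // enqueue on this broker's queue
  case object UnknownBroker extends Destination          // leader known but node unavailable: retry queue
  case object LikelyDeleted extends Destination          // no leader in the cache: skip the marker

  // leaderFor stands in for metadataCache.getPartitionLeaderEndpoint.
  def route(tp: TopicPartition, leaderFor: TopicPartition => Option[Node]): Destination =
    leaderFor(tp) match {
      case None                              => LikelyDeleted
      case Some(node) if node == Node.noNode => UnknownBroker
      case Some(node)                        => KnownBroker(node)
    }

  def groupMarkers(partitions: Set[TopicPartition],
                   leaderFor: TopicPartition => Option[Node])
      : (Map[Node, Set[TopicPartition]], Set[TopicPartition]) = {
    val routed = partitions.groupBy(tp => route(tp, leaderFor))
    val perBroker = routed.collect { case (KnownBroker(node), tps) => node -> tps }
    val retryLater = routed.getOrElse(UnknownBroker, Set.empty[TopicPartition])
    // Partitions routed to LikelyDeleted are simply dropped; if every partition is dropped,
    // the delayed operation can complete immediately and the complete txn log entry is written.
    (perBroker, retryLater)
  }
}
```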