KAFKA-9539; Add leader epoch in StopReplicaRequest (KIP-570) #8257
Conversation
core/src/main/scala/kafka/controller/ControllerChannelManager.scala
ok to test
build failures are due to #8301.
retest this please
ok to test
Thanks, left a few comments
}
assertEquals(expectedPartitions, partitions);
} else {
Map<TopicPartition, StopReplicaPartitionState> partitionStates = new HashMap<>();
Wonder if it makes sense to add this method to StopReplicaRequest. Often the code expects to work with TopicPartition.
I have considered this as well but I haven't done it because it is only used in tests so far. The downside is that using this in core is not optimal, so I am a bit reluctant to provide it: allocating and populating the Map is not necessary, especially when the controller and the brokers use the latest version of the API.
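To make the trade-off concrete, here is a minimal sketch of what the helper under discussion could look like; the types are simplified stand-ins, not the real generated StopReplicaPartitionState/TopicPartition classes. The map allocation on every call is the overhead the comment above is reluctant to pay in core.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StopReplicaHelperSketch {
    // Simplified stand-ins for Kafka's generated message and common classes;
    // the real StopReplicaPartitionState and TopicPartition differ in detail.
    record PartitionState(int partitionIndex, int leaderEpoch) {}
    record TopicState(String topicName, List<PartitionState> partitionStates) {}
    record TopicPartition(String topic, int partition) {}

    // The proposed helper: flatten the per-topic layout of the request into a
    // TopicPartition-keyed map. Note the new HashMap allocated on each call.
    static Map<TopicPartition, PartitionState> partitionStates(List<TopicState> topics) {
        Map<TopicPartition, PartitionState> result = new HashMap<>();
        for (TopicState topic : topics)
            for (PartitionState state : topic.partitionStates())
                result.put(new TopicPartition(topic.topicName(), state.partitionIndex()), state);
        return result;
    }
}
```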
topic1.partitionStates().add(new StopReplicaPartitionState()
    .setPartitionIndex(2)
    .setLeaderEpoch(2));
topic1.partitionStates().add(new StopReplicaPartitionState()
Might be worth adding one case where the epoch is -2.
Sure, done.
s"epoch $controllerEpoch for partition $topicPartition since its associated " +
s"leader epoch $requestLeaderEpoch is smaller than the current " +
s"leader epoch $currentLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
Hmm.. This error seems a little inaccurate. Could we use FENCED_LEADER_EPOCH?
Good point. I completely forgot to raise it...
I use STALE_CONTROLLER_EPOCH here to stay in line with the LeaderAndIsr API, which uses it as well when the leader epoch is stale. See here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1227
It was introduced in an old refactoring: a9ff3f2#diff-4f99f5a41c14e2a8523c03ce4ae23987L630. It seems that back in the day we had StaleLeaderEpochCode, but it got replaced by STALE_CONTROLLER_EPOCH.
I was actually wondering if we should stay in line with the current behavior of the LeaderAndIsr API or just use FENCED_LEADER_EPOCH. If there are no other reasons besides the historical one to use STALE_CONTROLLER_EPOCH, FENCED_LEADER_EPOCH indeed seems more appropriate.
I have changed it to use FENCED_LEADER_EPOCH.
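The resulting check can be pictured with a small sketch (illustrative names and structure, not the actual ReplicaManager code): a StopReplica partition state carrying a leader epoch older than the broker's current one is fenced, while the deletion sentinel (-2) always passes.

```java
public class EpochCheckSketch {
    // Sentinel used by KIP-570 when the topic is queued for deletion.
    static final int EPOCH_DURING_DELETE = -2;

    enum Error { NONE, FENCED_LEADER_EPOCH }

    // Reject a request leader epoch that is smaller than the current one,
    // unless it is the deletion sentinel, which bypasses the fencing check.
    static Error validate(int requestLeaderEpoch, int currentLeaderEpoch) {
        if (requestLeaderEpoch != EPOCH_DURING_DELETE
                && requestLeaderEpoch < currentLeaderEpoch) {
            return Error.FENCED_LEADER_EPOCH;
        }
        return Error.NONE;
    }
}
```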
try {
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
maybeCleanReplica(topicPartition, deletePartition)
This changes the order of the operations. Previously we would have stopped fetchers before attempting to delete the log directory. Are we sure this is safe?
I need to take another look at it.
I had a second look at this one and I think that the change is safe. Let me explain.
We can end up in this situation, where the replica is not known any more by the broker, in two ways:
- The broker receives a StopReplica request without having received a LeaderAndIsr request prior to it. In this case, the partition is not created and the fetchers haven't been started, so we don't need to stop them for the concerned partitions. We have one test which simulates such a scenario: https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala#L432. It fails if I comment out the cleaning logic.
- The handling of the StopReplica request fails with a storage exception when it deletes the log directory. The delete happens after the partition is effectively removed from the allPartitions map in the ReplicaManager. Note that the fetchers for the concerned partitions are already stopped at this point, as they are stopped before removing the partition from the map. If the request is retried somehow, the partition won't be there, so the cleaning would take place.
Altogether, fetchers are always started after the partition is added to the allPartitions map and always stopped before removing the partition from the map. If it is not in the map, fetchers can't be started. Thus, this seems safe to me based on my current knowledge.
The only benefit of putting it there is that the logging is better in my opinion. When the replica does not exist, we don't get the "handling StopReplica..." and "completed StopReplica..." messages but only "Ignoring StopReplica... cause replica does not exist".
I would be fine reverting to the prior behavior as it is only a cosmetic change in the end. It may be safer to do so.
retest this please
@hachikuji I think that I have answered all your comments/questions. Could you have a second look at it please?
case (topicPartition, Left(true)) if topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME =>
  // The StopReplica API does not pass through the leader epoch
  txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = None)
@hachikuji It looks like we could improve this now that we do have the leader epoch. I am not at all familiar with transactions. Can I just pass the epoch here when it is provided?
It seems safe to pass through when defined.
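The agreed behavior might be sketched like this (hypothetical names; the real coordinator code is Scala and takes an Option[Int]): the leader epoch is forwarded only when the request carried a real one, not the deletion sentinel.

```java
import java.util.Optional;

public class ResignationSketch {
    // Sentinel used when the topic is queued for deletion; in that case no
    // meaningful leader epoch is available to hand to the coordinator.
    static final int EPOCH_DURING_DELETE = -2;

    // Decide what to pass to txnCoordinator.onResignation: a defined epoch
    // when the StopReplica request carried one, otherwise nothing.
    static Optional<Integer> epochToForward(int requestLeaderEpoch) {
        return requestLeaderEpoch == EPOCH_DURING_DELETE
            ? Optional.empty()
            : Optional.of(requestLeaderEpoch);
    }
}
```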
Thanks, left a few more comments.
}

return new StopReplicaRequest(data, version);
}

private boolean deletePartitions() {
Might not be too big of a problem, but it would be nice to avoid this pass through all the partitions. As it is, we have 1) a first pass to split the partitions, 2) a second pass to validate the split, and 3) a third pass to convert to the needed type. Seems like we should be able to save some work here, perhaps by moving the conversion to the caller (even though it's annoying).
You're right. I did not realise this. I think that the best way is to move everything to the caller. I will do this.
I have refactored this and pushed all the conversion to the caller.
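The idea of collapsing the passes can be sketched as follows (illustrative types, not the actual request classes): the caller converts to TopicPartition and splits by the delete flag in a single traversal instead of three.

```java
import java.util.ArrayList;
import java.util.List;

public class SinglePassSplitSketch {
    record TopicPartition(String topic, int partition) {}
    record PartitionState(String topic, int partitionIndex, boolean deletePartition) {}
    record Split(List<TopicPartition> toDelete, List<TopicPartition> toStop) {}

    // One pass over the partition states: build the TopicPartition and route
    // it to the right bucket at the same time, avoiding separate split,
    // validation, and conversion traversals.
    static Split split(List<PartitionState> states) {
        List<TopicPartition> toDelete = new ArrayList<>();
        List<TopicPartition> toStop = new ArrayList<>();
        for (PartitionState s : states) {
            TopicPartition tp = new TopicPartition(s.topic(), s.partitionIndex());
            (s.deletePartition() ? toDelete : toStop).add(tp);
        }
        return new Split(toDelete, toStop);
    }
}
```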
s"${stopReplicaRequest.brokerEpoch} smaller than the current broker epoch ${controller.brokerEpoch}")
sendResponseExemptThrottle(request, new StopReplicaResponse(new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
} else {
val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
val (result, error) = replicaManager.stopReplicas(request.context.correlationId, stopReplicaRequest)
The return type is a bit awkward. As far as I can tell, the left side is just returning the deletion status, which is taken from the request. Was this an optimization in order to avoid another traversal of the request?
That's correct. Now, we also need the leaderEpoch to pass it to the txnCoordinator so I will refactor this.
I have refactored this as well. It is much better now.
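One way to picture the refactor (purely illustrative; the actual code is Scala and its shapes differ): return a per-partition result that bundles the error with the fields the caller needs, such as the delete flag and the leader epoch, so KafkaApis no longer re-traverses the request.

```java
public class StopReplicaResultSketch {
    enum Error { NONE, FENCED_LEADER_EPOCH }

    // A per-partition result carrying the error together with the request
    // fields the caller needs downstream (e.g. for coordinator resignation),
    // instead of an awkward pair of parallel structures.
    record PartitionResult(boolean deletePartition, int leaderEpoch, Error error) {}
}
```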
try {
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
maybeCleanReplica(topicPartition, deletePartition)
I am not sure I followed your response to my previous question. My concern was actually the happy path when the partition exists locally. If we delete first before stopping replica fetchers, then would the fetcher thread handle that gracefully? By removing the fetchers first, we are guaranteed that we couldn't have a write in progress at the time of deletion.
Sorry if my comment was not clear. I was trying to argue that the fetcher thread can't be running for a given partition if the partition is not known by the ReplicaManager (case HostedPartition.None), because the fetcher thread is started after the partition is added to the allPartitions map in the ReplicaManager by the LeaderAndIsrRequest, and stopped before the partition is removed from allPartitions by the StopReplicaRequest. This is based on my current understanding of the ReplicaManager but, as it is fairly new to me, I may have missed something. Did I?
It is probably better to keep the previous behavior to be 100% safe.
I have reverted to the previous behavior to be 100% safe.
I think I get what you were saying now. You were probably right.
@hachikuji I have updated the PR based on your inputs. Could you have another look at it please?
ok to test
Just a few minor questions. This looks nearly ready to merge.
@@ -396,9 +395,23 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int],
                                    topicPartition: TopicPartition,
                                    deletePartition: Boolean): Unit = {
// A sentinel (-2) is used as an epoch if the topic is queued for deletion or | |||
// does not have a leader yet. This sentinel overrides any existing epoch. |
The first part of this definitely makes sense, but what is the motivation for the second part? Why not use LeaderAndIsr.NoEpoch? Though I can't really think of what would cause this case to be hit.
Indeed, LeaderAndIsr.NoEpoch makes more sense as a default value here; I made a mistake. You're right that I don't think this should ever happen, but I went down the defensive path with a default value just in case.
I have updated the PR.
.map(_.leaderAndIsr.leaderEpoch)
.getOrElse(LeaderAndIsr.EpochDuringDelete)
}
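The epoch selection discussed in this thread can be sketched as follows. This is a hedged sketch with assumed constant values (LeaderAndIsr.NoEpoch taken as -1 and the deletion sentinel as -2, per the discussion above); the real code is Scala and uses Option rather than OptionalInt.

```java
import java.util.OptionalInt;

public class StopReplicaEpochSketch {
    static final int NO_EPOCH = -1;            // assumed value of LeaderAndIsr.NoEpoch
    static final int EPOCH_DURING_DELETE = -2; // sentinel for topics queued for deletion

    // Epoch to carry in the StopReplicaRequest: the sentinel when the topic
    // is queued for deletion (it overrides any existing epoch); otherwise
    // the current leader epoch if known, falling back to NoEpoch as a
    // defensive default for the case that should never be hit.
    static int leaderEpoch(boolean topicQueuedForDeletion, OptionalInt currentLeaderEpoch) {
        if (topicQueuedForDeletion) return EPOCH_DURING_DELETE;
        return currentLeaderEpoch.orElse(NO_EPOCH);
    }
}
```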
brokerIds.filter(_ >= 0).foreach { brokerId => |
No need to fix here, but do you know why we do this filtering?
I've asked myself the same question but I couldn't find a reason. I believe that brokerId is always >= 0 in the controller.
def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean) = {
stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition")

def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Unit = {
if (deletePartition) {
getPartition(topicPartition) match {
Could potentially use nonOfflinePartition(topicPartition).foreach
Actually, it does not work because we need both the reference to the hosted partition and the partition below: hostedPartition and removedPartition. nonOfflinePartition only provides the latter.
(responseMap, Errors.STALE_CONTROLLER_EPOCH)
} else {
val partitions = stopReplicaRequest.partitions.asScala.toSet
controllerEpoch = stopReplicaRequest.controllerEpoch
this.controllerEpoch = controllerEpoch
Good call updating this.
retest this please
ok to test
retest this please
LGTM. Thanks for the patch!
This PR implements KIP-570.