KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller #8609
I haven't reviewed the PR, but I can confirm that I no longer see the premature deletion behaviour that led me to report KAFKA-9946.
ok to test
false, stopReplicaTopicState.values.toBuffer.asJava)

sendRequest(brokerId, stopReplicaRequestBuilder, (r: AbstractResponse) => {
val stopReplicaResponse = r.asInstanceOf[StopReplicaResponse]
It seems like we could keep the response callback creation method from the previous PR if we passed partitionStates to the callback builder or, alternatively, created a map of the partitions that we expect to be deleted. I think this would avoid the code duplication of creating nearly the same callback twice, which I believe is being done to handle the difference between the deletePartition schemas, i.e. all partitions in the request vs. specific partitions.
I agree the duplication is a tad vexing. Perhaps we could pass a function TopicPartition -> Boolean to the callback which tells whether deletion was requested. For the old version, we would return true blindly.
I agree as well. Passing a function is a really good idea. I should have thought about it.
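The suggestion in this thread can be illustrated with a minimal sketch. It is not the code from the patch: the case classes are simplified stand-ins for Kafka's types, and makeCallback, isTopicDeletionInProgress, and onDeletionResults are hypothetical names chosen for illustration. The only idea taken from the discussion is that a single callback receives a TopicPartition => Boolean function that tells it whether deletion was requested.

```scala
// Simplified stand-ins for Kafka's types; every name here is hypothetical.
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionError(topicName: String, partitionIndex: Int, errorCode: Short)

object StopReplicaCallbackSketch {
  // Builds one response callback that both request versions can share.
  // `isPartitionDeleted` answers whether deletion was requested for a partition.
  def makeCallback(
    isTopicDeletionInProgress: String => Boolean,
    isPartitionDeleted: TopicPartition => Boolean,
    onDeletionResults: Map[TopicPartition, Short] => Unit
  ): Seq[PartitionError] => Unit = { partitionErrors =>
    // Keep only partitions of topics being deleted for which deletion was requested.
    val results = partitionErrors.flatMap { pe =>
      val tp = TopicPartition(pe.topicName, pe.partitionIndex)
      if (isTopicDeletionInProgress(pe.topicName) && isPartitionDeleted(tp))
        Some(tp -> pe.errorCode)
      else
        None
    }.toMap
    // Notify the deletion handler only when there is something to report.
    if (results.nonEmpty) onDeletionResults(results)
  }
}
```

With this shape, the caller decides what "deletion was requested" means for the request it built, and the callback body stays identical for every request version.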
// Verify that the topic deletion is in progress and
// that the request deleted the replica
if (controllerContext.isTopicDeletionInProgress(pe.topicName) &&
partitionStates.get(tp).exists(_.deletePartition)) {
nit: could we align with the controllerContext?
@lbradstreet @hachikuji Thanks for your feedback. I have updated the PR accordingly.
@dajac thanks! Looks good to me.
val tp = new TopicPartition(pe.topicName, pe.partitionIndex)
if (controllerContext.isTopicDeletionInProgress(pe.topicName) &&
isPartitionDeleted(tp)) {
partitionErrorsForDeletingTopics += tp -> Errors.forCode(pe.errorCode)
nit: partitionErrorsForDeletingTopics seems a bit ambiguous and makes it sound like it only includes partitions for which StopReplicaRequest failed. Perhaps something like partitionToError is better?
I think the name seems ok. To me it means that the map includes the errors of all topics being deleted. It might be nice if it could reflect that this only covers partitions which were also requested to be deleted in the StopReplica request, but that name probably becomes unwieldy.
ok to test
retest this please
LGTM. Thanks for the patch!
* 'trunk' of github.com:apache/kafka: (87 commits)
  KAFKA-9865: Expose output topic names from TopologyTestDriver (apache#8483)
  MINOR - Increase the number of Trogdor Histogram buckets to 10000 (apache#8627)
  KAFKA-9768: Fix handling of rest.advertised.listener config (apache#8360)
  KAFKA-9419: Fix possible integer overflow in CircularIterator (apache#7950)
  MINOR: Only add 'Data' suffix for generated request/response/header types (apache#8625)
  KAFKA-9947; Ensure proper shutdown of services in `TransactionsBounceTest` (apache#8602)
  KAFKA-6342; Remove unused workaround for JSON parsing of non-escaped strings (apache#8591)
  MINOR: Pass `-release 8` to scalac and upgrade to Gradle 6.4 (apache#8538)
  KAFKA-9946; Partition deletion event should only be sent if deletion was requested in the StopReplica request (apache#8609)
  MINOR: Improve TopologyTestDriver JavaDocs (apache#8619)
  MINOR: MockAdminClient should return InvalidReplicationFactorException if brokers.size < replicationFactor
  KAFKA-9748: Add Streams eos-beta integration test (apache#8496)
  KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (apache#8607)
  HOTFIX: set correct numIterations in shouldAllowConcurrentAccesses
  MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (apache#8527)
  KAFKA-9918; SslEngineFactory is NOT closed when channel is closing (apache#8551)
  KAFKA-9798: Send one round synchronously before starting the async producer (apache#8565)
  MINOR: Annotate KafkaAdminClientTest.testAlterClientQuotas() with @test
  KAFKA-9589: Enable testLogAppendTimeNonCompressedV2 and fix bug in helper method (apache#8533)
  MINOR: Use min/max function when possible (apache#8577)
  ...

# Conflicts:
#	core/src/main/scala/kafka/log/Log.scala
#	gradle/dependencies.gradle
#	gradle/wrapper/gradle-wrapper.properties
#	gradlew
…was requested in the StopReplica request (apache#8609)

This patch fixes a regression in the `StopReplica` response handling. We should only send the event on receiving the `StopReplica` response if we had requested deletion in the request.

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>
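To make "if we had requested deletion in the request" concrete, here is a hedged sketch of two ways such a check could be derived, depending on the request shape discussed in the review. The types and the DeletionPredicates object are hypothetical stand-ins, not Kafka's classes. The per-partition form mirrors the partitionStates.get(tp).exists(_.deletePartition) expression quoted earlier, while the whole-request form assumes a single flag that covers every partition in the request.

```scala
// Simplified stand-ins for Kafka's types; every name here is hypothetical.
final case class TopicPartition(topic: String, partition: Int)
final case class StopReplicaPartitionState(deletePartition: Boolean)

object DeletionPredicates {
  // Request shape where each partition carries its own deletePartition flag:
  // deletion was requested only for partitions whose state says so.
  def perPartition(
    states: Map[TopicPartition, StopReplicaPartitionState]
  ): TopicPartition => Boolean =
    tp => states.get(tp).exists(_.deletePartition)

  // Request shape where one flag applies to all partitions in the request
  // (an assumption made for this sketch): the answer ignores the partition.
  def wholeRequest(deletePartitions: Boolean): TopicPartition => Boolean =
    _ => deletePartitions
}
```

Either function can then be handed to the response handling, which reports a partition to the topic deletion logic only when the function returns true for it.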