diff --git a/automq-shell/build.gradle b/automq-shell/build.gradle index ac89a66cc7..ed8ba38689 100644 --- a/automq-shell/build.gradle +++ b/automq-shell/build.gradle @@ -2,9 +2,6 @@ plugins { id 'java' } -group = 'org.apache.kafka' -version = '3.8.0-SNAPSHOT' - project(':automq-shell') { archivesBaseName = "automq-shell" } @@ -57,7 +54,7 @@ test { } jar { - enabled false + dependsOn(':s3stream:jar', ':clients:shadowJar') duplicatesStrategy = DuplicatesStrategy.EXCLUDE from { configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) } diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 0cea8a4658..72b588788c 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -326,57 +326,59 @@ class BrokerMetadataPublisher( private def handleTopicsDelta(deltaName: String, topicsDelta: TopicsDelta, delta: MetadataDelta, newImage: MetadataImage): Unit = { // Callback for each topic delta. - def callback(topicDelta: TopicDelta, partition: Int): Unit = { - if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) { - try { - // Handle the case where the group metadata topic was deleted - if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) { - val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(partition) - if (partitionRegistration != null && partitionRegistration.leader == brokerId) { - groupCoordinator.onResignation(partition, OptionalInt.of(partitionRegistration.leaderEpoch)) - } - } + def callback(topicPartition: TopicPartition): Unit = { + // Handle the case where the group metadata topic was deleted + if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) { + val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(topicPartition.partition()) + if (partitionRegistration != null && partitionRegistration.leader == brokerId) { + groupCoordinator.onResignation(topicPartition.partition(), OptionalInt.of(partitionRegistration.leaderEpoch)) + } + } - // Update the group coordinator of local changes - updateCoordinator( - topicDelta, - partition, - groupCoordinator.onElection, - (partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt)) - ) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " + - s"coordinator with local changes in $deltaName", t) + // Handle the case where the transaction state topic was deleted + if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) { + val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(topicPartition.partition()) + if (partitionRegistration != null && partitionRegistration.leader == brokerId) { + groupCoordinator.onResignation(topicPartition.partition(), OptionalInt.of(partitionRegistration.leaderEpoch)) } } - if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) { - try { - // Handle the case where the transaction state topic was deleted - if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) { - val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(partition) - if (partitionRegistration != null && partitionRegistration.leader == brokerId) { - groupCoordinator.onResignation(partition, OptionalInt.of(partitionRegistration.leaderEpoch)) - } + getTopicDelta(topicPartition.topic(), newImage, delta).foreach(topicDelta => { + if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) { + try { + // Update the group coordinator of local changes + updateCoordinator( + topicDelta, + topicPartition.partition(), + groupCoordinator.onElection, + (partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt)) + ) + } catch { + case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " + + s"coordinator with local changes in $deltaName", t) } + } - // Update the transaction coordinator of local changes - updateCoordinator( - topicDelta, - partition, - txnCoordinator.onElection, - txnCoordinator.onResignation) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " + - s"coordinator with local changes in $deltaName", t) + if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) { + try { + // Update the transaction coordinator of local changes + updateCoordinator( + topicDelta, + topicPartition.partition(), + txnCoordinator.onElection, + txnCoordinator.onResignation) + } catch { + case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " + + s"coordinator with local changes in $deltaName", t) + } } - } + }) try { // Notify the group coordinator about deleted topics. - if (topicsDelta.deletedTopicIds().contains(topicDelta.id())) { + if (topicsDelta.topicWasDeleted(topicPartition.topic())) { groupCoordinator.onPartitionsDeleted( - util.List.of(new TopicPartition(topicDelta.name(), partition)), + util.List.of(new TopicPartition(topicPartition.topic(), topicPartition.partition())), RequestLocal.NoCaching.bufferSupplier) } } catch { diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index 917e000581..4e18838663 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -23,7 +23,7 @@ import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.utils.{ThreadUtils, Time} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicDelta, TopicsDelta} +import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import org.apache.kafka.metadata.LeaderConstants import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion} import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsConstants._ @@ -939,7 +939,7 @@ class ElasticReplicaManager( * @param newImage The new metadata image. */ override def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = { - asyncApplyDelta(delta, newImage, (_, _) => {}).get() + asyncApplyDelta(delta, newImage, _ => {}).get() } /** @@ -950,10 +950,10 @@ class ElasticReplicaManager( * @return A future which completes when the partition has been deleted. */ private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition): CompletableFuture[Void] = { - doPartitionDeletionAsyncLocked(stopPartition, null, null, (_, _) => {}) + doPartitionDeletionAsyncLocked(stopPartition, _ => {}) } - private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition, delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = { + private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition, callback: TopicPartition => Unit): CompletableFuture[Void] = { val prevOp = partitionOpMap.getOrDefault(stopPartition.topicPartition, CompletableFuture.completedFuture(null)) val opCf = new CompletableFuture[Void]() val tracker = partitionStatusTracker.tracker(stopPartition.topicPartition) @@ -975,9 +975,7 @@ class ElasticReplicaManager( s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}") } } - if (delta != null && newImage != null) { - getTopicDelta(stopPartition.topicPartition.topic(), newImage, delta).foreach(callback(_, stopPartition.topicPartition.partition())) - } + callback(stopPartition.topicPartition) } finally { opCf.complete(null) partitionOpMap.remove(stopPartition.topicPartition, opCf) @@ -990,21 +988,13 @@ class ElasticReplicaManager( opCf } - def getTopicDelta(topicName: String, newImage: MetadataImage, delta: TopicsDelta): Option[TopicDelta] = { - Option(newImage.topics().getTopic(topicName)).flatMap { - topicImage => Option(delta).flatMap { - topicDelta => Option(topicDelta.changedTopic(topicImage.id())) - } - } - } - /** * Apply a KRaft topic change delta. * * @param delta The delta to apply. * @param newImage The new metadata image. */ - def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = { + def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: TopicPartition => Unit): CompletableFuture[Void] = { // Before taking the lock, compute the local changes val localChanges = delta.localChanges(config.nodeId) val metadataVersion = newImage.features().metadataVersion() @@ -1029,7 +1019,7 @@ class ElasticReplicaManager( def doPartitionDeletion(): Unit = { stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).") deletes.foreach(stopPartition => { - val opCf = doPartitionDeletionAsyncLocked(stopPartition, delta, newImage, callback) + val opCf = doPartitionDeletionAsyncLocked(stopPartition, callback) opCfList.add(opCf) }) } @@ -1065,7 +1055,7 @@ class ElasticReplicaManager( applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, leader, directoryIds) tracker.opened() // Apply the delta before elect leader. - getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition())) + callback(tp) // Elect the leader to let client can find the partition by metadata. if (info.partition().leader < 0) { // The tryElectLeader may be failed, tracker will check the partition status and elect leader if needed.