Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions automq-shell/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ plugins {
id 'java'
}

group = 'org.apache.kafka'
version = '3.8.0-SNAPSHOT'

project(':automq-shell') {
archivesBaseName = "automq-shell"
}
Expand Down Expand Up @@ -57,7 +54,7 @@ test {
}

jar {
enabled false
dependsOn(':s3stream:jar', ':clients:shadowJar')
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,57 +326,59 @@ class BrokerMetadataPublisher(
private def handleTopicsDelta(deltaName: String, topicsDelta: TopicsDelta, delta: MetadataDelta,
newImage: MetadataImage): Unit = {
// Callback for each topic delta.
def callback(topicDelta: TopicDelta, partition: Int): Unit = {
if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) {
try {
// Handle the case where the group metadata topic was deleted
if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(partition)
if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
groupCoordinator.onResignation(partition, OptionalInt.of(partitionRegistration.leaderEpoch))
}
}
def callback(topicPartition: TopicPartition): Unit = {
// Handle the case where the group metadata topic was deleted
if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(topicPartition.partition())
if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
groupCoordinator.onResignation(topicPartition.partition(), OptionalInt.of(partitionRegistration.leaderEpoch))
}
}

// Update the group coordinator of local changes
updateCoordinator(
topicDelta,
partition,
groupCoordinator.onElection,
(partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in $deltaName", t)
// Handle the case where the transaction state topic was deleted
if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(topicPartition.partition())
if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
groupCoordinator.onResignation(topicPartition.partition(), OptionalInt.of(partitionRegistration.leaderEpoch))
}
}

if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) {
try {
// Handle the case where the transaction state topic was deleted
if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(partition)
if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
groupCoordinator.onResignation(partition, OptionalInt.of(partitionRegistration.leaderEpoch))
}
getTopicDelta(topicPartition.topic(), newImage, delta).foreach(topicDelta => {
if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) {
try {
// Update the group coordinator of local changes
updateCoordinator(
topicDelta,
topicPartition.partition(),
groupCoordinator.onElection,
(partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in $deltaName", t)
}
}

// Update the transaction coordinator of local changes
updateCoordinator(
topicDelta,
partition,
txnCoordinator.onElection,
txnCoordinator.onResignation)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
s"coordinator with local changes in $deltaName", t)
if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) {
try {
// Update the transaction coordinator of local changes
updateCoordinator(
topicDelta,
topicPartition.partition(),
txnCoordinator.onElection,
txnCoordinator.onResignation)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
s"coordinator with local changes in $deltaName", t)
}
}
}
})

try {
// Notify the group coordinator about deleted topics.
if (topicsDelta.deletedTopicIds().contains(topicDelta.id())) {
if (topicsDelta.topicWasDeleted(topicPartition.topic())) {
groupCoordinator.onPartitionsDeleted(
util.List.of(new TopicPartition(topicDelta.name(), partition)),
util.List.of(new TopicPartition(topicPartition.topic(), topicPartition.partition())),
RequestLocal.NoCaching.bufferSupplier)
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.utils.{ThreadUtils, Time}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicDelta, TopicsDelta}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants
import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion}
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsConstants._
Expand Down Expand Up @@ -939,7 +939,7 @@ class ElasticReplicaManager(
* @param newImage The new metadata image.
*/
override def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
asyncApplyDelta(delta, newImage, (_, _) => {}).get()
asyncApplyDelta(delta, newImage, _ => {}).get()
}

/**
Expand All @@ -950,10 +950,10 @@ class ElasticReplicaManager(
* @return A future which completes when the partition has been deleted.
*/
private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition): CompletableFuture[Void] = {
doPartitionDeletionAsyncLocked(stopPartition, null, null, (_, _) => {})
doPartitionDeletionAsyncLocked(stopPartition, _ => {})
}

private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition, delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = {
private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition, callback: TopicPartition => Unit): CompletableFuture[Void] = {
val prevOp = partitionOpMap.getOrDefault(stopPartition.topicPartition, CompletableFuture.completedFuture(null))
val opCf = new CompletableFuture[Void]()
val tracker = partitionStatusTracker.tracker(stopPartition.topicPartition)
Expand All @@ -975,9 +975,7 @@ class ElasticReplicaManager(
s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}")
}
}
if (delta != null && newImage != null) {
getTopicDelta(stopPartition.topicPartition.topic(), newImage, delta).foreach(callback(_, stopPartition.topicPartition.partition()))
}
callback(stopPartition.topicPartition)
} finally {
opCf.complete(null)
partitionOpMap.remove(stopPartition.topicPartition, opCf)
Expand All @@ -990,21 +988,13 @@ class ElasticReplicaManager(
opCf
}

def getTopicDelta(topicName: String, newImage: MetadataImage, delta: TopicsDelta): Option[TopicDelta] = {
Option(newImage.topics().getTopic(topicName)).flatMap {
topicImage => Option(delta).flatMap {
topicDelta => Option(topicDelta.changedTopic(topicImage.id()))
}
}
}

/**
* Apply a KRaft topic change delta.
*
* @param delta The delta to apply.
* @param newImage The new metadata image.
*/
def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = {
def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: TopicPartition => Unit): CompletableFuture[Void] = {
// Before taking the lock, compute the local changes
val localChanges = delta.localChanges(config.nodeId)
val metadataVersion = newImage.features().metadataVersion()
Expand All @@ -1029,7 +1019,7 @@ class ElasticReplicaManager(
def doPartitionDeletion(): Unit = {
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
deletes.foreach(stopPartition => {
val opCf = doPartitionDeletionAsyncLocked(stopPartition, delta, newImage, callback)
val opCf = doPartitionDeletionAsyncLocked(stopPartition, callback)
opCfList.add(opCf)
})
}
Expand Down Expand Up @@ -1065,7 +1055,7 @@ class ElasticReplicaManager(
applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, leader, directoryIds)
tracker.opened()
// Apply the delta before elect leader.
getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition()))
callback(tp)
// Elect the leader to let client can find the partition by metadata.
if (info.partition().leader < 0) {
// The tryElectLeader may be failed, tracker will check the partition status and elect leader if needed.
Expand Down