Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2577,6 +2577,7 @@ project(':shell') {
implementation project(':core')
implementation project(':metadata')
implementation project(':raft')
implementation project(':server')

implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonJakartarsJsonProvider
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.shell"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,9 @@ public void testOnNewMetadataImage() {
verify(coordinator0).onLoaded(MetadataImage.EMPTY);

// Publish a new image.
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build();
MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
runtime.onNewMetadataImage(newImage, delta);

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class ControllerServer(
setCreateTopicPolicy(createTopicPolicy.toJava).
setAlterConfigPolicy(alterConfigPolicy.toJava).
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
setSupportedConfigChecker(sharedServer.supportedConfigChecker).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import org.apache.kafka.image.loader.MetadataLoader
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
import org.apache.kafka.metadata.ListenerInfo
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.{SupportedConfigChecker, ListenerInfo, MetadataRecordSerde}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.config.DefaultSupportedConfigChecker
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics}
Expand Down Expand Up @@ -112,6 +112,7 @@ class SharedServer(
private var usedByController: Boolean = false
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
val supportedConfigChecker: SupportedConfigChecker = new DefaultSupportedConfigChecker()
@volatile var metrics: Metrics = _metrics
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _
Expand Down Expand Up @@ -315,7 +316,8 @@ class SharedServer(
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
setFaultHandler(metadataLoaderFaultHandler).
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
setMetrics(metadataLoaderMetrics)
setMetrics(metadataLoaderMetrics).
setSupportedConfigChecker(supportedConfigChecker)
loader = loaderBuilder.build()
snapshotEmitter = new SnapshotEmitter.Builder().
setNodeId(nodeId).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ class LocalLeaderEndPointTest extends Logging {
alterPartitionManager = alterPartitionManager
)

val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
Expand Down Expand Up @@ -254,7 +256,9 @@ class LocalLeaderEndPointTest extends Logging {
}

private def bumpLeaderEpoch(): Unit = {
val delta = new MetadataDelta(image)
val delta = new MetadataDelta.Builder()
.setImage(image)
.build()
delta.replay(new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(partition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class DefaultApiVersionManagerTest {
private val brokerFeatures = BrokerFeatures.createDefault(true)
private val metadataCache = {
val cache = new KRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel())
Expand Down
28 changes: 21 additions & 7 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ class KafkaApisTest extends Logging {

def initializeMetadataCacheWithShareGroupsEnabled(enableShareGroups: Boolean = true): MetadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
Expand Down Expand Up @@ -6258,7 +6260,9 @@ class KafkaApisTest extends Logging {

metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
Expand Down Expand Up @@ -6486,7 +6490,9 @@ class KafkaApisTest extends Logging {

metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
Expand Down Expand Up @@ -10066,7 +10072,9 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
Expand Down Expand Up @@ -10200,7 +10208,9 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
Expand Down Expand Up @@ -10708,7 +10718,9 @@ class KafkaApisTest extends Logging {
expectedResponse.groups.add(expectedDescribedGroup)
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
Expand Down Expand Up @@ -10871,7 +10883,9 @@ class KafkaApisTest extends Logging {
expectedResponse.groups.add(expectedDescribedGroup)
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY)
val delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ class BrokerMetadataPublisherTest {
)

val topicId = Uuid.randomUuid()
var delta = new MetadataDelta(MetadataImage.EMPTY)
var delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build()
delta.replay(new TopicRecord()
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
.setTopicId(topicId)
Expand All @@ -231,7 +233,9 @@ class BrokerMetadataPublisherTest {
)
val image = delta.apply(MetadataProvenance.EMPTY)

delta = new MetadataDelta(image)
delta = new MetadataDelta.Builder()
.setImage(image)
.build()
delta.replay(new RemoveTopicRecord()
.setTopicId(topicId)
)
Expand Down Expand Up @@ -339,7 +343,9 @@ class BrokerMetadataPublisherTest {
)

// Share version 1 is getting passed to features delta.
val delta = new MetadataDelta(image)
val delta = new MetadataDelta.Builder()
.setImage(image)
.build()
delta.replay(new FeatureLevelRecord().setName(ShareVersion.FEATURE_NAME).setFeatureLevel(1))

metadataPublisher.onMetadataUpdate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,9 @@ private void cancelGroupSizeCounter() {
*/
@Override
public void onLoaded(MetadataImage newImage) {
MetadataDelta emptyDelta = new MetadataDelta(newImage);
MetadataDelta emptyDelta = new MetadataDelta.Builder()
.setImage(newImage)
.build();
groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
coordinatorMetrics.activateMetricsShard(metricsShard);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3152,7 +3152,12 @@ public void testOnPartitionsDeleted() {
.addTopic(Uuid.randomUuid(), "foo", 1)
.build();

service.onNewMetadataImage(image, new MetadataDelta(image));
service.onNewMetadataImage(
image,
new MetadataDelta.Builder()
.setImage(image)
.build()
);

when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("on-partition-deleted"),
Expand Down Expand Up @@ -3210,7 +3215,12 @@ public void testOnPartitionsDeletedCleanupShareGroupState() {
.addTopic(Uuid.randomUuid(), "foo", 1)
.build();

service.onNewMetadataImage(image, new MetadataDelta(image));
service.onNewMetadataImage(
image,
new MetadataDelta.Builder()
.setImage(image)
.build()
);

// No error in partition deleted callback
when(runtime.scheduleWriteAllOperation(
Expand Down
Loading
Loading