4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
import org.apache.kafka.metadata.stream.S3Config
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -206,7 +207,7 @@ class ControllerServer(
}

val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)

val s3Config = new S3Config(config.s3Region, config.s3Bucket)
new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId).
setTime(time).
setThreadNamePrefix(threadNamePrefix).
@@ -227,6 +228,7 @@
setBootstrapMetadata(bootstrapMetadata).
setFatalFaultHandler(sharedServer.quorumControllerFaultHandler).
setZkMigrationEnabled(config.migrationEnabled)
.setS3Config(s3Config)
}
authorizer match {
case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
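Note: S3Config itself (org.apache.kafka.metadata.stream.S3Config) is not part of this diff. Judging from the two-argument call `new S3Config(config.s3Region, config.s3Bucket)` above, a minimal sketch could look like the following; the field and accessor names are assumptions.

```java
package org.apache.kafka.metadata.stream;

// Minimal sketch of S3Config, inferred from the call site in ControllerServer.scala.
// The real class is not shown in this diff; accessor names are assumptions.
public class S3Config {
    private final String region; // e.g. "us-east-1", from s3.region
    private final String bucket; // e.g. "my-bucket", from s3.bucket

    public S3Config(String region, String bucket) {
        this.region = region;
        this.bucket = bucket;
    }

    public String region() {
        return region;
    }

    public String bucket() {
        return bucket;
    }
}
```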
21 changes: 20 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -669,6 +669,14 @@ object KafkaConfig {
val ElasticStreamNamespaceDoc = "The elastic stream namespace of this Kafka cluster, which must not conflict with the namespaces of other Kafka clusters sharing the same elastic stream."
// elastic stream inject end

// Kafka on S3 inject start
val S3RegionProp = "s3.region"
val S3BucketProp = "s3.bucket"

val S3RegionDoc = "Specifies the S3 region, e.g. <code>us-east-1</code>.\n"
val S3BucketDoc = "Specifies the S3 bucket, e.g. <code>my-bucket</code>.\n"

// Kafka on S3 inject end
/* Documentation */
/** ********* Zookeeper Configuration ***********/
val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +
@@ -1461,6 +1469,11 @@ object KafkaConfig {
.define(ElasticStreamKvEndpointProp, STRING, null, HIGH, ElasticStreamKvEndpointDoc)
.define(ElasticStreamNamespaceProp, STRING, null, MEDIUM, ElasticStreamNamespaceDoc)
// elastic stream inject end

// Kafka on S3 inject start
.define(S3RegionProp, STRING, null, HIGH, S3RegionDoc)
.define(S3BucketProp, STRING, null, HIGH, S3BucketDoc)
// Kafka on S3 inject end
}

/** ********* Remote Log Management Configuration *********/
@@ -1569,7 +1582,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// We make it part of each instance rather than the object to facilitate testing.
private val zkClientConfigViaSystemProperties = new ZKClientConfig()

override def originals: util.Map[String, AnyRef] =
if (this eq currentConfig) super.originals else currentConfig.originals
override def values: util.Map[String, _] =
if (this eq currentConfig) super.values else currentConfig.values
@@ -1992,6 +2005,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val elasticStreamNamespace = getString(KafkaConfig.ElasticStreamNamespaceProp)
// elastic stream inject end

// Kafka on S3 inject start
/** ********* Kafka on S3 Configuration *********/
val s3Region = getString(KafkaConfig.S3RegionProp)
val s3Bucket = getString(KafkaConfig.S3BucketProp)
// Kafka on S3 inject end

def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
}
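Since both options are defined with a null default and HIGH importance, a Kafka-on-S3 cluster has to set them explicitly. A small sketch of supplying them programmatically; the keys match the .define() calls above and the values are placeholders:

```java
import java.util.Properties;

// Sketch: the two new settings supplied like any other broker/controller property.
public final class S3PropsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("s3.region", "us-east-1"); // KafkaConfig.S3RegionProp
        props.setProperty("s3.bucket", "my-bucket"); // KafkaConfig.S3BucketProp
        System.out.println(props);
    }
}
```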
core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -72,7 +72,8 @@ object MetadataCacheTest {
image.clientQuotas(),
image.producerIds(),
image.acls(),
image.streamsMetadata())
image.streamsMetadata(),
image.objectsMetadata())
val delta = new MetadataDelta.Builder().setImage(partialImage).build()

def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage, S3StreamsMetadataImage}
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, S3ObjectsImage, S3StreamsMetadataImage, TopicsDelta, TopicsImage}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -4129,7 +4129,8 @@ class ReplicaManagerTest {
ClientQuotasImage.EMPTY,
ProducerIdsImage.EMPTY,
AclsImage.EMPTY,
S3StreamsMetadataImage.EMPTY
S3StreamsMetadataImage.EMPTY,
S3ObjectsImage.EMPTY,
)
}

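Both test fixtures change for the same reason: MetadataImage now carries an S3ObjectsImage alongside the S3StreamsMetadataImage, so every constructor call gains one argument. A sketch of the all-empty construction; the argument order is assumed to follow upstream Kafka, with the two additions at the tail as in the test above:

```java
import org.apache.kafka.image.*;

// Sketch: building an all-empty MetadataImage with the new trailing argument.
// Argument order is assumed from upstream Kafka plus the two additions above.
final class EmptyImageExample {
    static final MetadataImage EMPTY_IMAGE = new MetadataImage(
        MetadataProvenance.EMPTY,
        FeaturesImage.EMPTY,
        ClusterImage.EMPTY,
        TopicsImage.EMPTY,
        ConfigurationsImage.EMPTY,
        ClientQuotasImage.EMPTY,
        ProducerIdsImage.EMPTY,
        AclsImage.EMPTY,
        S3StreamsMetadataImage.EMPTY,
        S3ObjectsImage.EMPTY
    );
}
```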
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -73,6 +73,8 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.stream.S3ObjectControlManager;
import org.apache.kafka.controller.stream.StreamControlManager;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
@@ -83,6 +85,7 @@
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.stream.S3Config;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
@@ -180,6 +183,12 @@ static public class Builder {
private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
private boolean zkMigrationEnabled = false;

// Kafka on S3 inject start

private S3Config s3Config;

// Kafka on S3 inject end

public Builder(int nodeId, String clusterId) {
this.nodeId = nodeId;
this.clusterId = clusterId;
@@ -299,6 +308,15 @@ public Builder setZkMigrationEnabled(boolean zkMigrationEnabled) {
return this;
}

// Kafka on S3 inject start

public Builder setS3Config(S3Config s3Config) {
this.s3Config = s3Config;
return this;
}

// Kafka on S3 inject end

@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
if (raftClient == null) {
@@ -349,7 +367,8 @@ public QuorumController build() throws Exception {
staticConfig,
bootstrapMetadata,
maxRecordsPerBatch,
zkMigrationEnabled
zkMigrationEnabled,
s3Config
);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
@@ -1644,6 +1663,24 @@ private enum ImbalanceSchedule {
*/
private final int maxRecordsPerBatch;

// Kafka on S3 inject start

private final S3Config s3Config;

/**
* An object which stores the controller's view of the S3 objects.
* This must be accessed only by the event queue thread.
*/
private final S3ObjectControlManager s3ObjectControlManager;

/**
* An object which stores the controller's view of the Streams.
* This must be accessed only by the event queue thread.
*/
private final StreamControlManager streamControlManager;

// Kafka on S3 inject end

private QuorumController(
FaultHandler fatalFaultHandler,
LogContext logContext,
@@ -1668,7 +1705,8 @@ private QuorumController(
Map<String, Object> staticConfig,
BootstrapMetadata bootstrapMetadata,
int maxRecordsPerBatch,
boolean zkMigrationEnabled
boolean zkMigrationEnabled,
S3Config s3Config
) {
this.fatalFaultHandler = fatalFaultHandler;
this.logContext = logContext;
@@ -1744,6 +1782,12 @@ private QuorumController(
this.curClaimEpoch = -1;
this.needToCompleteAuthorizerLoad = authorizer.isPresent();
this.zkRecordConsumer = new MigrationRecordConsumer();

// Kafka on S3 inject start
this.s3Config = s3Config;
this.s3ObjectControlManager = new S3ObjectControlManager(snapshotRegistry, logContext, clusterId, s3Config);
this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager);
// Kafka on S3 inject end
updateWriteOffset(-1);

resetToEmptyState();
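The new builder hook mirrors the Scala wiring in ControllerServer.scala above. A sketch of the hand-off; all the other required builder fields (the raft client, registries, and so on) are ControllerServer's job and are elided here:

```java
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.metadata.stream.S3Config;

// Sketch: how ControllerServer hands the parsed s3.region / s3.bucket
// values to the controller. build() still needs the usual raft plumbing.
final class ControllerWiringSketch {
    static QuorumController.Builder withS3(QuorumController.Builder builder,
                                           String region, String bucket) {
        return builder.setS3Config(new S3Config(region, bucket));
    }
}
```

Inside the constructor, the same s3Config then seeds the S3ObjectControlManager, which in turn backs the StreamControlManager.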
metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -19,8 +19,13 @@

import java.util.LinkedList;
import java.util.Queue;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.stream.S3ObjectKeyGeneratorManager.GenerateContextV0;
import org.apache.kafka.metadata.stream.S3Config;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
@@ -34,20 +39,28 @@ public class S3ObjectControlManager {

private final TimelineHashMap<Long/*objectId*/, S3Object> objectsMetadata;

private final String clusterId;

private final S3Config config;

/**
* The objectId of the next object to be applied (starts from 0).
*/
private Long nextApplyObjectId = 0L;

// TODO: add timer task to periodically check if there are objects to be destroyed or created
// TODO: add timer task to periodically check if there are objects to be destroyed or expired
private final Queue<Long/*objectId*/> appliedObjects;
private final Queue<Long/*objectId*/> markDestroyedObjects;

public S3ObjectControlManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext) {
LogContext logContext,
String clusterId,
S3Config config) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(S3ObjectControlManager.class);
this.clusterId = clusterId;
this.config = config;
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.appliedObjects = new LinkedList<>();
this.markDestroyedObjects = new LinkedList<>();
@@ -56,5 +69,18 @@ public S3ObjectControlManager(
public Long appliedObjectNum() {
return nextApplyObjectId;
}

public void replay(S3ObjectRecord record) {
GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), S3ObjectState.fromByte(record.objectState()));
objectsMetadata.put(record.objectId(), object);
}

public void replay(RemoveS3ObjectRecord record) {
objectsMetadata.remove(record.objectId());
}


}
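To see replay() end to end: S3ObjectRecord is a generated metadata record, so a sketch would populate it with the fluent setters corresponding to the accessors used above. The setter names are assumptions based on Kafka's record-generator conventions:

```java
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;

// Sketch: the lifecycle of one object id through replay().
// Setter names are assumptions from Kafka's generated-record conventions.
final class ReplayExample {
    static void replayExample(S3ObjectControlManager manager) {
        long now = System.currentTimeMillis();
        S3ObjectRecord record = new S3ObjectRecord()
            .setObjectId(42L)
            .setObjectSize(1024L)
            .setAppliedTimeInMs(now)
            .setExpiredTimeInMs(now + 60_000L)
            .setObjectState((byte) 0); // decoded via S3ObjectState.fromByte(...)
        manager.replay(record); // stored in objectsMetadata under key 42
        manager.replay(new RemoveS3ObjectRecord().setObjectId(42L)); // removed again
    }
}
```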
metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectKeyGeneratorManager.java
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller.stream;

/**
* The S3ObjectKeyGeneratorManager manages the versioned generators of S3 object keys.
*/
public final class S3ObjectKeyGeneratorManager {

public static class GenerateContext {
protected Long objectId;
protected GenerateContext(Long objectId) {
this.objectId = objectId;
}
}
interface S3ObjectKeyGenerator {
String generate(GenerateContext context);
}
public static S3ObjectKeyGenerator getByVersion(int version) {
switch (version) {
case 0: return generatorV0;
default: throw new IllegalArgumentException("Unsupported version " + version);
}
}

public static class GenerateContextV0 extends GenerateContext {
private String clusterName;

GenerateContextV0(String clusterName, Long objectId) {
super(objectId);
this.clusterName = clusterName;
}
}

static S3ObjectKeyGenerator generatorV0 = (GenerateContext ctx) -> {
if (!(ctx instanceof GenerateContextV0)) {
throw new IllegalArgumentException("Unsupported context " + ctx.getClass().getName());
}
GenerateContextV0 ctx0 = (GenerateContextV0) ctx;
return String.format("%s/%s/%d", ctx0.objectId.hashCode(), ctx0.clusterName, ctx0.objectId);
};
}
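Usage, as exercised by S3ObjectControlManager.replay() above: pick a generator by version and hand it the matching context. A small sketch, placed in the same package because GenerateContextV0's constructor is package-private:

```java
package org.apache.kafka.controller.stream;

// Same package as the manager, since GenerateContextV0's constructor is package-private.
final class KeyGenExample {
    public static void main(String[] args) {
        S3ObjectKeyGeneratorManager.GenerateContextV0 ctx =
            new S3ObjectKeyGeneratorManager.GenerateContextV0("my-cluster", 42L);
        String key = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
        // Long.valueOf(42L).hashCode() == 42, so this prints "42/my-cluster/42".
        // The hash prefix spreads keys across the S3 key space.
        System.out.println(key);
    }
}
```

The version switch in getByVersion() leaves room for future key layouts without breaking keys already written under version 0.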
metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java
@@ -28,28 +28,32 @@
public class BrokerS3WALMetadataDelta {

private final BrokerS3WALMetadataImage image;
private final Set<S3WALObject> changedS3WALObjects = new HashSet<>();
private final Set<S3WALObject> addedS3WALObjects = new HashSet<>();

private final Set<S3WALObject/*objectId*/> removedS3WALObjects = new HashSet<>();
private final Set<Long/*objectId*/> removedS3WALObjects = new HashSet<>();

public BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage image) {
this.image = image;
}

public void replay(WALObjectRecord record) {
changedS3WALObjects.add(S3WALObject.of(record));
addedS3WALObjects.add(S3WALObject.of(record));
// new add or update, so remove from removedObjects
removedS3WALObjects.remove(record.objectId());
}

public void replay(RemoveWALObjectRecord record) {
removedS3WALObjects.add(new S3WALObject(record.objectId()));
removedS3WALObjects.add(record.objectId());
// new remove, so remove from addedObjects
addedS3WALObjects.remove(record.objectId());
}

public BrokerS3WALMetadataImage apply() {
List<S3WALObject> newS3WALObjects = new ArrayList<>(image.getWalObjects());
// add all changed WAL objects
newS3WALObjects.addAll(addedS3WALObjects);
// remove all removed WAL objects
newS3WALObjects.removeAll(removedS3WALObjects);
// add all changed WAL objects
newS3WALObjects.addAll(changedS3WALObjects);
return new BrokerS3WALMetadataImage(image.getBrokerId(), newS3WALObjects);
}

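One subtlety in the new bookkeeping: removedS3WALObjects now holds Long object ids while the image's list holds S3WALObject instances, so the removeAll(...) call in apply(), like the addedS3WALObjects.remove(record.objectId()) call in replay(RemoveWALObjectRecord), compiles but cannot match any element. Matching by id would make the removal effective; a sketch, assuming S3WALObject exposes an objectId() accessor (not shown in this diff):

```java
// Sketch: an id-based apply(), assuming S3WALObject.objectId() exists.
public BrokerS3WALMetadataImage apply() {
    List<S3WALObject> newS3WALObjects = new ArrayList<>(image.getWalObjects());
    // fold in the staged additions
    newS3WALObjects.addAll(addedS3WALObjects);
    // drop staged removals by object id rather than by object identity
    newS3WALObjects.removeIf(obj -> removedS3WALObjects.contains(obj.objectId()));
    return new BrokerS3WALMetadataImage(image.getBrokerId(), newS3WALObjects);
}
```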
metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java
@@ -25,6 +25,7 @@
import org.apache.kafka.image.writer.ImageWriterOptions;

public class BrokerS3WALMetadataImage {
public static final BrokerS3WALMetadataImage EMPTY = new BrokerS3WALMetadataImage(-1, List.of());
private final Integer brokerId;
private final List<S3WALObject> s3WalObjects;
