@@ -117,7 +117,7 @@ public enum ApiKeys {
CLOSE_STREAMS(ApiMessageType.CLOSE_STREAMS, false, true),
TRIM_STREAMS(ApiMessageType.TRIM_STREAMS, false, true),
PREPARE_S3_OBJECT(ApiMessageType.PREPARE_S3_OBJECT, false, true),
COMMIT_WALOBJECT(ApiMessageType.COMMIT_WALOBJECT, false, true),
COMMIT_SST_OBJECT(ApiMessageType.COMMIT_SSTOBJECT, false, true),
COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true),
GET_OPENING_STREAMS(ApiMessageType.GET_OPENING_STREAMS, false, true),
GET_KVS(ApiMessageType.GET_KVS, false, true),
@@ -28,7 +28,7 @@
import java.util.Map;
import org.apache.kafka.common.requests.s3.CloseStreamsRequest;
import org.apache.kafka.common.requests.s3.CommitStreamObjectRequest;
import org.apache.kafka.common.requests.s3.CommitWALObjectRequest;
import org.apache.kafka.common.requests.s3.CommitSSTObjectRequest;
import org.apache.kafka.common.requests.s3.CreateStreamsRequest;
import org.apache.kafka.common.requests.s3.DeleteKVsRequest;
import org.apache.kafka.common.requests.s3.DeleteStreamsRequest;
@@ -331,8 +331,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return TrimStreamsRequest.parse(buffer, apiVersion);
case PREPARE_S3_OBJECT:
return PrepareS3ObjectRequest.parse(buffer, apiVersion);
case COMMIT_WALOBJECT:
return CommitWALObjectRequest.parse(buffer, apiVersion);
case COMMIT_SST_OBJECT:
return CommitSSTObjectRequest.parse(buffer, apiVersion);
case COMMIT_STREAM_OBJECT:
return CommitStreamObjectRequest.parse(buffer, apiVersion);
case GET_OPENING_STREAMS:
@@ -31,7 +31,7 @@
import java.util.stream.Stream;
import org.apache.kafka.common.requests.s3.CloseStreamsResponse;
import org.apache.kafka.common.requests.s3.CommitStreamObjectResponse;
import org.apache.kafka.common.requests.s3.CommitWALObjectResponse;
import org.apache.kafka.common.requests.s3.CommitSSTObjectResponse;
import org.apache.kafka.common.requests.s3.CreateStreamsResponse;
import org.apache.kafka.common.requests.s3.DeleteKVsResponse;
import org.apache.kafka.common.requests.s3.DeleteStreamsResponse;
@@ -277,8 +277,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return PrepareS3ObjectResponse.parse(responseBuffer, version);
case COMMIT_STREAM_OBJECT:
return CommitStreamObjectResponse.parse(responseBuffer, version);
case COMMIT_WALOBJECT:
return CommitWALObjectResponse.parse(responseBuffer, version);
case COMMIT_SST_OBJECT:
return CommitSSTObjectResponse.parse(responseBuffer, version);
case GET_OPENING_STREAMS:
return GetOpeningStreamsResponse.parse(responseBuffer, version);
case GET_KVS:
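To illustrate the renamed dispatch branch, here is a minimal, hedged round-trip sketch: it serializes a generated CommitSSTObjectResponseData at version 0 with Kafka's MessageUtil and feeds it back through parseResponse. The fluent setters (setThrottleTimeMs, setErrorCode) are assumed to follow Kafka's generated-message conventions for the fields in CommitSSTObjectResponse.json; this is an illustration, not code from the pull request.

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CommitSSTObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.s3.CommitSSTObjectResponse;

public class CommitSSTObjectResponseDispatchSketch {
    public static void main(String[] args) {
        // Build a success response payload (error code 0 = NONE, no throttling).
        CommitSSTObjectResponseData data = new CommitSSTObjectResponseData()
            .setErrorCode((short) 0)
            .setThrottleTimeMs(0);
        // Serialize the message body at version 0, the only version defined for this API.
        ByteBuffer buffer = MessageUtil.toByteBuffer(data, (short) 0);
        // The switch above routes COMMIT_SST_OBJECT to CommitSSTObjectResponse.parse.
        AbstractResponse parsed = AbstractResponse.parseResponse(ApiKeys.COMMIT_SST_OBJECT, buffer, (short) 0);
        System.out.println(((CommitSSTObjectResponse) parsed).data());
    }
}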
@@ -18,56 +18,56 @@
package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CommitWALObjectRequestData;
import org.apache.kafka.common.message.CommitWALObjectResponseData;
import org.apache.kafka.common.message.CommitSSTObjectRequestData;
import org.apache.kafka.common.message.CommitSSTObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class CommitWALObjectRequest extends AbstractRequest {
public class CommitSSTObjectRequest extends AbstractRequest {

public static class Builder extends AbstractRequest.Builder<CommitWALObjectRequest> {
public static class Builder extends AbstractRequest.Builder<CommitSSTObjectRequest> {

private final CommitWALObjectRequestData data;
public Builder(CommitWALObjectRequestData data) {
super(ApiKeys.COMMIT_WALOBJECT);
private final CommitSSTObjectRequestData data;
public Builder(CommitSSTObjectRequestData data) {
super(ApiKeys.COMMIT_SST_OBJECT);
this.data = data;
}

@Override
public CommitWALObjectRequest build(short version) {
return new CommitWALObjectRequest(data, version);
public CommitSSTObjectRequest build(short version) {
return new CommitSSTObjectRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}
private final CommitWALObjectRequestData data;
private final CommitSSTObjectRequestData data;

public CommitWALObjectRequest(CommitWALObjectRequestData data, short version) {
super(ApiKeys.COMMIT_WALOBJECT, version);
public CommitSSTObjectRequest(CommitSSTObjectRequestData data, short version) {
super(ApiKeys.COMMIT_SST_OBJECT, version);
this.data = data;
}

@Override
public CommitWALObjectResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public CommitSSTObjectResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
CommitWALObjectResponseData response = new CommitWALObjectResponseData()
CommitSSTObjectResponseData response = new CommitSSTObjectResponseData()
.setErrorCode(apiError.error().code())
.setThrottleTimeMs(throttleTimeMs);
return new CommitWALObjectResponse(response);
return new CommitSSTObjectResponse(response);
}

@Override
public CommitWALObjectRequestData data() {
public CommitSSTObjectRequestData data() {
return data;
}

public static CommitWALObjectRequest parse(ByteBuffer buffer, short version) {
return new CommitWALObjectRequest(new CommitWALObjectRequestData(
public static CommitSSTObjectRequest parse(ByteBuffer buffer, short version) {
return new CommitSSTObjectRequest(new CommitSSTObjectRequestData(
new ByteBufferAccessor(buffer), version), version);
}

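As a quick sanity sketch of the renamed request class above: the snippet below builds a CommitSSTObjectRequest through the new Builder, serializes its payload with Kafka's MessageUtil, parses it back via the renamed static factory, and exercises the error path. Field values are left at their generated defaults for brevity; this is an illustration, not code from the pull request.

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CommitSSTObjectRequestData;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.requests.s3.CommitSSTObjectRequest;

public class CommitSSTObjectRequestRoundTripSketch {
    public static void main(String[] args) {
        // Wrap an (empty) generated payload in the renamed Builder; version 0 is the only valid version.
        CommitSSTObjectRequestData data = new CommitSSTObjectRequestData();
        CommitSSTObjectRequest request = new CommitSSTObjectRequest.Builder(data).build((short) 0);

        // Serialize the payload and parse it back through CommitSSTObjectRequest.parse.
        ByteBuffer buffer = MessageUtil.toByteBuffer(request.data(), (short) 0);
        CommitSSTObjectRequest parsed = CommitSSTObjectRequest.parse(buffer, (short) 0);
        System.out.println(parsed.data());

        // The error path wraps the throwable into a CommitSSTObjectResponse via ApiError.
        System.out.println(request.getErrorResponse(0, new RuntimeException("simulated failure")).data());
    }
}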
@@ -19,24 +19,24 @@

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.CommitWALObjectResponseData;
import org.apache.kafka.common.message.CommitSSTObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class CommitWALObjectResponse extends AbstractResponse {
public class CommitSSTObjectResponse extends AbstractResponse {


private final CommitWALObjectResponseData data;
private final CommitSSTObjectResponseData data;

public CommitWALObjectResponse(CommitWALObjectResponseData data) {
super(ApiKeys.COMMIT_WALOBJECT);
public CommitSSTObjectResponse(CommitSSTObjectResponseData data) {
super(ApiKeys.COMMIT_SST_OBJECT);
this.data = data;
}

@Override
public CommitWALObjectResponseData data() {
public CommitSSTObjectResponseData data() {
return data;
}

@@ -55,8 +55,8 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static CommitWALObjectResponse parse(ByteBuffer buffer, short version) {
return new CommitWALObjectResponse(new CommitWALObjectResponseData(
public static CommitSSTObjectResponse parse(ByteBuffer buffer, short version) {
return new CommitSSTObjectResponse(new CommitSSTObjectResponseData(
new ByteBufferAccessor(buffer), version));
}

@@ -20,7 +20,7 @@
"controller",
"broker"
],
"name": "CommitWALObjectRequest",
"name": "CommitSSTObjectRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
@@ -40,25 +40,25 @@
"name": "ObjectId",
"type": "int64",
"versions": "0+",
"about": "The ID of the WAL S3 object to commit"
"about": "The ID of the SST S3 object to commit"
},
{
"name": "OrderId",
"type": "int64",
"versions": "0+",
"about": "The order ID of the WAL S3 object"
"about": "The order ID of the SST S3 object"
},
{
"name": "ObjectSize",
"type": "int64",
"versions": "0+",
"about": "The size of the WAL S3 object to commit"
"about": "The size of the SST S3 object to commit"
},
{
"name": "ObjectStreamRanges",
"type": "[]ObjectStreamRange",
"versions": "0+",
"about": "The stream ranges of the WAL S3 object to commit",
"about": "The stream ranges of the SST S3 object to commit",
"fields": [
{
"name": "StreamId",
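For illustration, here is a hedged sketch of populating the fields declared above on the generated data class. The setter and nested-class names (setObjectId, setOrderId, setObjectSize, setObjectStreamRanges, ObjectStreamRange) are assumed to follow Kafka's message-generator naming for CommitSSTObjectRequest.json; the remaining ObjectStreamRange fields are truncated in this diff, so only StreamId is set here.

import java.util.Collections;
import org.apache.kafka.common.message.CommitSSTObjectRequestData;
import org.apache.kafka.common.message.CommitSSTObjectRequestData.ObjectStreamRange;

public class CommitSSTObjectRequestDataSketch {
    public static CommitSSTObjectRequestData sample() {
        // One stream range carried by the SST object; the other range fields are not shown in the hunk above.
        ObjectStreamRange range = new ObjectStreamRange().setStreamId(42L);
        return new CommitSSTObjectRequestData()
            .setObjectId(1001L)                                  // ID of the SST S3 object to commit
            .setOrderId(7L)                                      // order ID of the SST S3 object
            .setObjectSize(104857600L)                           // object size in bytes (100 MiB here)
            .setObjectStreamRanges(Collections.singletonList(range));
    }
}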
@@ -16,7 +16,7 @@
{
"apiKey": 506,
"type": "response",
"name": "CommitWALObjectResponse",
"name": "CommitSSTObjectResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
20 changes: 10 additions & 10 deletions config/kraft/broker.properties
@@ -151,8 +151,8 @@ s3.wal.capacity=1073741824
# The maximum size of memory cache delta WAL can use, default 200MB
s3.wal.cache.size=209715200

# The batched size of WAL object before being uploaded to S3, default 100MB
s3.wal.object.size=104857600
# The batched size of delta WAL before being uploaded to S3, default 100MB
s3.wal.upload.threshold=104857600

# The maximum size of block cache the broker can use to cache data read from S3, default 100MB
s3.block.cache.size=104857600
@@ -166,17 +166,17 @@ s3.stream.object.compaction.living.time.minutes=60
# The maximum size of stream object allowed to be generated in stream compaction, default 10GB
s3.stream.object.compaction.max.size.bytes=10737418240

# The execution interval for WAL object compaction, default 20 minutes
s3.wal.object.compaction.interval.minutes=20
# The execution interval for SST object compaction, default 20 minutes
s3.sst.compaction.interval.minutes=20

# The maximum allowed memory consumption for WAL object compaction, default 200MB
s3.wal.object.compaction.cache.size=209715200
# The maximum allowed memory consumption for SST object compaction, default 200MB
s3.sst.compaction.cache.size=209715200

# The minimum time before a WAL object to be force split into multiple stream object, default 120 minutes
s3.wal.object.compaction.force.split.time=120
# The minimum time before a SST object to be force split into multiple stream object, default 120 minutes
s3.sst.compaction.force.split.minutes=120

# The maximum WAL objects allowed to be compacted in one execution, default 500
s3.wal.object.compaction.max.num=500
# The maximum SST objects allowed to be compacted in one execution, default 500
s3.sst.compaction.max.num=500

# The baseline network bandwidth of the broker, default 100MB. This is used to throttle the network usage during compaction
s3.network.baseline.bandwidth=104857600
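As a side note on the numeric defaults in this file (and in the matching server.properties section below), the values are binary MiB/GiB; a small sketch to verify the arithmetic:

public class S3ConfigDefaultSizes {
    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        System.out.println(100 * mib);       // 104857600   -> s3.wal.upload.threshold, s3.block.cache.size (100 MiB)
        System.out.println(200 * mib);       // 209715200   -> s3.wal.cache.size, s3.sst.compaction.cache.size (200 MiB)
        System.out.println(1024 * mib);      // 1073741824  -> s3.wal.capacity (1 GiB)
        System.out.println(10 * 1024 * mib); // 10737418240 -> s3.stream.object.compaction.max.size.bytes (10 GiB)
    }
}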
20 changes: 10 additions & 10 deletions config/kraft/server.properties
@@ -157,8 +157,8 @@ s3.wal.capacity=1073741824
# The maximum size of memory cache delta WAL can use, default 200MB
s3.wal.cache.size=209715200

# The batched size of WAL object before being uploaded to S3, default 100MB
s3.wal.object.size=104857600
# The batched size of delta WAL before being uploaded to S3, default 100MB
s3.wal.upload.threshold=104857600

# The maximum size of block cache the broker can use to cache data read from S3, default 100MB
s3.block.cache.size=104857600
@@ -172,17 +172,17 @@ s3.stream.object.compaction.living.time.minutes=60
# The maximum size of stream object allowed to be generated in stream compaction, default 10GB
s3.stream.object.compaction.max.size.bytes=10737418240

# The execution interval for WAL object compaction, default 20 minutes
s3.wal.object.compaction.interval.minutes=20
# The execution interval for SST object compaction, default 20 minutes
s3.sst.compaction.interval.minutes=20

# The maximum allowed memory consumption for WAL object compaction, default 200MB
s3.wal.object.compaction.cache.size=209715200
# The maximum allowed memory consumption for SST object compaction, default 200MB
s3.sst.compaction.cache.size=209715200

# The minimum time before a WAL object to be force split into multiple stream object, default 120 minutes
s3.wal.object.compaction.force.split.time=120
# The minimum time before a SST object to be force split into multiple stream object, default 120 minutes
s3.sst.compaction.force.split.minutes=120

# The maximum WAL objects allowed to be compacted in one execution, default 500
s3.wal.object.compaction.max.num=500
# The maximum SST objects allowed to be compacted in one execution, default 500
s3.sst.compaction.max.num=500

# The baseline network bandwidth of the broker, default 100MB. This is used to throttle the network usage during compaction
s3.network.baseline.bandwidth=104857600
61 changes: 30 additions & 31 deletions core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java
@@ -25,37 +25,36 @@ public class ConfigUtils {
public static Config to(KafkaConfig s) {
return new Config()
.brokerId(s.brokerId())
.s3Endpoint(s.s3Endpoint())
.s3Region(s.s3Region())
.s3Bucket(s.s3Bucket())
.s3WALPath(s.s3WALPath())
.s3WALCacheSize(s.s3WALCacheSize())
.s3WALCapacity(s.s3WALCapacity())
.s3WALHeaderFlushIntervalSeconds(s.s3WALHeaderFlushIntervalSeconds())
.s3WALThread(s.s3WALThread())
.s3WALWindowInitial(s.s3WALWindowInitial())
.s3WALWindowIncrement(s.s3WALWindowIncrement())
.s3WALWindowMax(s.s3WALWindowMax())
.s3WALObjectSize(s.s3WALObjectSize())
.s3StreamSplitSize(s.s3StreamSplitSize())
.s3ObjectBlockSize(s.s3ObjectBlockSize())
.s3ObjectPartSize(s.s3ObjectPartSize())
.s3BlockCacheSize(s.s3BlockCacheSize())
.s3StreamObjectCompactionIntervalMinutes(s.s3StreamObjectCompactionTaskIntervalMinutes())
.s3StreamObjectCompactionMaxSizeBytes(s.s3StreamObjectCompactionMaxSizeBytes())
.s3StreamObjectCompactionLivingTimeMinutes(s.s3StreamObjectCompactionLivingTimeMinutes())
.s3ControllerRequestRetryMaxCount(s.s3ControllerRequestRetryMaxCount())
.s3ControllerRequestRetryBaseDelayMs(s.s3ControllerRequestRetryBaseDelayMs())
.s3WALObjectCompactionInterval(s.s3WALObjectCompactionInterval())
.s3WALObjectCompactionCacheSize(s.s3WALObjectCompactionCacheSize())
.s3WALObjectCompactionUploadConcurrency(s.s3WALObjectCompactionUploadConcurrency())
.s3MaxStreamNumPerWALObject(s.s3MaxStreamNumPerWALObject())
.s3MaxStreamObjectNumPerCommit(s.s3MaxStreamObjectNumPerCommit())
.s3WALObjectCompactionStreamSplitSize(s.s3WALObjectCompactionStreamSplitSize())
.s3WALObjectCompactionForceSplitPeriod(s.s3WALObjectCompactionForceSplitPeriod())
.s3WALObjectCompactionMaxObjectNum(s.s3WALObjectCompactionMaxObjectNum())
.s3MockEnable(s.s3MockEnable())
.s3ObjectLogEnable(s.s3ObjectLogEnable())
.endpoint(s.s3Endpoint())
.region(s.s3Region())
.bucket(s.s3Bucket())
.walPath(s.s3WALPath())
.walCacheSize(s.s3WALCacheSize())
.walCapacity(s.s3WALCapacity())
.walHeaderFlushIntervalSeconds(s.s3WALHeaderFlushIntervalSeconds())
.walThread(s.s3WALThread())
.walWindowInitial(s.s3WALWindowInitial())
.walWindowIncrement(s.s3WALWindowIncrement())
.walWindowMax(s.s3WALWindowMax())
.walUploadThreshold(s.s3WALUploadThreshold())
.streamSplitSize(s.s3StreamSplitSize())
.objectBlockSize(s.s3ObjectBlockSize())
.objectPartSize(s.s3ObjectPartSize())
.blockCacheSize(s.s3BlockCacheSize())
.streamObjectCompactionIntervalMinutes(s.s3StreamObjectCompactionTaskIntervalMinutes())
.streamObjectCompactionMaxSizeBytes(s.s3StreamObjectCompactionMaxSizeBytes())
.streamObjectCompactionLivingTimeMinutes(s.s3StreamObjectCompactionLivingTimeMinutes())
.controllerRequestRetryMaxCount(s.s3ControllerRequestRetryMaxCount())
.controllerRequestRetryBaseDelayMs(s.s3ControllerRequestRetryBaseDelayMs())
.sstCompactionInterval(s.s3SSTCompactionInterval())
.sstCompactionCacheSize(s.s3SSTCompactionCacheSize())
.maxStreamNumPerSST(s.s3MaxStreamNumPerSST())
.maxStreamObjectNumPerCommit(s.s3MaxStreamObjectNumPerCommit())
.sstCompactionStreamSplitSize(s.s3SSTCompactionStreamSplitSize())
.sstCompactionForceSplitPeriod(s.s3SSTCompactionForceSplitMinutes())
.sstCompactionMaxObjectNum(s.s3SSTCompactionMaxObjectNum())
.mockEnable(s.s3MockEnable())
.objectLogEnable(s.s3ObjectLogEnable())
.networkBaselineBandwidth(s.s3NetworkBaselineBandwidthProp())
.refillPeriodMs(s.s3RefillPeriodMsProp());

6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
@@ -92,11 +92,11 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) {
this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext);
this.streamManager = new ControllerStreamManager(this.metadataManager, this.requestSender, kafkaConfig);
this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, kafkaConfig);
this.blockCache = new DefaultS3BlockCache(this.config.s3BlockCacheSize(), objectManager, s3Operator);
this.blockCache = new DefaultS3BlockCache(this.config.blockCacheSize(), objectManager, s3Operator);
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionS3Operator);
this.writeAheadLog = BlockWALService.builder(this.config.s3WALPath(), this.config.s3WALCapacity()).config(this.config).build();
this.writeAheadLog = BlockWALService.builder(this.config.walPath(), this.config.walCapacity()).config(this.config).build();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, s3Operator);
// stream object compactions share the same s3Operator with wal object compactions
// stream object compactions share the same s3Operator with SST object compactions
this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, compactionS3Operator, this.config, networkInboundLimiter, networkOutboundLimiter);
this.kvClient = new ControllerKVClient(this.requestSender);
}
@@ -101,9 +101,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) {
}
}

public CompletableFuture<List<S3ObjectMetadata>> getWALObjects() {
public CompletableFuture<List<S3ObjectMetadata>> getSSTObjects() {
synchronized (this) {
List<S3ObjectMetadata> s3ObjectMetadataList = this.streamsImage.getWALObjects(config.brokerId()).stream()
List<S3ObjectMetadata> s3ObjectMetadataList = this.streamsImage.getSSTObjects(config.brokerId()).stream()
.map(object -> {
S3Object s3Object = this.objectsImage.getObjectMetadata(object.objectId());
return new S3ObjectMetadata(object.objectId(), object.objectType(),