diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index e6563f5a0e..f424b4988d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -117,7 +117,7 @@ public enum ApiKeys { CLOSE_STREAMS(ApiMessageType.CLOSE_STREAMS, false, true), TRIM_STREAMS(ApiMessageType.TRIM_STREAMS, false, true), PREPARE_S3_OBJECT(ApiMessageType.PREPARE_S3_OBJECT, false, true), - COMMIT_WALOBJECT(ApiMessageType.COMMIT_WALOBJECT, false, true), + COMMIT_SST_OBJECT(ApiMessageType.COMMIT_SSTOBJECT, false, true), COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true), GET_OPENING_STREAMS(ApiMessageType.GET_OPENING_STREAMS, false, true), GET_KVS(ApiMessageType.GET_KVS, false, true), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 78b7c1b989..78f96ae7f3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -28,7 +28,7 @@ import java.util.Map; import org.apache.kafka.common.requests.s3.CloseStreamsRequest; import org.apache.kafka.common.requests.s3.CommitStreamObjectRequest; -import org.apache.kafka.common.requests.s3.CommitWALObjectRequest; +import org.apache.kafka.common.requests.s3.CommitSSTObjectRequest; import org.apache.kafka.common.requests.s3.CreateStreamsRequest; import org.apache.kafka.common.requests.s3.DeleteKVsRequest; import org.apache.kafka.common.requests.s3.DeleteStreamsRequest; @@ -331,8 +331,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return TrimStreamsRequest.parse(buffer, apiVersion); case PREPARE_S3_OBJECT: return PrepareS3ObjectRequest.parse(buffer, apiVersion); - case COMMIT_WALOBJECT: - return CommitWALObjectRequest.parse(buffer, apiVersion); + case COMMIT_SST_OBJECT: + return CommitSSTObjectRequest.parse(buffer, apiVersion); case COMMIT_STREAM_OBJECT: return CommitStreamObjectRequest.parse(buffer, apiVersion); case GET_OPENING_STREAMS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 6822a311af..5a0a607311 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -31,7 +31,7 @@ import java.util.stream.Stream; import org.apache.kafka.common.requests.s3.CloseStreamsResponse; import org.apache.kafka.common.requests.s3.CommitStreamObjectResponse; -import org.apache.kafka.common.requests.s3.CommitWALObjectResponse; +import org.apache.kafka.common.requests.s3.CommitSSTObjectResponse; import org.apache.kafka.common.requests.s3.CreateStreamsResponse; import org.apache.kafka.common.requests.s3.DeleteKVsResponse; import org.apache.kafka.common.requests.s3.DeleteStreamsResponse; @@ -277,8 +277,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return PrepareS3ObjectResponse.parse(responseBuffer, version); case COMMIT_STREAM_OBJECT: return CommitStreamObjectResponse.parse(responseBuffer, version); - case COMMIT_WALOBJECT: - return CommitWALObjectResponse.parse(responseBuffer, version); + case COMMIT_SST_OBJECT: + return 
CommitSSTObjectResponse.parse(responseBuffer, version); case GET_OPENING_STREAMS: return GetOpeningStreamsResponse.parse(responseBuffer, version); case GET_KVS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitSSTObjectRequest.java similarity index 63% rename from clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CommitSSTObjectRequest.java index 51a1df24c0..59655a3be5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitSSTObjectRequest.java @@ -18,26 +18,26 @@ package org.apache.kafka.common.requests.s3; import java.nio.ByteBuffer; -import org.apache.kafka.common.message.CommitWALObjectRequestData; -import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData; +import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; -public class CommitWALObjectRequest extends AbstractRequest { +public class CommitSSTObjectRequest extends AbstractRequest { - public static class Builder extends AbstractRequest.Builder<CommitWALObjectRequest> { + public static class Builder extends AbstractRequest.Builder<CommitSSTObjectRequest> { - private final CommitWALObjectRequestData data; - public Builder(CommitWALObjectRequestData data) { - super(ApiKeys.COMMIT_WALOBJECT); + private final CommitSSTObjectRequestData data; + public Builder(CommitSSTObjectRequestData data) { + super(ApiKeys.COMMIT_SST_OBJECT); this.data = data; } @Override - public CommitWALObjectRequest build(short version) { - return new CommitWALObjectRequest(data, version); + public CommitSSTObjectRequest build(short version) { + return new CommitSSTObjectRequest(data, version); } @Override @@ -45,29 +45,29 @@ public String toString() { return data.toString(); } } - private final CommitWALObjectRequestData data; + private final CommitSSTObjectRequestData data; - public CommitWALObjectRequest(CommitWALObjectRequestData data, short version) { - super(ApiKeys.COMMIT_WALOBJECT, version); + public CommitSSTObjectRequest(CommitSSTObjectRequestData data, short version) { + super(ApiKeys.COMMIT_SST_OBJECT, version); this.data = data; } @Override - public CommitWALObjectResponse getErrorResponse(int throttleTimeMs, Throwable e) { + public CommitSSTObjectResponse getErrorResponse(int throttleTimeMs, Throwable e) { ApiError apiError = ApiError.fromThrowable(e); - CommitWALObjectResponseData response = new CommitWALObjectResponseData() + CommitSSTObjectResponseData response = new CommitSSTObjectResponseData() .setErrorCode(apiError.error().code()) .setThrottleTimeMs(throttleTimeMs); - return new CommitWALObjectResponse(response); + return new CommitSSTObjectResponse(response); } @Override - public CommitWALObjectRequestData data() { + public CommitSSTObjectRequestData data() { return data; } - public static CommitWALObjectRequest parse(ByteBuffer buffer, short version) { - return new CommitWALObjectRequest(new CommitWALObjectRequestData( + public static CommitSSTObjectRequest parse(ByteBuffer buffer, short version) { + return new CommitSSTObjectRequest(new 
CommitSSTObjectRequestData( new ByteBufferAccessor(buffer), version), version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitSSTObjectResponse.java similarity index 78% rename from clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CommitSSTObjectResponse.java index f46c8b4442..34f5646895 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitSSTObjectResponse.java @@ -19,24 +19,24 @@ import java.nio.ByteBuffer; import java.util.Map; -import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; -public class CommitWALObjectResponse extends AbstractResponse { +public class CommitSSTObjectResponse extends AbstractResponse { - private final CommitWALObjectResponseData data; + private final CommitSSTObjectResponseData data; - public CommitWALObjectResponse(CommitWALObjectResponseData data) { - super(ApiKeys.COMMIT_WALOBJECT); + public CommitSSTObjectResponse(CommitSSTObjectResponseData data) { + super(ApiKeys.COMMIT_SST_OBJECT); this.data = data; } @Override - public CommitWALObjectResponseData data() { + public CommitSSTObjectResponseData data() { return data; } @@ -55,8 +55,8 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } - public static CommitWALObjectResponse parse(ByteBuffer buffer, short version) { - return new CommitWALObjectResponse(new CommitWALObjectResponseData( + public static CommitSSTObjectResponse parse(ByteBuffer buffer, short version) { + return new CommitSSTObjectResponse(new CommitSSTObjectResponseData( new ByteBufferAccessor(buffer), version)); } diff --git a/clients/src/main/resources/common/message/CommitWALObjectRequest.json b/clients/src/main/resources/common/message/CommitSSTObjectRequest.json similarity index 92% rename from clients/src/main/resources/common/message/CommitWALObjectRequest.json rename to clients/src/main/resources/common/message/CommitSSTObjectRequest.json index 6f2691334f..1c545e41cf 100644 --- a/clients/src/main/resources/common/message/CommitWALObjectRequest.json +++ b/clients/src/main/resources/common/message/CommitSSTObjectRequest.json @@ -20,7 +20,7 @@ "controller", "broker" ], - "name": "CommitWALObjectRequest", + "name": "CommitSSTObjectRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ @@ -40,25 +40,25 @@ "name": "ObjectId", "type": "int64", "versions": "0+", - "about": "The ID of the WAL S3 object to commit" + "about": "The ID of the SST S3 object to commit" }, { "name": "OrderId", "type": "int64", "versions": "0+", - "about": "The order ID of the WAL S3 object" + "about": "The order ID of the SST S3 object" }, { "name": "ObjectSize", "type": "int64", "versions": "0+", - "about": "The size of the WAL S3 object to commit" + "about": "The size of the SST S3 object to commit" }, { "name": "ObjectStreamRanges", "type": "[]ObjectStreamRange", "versions": "0+", - "about": "The stream ranges of the WAL S3 object to commit", + "about": "The stream ranges of 
the SST S3 object to commit", "fields": [ { "name": "StreamId", diff --git a/clients/src/main/resources/common/message/CommitWALObjectResponse.json b/clients/src/main/resources/common/message/CommitSSTObjectResponse.json similarity index 97% rename from clients/src/main/resources/common/message/CommitWALObjectResponse.json rename to clients/src/main/resources/common/message/CommitSSTObjectResponse.json index 83a436542a..3142dbd6e4 100644 --- a/clients/src/main/resources/common/message/CommitWALObjectResponse.json +++ b/clients/src/main/resources/common/message/CommitSSTObjectResponse.json @@ -16,7 +16,7 @@ { "apiKey": 506, "type": "response", - "name": "CommitWALObjectResponse", + "name": "CommitSSTObjectResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties index 3c1cd8873d..34a65f6b93 100644 --- a/config/kraft/broker.properties +++ b/config/kraft/broker.properties @@ -151,8 +151,8 @@ s3.wal.capacity=1073741824 # The maximum size of memory cache delta WAL can use, default 200MB s3.wal.cache.size=209715200 -# The batched size of WAL object before being uploaded to S3, default 100MB -s3.wal.object.size=104857600 +# The batched size of delta WAL before being uploaded to S3, default 100MB +s3.wal.upload.threshold=104857600 # The maximum size of block cache the broker can use to cache data read from S3, default 100MB s3.block.cache.size=104857600 @@ -166,17 +166,17 @@ s3.stream.object.compaction.living.time.minutes=60 # The maximum size of stream object allowed to be generated in stream compaction, default 10GB s3.stream.object.compaction.max.size.bytes=10737418240 -# The execution interval for WAL object compaction, default 20 minutes -s3.wal.object.compaction.interval.minutes=20 +# The execution interval for SST object compaction, default 20 minutes +s3.sst.compaction.interval.minutes=20 -# The maximum allowed memory consumption for WAL object compaction, default 200MB -s3.wal.object.compaction.cache.size=209715200 +# The maximum allowed memory consumption for SST object compaction, default 200MB +s3.sst.compaction.cache.size=209715200 -# The minimum time before a WAL object to be force split into multiple stream object, default 120 minutes -s3.wal.object.compaction.force.split.time=120 +# The minimum time before a SST object to be force split into multiple stream object, default 120 minutes +s3.sst.compaction.force.split.minutes=120 -# The maximum WAL objects allowed to be compacted in one execution, default 500 -s3.wal.object.compaction.max.num=500 +# The maximum SST objects allowed to be compacted in one execution, default 500 +s3.sst.compaction.max.num=500 # The baseline network bandwidth of the broker, default 100MB. 
This is used to throttle the network usage during compaction s3.network.baseline.bandwidth=104857600 diff --git a/config/kraft/server.properties b/config/kraft/server.properties index 1c1fc2cf32..221c4016b3 100644 --- a/config/kraft/server.properties +++ b/config/kraft/server.properties @@ -157,8 +157,8 @@ s3.wal.capacity=1073741824 # The maximum size of memory cache delta WAL can use, default 200MB s3.wal.cache.size=209715200 -# The batched size of WAL object before being uploaded to S3, default 100MB -s3.wal.object.size=104857600 +# The accumulated size of delta WAL that triggers an upload to S3, default 100MB +s3.wal.upload.threshold=104857600 # The maximum size of block cache the broker can use to cache data read from S3, default 100MB s3.block.cache.size=104857600 @@ -172,17 +172,17 @@ s3.stream.object.compaction.living.time.minutes=60 # The maximum size of stream object allowed to be generated in stream compaction, default 10GB s3.stream.object.compaction.max.size.bytes=10737418240 -# The execution interval for WAL object compaction, default 20 minutes -s3.wal.object.compaction.interval.minutes=20 +# The execution interval for SST object compaction, default 20 minutes +s3.sst.compaction.interval.minutes=20 -# The maximum allowed memory consumption for WAL object compaction, default 200MB -s3.wal.object.compaction.cache.size=209715200 +# The maximum allowed memory consumption for SST object compaction, default 200MB +s3.sst.compaction.cache.size=209715200 -# The minimum time before a WAL object to be force split into multiple stream object, default 120 minutes -s3.wal.object.compaction.force.split.time=120 +# The minimum time before an SST object is force split into multiple stream objects, default 120 minutes +s3.sst.compaction.force.split.minutes=120 -# The maximum WAL objects allowed to be compacted in one execution, default 500 -s3.wal.object.compaction.max.num=500 +# The maximum number of SST objects allowed to be compacted in one execution, default 500 +s3.sst.compaction.max.num=500 # The baseline network bandwidth of the broker, default 100MB. 
This is used to throttle the network usage during compaction s3.network.baseline.bandwidth=104857600 diff --git a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java index 75e4e148cc..cdfaca1621 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java @@ -25,37 +25,36 @@ public class ConfigUtils { public static Config to(KafkaConfig s) { return new Config() .brokerId(s.brokerId()) - .s3Endpoint(s.s3Endpoint()) - .s3Region(s.s3Region()) - .s3Bucket(s.s3Bucket()) - .s3WALPath(s.s3WALPath()) - .s3WALCacheSize(s.s3WALCacheSize()) - .s3WALCapacity(s.s3WALCapacity()) - .s3WALHeaderFlushIntervalSeconds(s.s3WALHeaderFlushIntervalSeconds()) - .s3WALThread(s.s3WALThread()) - .s3WALWindowInitial(s.s3WALWindowInitial()) - .s3WALWindowIncrement(s.s3WALWindowIncrement()) - .s3WALWindowMax(s.s3WALWindowMax()) - .s3WALObjectSize(s.s3WALObjectSize()) - .s3StreamSplitSize(s.s3StreamSplitSize()) - .s3ObjectBlockSize(s.s3ObjectBlockSize()) - .s3ObjectPartSize(s.s3ObjectPartSize()) - .s3BlockCacheSize(s.s3BlockCacheSize()) - .s3StreamObjectCompactionIntervalMinutes(s.s3StreamObjectCompactionTaskIntervalMinutes()) - .s3StreamObjectCompactionMaxSizeBytes(s.s3StreamObjectCompactionMaxSizeBytes()) - .s3StreamObjectCompactionLivingTimeMinutes(s.s3StreamObjectCompactionLivingTimeMinutes()) - .s3ControllerRequestRetryMaxCount(s.s3ControllerRequestRetryMaxCount()) - .s3ControllerRequestRetryBaseDelayMs(s.s3ControllerRequestRetryBaseDelayMs()) - .s3WALObjectCompactionInterval(s.s3WALObjectCompactionInterval()) - .s3WALObjectCompactionCacheSize(s.s3WALObjectCompactionCacheSize()) - .s3WALObjectCompactionUploadConcurrency(s.s3WALObjectCompactionUploadConcurrency()) - .s3MaxStreamNumPerWALObject(s.s3MaxStreamNumPerWALObject()) - .s3MaxStreamObjectNumPerCommit(s.s3MaxStreamObjectNumPerCommit()) - .s3WALObjectCompactionStreamSplitSize(s.s3WALObjectCompactionStreamSplitSize()) - .s3WALObjectCompactionForceSplitPeriod(s.s3WALObjectCompactionForceSplitPeriod()) - .s3WALObjectCompactionMaxObjectNum(s.s3WALObjectCompactionMaxObjectNum()) - .s3MockEnable(s.s3MockEnable()) - .s3ObjectLogEnable(s.s3ObjectLogEnable()) + .endpoint(s.s3Endpoint()) + .region(s.s3Region()) + .bucket(s.s3Bucket()) + .walPath(s.s3WALPath()) + .walCacheSize(s.s3WALCacheSize()) + .walCapacity(s.s3WALCapacity()) + .walHeaderFlushIntervalSeconds(s.s3WALHeaderFlushIntervalSeconds()) + .walThread(s.s3WALThread()) + .walWindowInitial(s.s3WALWindowInitial()) + .walWindowIncrement(s.s3WALWindowIncrement()) + .walWindowMax(s.s3WALWindowMax()) + .walUploadThreshold(s.s3WALUploadThreshold()) + .streamSplitSize(s.s3StreamSplitSize()) + .objectBlockSize(s.s3ObjectBlockSize()) + .objectPartSize(s.s3ObjectPartSize()) + .blockCacheSize(s.s3BlockCacheSize()) + .streamObjectCompactionIntervalMinutes(s.s3StreamObjectCompactionTaskIntervalMinutes()) + .streamObjectCompactionMaxSizeBytes(s.s3StreamObjectCompactionMaxSizeBytes()) + .streamObjectCompactionLivingTimeMinutes(s.s3StreamObjectCompactionLivingTimeMinutes()) + .controllerRequestRetryMaxCount(s.s3ControllerRequestRetryMaxCount()) + .controllerRequestRetryBaseDelayMs(s.s3ControllerRequestRetryBaseDelayMs()) + .sstCompactionInterval(s.s3SSTCompactionInterval()) + .sstCompactionCacheSize(s.s3SSTCompactionCacheSize()) + .maxStreamNumPerSST(s.s3MaxStreamNumPerSST()) + .maxStreamObjectNumPerCommit(s.s3MaxStreamObjectNumPerCommit()) + 
.sstCompactionStreamSplitSize(s.s3SSTCompactionStreamSplitSize()) + .sstCompactionForceSplitPeriod(s.s3SSTCompactionForceSplitMinutes()) + .sstCompactionMaxObjectNum(s.s3SSTCompactionMaxObjectNum()) + .mockEnable(s.s3MockEnable()) + .objectLogEnable(s.s3ObjectLogEnable()) .networkBaselineBandwidth(s.s3NetworkBaselineBandwidthProp()) .refillPeriodMs(s.s3RefillPeriodMsProp()); diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index 560d61a267..906fea8319 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -92,11 +92,11 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) { this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext); this.streamManager = new ControllerStreamManager(this.metadataManager, this.requestSender, kafkaConfig); this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, kafkaConfig); - this.blockCache = new DefaultS3BlockCache(this.config.s3BlockCacheSize(), objectManager, s3Operator); + this.blockCache = new DefaultS3BlockCache(this.config.blockCacheSize(), objectManager, s3Operator); this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionS3Operator); - this.writeAheadLog = BlockWALService.builder(this.config.s3WALPath(), this.config.s3WALCapacity()).config(this.config).build(); + this.writeAheadLog = BlockWALService.builder(this.config.walPath(), this.config.walCapacity()).config(this.config).build(); this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, s3Operator); - // stream object compactions share the same s3Operator with wal object compactions + // stream object compactions share the same s3Operator as SST object compactions this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, compactionS3Operator, this.config, networkInboundLimiter, networkOutboundLimiter); this.kvClient = new ControllerKVClient(this.requestSender); } diff --git a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java index da832c247e..964473d9b7 100644 --- a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java @@ -101,9 +101,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) { } } - public CompletableFuture<List<S3ObjectMetadata>> getWALObjects() { + public CompletableFuture<List<S3ObjectMetadata>> getSSTObjects() { synchronized (this) { - List<S3ObjectMetadata> s3ObjectMetadataList = this.streamsImage.getWALObjects(config.brokerId()).stream() + List<S3ObjectMetadata> s3ObjectMetadataList = this.streamsImage.getSSTObjects(config.brokerId()).stream() .map(object -> { S3Object s3Object = this.objectsImage.getObjectMetadata(object.objectId()); return new S3ObjectMetadata(object.objectId(), object.objectType(), diff --git a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java index f8ae738992..909787532d 100644 --- a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java @@ -19,8 +19,8 @@ import com.automq.stream.s3.objects.CommitStreamObjectRequest; 
-import com.automq.stream.s3.objects.CommitWALObjectRequest; -import com.automq.stream.s3.objects.CommitWALObjectResponse; +import com.automq.stream.s3.objects.CommitSSTObjectRequest; +import com.automq.stream.s3.objects.CommitSSTObjectResponse; import com.automq.stream.s3.objects.ObjectManager; import kafka.log.stream.s3.metadata.StreamMetadataManager; import kafka.log.stream.s3.network.ControllerRequestSender; @@ -30,8 +30,8 @@ import kafka.server.KafkaConfig; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; -import org.apache.kafka.common.message.CommitWALObjectRequestData; -import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData; +import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -101,38 +101,38 @@ public Builder toRequestBuilder() { } @Override - public CompletableFuture commitWALObject(CommitWALObjectRequest commitWALObjectRequest) { - CommitWALObjectRequestData request = new CommitWALObjectRequestData() + public CompletableFuture commitSSTObject(CommitSSTObjectRequest commitSSTObjectRequest) { + CommitSSTObjectRequestData request = new CommitSSTObjectRequestData() .setNodeId(nodeId) .setNodeEpoch(nodeEpoch) - .setOrderId(commitWALObjectRequest.getOrderId()) - .setObjectId(commitWALObjectRequest.getObjectId()) - .setObjectSize(commitWALObjectRequest.getObjectSize()) - .setObjectStreamRanges(commitWALObjectRequest.getStreamRanges() + .setOrderId(commitSSTObjectRequest.getOrderId()) + .setObjectId(commitSSTObjectRequest.getObjectId()) + .setObjectSize(commitSSTObjectRequest.getObjectSize()) + .setObjectStreamRanges(commitSSTObjectRequest.getStreamRanges() .stream() .map(Convertor::toObjectStreamRangeInRequest).collect(Collectors.toList())) - .setStreamObjects(commitWALObjectRequest.getStreamObjects() + .setStreamObjects(commitSSTObjectRequest.getStreamObjects() .stream() .map(Convertor::toStreamObjectInRequest).collect(Collectors.toList())) - .setCompactedObjectIds(commitWALObjectRequest.getCompactedObjectIds()); + .setCompactedObjectIds(commitSSTObjectRequest.getCompactedObjectIds()); WrapRequest req = new WrapRequest() { @Override public ApiKeys apiKey() { - return ApiKeys.COMMIT_WALOBJECT; + return ApiKeys.COMMIT_SST_OBJECT; } @Override public Builder toRequestBuilder() { - return new org.apache.kafka.common.requests.s3.CommitWALObjectRequest.Builder(request); + return new org.apache.kafka.common.requests.s3.CommitSSTObjectRequest.Builder(request); } }; - CompletableFuture future = new CompletableFuture<>(); - RequestTask task = new RequestTask<>(req, future, response -> { - CommitWALObjectResponseData resp = response.data(); + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + CommitSSTObjectResponseData resp = response.data(); Errors code = Errors.forCode(resp.errorCode()); switch (code) { case NONE: - return ResponseHandleResult.withSuccess(new CommitWALObjectResponse()); + return ResponseHandleResult.withSuccess(new CommitSSTObjectResponse()); case NODE_EPOCH_EXPIRED: case NODE_EPOCH_NOT_EXIST: LOGGER.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode())); @@ -141,7 +141,7 @@ public Builder 
toRequestBuilder() { case COMPACTED_OBJECTS_NOT_FOUND: throw code.exception(); default: - LOGGER.error("Error while committing WAL object: {}, code: {}, retry later", request, code); + LOGGER.error("Error while committing SST object: {}, code: {}, retry later", request, code); return ResponseHandleResult.withRetry(); } }); @@ -207,7 +207,7 @@ public CompletableFuture> getObjects(long streamId, long @Override public CompletableFuture> getServerObjects() { try { - return this.metadataManager.getWALObjects(); + return this.metadataManager.getSSTObjects(); } catch (Exception e) { LOGGER.error("Error while get server objects", e); return CompletableFuture.completedFuture(Collections.emptyList()); diff --git a/core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java b/core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java index 3214bedf28..c785645800 100644 --- a/core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java +++ b/core/src/main/scala/kafka/log/stream/s3/objects/Convertor.java @@ -19,12 +19,12 @@ import com.automq.stream.s3.objects.ObjectStreamRange; import com.automq.stream.s3.objects.StreamObject; -import org.apache.kafka.common.message.CommitWALObjectRequestData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData; public class Convertor { - public static CommitWALObjectRequestData.StreamObject toStreamObjectInRequest(StreamObject s) { - return new CommitWALObjectRequestData.StreamObject() + public static CommitSSTObjectRequestData.StreamObject toStreamObjectInRequest(StreamObject s) { + return new CommitSSTObjectRequestData.StreamObject() .setStreamId(s.getStreamId()) .setObjectId(s.getObjectId()) .setObjectSize(s.getObjectSize()) @@ -32,8 +32,8 @@ public static CommitWALObjectRequestData.StreamObject toStreamObjectInRequest(St .setEndOffset(s.getEndOffset()); } - public static CommitWALObjectRequestData.ObjectStreamRange toObjectStreamRangeInRequest(ObjectStreamRange s) { - return new CommitWALObjectRequestData.ObjectStreamRange() + public static CommitSSTObjectRequestData.ObjectStreamRange toObjectStreamRangeInRequest(ObjectStreamRange s) { + return new CommitSSTObjectRequestData.ObjectStreamRange() .setStreamId(s.getStreamId()) .setStartOffset(s.getStartOffset()) .setEndOffset(s.getEndOffset()); diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index 661200433a..243d01b20f 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -103,7 +103,7 @@ object RequestConvertToJson { case req: CloseStreamsRequest => CloseStreamsRequestDataJsonConverter.write(req.data, request.version) case req: TrimStreamsRequest => TrimStreamsRequestDataJsonConverter.write(req.data, request.version) case req: PrepareS3ObjectRequest => PrepareS3ObjectRequestDataJsonConverter.write(req.data, request.version) - case req: CommitWALObjectRequest => CommitWALObjectRequestDataJsonConverter.write(req.data, request.version) + case req: CommitSSTObjectRequest => CommitSSTObjectRequestDataJsonConverter.write(req.data, request.version) case req: CommitStreamObjectRequest => CommitStreamObjectRequestDataJsonConverter.write(req.data, request.version) case req: GetOpeningStreamsRequest => GetOpeningStreamsRequestDataJsonConverter.write(req.data, request.version) case req: GetKVsRequest => GetKVsRequestDataJsonConverter.write(req.data, request.version) @@ -193,7 +193,7 @@ object RequestConvertToJson { case res: 
CloseStreamsResponse => CloseStreamsResponseDataJsonConverter.write(res.data, version) case res: TrimStreamsResponse => TrimStreamsResponseDataJsonConverter.write(res.data, version) case res: PrepareS3ObjectResponse => PrepareS3ObjectResponseDataJsonConverter.write(res.data, version) - case res: CommitWALObjectResponse => CommitWALObjectResponseDataJsonConverter.write(res.data, version) + case res: CommitSSTObjectResponse => CommitSSTObjectResponseDataJsonConverter.write(res.data, version) case res: CommitStreamObjectResponse => CommitStreamObjectResponseDataJsonConverter.write(res.data, version) case res: GetOpeningStreamsResponse => GetOpeningStreamsResponseDataJsonConverter.write(res.data, version) case res: GetKVsResponse => GetKVsResponseDataJsonConverter.write(res.data, version) diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index a87d00e5c0..11ecc9cc71 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.message._ import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.requests._ -import org.apache.kafka.common.requests.s3.{CloseStreamsRequest, CloseStreamsResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitWALObjectRequest, CommitWALObjectResponse, CreateStreamsRequest, CreateStreamsResponse, DeleteKVsRequest, DeleteKVsResponse, DeleteStreamsRequest, DeleteStreamsResponse, GetKVsRequest, GetKVsResponse, GetNextNodeIdRequest, GetNextNodeIdResponse, GetOpeningStreamsRequest, GetOpeningStreamsResponse, OpenStreamsRequest, OpenStreamsResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse, PutKVsRequest, PutKVsResponse, TrimStreamsRequest, TrimStreamsResponse} +import org.apache.kafka.common.requests.s3.{CloseStreamsRequest, CloseStreamsResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitSSTObjectRequest, CommitSSTObjectResponse, CreateStreamsRequest, CreateStreamsResponse, DeleteKVsRequest, DeleteKVsResponse, DeleteStreamsRequest, DeleteStreamsResponse, GetKVsRequest, GetKVsResponse, GetNextNodeIdRequest, GetNextNodeIdResponse, GetOpeningStreamsRequest, GetOpeningStreamsResponse, OpenStreamsRequest, OpenStreamsResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse, PutKVsRequest, PutKVsResponse, TrimStreamsRequest, TrimStreamsResponse} import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC} import org.apache.kafka.common.utils.Time @@ -117,7 +117,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.DELETE_STREAMS => handleDeleteStream(request) case ApiKeys.TRIM_STREAMS => handleTrimStream(request) case ApiKeys.PREPARE_S3_OBJECT => handlePrepareS3Object(request) - case ApiKeys.COMMIT_WALOBJECT => handleCommitWALObject(request) + case ApiKeys.COMMIT_SST_OBJECT => handleCommitSSTObject(request) case ApiKeys.COMMIT_STREAM_OBJECT => handleCommitStreamObject(request) case ApiKeys.GET_OPENING_STREAMS => handleGetStreamsOffset(request) case ApiKeys.GET_KVS => handleGetKV(request) @@ -1003,17 +1003,17 @@ class ControllerApis(val requestChannel: RequestChannel, } } - def handleCommitWALObject(request: RequestChannel.Request): CompletableFuture[Unit] = { - val commitWALObjectRequest = request.body[CommitWALObjectRequest] + def handleCommitSSTObject(request: 
RequestChannel.Request): CompletableFuture[Unit] = { + val commitSSTObjectRequest = request.body[CommitSSTObjectRequest] val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty()) - controller.commitWALObject(context, commitWALObjectRequest.data) + controller.commitSSTObject(context, commitSSTObjectRequest.data) .handle[Unit] { (result, exception) => if (exception != null) { requestHelper.handleError(request, exception) } else { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - new CommitWALObjectResponse(result.setThrottleTimeMs(requestThrottleMs)) + new CommitSSTObjectResponse(result.setThrottleTimeMs(requestThrottleMs)) }) } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 87a968ad5a..30445d559d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -311,15 +311,14 @@ object Defaults { val QuorumRetryBackoffMs = RaftConfig.DEFAULT_QUORUM_RETRY_BACKOFF_MS /** ********* Kafka on S3 Configuration *********/ - val S3WALObjectCompactionInterval: Int = 20 // 20min - val S3WALObjectCompactionCacheSize: Long = 200 * 1024 * 1024 // 200MB - val S3WALObjectCompactionUploadConcurrency: Int = 8 - val S3WALObjectCompactionStreamSplitSize: Long = 16 * 1024 * 1024 // 16MB - val S3WALObjectCompactionForceSplitPeriod: Int = 120 // 120min - val S3WALObjectCompactionMaxObjectNum: Int = 500 - val S3MaxStreamNumPerWALObject: Int = 10000 + val S3SSTCompactionInterval: Int = 20 // 20min + val S3SSTCompactionCacheSize: Long = 200 * 1024 * 1024 // 200MB + val S3SSTCompactionStreamSplitSize: Long = 16 * 1024 * 1024 // 16MB + val S3SSTCompactionForceSplitMinutes: Int = 120 // 120min + val S3SSTCompactionMaxObjectNum: Int = 500 + val S3MaxStreamNumPerSST: Int = 10000 val S3MaxStreamObjectNumPerCommit: Int = 10000 - val S3ObjectRetentionTimeInSecond: Long = 10 * 60 // 10min + val S3ObjectRetentionMinutes: Long = 10 // 10min val S3NetworkBaselineBandwidth: Long = 100 * 1024 * 1024 // 100MB/s val S3RefillPeriodMs: Int = 1000 // 1s } @@ -698,7 +697,7 @@ object KafkaConfig { val S3WALWindowIncrementProp = "s3.wal.window.increment" val S3WALWindowMaxProp = "s3.wal.window.max" val S3WALCacheSizeProp = "s3.wal.cache.size" - val S3WALObjectSizeProp = "s3.wal.object.size" + val S3WALUploadThresholdProp = "s3.wal.upload.threshold" val S3StreamSplitSizeProp = "s3.stream.object.split.size" val S3ObjectBlockSizeProp = "s3.object.block.size" val S3ObjectPartSizeProp = "s3.object.part.size" @@ -708,16 +707,15 @@ object KafkaConfig { val S3StreamObjectCompactionLivingTimeMinutesProp = "s3.stream.object.compaction.living.time.minutes" val S3ControllerRequestRetryMaxCountProp = "s3.controller.request.retry.max.count" val S3ControllerRequestRetryBaseDelayMsProp = "s3.controller.request.retry.base.delay.ms" - val S3WALObjectCompactionIntervalProp = "s3.wal.object.compaction.interval.minutes" - val S3WALObjectCompactionCacheSizeProp = "s3.wal.object.compaction.cache.size" - val S3WALObjectCompactionUploadConcurrencyProp = "s3.wal.object.compaction.upload.concurrency" - val S3WALObjectCompactionStreamSplitSizeProp = "s3.wal.object.compaction.stream.split.size" - val S3WALObjectCompactionForceSplitPeriodProp = "s3.wal.object.compaction.force.split.time" - val S3WALObjectCompactionMaxObjectNumProp = "s3.wal.object.compaction.max.num" - val S3MaxStreamNumPerWALObjectProp = "s3.max.stream.num.per.wal.object" + val 
S3SSTCompactionIntervalProp = "s3.sst.compaction.interval.minutes" + val S3SSTCompactionCacheSizeProp = "s3.sst.compaction.cache.size" + val S3SSTCompactionStreamSplitSizeProp = "s3.sst.compaction.stream.split.size" + val S3SSTCompactionForceSplitMinutesProp = "s3.sst.compaction.force.split.minutes" + val S3SSTCompactionMaxObjectNumProp = "s3.sst.compaction.max.num" + val S3MaxStreamNumPerSSTProp = "s3.max.stream.num.per.sst" val S3MaxStreamObjectNumPerCommit = "s3.max.stream.object.num.per.commit" val S3MockEnableProp = "s3.mock.enable" - val S3ObjectRetentionTimeInSecondProp = "s3.object.retention.time.in.second" + val S3ObjectRetentionMinutes = "s3.object.retention.minutes" val S3ObjectLogEnableProp = "s3.object.log.enable" val S3NetworkBaselineBandwidthProp = "s3.network.baseline.bandwidth" val S3RefillPeriodMsProp = "s3.network.refill.period.ms" @@ -734,9 +732,9 @@ object KafkaConfig { val S3WALWindowIncrementDoc = "The increment of S3 WAL window size in bytes." val S3WALWindowMaxDoc = "The max S3 WAL window size in bytes." val S3WALCacheSizeDoc = "The S3 storage max WAL cache size. When WAL cache is full, storage will hang the request, \n" + - "until WAL cache is free by S3 WAL object upload." - val S3WALObjectSizeDoc = "The S3 WAL object size threshold." - val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when upload WAL object or compact object." + "until WAL cache is free by S3 WAL upload." + val S3WALUploadThresholdDoc = "The S3 WAL trigger upload (bytes) threshold." + val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when upload delta WAL or compact SST object." val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold." val S3ObjectPartSizeDoc = "The S3 object multi-part upload part size threshold." val S3BlockCacheSizeDoc = "The S3 block cache size in MiB." @@ -745,13 +743,12 @@ object KafkaConfig { val S3StreamObjectCompactionLivingTimeMinutesDoc = "The S3 stream object compaction living time threshold in minutes." val S3ControllerRequestRetryMaxCountDoc = "The S3 controller request retry max count." val S3ControllerRequestRetryBaseDelayMsDoc = "The S3 controller request retry base delay in milliseconds." - val S3WALObjectCompactionIntervalDoc = "The execution interval of S3 object compaction in minutes." - val S3WALObjectCompactionCacheSizeDoc = "The S3 object compaction cache size in Bytes." - val S3WALObjectCompactionUploadConcurrencyDoc = "The S3 object compaction upload concurrency." - val S3WALObjectCompactionStreamSplitSizeDoc = "The S3 object compaction stream split size threshold in Bytes." - val S3WALObjectCompactionForceSplitPeriodDoc = "The S3 object compaction force split period in minutes." - val S3WALObjectCompactionMaxObjectNumDoc = "The maximum num of WAL objects to be compact at one time" - val S3MaxStreamNumPerWALObjectDoc = "The maximum number of streams allowed in single WAL object" + val S3SSTCompactionIntervalDoc = "The execution interval of SST object compaction in minutes." + val S3SSTCompactionCacheSizeDoc = "The SST object compaction cache size in Bytes." + val S3SSTCompactionStreamSplitSizeDoc = "The SST compaction stream split size threshold in Bytes." + val S3SSTCompactionForceSplitMinutesDoc = "The SST compaction force split period in minutes." 
+ val S3SSTCompactionMaxObjectNumDoc = "The maximum number of SST objects to be compacted at one time" + val S3MaxStreamNumPerSSTDoc = "The maximum number of streams allowed in a single SST object" val S3MaxStreamObjectNumPerCommitDoc = "The maximum number of stream objects in single commit request" val S3MockEnableDoc = "The S3 mock enable flag, replace all S3 related module with memory-mocked implement." val S3ObjectRetentionTimeInSecondDoc = "The S3 object retention time in second, default is 10 minutes (600s)." @@ -1569,7 +1566,7 @@ object KafkaConfig { .define(S3WALWindowInitialProp, LONG, 1048576L, MEDIUM, S3WALWindowInitialDoc) .define(S3WALWindowIncrementProp, LONG, 4194304L, MEDIUM, S3WALWindowIncrementDoc) .define(S3WALWindowMaxProp, LONG, 536870912L, MEDIUM, S3WALWindowMaxDoc) - .define(S3WALObjectSizeProp, LONG, 104857600L, MEDIUM, S3WALObjectSizeDoc) + .define(S3WALUploadThresholdProp, LONG, 104857600L, MEDIUM, S3WALUploadThresholdDoc) .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc) .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc) .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc) @@ -1579,16 +1576,15 @@ object KafkaConfig { .define(S3StreamObjectCompactionLivingTimeMinutesProp, INT, 60, MEDIUM, S3StreamObjectCompactionLivingTimeMinutesDoc) .define(S3ControllerRequestRetryMaxCountProp, INT, Integer.MAX_VALUE, MEDIUM, S3ControllerRequestRetryMaxCountDoc) .define(S3ControllerRequestRetryBaseDelayMsProp, LONG, 500, MEDIUM, S3ControllerRequestRetryBaseDelayMsDoc) - .define(S3WALObjectCompactionIntervalProp, INT, Defaults.S3WALObjectCompactionInterval, MEDIUM, S3WALObjectCompactionIntervalDoc) - .define(S3WALObjectCompactionCacheSizeProp, LONG, Defaults.S3WALObjectCompactionCacheSize, MEDIUM, S3WALObjectCompactionCacheSizeDoc) - .define(S3WALObjectCompactionUploadConcurrencyProp, INT, Defaults.S3WALObjectCompactionUploadConcurrency, MEDIUM, S3WALObjectCompactionUploadConcurrencyDoc) - .define(S3WALObjectCompactionStreamSplitSizeProp, LONG, Defaults.S3WALObjectCompactionStreamSplitSize, MEDIUM, S3WALObjectCompactionStreamSplitSizeDoc) - .define(S3WALObjectCompactionForceSplitPeriodProp, INT, Defaults.S3WALObjectCompactionForceSplitPeriod, MEDIUM, S3WALObjectCompactionForceSplitPeriodDoc) - .define(S3WALObjectCompactionMaxObjectNumProp, INT, Defaults.S3WALObjectCompactionMaxObjectNum, MEDIUM, S3WALObjectCompactionMaxObjectNumDoc) - .define(S3MaxStreamNumPerWALObjectProp, INT, Defaults.S3MaxStreamNumPerWALObject, MEDIUM, S3MaxStreamNumPerWALObjectDoc) + .define(S3SSTCompactionIntervalProp, INT, Defaults.S3SSTCompactionInterval, MEDIUM, S3SSTCompactionIntervalDoc) + .define(S3SSTCompactionCacheSizeProp, LONG, Defaults.S3SSTCompactionCacheSize, MEDIUM, S3SSTCompactionCacheSizeDoc) + .define(S3SSTCompactionStreamSplitSizeProp, LONG, Defaults.S3SSTCompactionStreamSplitSize, MEDIUM, S3SSTCompactionStreamSplitSizeDoc) + .define(S3SSTCompactionForceSplitMinutesProp, INT, Defaults.S3SSTCompactionForceSplitMinutes, MEDIUM, S3SSTCompactionForceSplitMinutesDoc) + .define(S3SSTCompactionMaxObjectNumProp, INT, Defaults.S3SSTCompactionMaxObjectNum, MEDIUM, S3SSTCompactionMaxObjectNumDoc) + .define(S3MaxStreamNumPerSSTProp, INT, Defaults.S3MaxStreamNumPerSST, MEDIUM, S3MaxStreamNumPerSSTDoc) .define(S3MaxStreamObjectNumPerCommit, INT, Defaults.S3MaxStreamObjectNumPerCommit, MEDIUM, S3MaxStreamObjectNumPerCommitDoc) .define(S3MockEnableProp, BOOLEAN, false, LOW, S3MockEnableDoc) - .define(S3ObjectRetentionTimeInSecondProp, LONG, 
Defaults.S3ObjectRetentionTimeInSecond, MEDIUM, S3ObjectRetentionTimeInSecondDoc) + .define(S3ObjectRetentionMinutes, LONG, Defaults.S3ObjectRetentionMinutes, MEDIUM, S3ObjectRetentionTimeInSecondDoc) .define(S3ObjectLogEnableProp, BOOLEAN, false, LOW, S3ObjectLogEnableDoc) .define(S3NetworkBaselineBandwidthProp, LONG, Defaults.S3NetworkBaselineBandwidth, MEDIUM, S3NetworkBaselineBandwidthDoc) .define(S3RefillPeriodMsProp, INT, Defaults.S3RefillPeriodMs, MEDIUM, S3RefillPeriodMsDoc) @@ -2138,7 +2134,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3WALWindowInitial = getLong(KafkaConfig.S3WALWindowInitialProp) val s3WALWindowIncrement = getLong(KafkaConfig.S3WALWindowIncrementProp) val s3WALWindowMax = getLong(KafkaConfig.S3WALWindowMaxProp) - val s3WALObjectSize = getLong(KafkaConfig.S3WALObjectSizeProp) + val s3WALUploadThreshold = getLong(KafkaConfig.S3WALUploadThresholdProp) val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp) val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp) val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp) @@ -2150,16 +2146,15 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3ControllerRequestRetryBaseDelayMs = getLong(KafkaConfig.S3ControllerRequestRetryBaseDelayMsProp) // TODO: ensure incremental epoch => Store epoch in disk, if timestamp flip back, we could use disk epoch to keep the incremental epoch. val brokerEpoch = System.currentTimeMillis() - val s3WALObjectCompactionInterval = getInt(KafkaConfig.S3WALObjectCompactionIntervalProp) - val s3WALObjectCompactionCacheSize = getLong(KafkaConfig.S3WALObjectCompactionCacheSizeProp) - val s3WALObjectCompactionUploadConcurrency = getInt(KafkaConfig.S3WALObjectCompactionUploadConcurrencyProp) - val s3WALObjectCompactionStreamSplitSize = getLong(KafkaConfig.S3WALObjectCompactionStreamSplitSizeProp) - val s3WALObjectCompactionForceSplitPeriod = getInt(KafkaConfig.S3WALObjectCompactionForceSplitPeriodProp) - val s3WALObjectCompactionMaxObjectNum = getInt(KafkaConfig.S3WALObjectCompactionMaxObjectNumProp) - val s3MaxStreamNumPerWALObject = getInt(KafkaConfig.S3MaxStreamNumPerWALObjectProp) + val s3SSTCompactionInterval = getInt(KafkaConfig.S3SSTCompactionIntervalProp) + val s3SSTCompactionCacheSize = getLong(KafkaConfig.S3SSTCompactionCacheSizeProp) + val s3SSTCompactionStreamSplitSize = getLong(KafkaConfig.S3SSTCompactionStreamSplitSizeProp) + val s3SSTCompactionForceSplitMinutes = getInt(KafkaConfig.S3SSTCompactionForceSplitMinutesProp) + val s3SSTCompactionMaxObjectNum = getInt(KafkaConfig.S3SSTCompactionMaxObjectNumProp) + val s3MaxStreamNumPerSST = getInt(KafkaConfig.S3MaxStreamNumPerSSTProp) val s3MaxStreamObjectNumPerCommit = getInt(KafkaConfig.S3MaxStreamObjectNumPerCommit) val s3MockEnable = getBoolean(KafkaConfig.S3MockEnableProp) - val s3ObjectRetentionTimeInSecond = getLong(KafkaConfig.S3ObjectRetentionTimeInSecondProp) + val s3ObjectRetentionTimeInSecond = getLong(KafkaConfig.S3ObjectRetentionMinutes) * 60 val s3ObjectLogEnable = getBoolean(KafkaConfig.S3ObjectLogEnableProp) val s3NetworkBaselineBandwidthProp = getLong(KafkaConfig.S3NetworkBaselineBandwidthProp) val s3RefillPeriodMsProp = getInt(KafkaConfig.S3RefillPeriodMsProp) diff --git a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java index 0d320baab3..04125a2b7a 100644 --- 
a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java @@ -23,7 +23,7 @@ import kafka.server.KafkaConfig; import kafka.server.metadata.BrokerMetadataListener; import kafka.server.metadata.KRaftMetadataCache; -import org.apache.kafka.image.NodeS3WALMetadataImage; +import org.apache.kafka.image.NodeS3SSTMetadataImage; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.S3ObjectsImage; @@ -35,7 +35,7 @@ import org.apache.kafka.metadata.stream.S3ObjectState; import com.automq.stream.s3.metadata.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; -import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3SSTObject; import com.automq.stream.s3.metadata.StreamOffsetRange; import com.automq.stream.s3.metadata.StreamState; import org.junit.jupiter.api.BeforeEach; @@ -106,10 +106,10 @@ public void setUp() { 0L, new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS)); S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects); - NodeS3WALMetadataImage walMetadataImage0 = new NodeS3WALMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of( - 1L, new S3WALObject(1L, BROKER0, Map.of( + NodeS3SSTMetadataImage walMetadataImage0 = new NodeS3SSTMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of( + 1L, new S3SSTObject(1L, BROKER0, Map.of( STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), - 2L, new S3WALObject(2L, BROKER0, Map.of( + 2L, new S3SSTObject(2L, BROKER0, Map.of( STREAM2, new StreamOffsetRange(STREAM2, 0L, 100L)), 2L))); S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), @@ -122,7 +122,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), streamObjects.put(1L, new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS)); streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, NodeS3WALMetadataImage.EMPTY)); + Map.of(BROKER0, NodeS3SSTMetadataImage.EMPTY)); image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null); ranges = new HashMap<>(ranges); @@ -131,7 +131,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), streamObjects.put(2L, new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS)); streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, NodeS3WALMetadataImage.EMPTY)); + Map.of(BROKER0, NodeS3SSTMetadataImage.EMPTY)); image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null); } diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index f7320b3baa..f4a2077654 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -35,8 +35,8 @@ import org.apache.kafka.common.message.CloseStreamsResponseData; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import 
org.apache.kafka.common.message.CommitStreamObjectResponseData; -import org.apache.kafka.common.message.CommitWALObjectRequestData; -import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData; +import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic; import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult; import org.apache.kafka.common.message.CreateStreamsRequestData; @@ -530,7 +530,7 @@ public CompletableFuture prepareObject(ControllerRe } @Override - public CompletableFuture<CommitWALObjectResponseData> commitWALObject(ControllerRequestContext context, CommitWALObjectRequestData request) { + public CompletableFuture<CommitSSTObjectResponseData> commitSSTObject(ControllerRequestContext context, CommitSSTObjectRequestData request) { throw new UnsupportedOperationException(); } diff --git a/docker/scripts/start.sh b/docker/scripts/start.sh index ca48bcc3a7..528fcc6130 100644 --- a/docker/scripts/start.sh +++ b/docker/scripts/start.sh @@ -185,7 +185,7 @@ add_settings_for_s3() { if [[ "${role}" == "broker" || "${role}" == "server" ]]; then add_or_setup_value "s3.wal.capacity" "4294967296" "${file_name}" add_or_setup_value "s3.wal.cache.size" "1073741824" "${file_name}" - add_or_setup_value "s3.wal.object.size" "536870912" "${file_name}" + add_or_setup_value "s3.wal.upload.threshold" "536870912" "${file_name}" add_or_setup_value "s3.stream.object.split.size" "16777216" "${file_name}" add_or_setup_value "s3.object.block.size" "16777216" "${file_name}" add_or_setup_value "s3.object.part.size" "33554432" "${file_name}" diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 5205987cf3..f13a656bfc 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -128,7 +128,7 @@ versions += [ zookeeper: "3.6.3", zstd: "1.5.2-1", commonLang: "3.12.0", - s3stream: "0.1.21-SNAPSHOT", + s3stream: "0.2.0-SNAPSHOT", ] libs += [ activation: "javax.activation:activation:$versions.activation", diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 444e4c3ac4..3643d11b2c 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -32,8 +32,8 @@ import org.apache.kafka.common.message.CloseStreamsResponseData; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; -import org.apache.kafka.common.message.CommitWALObjectRequestData; -import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData; +import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic; import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult; import org.apache.kafka.common.message.CreateStreamsRequestData; @@ -458,11 +458,11 @@ CompletableFuture prepareObject( ); /** - * Broker trys to commit a WAL object. + * Broker tries to commit an SST object. 
*/ - CompletableFuture commitWALObject( + CompletableFuture commitSSTObject( ControllerRequestContext context, - CommitWALObjectRequestData request + CommitSSTObjectRequestData request ); /** diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java index c7e39541de..02c252e98b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java @@ -153,8 +153,8 @@ void replay(ApiMessage message) { case REMOVE_RANGE_RECORD: case S3_STREAM_OBJECT_RECORD: case REMOVE_S3_STREAM_OBJECT_RECORD: - case WALOBJECT_RECORD: - case REMOVE_WALOBJECT_RECORD: + case S3_SSTOBJECT_RECORD: + case REMOVE_SSTOBJECT_RECORD: case S3_OBJECT_RECORD: case REMOVE_S3_OBJECT_RECORD: case ASSIGNED_STREAM_ID_RECORD: diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 8c75a40699..262102b5e8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -47,8 +47,8 @@ import org.apache.kafka.common.message.CloseStreamsResponseData.CloseStreamResponse; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; -import org.apache.kafka.common.message.CommitWALObjectRequestData; -import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData; +import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic; import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult; import org.apache.kafka.common.message.CreateStreamsRequestData; @@ -110,14 +110,14 @@ import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; -import org.apache.kafka.common.metadata.RemoveWALObjectRecord; +import org.apache.kafka.common.metadata.RemoveSSTObjectRecord; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; -import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.common.metadata.S3SSTObjectRecord; import org.apache.kafka.common.metadata.ZkMigrationStateRecord; import org.apache.kafka.common.metadata.UpdateNextNodeIdRecord; import org.apache.kafka.common.protocol.ApiMessage; @@ -1519,11 +1519,11 @@ private void replay(ApiMessage message, Optional snapshotId, lon case REMOVE_S3_STREAM_OBJECT_RECORD: streamControlManager.replay((RemoveS3StreamObjectRecord) message); break; - case WALOBJECT_RECORD: - streamControlManager.replay((WALObjectRecord) message); + case S3_SSTOBJECT_RECORD: + streamControlManager.replay((S3SSTObjectRecord) message); break; - case REMOVE_WALOBJECT_RECORD: - streamControlManager.replay((RemoveWALObjectRecord) message); + case REMOVE_SSTOBJECT_RECORD: + 
streamControlManager.replay((RemoveSSTObjectRecord) message); break; case S3_OBJECT_RECORD: s3ObjectControlManager.replay((S3ObjectRecord) message); @@ -2394,9 +2394,9 @@ public CompletableFuture prepareObject(ControllerRe } @Override - public CompletableFuture commitWALObject(ControllerRequestContext context, CommitWALObjectRequestData request) { - return appendWriteEvent("commitWALObject", context.deadlineNs(), - () -> streamControlManager.commitWALObject(request)); + public CompletableFuture commitSSTObject(ControllerRequestContext context, CommitSSTObjectRequestData request) { + return appendWriteEvent("commitSSTObject", context.deadlineNs(), + () -> streamControlManager.commitSSTObject(request)); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index d6248512e0..d91b350491 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -172,7 +172,7 @@ public ControllerResult commitObject(long objectId, long objectSize, lon } S3Object object = this.objectsMetadata.get(objectId); if (object == null) { - log.error("object {} not exist when commit wal object", objectId); + log.error("object {} not exist when commit SST object", objectId); return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST); } // verify the state diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 8aa507bb90..1f03da5a65 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -25,10 +25,10 @@ import org.apache.kafka.common.message.CloseStreamsResponseData.CloseStreamResponse; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; -import org.apache.kafka.common.message.CommitWALObjectRequestData; -import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange; -import org.apache.kafka.common.message.CommitWALObjectRequestData.StreamObject; -import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData.ObjectStreamRange; +import org.apache.kafka.common.message.CommitSSTObjectRequestData.StreamObject; +import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.message.CreateStreamsRequestData.CreateStreamRequest; import org.apache.kafka.common.message.CreateStreamsResponseData.CreateStreamResponse; import org.apache.kafka.common.message.DeleteStreamsRequestData.DeleteStreamRequest; @@ -47,18 +47,18 @@ import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; -import org.apache.kafka.common.metadata.RemoveWALObjectRecord; +import org.apache.kafka.common.metadata.RemoveSSTObjectRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; -import 
org.apache.kafka.common.metadata.WALObjectRecord; -import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex; +import org.apache.kafka.common.metadata.S3SSTObjectRecord; +import org.apache.kafka.common.metadata.S3SSTObjectRecord.StreamIndex; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.ControllerResult; import org.apache.kafka.metadata.stream.Convertor; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; -import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3SSTObject; import com.automq.stream.s3.metadata.StreamState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; @@ -153,17 +153,17 @@ public String toString() { } } - public static class NodeS3WALMetadata { + public static class NodeS3SSTMetadata { private final int nodeId; private final TimelineLong nodeEpoch; - private final TimelineHashMap walObjects; + private final TimelineHashMap sstObjects; - public NodeS3WALMetadata(int nodeId, long nodeEpoch, SnapshotRegistry registry) { + public NodeS3SSTMetadata(int nodeId, long nodeEpoch, SnapshotRegistry registry) { this.nodeId = nodeId; this.nodeEpoch = new TimelineLong(registry); this.nodeEpoch.set(nodeEpoch); - this.walObjects = new TimelineHashMap<>(registry, 0); + this.sstObjects = new TimelineHashMap<>(registry, 0); } public int getNodeId() { @@ -174,16 +174,16 @@ public long getNodeEpoch() { return nodeEpoch.get(); } - public TimelineHashMap walObjects() { - return walObjects; + public TimelineHashMap sstObjects() { + return sstObjects; } @Override public String toString() { - return "NodeS3WALMetadata{" + + return "NodeS3SSTMetadata{" + "nodeId=" + nodeId + ", nodeEpoch=" + nodeEpoch + - ", walObjects=" + walObjects + + ", sstObjects=" + sstObjects + '}'; } } @@ -201,7 +201,7 @@ public String toString() { private final TimelineHashMap streamsMetadata; - private final TimelineHashMap nodesMetadata; + private final TimelineHashMap nodesMetadata; public StreamControlManager( SnapshotRegistry snapshotRegistry, @@ -551,40 +551,40 @@ public ControllerResult trimStream(int nodeId, long nodeEpoc if (resp.errorCode() != Errors.NONE.code()) { return ControllerResult.of(Collections.emptyList(), resp); } - // remove wal object or remove stream range in wal object + // remove SST object or remove stream range in SST object // TODO: optimize this.nodesMetadata.values() .stream() - .flatMap(entry -> entry.walObjects.values().stream()) - .filter(walObject -> walObject.offsetRanges().containsKey(streamId)) - .filter(walObject -> walObject.offsetRanges().get(streamId).getEndOffset() <= newStartOffset) - .forEach(walObj -> { - if (walObj.offsetRanges().size() == 1) { - // only this range, but we will remove this range, so now we can remove this wal object + .flatMap(entry -> entry.sstObjects.values().stream()) + .filter(sstObject -> sstObject.offsetRanges().containsKey(streamId)) + .filter(sstObject -> sstObject.offsetRanges().get(streamId).getEndOffset() <= newStartOffset) + .forEach(sstObj -> { + if (sstObj.offsetRanges().size() == 1) { + // only this range, but we will remove this range, so now we can remove this SST object records.add(new ApiMessageAndVersion( - new RemoveWALObjectRecord() - .setNodeId(walObj.nodeId()) - .setObjectId(walObj.objectId()), (short) 0 + new RemoveSSTObjectRecord() + .setNodeId(sstObj.nodeId()) + 
.setObjectId(sstObj.objectId()), (short) 0 )); ControllerResult markDestroyResult = this.s3ObjectControlManager.markDestroyObjects( - List.of(walObj.objectId())); + List.of(sstObj.objectId())); if (!markDestroyResult.response()) { - log.error("[TrimStream] Mark destroy wal object: {} failed", walObj.objectId()); + log.error("[TrimStream] Mark destroy SST object: {} failed", sstObj.objectId()); resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); return; } records.addAll(markDestroyResult.records()); return; } - Map newOffsetRange = new HashMap<>(walObj.offsetRanges()); + Map newOffsetRange = new HashMap<>(sstObj.offsetRanges()); // remove offset range newOffsetRange.remove(streamId); - records.add(new ApiMessageAndVersion(new WALObjectRecord() - .setObjectId(walObj.objectId()) - .setNodeId(walObj.nodeId()) + records.add(new ApiMessageAndVersion(new S3SSTObjectRecord() + .setObjectId(sstObj.objectId()) + .setNodeId(sstObj.nodeId()) .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList())) - .setDataTimeInMs(walObj.dataTimeInMs()) - .setOrderId(walObj.orderId()), (short) 0)); + .setDataTimeInMs(sstObj.dataTimeInMs()) + .setOrderId(sstObj.orderId()), (short) 0)); }); if (resp.errorCode() != Errors.NONE.code()) { return ControllerResult.of(Collections.emptyList(), resp); @@ -629,38 +629,38 @@ public ControllerResult deleteStream(int nodeId, long node return ControllerResult.of(Collections.emptyList(), resp); } records.addAll(markDestroyResult.records()); - // remove wal object or remove stream-offset-range in wal object + // remove SST object or remove stream-offset-range in SST object this.nodesMetadata.values() .stream() - .flatMap(entry -> entry.walObjects.values().stream()) - .filter(walObject -> walObject.offsetRanges().containsKey(streamId)) - .forEach(walObj -> { - if (walObj.offsetRanges().size() == 1) { - // only this range, but we will remove this range, so now we can remove this wal object + .flatMap(entry -> entry.sstObjects.values().stream()) + .filter(sstObject -> sstObject.offsetRanges().containsKey(streamId)) + .forEach(sstObj -> { + if (sstObj.offsetRanges().size() == 1) { + // only this range, but we will remove this range, so now we can remove this SST object records.add(new ApiMessageAndVersion( - new RemoveWALObjectRecord() - .setNodeId(walObj.nodeId()) - .setObjectId(walObj.objectId()), (short) 0 + new RemoveSSTObjectRecord() + .setNodeId(sstObj.nodeId()) + .setObjectId(sstObj.objectId()), (short) 0 )); ControllerResult result = this.s3ObjectControlManager.markDestroyObjects( - List.of(walObj.objectId())); + List.of(sstObj.objectId())); if (!result.response()) { - log.error("[DeleteStream]: Mark destroy wal object: {} failed", walObj.objectId()); + log.error("[DeleteStream]: Mark destroy SST object: {} failed", sstObj.objectId()); resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); return; } records.addAll(result.records()); return; } - Map newOffsetRange = new HashMap<>(walObj.offsetRanges()); + Map newOffsetRange = new HashMap<>(sstObj.offsetRanges()); // remove offset range newOffsetRange.remove(streamId); - records.add(new ApiMessageAndVersion(new WALObjectRecord() - .setObjectId(walObj.objectId()) - .setNodeId(walObj.nodeId()) + records.add(new ApiMessageAndVersion(new S3SSTObjectRecord() + .setObjectId(sstObj.objectId()) + .setNodeId(sstObj.nodeId()) .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList())) - .setDataTimeInMs(walObj.dataTimeInMs()) - .setOrderId(walObj.orderId()), 
(short) 0)); + .setDataTimeInMs(sstObj.dataTimeInMs()) + .setOrderId(sstObj.orderId()), (short) 0)); }); if (resp.errorCode() != Errors.NONE.code()) { return ControllerResult.of(Collections.emptyList(), resp); @@ -670,14 +670,14 @@ public ControllerResult deleteStream(int nodeId, long node } /** - * Commit wal object. + * Commit SST object. *

 * Response Errors Enum:
 *
 * - OBJECT_NOT_EXIST:
-     *     - wal object not exist when commit
+     *     - SST object not exist when commit
 *     - stream object not exist when commit
 *
@@ -690,8 +690,8 @@ public ControllerResult deleteStream(int nodeId, long node
 *
*/ @SuppressWarnings("all") - public ControllerResult commitWALObject(CommitWALObjectRequestData data) { - CommitWALObjectResponseData resp = new CommitWALObjectResponseData(); + public ControllerResult commitSSTObject(CommitSSTObjectRequestData data) { + CommitSSTObjectResponseData resp = new CommitSSTObjectResponseData(); long objectId = data.objectId(); int nodeId = data.nodeId(); long nodeEpoch = data.nodeEpoch(); @@ -702,7 +702,7 @@ public ControllerResult commitWALObject(CommitWALOb Errors nodeEpochCheckResult = nodeEpochCheck(nodeId, nodeEpoch); if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); - log.warn("[CommitWALObject] nodeId={}'s epoch={} check failed, code: {}", + log.warn("[CommitSSTObject] nodeId={}'s epoch={} check failed, code: {}", nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -724,7 +724,7 @@ public ControllerResult commitWALObject(CommitWALOb .collect(Collectors.toList()); Errors continuityCheckResult = streamAdvanceCheck(offsetRanges, data.nodeId()); if (continuityCheckResult != Errors.NONE) { - log.error("[CommitWALObject] streamId={} advance check failed, error: {}", offsetRanges, continuityCheckResult); + log.error("[CommitSSTObject] streamId={} advance check failed, error: {}", offsetRanges, continuityCheckResult); resp.setErrorCode(continuityCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -733,13 +733,13 @@ public ControllerResult commitWALObject(CommitWALOb // commit object ControllerResult commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize, committedTs); if (commitResult.response() == Errors.OBJECT_NOT_EXIST) { - log.error("[CommitWALObject] object={} not exist when commit wal object", objectId); + log.error("[CommitSSTObject] object={} not exist when commit SST object", objectId); resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code()); return ControllerResult.of(Collections.emptyList(), resp); } if (commitResult.response() == Errors.REDUNDANT_OPERATION) { // regard it as redundant commit operation, just return success - log.warn("[CommitWALObject] object={} already committed", objectId); + log.warn("[CommitSSTObject] object={} already committed", objectId); return ControllerResult.of(Collections.emptyList(), resp); } List records = new ArrayList<>(commitResult.records()); @@ -748,7 +748,7 @@ public ControllerResult commitWALObject(CommitWALOb if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) { ControllerResult destroyResult = this.s3ObjectControlManager.markDestroyObjects(compactedObjectIds); if (!destroyResult.response()) { - log.error("[CommitWALObject]: Mark destroy compacted objects: {} failed", compactedObjectIds); + log.error("[CommitSSTObject]: Mark destroy compacted objects: {} failed", compactedObjectIds); resp.setErrorCode(Errors.COMPACTED_OBJECTS_NOT_FOUND.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -756,32 +756,32 @@ public ControllerResult commitWALObject(CommitWALOb // update dataTs to the min compacted object's dataTs //noinspection OptionalGetWithoutIsPresent dataTs = compactedObjectIds.stream() - .map(id -> this.nodesMetadata.get(nodeId).walObjects.get(id)) - .map(S3WALObject::dataTimeInMs) + .map(id -> this.nodesMetadata.get(nodeId).sstObjects.get(id)) + .map(S3SSTObject::dataTimeInMs) .min(Long::compareTo).get(); } List indexes = streamRanges.stream() .map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), 
range.endOffset())) .collect(Collectors.toList()); - // update node's wal object - NodeS3WALMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + // update node's SST object + NodeS3SSTMetadata nodeMetadata = this.nodesMetadata.get(nodeId); if (nodeMetadata == null) { - // first time commit wal object, generate node's metadata record + // first time commit SST object, generate node's metadata record records.add(new ApiMessageAndVersion(new NodeWALMetadataRecord() .setNodeId(nodeId), (short) 0)); } if (objectId != NOOP_OBJECT_ID) { - // generate node's wal object record + // generate node's SST object record List streamIndexes = indexes.stream() .map(Convertor::to) .collect(Collectors.toList()); - WALObjectRecord walObjectRecord = new WALObjectRecord() + S3SSTObjectRecord s3SSTObjectRecord = new S3SSTObjectRecord() .setObjectId(objectId) .setDataTimeInMs(dataTs) .setOrderId(orderId) .setNodeId(nodeId) .setStreamsIndex(streamIndexes); - records.add(new ApiMessageAndVersion(walObjectRecord, (short) 0)); + records.add(new ApiMessageAndVersion(s3SSTObjectRecord, (short) 0)); } // commit stream objects if (streamObjects != null && !streamObjects.isEmpty()) { @@ -790,7 +790,7 @@ public ControllerResult commitWALObject(CommitWALOb ControllerResult streamObjectCommitResult = this.s3ObjectControlManager.commitObject(streamObject.objectId(), streamObject.objectSize(), committedTs); if (streamObjectCommitResult.response() != Errors.NONE) { - log.error("[CommitWALObject]: stream object={} not exist when commit wal object: {}", streamObject.objectId(), objectId); + log.error("[CommitSSTObject]: stream object={} not exist when commit SST object: {}", streamObject.objectId(), objectId); resp.setErrorCode(streamObjectCommitResult.response().code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -806,11 +806,11 @@ public ControllerResult commitWALObject(CommitWALOb } // generate compacted objects' remove record if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) { - compactedObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord() + compactedObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveSSTObjectRecord() .setNodeId(nodeId) .setObjectId(id), (short) 0))); } - log.info("[CommitWALObject]: nodeId={} commit wal object: {} success, compacted objects: {}, WAL stream range: {}, stream objects: {}", + log.info("[CommitSSTObject]: nodeId={} commit SST object: {} success, compacted objects: {}, SST stream range: {}, stream objects: {}", nodeId, objectId, compactedObjectIds, data.objectStreamRanges(), streamObjects); return ControllerResult.atomicOf(records, resp); } @@ -1110,32 +1110,32 @@ public void replay(NodeWALMetadataRecord record) { long nodeEpoch = record.nodeEpoch(); // already exist, update the node's self metadata if (this.nodesMetadata.containsKey(nodeId)) { - NodeS3WALMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + NodeS3SSTMetadata nodeMetadata = this.nodesMetadata.get(nodeId); nodeMetadata.nodeEpoch.set(nodeEpoch); return; } // not exist, create a new node - this.nodesMetadata.put(nodeId, new NodeS3WALMetadata(nodeId, nodeEpoch, this.snapshotRegistry)); + this.nodesMetadata.put(nodeId, new NodeS3SSTMetadata(nodeId, nodeEpoch, this.snapshotRegistry)); } - public void replay(WALObjectRecord record) { + public void replay(S3SSTObjectRecord record) { long objectId = record.objectId(); int nodeId = record.nodeId(); long orderId = record.orderId(); long dataTs = record.dataTimeInMs(); List streamIndexes = 
record.streamsIndex(); - NodeS3WALMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + NodeS3SSTMetadata nodeMetadata = this.nodesMetadata.get(nodeId); if (nodeMetadata == null) { // should not happen - log.error("nodeId={} not exist when replay wal object record {}", nodeId, record); + log.error("nodeId={} not exist when replay SST object record {}", nodeId, record); return; } - // create wal object + // create SST object Map indexMap = streamIndexes .stream() .collect(Collectors.toMap(StreamIndex::streamId, Convertor::to)); - nodeMetadata.walObjects.put(objectId, new S3WALObject(objectId, nodeId, indexMap, orderId, dataTs)); + nodeMetadata.sstObjects.put(objectId, new S3SSTObject(objectId, nodeId, indexMap, orderId, dataTs)); // update range record.streamsIndex().forEach(index -> { @@ -1143,36 +1143,36 @@ public void replay(WALObjectRecord record) { S3StreamMetadata metadata = this.streamsMetadata.get(streamId); if (metadata == null) { // ignore it - LOGGER.error("[REPLAY_WAL_FAIL] cannot find streamId={} metadata", streamId); + LOGGER.error("[REPLAY_SST_FAIL] cannot find streamId={} metadata", streamId); return; } RangeMetadata rangeMetadata = metadata.currentRangeMetadata(); if (rangeMetadata == null) { // ignore it - LOGGER.error("[REPLAY_WAL_FAIL] cannot find streamId={} stream range metadata", streamId); + LOGGER.error("[REPLAY_SST_FAIL] cannot find streamId={} stream range metadata", streamId); return; } if (rangeMetadata.endOffset() < index.startOffset()) { - LOGGER.error("[REPLAY_WAL_FAIL] streamId={} offset is not continuous, expect {} real {}", streamId, + LOGGER.error("[REPLAY_SST_FAIL] streamId={} offset is not continuous, expect {} real {}", streamId, rangeMetadata.endOffset(), index.startOffset()); return; } else if (rangeMetadata.endOffset() > index.startOffset()) { - // ignore it, the WAL object is the compacted WAL object. + // ignore it, the SST object is the compacted SST object. return; } rangeMetadata.setEndOffset(index.endOffset()); }); } - public void replay(RemoveWALObjectRecord record) { + public void replay(RemoveSSTObjectRecord record) { long objectId = record.objectId(); - NodeS3WALMetadata walMetadata = this.nodesMetadata.get(record.nodeId()); + NodeS3SSTMetadata walMetadata = this.nodesMetadata.get(record.nodeId()); if (walMetadata == null) { // should not happen - log.error("node {} not exist when replay remove wal object record {}", record.nodeId(), record); + log.error("node {} not exist when replay remove SST object record {}", record.nodeId(), record); return; } - walMetadata.walObjects.remove(objectId); + walMetadata.sstObjects.remove(objectId); } public void replay(S3StreamObjectRecord record) { @@ -1192,15 +1192,15 @@ public void replay(S3StreamObjectRecord record) { // update range RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata(); if (rangeMetadata == null) { - LOGGER.error("[REPLAY_WAL_FAIL] cannot find streamId={} stream range metadata", streamId); + LOGGER.error("[REPLAY_SST_FAIL] cannot find streamId={} stream range metadata", streamId); return; } if (rangeMetadata.endOffset() < startOffset) { - LOGGER.error("[REPLAY_WAL_FAIL] streamId={} offset is not continuous, expect {} real {}", streamId, + LOGGER.error("[REPLAY_SST_FAIL] streamId={} offset is not continuous, expect {} real {}", streamId, rangeMetadata.endOffset(), startOffset); return; } else if (rangeMetadata.endOffset() > startOffset) { - // ignore it, the WAL object compact and stream compact may generate this StreamObjectRecord. 
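// ----------------------------------------------------------------------
// Editor's aside, not part of the patch: the replay path above applies a
// three-way continuity check before advancing a range's end offset. A
// minimal standalone sketch of that rule (method and parameter names are
// simplified assumptions, not the patch's API):
static long advanceEndOffset(long rangeEndOffset, long indexStartOffset, long indexEndOffset) {
    if (rangeEndOffset < indexStartOffset) {
        // gap: offsets are not continuous, so the record is logged and skipped
        return rangeEndOffset;
    } else if (rangeEndOffset > indexStartOffset) {
        // overlap: the index belongs to a compacted SST object whose data the
        // range already covers, so it is ignored
        return rangeEndOffset;
    }
    return indexEndOffset; // exactly continuous: advance the range
}
// ----------------------------------------------------------------------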
+ // ignore it, the SST object compact and stream compact may generate this StreamObjectRecord. return; } rangeMetadata.setEndOffset(endOffset); @@ -1228,7 +1228,7 @@ public Map streamsMetadata() { return streamsMetadata; } - public Map nodesMetadata() { + public Map nodesMetadata() { return nodesMetadata; } diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index e2cfa05afc..d3ed78ed4d 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -41,7 +41,7 @@ import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; -import org.apache.kafka.common.metadata.RemoveWALObjectRecord; +import org.apache.kafka.common.metadata.RemoveSSTObjectRecord; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; @@ -49,7 +49,7 @@ import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.metadata.UpdateNextNodeIdRecord; -import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.common.metadata.S3SSTObjectRecord; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.server.common.MetadataVersion; @@ -305,11 +305,11 @@ public void replay(ApiMessage record) { case REMOVE_S3_STREAM_OBJECT_RECORD: replay((RemoveS3StreamObjectRecord) record); break; - case WALOBJECT_RECORD: - replay((WALObjectRecord) record); + case S3_SSTOBJECT_RECORD: + replay((S3SSTObjectRecord) record); break; - case REMOVE_WALOBJECT_RECORD: - replay((RemoveWALObjectRecord) record); + case REMOVE_SSTOBJECT_RECORD: + replay((RemoveSSTObjectRecord) record); break; case S3_OBJECT_RECORD: replay((S3ObjectRecord) record); @@ -441,11 +441,11 @@ public void replay(RemoveS3StreamObjectRecord record) { getOrCreateStreamsMetadataDelta().replay(record); } - public void replay(WALObjectRecord record) { + public void replay(S3SSTObjectRecord record) { getOrCreateStreamsMetadataDelta().replay(record); } - public void replay(RemoveWALObjectRecord record) { + public void replay(RemoveSSTObjectRecord record) { getOrCreateStreamsMetadataDelta().replay(record); } diff --git a/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/NodeS3SSTMetadataImage.java similarity index 71% rename from metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataImage.java rename to metadata/src/main/java/org/apache/kafka/image/NodeS3SSTMetadataImage.java index 4129a9b1f1..05d18a80a1 100644 --- a/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/NodeS3SSTMetadataImage.java @@ -29,30 +29,30 @@ import com.automq.stream.s3.metadata.S3StreamConstant; import org.apache.kafka.common.metadata.NodeWALMetadataRecord; -import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3SSTObject; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.server.common.ApiMessageAndVersion; -public class NodeS3WALMetadataImage { +public class NodeS3SSTMetadataImage { - 
public static final NodeS3WALMetadataImage EMPTY = new NodeS3WALMetadataImage(S3StreamConstant.INVALID_BROKER_ID, + public static final NodeS3SSTMetadataImage EMPTY = new NodeS3SSTMetadataImage(S3StreamConstant.INVALID_BROKER_ID, S3StreamConstant.INVALID_BROKER_EPOCH, Collections.emptyMap()); private final int nodeId; private final long nodeEpoch; - private final Map s3WalObjects; - private final SortedMap orderIndex; + private final Map s3SSTObjects; + private final SortedMap orderIndex; - public NodeS3WALMetadataImage(int nodeId, long nodeEpoch, Map walObjects) { + public NodeS3SSTMetadataImage(int nodeId, long nodeEpoch, Map sstObjects) { this.nodeId = nodeId; this.nodeEpoch = nodeEpoch; - this.s3WalObjects = new HashMap<>(walObjects); + this.s3SSTObjects = new HashMap<>(sstObjects); // build order index - if (s3WalObjects.isEmpty()) { + if (s3SSTObjects.isEmpty()) { this.orderIndex = Collections.emptySortedMap(); } else { this.orderIndex = new TreeMap<>(); - s3WalObjects.values().forEach(s3WALObject -> orderIndex.put(s3WALObject.orderId(), s3WALObject)); + s3SSTObjects.values().forEach(s3SSTObject -> orderIndex.put(s3SSTObject.orderId(), s3SSTObject)); } } @@ -64,33 +64,33 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - NodeS3WALMetadataImage that = (NodeS3WALMetadataImage) o; - return nodeId == that.nodeId && nodeEpoch == that.nodeEpoch && Objects.equals(s3WalObjects, that.s3WalObjects); + NodeS3SSTMetadataImage that = (NodeS3SSTMetadataImage) o; + return nodeId == that.nodeId && nodeEpoch == that.nodeEpoch && Objects.equals(s3SSTObjects, that.s3SSTObjects); } @Override public int hashCode() { - return Objects.hash(nodeId, nodeEpoch, s3WalObjects); + return Objects.hash(nodeId, nodeEpoch, s3SSTObjects); } public void write(ImageWriter writer, ImageWriterOptions options) { writer.write(new ApiMessageAndVersion(new NodeWALMetadataRecord() .setNodeId(nodeId) .setNodeEpoch(nodeEpoch), (short) 0)); - s3WalObjects.values().forEach(wal -> { + s3SSTObjects.values().forEach(wal -> { writer.write(wal.toRecord()); }); } - public Map getWalObjects() { - return s3WalObjects; + public Map getSSTObjects() { + return s3SSTObjects; } - public SortedMap getOrderIndex() { + public SortedMap getOrderIndex() { return orderIndex; } - public List orderList() { + public List orderList() { return orderIndex.values().stream().collect(Collectors.toList()); } @@ -107,7 +107,7 @@ public String toString() { return "NodeS3WALMetadataImage{" + "nodeId=" + nodeId + ", nodeEpoch=" + nodeEpoch + - ", s3WalObjects=" + s3WalObjects + + ", s3SSTObjects=" + s3SSTObjects + '}'; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java index dd9a0f23a2..79c6d8c822 100644 --- a/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java @@ -22,20 +22,20 @@ import java.util.Map; import java.util.Set; import org.apache.kafka.common.metadata.NodeWALMetadataRecord; -import org.apache.kafka.common.metadata.RemoveWALObjectRecord; -import org.apache.kafka.common.metadata.WALObjectRecord; -import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.common.metadata.RemoveSSTObjectRecord; +import org.apache.kafka.common.metadata.S3SSTObjectRecord; +import org.apache.kafka.metadata.stream.S3SSTObject; public class NodeS3WALMetadataDelta { - private final 
NodeS3WALMetadataImage image; + private final NodeS3SSTMetadataImage image; private int nodeId; private long nodeEpoch; - private final Map addedS3WALObjects = new HashMap<>(); + private final Map addedS3SSTObjects = new HashMap<>(); - private final Set removedS3WALObjects = new HashSet<>(); + private final Set removedS3SSTObjects = new HashSet<>(); - public NodeS3WALMetadataDelta(NodeS3WALMetadataImage image) { + public NodeS3WALMetadataDelta(NodeS3SSTMetadataImage image) { this.image = image; this.nodeId = image.getNodeId(); this.nodeEpoch = image.getNodeEpoch(); @@ -46,25 +46,25 @@ public void replay(NodeWALMetadataRecord record) { this.nodeEpoch = record.nodeEpoch(); } - public void replay(WALObjectRecord record) { - addedS3WALObjects.put(record.objectId(), S3WALObject.of(record)); + public void replay(S3SSTObjectRecord record) { + addedS3SSTObjects.put(record.objectId(), S3SSTObject.of(record)); // new add or update, so remove from removedObjects - removedS3WALObjects.remove(record.objectId()); + removedS3SSTObjects.remove(record.objectId()); } - public void replay(RemoveWALObjectRecord record) { - removedS3WALObjects.add(record.objectId()); + public void replay(RemoveSSTObjectRecord record) { + removedS3SSTObjects.add(record.objectId()); // new remove, so remove from addedObjects - addedS3WALObjects.remove(record.objectId()); + addedS3SSTObjects.remove(record.objectId()); } - public NodeS3WALMetadataImage apply() { - Map newS3WALObjects = new HashMap<>(image.getWalObjects()); - // add all changed WAL objects - newS3WALObjects.putAll(addedS3WALObjects); - // remove all removed WAL objects - removedS3WALObjects.forEach(newS3WALObjects::remove); - return new NodeS3WALMetadataImage(this.nodeId, this.nodeEpoch, newS3WALObjects); + public NodeS3SSTMetadataImage apply() { + Map newS3SSTObjects = new HashMap<>(image.getSSTObjects()); + // add all changed SST objects + newS3SSTObjects.putAll(addedS3SSTObjects); + // remove all removed SST objects + removedS3SSTObjects.forEach(newS3SSTObjects::remove); + return new NodeS3SSTMetadataImage(this.nodeId, this.nodeEpoch, newS3SSTObjects); } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java index fbf5a072a1..5586f2589c 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java @@ -29,10 +29,10 @@ import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; -import org.apache.kafka.common.metadata.RemoveWALObjectRecord; +import org.apache.kafka.common.metadata.RemoveSSTObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; -import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.common.metadata.S3SSTObjectRecord; public final class S3StreamsMetadataDelta { @@ -47,7 +47,7 @@ public final class S3StreamsMetadataDelta { private final Set deletedStreams = new HashSet<>(); // TODO: when we recycle the node's memory data structure // We don't use pair of specify NodeCreateRecord and NodeRemoveRecord to create or remove nodes, and - // we create NodeStreamMetadataImage when we create the first WALObjectRecord for a node, + // we create NodeStreamMetadataImage when we create the first SSTObjectRecord for a node, 
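// ----------------------------------------------------------------------
// Editor's aside, not part of the patch: as the comment above notes, a
// node's image is created lazily when the first SST object record for that
// node is replayed; there is no explicit create/remove record pair. The
// patch keeps a null-check helper (getOrCreateNodeStreamMetadataDelta,
// shown below); an equivalent computeIfAbsent form would make the lazy
// creation explicit. Sketch only; "nodeDelta" is a hypothetical name:
private NodeS3WALMetadataDelta nodeDelta(int nodeId) {
    return changedNodes.computeIfAbsent(nodeId,
        id -> new NodeS3WALMetadataDelta(
            image.nodeWALMetadata().getOrDefault(id, NodeS3SSTMetadataImage.EMPTY)));
}
// ----------------------------------------------------------------------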
// so we should decide when to recycle the node's memory data structure private final Set deletedNodes = new HashSet<>(); @@ -105,7 +105,7 @@ public void replay(RemoveS3StreamObjectRecord record) { getOrCreateStreamMetadataDelta(record.streamId()).replay(record); } - public void replay(WALObjectRecord record) { + public void replay(S3SSTObjectRecord record) { getOrCreateNodeStreamMetadataDelta(record.nodeId()).replay(record); record.streamsIndex().forEach(index -> { getOrCreateStreamMetadataDelta(index.streamId()).replay(new AdvanceRangeRecord() @@ -114,7 +114,7 @@ public void replay(WALObjectRecord record) { }); } - public void replay(RemoveWALObjectRecord record) { + public void replay(RemoveSSTObjectRecord record) { getOrCreateNodeStreamMetadataDelta(record.nodeId()).replay(record); } @@ -132,7 +132,7 @@ private NodeS3WALMetadataDelta getOrCreateNodeStreamMetadataDelta(Integer nodeId if (delta == null) { delta = new NodeS3WALMetadataDelta( image.nodeWALMetadata(). - getOrDefault(nodeId, NodeS3WALMetadataImage.EMPTY)); + getOrDefault(nodeId, NodeS3SSTMetadataImage.EMPTY)); changedNodes.put(nodeId, delta); } return delta; @@ -140,7 +140,7 @@ private NodeS3WALMetadataDelta getOrCreateNodeStreamMetadataDelta(Integer nodeId S3StreamsMetadataImage apply() { Map newStreams = new HashMap<>(image.streamsMetadata()); - Map newNodeStreams = new HashMap<>(image.nodeWALMetadata()); + Map newNodeStreams = new HashMap<>(image.nodeWALMetadata()); // apply the delta changes of old streams since the last image this.changedStreams.forEach((streamId, delta) -> { @@ -152,8 +152,8 @@ S3StreamsMetadataImage apply() { // apply the delta changes of old nodes since the last image this.changedNodes.forEach((nodeId, delta) -> { - NodeS3WALMetadataImage newNodeS3WALMetadataImage = delta.apply(); - newNodeStreams.put(nodeId, newNodeS3WALMetadataImage); + NodeS3SSTMetadataImage newNodeS3SSTMetadataImage = delta.apply(); + newNodeStreams.put(nodeId, newNodeS3SSTMetadataImage); }); // remove the deleted nodes deletedNodes.forEach(newNodeStreams::remove); diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 27e649ae1d..5e3cbbfa27 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -25,7 +25,7 @@ import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; -import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3SSTObject; import org.apache.kafka.server.common.ApiMessageAndVersion; import java.util.ArrayList; @@ -47,20 +47,20 @@ public final class S3StreamsMetadataImage { private final Map streamsMetadata; - private final Map nodeWALMetadata; + private final Map nodeSSTMetadata; public S3StreamsMetadataImage( long assignedStreamId, Map streamsMetadata, - Map nodeWALMetadata) { + Map nodeSSTMetadata) { this.nextAssignedStreamId = assignedStreamId + 1; this.streamsMetadata = streamsMetadata; - this.nodeWALMetadata = nodeWALMetadata; + this.nodeSSTMetadata = nodeSSTMetadata; } boolean isEmpty() { - return this.nodeWALMetadata.isEmpty() && this.streamsMetadata.isEmpty(); + return this.nodeSSTMetadata.isEmpty() && this.streamsMetadata.isEmpty(); } public void write(ImageWriter writer, ImageWriterOptions options) { @@ -68,7 +68,7 @@ public void 
write(ImageWriter writer, ImageWriterOptions options) { new ApiMessageAndVersion( new AssignedStreamIdRecord().setAssignedStreamId(nextAssignedStreamId - 1), (short) 0)); streamsMetadata.values().forEach(image -> image.write(writer, options)); - nodeWALMetadata.values().forEach(image -> image.write(writer, options)); + nodeSSTMetadata.values().forEach(image -> image.write(writer, options)); } public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { @@ -130,8 +130,8 @@ public List getStreamObjects(long streamId, long startOffset, lo }).sorted(Comparator.comparing(S3StreamObject::streamOffsetRange)).limit(limit).collect(Collectors.toCollection(ArrayList::new)); } - public List getWALObjects(int nodeId) { - NodeS3WALMetadataImage wal = nodeWALMetadata.get(nodeId); + public List getSSTObjects(int nodeId) { + NodeS3SSTMetadataImage wal = nodeSSTMetadata.get(nodeId); if (wal == null) { return Collections.emptyList(); } @@ -174,9 +174,9 @@ public RangeSearcher(long startOffset, long endOffset, long streamId, int nodeId this.nodeId = nodeId; } - private Queue rangeOfWalObjects() { - NodeS3WALMetadataImage wal = nodeWALMetadata.get(nodeId); - return wal.orderList().stream() + private Queue rangeOfSSTObjects() { + NodeS3SSTMetadataImage sstImage = nodeSSTMetadata.get(nodeId); + return sstImage.orderList().stream() .filter(obj -> obj.offsetRanges().containsKey(streamId)) .filter(obj -> { StreamOffsetRange offsetRange = obj.offsetRanges().get(streamId); @@ -221,23 +221,23 @@ public InRangeObjects getObjects(int limit) { if (limit <= 0) { return InRangeObjects.INVALID; } - if (!nodeWALMetadata.containsKey(nodeId) || !streamsMetadata.containsKey(streamId)) { + if (!nodeSSTMetadata.containsKey(nodeId) || !streamsMetadata.containsKey(streamId)) { return InRangeObjects.INVALID; } Queue streamObjects = rangeOfStreamObjects(); - Queue walObjects = rangeOfWalObjects(); + Queue sstObjects = rangeOfSSTObjects(); List inRangeObjects = new ArrayList<>(); long nextStartOffset = startOffset; while (limit > 0 && nextStartOffset < endOffset - && (!streamObjects.isEmpty() || !walObjects.isEmpty())) { + && (!streamObjects.isEmpty() || !sstObjects.isEmpty())) { S3ObjectMetadataWrapper streamRange = null; - if (walObjects.isEmpty() || (!streamObjects.isEmpty() && streamObjects.peek().startOffset() < walObjects.peek().startOffset())) { + if (sstObjects.isEmpty() || (!streamObjects.isEmpty() && streamObjects.peek().startOffset() < sstObjects.peek().startOffset())) { streamRange = streamObjects.poll(); } else { - streamRange = walObjects.poll(); + streamRange = sstObjects.poll(); } long objectStartOffset = streamRange.startOffset(); long objectEndOffset = streamRange.endOffset(); @@ -301,16 +301,16 @@ public boolean equals(Object obj) { S3StreamsMetadataImage other = (S3StreamsMetadataImage) obj; return this.nextAssignedStreamId == other.nextAssignedStreamId && this.streamsMetadata.equals(other.streamsMetadata) - && this.nodeWALMetadata.equals(other.nodeWALMetadata); + && this.nodeSSTMetadata.equals(other.nodeSSTMetadata); } @Override public int hashCode() { - return Objects.hash(nextAssignedStreamId, streamsMetadata, nodeWALMetadata); + return Objects.hash(nextAssignedStreamId, streamsMetadata, nodeSSTMetadata); } - public Map nodeWALMetadata() { - return nodeWALMetadata; + public Map nodeWALMetadata() { + return nodeSSTMetadata; } public Map streamsMetadata() { @@ -336,7 +336,7 @@ public String toString() { "nextAssignedStreamId=" + nextAssignedStreamId + ", streamsMetadata=" + 
streamsMetadata.entrySet().stream(). map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + - ", nodeWALMetadata=" + nodeWALMetadata.entrySet().stream(). + ", nodeWALMetadata=" + nodeSSTMetadata.entrySet().stream(). map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + '}'; } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/Convertor.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/Convertor.java index c8a03cb883..922774cf04 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/Convertor.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/Convertor.java @@ -18,17 +18,17 @@ package org.apache.kafka.metadata.stream; import com.automq.stream.s3.metadata.StreamOffsetRange; -import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.common.metadata.S3SSTObjectRecord; public class Convertor { - public static WALObjectRecord.StreamIndex to(StreamOffsetRange s) { - return new WALObjectRecord.StreamIndex() + public static S3SSTObjectRecord.StreamIndex to(StreamOffsetRange s) { + return new S3SSTObjectRecord.StreamIndex() .setStreamId(s.getStreamId()) .setStartOffset(s.getStartOffset()) .setEndOffset(s.getEndOffset()); } - public static StreamOffsetRange to(WALObjectRecord.StreamIndex s) { + public static StreamOffsetRange to(S3SSTObjectRecord.StreamIndex s) { return new StreamOffsetRange(s.streamId(), s.startOffset(), s.endOffset()); } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3SSTObject.java similarity index 86% rename from metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java rename to metadata/src/main/java/org/apache/kafka/metadata/stream/S3SSTObject.java index 6731ddcd82..9502f48280 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3SSTObject.java @@ -24,10 +24,10 @@ import com.automq.stream.s3.metadata.S3ObjectType; import com.automq.stream.s3.metadata.S3StreamConstant; import com.automq.stream.s3.metadata.StreamOffsetRange; -import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.common.metadata.S3SSTObjectRecord; import org.apache.kafka.server.common.ApiMessageAndVersion; -public class S3WALObject implements Comparable { +public class S3SSTObject implements Comparable { private final long objectId; private final int nodeId; @@ -43,11 +43,11 @@ public class S3WALObject implements Comparable { private final long dataTimeInMs; // Only used for testing - public S3WALObject(long objectId, int nodeId, final Map streamOffsetRanges, long orderId) { + public S3SSTObject(long objectId, int nodeId, final Map streamOffsetRanges, long orderId) { this(objectId, nodeId, streamOffsetRanges, orderId, S3StreamConstant.INVALID_TS); } - public S3WALObject(long objectId, int nodeId, final Map streamOffsetRanges, long orderId, long dataTimeInMs) { + public S3SSTObject(long objectId, int nodeId, final Map streamOffsetRanges, long orderId, long dataTimeInMs) { this.orderId = orderId; this.objectId = objectId; this.nodeId = nodeId; @@ -68,7 +68,7 @@ public Map offsetRanges() { } public ApiMessageAndVersion toRecord() { - return new ApiMessageAndVersion(new WALObjectRecord() + return new ApiMessageAndVersion(new S3SSTObjectRecord() .setObjectId(objectId) .setNodeId(nodeId) .setOrderId(orderId) @@ -81,14 +81,13 @@ public ApiMessageAndVersion 
toRecord() { .collect(Collectors.toList())), (short) 0); } - public static S3WALObject of(WALObjectRecord record) { + public static S3SSTObject of(S3SSTObjectRecord record) { Map offsetRanges = record.streamsIndex() .stream() - .collect(Collectors.toMap(index -> index.streamId(), + .collect(Collectors.toMap(S3SSTObjectRecord.StreamIndex::streamId, index -> new StreamOffsetRange(index.streamId(), index.startOffset(), index.endOffset()))); - S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.nodeId(), + return new S3SSTObject(record.objectId(), record.nodeId(), offsetRanges, record.orderId(), record.dataTimeInMs()); - return s3WalObject; } public Integer nodeId() { @@ -100,7 +99,7 @@ public Long objectId() { } public S3ObjectType objectType() { - return S3ObjectType.WAL; + return S3ObjectType.SST; } public long orderId() { @@ -119,7 +118,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - S3WALObject that = (S3WALObject) o; + S3SSTObject that = (S3SSTObject) o; return objectId == that.objectId; } @@ -130,7 +129,7 @@ public int hashCode() { @Override public String toString() { - return "S3WALObject{" + + return "S3SSTObject{" + "objectId=" + objectId + ", orderId=" + orderId + ", nodeId=" + nodeId + @@ -140,7 +139,7 @@ public String toString() { } @Override - public int compareTo(S3WALObject o) { + public int compareTo(S3SSTObject o) { return Long.compare(this.orderId, o.orderId); } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjects.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedSSTObjects.java similarity index 73% rename from metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjects.java rename to metadata/src/main/java/org/apache/kafka/metadata/stream/SortedSSTObjects.java index bc61009d78..62d6a76f81 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjects.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedSSTObjects.java @@ -17,37 +17,31 @@ package org.apache.kafka.metadata.stream; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.function.Predicate; -public interface SortedWALObjects { +public interface SortedSSTObjects { int size(); boolean isEmpty(); - Iterator iterator(); + Iterator iterator(); - List list(); + List list(); boolean contains(Object o); - boolean add(S3WALObject s3WALObject); - - default boolean addAll(Collection walObjects) { - walObjects.forEach(this::add); - return true; - } + boolean add(S3SSTObject s3SSTObject); boolean remove(Object o); - default boolean removeIf(Predicate filter) { + default boolean removeIf(Predicate filter) { return this.list().removeIf(filter); } - S3WALObject get(int index); + S3SSTObject get(int index); void clear(); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjectsList.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedSSTObjectsList.java similarity index 72% rename from metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjectsList.java rename to metadata/src/main/java/org/apache/kafka/metadata/stream/SortedSSTObjectsList.java index b45e2e0ad9..aff5f9e26e 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjectsList.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedSSTObjectsList.java @@ -23,23 +23,23 @@ import java.util.Objects; import java.util.stream.Collectors; -public class 
SortedWALObjectsList implements SortedWALObjects { +public class SortedSSTObjectsList implements SortedSSTObjects { - private final List list; + private final List list; - public SortedWALObjectsList(SortedWALObjects source) { + public SortedSSTObjectsList(SortedSSTObjects source) { this.list = new LinkedList<>(source.list()); } - public SortedWALObjectsList() { + public SortedSSTObjectsList() { this.list = new LinkedList<>(); } /** - * Construct a SortedWALObjectsList from a list of S3WALObjects. - * @param list the list of S3WALObjects, must guarantee that the list is sorted + * Construct a SortedSSTObjectsList from a list of S3SSTObjects. + * @param list the list of S3SSTObjects, must guarantee that the list is sorted */ - public SortedWALObjectsList(List list) { + public SortedSSTObjectsList(List list) { this.list = list; } @@ -54,12 +54,12 @@ public boolean isEmpty() { } @Override - public Iterator iterator() { + public Iterator iterator() { return this.list.iterator(); } @Override - public List list() { + public List list() { return list; } @@ -69,16 +69,16 @@ public boolean contains(Object o) { } @Override - public boolean add(S3WALObject s3WALObject) { + public boolean add(S3SSTObject s3SSTObject) { // TODO: optimize by binary search for (int index = 0; index < this.list.size(); index++) { - S3WALObject current = this.list.get(index); - if (s3WALObject.compareTo(current) <= 0) { - this.list.add(index, s3WALObject); + S3SSTObject current = this.list.get(index); + if (s3SSTObject.compareTo(current) <= 0) { + this.list.add(index, s3SSTObject); return true; } } - this.list.add(s3WALObject); + this.list.add(s3SSTObject); return true; } @@ -91,7 +91,7 @@ public boolean remove(Object o) { @Override - public S3WALObject get(int index) { + public S3SSTObject get(int index) { return this.list.get(index); } @@ -108,7 +108,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - SortedWALObjectsList that = (SortedWALObjectsList) o; + SortedSSTObjectsList that = (SortedSSTObjectsList) o; return Objects.equals(list, that.list); } @@ -119,8 +119,8 @@ public int hashCode() { @Override public String toString() { - return "SortedWALObjectsList{" + - "list=" + list.stream().map(S3WALObject::toString).collect(Collectors.joining(",")) + + return "SortedSSTObjectsList{" + + "list=" + list.stream().map(S3SSTObject::toString).collect(Collectors.joining(",")) + '}'; } } diff --git a/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json b/metadata/src/main/resources/common/metadata/RemoveSSTObjectRecord.json similarity index 97% rename from metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json rename to metadata/src/main/resources/common/metadata/RemoveSSTObjectRecord.json index 1666bc20fe..4906b92dfd 100644 --- a/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json +++ b/metadata/src/main/resources/common/metadata/RemoveSSTObjectRecord.json @@ -16,7 +16,7 @@ { "apiKey": 508, "type": "metadata", - "name": "RemoveWALObjectRecord", + "name": "RemoveSSTObjectRecord", "validVersions": "0", "flexibleVersions": "0+", "fields": [ diff --git a/metadata/src/main/resources/common/metadata/WALObjectRecord.json b/metadata/src/main/resources/common/metadata/S3SSTObjectRecord.json similarity index 98% rename from metadata/src/main/resources/common/metadata/WALObjectRecord.json rename to metadata/src/main/resources/common/metadata/S3SSTObjectRecord.json index 7a73e04911..ec3ee22f53 100644 --- 
a/metadata/src/main/resources/common/metadata/WALObjectRecord.json +++ b/metadata/src/main/resources/common/metadata/S3SSTObjectRecord.json @@ -16,7 +16,7 @@ { "apiKey": 507, "type": "metadata", - "name": "WALObjectRecord", + "name": "S3SSTObjectRecord", "validVersions": "0", "flexibleVersions": "0+", "fields": [ diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index 947d3231fc..1164cf3a6c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -24,10 +24,10 @@ import org.apache.kafka.common.message.CloseStreamsResponseData.CloseStreamResponse; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; -import org.apache.kafka.common.message.CommitWALObjectRequestData; -import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange; -import org.apache.kafka.common.message.CommitWALObjectRequestData.StreamObject; -import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData; +import org.apache.kafka.common.message.CommitSSTObjectRequestData.ObjectStreamRange; +import org.apache.kafka.common.message.CommitSSTObjectRequestData.StreamObject; +import org.apache.kafka.common.message.CommitSSTObjectResponseData; import org.apache.kafka.common.message.CreateStreamsRequestData.CreateStreamRequest; import org.apache.kafka.common.message.CreateStreamsResponseData.CreateStreamResponse; import org.apache.kafka.common.message.DeleteStreamsRequestData.DeleteStreamRequest; @@ -46,19 +46,19 @@ import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; -import org.apache.kafka.common.metadata.RemoveWALObjectRecord; +import org.apache.kafka.common.metadata.RemoveSSTObjectRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; -import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.common.metadata.S3SSTObjectRecord; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.stream.S3ObjectControlManager; import org.apache.kafka.controller.stream.StreamControlManager; -import org.apache.kafka.controller.stream.StreamControlManager.NodeS3WALMetadata; +import org.apache.kafka.controller.stream.StreamControlManager.NodeS3SSTMetadata; import org.apache.kafka.controller.stream.StreamControlManager.S3StreamMetadata; import org.apache.kafka.metadata.stream.RangeMetadata; -import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3SSTObject; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; @@ -309,39 +309,39 @@ public void testCommitWalBasic() { new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0); replay(manager, result2.records()); - // 2. commit valid wal object + // 2. 
commit a valid SST object List streamRanges0 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) .setStreamEpoch(EPOCH0) .setStartOffset(0L) .setEndOffset(100L)); - CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData() + CommitSSTObjectRequestData commitRequest0 = new CommitSSTObjectRequestData() .setObjectId(0L) .setNodeId(BROKER0) .setObjectSize(999) .setObjectStreamRanges(streamRanges0); - ControllerResult result3 = manager.commitWALObject(commitRequest0); + ControllerResult result3 = manager.commitSSTObject(commitRequest0); assertEquals(Errors.NONE.code(), result3.response().errorCode()); replay(manager, result3.records()); - // verify range's end offset advanced and wal object is added + // verify range's end offset advanced and SST object is added S3StreamMetadata streamMetadata0 = manager.streamsMetadata().get(STREAM0); assertEquals(1, streamMetadata0.ranges().size()); RangeMetadata rangeMetadata0 = streamMetadata0.ranges().get(0); assertEquals(0L, rangeMetadata0.startOffset()); assertEquals(100L, rangeMetadata0.endOffset()); - assertEquals(1, manager.nodesMetadata().get(BROKER0).walObjects().size()); - // 3. commit a wal object that doesn't exist + assertEquals(1, manager.nodesMetadata().get(BROKER0).sstObjects().size()); + // 3. commit an SST object that doesn't exist List streamRanges1 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) .setStreamEpoch(EPOCH0) .setStartOffset(100) .setEndOffset(200)); - CommitWALObjectRequestData commitRequest1 = new CommitWALObjectRequestData() + CommitSSTObjectRequestData commitRequest1 = new CommitSSTObjectRequestData() .setObjectId(1L) .setNodeId(BROKER0) .setObjectSize(999) .setObjectStreamRanges(streamRanges1); - ControllerResult result4 = manager.commitWALObject(commitRequest1); + ControllerResult result4 = manager.commitSSTObject(commitRequest1); assertEquals(Errors.OBJECT_NOT_EXIST.code(), result4.response().errorCode()); // 4. node_0 close stream_0 with epoch_0 and node_1 open stream_0 with epoch_1 ControllerResult result7 = manager.closeStream(BROKER0, BROKER_EPOCH0, @@ -354,21 +354,21 @@ public void testCommitWalBasic() { assertEquals(0L, result8.response().startOffset()); assertEquals(100L, result8.response().nextOffset()); replay(manager, result8.records()); - // 5. node_1 successfully commit wal object which contains stream_0's data + // 5. 
node_1 successfully commits an SST object which contains stream_0's data List streamRanges6 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) .setStreamEpoch(EPOCH1) .setStartOffset(100) .setEndOffset(300)); - CommitWALObjectRequestData commitRequest6 = new CommitWALObjectRequestData() + CommitSSTObjectRequestData commitRequest6 = new CommitSSTObjectRequestData() .setNodeId(BROKER1) .setObjectId(6L) .setObjectSize(999) .setObjectStreamRanges(streamRanges6); - ControllerResult result10 = manager.commitWALObject(commitRequest6); + ControllerResult result10 = manager.commitSSTObject(commitRequest6); assertEquals(Errors.NONE.code(), result10.response().errorCode()); replay(manager, result10.records()); - // verify range's end offset advanced and wal object is added + // verify range's end offset advanced and SST object is added streamMetadata0 = manager.streamsMetadata().get(STREAM0); assertEquals(2, streamMetadata0.ranges().size()); assertEquals(0L, streamMetadata0.ranges().get(0).startOffset()); @@ -376,7 +376,7 @@ public void testCommitWalBasic() { RangeMetadata rangeMetadata1 = streamMetadata0.ranges().get(1); assertEquals(100L, rangeMetadata1.startOffset()); assertEquals(300L, rangeMetadata1.endOffset()); - assertEquals(1, manager.nodesMetadata().get(BROKER1).walObjects().size()); + assertEquals(1, manager.nodesMetadata().get(BROKER1).sstObjects().size()); // 6. get stream's offset GetOpeningStreamsRequestData request = new GetOpeningStreamsRequestData() @@ -428,7 +428,7 @@ public void testCommitWalCompacted() { createAndOpenStream(BROKER0, EPOCH0); createAndOpenStream(BROKER0, EPOCH0); - // 2. commit first level wal object of stream_0 and stream_1 + // 2. commit first level SST object of stream_0 and stream_1 List streamRanges0 = List.of( new ObjectStreamRange() .setStreamId(STREAM0) @@ -440,13 +440,13 @@ public void testCommitWalCompacted() { .setStreamEpoch(EPOCH0) .setStartOffset(0L) .setEndOffset(200L)); - CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData() + CommitSSTObjectRequestData commitRequest0 = new CommitSSTObjectRequestData() .setObjectId(0L) .setOrderId(0L) .setNodeId(BROKER0) .setObjectSize(999) .setObjectStreamRanges(streamRanges0); - ControllerResult result4 = manager.commitWALObject(commitRequest0); + ControllerResult result4 = manager.commitSSTObject(commitRequest0); assertEquals(Errors.NONE.code(), result4.response().errorCode()); replay(manager, result4.records()); @@ -460,7 +460,7 @@ public void testCommitWalCompacted() { assertEquals(STREAM1, streamsOffset.streamMetadataList().get(1).streamId()); assertEquals(0L, streamsOffset.streamMetadataList().get(1).startOffset()); assertEquals(200L, streamsOffset.streamMetadataList().get(1).endOffset()); - long object0DataTs = manager.nodesMetadata().get(BROKER0).walObjects().get(0L).dataTimeInMs(); + long object0DataTs = manager.nodesMetadata().get(BROKER0).sstObjects().get(0L).dataTimeInMs(); // 4. 
         List<ObjectStreamRange> streamRanges1 = List.of(
@@ -474,13 +474,13 @@ public void testCommitWalCompacted() {
                 .setStreamEpoch(EPOCH0)
                 .setStartOffset(200L)
                 .setEndOffset(300L));
-        CommitWALObjectRequestData commitRequest1 = new CommitWALObjectRequestData()
+        CommitSSTObjectRequestData commitRequest1 = new CommitSSTObjectRequestData()
             .setObjectId(1L)
             .setOrderId(1L)
             .setNodeId(BROKER0)
             .setObjectSize(999)
             .setObjectStreamRanges(streamRanges1);
-        ControllerResult result5 = manager.commitWALObject(commitRequest1);
+        ControllerResult result5 = manager.commitSSTObject(commitRequest1);
         assertEquals(Errors.NONE.code(), result5.response().errorCode());
         replay(manager, result5.records());
@@ -493,9 +493,9 @@ public void testCommitWalCompacted() {
         assertEquals(STREAM1, streamsOffset.streamMetadataList().get(1).streamId());
         assertEquals(0L, streamsOffset.streamMetadataList().get(1).startOffset());
         assertEquals(300L, streamsOffset.streamMetadataList().get(1).endOffset());
-        long object1DataTs = manager.nodesMetadata().get(BROKER0).walObjects().get(1L).dataTimeInMs();
+        long object1DataTs = manager.nodesMetadata().get(BROKER0).sstObjects().get(1L).dataTimeInMs();
-        // 6. commit an invalid wal object which contains the destroyed or not exist wal object
+        // 6. commit an invalid SST object which references destroyed or non-existent SST objects
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
         List<ObjectStreamRange> streamRanges2 = List.of(
             new ObjectStreamRange()
@@ -508,27 +508,27 @@ public void testCommitWalCompacted() {
                 .setStreamEpoch(EPOCH0)
                 .setStartOffset(0L)
                 .setEndOffset(300L));
-        CommitWALObjectRequestData commitRequest2 = new CommitWALObjectRequestData()
+        CommitSSTObjectRequestData commitRequest2 = new CommitSSTObjectRequestData()
             .setObjectId(2L)
             .setOrderId(0L)
             .setNodeId(BROKER0)
             .setObjectSize(999)
             .setObjectStreamRanges(streamRanges2)
             .setCompactedObjectIds(List.of(0L, 1L, 10L));
-        ControllerResult result6 = manager.commitWALObject(commitRequest2);
+        ControllerResult result6 = manager.commitSSTObject(commitRequest2);
         assertEquals(Errors.COMPACTED_OBJECTS_NOT_FOUND.code(), result6.response().errorCode());
         assertEquals(0, result6.records().size());
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
-        // 7. commit a second level wal object which compact wal_0 and wal_1
-        commitRequest2 = new CommitWALObjectRequestData()
+        // 7. commit a second level SST object which compacts sst_0 and sst_1
+        commitRequest2 = new CommitSSTObjectRequestData()
             .setObjectId(2L)
             .setOrderId(0L)
             .setNodeId(BROKER0)
             .setObjectSize(999)
             .setObjectStreamRanges(streamRanges2)
             .setCompactedObjectIds(List.of(0L, 1L));
-        result6 = manager.commitWALObject(commitRequest2);
+        result6 = manager.commitSSTObject(commitRequest2);
         assertEquals(Errors.NONE.code(), result6.response().errorCode());
         replay(manager, result6.records());
@@ -541,12 +541,12 @@ public void testCommitWalCompacted() {
         assertEquals(STREAM1, streamsOffset.streamMetadataList().get(1).streamId());
         assertEquals(0L, streamsOffset.streamMetadataList().get(1).startOffset());
         assertEquals(300L, streamsOffset.streamMetadataList().get(1).endOffset());
-        assertEquals(object0DataTs, manager.nodesMetadata().get(BROKER0).walObjects().get(2L).dataTimeInMs());
+        assertEquals(object0DataTs, manager.nodesMetadata().get(BROKER0).sstObjects().get(2L).dataTimeInMs());
-        // 9. verify compacted wal objects is removed
-        assertEquals(1, manager.nodesMetadata().get(BROKER0).walObjects().size());
-        assertEquals(2, manager.nodesMetadata().get(BROKER0).walObjects().get(2L).objectId());
-        assertEquals(0, manager.nodesMetadata().get(BROKER0).walObjects().get(2L).orderId());
+        // 9. verify compacted SST objects are removed
+        assertEquals(1, manager.nodesMetadata().get(BROKER0).sstObjects().size());
+        assertEquals(2, manager.nodesMetadata().get(BROKER0).sstObjects().get(2L).objectId());
+        assertEquals(0, manager.nodesMetadata().get(BROKER0).sstObjects().get(2L).orderId());
     }
@@ -568,7 +568,7 @@ public void testCommitWalWithStreamObject() {
             .setStreamEpoch(EPOCH0)
             .setStartOffset(0L)
             .setEndOffset(100L));
-        CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData()
+        CommitSSTObjectRequestData commitRequest0 = new CommitSSTObjectRequestData()
             .setObjectId(0L)
             .setOrderId(0L)
             .setNodeId(BROKER0)
@@ -582,7 +582,7 @@ public void testCommitWalWithStreamObject() {
                 .setStartOffset(0L)
                 .setEndOffset(200L)
             ));
-        ControllerResult result4 = manager.commitWALObject(commitRequest0);
+        ControllerResult result4 = manager.commitSSTObject(commitRequest0);
         assertEquals(Errors.NONE.code(), result4.response().errorCode());
         replay(manager, result4.records());
@@ -600,14 +600,14 @@ public void testCommitWalWithStreamObject() {
         // 4. verify stream object is added
         assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size());
-        // 5. commit wal object with not continuous stream
+        // 5. commit an SST object with a non-continuous stream range
         List<ObjectStreamRange> streamRanges1 = List.of(
             new ObjectStreamRange()
                 .setStreamId(STREAM0)
                 .setStreamEpoch(EPOCH0)
                 .setStartOffset(99L)
                 .setEndOffset(200L));
-        CommitWALObjectRequestData commitRequest1 = new CommitWALObjectRequestData()
+        CommitSSTObjectRequestData commitRequest1 = new CommitSSTObjectRequestData()
             .setObjectId(1L)
             .setOrderId(1L)
             .setNodeId(BROKER0)
@@ -621,7 +621,7 @@ public void testCommitWalWithStreamObject() {
                 .setStartOffset(200L)
                 .setEndOffset(400L)
             ));
-        ControllerResult result5 = manager.commitWALObject(commitRequest1);
+        ControllerResult result5 = manager.commitSSTObject(commitRequest1);
         assertEquals(Errors.OFFSET_NOT_MATCHED.code(), result5.response().errorCode());
     }
@@ -643,7 +643,7 @@ public void testCommitStreamObject() {
             .setStreamEpoch(EPOCH0)
             .setStartOffset(0L)
             .setEndOffset(100L));
-        CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData()
+        CommitSSTObjectRequestData commitRequest0 = new CommitSSTObjectRequestData()
             .setObjectId(0L)
             .setOrderId(0L)
             .setNodeId(BROKER0)
@@ -657,7 +657,7 @@ public void testCommitStreamObject() {
                 .setStartOffset(0L)
                 .setEndOffset(200L)
             ));
-        ControllerResult result0 = manager.commitWALObject(commitRequest0);
+        ControllerResult result0 = manager.commitSSTObject(commitRequest0);
         assertEquals(Errors.NONE.code(), result0.response().errorCode());
         replay(manager, result0.records());
         long object0DataTs = manager.streamsMetadata().get(STREAM1).streamObjects().get(1L).dataTimeInMs();
@@ -669,7 +669,7 @@ public void testCommitStreamObject() {
             .setStreamEpoch(EPOCH0)
             .setStartOffset(100L)
             .setEndOffset(200L));
-        CommitWALObjectRequestData commitRequest1 = new CommitWALObjectRequestData()
+        CommitSSTObjectRequestData commitRequest1 = new CommitSSTObjectRequestData()
             .setObjectId(2L)
             .setOrderId(1L)
             .setNodeId(BROKER0)
@@ -683,7 +683,7 @@ public void testCommitStreamObject() {
                 .setStartOffset(200L)
                 .setEndOffset(400L)
             ));
-        ControllerResult result1 = manager.commitWALObject(commitRequest1);
+        ControllerResult result1 = manager.commitSSTObject(commitRequest1);
         assertEquals(Errors.NONE.code(), result1.response().errorCode());
         replay(manager, result1.records());
         long object1DataTs = manager.streamsMetadata().get(STREAM1).streamObjects().get(3L).dataTimeInMs();
@@ -742,8 +742,8 @@ private void mockData0() {
         // 1. create and open stream0 and stream1 for node0
         createAndOpenStream(BROKER0, EPOCH0);
         createAndOpenStream(BROKER0, EPOCH0);
-        // 2. commit wal object with stream0-[0, 10)
-        CommitWALObjectRequestData requestData = new CommitWALObjectRequestData()
+        // 2. commit SST object with stream0-[0, 10)
+        CommitSSTObjectRequestData requestData = new CommitSSTObjectRequestData()
             .setNodeId(BROKER0)
             .setObjectSize(999)
             .setOrderId(0)
@@ -753,10 +753,10 @@ private void mockData0() {
                 .setStreamEpoch(EPOCH0)
                 .setStartOffset(0)
                 .setEndOffset(10)));
-        ControllerResult result = manager.commitWALObject(requestData);
+        ControllerResult result = manager.commitSSTObject(requestData);
         replay(manager, result.records());
-        // 3. commit wal object with stream0-[10, 20), and stream1-[0, 10)
-        requestData = new CommitWALObjectRequestData()
+        // 3. commit SST object with stream0-[10, 20), and stream1-[0, 10)
+        requestData = new CommitSSTObjectRequestData()
             .setNodeId(BROKER0)
             .setObjectSize(999)
             .setOrderId(1)
@@ -770,10 +770,10 @@ private void mockData0() {
                 .setStreamEpoch(EPOCH0)
                 .setStartOffset(0)
                 .setEndOffset(10)));
-        result = manager.commitWALObject(requestData);
+        result = manager.commitSSTObject(requestData);
         replay(manager, result.records());
         // 4. commit with a stream object with stream0-[20, 40)
-        requestData = new CommitWALObjectRequestData()
+        requestData = new CommitSSTObjectRequestData()
             .setNodeId(BROKER0)
             .setObjectSize(999)
             .setOrderId(S3StreamConstant.INVALID_ORDER_ID)
@@ -784,13 +784,13 @@ private void mockData0() {
                 .setObjectId(2)
                 .setStartOffset(20)
                 .setEndOffset(40)));
-        result = manager.commitWALObject(requestData);
+        result = manager.commitSSTObject(requestData);
         replay(manager, result.records());
         // 5. node0 close stream0 and node1 open stream0
         closeStream(BROKER0, EPOCH0, STREAM0);
         openStream(BROKER1, EPOCH1, STREAM0);
-        // 6. commit wal object with stream0-[40, 70)
-        requestData = new CommitWALObjectRequestData()
+        // 6. commit SST object with stream0-[40, 70)
+        requestData = new CommitSSTObjectRequestData()
             .setNodeId(BROKER1)
             .setObjectSize(999)
             .setObjectId(3)
@@ -800,7 +800,7 @@ private void mockData0() {
                 .setStreamEpoch(EPOCH1)
                 .setStartOffset(40)
                 .setEndOffset(70)));
-        result = manager.commitWALObject(requestData);
+        result = manager.commitSSTObject(requestData);
         replay(manager, result.records());
     }
@@ -826,17 +826,17 @@ public void testTrim() {
         assertEquals(60, rangeMetadata.startOffset());
         assertEquals(70, rangeMetadata.endOffset());
         assertEquals(0, streamMetadata.streamObjects().size());
-        NodeS3WALMetadata node0Metadata = manager.nodesMetadata().get(BROKER0);
-        assertEquals(1, node0Metadata.walObjects().size());
-        S3WALObject s3WALObject = node0Metadata.walObjects().get(1L);
-        assertEquals(1, s3WALObject.offsetRanges().size());
-        StreamOffsetRange range = s3WALObject.offsetRanges().get(STREAM0);
+        NodeS3SSTMetadata node0Metadata = manager.nodesMetadata().get(BROKER0);
+        assertEquals(1, node0Metadata.sstObjects().size());
+        S3SSTObject s3SSTObject = node0Metadata.sstObjects().get(1L);
+        assertEquals(1, s3SSTObject.offsetRanges().size());
+        StreamOffsetRange range = s3SSTObject.offsetRanges().get(STREAM0);
         assertNull(range);
-        NodeS3WALMetadata node1Metadata = manager.nodesMetadata().get(BROKER1);
-        assertEquals(1, node1Metadata.walObjects().size());
-        s3WALObject = node1Metadata.walObjects().get(3L);
-        assertEquals(1, s3WALObject.offsetRanges().size());
-        range = s3WALObject.offsetRanges().get(STREAM0);
+        NodeS3SSTMetadata node1Metadata = manager.nodesMetadata().get(BROKER1);
+        assertEquals(1, node1Metadata.sstObjects().size());
+        s3SSTObject = node1Metadata.sstObjects().get(3L);
+        assertEquals(1, s3SSTObject.offsetRanges().size());
+        range = s3SSTObject.offsetRanges().get(STREAM0);
         assertNotNull(range);
         assertEquals(40, range.getStartOffset());
         assertEquals(70, range.getEndOffset());
@@ -860,12 +860,12 @@ public void testTrim() {
         assertEquals(70, rangeMetadata.endOffset());
         assertEquals(0, streamMetadata.streamObjects().size());
         node0Metadata = manager.nodesMetadata().get(BROKER0);
-        assertEquals(1, node0Metadata.walObjects().size());
+        assertEquals(1, node0Metadata.sstObjects().size());
         node1Metadata = manager.nodesMetadata().get(BROKER1);
-        assertEquals(0, node1Metadata.walObjects().size());
+        assertEquals(0, node1Metadata.sstObjects().size());
-        // 5. commit wal object with stream0-[70, 100)
-        CommitWALObjectRequestData requestData = new CommitWALObjectRequestData()
+        // 5. commit SST object with stream0-[70, 100)
+        CommitSSTObjectRequestData requestData = new CommitSSTObjectRequestData()
             .setNodeId(BROKER1)
             .setObjectSize(999)
             .setObjectId(4)
@@ -875,7 +875,7 @@ public void testTrim() {
                 .setStreamEpoch(EPOCH0)
                 .setStartOffset(70)
                 .setEndOffset(100)));
-        ControllerResult result = manager.commitWALObject(requestData);
+        ControllerResult result = manager.commitSSTObject(requestData);
         replay(manager, result.records());
         // 6. verify
@@ -919,12 +919,12 @@ public void testDelete() {
         // 4. verify
         assertNull(manager.streamsMetadata().get(STREAM0));
-        assertEquals(1, manager.nodesMetadata().get(BROKER0).walObjects().size());
-        S3WALObject walObject = manager.nodesMetadata().get(BROKER0).walObjects().get(1L);
-        assertEquals(1, walObject.offsetRanges().size());
-        StreamOffsetRange offsetRange = walObject.offsetRanges().get(STREAM1);
+        assertEquals(1, manager.nodesMetadata().get(BROKER0).sstObjects().size());
+        S3SSTObject sstObject = manager.nodesMetadata().get(BROKER0).sstObjects().get(1L);
+        assertEquals(1, sstObject.offsetRanges().size());
+        StreamOffsetRange offsetRange = sstObject.offsetRanges().get(STREAM1);
         assertNotNull(offsetRange);
-        assertEquals(0, manager.nodesMetadata().get(BROKER1).walObjects().size());
+        assertEquals(0, manager.nodesMetadata().get(BROKER1).sstObjects().size());
         // 5. delete again
         req = new DeleteStreamRequest()
@@ -1020,11 +1020,11 @@ private void replay(StreamControlManager manager, List rec
             case REMOVE_NODE_WALMETADATA_RECORD:
                 manager.replay((RemoveNodeWALMetadataRecord) message);
                 break;
-            case WALOBJECT_RECORD:
-                manager.replay((WALObjectRecord) message);
+            case S3_SSTOBJECT_RECORD:
+                manager.replay((S3SSTObjectRecord) message);
                 break;
-            case REMOVE_WALOBJECT_RECORD:
-                manager.replay((RemoveWALObjectRecord) message);
+            case REMOVE_SSTOBJECT_RECORD:
+                manager.replay((RemoveSSTObjectRecord) message);
                 break;
             case S3_STREAM_OBJECT_RECORD:
                 manager.replay((S3StreamObjectRecord) message);
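The replay helper above hands each ApiMessage to a type-specific replay overload, which is why the rename has to change the record classes and the MetadataRecordType switch arms together. Below is a minimal, self-contained sketch of that dispatch shape; the record and manager types are local stand-ins for illustration, not the real Kafka metadata classes, and the instanceof chain stands in for the switch on record type:

    import java.util.List;

    public class ReplaySketch {
        interface MetadataRecord { }
        // Stand-ins named after the renamed records in the diff above.
        record S3SSTObjectRecord(long objectId) implements MetadataRecord { }
        record RemoveSSTObjectRecord(long objectId) implements MetadataRecord { }

        static class Manager {
            void replay(S3SSTObjectRecord r) { System.out.println("add SST object " + r.objectId()); }
            void replay(RemoveSSTObjectRecord r) { System.out.println("remove SST object " + r.objectId()); }
        }

        static void replayAll(Manager manager, List<MetadataRecord> records) {
            for (MetadataRecord record : records) {
                // Overloads resolve statically in Java, so a runtime branch
                // (enum switch or instanceof) must pick the overload first.
                if (record instanceof S3SSTObjectRecord r) {
                    manager.replay(r);
                } else if (record instanceof RemoveSSTObjectRecord r) {
                    manager.replay(r);
                }
            }
        }

        public static void main(String[] args) {
            replayAll(new Manager(), List.of(new S3SSTObjectRecord(0L), new RemoveSSTObjectRecord(0L)));
        }
    }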
diff --git a/metadata/src/test/java/org/apache/kafka/image/NodeS3WALMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/NodeS3SSTMetadataImageTest.java
similarity index 75%
rename from metadata/src/test/java/org/apache/kafka/image/NodeS3WALMetadataImageTest.java
rename to metadata/src/test/java/org/apache/kafka/image/NodeS3SSTMetadataImageTest.java
index 9c4a74eeb2..b3f47ac98d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/NodeS3WALMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/NodeS3SSTMetadataImageTest.java
@@ -27,12 +27,12 @@
 import com.automq.stream.s3.metadata.S3StreamConstant;
 import com.automq.stream.s3.metadata.StreamOffsetRange;
 import org.apache.kafka.common.metadata.NodeWALMetadataRecord;
-import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
-import org.apache.kafka.common.metadata.WALObjectRecord;
+import org.apache.kafka.common.metadata.RemoveSSTObjectRecord;
+import org.apache.kafka.common.metadata.S3SSTObjectRecord;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
-import org.apache.kafka.metadata.stream.S3WALObject;
+import org.apache.kafka.metadata.stream.S3SSTObject;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -40,7 +40,7 @@
 @Timeout(value = 40)
 @Tag("S3Unit")
-public class NodeS3WALMetadataImageTest {
+public class NodeS3SSTMetadataImageTest {
     private static final int BROKER0 = 0;
@@ -49,96 +49,96 @@ public class NodeS3WALMetadataImageTest {
     private static final long STREAM1 = 1;
     @Test
-    public void testS3WALObjects() {
-        NodeS3WALMetadataImage image0 = new NodeS3WALMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Collections.emptyMap());
+    public void testS3SSTObjects() {
+        NodeS3SSTMetadataImage image0 = new NodeS3SSTMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Collections.emptyMap());
         List<ApiMessageAndVersion> delta0Records = new ArrayList<>();
         NodeS3WALMetadataDelta delta0 = new NodeS3WALMetadataDelta(image0);
-        // 1. create WALObject0 and WALObject1
+        // 1. create SSTObject0 and SSTObject1
         delta0Records.add(new ApiMessageAndVersion(new NodeWALMetadataRecord()
             .setNodeId(BROKER0)
            .setNodeEpoch(1), (short) 0));
-        delta0Records.add(new ApiMessageAndVersion(new WALObjectRecord()
+        delta0Records.add(new ApiMessageAndVersion(new S3SSTObjectRecord()
             .setObjectId(0L)
             .setNodeId(BROKER0)
             .setOrderId(0L)
             .setStreamsIndex(List.of(
-                new WALObjectRecord.StreamIndex()
+                new S3SSTObjectRecord.StreamIndex()
                     .setStreamId(STREAM0)
                    .setStartOffset(0L)
                     .setEndOffset(100L),
-                new WALObjectRecord.StreamIndex()
+                new S3SSTObjectRecord.StreamIndex()
                     .setStreamId(STREAM1)
                     .setStartOffset(0)
                     .setEndOffset(200))), (short) 0));
-        delta0Records.add(new ApiMessageAndVersion(new WALObjectRecord()
+        delta0Records.add(new ApiMessageAndVersion(new S3SSTObjectRecord()
             .setObjectId(1L)
             .setNodeId(BROKER0)
             .setOrderId(1L)
             .setStreamsIndex(List.of(
-                new WALObjectRecord.StreamIndex()
+                new S3SSTObjectRecord.StreamIndex()
                     .setStreamId(STREAM0)
                     .setStartOffset(101L)
                     .setEndOffset(200L))), (short) 0));
         RecordTestUtils.replayAll(delta0, delta0Records);
         // verify delta and check image's write
-        NodeS3WALMetadataImage image1 = new NodeS3WALMetadataImage(BROKER0, 1,
+        NodeS3SSTMetadataImage image1 = new NodeS3SSTMetadataImage(BROKER0, 1,
             Map.of(
-                0L, new S3WALObject(0L, BROKER0, Map.of(
+                0L, new S3SSTObject(0L, BROKER0, Map.of(
                     STREAM0, new StreamOffsetRange(STREAM0, 0L, 100L),
                     STREAM1, new StreamOffsetRange(STREAM1, 0L, 200L)), 0L),
-                1L, new S3WALObject(1L, BROKER0, Map.of(
+                1L, new S3SSTObject(1L, BROKER0, Map.of(
                     STREAM0, new StreamOffsetRange(STREAM0, 101L, 200L)), 1L)));
         assertEquals(image1, delta0.apply());
         testToImageAndBack(image1);
-        // 2. remove range of stream0 in WALObject0 and update epoch
+        // 2. remove range of stream0 in SSTObject0 and update epoch
         List<ApiMessageAndVersion> delta1Records = new ArrayList<>();
         NodeS3WALMetadataDelta delta1 = new NodeS3WALMetadataDelta(image1);
         delta1Records.add(new ApiMessageAndVersion(new NodeWALMetadataRecord()
             .setNodeId(BROKER0)
             .setNodeEpoch(2), (short) 0));
-        delta1Records.add(new ApiMessageAndVersion(new WALObjectRecord()
+        delta1Records.add(new ApiMessageAndVersion(new S3SSTObjectRecord()
             .setObjectId(0L)
             .setNodeId(BROKER0)
             .setOrderId(0L)
             .setStreamsIndex(List.of(
-                new WALObjectRecord.StreamIndex()
+                new S3SSTObjectRecord.StreamIndex()
                     .setStreamId(STREAM1)
                     .setStartOffset(0)
                     .setEndOffset(200))), (short) 0));
         RecordTestUtils.replayAll(delta1, delta1Records);
         // verify delta and check image's write
-        NodeS3WALMetadataImage image2 = new NodeS3WALMetadataImage(BROKER0, 2,
+        NodeS3SSTMetadataImage image2 = new NodeS3SSTMetadataImage(BROKER0, 2,
             Map.of(
-                0L, new S3WALObject(0L, BROKER0, Map.of(
+                0L, new S3SSTObject(0L, BROKER0, Map.of(
                     STREAM1, new StreamOffsetRange(STREAM1, 0L, 200L)), 0L),
-                1L, new S3WALObject(1L, BROKER0, Map.of(
+                1L, new S3SSTObject(1L, BROKER0, Map.of(
                     STREAM0, new StreamOffsetRange(STREAM0, 101L, 200L)), 1L)));
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
-        // 3. remove WALObject1
+        // 3. remove SSTObject1
         List<ApiMessageAndVersion> delta2Records = new ArrayList<>();
         NodeS3WALMetadataDelta delta2 = new NodeS3WALMetadataDelta(image2);
-        delta2Records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
+        delta2Records.add(new ApiMessageAndVersion(new RemoveSSTObjectRecord()
             .setObjectId(1L), (short) 0));
         RecordTestUtils.replayAll(delta2, delta2Records);
         // verify delta and check image's write
-        NodeS3WALMetadataImage image3 = new NodeS3WALMetadataImage(BROKER0, 2,
+        NodeS3SSTMetadataImage image3 = new NodeS3SSTMetadataImage(BROKER0, 2,
             Map.of(
-                0L, new S3WALObject(0L, BROKER0, Map.of(
+                0L, new S3SSTObject(0L, BROKER0, Map.of(
                     STREAM1, new StreamOffsetRange(STREAM1, 0L, 200L)), 0L)));
         assertEquals(image3, delta2.apply());
         testToImageAndBack(image3);
     }
-    private void testToImageAndBack(NodeS3WALMetadataImage image) {
+    private void testToImageAndBack(NodeS3SSTMetadataImage image) {
         RecordListWriter writer = new RecordListWriter();
         ImageWriterOptions options = new ImageWriterOptions.Builder().build();
         image.write(writer, options);
-        NodeS3WALMetadataDelta delta = new NodeS3WALMetadataDelta(NodeS3WALMetadataImage.EMPTY);
+        NodeS3WALMetadataDelta delta = new NodeS3WALMetadataDelta(NodeS3SSTMetadataImage.EMPTY);
         RecordTestUtils.replayAll(delta, writer.records());
-        NodeS3WALMetadataImage newImage = delta.apply();
+        NodeS3SSTMetadataImage newImage = delta.apply();
         assertEquals(image, newImage);
     }
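The testToImageAndBack helper above pins down the round-trip invariant every metadata image in this patch relies on: writing an image out as records and replaying them into a delta over the empty image must reconstruct an equal image. A toy, self-contained sketch of the same invariant, with simplified stand-in types in place of the real image/delta classes:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RoundTripSketch {
        record PutRecord(long objectId, String value) { }

        // Immutable "image": serializes itself as a list of records.
        record Image(Map<Long, String> objects) {
            List<PutRecord> write() {
                return objects.entrySet().stream()
                    .map(e -> new PutRecord(e.getKey(), e.getValue()))
                    .toList();
            }
        }

        // Mutable "delta": accumulates replayed records over a base image.
        static class Delta {
            private final Map<Long, String> pending;
            Delta(Image base) { this.pending = new HashMap<>(base.objects()); }
            void replay(PutRecord record) { pending.put(record.objectId(), record.value()); }
            Image apply() { return new Image(Map.copyOf(pending)); }
        }

        public static void main(String[] args) {
            Image image = new Image(Map.of(0L, "sst-0", 1L, "sst-1"));
            Delta delta = new Delta(new Image(Map.of())); // replay over the empty image
            image.write().forEach(delta::replay);
            // The invariant checked by testToImageAndBack: write -> replay -> apply
            // reproduces an equal image.
            System.out.println(image.equals(delta.apply())); // true
        }
    }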
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index b221fe5b29..8e9d762a4d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -28,7 +28,7 @@
 import org.apache.kafka.metadata.stream.InRangeObjects;
 import org.apache.kafka.metadata.stream.RangeMetadata;
 import org.apache.kafka.metadata.stream.S3StreamObject;
-import org.apache.kafka.metadata.stream.S3WALObject;
+import org.apache.kafka.metadata.stream.S3SSTObject;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -101,21 +101,21 @@ private void testToImageAndBack(S3StreamsMetadataImage image) {
     @Test
     public void testGetObjects() {
-        Map<Long, S3WALObject> broker0WalObjects = Map.of(
-            0L, new S3WALObject(0, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 100L, 120L)), 0L),
-            1L, new S3WALObject(1, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 120L, 140L)), 1L),
-            2L, new S3WALObject(2, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 180L, 200L)), 2L),
-            3L, new S3WALObject(3, BROKER0, Map.of(STREAM0,
+        Map<Long, S3SSTObject> broker0SSTObjects = Map.of(
+            0L, new S3SSTObject(0, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 100L, 120L)), 0L),
+            1L, new S3SSTObject(1, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 120L, 140L)), 1L),
+            2L, new S3SSTObject(2, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 180L, 200L)), 2L),
+            3L, new S3SSTObject(3, BROKER0, Map.of(STREAM0,
                 new StreamOffsetRange(STREAM0, 400L, 420L)), 3L),
-            4L, new S3WALObject(4, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 520L, 600L)), 4L));
-        Map<Long, S3WALObject> broker1WalObjects = Map.of(
-            5L, new S3WALObject(5, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 140L, 160L)), 0L),
-            6L, new S3WALObject(6, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 160L, 180L)), 1L),
-            7L, new S3WALObject(7, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 420L, 520L)), 2L));
-        NodeS3WALMetadataImage broker0WALMetadataImage = new NodeS3WALMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH,
-            new HashMap<>(broker0WalObjects));
-        NodeS3WALMetadataImage broker1WALMetadataImage = new NodeS3WALMetadataImage(BROKER1, S3StreamConstant.INVALID_BROKER_EPOCH,
-            new HashMap<>(broker1WalObjects));
+            4L, new S3SSTObject(4, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 520L, 600L)), 4L));
+        Map<Long, S3SSTObject> broker1SSTObjects = Map.of(
+            5L, new S3SSTObject(5, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 140L, 160L)), 0L),
+            6L, new S3SSTObject(6, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 160L, 180L)), 1L),
+            7L, new S3SSTObject(7, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 420L, 520L)), 2L));
+        NodeS3SSTMetadataImage broker0WALMetadataImage = new NodeS3SSTMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH,
+            new HashMap<>(broker0SSTObjects));
+        NodeS3SSTMetadataImage broker1WALMetadataImage = new NodeS3SSTMetadataImage(BROKER1, S3StreamConstant.INVALID_BROKER_EPOCH,
+            new HashMap<>(broker1SSTObjects));
         Map<Integer, RangeMetadata> ranges = Map.of(
             0, new RangeMetadata(STREAM0, 0L, 0, 10L, 140L, BROKER0),
             1, new RangeMetadata(STREAM0, 1L, 1, 140L, 180L, BROKER1),
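SortedSSTObjectsListTest below asserts that objects come back ordered by orderId (objectIds 3, 1, 0, 2, 4) regardless of insertion order, and that removeIf preserves the ordering. One straightforward way to get that behavior is to find the insertion point with a binary search on orderId; the following is an illustrative reimplementation of that idea, not the actual SortedSSTObjectsList:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class SortedByOrderIdSketch {
        record SstObject(long objectId, long orderId) { }

        static final Comparator<SstObject> BY_ORDER_ID = Comparator.comparingLong(SstObject::orderId);

        static void addSorted(List<SstObject> objects, SstObject object) {
            int idx = Collections.binarySearch(objects, object, BY_ORDER_ID);
            // binarySearch returns -(insertionPoint) - 1 when the key is absent.
            objects.add(idx >= 0 ? idx + 1 : -idx - 1, object);
        }

        public static void main(String[] args) {
            List<SstObject> objects = new ArrayList<>();
            // Same (objectId, orderId) pairs as the test below.
            for (long[] pair : new long[][]{{0, 2}, {1, 1}, {2, 3}, {3, 0}, {4, 4}}) {
                addSorted(objects, new SstObject(pair[0], pair[1]));
            }
            // Prints objectIds in orderId order: 3, 1, 0, 2, 4.
            objects.forEach(o -> System.out.println(o.objectId()));
        }
    }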
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedWALObjectsListTest.java b/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedSSTObjectsListTest.java
similarity index 78%
rename from metadata/src/test/java/org/apache/kafka/metadata/stream/SortedWALObjectsListTest.java
rename to metadata/src/test/java/org/apache/kafka/metadata/stream/SortedSSTObjectsListTest.java
index fa478ac02f..ad103c46e3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedWALObjectsListTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedSSTObjectsListTest.java
@@ -25,29 +25,29 @@
 import org.junit.jupiter.api.Test;
 @Tag("S3Unit")
-public class SortedWALObjectsListTest {
+public class SortedSSTObjectsListTest {
     @Test
     public void testSorted() {
-        SortedWALObjects objects = new SortedWALObjectsList();
-        objects.add(new S3WALObject(0, -1, null, 2));
-        objects.add(new S3WALObject(1, -1, null, 1));
-        objects.add(new S3WALObject(2, -1, null, 3));
-        objects.add(new S3WALObject(3, -1, null, 0));
-        objects.add(new S3WALObject(4, -1, null, 4));
+        SortedSSTObjects objects = new SortedSSTObjectsList();
+        objects.add(new S3SSTObject(0, -1, null, 2));
+        objects.add(new S3SSTObject(1, -1, null, 1));
+        objects.add(new S3SSTObject(2, -1, null, 3));
+        objects.add(new S3SSTObject(3, -1, null, 0));
+        objects.add(new S3SSTObject(4, -1, null, 4));
         assertEquals(5, objects.size());
         List<Long> expectedOrderIds = List.of(0L, 1L, 2L, 3L, 4L);
         assertEquals(expectedOrderIds, objects.list()
             .stream()
-            .map(S3WALObject::orderId)
+            .map(S3SSTObject::orderId)
             .collect(Collectors.toList()));
         List<Long> expectedObjectIds = List.of(3L, 1L, 0L, 2L, 4L);
         assertEquals(expectedObjectIds, objects.list()
             .stream()
-            .map(S3WALObject::objectId)
+            .map(S3SSTObject::objectId)
             .collect(Collectors.toList()));
         objects.removeIf(obj -> obj.objectId() == 2 || obj.objectId() == 3);
@@ -56,13 +56,13 @@ public void testSorted() {
         expectedOrderIds = List.of(1L, 2L, 4L);
         assertEquals(expectedOrderIds, objects.list()
             .stream()
-            .map(S3WALObject::orderId)
+            .map(S3SSTObject::orderId)
             .collect(Collectors.toList()));
         expectedObjectIds = List.of(1L, 0L, 4L);
         assertEquals(expectedObjectIds, objects.list()
             .stream()
-            .map(S3WALObject::objectId)
+            .map(S3SSTObject::objectId)
             .collect(Collectors.toList()));
     }
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index a984bb471f..64f105a23b 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -157,14 +157,14 @@ s3.bucket=ko3
 s3.wal.path=/mnt/kafka/kafka-data-logs-1/s3wal
 s3.wal.capacity=209715200
 s3.wal.cache.size=104857600
-s3.wal.object.size=52428800
+s3.wal.upload.threshold=52428800
 s3.block.cache.size=20971520
 # 3 minutes
 s3.wal.object.compaction.interval.minute=3
 # 50MB
-s3.wal.object.compaction.cache.size=20971520
+s3.sst.compaction.cache.size=20971520
 # 5MB
-s3.wal.object.compaction.stream.split.size=20971520
+s3.sst.compaction.stream.split.size=20971520
 # The S3 stream object compaction task interval in minutes. default 60 minutes
 s3.stream.object.compaction.interval.minutes=3
 # The S3 stream object compaction max size in bytes. default 10G
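The template hunk above retires three tuning keys: s3.wal.object.size becomes s3.wal.upload.threshold, and the two s3.wal.object.compaction.* sizes become s3.sst.compaction.cache.size and s3.sst.compaction.stream.split.size. A small sketch for flagging retired keys in an existing broker properties file during an upgrade; the class name and the file-path argument are hypothetical, and only java.util.Properties is assumed:

    import java.io.FileReader;
    import java.util.List;
    import java.util.Properties;

    public class CheckRenamedKeys {
        // old key -> new key, as renamed in the template above
        private static final List<String[]> RENAMES = List.of(
            new String[]{"s3.wal.object.size", "s3.wal.upload.threshold"},
            new String[]{"s3.wal.object.compaction.cache.size", "s3.sst.compaction.cache.size"},
            new String[]{"s3.wal.object.compaction.stream.split.size", "s3.sst.compaction.stream.split.size"});

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            try (FileReader reader = new FileReader(args.length > 0 ? args[0] : "kafka.properties")) {
                props.load(reader);
            }
            for (String[] rename : RENAMES) {
                if (props.containsKey(rename[0])) {
                    System.out.printf("retired key %s found; use %s instead%n", rename[0], rename[1]);
                }
            }
        }
    }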