From 3dcd2796f4ae8d89b9a03a76ef7d568433e2e36b Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Thu, 30 May 2024 16:20:54 +0800 Subject: [PATCH 1/2] refactor: remove config "s3.object.tagging" Signed-off-by: Ning Yu --- core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java | 1 - core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ---- 2 files changed, 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java index c399297287..75b19dd57f 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java @@ -32,7 +32,6 @@ public static Config to(KafkaConfig s) { .streamSplitSize(s.s3StreamSplitSize()) .objectBlockSize(s.s3ObjectBlockSize()) .objectPartSize(s.s3ObjectPartSize()) - .objectTagging(s.s3ObjectTagging()) .blockCacheSize(s.s3BlockCacheSize()) .streamObjectCompactionIntervalMinutes(s.s3StreamObjectCompactionTaskIntervalMinutes()) .streamObjectCompactionMaxSizeBytes(s.s3StreamObjectCompactionMaxSizeBytes()) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e0c8452a68..c28705faac 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -457,7 +457,6 @@ object KafkaConfig { val S3StreamSplitSizeProp = "s3.stream.object.split.size" val S3ObjectBlockSizeProp = "s3.object.block.size" val S3ObjectPartSizeProp = "s3.object.part.size" - val S3ObjectTaggingProp = "s3.object.tagging" val S3BlockCacheSizeProp = "s3.block.cache.size" val S3StreamAllocatorPolicyProp = "s3.stream.allocator.policy" val S3StreamObjectCompactionIntervalMinutesProp = "s3.stream.object.compaction.interval.minutes" @@ -515,7 +514,6 @@ object KafkaConfig { val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when upload delta WAL or compact stream set object." val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold." val S3ObjectPartSizeDoc = "The S3 object multi-part upload part size threshold." - val S3ObjectTaggingDoc = "Whether to tag S3 objects" val S3BlockCacheSizeDoc = "s3.block.cache.size is the size of the block cache. The block cache is used to cache cold data read from object storage. " + "It is recommended to set this configuration item to be greater than 4MB * the concurrent cold reads per partition, to achieve better cold read performance." val S3StreamAllocatorPolicyDoc = "The S3 stream memory allocator policy, supported value: " + ByteBufAllocPolicy.values().mkString(", ") + ".\n" + @@ -1425,7 +1423,6 @@ object KafkaConfig { .define(S3StreamSplitSizeProp, INT, 8388608, MEDIUM, S3StreamSplitSizeDoc) .define(S3ObjectBlockSizeProp, INT, 524288, MEDIUM, S3ObjectBlockSizeDoc) .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc) - .define(S3ObjectTaggingProp, BOOLEAN, false, MEDIUM, S3ObjectTaggingDoc) .define(S3BlockCacheSizeProp, LONG, -1L, MEDIUM, S3BlockCacheSizeDoc) .define(S3StreamAllocatorPolicyProp, STRING, ByteBufAllocPolicy.POOLED_HEAP.name, MEDIUM, S3StreamAllocatorPolicyDoc) .define(S3StreamObjectCompactionIntervalMinutesProp, INT, 30, MEDIUM, S3StreamObjectCompactionIntervalMinutesDoc) @@ -2150,7 +2147,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp) val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp) val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp) - val s3ObjectTagging = getBoolean(KafkaConfig.S3ObjectTaggingProp) val s3StreamAllocatorPolicy = Enum.valueOf(classOf[ByteBufAllocPolicy], getString(KafkaConfig.S3StreamAllocatorPolicyProp)) val (s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold) = adjustS3Configs(s3StreamAllocatorPolicy, s3WALCapacity) val s3StreamObjectCompactionTaskIntervalMinutes = getInt(KafkaConfig.S3StreamObjectCompactionIntervalMinutesProp) From 7e3c5c8de05fc96dde36be7477c7407183a122ad Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Thu, 30 May 2024 17:25:10 +0800 Subject: [PATCH 2/2] refactor: make `objectTagging` a `Map` rather than a `boolean` Signed-off-by: Ning Yu --- .../java/com/automq/shell/log/LogUploader.java | 2 +- .../shell/metrics/S3MetricsExporter.java | 2 +- .../main/scala/kafka/admin/S3TestCommand.scala | 2 +- .../main/java/com/automq/stream/s3/Config.java | 8 +++++--- .../automq/stream/s3/metadata/ObjectUtils.java | 18 ++++++++++++++---- .../stream/s3/operator/DefaultS3Operator.java | 10 +++++----- .../com/automq/stream/utils/PingS3Helper.java | 9 +++++---- .../kafka/tools/automq/GenerateS3UrlCmd.java | 1 - 8 files changed, 32 insertions(+), 20 deletions(-) diff --git a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java index d80c1a5ce9..3f5e9696dc 100644 --- a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java +++ b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java @@ -119,7 +119,7 @@ private void initUploadComponent() { startFuture = CompletableFuture.runAsync(() -> { try { s3Operator = new DefaultS3Operator(config.s3Endpoint(), config.s3Region(), - config.s3OpsBucket(), config.s3PathStyle(), List.of(CredentialsProviderHolder.getAwsCredentialsProvider()), false); + config.s3OpsBucket(), config.s3PathStyle(), List.of(CredentialsProviderHolder.getAwsCredentialsProvider()), null); uploadThread = new Thread(new UploadTask()); uploadThread.setName("log-uploader-upload-thread"); uploadThread.setDaemon(true); diff --git a/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java b/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java index 01be7d78fb..e55976c9ef 100644 --- a/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java +++ b/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java @@ -61,7 +61,7 @@ public class S3MetricsExporter implements MetricExporter { public S3MetricsExporter(S3MetricsConfig config) { this.config = config; s3Operator = new DefaultS3Operator(config.s3Endpoint(), config.s3Region(), - config.s3OpsBucket(), config.s3PathStyle(), List.of(CredentialsProviderHolder.getAwsCredentialsProvider()), false); + config.s3OpsBucket(), config.s3PathStyle(), List.of(CredentialsProviderHolder.getAwsCredentialsProvider()), null); defalutTagMap.put("host_name", getHostName()); defalutTagMap.put("service_name", config.clusterId()); diff --git a/core/src/main/scala/kafka/admin/S3TestCommand.scala b/core/src/main/scala/kafka/admin/S3TestCommand.scala index 5e19f4775d..75ac9992af 100644 --- a/core/src/main/scala/kafka/admin/S3TestCommand.scala +++ b/core/src/main/scala/kafka/admin/S3TestCommand.scala @@ -54,7 +54,7 @@ object S3TestCommand extends Logging { .region(s3Region) .credentialsProviders(util.List.of(StaticCredentialsProvider.create(AwsBasicCredentials.create(s3AccessKey, s3SecretKey)))) .isForcePathStyle(forcePathStyle) - .tagging(tagging) + .tagging(if (tagging) util.Map.of("test-tag-key", "test-tag-value") else null) .needPrintToConsole(true) .build() pingS3Helper.pingS3() diff --git a/s3stream/src/main/java/com/automq/stream/s3/Config.java b/s3stream/src/main/java/com/automq/stream/s3/Config.java index 4a9ff053de..c95576105d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/Config.java +++ b/s3stream/src/main/java/com/automq/stream/s3/Config.java @@ -11,6 +11,8 @@ package com.automq.stream.s3; +import java.util.Map; + // TODO: rename & init public class Config { private int nodeId; @@ -33,7 +35,7 @@ public class Config { private int streamSplitSize = 16777216; private int objectBlockSize = 1048576; private int objectPartSize = 16777216; - private boolean objectTagging = false; + private Map objectTagging = null; private long blockCacheSize = 100 * 1024 * 1024; private int streamObjectCompactionIntervalMinutes = 60; private long streamObjectCompactionMaxSizeBytes = 10737418240L; @@ -136,7 +138,7 @@ public int objectPartSize() { return objectPartSize; } - public boolean objectTagging() { + public Map objectTagging() { return objectTagging; } @@ -312,7 +314,7 @@ public Config objectPartSize(int s3ObjectPartSize) { return this; } - public Config objectTagging(boolean s3ObjectTagging) { + public Config objectTagging(Map s3ObjectTagging) { this.objectTagging = s3ObjectTagging; return this; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metadata/ObjectUtils.java b/s3stream/src/main/java/com/automq/stream/s3/metadata/ObjectUtils.java index 6a9d85e9d6..c7cc63ce4a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metadata/ObjectUtils.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metadata/ObjectUtils.java @@ -11,6 +11,9 @@ package com.automq.stream.s3.metadata; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import software.amazon.awssdk.services.s3.model.Tag; import software.amazon.awssdk.services.s3.model.Tagging; @@ -65,10 +68,17 @@ public static long parseObjectId(int version, String key, String namespace) { } /** - * Common tagging for all objects, identifying the namespace + * Convert a list of tags to a tagging object. + * It will return null if the input is null. */ - public static Tagging tagging() { - Tag tag = Tag.builder().key(OBJECT_TAG_KEY).value(namespace).build(); - return Tagging.builder().tagSet(tag).build(); + public static Tagging tagging(Map tagging) { + if (null == tagging) { + return null; + } + List tags = tagging.entrySet().stream() + .map(e -> Tag.builder().key(e.getKey()).value(e.getValue())) + .map(Tag.Builder::build) + .collect(Collectors.toList()); + return Tagging.builder().tagSet(tags).build(); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index c0205ac62a..9350369966 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -132,20 +132,20 @@ public class DefaultS3Operator implements S3Operator { private boolean checkS3ApiModel = false; public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, - List credentialsProviders, boolean tagging) { + List credentialsProviders, Map tagging) { this(endpoint, region, bucket, forcePathStyle, credentialsProviders, tagging, null, null, false, false); } public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, List credentialsProviders, - boolean tagging, + Map tagging, AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate, boolean checkS3ApiModel) { this.currentIndex = INDEX.incrementAndGet(); this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate(); this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter; this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter; - this.tagging = tagging ? tagging() : null; + this.tagging = tagging(tagging); int maxS3Concurrency = getMaxS3Concurrency(); this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, credentialsProviders, maxS3Concurrency); this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, credentialsProviders, maxS3Concurrency) : writeS3Client; @@ -1016,7 +1016,7 @@ public static class Builder { private String bucket; private boolean forcePathStyle; private List credentialsProviders; - private boolean tagging; + private Map tagging; private AsyncNetworkBandwidthLimiter inboundLimiter; private AsyncNetworkBandwidthLimiter outboundLimiter; private boolean readWriteIsolate; @@ -1049,7 +1049,7 @@ public Builder credentialsProviders(List credentialsProv return this; } - public Builder tagging(boolean tagging) { + public Builder tagging(Map tagging) { this.tagging = tagging; return this; } diff --git a/s3stream/src/main/java/com/automq/stream/utils/PingS3Helper.java b/s3stream/src/main/java/com/automq/stream/utils/PingS3Helper.java index 89d142dbac..51840b20b5 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/PingS3Helper.java +++ b/s3stream/src/main/java/com/automq/stream/utils/PingS3Helper.java @@ -16,6 +16,7 @@ import com.automq.stream.s3.operator.S3Operator; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; @@ -47,7 +48,7 @@ public class PingS3Helper { private String bucket; private boolean isForcePathStyle; private List credentialsProviders; - private boolean tagging; + private Map tagging; private final boolean needPrintToConsole; private PingS3Helper(Builder builder) { @@ -65,7 +66,7 @@ public static class Builder { private String bucket; private boolean isForcePathStyle; private List credentialsProviders; - private boolean tagging; + private Map tagging; public Builder needPrintToConsole(boolean needPrintToConsole) { this.needPrintToConsole = needPrintToConsole; @@ -97,7 +98,7 @@ public Builder credentialsProviders(List credentialsProv return this; } - public Builder tagging(boolean tagging) { + public Builder tagging(Map tagging) { this.tagging = tagging; return this; } @@ -432,7 +433,7 @@ private void checkForcePathStyle(List advises) { } private void checkTagging(List advises) { - if (tagging) { + if (null != tagging) { advises.add("currently, it's only supported in AWS S3. Please make sure your object storage service supports tagging."); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/GenerateS3UrlCmd.java b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateS3UrlCmd.java index 69741ae887..7ce2b41d5c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/GenerateS3UrlCmd.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateS3UrlCmd.java @@ -157,7 +157,6 @@ public String run() { .region(parameter.s3Region) .credentialsProviders(List.of(() -> AwsBasicCredentials.create(parameter.s3AccessKey, parameter.s3SecretKey))) .isForcePathStyle(parameter.s3PathStyle) - .tagging(false) .needPrintToConsole(true) .build(); pingS3Helper.pingS3();