Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void initUploadComponent() {
startFuture = CompletableFuture.runAsync(() -> {
try {
s3Operator = new DefaultS3Operator(config.s3Endpoint(), config.s3Region(),
config.s3OpsBucket(), config.s3PathStyle(), List.of(CredentialsProviderHolder.getAwsCredentialsProvider()), false);
config.s3OpsBucket(), config.s3PathStyle(), List.of(CredentialsProviderHolder.getAwsCredentialsProvider()), null);
uploadThread = new Thread(new UploadTask());
uploadThread.setName("log-uploader-upload-thread");
uploadThread.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class S3MetricsExporter implements MetricExporter {
public S3MetricsExporter(S3MetricsConfig config) {
this.config = config;
s3Operator = new DefaultS3Operator(config.s3Endpoint(), config.s3Region(),
config.s3OpsBucket(), config.s3PathStyle(), List.of(CredentialsProviderHolder.getAwsCredentialsProvider()), false);
config.s3OpsBucket(), config.s3PathStyle(), List.of(CredentialsProviderHolder.getAwsCredentialsProvider()), null);

defalutTagMap.put("host_name", getHostName());
defalutTagMap.put("service_name", config.clusterId());
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/S3TestCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object S3TestCommand extends Logging {
.region(s3Region)
.credentialsProviders(util.List.of(StaticCredentialsProvider.create(AwsBasicCredentials.create(s3AccessKey, s3SecretKey))))
.isForcePathStyle(forcePathStyle)
.tagging(tagging)
.tagging(if (tagging) util.Map.of("test-tag-key", "test-tag-value") else null)
.needPrintToConsole(true)
.build()
pingS3Helper.pingS3()
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public static Config to(KafkaConfig s) {
.streamSplitSize(s.s3StreamSplitSize())
.objectBlockSize(s.s3ObjectBlockSize())
.objectPartSize(s.s3ObjectPartSize())
.objectTagging(s.s3ObjectTagging())
.blockCacheSize(s.s3BlockCacheSize())
.streamObjectCompactionIntervalMinutes(s.s3StreamObjectCompactionTaskIntervalMinutes())
.streamObjectCompactionMaxSizeBytes(s.s3StreamObjectCompactionMaxSizeBytes())
Expand Down
4 changes: 0 additions & 4 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,6 @@ object KafkaConfig {
val S3StreamSplitSizeProp = "s3.stream.object.split.size"
val S3ObjectBlockSizeProp = "s3.object.block.size"
val S3ObjectPartSizeProp = "s3.object.part.size"
val S3ObjectTaggingProp = "s3.object.tagging"
val S3BlockCacheSizeProp = "s3.block.cache.size"
val S3StreamAllocatorPolicyProp = "s3.stream.allocator.policy"
val S3StreamObjectCompactionIntervalMinutesProp = "s3.stream.object.compaction.interval.minutes"
Expand Down Expand Up @@ -515,7 +514,6 @@ object KafkaConfig {
val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when upload delta WAL or compact stream set object."
val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold."
val S3ObjectPartSizeDoc = "The S3 object multi-part upload part size threshold."
val S3ObjectTaggingDoc = "Whether to tag S3 objects"
val S3BlockCacheSizeDoc = "s3.block.cache.size is the size of the block cache. The block cache is used to cache cold data read from object storage. " +
"It is recommended to set this configuration item to be greater than 4MB * the concurrent cold reads per partition, to achieve better cold read performance."
val S3StreamAllocatorPolicyDoc = "The S3 stream memory allocator policy, supported value: " + ByteBufAllocPolicy.values().mkString(", ") + ".\n" +
Expand Down Expand Up @@ -1425,7 +1423,6 @@ object KafkaConfig {
.define(S3StreamSplitSizeProp, INT, 8388608, MEDIUM, S3StreamSplitSizeDoc)
.define(S3ObjectBlockSizeProp, INT, 524288, MEDIUM, S3ObjectBlockSizeDoc)
.define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
.define(S3ObjectTaggingProp, BOOLEAN, false, MEDIUM, S3ObjectTaggingDoc)
.define(S3BlockCacheSizeProp, LONG, -1L, MEDIUM, S3BlockCacheSizeDoc)
.define(S3StreamAllocatorPolicyProp, STRING, ByteBufAllocPolicy.POOLED_HEAP.name, MEDIUM, S3StreamAllocatorPolicyDoc)
.define(S3StreamObjectCompactionIntervalMinutesProp, INT, 30, MEDIUM, S3StreamObjectCompactionIntervalMinutesDoc)
Expand Down Expand Up @@ -2150,7 +2147,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp)
val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp)
val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp)
val s3ObjectTagging = getBoolean(KafkaConfig.S3ObjectTaggingProp)
val s3StreamAllocatorPolicy = Enum.valueOf(classOf[ByteBufAllocPolicy], getString(KafkaConfig.S3StreamAllocatorPolicyProp))
val (s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold) = adjustS3Configs(s3StreamAllocatorPolicy, s3WALCapacity)
val s3StreamObjectCompactionTaskIntervalMinutes = getInt(KafkaConfig.S3StreamObjectCompactionIntervalMinutesProp)
Expand Down
8 changes: 5 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package com.automq.stream.s3;

import java.util.Map;

// TODO: rename & init
public class Config {
private int nodeId;
Expand All @@ -33,7 +35,7 @@ public class Config {
private int streamSplitSize = 16777216;
private int objectBlockSize = 1048576;
private int objectPartSize = 16777216;
private boolean objectTagging = false;
private Map<String, String> objectTagging = null;
private long blockCacheSize = 100 * 1024 * 1024;
private int streamObjectCompactionIntervalMinutes = 60;
private long streamObjectCompactionMaxSizeBytes = 10737418240L;
Expand Down Expand Up @@ -136,7 +138,7 @@ public int objectPartSize() {
return objectPartSize;
}

public boolean objectTagging() {
public Map<String, String> objectTagging() {
return objectTagging;
}

Expand Down Expand Up @@ -312,7 +314,7 @@ public Config objectPartSize(int s3ObjectPartSize) {
return this;
}

public Config objectTagging(boolean s3ObjectTagging) {
public Config objectTagging(Map<String, String> s3ObjectTagging) {
this.objectTagging = s3ObjectTagging;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

package com.automq.stream.s3.metadata;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;

Expand Down Expand Up @@ -65,10 +68,17 @@ public static long parseObjectId(int version, String key, String namespace) {
}

/**
* Common tagging for all objects, identifying the namespace
* Convert a map of tag key-value pairs to a {@code Tagging} object.
* Returns null if the input map is null.
*/
public static Tagging tagging() {
Tag tag = Tag.builder().key(OBJECT_TAG_KEY).value(namespace).build();
return Tagging.builder().tagSet(tag).build();
public static Tagging tagging(Map<String, String> tagging) {
if (null == tagging) {
return null;
}
List<Tag> tags = tagging.entrySet().stream()
.map(e -> Tag.builder().key(e.getKey()).value(e.getValue()))
.map(Tag.Builder::build)
.collect(Collectors.toList());
return Tagging.builder().tagSet(tags).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,20 @@ public class DefaultS3Operator implements S3Operator {
private boolean checkS3ApiModel = false;

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders, boolean tagging) {
List<AwsCredentialsProvider> credentialsProviders, Map<String, String> tagging) {
this(endpoint, region, bucket, forcePathStyle, credentialsProviders, tagging, null, null, false, false);
}

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders,
boolean tagging,
Map<String, String> tagging,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate, boolean checkS3ApiModel) {
this.currentIndex = INDEX.incrementAndGet();
this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate();
this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
this.tagging = tagging ? tagging() : null;
this.tagging = tagging(tagging);
int maxS3Concurrency = getMaxS3Concurrency();
this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, credentialsProviders, maxS3Concurrency);
this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, credentialsProviders, maxS3Concurrency) : writeS3Client;
Expand Down Expand Up @@ -1016,7 +1016,7 @@ public static class Builder {
private String bucket;
private boolean forcePathStyle;
private List<AwsCredentialsProvider> credentialsProviders;
private boolean tagging;
private Map<String, String> tagging;
private AsyncNetworkBandwidthLimiter inboundLimiter;
private AsyncNetworkBandwidthLimiter outboundLimiter;
private boolean readWriteIsolate;
Expand Down Expand Up @@ -1049,7 +1049,7 @@ public Builder credentialsProviders(List<AwsCredentialsProvider> credentialsProv
return this;
}

public Builder tagging(boolean tagging) {
public Builder tagging(Map<String, String> tagging) {
this.tagging = tagging;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.automq.stream.s3.operator.S3Operator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -47,7 +48,7 @@ public class PingS3Helper {
private String bucket;
private boolean isForcePathStyle;
private List<AwsCredentialsProvider> credentialsProviders;
private boolean tagging;
private Map<String, String> tagging;
private final boolean needPrintToConsole;

private PingS3Helper(Builder builder) {
Expand All @@ -65,7 +66,7 @@ public static class Builder {
private String bucket;
private boolean isForcePathStyle;
private List<AwsCredentialsProvider> credentialsProviders;
private boolean tagging;
private Map<String, String> tagging;

public Builder needPrintToConsole(boolean needPrintToConsole) {
this.needPrintToConsole = needPrintToConsole;
Expand Down Expand Up @@ -97,7 +98,7 @@ public Builder credentialsProviders(List<AwsCredentialsProvider> credentialsProv
return this;
}

public Builder tagging(boolean tagging) {
public Builder tagging(Map<String, String> tagging) {
this.tagging = tagging;
return this;
}
Expand Down Expand Up @@ -432,7 +433,7 @@ private void checkForcePathStyle(List<String> advises) {
}

private void checkTagging(List<String> advises) {
if (tagging) {
if (null != tagging) {
advises.add("currently, it's only supported in AWS S3. Please make sure your object storage service supports tagging.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ public String run() {
.region(parameter.s3Region)
.credentialsProviders(List.of(() -> AwsBasicCredentials.create(parameter.s3AccessKey, parameter.s3SecretKey)))
.isForcePathStyle(parameter.s3PathStyle)
.tagging(false)
.needPrintToConsole(true)
.build();
pingS3Helper.pingS3();
Expand Down