From 552d15a117b8566e746fa5a12751a56c6806a912 Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Sat, 2 Sep 2023 20:17:27 +0800
Subject: [PATCH] feat(stream-client): add s3 config

Signed-off-by: Robin Han
---
 .../scala/kafka/log/s3/DefaultS3Client.java   |  2 +-
 .../main/scala/kafka/log/s3/ObjectWriter.java |  4 ---
 .../main/scala/kafka/log/s3/S3Storage.java    | 10 +++++---
 .../kafka/log/s3/WALObjectUploadTask.java     | 11 +++++---
 .../scala/kafka/log/s3/cache/LogCache.java    | 10 ++++----
 .../main/scala/kafka/server/KafkaConfig.scala | 25 ++++++++++++++++---
 .../test/java/kafka/log/s3/S3StorageTest.java |  4 ++-
 .../java/kafka/log/s3/S3StreamMemoryTest.java |  4 ++-
 .../kafka/log/s3/WALObjectUploadTaskTest.java |  2 +-
 .../scala/unit/kafka/utils/TestUtils.scala    |  4 +++
 10 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
index e43e4056de..1b769f707b 100644
--- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
@@ -69,7 +69,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator
         this.streamManager = memoryMetadataManager;
         this.objectManager = memoryMetadataManager;
         this.blockCache = new DefaultS3BlockCache(objectManager, operator);
-        this.storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, blockCache, operator);
+        this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         this.streamClient = new S3StreamClient(this.streamManager, this.storage);
         this.kvClient = new KVClientImpl();
     }
diff --git a/core/src/main/scala/kafka/log/s3/ObjectWriter.java b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
index 2bf4455cb9..4d6b1db788 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectWriter.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -58,10 +58,6 @@ public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold
         writer = s3Operator.writer(objectKey);
     }
 
-    public ObjectWriter(long objectId, S3Operator s3Operator) {
-        this(objectId, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
-    }
-
     public void write(FlatStreamRecordBatch record) {
         if (dataBlock == null) {
             dataBlock = new DataBlock(nextDataBlockPosition);
diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java
index b17f98b02f..de53caaf0d 100644
--- a/core/src/main/scala/kafka/log/s3/S3Storage.java
+++ b/core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -25,6 +25,7 @@
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.operator.S3Operator;
 import kafka.log.s3.wal.WriteAheadLog;
+import kafka.server.KafkaConfig;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ public class S3Storage implements Storage {
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
+    private final KafkaConfig config;
     private final WriteAheadLog log;
     private final LogCache logCache;
     private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer();
@@ -57,9 +59,10 @@ public class S3Storage implements Storage {
     private final S3Operator s3Operator;
     private final S3BlockCache blockCache;
 
-    public S3Storage(WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) {
+    public S3Storage(KafkaConfig config, WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) {
+        this.config = config;
         this.log = log;
-        this.logCache = new LogCache(512 * 1024 * 1024);
+        this.logCache = new LogCache(config.s3WALObjectSize());
         this.objectManager = objectManager;
         this.blockCache = blockCache;
         this.s3Operator = s3Operator;
@@ -145,7 +148,8 @@ private CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheB
     private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture<Void> cf) {
         // TODO: pipeline the WAL object upload to accelerate the upload.
         try {
-            WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), 16 * 1024 * 1024, objectManager, s3Operator);
+            WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), objectManager, s3Operator,
+                config.s3ObjectBlockSizeProp(), config.s3ObjectPartSizeProp(), config.s3StreamSplitSizeProp());
             walObjectUploadTask.prepare().get();
             walObjectUploadTask.upload().get();
             walObjectUploadTask.commit().get();
diff --git a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
index b380182ece..e2ef6e8ebb 100644
--- a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
+++ b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
@@ -36,6 +36,8 @@ public class WALObjectUploadTask {
     private static final Logger LOGGER = LoggerFactory.getLogger(WALObjectUploadTask.class);
     private final Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap;
+    private final int objectBlockSize;
+    private final int objectPartSize;
     private final int streamSplitSizeThreshold;
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
@@ -43,8 +45,11 @@ public class WALObjectUploadTask {
     private volatile CommitWALObjectRequest commitWALObjectRequest;
     private final CompletableFuture<CommitWALObjectRequest> uploadCf = new CompletableFuture<>();
 
-    public WALObjectUploadTask(Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) {
+    public WALObjectUploadTask(Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
+                               int objectBlockSize, int objectPartSize, int streamSplitSizeThreshold) {
         this.streamRecordsMap = streamRecordsMap;
+        this.objectBlockSize = objectBlockSize;
+        this.objectPartSize = objectPartSize;
         this.streamSplitSizeThreshold = streamSplitSizeThreshold;
         this.objectManager = objectManager;
         this.s3Operator = s3Operator;
@@ -65,7 +70,7 @@ public CompletableFuture<CommitWALObjectRequest> upload() {
         Collections.sort(streamIds);
         CommitWALObjectRequest request = new CommitWALObjectRequest();
 
-        ObjectWriter walObject = new ObjectWriter(objectId, s3Operator);
+        ObjectWriter walObject = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize);
 
         List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();
 
@@ -116,7 +121,7 @@ private CompletableFuture<StreamObject> writeStreamObject(List<FlatStreamRecordB
         CompletableFuture<Long> objectIdCf = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30));
         // TODO: retry until success
         return objectIdCf.thenCompose(objectId -> {
-            ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator);
+            ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize);
             for (FlatStreamRecordBatch record : streamRecords) {
                 streamObjectWriter.write(record);
             }
diff --git a/core/src/main/scala/kafka/log/s3/cache/LogCache.java b/core/src/main/scala/kafka/log/s3/cache/LogCache.java
index ca730e50c3..c840e014e2 100644
--- a/core/src/main/scala/kafka/log/s3/cache/LogCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/LogCache.java
@@ -29,12 +29,12 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 public class LogCache {
-    private final int cacheBlockMaxSize;
+    private final long cacheBlockMaxSize;
     private final List<LogCacheBlock> archiveBlocks = new ArrayList<>();
     private LogCacheBlock activeBlock;
     private long confirmOffset;
 
-    public LogCache(int cacheBlockMaxSize) {
+    public LogCache(long cacheBlockMaxSize) {
         this.cacheBlockMaxSize = cacheBlockMaxSize;
         this.activeBlock = new LogCacheBlock(cacheBlockMaxSize);
     }
@@ -96,12 +96,12 @@ public void setConfirmOffset(long confirmOffset) {
     public static class LogCacheBlock {
         private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong();
         private final long blockId;
-        private final int maxSize;
+        private final long maxSize;
         private final Map<Long, List<FlatStreamRecordBatch>> map = new HashMap<>();
-        private int size = 0;
+        private long size = 0;
         private long confirmOffset;
 
-        public LogCacheBlock(int maxSize) {
+        public LogCacheBlock(long maxSize) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
             this.maxSize = maxSize;
         }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b740186754..91814b0058 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -675,12 +675,21 @@ object KafkaConfig {
   val S3EndpointProp = "s3.endpoint"
   val S3RegionProp = "s3.region"
   val S3BucketProp = "s3.bucket"
-
-  val S3EndpointDoc = "Specifies the S3 endpoint, ex. https://s3.{region}.amazonaws.com."
-  val S3RegionDoc = "Specifies the S3 region, ex. us-east-1."
-  val S3BucketDoc = "Specifies the S3 bucket, ex. my-bucket."
+  val S3WALObjectSizeProp = "s3.wal.object.size"
+  val S3StreamSplitSizeProp = "s3.stream.object.split.size"
+  val S3ObjectBlockSizeProp = "s3.object.block.size"
+  val S3ObjectPartSizeProp = "s3.object.part.size"
+
+  val S3EndpointDoc = "The S3 endpoint, ex. https://s3.{region}.amazonaws.com."
+  val S3RegionDoc = "The S3 region, ex. us-east-1."
+  val S3BucketDoc = "The S3 bucket, ex. my-bucket."
+  val S3WALObjectSizeDoc = "The S3 WAL object size threshold."
+  val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when uploading a WAL object or compacting objects."
+  val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold."
+  val S3ObjectPartSizeDoc = "The S3 object multi-part upload part size threshold."
 
   // Kafka on S3 inject end
+
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form hostname:port where host and port are the " +
@@ -1479,6 +1488,10 @@ object KafkaConfig {
       .define(S3EndpointProp, STRING, null, HIGH, S3EndpointDoc)
       .define(S3RegionProp, STRING, null, HIGH, S3RegionDoc)
       .define(S3BucketProp, STRING, null, HIGH, S3BucketDoc)
+      .define(S3WALObjectSizeProp, LONG, 524288000L, MEDIUM, S3WALObjectSizeDoc)
+      .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc)
+      .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc)
+      .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
 
   // Kafka on S3 inject end
 }
@@ -2017,6 +2030,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val s3Endpoint = getString(KafkaConfig.S3EndpointProp)
   val s3Region = getString(KafkaConfig.S3RegionProp)
   val s3Bucket = getString(KafkaConfig.S3BucketProp)
+  val s3WALObjectSize = getLong(KafkaConfig.S3WALObjectSizeProp)
+  val s3StreamSplitSizeProp = getInt(KafkaConfig.S3StreamSplitSizeProp)
+  val s3ObjectBlockSizeProp = getInt(KafkaConfig.S3ObjectBlockSizeProp)
+  val s3ObjectPartSizeProp = getInt(KafkaConfig.S3ObjectPartSizeProp)
   // Kafka on S3 inject end
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java
index 423cc0c4fd..497ab07fd8 100644
--- a/core/src/test/java/kafka/log/s3/S3StorageTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java
@@ -27,6 +27,8 @@
 import kafka.log.s3.operator.MemoryS3Operator;
 import kafka.log.s3.operator.S3Operator;
 import kafka.log.s3.wal.MemoryWriteAheadLog;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -54,7 +56,7 @@ public class S3StorageTest {
     public void setup() {
         objectManager = mock(ObjectManager.class);
         S3Operator s3Operator = new MemoryS3Operator();
-        storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(objectManager, s3Operator), s3Operator);
+        storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(objectManager, s3Operator), s3Operator);
     }
 
     @Test
diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
index bd9d8a6163..0ed3508788 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
@@ -51,6 +51,8 @@
 import kafka.log.s3.operator.S3Operator;
 import kafka.log.s3.streams.StreamManager;
 import kafka.log.s3.wal.MemoryWriteAheadLog;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -112,7 +114,7 @@ public void setUp() {
         objectManager = manager;
         operator = new MemoryS3Operator();
         blockCache = new DefaultS3BlockCache(objectManager, operator);
-        storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, blockCache, operator);
+        storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         streamClient = new S3StreamClient(streamManager, storage);
     }
 
diff --git a/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
index d3ea450d27..5f6538049a 100644
--- a/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
+++ b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
@@ -75,7 +75,7 @@ public void testTryCompact() throws Exception {
                 FlatStreamRecordBatch.from(new StreamRecordBatch(234, 0, 22, DefaultRecordBatch.of(2, 128)))
         ));
 
-        walObjectUploadTask = new WALObjectUploadTask(map, 1000, objectManager, s3Operator);
+        walObjectUploadTask = new WALObjectUploadTask(map, objectManager, s3Operator, 16 * 1024 * 1024, 16 * 1024 * 1024, 1000);
 
         walObjectUploadTask.prepare().get();
         walObjectUploadTask.upload().get();
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b4a0e0f23e..b64805321a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -273,6 +273,10 @@ object TestUtils extends Logging {
     props
   }
 
+  def defaultBrokerConfig(): Properties = {
+    createBrokerConfig(1, "")
+  }
+
   /**
   * Create a test config for the provided parameters.
   *
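-- 
For reference, a minimal server.properties sketch using the settings this patch introduces. The property names and values are taken directly from the KafkaConfig changes above; the values shown are the defaults, so they only need to be set when overriding:

  s3.wal.object.size=524288000          # WAL object size threshold (500 MiB)
  s3.stream.object.split.size=16777216  # stream object split size threshold (16 MiB)
  s3.object.block.size=8388608          # compressed block size threshold (8 MiB)
  s3.object.part.size=16777216          # multi-part upload part size threshold (16 MiB)

These take effect per broker: DefaultS3Client now passes the broker's KafkaConfig into S3Storage, which sizes the LogCache from s3.wal.object.size and forwards the block, part, and split thresholds to WALObjectUploadTask.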