core/src/main/scala/kafka/log/s3/DefaultS3Client.java (2 changes: 1 addition & 1 deletion)

@@ -69,7 +69,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator
         this.streamManager = memoryMetadataManager;
         this.objectManager = memoryMetadataManager;
         this.blockCache = new DefaultS3BlockCache(objectManager, operator);
-        this.storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, blockCache, operator);
+        this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         this.streamClient = new S3StreamClient(this.streamManager, this.storage);
         this.kvClient = new KVClientImpl();
     }

core/src/main/scala/kafka/log/s3/ObjectWriter.java (4 changes: 0 additions & 4 deletions)

@@ -58,10 +58,6 @@ public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold
         writer = s3Operator.writer(objectKey);
     }

-    public ObjectWriter(long objectId, S3Operator s3Operator) {
-        this(objectId, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
-    }
-
     public void write(FlatStreamRecordBatch record) {
         if (dataBlock == null) {
             dataBlock = new DataBlock(nextDataBlockPosition);

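Note: with the two-argument convenience constructor removed, call sites must pass the block and part thresholds explicitly; this PR sources them from KafkaConfig. A minimal sketch of a call to the surviving four-argument constructor (the literal values below are illustrative, not defaults):

    // Hypothetical call site; explicit thresholds replace the removed
    // hard-coded 16 MiB / 32 MiB defaults.
    ObjectWriter writer = new ObjectWriter(objectId, s3Operator,
            8 * 1024 * 1024,     // blockSizeThreshold: cut a compressed data block at ~8 MiB
            16 * 1024 * 1024);   // partSizeThreshold: S3 multi-part upload part size
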
core/src/main/scala/kafka/log/s3/S3Storage.java (10 changes: 7 additions & 3 deletions)

@@ -25,6 +25,7 @@
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.operator.S3Operator;
 import kafka.log.s3.wal.WriteAheadLog;
+import kafka.server.KafkaConfig;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -46,6 +47,7 @@

 public class S3Storage implements Storage {
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
+    private final KafkaConfig config;
     private final WriteAheadLog log;
     private final LogCache logCache;
     private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer();

@@ -57,9 +59,10 @@ public class S3Storage implements Storage {
     private final S3Operator s3Operator;
     private final S3BlockCache blockCache;

-    public S3Storage(WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) {
+    public S3Storage(KafkaConfig config, WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) {
+        this.config = config;
         this.log = log;
-        this.logCache = new LogCache(512 * 1024 * 1024);
+        this.logCache = new LogCache(config.s3WALObjectSize());
         this.objectManager = objectManager;
         this.blockCache = blockCache;
         this.s3Operator = s3Operator;

@@ -145,7 +148,8 @@ private CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheB
     private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture<Void> cf) {
         // TODO: pipeline the WAL object upload to accelerate the upload.
         try {
-            WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), 16 * 1024 * 1024, objectManager, s3Operator);
+            WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), objectManager, s3Operator,
+                    config.s3ObjectBlockSizeProp(), config.s3ObjectPartSizeProp(), config.s3StreamSplitSizeProp());
             walObjectUploadTask.prepare().get();
             walObjectUploadTask.upload().get();
             walObjectUploadTask.commit().get();

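Note: S3Storage now derives all of its sizing from broker config rather than hard-coded constants: the LogCache capacity comes from s3.wal.object.size, and the WAL-object upload thresholds come from the three new object-size properties. A minimal wiring sketch mirroring the updated unit tests; the DefaultS3BlockCache package and the Mockito mock for ObjectManager are assumptions made for illustration:

    import static org.mockito.Mockito.mock;

    import kafka.log.s3.S3Storage;
    import kafka.log.s3.cache.DefaultS3BlockCache;   // package assumed
    import kafka.log.s3.objects.ObjectManager;
    import kafka.log.s3.operator.MemoryS3Operator;
    import kafka.log.s3.operator.S3Operator;
    import kafka.log.s3.wal.MemoryWriteAheadLog;
    import kafka.server.KafkaConfig;
    import kafka.utils.TestUtils;

    public class S3StorageWiringSketch {
        public static void main(String[] args) {
            // Broker config supplies s3.wal.object.size (LogCache capacity) plus the
            // s3.object.block.size / s3.object.part.size / s3.stream.object.split.size thresholds.
            KafkaConfig config = KafkaConfig.fromProps(TestUtils.defaultBrokerConfig());
            ObjectManager objectManager = mock(ObjectManager.class);
            S3Operator operator = new MemoryS3Operator();
            S3Storage storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager,
                    new DefaultS3BlockCache(objectManager, operator), operator);
        }
    }
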
core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java (11 changes: 8 additions & 3 deletions)

@@ -36,15 +36,20 @@
 public class WALObjectUploadTask {
     private static final Logger LOGGER = LoggerFactory.getLogger(WALObjectUploadTask.class);
     private final Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap;
+    private final int objectBlockSize;
+    private final int objectPartSize;
     private final int streamSplitSizeThreshold;
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
     private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
     private volatile CommitWALObjectRequest commitWALObjectRequest;
     private final CompletableFuture<CommitWALObjectRequest> uploadCf = new CompletableFuture<>();

-    public WALObjectUploadTask(Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) {
+    public WALObjectUploadTask(Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
+                               int objectBlockSize, int objectPartSize, int streamSplitSizeThreshold) {
         this.streamRecordsMap = streamRecordsMap;
+        this.objectBlockSize = objectBlockSize;
+        this.objectPartSize = objectPartSize;
         this.streamSplitSizeThreshold = streamSplitSizeThreshold;
         this.objectManager = objectManager;
         this.s3Operator = s3Operator;

@@ -65,7 +70,7 @@ public CompletableFuture<CommitWALObjectRequest> upload() {
         Collections.sort(streamIds);
         CommitWALObjectRequest request = new CommitWALObjectRequest();

-        ObjectWriter walObject = new ObjectWriter(objectId, s3Operator);
+        ObjectWriter walObject = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize);

         List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();

@@ -116,7 +121,7 @@ private CompletableFuture<StreamObject> writeStreamObject(List<FlatStreamRecordB
         CompletableFuture<Long> objectIdCf = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30));
         // TODO: retry until success
         return objectIdCf.thenCompose(objectId -> {
-            ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator);
+            ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize);
             for (FlatStreamRecordBatch record : streamRecords) {
                 streamObjectWriter.write(record);
             }

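Note: the constructor now takes its collaborators first and the three size thresholds at the tail. A lifecycle sketch of the reordered signature, using the literal thresholds from the updated unit test further down; this is a fragment, not a complete test:

    // map: Map<Long, List<FlatStreamRecordBatch>> of pending records grouped by streamId.
    WALObjectUploadTask task = new WALObjectUploadTask(map, objectManager, s3Operator,
            16 * 1024 * 1024,   // objectBlockSize: compressed data-block threshold
            16 * 1024 * 1024,   // objectPartSize: multi-part upload part size
            1000);              // streamSplitSizeThreshold: streams above this are split into stream objects
    task.prepare().get();  // acquire an object id from the ObjectManager
    task.upload().get();   // write the WAL object (and any split-out stream objects) to S3
    task.commit().get();   // commit the resulting CommitWALObjectRequest metadata
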
core/src/main/scala/kafka/log/s3/cache/LogCache.java (10 changes: 5 additions & 5 deletions)

@@ -29,12 +29,12 @@
 import java.util.concurrent.atomic.AtomicLong;

 public class LogCache {
-    private final int cacheBlockMaxSize;
+    private final long cacheBlockMaxSize;
     private final List<LogCacheBlock> archiveBlocks = new ArrayList<>();
     private LogCacheBlock activeBlock;
     private long confirmOffset;

-    public LogCache(int cacheBlockMaxSize) {
+    public LogCache(long cacheBlockMaxSize) {
         this.cacheBlockMaxSize = cacheBlockMaxSize;
         this.activeBlock = new LogCacheBlock(cacheBlockMaxSize);
     }

@@ -96,12 +96,12 @@ public void setConfirmOffset(long confirmOffset) {
     public static class LogCacheBlock {
         private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong();
         private final long blockId;
-        private final int maxSize;
+        private final long maxSize;
         private final Map<Long, List<FlatStreamRecordBatch>> map = new HashMap<>();
-        private int size = 0;
+        private long size = 0;
         private long confirmOffset;

-        public LogCacheBlock(int maxSize) {
+        public LogCacheBlock(long maxSize) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
             this.maxSize = maxSize;
         }

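Note on the int-to-long widening: the cache capacity now comes from s3.wal.object.size, which is registered below as a LONG config, so an int field would overflow for capacities at or above 2 GiB. A tiny illustration (fragment; the 4 GiB value is hypothetical):

    long walObjectSize = 4L * 1024 * 1024 * 1024;  // 4 GiB: representable as a long
    int truncated = (int) walObjectSize;           // would wrap to 0 if forced back into an int
    LogCache cache = new LogCache(walObjectSize);  // the widened constructor accepts it directly
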
core/src/main/scala/kafka/server/KafkaConfig.scala (25 changes: 21 additions & 4 deletions)

@@ -675,12 +675,21 @@ object KafkaConfig {
   val S3EndpointProp = "s3.endpoint"
   val S3RegionProp = "s3.region"
   val S3BucketProp = "s3.bucket"
-
-  val S3EndpointDoc = "Specifies the S3 endpoint, ex. <code>https://s3.{region}.amazonaws.com</code>."
-  val S3RegionDoc = "Specifies the S3 region, ex. <code>us-east-1</code>."
-  val S3BucketDoc = "Specifies the S3 bucket, ex. <code>my-bucket</code>."
+  val S3WALObjectSizeProp = "s3.wal.object.size"
+  val S3StreamSplitSizeProp = "s3.stream.object.split.size"
+  val S3ObjectBlockSizeProp = "s3.object.block.size"
+  val S3ObjectPartSizeProp = "s3.object.part.size"
+
+  val S3EndpointDoc = "The S3 endpoint, ex. <code>https://s3.{region}.amazonaws.com</code>."
+  val S3RegionDoc = "The S3 region, ex. <code>us-east-1</code>."
+  val S3BucketDoc = "The S3 bucket, ex. <code>my-bucket</code>."
+  val S3WALObjectSizeDoc = "The S3 WAL object size threshold."
+  val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when uploading a WAL object or compacting objects."
+  val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold."
+  val S3ObjectPartSizeDoc = "The S3 object multi-part upload part size threshold."
+
   // Kafka on S3 inject end

   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +

@@ -1479,6 +1488,10 @@
       .define(S3EndpointProp, STRING, null, HIGH, S3EndpointDoc)
       .define(S3RegionProp, STRING, null, HIGH, S3RegionDoc)
       .define(S3BucketProp, STRING, null, HIGH, S3BucketDoc)
+      .define(S3WALObjectSizeProp, LONG, 524288000, MEDIUM, S3WALObjectSizeDoc)
+      .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc)
+      .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc)
+      .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
       // Kafka on S3 inject end
   }

@@ -2017,6 +2030,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val s3Endpoint = getString(KafkaConfig.S3EndpointProp)
   val s3Region = getString(KafkaConfig.S3RegionProp)
   val s3Bucket = getString(KafkaConfig.S3BucketProp)
+  val s3WALObjectSize = getLong(KafkaConfig.S3WALObjectSizeProp)
+  val s3StreamSplitSizeProp = getInt(KafkaConfig.S3StreamSplitSizeProp)
+  val s3ObjectBlockSizeProp = getInt(KafkaConfig.S3ObjectBlockSizeProp)
+  val s3ObjectPartSizeProp = getInt(KafkaConfig.S3ObjectPartSizeProp)
   // Kafka on S3 inject end

   def addReconfigurable(reconfigurable: Reconfigurable): Unit = {

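Note: the registered defaults are 524288000 bytes (500 MiB) for the WAL object size and 16777216 (16 MiB) / 8388608 (8 MiB) / 16777216 (16 MiB) for the split, block, and part thresholds. A hedged sketch of overriding them from a test; the values written here are exactly those defaults, so this simply round-trips them:

    // Sketch: building a KafkaConfig with the new S3 sizing options set explicitly.
    Properties props = TestUtils.defaultBrokerConfig();
    props.put("s3.wal.object.size", "524288000");          // 500 MiB WAL object size threshold (LONG)
    props.put("s3.stream.object.split.size", "16777216");  // 16 MiB stream object split threshold (INT)
    props.put("s3.object.block.size", "8388608");          // 8 MiB compressed block threshold (INT)
    props.put("s3.object.part.size", "16777216");          // 16 MiB multi-part part size (INT)
    KafkaConfig config = KafkaConfig.fromProps(props);
    long walObjectSize = config.s3WALObjectSize();         // 524288000L
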
core/src/test/java/kafka/log/s3/S3StorageTest.java (4 changes: 3 additions & 1 deletion)

@@ -27,6 +27,8 @@
 import kafka.log.s3.operator.MemoryS3Operator;
 import kafka.log.s3.operator.S3Operator;
 import kafka.log.s3.wal.MemoryWriteAheadLog;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;

@@ -54,7 +56,7 @@ public class S3StorageTest {
     public void setup() {
         objectManager = mock(ObjectManager.class);
         S3Operator s3Operator = new MemoryS3Operator();
-        storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(objectManager, s3Operator), s3Operator);
+        storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(objectManager, s3Operator), s3Operator);
     }

     @Test

core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java (4 changes: 3 additions & 1 deletion)

@@ -51,6 +51,8 @@
 import kafka.log.s3.operator.S3Operator;
 import kafka.log.s3.streams.StreamManager;
 import kafka.log.s3.wal.MemoryWriteAheadLog;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;

@@ -112,7 +114,7 @@ public void setUp() {
         objectManager = manager;
         operator = new MemoryS3Operator();
         blockCache = new DefaultS3BlockCache(objectManager, operator);
-        storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, blockCache, operator);
+        storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         streamClient = new S3StreamClient(streamManager, storage);
     }

@@ -75,7 +75,7 @@ public void testTryCompact() throws Exception {
                 FlatStreamRecordBatch.from(new StreamRecordBatch(234, 0, 22, DefaultRecordBatch.of(2, 128)))
         ));

-        walObjectUploadTask = new WALObjectUploadTask(map, 1000, objectManager, s3Operator);
+        walObjectUploadTask = new WALObjectUploadTask(map, objectManager, s3Operator, 16 * 1024 * 1024, 16 * 1024 * 1024, 1000);

        walObjectUploadTask.prepare().get();
        walObjectUploadTask.upload().get();

core/src/test/scala/unit/kafka/utils/TestUtils.scala (4 changes: 4 additions & 0 deletions)

@@ -273,6 +273,10 @@ object TestUtils extends Logging {
     props
   }

+  def defaultBrokerConfig(): Properties = {
+    createBrokerConfig(1, "")
+  }
+
   /**
    * Create a test config for the provided parameters.
    *