34 changes: 25 additions & 9 deletions core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -32,6 +32,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -41,13 +42,13 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;

public class S3Storage implements Storage {
private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
private final long maxWALCacheSize;
private final KafkaConfig config;
private final WriteAheadLog log;
private final LogCache logCache;
@@ -64,6 +65,7 @@ public class S3Storage implements Storage {

public S3Storage(KafkaConfig config, WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) {
this.config = config;
this.maxWALCacheSize = config.s3WALCacheSize();
this.log = log;
this.logCache = new LogCache(config.s3WALObjectSize());
this.objectManager = objectManager;
@@ -79,12 +81,12 @@ public void close() {

@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
for (;;) {
if (logCache.size() < 2L * 1024 * 1024 * 1024) {
for (; ; ) {
if (logCache.size() < maxWALCacheSize) {
break;
} else {
// TODO: log limit
LOGGER.warn("log cache size {} is larger than 2GB, wait 100ms", logCache.size());
LOGGER.warn("log cache size {} is larger than {}, wait 100ms", maxWALCacheSize, logCache.size());
try {
//noinspection BusyWait
Thread.sleep(100);
@@ -97,7 +99,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
WriteAheadLog.AppendResult appendResult = log.append(streamRecord.encoded());
CompletableFuture<Void> cf = new CompletableFuture<>();
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, appendResult.offset, cf);
callbackSequencer.before(writeRequest);
handleAppendRequest(writeRequest);
appendResult.future.thenAccept(nil -> handleAppendCallback(writeRequest));
return cf;
}
@@ -136,10 +138,15 @@ public CompletableFuture<Void> forceUpload(long streamId) {
} else {
cf.complete(null);
}
callbackSequencer.tryFree(streamId);
});
return cf;
}

private void handleAppendRequest(WalWriteRequest request) {
mainExecutor.execute(() -> callbackSequencer.before(request));
}

private void handleAppendCallback(WalWriteRequest request) {
mainExecutor.execute(() -> {
List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
@@ -224,9 +231,12 @@ private void freeCache(LogCache.LogCacheBlock cacheBlock) {
});
}

/**
* WALCallbackSequencer is only modified from the single-threaded mainExecutor, so non-concurrent collections are safe.
*/
static class WALCallbackSequencer {
public static final long NOOP_OFFSET = -1L;
private final Map<Long, Queue<WalWriteRequest>> stream2requests = new ConcurrentHashMap<>();
private final Map<Long, Queue<WalWriteRequest>> stream2requests = new HashMap<>();
private final BlockingQueue<WalWriteRequest> walRequests = new ArrayBlockingQueue<>(4096);
private long walConfirmOffset = NOOP_OFFSET;

Expand Down Expand Up @@ -276,9 +286,6 @@ public List<WalWriteRequest> after(WalWriteRequest request) {
}
rst.add(streamRequests.poll());
}
if (streamRequests.isEmpty()) {
stream2requests.computeIfPresent(streamId, (id, requests) -> requests.isEmpty() ? null : requests);
}
return rst;
}

@@ -291,6 +298,15 @@ public long getWALConfirmOffset() {
return walConfirmOffset;
}

/**
* Try to free resources associated with the stream.
*/
public void tryFree(long streamId) {
Queue<?> queue = stream2requests.get(streamId);
if (queue != null && queue.isEmpty()) {
stream2requests.remove(streamId, queue);
}
}
}

static class WALObjectUploadTaskContext {
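The core idea in this file is thread confinement: handleAppendRequest and handleAppendCallback both funnel sequencer access onto the single-threaded mainExecutor, which is why WALCallbackSequencer can switch from ConcurrentHashMap to a plain HashMap and why tryFree can drop a stream's queue once it is empty. Below is a minimal sketch of that pattern only, using hypothetical names (ConfinedSequencer, streamToPending) rather than the project's actual classes.

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConfinedSequencerDemo {
    // Sketch only: every method below is invoked from the same single-threaded
    // executor, so a plain HashMap needs no synchronization.
    static class ConfinedSequencer {
        private final Map<Long, Queue<Long>> streamToPending = new HashMap<>();

        void before(long streamId, long offset) {
            // queue the in-flight request for its stream
            streamToPending.computeIfAbsent(streamId, id -> new ArrayDeque<>()).add(offset);
        }

        void after(long streamId) {
            // ack the oldest in-flight request of the stream
            Queue<Long> pending = streamToPending.get(streamId);
            if (pending != null) {
                pending.poll();
            }
        }

        void tryFree(long streamId) {
            // drop the per-stream queue once it is empty so the map does not grow unbounded
            Queue<Long> pending = streamToPending.get(streamId);
            if (pending != null && pending.isEmpty()) {
                streamToPending.remove(streamId);
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService mainExecutor = Executors.newSingleThreadExecutor();
        ConfinedSequencer sequencer = new ConfinedSequencer();
        // All access goes through mainExecutor, mirroring handleAppendRequest/handleAppendCallback.
        mainExecutor.execute(() -> sequencer.before(1L, 100L));
        mainExecutor.execute(() -> {
            sequencer.after(1L);
            sequencer.tryFree(1L);
        });
        mainExecutor.shutdown();
    }
}

The sketch mirrors only the confinement and cleanup structure; the real sequencer additionally tracks the WAL confirm offset and the waiting-ack queue shown in the diff.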
12 changes: 7 additions & 5 deletions core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java
@@ -62,6 +62,7 @@ public class DefaultS3Operator implements S3Operator {
private final String bucket;
private final S3AsyncClient s3;
private static final Timer PART_UPLOAD_COST = new Timer();
private static final Timer OBJECT_INTO_CLOSE_COST = new Timer();
private static final Timer OBJECT_UPLOAD_COST = new Timer();
private static final Counter OBJECT_UPLOAD_SIZE = new Counter();
private static final AtomicLong LAST_LOG_TIMESTAMP = new AtomicLong(System.currentTimeMillis());
@@ -269,23 +270,24 @@ public CompletableFuture<Void> close() {
if (closeCf != null) {
return closeCf;
}
System.out.println("start await close: " + (System.nanoTime() - start) / 1000 / 1000);
OBJECT_INTO_CLOSE_COST.update(System.nanoTime() - start);
closeCf = new CompletableFuture<>();
CompletableFuture<Void> uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
uploadDoneCf.thenAccept(nil -> {
System.out.println("start complete: " + (System.nanoTime() - start) / 1000 / 1000);
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(genCompleteParts()).build();
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();
close0(request);
});
closeCf.whenComplete((nil, ex) -> {
System.out.println("complete: " + (System.nanoTime() - start) / 1000 / 1000);
OBJECT_UPLOAD_COST.update(System.nanoTime() - start);
long now = System.currentTimeMillis();
if (now - LAST_LOG_TIMESTAMP.get() > 10000) {
LAST_LOG_TIMESTAMP.set(now);
LOGGER.info("upload s3 metrics, object_timer {}, object_size {}, part_timer {}",
OBJECT_UPLOAD_COST.getAndReset(), OBJECT_UPLOAD_SIZE.getAndReset(), PART_UPLOAD_COST.getAndReset());
LOGGER.info("upload s3 metrics, object_part_upload_timer {}, object_into_close_timer {}, object_upload_timer {}, object_upload_size {}",
PART_UPLOAD_COST.getAndReset(),
OBJECT_INTO_CLOSE_COST.getAndReset(),
OBJECT_UPLOAD_COST.getAndReset(),
OBJECT_UPLOAD_SIZE.getAndReset());
}
});
return closeCf;
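This file replaces the System.out.println timing probes with timers plus a rate-limited metrics line: each close() duration is recorded, and an aggregate line is emitted (and the counters reset) at most once per 10-second window. A small self-contained sketch of that throttled-logging idea follows, with a hypothetical ThrottledMetricLogger standing in for the project's Timer/Counter classes; the sketch uses a compareAndSet guard so that only one concurrent caller logs per window.

import java.util.concurrent.atomic.AtomicLong;

public class ThrottledMetricLogger {
    private static final long LOG_INTERVAL_MS = 10_000;
    private final AtomicLong lastLogTimestamp = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong accumulatedNanos = new AtomicLong();

    // Record one observation and, at most once per interval, report and reset the aggregate.
    public void record(long elapsedNanos) {
        accumulatedNanos.addAndGet(elapsedNanos);
        long now = System.currentTimeMillis();
        long last = lastLogTimestamp.get();
        // compareAndSet ensures a single winner per window even with concurrent callers.
        if (now - last > LOG_INTERVAL_MS && lastLogTimestamp.compareAndSet(last, now)) {
            System.out.printf("upload s3 metrics, total_cost %d ms%n", accumulatedNanos.getAndSet(0) / 1_000_000);
        }
    }

    public static void main(String[] args) {
        ThrottledMetricLogger logger = new ThrottledMetricLogger();
        long start = System.nanoTime();
        // ... perform an upload step here ...
        logger.record(System.nanoTime() - start);
    }
}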
13 changes: 11 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -679,6 +679,8 @@ object KafkaConfig {
val S3EndpointProp = "s3.endpoint"
val S3RegionProp = "s3.region"
val S3BucketProp = "s3.bucket"
val S3WALCacheSizeProp = "s3.wal.cache.size"
val S3WALLogSizeProp = "s3.wal.log.size"
val S3WALObjectSizeProp = "s3.wal.object.size"
val S3StreamSplitSizeProp = "s3.stream.object.split.size"
val S3ObjectBlockSizeProp = "s3.object.block.size"
@@ -688,6 +690,9 @@
val S3EndpointDoc = "The S3 endpoint, ex. <code>https://s3.{region}.amazonaws.com</code>."
val S3RegionDoc = "The S3 region, ex. <code>us-east-1</code>."
val S3BucketDoc = "The S3 bucket, ex. <code>my-bucket</code>."
val S3WALCacheSizeDoc = "The S3 storage max WAL cache size. When WAL cache is full, storage will hang the request, \n" +
"until WAL cache is free by S3 WAL object upload."
val S3WALLogSizeDoc = "The S3 WAL log max size. The value should be larger than s3.wal.cache.size cause of log storage format may not compact."
val S3WALObjectSizeDoc = "The S3 WAL object size threshold."
val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when upload WAL object or compact object."
val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold."
@@ -1496,11 +1501,13 @@ object KafkaConfig {
.define(S3EndpointProp, STRING, null, HIGH, S3EndpointDoc)
.define(S3RegionProp, STRING, null, HIGH, S3RegionDoc)
.define(S3BucketProp, STRING, null, HIGH, S3BucketDoc)
.define(S3WALObjectSizeProp, LONG, 524288000, MEDIUM, S3WALObjectSizeDoc)
.define(S3WALCacheSizeProp, LONG, 1073741824L, MEDIUM, S3WALCacheSizeDoc)
.define(S3WALLogSizeProp, LONG, 2147483648L, MEDIUM, S3WALLogSizeDoc)
.define(S3WALObjectSizeProp, LONG, 524288000L, MEDIUM, S3WALObjectSizeDoc)
.define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc)
.define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc)
.define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
.define(S3CacheSizeProp, LONG, 1073741824, MEDIUM, S3CacheSizeDoc)
.define(S3CacheSizeProp, LONG, 1073741824L, MEDIUM, S3CacheSizeDoc)
// Kafka on S3 inject end
}

@@ -2039,6 +2046,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val s3Endpoint = getString(KafkaConfig.S3EndpointProp)
val s3Region = getString(KafkaConfig.S3RegionProp)
val s3Bucket = getString(KafkaConfig.S3BucketProp)
val s3WALCacheSize = getLong(KafkaConfig.S3WALCacheSizeProp)
val s3WALLogSize = getLong(KafkaConfig.S3WALLogSizeProp)
val s3WALObjectSize = getLong(KafkaConfig.S3WALObjectSizeProp)
val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp)
val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp)
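Taken together, the new properties split the previously hard-coded 2 GiB cache check in S3Storage.append into two knobs: s3.wal.cache.size (default 1073741824, i.e. 1 GiB) bounds the in-memory WAL cache that append() waits on, and s3.wal.log.size (default 2147483648, i.e. 2 GiB) bounds the on-disk WAL, which should stay larger than the cache size per the doc string. A minimal, illustrative sketch of supplying these keys in broker properties; the class and byte values here are hypothetical examples, not part of the change.

import java.util.Properties;

public class S3WalConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // New S3 WAL sizing keys introduced by this change (values in bytes).
        props.put("s3.wal.cache.size", Long.toString(512L * 1024 * 1024)); // 512 MiB cache bound
        props.put("s3.wal.log.size", Long.toString(1024L * 1024 * 1024));  // 1 GiB on-disk WAL bound
        // Keep s3.wal.log.size larger than s3.wal.cache.size, as the doc string requires.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}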