diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java
index 13ebbc79d0..ca4fd58b22 100644
--- a/core/src/main/scala/kafka/log/s3/S3Storage.java
+++ b/core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -32,6 +32,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -41,13 +42,13 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;

 public class S3Storage implements Storage {
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
+    private final long maxWALCacheSize;
     private final KafkaConfig config;
     private final WriteAheadLog log;
     private final LogCache logCache;
@@ -64,6 +65,7 @@
     public S3Storage(KafkaConfig config, WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) {
         this.config = config;
+        this.maxWALCacheSize = config.s3WALCacheSize();
         this.log = log;
         this.logCache = new LogCache(config.s3WALObjectSize());
         this.objectManager = objectManager;
@@ -79,12 +81,12 @@ public void close() {

     @Override
     public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
-        for (;;) {
-            if (logCache.size() < 2L * 1024 * 1024 * 1024) {
+        for (; ; ) {
+            if (logCache.size() < maxWALCacheSize) {
                 break;
             } else {
                 // TODO: log limit
-                LOGGER.warn("log cache size {} is larger than 2GB, wait 100ms", logCache.size());
+                LOGGER.warn("log cache size {} is larger than {}, wait 100ms", logCache.size(), maxWALCacheSize);
                 try {
                     //noinspection BusyWait
                     Thread.sleep(100);
@@ -97,7 +99,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         WriteAheadLog.AppendResult appendResult = log.append(streamRecord.encoded());
         CompletableFuture<Void> cf = new CompletableFuture<>();
         WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, appendResult.offset, cf);
-        callbackSequencer.before(writeRequest);
+        handleAppendRequest(writeRequest);
         appendResult.future.thenAccept(nil -> handleAppendCallback(writeRequest));
         return cf;
     }
@@ -136,10 +138,15 @@ public CompletableFuture<Void> forceUpload(long streamId) {
             } else {
                 cf.complete(null);
             }
+            callbackSequencer.tryFree(streamId);
         });
         return cf;
     }

+    private void handleAppendRequest(WalWriteRequest request) {
+        mainExecutor.execute(() -> callbackSequencer.before(request));
+    }
+
     private void handleAppendCallback(WalWriteRequest request) {
         mainExecutor.execute(() -> {
             List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
@@ -224,9 +231,12 @@ private void freeCache(LogCache.LogCacheBlock cacheBlock) {
         });
     }

+    /**
+     * WALCallbackSequencer is only modified in the single-threaded mainExecutor.
+     */
     static class WALCallbackSequencer {
        public static final long NOOP_OFFSET = -1L;
-        private final Map<Long, Queue<WalWriteRequest>> stream2requests = new ConcurrentHashMap<>();
+        private final Map<Long, Queue<WalWriteRequest>> stream2requests = new HashMap<>();
        private final BlockingQueue<WalWriteRequest> walRequests = new ArrayBlockingQueue<>(4096);
        private long walConfirmOffset = NOOP_OFFSET;
@@ -276,9 +286,6 @@ public List<WalWriteRequest> after(WalWriteRequest request) {
                }
                rst.add(streamRequests.poll());
            }
-            if (streamRequests.isEmpty()) {
-                stream2requests.computeIfPresent(streamId, (id, requests) -> requests.isEmpty() ? null : requests);
-            }
            return rst;
        }
@@ -291,6 +298,15 @@ public long getWALConfirmOffset() {
            return walConfirmOffset;
        }

+        /**
+         * Try to free stream-related resources.
+         */
+        public void tryFree(long streamId) {
+            Queue<WalWriteRequest> queue = stream2requests.get(streamId);
+            if (queue != null && queue.isEmpty()) {
+                stream2requests.remove(streamId, queue);
+            }
+        }
    }

    static class WALObjectUploadTaskContext {
diff --git a/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java b/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java
index 9bee9b4618..d217098a91 100644
--- a/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java
+++ b/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java
@@ -62,6 +62,7 @@ public class DefaultS3Operator implements S3Operator {
     private final String bucket;
     private final S3AsyncClient s3;
     private static final Timer PART_UPLOAD_COST = new Timer();
+    private static final Timer OBJECT_INTO_CLOSE_COST = new Timer();
     private static final Timer OBJECT_UPLOAD_COST = new Timer();
     private static final Counter OBJECT_UPLOAD_SIZE = new Counter();
     private static final AtomicLong LAST_LOG_TIMESTAMP = new AtomicLong(System.currentTimeMillis());
@@ -269,23 +270,24 @@ public CompletableFuture<Void> close() {
         if (closeCf != null) {
             return closeCf;
         }
-        System.out.println("start await close: " + (System.nanoTime() - start) / 1000 / 1000);
+        OBJECT_INTO_CLOSE_COST.update(System.nanoTime() - start);
         closeCf = new CompletableFuture<>();
         CompletableFuture<Void> uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
         uploadDoneCf.thenAccept(nil -> {
-            System.out.println("start complete: " + (System.nanoTime() - start) / 1000 / 1000);
             CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(genCompleteParts()).build();
             CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();
             close0(request);
         });
         closeCf.whenComplete((nil, ex) -> {
-            System.out.println("complete: " + (System.nanoTime() - start) / 1000 / 1000);
             OBJECT_UPLOAD_COST.update(System.nanoTime() - start);
             long now = System.currentTimeMillis();
             if (now - LAST_LOG_TIMESTAMP.get() > 10000) {
                 LAST_LOG_TIMESTAMP.set(now);
-                LOGGER.info("upload s3 metrics, object_timer {}, object_size {}, part_timer {}",
-                    OBJECT_UPLOAD_COST.getAndReset(), OBJECT_UPLOAD_SIZE.getAndReset(), PART_UPLOAD_COST.getAndReset());
+                LOGGER.info("upload s3 metrics, object_part_upload_timer {}, object_into_close_timer {}, object_upload_timer {}, object_upload_size {}",
+                    PART_UPLOAD_COST.getAndReset(),
+                    OBJECT_INTO_CLOSE_COST.getAndReset(),
+                    OBJECT_UPLOAD_COST.getAndReset(),
+                    OBJECT_UPLOAD_SIZE.getAndReset());
             }
         });
         return closeCf;
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c76842a1a7..01d44a1595 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -679,6 +679,8 @@
   val S3EndpointProp = "s3.endpoint"
   val S3RegionProp = "s3.region"
   val S3BucketProp = "s3.bucket"
+  val S3WALCacheSizeProp = "s3.wal.cache.size"
+  val S3WALLogSizeProp = "s3.wal.log.size"
   val S3WALObjectSizeProp = "s3.wal.object.size"
   val S3StreamSplitSizeProp = "s3.stream.object.split.size"
   val S3ObjectBlockSizeProp = "s3.object.block.size"
@@ -688,6 +690,9 @@
   val S3EndpointDoc = "The S3 endpoint, ex. https://s3.{region}.amazonaws.com."
   val S3RegionDoc = "The S3 region, ex. us-east-1."
   val S3BucketDoc = "The S3 bucket, ex. my-bucket."
+  val S3WALCacheSizeDoc = "The S3 storage max WAL cache size. When the WAL cache is full, append requests are blocked \n" +
+    "until cache space is freed by S3 WAL object uploads."
+  val S3WALLogSizeDoc = "The S3 WAL log max size. The value should be larger than s3.wal.cache.size because the WAL log storage format may not be compact."
   val S3WALObjectSizeDoc = "The S3 WAL object size threshold."
   val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when upload WAL object or compact object."
   val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold."
@@ -1496,11 +1501,13 @@
     .define(S3EndpointProp, STRING, null, HIGH, S3EndpointDoc)
     .define(S3RegionProp, STRING, null, HIGH, S3RegionDoc)
     .define(S3BucketProp, STRING, null, HIGH, S3BucketDoc)
-    .define(S3WALObjectSizeProp, LONG, 524288000, MEDIUM, S3WALObjectSizeDoc)
+    .define(S3WALCacheSizeProp, LONG, 1073741824L, MEDIUM, S3WALCacheSizeDoc)
+    .define(S3WALLogSizeProp, LONG, 2147483648L, MEDIUM, S3WALLogSizeDoc)
+    .define(S3WALObjectSizeProp, LONG, 524288000L, MEDIUM, S3WALObjectSizeDoc)
     .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc)
     .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc)
     .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
-    .define(S3CacheSizeProp, LONG, 1073741824, MEDIUM, S3CacheSizeDoc)
+    .define(S3CacheSizeProp, LONG, 1073741824L, MEDIUM, S3CacheSizeDoc)
     // Kafka on S3 inject end
   }
@@ -2039,6 +2046,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val s3Endpoint = getString(KafkaConfig.S3EndpointProp)
   val s3Region = getString(KafkaConfig.S3RegionProp)
   val s3Bucket = getString(KafkaConfig.S3BucketProp)
+  val s3WALCacheSize = getLong(KafkaConfig.S3WALCacheSizeProp)
+  val s3WALLogSize = getLong(KafkaConfig.S3WALLogSizeProp)
   val s3WALObjectSize = getLong(KafkaConfig.S3WALObjectSizeProp)
   val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp)
   val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp)
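
For reference, a minimal sketch of how the sizing knobs touched by this patch might be set on a broker. The property names and defaults come from the diff above; the values, the class name, and the use of java.util.Properties are illustrative only, not part of the patch.

import java.util.Properties;

public class S3WalSizingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Back-pressure threshold checked in S3Storage.append(): appends busy-wait (100ms sleeps)
        // while logCache.size() >= s3.wal.cache.size. Patch default: 1 GiB.
        props.put("s3.wal.cache.size", Long.toString(1024L * 1024 * 1024));
        // Upper bound for the on-disk WAL; per the doc added in this patch it should be larger than
        // s3.wal.cache.size because the WAL log storage format may not be compact. Patch default: 2 GiB.
        props.put("s3.wal.log.size", Long.toString(2L * 1024 * 1024 * 1024));
        // Existing WAL object size threshold, unchanged by this patch. Default: 500 MiB.
        props.put("s3.wal.object.size", Long.toString(500L * 1024 * 1024));
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}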