diff --git a/core/src/main/scala/kafka/log/es/FutureUtil.java b/core/src/main/scala/kafka/log/es/FutureUtil.java
index 4b8a9f4f23..f4415b9088 100644
--- a/core/src/main/scala/kafka/log/es/FutureUtil.java
+++ b/core/src/main/scala/kafka/log/es/FutureUtil.java
@@ -20,6 +20,7 @@ import org.slf4j.Logger;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 public class FutureUtil {
     public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
@@ -48,4 +49,13 @@ public static <T> void propagate(CompletableFuture<T> source, CompletableFuture<T> dest) {
             }
         });
     }
+
+    public static <T> CompletableFuture<T> exec(Supplier<CompletableFuture<T>> run, Logger logger, String name) {
+        try {
+            return run.get();
+        } catch (Throwable ex) {
+            logger.error("{} run with unexpected exception", name, ex);
+            return failedFuture(ex);
+        }
+    }
 }
diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
index 1d4369122f..e43e4056de 100644
--- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
@@ -23,11 +23,10 @@ import kafka.log.es.MemoryClient.KVClientImpl;
 import kafka.log.s3.cache.DefaultS3BlockCache;
 import kafka.log.s3.cache.S3BlockCache;
+import kafka.log.s3.memory.MemoryMetadataManager;
 import kafka.log.s3.network.ControllerRequestSender;
-import kafka.log.s3.objects.ControllerObjectManager;
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.operator.S3Operator;
-import kafka.log.s3.streams.ControllerStreamManager;
 import kafka.log.s3.streams.StreamManager;
 import kafka.log.s3.wal.MemoryWriteAheadLog;
 import kafka.server.BrokerServer;
@@ -37,7 +36,7 @@ public class DefaultS3Client implements Client {
 
     private final KafkaConfig config;
 
-    private final StreamMetadataManager metadataManager;
+//    private final StreamMetadataManager metadataManager;
 
     private final BrokerToControllerChannelManager channelManager;
 
@@ -60,11 +59,15 @@ public class DefaultS3Client implements Client {
     public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator operator) {
         this.config = config;
         this.channelManager = brokerServer.clientToControllerChannelManager();
-        this.metadataManager = new StreamMetadataManager(brokerServer, config);
+//        this.metadataManager = new StreamMetadataManager(brokerServer, config);
         this.operator = operator;
         this.requestSender = new ControllerRequestSender(channelManager);
-        this.streamManager = new ControllerStreamManager(this.requestSender, config);
-        this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
+//        this.streamManager = new ControllerStreamManager(this.requestSender, config);
+//        this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
+        MemoryMetadataManager memoryMetadataManager = new MemoryMetadataManager();
+        memoryMetadataManager.start();
+        this.streamManager = memoryMetadataManager;
+        this.objectManager = memoryMetadataManager;
         this.blockCache = new DefaultS3BlockCache(objectManager, operator);
         this.storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         this.streamClient = new S3StreamClient(this.streamManager, this.storage);
diff --git a/core/src/main/scala/kafka/log/s3/ObjectReader.java b/core/src/main/scala/kafka/log/s3/ObjectReader.java
index 41d9814c61..860d7db1e7 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectReader.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -19,21 +19,21 @@ import io.netty.buffer.ByteBuf;
 import kafka.log.s3.model.StreamRecordBatch;
-import org.apache.kafka.metadata.stream.S3ObjectMetadata;
 import kafka.log.s3.operator.S3Operator;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.compress.ZstdFactory;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class ObjectReader {
@@ -97,7 +97,7 @@ public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
                 throw new IndexBlockParseException(indexBlockPosition);
             } else {
                 int indexRelativePosition = objectTailBuf.readableBytes() - (int) (objectSize - indexBlockPosition);
-                ByteBuf indexBlockBuf = objectTailBuf.slice(indexRelativePosition, indexBlockSize);
+                ByteBuf indexBlockBuf = objectTailBuf.slice(objectTailBuf.readerIndex() + indexRelativePosition, indexBlockSize);
                 int blockCount = indexBlockBuf.readInt();
                 ByteBuf blocks = indexBlockBuf.slice(indexBlockBuf.readerIndex(), blockCount * 16);
                 indexBlockBuf.skipBytes(blockCount * 16);
diff --git a/core/src/main/scala/kafka/log/s3/ObjectWriter.java b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
index f17c479869..2bf4455cb9 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectWriter.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -77,7 +77,10 @@ public void write(FlatStreamRecordBatch record) {
 
     public void closeCurrentBlock() {
         if (dataBlock != null) {
             dataBlock.close();
+            waitingUploadBlocks.add(dataBlock);
+            nextDataBlockPosition += dataBlock.size();
             dataBlock = null;
+            tryUploadPart();
         }
     }
 
@@ -98,11 +101,11 @@ public CompletableFuture<Void> close() {
             dataBlock.close();
             nextDataBlockPosition += dataBlock.size();
             waitingUploadBlocks.add(dataBlock);
-            completedBlocks.add(dataBlock);
             dataBlock = null;
         }
         for (DataBlock block : waitingUploadBlocks) {
             buf.addComponent(true, block.buffer());
+            completedBlocks.add(block);
         }
         waitingUploadBlocks.clear();
         indexBlock = new IndexBlock();
diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java
index baa6e873fb..123d63249d 100644
--- a/core/src/main/scala/kafka/log/s3/S3Storage.java
+++ b/core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -45,7 +45,7 @@ public class S3Storage implements Storage {
     private final WriteAheadLog log;
     private final LogCache logCache;
     private final AtomicLong logConfirmOffset = new AtomicLong();
-    private final AtomicLong processedLogConfirmOffset = new AtomicLong();
+    private final AtomicLong processedLogConfirmOffset = new AtomicLong(-1L);
     private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("s3-storage-main", false));
     private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor(
diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java
index 2e602d3a2e..bffdd68cca 100644
--- a/core/src/main/scala/kafka/log/s3/S3Stream.java
+++ b/core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static kafka.log.es.FutureUtil.exec;
+
 public class S3Stream implements Stream {
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Stream.class);
     private final String logIdent;
@@ -80,6 +82,10 @@ public long nextOffset() {
 
     @Override
     public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
+        return exec(() -> append0(recordBatch), LOGGER, "append");
+    }
+
+    private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
         if (!status.isWritable()) {
             return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable"));
         }
@@ -105,6 +111,10 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
 
     @Override
     public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
+        return exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
+    }
+
+    private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset, int maxBytes) {
         if (status.isClosed()) {
             return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed"));
         }
@@ -122,8 +132,13 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
         });
     }
 
+    @Override
     public CompletableFuture<Void> trim(long newStartOffset) {
+        return exec(() -> trim0(newStartOffset), LOGGER, "trim");
+    }
+
+    private CompletableFuture<Void> trim0(long newStartOffset) {
         if (newStartOffset < this.startOffset) {
             throw new IllegalArgumentException("newStartOffset[" + newStartOffset + "] cannot be less than current start offset["
                     + this.startOffset + "]");
@@ -132,14 +147,23 @@ public CompletableFuture<Void> trim(long newStartOffset) {
         return streamManager.trimStream(streamId, epoch, newStartOffset);
     }
 
+    @Override
     public CompletableFuture<Void> close() {
+        return exec(this::close0, LOGGER, "close");
+    }
+
+    private CompletableFuture<Void> close0() {
         status.markClosed();
         return storage.forceUpload(streamId).thenCompose(nil -> streamManager.closeStream(streamId, epoch));
     }
 
     @Override
     public CompletableFuture<Void> destroy() {
+        return exec(this::destroy0, LOGGER, "destroy");
+    }
+
+    private CompletableFuture<Void> destroy0() {
         status.markDestroy();
         startOffset = this.confirmOffset.get();
         return streamManager.deleteStream(streamId, epoch);
diff --git a/core/src/main/scala/kafka/log/s3/S3StreamClient.java b/core/src/main/scala/kafka/log/s3/S3StreamClient.java
index 5409d3e67e..768ba06a52 100644
--- a/core/src/main/scala/kafka/log/s3/S3StreamClient.java
+++ b/core/src/main/scala/kafka/log/s3/S3StreamClient.java
@@ -22,10 +22,15 @@ import com.automq.elasticstream.client.api.Stream;
 import com.automq.elasticstream.client.api.StreamClient;
 import kafka.log.s3.streams.StreamManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
 
+import static kafka.log.es.FutureUtil.exec;
+
 public class S3StreamClient implements StreamClient {
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);
     private final StreamManager streamManager;
     private final Storage storage;
 
@@ -37,19 +42,20 @@ public S3StreamClient(StreamManager streamManager, Storage storage) {
 
     @Override
     public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
-        return streamManager.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch()));
+        return exec(() -> streamManager.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch())),
+                LOGGER, "createAndOpenStream");
     }
 
     @Override
     public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
-        return openStream0(streamId, openStreamOptions.epoch());
+        return exec(() -> openStream0(streamId, openStreamOptions.epoch()), LOGGER, "openStream");
     }
 
     private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
         return streamManager.openStream(streamId, epoch).
-                thenApply(metadata -> new S3Stream(
-                        metadata.getStreamId(), metadata.getEpoch(),
-                        metadata.getStartOffset(), metadata.getNextOffset(),
-                        storage, streamManager));
+            thenApply(metadata -> new S3Stream(
+                metadata.getStreamId(), metadata.getEpoch(),
+                metadata.getStartOffset(), metadata.getNextOffset(),
+                storage, streamManager));
     }
 }
diff --git a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
index ed237fbb3e..b380182ece 100644
--- a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
+++ b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
@@ -40,6 +40,7 @@ public class WALObjectUploadTask {
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
     private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
+    private volatile CommitWALObjectRequest commitWALObjectRequest;
     private final CompletableFuture<CommitWALObjectRequest> uploadCf = new CompletableFuture<>();
 
     public WALObjectUploadTask(Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) {
@@ -62,9 +63,9 @@ public CompletableFuture<CommitWALObjectRequest> upload() {
         prepareCf.thenAccept(objectId -> {
             List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
             Collections.sort(streamIds);
-            CommitWALObjectRequest compactRequest = new CommitWALObjectRequest();
+            CommitWALObjectRequest request = new CommitWALObjectRequest();
 
-            ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator);
+            ObjectWriter walObject = new ObjectWriter(objectId, s3Operator);
 
             List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();
 
@@ -75,29 +76,27 @@ public CompletableFuture<CommitWALObjectRequest> upload() {
                     streamObjectCfList.add(writeStreamObject(streamRecords));
                 } else {
                     for (FlatStreamRecordBatch record : streamRecords) {
-                        minorCompactObject.write(record);
+                        walObject.write(record);
                     }
                     long startOffset = streamRecords.get(0).baseOffset;
                     long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset();
-                    compactRequest.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
-                    // minor compact object block only contain single stream's data.
-                    minorCompactObject.closeCurrentBlock();
+                    request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
+                    // A log object block only contains a single stream's data.
+                    walObject.closeCurrentBlock();
                 }
             }
-            compactRequest.setObjectId(objectId);
-            compactRequest.setOrderId(objectId);
-            CompletableFuture<Void> minorCompactObjectCf = minorCompactObject.close().thenAccept(nil -> {
-                compactRequest.setObjectSize(minorCompactObject.size());
-            });
-            compactRequest.setObjectSize(minorCompactObject.size());
+            request.setObjectId(objectId);
+            request.setOrderId(objectId);
+            CompletableFuture<Void> walObjectCf = walObject.close().thenAccept(nil -> request.setObjectSize(walObject.size()));
             for (CompletableFuture<StreamObject> streamObjectCf : streamObjectCfList) {
-                streamObjectCf.thenAccept(compactRequest::addStreamObject);
+                streamObjectCf.thenAccept(request::addStreamObject);
             }
             List<CompletableFuture<?>> allCf = new LinkedList<>(streamObjectCfList);
-            allCf.add(minorCompactObjectCf);
+            allCf.add(walObjectCf);
             CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> {
-                uploadCf.complete(compactRequest);
+                commitWALObjectRequest = request;
+                uploadCf.complete(request);
             }).exceptionally(ex -> {
                 uploadCf.completeExceptionally(ex);
                 return null;
@@ -107,7 +106,10 @@
     public CompletableFuture<Void> commit() {
-        return uploadCf.thenCompose(request -> objectManager.commitWALObject(request).thenApply(resp -> null));
+        return uploadCf.thenCompose(request -> objectManager.commitWALObject(request).thenApply(resp -> {
+            LOGGER.debug("Commit WAL object {}", commitWALObjectRequest);
+            return null;
+        }));
     }
 
     private CompletableFuture<StreamObject> writeStreamObject(List<FlatStreamRecordBatch> streamRecords) {
diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java
index 2ac3dcaa95..c7ad801876 100644
--- a/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java
+++ b/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java
@@ -94,4 +94,15 @@ public long getOrderId() {
     public void setOrderId(long orderId) {
         this.orderId = orderId;
     }
+
+    @Override
+    public String toString() {
+        return "CommitWALObjectRequest{" +
+                "objectId=" + objectId +
+                ", objectSize=" + objectSize +
+                ", streamRanges=" + streamRanges +
+                ", streamObjects=" + streamObjects +
+                ", orderId=" + orderId +
+                '}';
+    }
 }
diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
index da86bbf87b..0dd767d0aa 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
@@ -62,4 +62,14 @@ public void setStartOffset(long startOffset) {
     public void setEndOffset(long endOffset) {
         this.endOffset = endOffset;
     }
+
+    @Override
+    public String toString() {
+        return "ObjectStreamRange{" +
+                "streamId=" + streamId +
+                ", epoch=" + epoch +
+                ", startOffset=" + startOffset +
+                ", endOffset=" + endOffset +
+                '}';
+    }
 }
diff --git a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java
index 175a5baa96..ebf511ed29 100644
--- a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java
+++ b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java
@@ -17,6 +17,8 @@ package kafka.log.s3.objects;
 
+import java.util.Arrays;
+
 public class StreamObject {
     private long objectId;
     private long objectSize;
@@ -76,4 +78,16 @@ public long[] getSourceObjectIds() {
     public void setSourceObjectIds(long[] sourceObjectIds) {
         this.sourceObjectIds = sourceObjectIds;
     }
+
+    @Override
+    public String toString() {
+        return "StreamObject{" +
+                "objectId=" + objectId +
+                ", objectSize=" + objectSize +
+                ", streamId=" + streamId +
+                ", startOffset=" + startOffset +
+                ", endOffset=" + endOffset +
+                ", sourceObjectIds=" + Arrays.toString(sourceObjectIds) +
+                '}';
+    }
 }
diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
index accdb53651..5a10372b44 100644
--- a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
+++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
@@ -105,6 +105,11 @@ public void testWrite() throws ExecutionException, InterruptedException {
         assertEquals(1, blockIndexes.get(0).blockId());
     }
 
+    @Test
+    public void testWrite_closeBlock() {
+
+    }
+
     StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) {
         RecordBatch recordBatch = DefaultRecordBatch.of(count, payloadSize);
         return new StreamRecordBatch(streamId, 0, offset, recordBatch);
diff --git a/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
index cf6a889f66..d3ea450d27 100644
--- a/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
+++ b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
@@ -24,6 +24,9 @@ import kafka.log.s3.objects.StreamObject;
 import kafka.log.s3.operator.MemoryS3Operator;
 import kafka.log.s3.operator.S3Operator;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectType;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -97,6 +100,31 @@ public void testTryCompact() throws Exception {
         assertEquals(11, streamObject.getObjectId());
         assertEquals(10, streamObject.getStartOffset());
         assertEquals(16, streamObject.getEndOffset());
+
+        {
+            S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(request.getObjectId(), request.getObjectSize(), S3ObjectType.WAL_LOOSE);
+            ObjectReader objectReader = new ObjectReader(s3ObjectMetadata, s3Operator);
+            ObjectReader.DataBlockIndex blockIndex = objectReader.find(234, 20, 24).get().get(0);
+            ObjectReader.DataBlock dataBlock = objectReader.read(blockIndex).get();
+            try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
+                StreamRecordBatch record = it.next();
+                assertEquals(20, record.getBaseOffset());
+                record = it.next();
+                assertEquals(24, record.getLastOffset());
+            }
+        }
+
+        {
+            S3ObjectMetadata streamObjectMetadata = new S3ObjectMetadata(11, request.getStreamObjects().get(0).getObjectSize(), S3ObjectType.STREAM);
+            ObjectReader objectReader = new ObjectReader(streamObjectMetadata, s3Operator);
+            ObjectReader.DataBlockIndex blockIndex = objectReader.find(233, 10, 16).get().get(0);
+            ObjectReader.DataBlock dataBlock = objectReader.read(blockIndex).get();
+            try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
+                assertEquals(10, it.next().getBaseOffset());
+                assertEquals(12, it.next().getBaseOffset());
+                assertEquals(14, it.next().getBaseOffset());
+            }
+        }
     }
 }