From b47c64d55efd8bf63af50c213ca2630fa05be567 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Thu, 31 Aug 2023 20:26:02 +0800 Subject: [PATCH] feat(stream-client): write to local WAL Signed-off-by: Robin Han --- .../scala/kafka/log/s3/DefaultS3Client.java | 7 +- .../kafka/log/s3/FlatStreamRecordBatch.java | 61 +++++ .../scala/kafka/log/s3/MinorCompactTask.java | 229 ------------------ .../main/scala/kafka/log/s3/ObjectWriter.java | 19 +- .../main/scala/kafka/log/s3/S3Storage.java | 163 +++++++++++++ .../src/main/scala/kafka/log/s3/S3Stream.java | 14 +- .../scala/kafka/log/s3/S3StreamClient.java | 14 +- core/src/main/scala/kafka/log/s3/S3Wal.java | 132 ---------- .../log/s3/SingleWalObjectWriteTask.java | 111 --------- .../kafka/log/s3/{Wal.java => Storage.java} | 7 +- .../kafka/log/s3/StreamRecordBatchCodec.java | 19 ++ .../kafka/log/s3/WALObjectUploadTask.java | 135 +++++++++++ .../scala/kafka/log/s3/WalWriteRequest.java | 7 +- .../scala/kafka/log/s3/cache/LogCache.java | 144 +++++++++++ .../kafka/log/s3/memory/MemoryS3Client.java | 53 ---- .../kafka/log/s3/wal/MemoryWriteAheadLog.java | 57 +++++ .../scala/kafka/log/s3/wal/WriteAheadLog.java | 85 +++++++ .../kafka/log/s3/DefaultS3BlockCacheTest.java | 12 +- .../java/kafka/log/s3/ObjectWriterTest.java | 8 +- .../s3/{S3WalTest.java => S3StorageTest.java} | 53 ++-- .../java/kafka/log/s3/S3StreamMemoryTest.java | 38 +-- .../test/java/kafka/log/s3/S3StreamTest.java | 13 +- ...Test.java => WALObjectUploadTaskTest.java} | 52 ++-- 23 files changed, 787 insertions(+), 646 deletions(-) create mode 100644 core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java delete mode 100644 core/src/main/scala/kafka/log/s3/MinorCompactTask.java create mode 100644 core/src/main/scala/kafka/log/s3/S3Storage.java delete mode 100644 core/src/main/scala/kafka/log/s3/S3Wal.java delete mode 100644 core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java rename core/src/main/scala/kafka/log/s3/{Wal.java => Storage.java} (84%) create mode 100644 core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java create mode 100644 core/src/main/scala/kafka/log/s3/cache/LogCache.java delete mode 100644 core/src/main/scala/kafka/log/s3/memory/MemoryS3Client.java create mode 100644 core/src/main/scala/kafka/log/s3/wal/MemoryWriteAheadLog.java create mode 100644 core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java rename core/src/test/java/kafka/log/s3/{S3WalTest.java => S3StorageTest.java} (54%) rename core/src/test/java/kafka/log/s3/{MinorCompactTaskTest.java => WALObjectUploadTaskTest.java} (63%) diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java index 96802bb9cf..1d4369122f 100644 --- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java @@ -29,6 +29,7 @@ import kafka.log.s3.operator.S3Operator; import kafka.log.s3.streams.ControllerStreamManager; import kafka.log.s3.streams.StreamManager; +import kafka.log.s3.wal.MemoryWriteAheadLog; import kafka.server.BrokerServer; import kafka.server.BrokerToControllerChannelManager; import kafka.server.KafkaConfig; @@ -44,7 +45,7 @@ public class DefaultS3Client implements Client { private final S3Operator operator; - private final Wal wal; + private final Storage storage; private final S3BlockCache blockCache; @@ -64,9 +65,9 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator this.requestSender = new ControllerRequestSender(channelManager); this.streamManager = new 
ControllerStreamManager(this.requestSender, config); this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config); - this.wal = new S3Wal(objectManager, operator); this.blockCache = new DefaultS3BlockCache(objectManager, operator); - this.streamClient = new S3StreamClient(this.streamManager, this.wal, this.blockCache, this.objectManager); + this.storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, blockCache, operator); + this.streamClient = new S3StreamClient(this.streamManager, this.storage); this.kvClient = new KVClientImpl(); } diff --git a/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java new file mode 100644 index 0000000000..3e21a52a6b --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import io.netty.buffer.ByteBuf; +import kafka.log.s3.model.StreamRecordBatch; + +public class FlatStreamRecordBatch implements Comparable { + public long streamId; + public long epoch; + public long baseOffset; + public int count; + public ByteBuf encodedBuf; + + public static FlatStreamRecordBatch from(StreamRecordBatch streamRecord) { + FlatStreamRecordBatch self = new FlatStreamRecordBatch(); + self.streamId = streamRecord.getStreamId(); + self.epoch = streamRecord.getEpoch(); + self.baseOffset = streamRecord.getBaseOffset(); + self.count = streamRecord.getRecordBatch().count(); + self.encodedBuf = StreamRecordBatchCodec.encode(streamRecord); + return self; + } + + public long lastOffset() { + return baseOffset + count; + } + + public ByteBuf encodedBuf() { + return encodedBuf.duplicate(); + } + + @Override + public int compareTo(FlatStreamRecordBatch o) { + @SuppressWarnings("DuplicatedCode") + int rst = Long.compare(streamId, o.streamId); + if (rst != 0) { + return rst; + } + rst = Long.compare(epoch, o.epoch); + if (rst != 0) { + return rst; + } + return Long.compare(baseOffset, o.baseOffset); + } +} diff --git a/core/src/main/scala/kafka/log/s3/MinorCompactTask.java b/core/src/main/scala/kafka/log/s3/MinorCompactTask.java deleted file mode 100644 index 7250848096..0000000000 --- a/core/src/main/scala/kafka/log/s3/MinorCompactTask.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.s3; - -import kafka.log.s3.model.StreamRecordBatch; -import kafka.log.s3.objects.CommitCompactObjectRequest; -import kafka.log.s3.objects.ObjectManager; -import kafka.log.s3.objects.ObjectStreamRange; -import kafka.log.s3.objects.StreamObject; -import kafka.log.s3.operator.S3Operator; -import org.apache.kafka.common.utils.ThreadUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; - -class MinorCompactTask implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(MinorCompactTask.class); - private static final long NOOP_TIMESTAMP = -1L; - private final long compactSizeThreshold; - private final long maxCompactInterval; - private final int streamSplitSizeThreshold; - private final BlockingQueue waitingCompactRecords = new LinkedBlockingQueue<>(); - private final AtomicLong waitingCompactRecordsBytesSize = new AtomicLong(); - private volatile long lastCompactTimestamp = NOOP_TIMESTAMP; - private final ObjectManager objectManager; - private final S3Operator s3Operator; - private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor( - ThreadUtils.createThreadFactory("minor compact", true)); - - public MinorCompactTask(long compactSizeThreshold, long maxCompactInterval, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) { - this.compactSizeThreshold = compactSizeThreshold; - this.maxCompactInterval = maxCompactInterval; - this.streamSplitSizeThreshold = streamSplitSizeThreshold; - // TODO: close - schedule.scheduleAtFixedRate(this, 1, 1, TimeUnit.SECONDS); - this.objectManager = objectManager; - this.s3Operator = s3Operator; - } - - public void tryCompact(MinorCompactPart part) { - // TODO: back pressure - if (lastCompactTimestamp == NOOP_TIMESTAMP) { - lastCompactTimestamp = System.currentTimeMillis(); - } - waitingCompactRecords.add(part); - if (waitingCompactRecordsBytesSize.addAndGet(part.size) >= compactSizeThreshold) { - schedule.execute(() -> tryCompact0(false)); - } - } - - @Override - public void run() { - tryCompact0(false); - } - - public void close() { - try { - schedule.submit(() -> tryCompact0(true)).get(); - schedule.shutdown(); - } catch (Throwable e) { - LOGGER.error("minor compact fail", e); - } - } - - private void tryCompact0(boolean force) { - long now = System.currentTimeMillis(); - boolean timeout = lastCompactTimestamp != NOOP_TIMESTAMP && (now - lastCompactTimestamp) >= maxCompactInterval; - boolean sizeExceed = waitingCompactRecordsBytesSize.get() >= compactSizeThreshold; - if (!force && !sizeExceed && !timeout) { - return; - } - try { - List parts = new 
ArrayList<>(waitingCompactRecords.size()); - - waitingCompactRecords.drainTo(parts); - lastCompactTimestamp = now; - waitingCompactRecordsBytesSize.getAndAdd(-parts.stream().mapToLong(r -> r.size).sum()); - if (parts.isEmpty()) { - return; - } - - CommitCompactObjectRequest compactRequest = new CommitCompactObjectRequest(); - compactRequest.setCompactedObjectIds(parts.stream().map(p -> p.walObjectId).collect(Collectors.toList())); - - long objectId = objectManager.prepareObject(1, TimeUnit.SECONDS.toMillis(30)).get(); - ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator); - - List> streamObjectCfList = new LinkedList<>(); - List> streamRecordsList = sortAndSplit(parts); - for (List streamRecords : streamRecordsList) { - long streamSize = streamRecords.stream().mapToLong(r -> r.getRecordBatch().rawPayload().remaining()).sum(); - if (streamSize >= streamSplitSizeThreshold) { - streamObjectCfList.add(writeStreamObject(streamRecords)); - } else { - for (StreamRecordBatch record : streamRecords) { - minorCompactObject.write(record); - } - long streamId = streamRecords.get(0).getStreamId(); - long startOffset = streamRecords.get(0).getBaseOffset(); - long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset(); - compactRequest.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset)); - // minor compact object block only contain single stream's data. - minorCompactObject.closeCurrentBlock(); - } - } - minorCompactObject.close().get(); - - compactRequest.setObjectId(objectId); - compactRequest.setObjectSize(minorCompactObject.size()); - - CompletableFuture.allOf(streamObjectCfList.toArray(new CompletableFuture[0])).get(); - for (CompletableFuture cf : streamObjectCfList) { - compactRequest.addStreamObject(cf.get()); - } - - objectManager.commitMinorCompactObject(compactRequest).get(); - } catch (Throwable e) { - //TODO: handle exception, only expect fail when quit. - LOGGER.error("minor compact fail", e); - } - - } - - private CompletableFuture writeStreamObject(List streamRecords) { - CompletableFuture objectIdCf = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)); - return objectIdCf.thenCompose(objectId -> { - ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator); - for (StreamRecordBatch record : streamRecords) { - streamObjectWriter.write(record); - } - long streamId = streamRecords.get(0).getStreamId(); - long startOffset = streamRecords.get(0).getBaseOffset(); - long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset(); - StreamObject streamObject = new StreamObject(); - streamObject.setObjectId(objectId); - streamObject.setStreamId(streamId); - streamObject.setStartOffset(startOffset); - streamObject.setEndOffset(endOffset); - return streamObjectWriter.close().thenApply(nil -> { - streamObject.setObjectSize(streamObjectWriter.size()); - return streamObject; - }); - }); - } - - /** - * Sort records and split them in (stream, epoch) dimension. - * ex. 
- * part0: s1-e0-m1 s1-e0-m2 s2-e0-m1 s2-e0-m2 - * part1: s1-e0-m3 s1-e0-m4 - * part2: s1-e1-m10 s1-e1-m11 - * after split: - * list0: s1-e0-m1 s1-e0-m2 s1-e0-m3 s1-e0-m4 - * list1: s1-e1-m10 s1-e1-m11 - * list2: s2-e0-m1 s2-e0-m3 - */ - private List> sortAndSplit(List parts) { - int count = parts.stream().mapToInt(p -> p.records.size()).sum(); - // TODO: more efficient sort - List sortedList = new ArrayList<>(count); - for (MinorCompactPart part : parts) { - sortedList.addAll(part.records); - } - Collections.sort(sortedList); - List> streamRecordsList = new ArrayList<>(1024); - long streamId = -1L; - long epoch = -1L; - List streamRecords = null; - for (StreamRecordBatch record : sortedList) { - long recordStreamId = record.getStreamId(); - long recordEpoch = record.getEpoch(); - if (recordStreamId != streamId || recordEpoch != epoch) { - if (streamRecords != null) { - streamRecordsList.add(streamRecords); - } - streamRecords = new LinkedList<>(); - streamId = recordStreamId; - epoch = recordEpoch; - } - if (streamRecords != null) { - streamRecords.add(record); - } - } - if (streamRecords != null) { - streamRecordsList.add(streamRecords); - } - return streamRecordsList; - } - - - static class MinorCompactPart { - long walObjectId; - List records; - long size; - - public MinorCompactPart(long walObjectId, List records) { - this.walObjectId = walObjectId; - this.records = new ArrayList<>(records); - this.size = records.stream().mapToLong(r -> r.getRecordBatch().rawPayload().remaining()).sum(); - } - } -} diff --git a/core/src/main/scala/kafka/log/s3/ObjectWriter.java b/core/src/main/scala/kafka/log/s3/ObjectWriter.java index 81b886d6a3..f17c479869 100644 --- a/core/src/main/scala/kafka/log/s3/ObjectWriter.java +++ b/core/src/main/scala/kafka/log/s3/ObjectWriter.java @@ -20,7 +20,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; -import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.objects.ObjectStreamRange; import kafka.log.s3.operator.S3Operator; import kafka.log.s3.operator.Writer; @@ -63,7 +62,7 @@ public ObjectWriter(long objectId, S3Operator s3Operator) { this(objectId, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024); } - public void write(StreamRecordBatch record) { + public void write(FlatStreamRecordBatch record) { if (dataBlock == null) { dataBlock = new DataBlock(nextDataBlockPosition); } @@ -159,7 +158,7 @@ public DataBlock(long position) { streamRanges = new LinkedList<>(); } - public boolean write(StreamRecordBatch record) { + public boolean write(FlatStreamRecordBatch record) { try { recordCount++; return write0(record); @@ -169,17 +168,17 @@ public boolean write(StreamRecordBatch record) { } } - public boolean write0(StreamRecordBatch record) throws IOException { - if (streamRange == null || streamRange.getStreamId() != record.getStreamId()) { + public boolean write0(FlatStreamRecordBatch record) throws IOException { + if (streamRange == null || streamRange.getStreamId() != record.streamId) { streamRange = new ObjectStreamRange(); - streamRange.setStreamId(record.getStreamId()); - streamRange.setEpoch(record.getEpoch()); - streamRange.setStartOffset(record.getBaseOffset()); + streamRange.setStreamId(record.streamId); + streamRange.setEpoch(record.epoch); + streamRange.setStartOffset(record.baseOffset); streamRanges.add(streamRange); } - streamRange.setEndOffset(record.getBaseOffset() + record.getRecordBatch().count()); + streamRange.setEndOffset(record.lastOffset()); - ByteBuf recordBuf = 
StreamRecordBatchCodec.encode(record); + ByteBuf recordBuf = record.encodedBuf(); out.write(recordBuf.array(), recordBuf.arrayOffset(), recordBuf.readableBytes()); recordBuf.release(); blockSize += recordBuf.readableBytes(); diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java new file mode 100644 index 0000000000..2e2815b04c --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/S3Storage.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import kafka.log.s3.cache.LogCache; +import kafka.log.s3.cache.ReadDataBlock; +import kafka.log.s3.cache.S3BlockCache; +import kafka.log.s3.model.StreamRecordBatch; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.operator.S3Operator; +import kafka.log.s3.wal.WriteAheadLog; +import org.apache.kafka.common.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +public class S3Storage implements Storage { + private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class); + private final BlockingQueue waitingLogConfirmedRequests; + private final WriteAheadLog log; + private final LogCache logCache; + private final AtomicLong logConfirmPosition = new AtomicLong(); + private final AtomicLong processedLogConfirmPosition = new AtomicLong(); + private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.createThreadFactory("s3-storage-main", false)); + private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.createThreadFactory("s3-storage-main", true)); + private final ObjectManager objectManager; + private final S3Operator s3Operator; + private final S3BlockCache blockCache; + + public S3Storage(WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) { + this.waitingLogConfirmedRequests = new ArrayBlockingQueue<>(16384); + this.log = log; + this.logCache = new LogCache(512 * 1024 * 1024); + this.objectManager = objectManager; + this.blockCache = blockCache; + this.s3Operator = s3Operator; + } + + @Override + public void close() { + mainExecutor.shutdown(); + backgroundExecutor.shutdown(); + } + + @Override + public CompletableFuture append(StreamRecordBatch streamRecord) { + //TODO: copy to pooled bytebuffer to reduce gc, convert to flat record + FlatStreamRecordBatch 
flatStreamRecordBatch = FlatStreamRecordBatch.from(streamRecord); + WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf.duplicate()); + CompletableFuture cf = new CompletableFuture<>(); + WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.endPosition, cf); + try { + waitingLogConfirmedRequests.put(writeRequest); + } catch (InterruptedException e) { + cf.completeExceptionally(e); + } + appendResult.future.thenAccept(nil -> { + // TODO: callback is out of order, we need reorder ack in stream dimension. + // TODO: cache end offset update should consider log hollow. + logConfirmPosition.getAndUpdate(operand -> Math.max(operand, appendResult.endPosition)); + putToCache(writeRequest); + tryCallback(); + }); + // log#append may success before add writeRequest to waitingLogConfirmedRequests, so we need tryCallback here. + tryCallback(); + return cf; + } + + @Override + public CompletableFuture read(long streamId, long startOffset, long endOffset, int maxBytes) { + List records = logCache.get(streamId, startOffset, endOffset, maxBytes); + if (!records.isEmpty()) { + return CompletableFuture.completedFuture(new ReadDataBlock(StreamRecordBatchCodec.decode(records))); + } + return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(readDataBlock -> { + long nextStartOffset = readDataBlock.endOffset().orElse(startOffset); + int nextMaxBytes = maxBytes - Math.min(maxBytes, readDataBlock.sizeInBytes()); + if (nextStartOffset >= endOffset || nextMaxBytes == 0) { + return readDataBlock; + } + List finalRecords = new LinkedList<>(readDataBlock.getRecords()); + finalRecords.addAll(StreamRecordBatchCodec.decode(logCache.get(streamId, nextStartOffset, endOffset, maxBytes))); + return new ReadDataBlock(finalRecords); + }); + } + + private void tryCallback() { + if (processedLogConfirmPosition.get() == logConfirmPosition.get()) { + return; + } + mainExecutor.execute(this::tryCallback0); + } + + private void tryCallback0() { + long walConfirmOffset = this.logConfirmPosition.get(); + for (; ; ) { + WalWriteRequest request = waitingLogConfirmedRequests.peek(); + if (request == null) break; + if (request.position <= walConfirmOffset) { + waitingLogConfirmedRequests.poll(); + request.cf.complete(null); + } else { + break; + } + } + processedLogConfirmPosition.set(walConfirmOffset); + } + + private void putToCache(WalWriteRequest request) { + if (logCache.put(request.record, request.position)) { + // cache block is full, trigger WAL object upload. + LogCache.LogCacheBlock logCacheBlock = logCache.archiveCurrentBlock(); + uploadWALObject(logCacheBlock); + } + } + + private void uploadWALObject(LogCache.LogCacheBlock logCacheBlock) { + backgroundExecutor.execute(() -> uploadWALObject0(logCacheBlock)); + } + + private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock) { + // TODO: pipeline the WAL object upload to accelerate the upload. 
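+ // Upload flow for an archived cache block, as implemented below: prepare() reserves an object id
+ // from the ObjectManager, upload() writes the cached records to S3 (streams at or above the split
+ // threshold are written as standalone stream objects), commit() registers the object metadata,
+ // then the WAL is trimmed to the block's end position and the cache block is freed.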
+ try { + WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), 16 * 1024 * 1024, objectManager, s3Operator); + walObjectUploadTask.prepare().get(); + walObjectUploadTask.upload().get(); + walObjectUploadTask.commit().get(); + log.trim(logCacheBlock.logEndPosition()); + freeCache(logCacheBlock.blockId()); + } catch (Throwable e) { + LOGGER.error("unexpect upload wal object fail", e); + } + } + + private void freeCache(long blockId) { + mainExecutor.execute(() -> logCache.free(blockId)); + } +} diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java index e0304ae21d..9f06de849f 100644 --- a/core/src/main/scala/kafka/log/s3/S3Stream.java +++ b/core/src/main/scala/kafka/log/s3/S3Stream.java @@ -27,7 +27,6 @@ import com.automq.elasticstream.client.flatc.header.ErrorCode; import kafka.log.es.FutureUtil; import kafka.log.es.RecordBatchWithContextWrapper; -import kafka.log.s3.cache.S3BlockCache; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.streams.StreamManager; import org.slf4j.Logger; @@ -47,12 +46,11 @@ public class S3Stream implements Stream { private long startOffset; final AtomicLong confirmOffset; private final AtomicLong nextOffset; - private final Wal wal; - private final S3BlockCache blockCache; + private final Storage storage; private final StreamManager streamManager; private final Status status; - public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Wal wal, S3BlockCache blockCache, StreamManager streamManager) { + public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage, StreamManager streamManager) { this.streamId = streamId; this.epoch = epoch; this.startOffset = startOffset; @@ -60,8 +58,7 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Wa this.nextOffset = new AtomicLong(nextOffset); this.confirmOffset = new AtomicLong(nextOffset); this.status = new Status(); - this.wal = wal; - this.blockCache = blockCache; + this.storage = storage; this.streamManager = streamManager; } @@ -88,7 +85,7 @@ public CompletableFuture append(RecordBatch recordBatch) { } long offset = nextOffset.getAndAdd(recordBatch.count()); StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch); - CompletableFuture cf = wal.append(streamRecordBatch).thenApply(nil -> { + CompletableFuture cf = storage.append(streamRecordBatch).thenApply(nil -> { updateConfirmOffset(offset + recordBatch.count()); return new DefaultAppendResult(offset); }); @@ -119,7 +116,7 @@ public CompletableFuture fetch(long startOffset, long endOffset, in String.format("fetch range[%s, %s) is out of stream bound [%s, %s)", startOffset, endOffset, startOffset(), confirmOffset) )); } - return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> { + return storage.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> { List records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList()); return new DefaultFetchResult(records); }); @@ -138,6 +135,7 @@ public CompletableFuture trim(long newStartOffset) { @Override public CompletableFuture close() { status.markClosed(); + // TODO: force storage upload the stream data to s3 return streamManager.closeStream(streamId, epoch); } diff --git a/core/src/main/scala/kafka/log/s3/S3StreamClient.java 
b/core/src/main/scala/kafka/log/s3/S3StreamClient.java index bdf58baffa..5409d3e67e 100644 --- a/core/src/main/scala/kafka/log/s3/S3StreamClient.java +++ b/core/src/main/scala/kafka/log/s3/S3StreamClient.java @@ -21,8 +21,6 @@ import com.automq.elasticstream.client.api.OpenStreamOptions; import com.automq.elasticstream.client.api.Stream; import com.automq.elasticstream.client.api.StreamClient; -import kafka.log.s3.cache.S3BlockCache; -import kafka.log.s3.objects.ObjectManager; import kafka.log.s3.streams.StreamManager; import java.util.concurrent.CompletableFuture; @@ -30,15 +28,11 @@ public class S3StreamClient implements StreamClient { private final StreamManager streamManager; - private final Wal wal; - private final S3BlockCache blockCache; - private final ObjectManager objectManager; + private final Storage storage; - public S3StreamClient(StreamManager streamManager, Wal wal, S3BlockCache blockCache, ObjectManager objectManager) { + public S3StreamClient(StreamManager streamManager, Storage storage) { this.streamManager = streamManager; - this.wal = wal; - this.blockCache = blockCache; - this.objectManager = objectManager; + this.storage = storage; } @Override @@ -56,6 +50,6 @@ private CompletableFuture openStream0(long streamId, long epoch) { thenApply(metadata -> new S3Stream( metadata.getStreamId(), metadata.getEpoch(), metadata.getStartOffset(), metadata.getNextOffset(), - wal, blockCache, streamManager)); + storage, streamManager)); } } diff --git a/core/src/main/scala/kafka/log/s3/S3Wal.java b/core/src/main/scala/kafka/log/s3/S3Wal.java deleted file mode 100644 index cf8947f786..0000000000 --- a/core/src/main/scala/kafka/log/s3/S3Wal.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.log.s3; - -import kafka.log.s3.model.StreamRecordBatch; -import kafka.log.s3.objects.ObjectManager; -import kafka.log.s3.operator.S3Operator; -import org.apache.kafka.common.utils.ThreadUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class S3Wal implements Wal { - private static final Logger LOGGER = LoggerFactory.getLogger(S3Wal.class); - private final int batchIntervalMs = 200; - private final BlockingQueue writeBuffer; - private final WalBatchWriteTask walBatchWriteTask; - private final MinorCompactTask minorCompactTask; - - - public S3Wal(ObjectManager objectManager, S3Operator s3Operator) { - writeBuffer = new ArrayBlockingQueue<>(16384); - walBatchWriteTask = new WalBatchWriteTask(objectManager, s3Operator); - minorCompactTask = new MinorCompactTask(5L * 1024 * 1024 * 1024, 60, 16 * 1024 * 1024, objectManager, s3Operator); - } - - @Override - public void close() { - walBatchWriteTask.close(); - } - - @Override - public CompletableFuture append(StreamRecordBatch streamRecord) { - CompletableFuture cf = new CompletableFuture<>(); - //TODO: copy to pooled bytebuffer to reduce gc, convert to flat record - try { - writeBuffer.put(new WalWriteRequest(streamRecord, cf)); - } catch (InterruptedException e) { - cf.completeExceptionally(e); - } - return cf; - } - - class WalBatchWriteTask implements Runnable { - private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor( - ThreadUtils.createThreadFactory("wal-batch-write", true)); - private final Queue writeTasks = new ConcurrentLinkedQueue<>(); - private final ObjectManager objectManager; - private final S3Operator s3Operator; - - public WalBatchWriteTask(ObjectManager objectManager, S3Operator s3Operator) { - this.objectManager = objectManager; - this.s3Operator = s3Operator; - schedule.scheduleAtFixedRate(this, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS); - } - - public void close() { - schedule.shutdown(); - run(); - } - - @Override - public void run() { - try { - run0(); - } catch (Throwable e) { - LOGGER.error("Error in wal batch write task", e); - } - } - - void run0() { - List requests = new ArrayList<>(writeBuffer.size()); - writeBuffer.drainTo(requests); - if (requests.isEmpty()) { - return; - } - SingleWalObjectWriteTask singleWalObjectWriteTask = new SingleWalObjectWriteTask(requests, objectManager, s3Operator); - writeTasks.offer(singleWalObjectWriteTask); - runWriteTask(singleWalObjectWriteTask); - } - - void runWriteTask(SingleWalObjectWriteTask task) { - task.upload().thenAccept(nil -> schedule.execute(this::tryComplete)) - .exceptionally(ex -> { - LOGGER.warn("Write wal object fail, retry later", ex); - schedule.schedule(() -> runWriteTask(task), batchIntervalMs, TimeUnit.MILLISECONDS); - return null; - }); - } - - void tryComplete() { - while (true) { - SingleWalObjectWriteTask task = writeTasks.peek(); - if (task == null) { - return; - } - if (task.isDone()) { - writeTasks.poll(); - task.ack(); - minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(task.objectId(), task.records())); - } else { - return; - } - } - } - 
} -} diff --git a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java deleted file mode 100644 index c2406b9a01..0000000000 --- a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.s3; - -import com.automq.elasticstream.client.api.ElasticStreamClientException; -import com.automq.elasticstream.client.flatc.header.ErrorCode; -import kafka.log.s3.model.StreamRecordBatch; -import kafka.log.s3.objects.CommitWalObjectRequest; -import kafka.log.s3.objects.CommitWalObjectResponse; -import kafka.log.s3.objects.ObjectManager; -import kafka.log.s3.operator.S3Operator; - -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -public class SingleWalObjectWriteTask { - private final List requests; - private final ObjectManager objectManager; - private final S3Operator s3Operator; - private ObjectWriter objectWriter; - private CommitWalObjectResponse response; - private volatile boolean isDone = false; - - - public SingleWalObjectWriteTask(List records, ObjectManager objectManager, S3Operator s3Operator) { - Collections.sort(records); - this.requests = records; - this.objectManager = objectManager; - this.s3Operator = s3Operator; - } - - public CompletableFuture upload() { - if (isDone) { - return CompletableFuture.completedFuture(null); - } - // TODO: partial retry - UploadContext context = new UploadContext(); - CompletableFuture objectIdCf = objectManager.prepareObject(1, TimeUnit.SECONDS.toMillis(30)); - CompletableFuture writeCf = objectIdCf.thenCompose(objectId -> { - context.objectId = objectId; - // TODO: fill cluster name - objectWriter = new ObjectWriter(objectId, s3Operator); - for (WalWriteRequest request : requests) { - objectWriter.write(request.record); - } - return objectWriter.close(); - }); - return writeCf - .thenCompose(nil -> { - CommitWalObjectRequest request = new CommitWalObjectRequest(); - request.setObjectId(context.objectId); - request.setObjectSize(objectWriter.size()); - request.setStreamRanges(objectWriter.getStreamRanges()); - return objectManager.commitWalObject(request); - }) - .thenApply(resp -> { - isDone = true; - response = resp; - return null; - }); - } - - public boolean isDone() { - return isDone; - } - - public void ack() { - Set failedStreamId = new HashSet<>(response.getFailedStreamIds()); - for (WalWriteRequest request : requests) { - long streamId = request.record.getStreamId(); - if (failedStreamId.contains(streamId)) { - 
request.cf.completeExceptionally(new ElasticStreamClientException(ErrorCode.EXPIRED_STREAM_EPOCH, "Stream " + streamId + " epoch expired")); - } else { - request.cf.complete(null); - } - } - } - - public long objectId() { - return objectWriter.objectId(); - } - - public List records() { - return requests.stream().map(r -> r.record).collect(Collectors.toList()); - } - - static class UploadContext { - long objectId; - } - -} diff --git a/core/src/main/scala/kafka/log/s3/Wal.java b/core/src/main/scala/kafka/log/s3/Storage.java similarity index 84% rename from core/src/main/scala/kafka/log/s3/Wal.java rename to core/src/main/scala/kafka/log/s3/Storage.java index 1ae1526bbe..785bbd25a8 100644 --- a/core/src/main/scala/kafka/log/s3/Wal.java +++ b/core/src/main/scala/kafka/log/s3/Storage.java @@ -17,6 +17,7 @@ package kafka.log.s3; +import kafka.log.s3.cache.ReadDataBlock; import kafka.log.s3.model.StreamRecordBatch; import java.util.concurrent.CompletableFuture; @@ -24,14 +25,16 @@ /** * Write ahead log for server. */ -public interface Wal { +public interface Storage { /** - * Append stream record to wal. + * Append stream record. * * @param streamRecord {@link StreamRecordBatch} */ CompletableFuture append(StreamRecordBatch streamRecord); + CompletableFuture read(long streamId, long startOffset, long endOffset, int maxBytes); + void close(); } diff --git a/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java b/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java index 997bf2e2db..06c78098fc 100644 --- a/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java +++ b/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java @@ -26,6 +26,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; public class StreamRecordBatchCodec { private static final byte MAGIC_V0 = 0x22; @@ -71,4 +73,21 @@ public static StreamRecordBatch decode(DataInputStream in) { throw new RuntimeException(e); } } + + public static StreamRecordBatch decode(ByteBuf buf) { + buf.readByte(); // magic + long streamId = buf.readLong(); + long epoch = buf.readLong(); + long baseOffset = buf.readLong(); + int lastOffsetDelta = buf.readInt(); + int payloadLength = buf.readInt(); + ByteBuffer payload = buf.slice(buf.readerIndex(), payloadLength).nioBuffer(); + buf.skipBytes(payloadLength); + DefaultRecordBatch defaultRecordBatch = new DefaultRecordBatch(lastOffsetDelta, 0, Collections.emptyMap(), payload); + return new StreamRecordBatch(streamId, epoch, baseOffset, defaultRecordBatch); + } + + public static List decode(List records) { + return records.stream().map(r -> decode(r.encodedBuf())).collect(Collectors.toList()); + } } diff --git a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java new file mode 100644 index 0000000000..d0aa0f53f8 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import kafka.log.s3.objects.CommitCompactObjectRequest; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.objects.ObjectStreamRange; +import kafka.log.s3.objects.StreamObject; +import kafka.log.s3.operator.S3Operator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class WALObjectUploadTask { + private static final Logger LOGGER = LoggerFactory.getLogger(WALObjectUploadTask.class); + private final Map> streamRecordsMap; + private final int streamSplitSizeThreshold; + private final ObjectManager objectManager; + private final S3Operator s3Operator; + private final CompletableFuture prepareCf = new CompletableFuture<>(); + private final CompletableFuture uploadCf = new CompletableFuture<>(); + + public WALObjectUploadTask(Map> streamRecordsMap, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) { + this.streamRecordsMap = streamRecordsMap; + this.streamSplitSizeThreshold = streamSplitSizeThreshold; + this.objectManager = objectManager; + this.s3Operator = s3Operator; + } + + public CompletableFuture prepare() { + objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(prepareCf::complete).exceptionally(ex -> { + prepareCf.completeExceptionally(ex); + return null; + }); + // TODO: retry when fail or prepareObject inner retry + return prepareCf; + } + + public CompletableFuture upload() { + prepareCf.thenAccept(objectId -> { + List streamIds = new ArrayList<>(streamRecordsMap.keySet()); + Collections.sort(streamIds); + CommitCompactObjectRequest compactRequest = new CommitCompactObjectRequest(); + compactRequest.setCompactedObjectIds(Collections.emptyList()); + + ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator); + + List> streamObjectCfList = new LinkedList<>(); + + for (Long streamId : streamIds) { + List streamRecords = streamRecordsMap.get(streamId); + long streamSize = streamRecords.stream().mapToLong(r -> r.encodedBuf.readableBytes()).sum(); + if (streamSize >= streamSplitSizeThreshold) { + streamObjectCfList.add(writeStreamObject(streamRecords)); + } else { + for (FlatStreamRecordBatch record : streamRecords) { + minorCompactObject.write(record); + } + long startOffset = streamRecords.get(0).baseOffset; + long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset(); + compactRequest.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset)); + // minor compact object block only contain single stream's data. 
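+ // closeCurrentBlock() seals the in-progress data block so the next stream's records
+ // start a fresh block, which is what keeps each data block scoped to a single stream.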
+ minorCompactObject.closeCurrentBlock(); + } + } + compactRequest.setObjectId(objectId); + CompletableFuture minorCompactObjectCf = minorCompactObject.close().thenAccept(nil -> { + compactRequest.setObjectSize(minorCompactObject.size()); + }); + compactRequest.setObjectSize(minorCompactObject.size()); + for (CompletableFuture streamObjectCf : streamObjectCfList) { + streamObjectCf.thenAccept(compactRequest::addStreamObject); + } + List> allCf = new LinkedList<>(streamObjectCfList); + allCf.add(minorCompactObjectCf); + + CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> { + uploadCf.complete(compactRequest); + }).exceptionally(ex -> { + uploadCf.completeExceptionally(ex); + return null; + }); + }); + return uploadCf; + } + + public CompletableFuture commit() { + return uploadCf.thenCompose(objectManager::commitMinorCompactObject); + } + + private CompletableFuture writeStreamObject(List streamRecords) { + CompletableFuture objectIdCf = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)); + // TODO: retry until success + return objectIdCf.thenCompose(objectId -> { + ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator); + for (FlatStreamRecordBatch record : streamRecords) { + streamObjectWriter.write(record); + } + long streamId = streamRecords.get(0).streamId; + long startOffset = streamRecords.get(0).baseOffset; + long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset(); + StreamObject streamObject = new StreamObject(); + streamObject.setObjectId(objectId); + streamObject.setStreamId(streamId); + streamObject.setStartOffset(startOffset); + streamObject.setEndOffset(endOffset); + return streamObjectWriter.close().thenApply(nil -> { + streamObject.setObjectSize(streamObjectWriter.size()); + return streamObject; + }); + }); + } +} diff --git a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java index c6c8628249..de6aeb581b 100644 --- a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java +++ b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java @@ -17,16 +17,17 @@ package kafka.log.s3; -import kafka.log.s3.model.StreamRecordBatch; import java.util.concurrent.CompletableFuture; public class WalWriteRequest implements Comparable { - final StreamRecordBatch record; + final FlatStreamRecordBatch record; + final long position; final CompletableFuture cf; - public WalWriteRequest(StreamRecordBatch record, CompletableFuture cf) { + public WalWriteRequest(FlatStreamRecordBatch record, long position, CompletableFuture cf) { this.record = record; + this.position = position; this.cf = cf; } diff --git a/core/src/main/scala/kafka/log/s3/cache/LogCache.java b/core/src/main/scala/kafka/log/s3/cache/LogCache.java new file mode 100644 index 0000000000..a8d8612112 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/cache/LogCache.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.cache; + +import kafka.log.s3.FlatStreamRecordBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class LogCache { + private final int cacheBlockMaxSize; + private final List archiveBlocks = new ArrayList<>(); + private LogCacheBlock activeBlock; + + public LogCache(int cacheBlockMaxSize) { + this.cacheBlockMaxSize = cacheBlockMaxSize; + this.activeBlock = new LogCacheBlock(cacheBlockMaxSize); + } + + public boolean put(FlatStreamRecordBatch recordBatch, long endPosition) { + return activeBlock.put(recordBatch, endPosition); + } + + /** + * Get streamId [startOffset, endOffset) range records with maxBytes limit. + * If the cache only contain records after startOffset, the return list is empty. + */ + public List get(long streamId, long startOffset, long endOffset, int maxBytes) { + List rst = new LinkedList<>(); + long nextStartOffset = startOffset; + int nextMaxBytes = maxBytes; + for (LogCacheBlock archiveBlock : archiveBlocks) { + // TODO: fast break when cache doesn't contains the startOffset. + List records = archiveBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes); + if (records.isEmpty()) { + continue; + } + nextStartOffset = records.get(records.size() - 1).lastOffset(); + nextMaxBytes -= Math.min(nextMaxBytes, records.stream().mapToInt(r -> r.encodedBuf().readableBytes()).sum()); + rst.addAll(records); + if (nextStartOffset >= endOffset || nextMaxBytes == 0) { + return rst; + } + } + List records = activeBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes); + rst.addAll(records); + return rst; + } + + public LogCacheBlock archiveCurrentBlock() { + LogCacheBlock block = activeBlock; + archiveBlocks.add(block); + activeBlock = new LogCacheBlock(cacheBlockMaxSize); + return block; + } + + public void free(long blockId) { + archiveBlocks.removeIf(b -> b.blockId == blockId); + } + + public static class LogCacheBlock { + private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong(); + private final long blockId; + private final int maxSize; + private final Map> map = new HashMap<>(); + private int size = 0; + private long logEndPosition; + + public LogCacheBlock(int maxSize) { + this.blockId = BLOCK_ID_ALLOC.getAndIncrement(); + this.maxSize = maxSize; + } + + public long blockId() { + return blockId; + } + + public boolean put(FlatStreamRecordBatch recordBatch, long endPosition) { + List streamCache = map.computeIfAbsent(recordBatch.streamId, id -> new ArrayList<>()); + streamCache.add(recordBatch); + int recordSize = recordBatch.encodedBuf.readableBytes(); + size += recordSize; + logEndPosition = endPosition; + return size >= maxSize; + } + + public List get(long streamId, long startOffset, long endOffset, int maxBytes) { + List streamRecords = map.get(streamId); + if (streamRecords == null) { + return Collections.emptyList(); + } + if (streamRecords.get(0).baseOffset > startOffset || streamRecords.get(streamRecords.size() - 1).lastOffset() <= 
startOffset) { + return Collections.emptyList(); + } + int startIndex = -1; + int endIndex = -1; + int remainingBytesSize = maxBytes; + // TODO: binary search the startOffset. + for (int i = 0; i < streamRecords.size(); i++) { + FlatStreamRecordBatch record = streamRecords.get(i); + if (startIndex == -1 && record.baseOffset <= startOffset && record.lastOffset() > startOffset) { + startIndex = i; + } + if (startIndex != -1) { + endIndex = i + 1; + remainingBytesSize -= Math.min(remainingBytesSize, record.encodedBuf().readableBytes()); + if (record.lastOffset() >= endOffset || remainingBytesSize == 0) { + break; + } + } + } + return streamRecords.subList(startIndex, endIndex); + } + + public Map> records() { + return map; + } + + public long logEndPosition() { + return logEndPosition; + } + + } +} diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryS3Client.java b/core/src/main/scala/kafka/log/s3/memory/MemoryS3Client.java deleted file mode 100644 index cc6358e2b2..0000000000 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryS3Client.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.s3.memory; - -import com.automq.elasticstream.client.api.Client; -import com.automq.elasticstream.client.api.KVClient; -import com.automq.elasticstream.client.api.StreamClient; -import kafka.log.es.MemoryClient; -import kafka.log.s3.S3StreamClient; -import kafka.log.s3.S3Wal; -import kafka.log.s3.Wal; -import kafka.log.s3.cache.DefaultS3BlockCache; -import kafka.log.s3.cache.S3BlockCache; -import kafka.log.s3.operator.S3Operator; - -public class MemoryS3Client implements Client { - private final StreamClient streamClient; - private final KVClient kvClient; - - public MemoryS3Client(S3Operator s3Operator) { - MemoryMetadataManager manager = new MemoryMetadataManager(); - manager.start(); - Wal wal = new S3Wal(manager, s3Operator); - S3BlockCache blockCache = new DefaultS3BlockCache(manager, s3Operator); - this.streamClient = new S3StreamClient(manager, wal, blockCache, manager); - this.kvClient = new MemoryClient.KVClientImpl(); - } - - @Override - public StreamClient streamClient() { - return streamClient; - } - - @Override - public KVClient kvClient() { - return kvClient; - } -} diff --git a/core/src/main/scala/kafka/log/s3/wal/MemoryWriteAheadLog.java b/core/src/main/scala/kafka/log/s3/wal/MemoryWriteAheadLog.java new file mode 100644 index 0000000000..cd302ed70a --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/wal/MemoryWriteAheadLog.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.wal; + +import io.netty.buffer.ByteBuf; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +public class MemoryWriteAheadLog implements WriteAheadLog { + private final AtomicLong offsetAlloc = new AtomicLong(); + + @Override + public long startPosition() { + return 0; + } + + @Override + public long endPosition() { + return 0; + } + + @Override + public List read() { + return Collections.emptyList(); + } + + @Override + public AppendResult append(ByteBuf data) { + AppendResult appendResult = new AppendResult(); + appendResult.endPosition = offsetAlloc.getAndIncrement(); + appendResult.future = CompletableFuture.completedFuture(null); + return appendResult; + } + + @Override + public void trim(long position) { + + } +} diff --git a/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java b/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java new file mode 100644 index 0000000000..bf1fa9c261 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.wal; + + +import io.netty.buffer.ByteBuf; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public interface WriteAheadLog { + + /** + * Get log start position. + * @return start position. + */ + long startPosition(); + + /** + * Get log end position. + * @return end position. + */ + long endPosition(); + + /** + * Read data from log. + * @return list of {@link WalRecord}. + */ + List read(); + + /** + * Append data to log, note append may be out of order. + * ex. when sequence append R1 R2 , R2 maybe complete before R1. + * + * @return The data position will be written. + */ + AppendResult append(ByteBuf data); + + /** + * Trim log to new start position. + * + * @param position new start position. 
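+ * Data whose end position is at or before this value is assumed to be persisted
+ * elsewhere (e.g. uploaded as a WAL object) and may be reclaimed by the implementation.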
+ */ + void trim(long position); + + + class WalRecord { + private long endPosition; + private ByteBuf data; + + public WalRecord(long endPosition, ByteBuf data) { + this.endPosition = endPosition; + this.data = data; + } + + public long endPosition() { + return endPosition; + } + + public ByteBuf data() { + return data; + } + } + + class AppendResult { + public long endPosition; + public CompletableFuture future; + } + +} diff --git a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java index c2bc85bc56..38069dead6 100644 --- a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java +++ b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java @@ -54,20 +54,20 @@ public void setup() { @Test public void testRead() throws Exception { ObjectWriter objectWriter = new ObjectWriter(0, s3Operator, 1024, 1024); - objectWriter.write(newRecord(233, 10, 5, 512)); - objectWriter.write(newRecord(233, 15, 10, 512)); - objectWriter.write(newRecord(233, 25, 5, 512)); - objectWriter.write(newRecord(234, 0, 5, 512)); + objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 10, 5, 512))); + objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 15, 10, 512))); + objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 25, 5, 512))); + objectWriter.write(FlatStreamRecordBatch.from(newRecord(234, 0, 5, 512))); objectWriter.close(); S3ObjectMetadata metadata1 = new S3ObjectMetadata(0, objectWriter.size(), S3ObjectType.WAL_LOOSE); objectWriter = new ObjectWriter(1, s3Operator, 1024, 1024); - objectWriter.write(newRecord(233, 30, 10, 512)); + objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 30, 10, 512))); objectWriter.close(); S3ObjectMetadata metadata2 = new S3ObjectMetadata(1, objectWriter.size(), S3ObjectType.WAL_LOOSE); objectWriter = new ObjectWriter(2, s3Operator, 1024, 1024); - objectWriter.write(newRecord(233, 40, 20, 512)); + objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 40, 20, 512))); objectWriter.close(); S3ObjectMetadata metadata3 = new S3ObjectMetadata(2, objectWriter.size(), S3ObjectType.WAL_LOOSE); diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java index fab21084a9..accdb53651 100644 --- a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java +++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java @@ -44,13 +44,13 @@ public void testWrite() throws ExecutionException, InterruptedException { S3Operator s3Operator = new MemoryS3Operator(); ObjectWriter objectWriter = new ObjectWriter(1, s3Operator, 1024, 1024); StreamRecordBatch r1 = newRecord(233, 10, 5, 512); - objectWriter.write(r1); + objectWriter.write(FlatStreamRecordBatch.from(r1)); StreamRecordBatch r2 = newRecord(233, 15, 10, 512); - objectWriter.write(r2); + objectWriter.write(FlatStreamRecordBatch.from(r2)); StreamRecordBatch r3 = newRecord(233, 25, 5, 512); - objectWriter.write(r3); + objectWriter.write(FlatStreamRecordBatch.from(r3)); StreamRecordBatch r4 = newRecord(234, 0, 5, 512); - objectWriter.write(r4); + objectWriter.write(FlatStreamRecordBatch.from(r4)); objectWriter.close().get(); List streamRanges = objectWriter.getStreamRanges(); diff --git a/core/src/test/java/kafka/log/s3/S3WalTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java similarity index 54% rename from core/src/test/java/kafka/log/s3/S3WalTest.java rename to core/src/test/java/kafka/log/s3/S3StorageTest.java index 648566237e..dd71a70bb0 100644 --- 
a/core/src/test/java/kafka/log/s3/S3WalTest.java +++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java @@ -17,18 +17,18 @@ package kafka.log.s3; +import kafka.log.s3.cache.DefaultS3BlockCache; +import kafka.log.s3.cache.ReadDataBlock; import kafka.log.s3.model.StreamRecordBatch; -import kafka.log.s3.objects.CommitWalObjectRequest; import kafka.log.s3.objects.CommitWalObjectResponse; import kafka.log.s3.objects.ObjectManager; -import kafka.log.s3.objects.ObjectStreamRange; import kafka.log.s3.operator.MemoryS3Operator; +import kafka.log.s3.operator.S3Operator; +import kafka.log.s3.wal.MemoryWriteAheadLog; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -37,18 +37,18 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @Tag("S3Unit") -public class S3WalTest { +public class S3StorageTest { ObjectManager objectManager; - S3Wal s3Wal; + S3Storage storage; @BeforeEach public void setup() { objectManager = mock(ObjectManager.class); - s3Wal = new S3Wal(objectManager, new MemoryS3Operator()); + S3Operator s3Operator = new MemoryS3Operator(); + storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(objectManager, s3Operator), s3Operator); } @Test @@ -57,26 +57,33 @@ public void testAppend() throws Exception { CommitWalObjectResponse resp = new CommitWalObjectResponse(); when(objectManager.commitWalObject(any())).thenReturn(CompletableFuture.completedFuture(resp)); - CompletableFuture cf1 = s3Wal.append(new StreamRecordBatch(233, 1, 10, DefaultRecordBatch.of(1, 100))); - CompletableFuture cf2 = s3Wal.append(new StreamRecordBatch(233, 1, 11, DefaultRecordBatch.of(2, 100))); - CompletableFuture cf3 = s3Wal.append(new StreamRecordBatch(234, 3, 100, DefaultRecordBatch.of(1, 100))); + CompletableFuture cf1 = storage.append(new StreamRecordBatch(233, 1, 10, DefaultRecordBatch.of(1, 100))); + CompletableFuture cf2 = storage.append(new StreamRecordBatch(233, 1, 11, DefaultRecordBatch.of(2, 100))); + CompletableFuture cf3 = storage.append(new StreamRecordBatch(234, 3, 100, DefaultRecordBatch.of(1, 100))); cf1.get(3, TimeUnit.SECONDS); cf2.get(3, TimeUnit.SECONDS); cf3.get(3, TimeUnit.SECONDS); - ArgumentCaptor commitArg = ArgumentCaptor.forClass(CommitWalObjectRequest.class); - verify(objectManager).commitWalObject(commitArg.capture()); - CommitWalObjectRequest commitReq = commitArg.getValue(); - assertEquals(16L, commitReq.getObjectId()); - List streamRanges = commitReq.getStreamRanges(); - assertEquals(2, streamRanges.size()); - assertEquals(233, streamRanges.get(0).getStreamId()); - assertEquals(10, streamRanges.get(0).getStartOffset()); - assertEquals(13, streamRanges.get(0).getEndOffset()); - assertEquals(234, streamRanges.get(1).getStreamId()); - assertEquals(100, streamRanges.get(1).getStartOffset()); - assertEquals(101, streamRanges.get(1).getEndOffset()); + ReadDataBlock readRst = storage.read(233, 10, 13, 90).get(); + assertEquals(1, readRst.getRecords().size()); + readRst = storage.read(233, 10, 13, 200).get(); + assertEquals(2, readRst.getRecords().size()); + + // TODO: add force upload to test commit wal object. 
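+        // The verification below is disabled for now: this test never triggers a WAL
+        // object upload, so no commitWalObject call happens and there is nothing for
+        // the captor to verify yet.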
+ +// ArgumentCaptor commitArg = ArgumentCaptor.forClass(CommitWalObjectRequest.class); +// verify(objectManager).commitWalObject(commitArg.capture()); +// CommitWalObjectRequest commitReq = commitArg.getValue(); +// assertEquals(16L, commitReq.getObjectId()); +// List streamRanges = commitReq.getStreamRanges(); +// assertEquals(2, streamRanges.size()); +// assertEquals(233, streamRanges.get(0).getStreamId()); +// assertEquals(10, streamRanges.get(0).getStartOffset()); +// assertEquals(13, streamRanges.get(0).getEndOffset()); +// assertEquals(234, streamRanges.get(1).getStreamId()); +// assertEquals(100, streamRanges.get(1).getStartOffset()); +// assertEquals(101, streamRanges.get(1).getEndOffset()); } @Test diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java index 79dc45999c..f3e117fcf1 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java @@ -30,6 +30,7 @@ import com.automq.elasticstream.client.api.RecordBatch; import com.automq.elasticstream.client.api.RecordBatchWithContext; import com.automq.elasticstream.client.api.Stream; + import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -41,6 +42,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; + import kafka.log.s3.cache.DefaultS3BlockCache; import kafka.log.s3.cache.S3BlockCache; import kafka.log.s3.memory.MemoryMetadataManager; @@ -48,6 +50,7 @@ import kafka.log.s3.operator.MemoryS3Operator; import kafka.log.s3.operator.S3Operator; import kafka.log.s3.streams.StreamManager; +import kafka.log.s3.wal.MemoryWriteAheadLog; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -91,7 +94,7 @@ public ByteBuffer rawPayload() { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamMemoryTest.class); MemoryMetadataManager manager; - Wal wal; + Storage storage; S3BlockCache blockCache; S3Operator operator; StreamManager streamManager; @@ -108,9 +111,9 @@ public void setUp() { streamManager = manager; objectManager = manager; operator = new MemoryS3Operator(); - wal = new S3Wal(objectManager, operator); blockCache = new DefaultS3BlockCache(objectManager, operator); - streamClient = new S3StreamClient(streamManager, wal, blockCache, objectManager); + storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, blockCache, operator); + streamClient = new S3StreamClient(streamManager, storage); } @Test @@ -124,13 +127,13 @@ public void testOpenAndClose() throws Exception { assertNotNull(stream); // open with new epoch but current epoch is not closed assertThrows(ExecutionException.class, - () -> this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(1L).build()).get()); + () -> this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(1L).build()).get()); stream.close().get(); // duplicate close stream.close().get(); // reopen with stale epoch assertThrows(ExecutionException.class, - () -> this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(0L).build()).get()); + () -> this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(0L).build()).get()); // reopen with new epoch Stream newStream = this.streamClient.openStream(streamId, OpenStreamOptions.newBuilder().epoch(1L).build()).get(); assertEquals(streamId, 
newStream.streamId()); @@ -154,20 +157,20 @@ public void testFetch() throws Exception { RecordBatchWithContext record0 = result0.recordBatchList().get(0); assertEquals(0, record0.baseOffset()); assertEquals(1, record0.lastOffset()); - assertEquals("hello", new String(record0.rawPayload().array())); + assertEquals("hello", new String(buf2array(record0.rawPayload()))); FetchResult result1 = stream.fetch(1, 2, 100).get(); assertEquals(1, result1.recordBatchList().size()); RecordBatchWithContext record1 = result1.recordBatchList().get(0); assertEquals(1, record1.baseOffset()); assertEquals(2, record1.lastOffset()); - assertEquals("world", new String(record1.rawPayload().array())); + assertEquals("world", new String(buf2array(record1.rawPayload()))); // fetch all FetchResult result = stream.fetch(0, 2, 100000).get(); assertEquals(2, result.recordBatchList().size()); RecordBatchWithContext record = result.recordBatchList().get(0); - assertEquals("hello", new String(record.rawPayload().array())); + assertEquals("hello", new String(buf2array(record.rawPayload()))); RecordBatchWithContext record2 = result.recordBatchList().get(1); - assertEquals("world", new String(record2.rawPayload().array())); + assertEquals("world", new String(buf2array(record2.rawPayload()))); } @Test @@ -213,6 +216,11 @@ public void testPressure() throws Exception { latch.await(); } + private static byte[] buf2array(ByteBuffer buffer) { + byte[] array = new byte[buffer.remaining()]; + buffer.get(array); + return array; + } static class Producer implements Runnable { @@ -299,12 +307,12 @@ public void fetch() throws InterruptedException, ExecutionException { FetchResult result = stream.fetch(consumeOffset, appendEndOffset + 1, Integer.MAX_VALUE).get(); LOGGER.info("[Consumer-{}-{}] fetch records: {}", stream.streamId(), id, result.recordBatchList().size()); result.recordBatchList().forEach( - record -> { - long offset = record.baseOffset(); - assertEquals("hello[" + stream.streamId() + "][" + offset + "]", new String(record.rawPayload().array())); - LOGGER.info("[Consumer-{}-{}] consume: {}", stream.streamId(), id, offset); - consumeOffset = Math.max(consumeOffset, offset + 1); - } + record -> { + long offset = record.baseOffset(); + assertEquals("hello[" + stream.streamId() + "][" + offset + "]", new String(buf2array(record.rawPayload()))); + LOGGER.info("[Consumer-{}-{}] consume: {}", stream.streamId(), id, offset); + consumeOffset = Math.max(consumeOffset, offset + 1); + } ); } diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java index 614e1a4f6b..42229e6787 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java @@ -21,7 +21,6 @@ import com.automq.elasticstream.client.api.FetchResult; import com.automq.elasticstream.client.api.RecordBatch; import kafka.log.s3.cache.ReadDataBlock; -import kafka.log.s3.cache.S3BlockCache; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.streams.StreamManager; import org.junit.jupiter.api.BeforeEach; @@ -41,29 +40,29 @@ @Tag("S3Unit") public class S3StreamTest { - Wal wal; - S3BlockCache blockCache; + Storage storage; StreamManager streamManager; S3Stream stream; @BeforeEach public void setup() { - wal = mock(Wal.class); - blockCache = mock(S3BlockCache.class); + storage = mock(Storage.class); streamManager = mock(StreamManager.class); - stream = new S3Stream(233, 1, 100, 233, wal, blockCache, streamManager); + stream = new S3Stream(233, 1, 100, 233, 
storage, streamManager); } @Test public void testFetch() throws Throwable { stream.confirmOffset.set(120L); - when(blockCache.read(eq(233L), eq(110L), eq(120L), eq(100))) + when(storage.read(eq(233L), eq(110L), eq(120L), eq(100))) .thenReturn(CompletableFuture.completedFuture(newReadDataBlock(110, 115, 110))); FetchResult rst = stream.fetch(110, 120, 100).get(1, TimeUnit.SECONDS); assertEquals(1, rst.recordBatchList().size()); assertEquals(110, rst.recordBatchList().get(0).baseOffset()); assertEquals(115, rst.recordBatchList().get(0).lastOffset()); + // TODO: add fetch from WAL cache + boolean isException = false; try { stream.fetch(120, 140, 100).get(); diff --git a/core/src/test/java/kafka/log/s3/MinorCompactTaskTest.java b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java similarity index 63% rename from core/src/test/java/kafka/log/s3/MinorCompactTaskTest.java rename to core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java index 8c74ac112a..d21df1b009 100644 --- a/core/src/test/java/kafka/log/s3/MinorCompactTaskTest.java +++ b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java @@ -27,7 +27,9 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -41,62 +43,52 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class MinorCompactTaskTest { +public class WALObjectUploadTaskTest { ObjectManager objectManager; S3Operator s3Operator; - MinorCompactTask minorCompactTask; + WALObjectUploadTask walObjectUploadTask; @BeforeEach public void setup() { objectManager = mock(ObjectManager.class); s3Operator = new MemoryS3Operator(); - minorCompactTask = new MinorCompactTask(128 * 1024, 1000, 1024, objectManager, s3Operator); } @Test - public void testTryCompact() { + public void testTryCompact() throws Exception { AtomicLong objectIdAlloc = new AtomicLong(10); doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong()); when(objectManager.commitMinorCompactObject(any())).thenReturn(CompletableFuture.completedFuture(null)); - List records = List.of( - new StreamRecordBatch(233, 0, 10, DefaultRecordBatch.of(2, 512)), - new StreamRecordBatch(233, 0, 12, DefaultRecordBatch.of(2, 128)), - new StreamRecordBatch(234, 0, 20, DefaultRecordBatch.of(2, 128)) - ); - minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(0, records)); + Map> map = new HashMap<>(); + map.put(233L, List.of( + FlatStreamRecordBatch.from(new StreamRecordBatch(233, 0, 10, DefaultRecordBatch.of(2, 512))), + FlatStreamRecordBatch.from(new StreamRecordBatch(233, 0, 12, DefaultRecordBatch.of(2, 128))), + FlatStreamRecordBatch.from(new StreamRecordBatch(233, 0, 14, DefaultRecordBatch.of(2, 512))) + )); + map.put(234L, List.of( + FlatStreamRecordBatch.from(new StreamRecordBatch(234, 0, 20, DefaultRecordBatch.of(2, 128))), + FlatStreamRecordBatch.from(new StreamRecordBatch(234, 0, 22, DefaultRecordBatch.of(2, 128))) + )); + + walObjectUploadTask = new WALObjectUploadTask(map, 1000, objectManager, s3Operator); + + walObjectUploadTask.prepare().get(); + walObjectUploadTask.upload().get(); + walObjectUploadTask.commit().get(); - records = List.of( - new StreamRecordBatch(233, 0, 14, DefaultRecordBatch.of(2, 512)), - new StreamRecordBatch(234, 0, 22, DefaultRecordBatch.of(2, 128)), - new 
StreamRecordBatch(235, 0, 11, DefaultRecordBatch.of(2, 128)) - ); - minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(1, records)); - records = List.of( - new StreamRecordBatch(235, 1, 30, DefaultRecordBatch.of(2, 128)) - ); - minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(2, records)); - minorCompactTask.close(); ArgumentCaptor reqArg = ArgumentCaptor.forClass(CommitCompactObjectRequest.class); verify(objectManager, times(1)).commitMinorCompactObject(reqArg.capture()); // expect // - stream233 split // - stream234 write to one stream range - // - stream235 with different epoch, write to two stream range. CommitCompactObjectRequest request = reqArg.getValue(); assertEquals(10, request.getObjectId()); - assertEquals(List.of(0L, 1L, 2L), request.getCompactedObjectIds()); - assertEquals(3, request.getStreamRanges().size()); + assertEquals(1, request.getStreamRanges().size()); assertEquals(234, request.getStreamRanges().get(0).getStreamId()); assertEquals(20, request.getStreamRanges().get(0).getStartOffset()); assertEquals(24, request.getStreamRanges().get(0).getEndOffset()); - assertEquals(235, request.getStreamRanges().get(1).getStreamId()); - assertEquals(11, request.getStreamRanges().get(1).getStartOffset()); - assertEquals(13, request.getStreamRanges().get(1).getEndOffset()); - assertEquals(235, request.getStreamRanges().get(2).getStreamId()); - assertEquals(30, request.getStreamRanges().get(2).getStartOffset()); - assertEquals(32, request.getStreamRanges().get(2).getEndOffset()); assertEquals(1, request.getStreamObjects().size()); StreamObject streamObject = request.getStreamObjects().get(0);
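+        // Expectation recap: stream 233 carries 512 + 128 + 512 = 1152 bytes of payload,
+        // above the 1000 passed to WALObjectUploadTask (presumably the stream-split
+        // threshold), so it is split out as a dedicated stream object; stream 234 only
+        // carries 128 + 128 = 256 bytes and stays in the WAL object as the single
+        // stream range [20, 24).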