From 8dae08d7c827847d27da3d0e2a7b2b2a6b3fb91a Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 25 Aug 2023 15:08:09 +0800 Subject: [PATCH 1/2] feat(s3): support basic memory manager mock 1. support basic memory manager mock Signed-off-by: TheR1sing3un --- .../src/main/scala/kafka/log/s3/S3Stream.java | 36 +- .../scala/kafka/log/s3/S3StreamClient.java | 7 +- .../log/s3/memory/MemoryMetadataManager.java | 379 ++++++++++++++++++ .../kafka/log/s3/model/RangeMetadata.java | 23 +- .../kafka/log/s3/model/StreamMetadata.java | 17 + .../log/s3/objects/OpenStreamMetadata.java | 48 +++ .../kafka/log/s3/streams/StreamManager.java | 7 +- .../java/kafka/log/s3/S3StreamMemoryTest.java | 284 +++++++++++++ .../test/java/kafka/log/s3/S3StreamTest.java | 9 +- .../stream/StreamControlManager.java | 2 + .../kafka/metadata/stream/S3WALObject.java | 19 +- 11 files changed, 784 insertions(+), 47 deletions(-) create mode 100644 core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java create mode 100644 core/src/main/scala/kafka/log/s3/objects/OpenStreamMetadata.java create mode 100644 core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java index 0843004bd6..5239605e80 100644 --- a/core/src/main/scala/kafka/log/s3/S3Stream.java +++ b/core/src/main/scala/kafka/log/s3/S3Stream.java @@ -28,7 +28,6 @@ import kafka.log.es.FutureUtil; import kafka.log.es.RecordBatchWithContextWrapper; import kafka.log.s3.cache.S3BlockCache; -import kafka.log.s3.model.StreamMetadata; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.streams.StreamManager; import org.slf4j.Logger; @@ -42,10 +41,10 @@ public class S3Stream implements Stream { private static final Logger LOGGER = LoggerFactory.getLogger(S3Stream.class); - private final StreamMetadata metadata; private final String logIdent; private final long streamId; private final long epoch; + private long startOffset; final AtomicLong confirmOffset; private final AtomicLong nextOffset; private final Wal wal; @@ -53,27 +52,28 @@ public class S3Stream implements Stream { private final StreamManager streamManager; private final Status status; - public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager) { - this.metadata = metadata; - this.logIdent = "[Stream id=" + metadata.getStreamId() + " epoch" + metadata.getEpoch() + "]"; - this.streamId = metadata.getStreamId(); - this.epoch = metadata.getEpoch(); - this.nextOffset = new AtomicLong(metadata.getRanges().get(metadata.getRanges().size() - 1).getStartOffset()); - this.confirmOffset = new AtomicLong(nextOffset.get()); + public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Wal wal, S3BlockCache blockCache, StreamManager streamManager) { + this.streamId = streamId; + this.epoch = epoch; + this.startOffset = startOffset; + this.logIdent = "[Stream id=" + streamId + " epoch" + epoch + "]"; + this.nextOffset = new AtomicLong(nextOffset); + this.confirmOffset = new AtomicLong(nextOffset); this.status = new Status(); this.wal = wal; this.blockCache = blockCache; this.streamManager = streamManager; } + @Override public long streamId() { - return metadata.getStreamId(); + return this.streamId; } @Override public long startOffset() { - return metadata.getStartOffset(); + return this.startOffset; } @Override @@ -112,11 +112,11 @@ public CompletableFuture fetch(long startOffset, long endOffset, in return FutureUtil.failedFuture(new 
ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed")); } long confirmOffset = this.confirmOffset.get(); - if (startOffset < metadata.getStartOffset() || endOffset > confirmOffset) { + if (startOffset < startOffset() || endOffset > confirmOffset) { return FutureUtil.failedFuture( new ElasticStreamClientException( ErrorCode.OFFSET_OUT_OF_RANGE_BOUNDS, - String.format("fetch range[%s, %s) is out of stream bound [%s, %s)", startOffset, endOffset, metadata.getStartOffset(), confirmOffset) + String.format("fetch range[%s, %s) is out of stream bound [%s, %s)", startOffset, endOffset, startOffset(), confirmOffset) )); } return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> { @@ -127,12 +127,12 @@ public CompletableFuture fetch(long startOffset, long endOffset, in @Override public CompletableFuture trim(long newStartOffset) { - if (newStartOffset < metadata.getStartOffset()) { + if (newStartOffset < this.startOffset) { throw new IllegalArgumentException("newStartOffset[" + newStartOffset + "] cannot be less than current start offset[" - + metadata.getStartOffset() + "]"); + + this.startOffset + "]"); } - metadata.setStartOffset(newStartOffset); - return streamManager.trimStream(metadata.getStreamId(), metadata.getEpoch(), newStartOffset); + this.startOffset = newStartOffset; + return streamManager.trimStream(streamId, epoch, newStartOffset); } @Override @@ -144,7 +144,7 @@ public CompletableFuture close() { @Override public CompletableFuture destroy() { status.markDestroy(); - metadata.setStartOffset(this.confirmOffset.get()); + startOffset = this.confirmOffset.get(); return streamManager.deleteStream(streamId, epoch); } diff --git a/core/src/main/scala/kafka/log/s3/S3StreamClient.java b/core/src/main/scala/kafka/log/s3/S3StreamClient.java index 722b7d9b7f..1903267abf 100644 --- a/core/src/main/scala/kafka/log/s3/S3StreamClient.java +++ b/core/src/main/scala/kafka/log/s3/S3StreamClient.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; public class S3StreamClient implements StreamClient { + private final StreamManager streamController; private final Wal wal; private final S3BlockCache blockCache; @@ -51,6 +52,10 @@ public CompletableFuture openStream(long streamId, OpenStreamOptions ope } private CompletableFuture openStream0(long streamId, long epoch) { - return streamController.openStream(streamId, epoch).thenApply(metadata -> new S3Stream(metadata, wal, blockCache, streamController)); + return streamController.openStream(streamId, epoch). + thenApply(metadata -> new S3Stream( + metadata.getStreamId(), metadata.getEpoch(), + metadata.getStartOffset(), metadata.getNextOffset(), + wal, blockCache, streamController)); } } diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java new file mode 100644 index 0000000000..bbad82112f --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.memory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import kafka.log.s3.model.RangeMetadata; +import kafka.log.s3.model.StreamMetadata; +import kafka.log.s3.objects.CommitCompactObjectRequest; +import kafka.log.s3.objects.CommitStreamObjectRequest; +import kafka.log.s3.objects.CommitWalObjectRequest; +import kafka.log.s3.objects.CommitWalObjectResponse; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.objects.ObjectStreamRange; +import kafka.log.s3.objects.OpenStreamMetadata; +import kafka.log.s3.objects.S3ObjectMetadata; +import kafka.log.s3.streams.StreamManager; +import kafka.log.s3.utils.ObjectUtils; +import org.apache.kafka.common.errors.s3.StreamFencedException; +import org.apache.kafka.common.errors.s3.StreamNotExistException; +import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.metadata.stream.S3ObjectState; +import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.S3WALObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MemoryMetadataManager implements StreamManager, ObjectManager { + + private static final int MOCK_BROKER_ID = 0; + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryMetadataManager.class); + private final EventDriver eventDriver; + private final Map objectsMetadata; + + private volatile long nextAssignedObjectId = 0; + + private final Map streamsMetadata; + + private final Map brokerWALMetadata; + + private static class MemoryBrokerWALMetadata { + + private final int brokerId; + private final List walObjects; + + public MemoryBrokerWALMetadata(int brokerId) { + this.brokerId = brokerId; + this.walObjects = new ArrayList<>(); + } + } + + private volatile long nextAssignedStreamId = 0; + + public MemoryMetadataManager() { + this.eventDriver = new EventDriver(); + this.objectsMetadata = new HashMap<>(); + this.streamsMetadata = new HashMap<>(); + this.brokerWALMetadata = new HashMap<>(); + } + + public CompletableFuture submitEvent(Supplier eventHandler) { + CompletableFuture cb = new CompletableFuture<>(); + MemoryMetadataEvent event = new MemoryMetadataEvent(cb, eventHandler); + if (!eventDriver.submit(event)) { + throw new RuntimeException("Offer event failed"); + } + return cb; + } + + public void shutdown() { + this.eventDriver.stop(); + } + + public void start() { + this.eventDriver.start(); + } + + @Override + public CompletableFuture prepareObject(int count, long ttl) { + return this.submitEvent(() -> { + long objectRangeStart = this.nextAssignedObjectId; + for (int i = 0; i < count; i++) { + long objectId = this.nextAssignedObjectId++; + S3Object object = 
prepareObject(objectId, ttl); + this.objectsMetadata.put(objectId, object); + } + return objectRangeStart; + }); + } + + @Override + public CompletableFuture commitWalObject(CommitWalObjectRequest request) { + return this.submitEvent(() -> { + CommitWalObjectResponse resp = new CommitWalObjectResponse(); + List failedStreamIds = new ArrayList<>(); + resp.setFailedStreamIds(failedStreamIds); + long objectId = request.getObjectId(); + long objectSize = request.getObjectSize(); + List streamRanges = request.getStreamRanges(); + S3Object object = this.objectsMetadata.get(objectId); + if (object == null) { + throw new RuntimeException("Object " + objectId + " does not exist"); + } + if (object.getS3ObjectState() != S3ObjectState.PREPARED) { + throw new RuntimeException("Object " + objectId + " is not in prepared state"); + } + // verify the stream + streamRanges.stream().filter(range -> !verifyWalStreamRanges(range)).mapToLong(ObjectStreamRange::getStreamId) + .forEach(failedStreamIds::add); + if (!failedStreamIds.isEmpty()) { + return resp; + } + // commit object + this.objectsMetadata.put(objectId, new S3Object( + objectId, objectSize, object.getObjectKey(), + object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1, + S3ObjectState.COMMITTED) + ); + // build metadata + MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID, + k -> new MemoryBrokerWALMetadata(k)); + Map index = new HashMap<>(); + streamRanges.stream().forEach(range -> { + long streamId = range.getStreamId(); + long startOffset = range.getStartOffset(); + long endOffset = range.getEndOffset(); + index.put(streamId, new S3ObjectStreamIndex(streamId, startOffset, endOffset)); + // update range endOffset + StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).setEndOffset(endOffset); + }); + S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index); + walMetadata.walObjects.add(walObject); + return resp; + }); + } + + private boolean verifyWalStreamRanges(ObjectStreamRange range) { + long streamId = range.getStreamId(); + long epoch = range.getEpoch(); + // verify + StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (streamMetadata == null) { + return false; + } + // compare epoch + if (streamMetadata.getEpoch() > epoch) { + return false; + } + if (streamMetadata.getEpoch() < epoch) { + return false; + } + return true; + } + + @Override + public CompletableFuture commitMinorCompactObject(CommitCompactObjectRequest request) { + return null; + } + + @Override + public CompletableFuture commitMajorCompactObject(CommitCompactObjectRequest request) { + return null; + } + + @Override + public CompletableFuture commitStreamObject(CommitStreamObjectRequest request) { + return null; + } + + @Override + public List getServerObjects() { + CompletableFuture> future = this.submitEvent(() -> { + return this.brokerWALMetadata.get(MOCK_BROKER_ID).walObjects.stream().map(obj -> { + S3Object s3Object = this.objectsMetadata.get(obj.objectId()); + return new S3ObjectMetadata(obj.objectId(), s3Object.getObjectSize(), obj.objectType()); + }).collect(Collectors.toList()); + }); + try { + return future.get(); + } catch (Exception e) { + LOGGER.error("Error in getServerObjects", e); + return Collections.emptyList(); + } + } + + @Override + public List getObjects(long streamId, long startOffset, long endOffset, int limit) { + // TODO: support search not only in wal 
objects + CompletableFuture> future = this.submitEvent(() -> { + int need = limit; + List objs = new ArrayList<>(); + StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (endOffset <= streamMetadata.getStartOffset()) { + return objs; + } + List ranges = streamMetadata.getRanges(); + for (RangeMetadata range : ranges) { + if (endOffset < range.getStartOffset() || need <= 0) { + break; + } + if (startOffset >= range.getEndOffset()) { + continue; + } + // find range, get wal objects + int brokerId = range.getBrokerId(); + MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.get(brokerId); + for (S3WALObject walObject : walMetadata.walObjects) { + if (need <= 0) { + break; + } + // TODO: speed up query + if (!walObject.intersect(streamId, startOffset, endOffset)) { + continue; + } + // find stream index, get object + S3Object object = this.objectsMetadata.get(walObject.objectId()); + S3ObjectMetadata obj = new S3ObjectMetadata(walObject.objectId(), object.getObjectSize(), walObject.objectType()); + objs.add(obj); + need--; + } + } + return objs; + }); + try { + return future.get(); + } catch (Exception e) { + LOGGER.error("Error in getObjects", e); + return Collections.emptyList(); + } + } + + @Override + public CompletableFuture createStream() { + return this.submitEvent(() -> { + long streamId = this.nextAssignedStreamId++; + this.streamsMetadata.put(streamId, + new StreamMetadata(streamId, 0, -1, 0, new ArrayList<>())); + return streamId; + }); + } + + @Override + public CompletableFuture openStream(long streamId, long epoch) { + return this.submitEvent(() -> { + // verify stream exist + if (!this.streamsMetadata.containsKey(streamId)) { + throw new StreamNotExistException("Stream " + streamId + " does not exist"); + } + // verify epoch match + StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (streamMetadata.getEpoch() > epoch) { + throw new StreamFencedException("Stream " + streamId + " is fenced"); + } + if (streamMetadata.getEpoch() == epoch) { + // get active range + long endOffset = streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).getEndOffset(); + return new OpenStreamMetadata(streamId, epoch, streamMetadata.getStartOffset(), endOffset); + } + // create new range + long newEpoch = epoch; + int newRangeIndex = streamMetadata.getRangeIndex() + 1; + long startOffset = 0; + if (newRangeIndex > 0) { + startOffset = streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).getEndOffset(); + } + RangeMetadata rangeMetadata = new RangeMetadata(newRangeIndex, startOffset, startOffset, MOCK_BROKER_ID); + streamMetadata.getRanges().add(rangeMetadata); + // update epoch and rangeIndex + streamMetadata.setRangeIndex(newRangeIndex); + streamMetadata.setEpoch(newEpoch); + return new OpenStreamMetadata(streamId, newEpoch, startOffset, startOffset); + }); + } + + @Override + public CompletableFuture trimStream(long streamId, long epoch, long newStartOffset) { + return null; + } + + @Override + public CompletableFuture deleteStream(long streamId, long epoch) { + return null; + } + + private S3Object prepareObject(long objectId, long ttl) { + long preparedTs = System.currentTimeMillis(); + String objectKey = ObjectUtils.genKey(0, "todocluster", objectId); + return new S3Object( + objectId, -1, objectKey, + preparedTs, preparedTs + ttl, -1, -1, + S3ObjectState.PREPARED); + } + + + static class EventDriver implements Runnable { + + private final ExecutorService service; + + private final BlockingQueue eventQueue; + + 
public EventDriver() { + this.service = Executors.newSingleThreadExecutor(); + this.eventQueue = new LinkedBlockingQueue<>(1024); + } + + public void start() { + this.service.submit(this::run); + } + + public void stop() { + this.service.shutdownNow(); + } + + public boolean submit(MemoryMetadataEvent event) { + return eventQueue.offer(event); + } + + @Override + public void run() { + while (true) { + try { + run0(); + } catch (Exception e) { + LOGGER.error("Error in memory manager event driver", e); + } + } + } + + private void run0() throws InterruptedException { + MemoryMetadataEvent event = eventQueue.poll(5, TimeUnit.SECONDS); + if (event != null) { + event.done(); + } + } + + } + + static class MemoryMetadataEvent { + + CompletableFuture cb; + Supplier eventHandler; + + public MemoryMetadataEvent(CompletableFuture cb, Supplier eventHandler) { + this.cb = cb; + this.eventHandler = eventHandler; + } + + public void done() { + cb.complete(eventHandler.get()); + } + } +} diff --git a/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java b/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java index 4f3e0c9992..85b1d1c4a3 100644 --- a/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java +++ b/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java @@ -17,20 +17,19 @@ package kafka.log.s3.model; -import java.util.OptionalLong; public class RangeMetadata { private static final long NOOP_OFFSET = -1; private int index; private long startOffset; private long endOffset; - private long serverId; + private int brokerId; - public RangeMetadata(int index, long startOffset, long endOffset, long serverId) { + public RangeMetadata(int index, long startOffset, long endOffset, int serverId) { this.index = index; this.startOffset = startOffset; this.endOffset = endOffset; - this.serverId = serverId; + this.brokerId = serverId; } public int getIndex() { @@ -49,23 +48,19 @@ public void setStartOffset(long startOffset) { this.startOffset = startOffset; } - public OptionalLong getEndOffset() { - if (endOffset == NOOP_OFFSET) { - return OptionalLong.empty(); - } else { - return OptionalLong.of(endOffset); - } + public long getEndOffset() { + return endOffset; } public void setEndOffset(long endOffset) { this.endOffset = endOffset; } - public long getServerId() { - return serverId; + public int getBrokerId() { + return brokerId; } - public void setServerId(long serverId) { - this.serverId = serverId; + public void setBrokerId(int brokerId) { + this.brokerId = brokerId; } } diff --git a/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java b/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java index 7be834a100..1db1020ada 100644 --- a/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java +++ b/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java @@ -22,10 +22,19 @@ public class StreamMetadata { private long streamId; private long epoch; + private int rangeIndex; private long startOffset; private List ranges; + public StreamMetadata(long streamId, long epoch, int rangeIndex, long startOffset, List ranges) { + this.streamId = streamId; + this.epoch = epoch; + this.rangeIndex = rangeIndex; + this.startOffset = startOffset; + this.ranges = ranges; + } + public long getStreamId() { return streamId; } @@ -57,4 +66,12 @@ public List getRanges() { public void setRanges(List ranges) { this.ranges = ranges; } + + public int getRangeIndex() { + return rangeIndex; + } + + public void setRangeIndex(int rangeIndex) { + this.rangeIndex = rangeIndex; + } } diff --git 
a/core/src/main/scala/kafka/log/s3/objects/OpenStreamMetadata.java b/core/src/main/scala/kafka/log/s3/objects/OpenStreamMetadata.java new file mode 100644 index 0000000000..6a135d8895 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/OpenStreamMetadata.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +public class OpenStreamMetadata { + private final long streamId; + private final long epoch; + private final long startOffset; + private final long nextOffset; + + public OpenStreamMetadata(long streamId, long epoch, long startOffset, long nextOffset) { + this.streamId = streamId; + this.epoch = epoch; + this.startOffset = startOffset; + this.nextOffset = nextOffset; + } + + public long getStreamId() { + return streamId; + } + + public long getEpoch() { + return epoch; + } + + public long getStartOffset() { + return startOffset; + } + + public long getNextOffset() { + return nextOffset; + } +} diff --git a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java index 7e2cdf3291..eb563754da 100644 --- a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java @@ -17,9 +17,8 @@ package kafka.log.s3.streams; -import kafka.log.s3.model.StreamMetadata; - import java.util.concurrent.CompletableFuture; +import kafka.log.s3.objects.OpenStreamMetadata; public interface StreamManager { @@ -38,9 +37,9 @@ public interface StreamManager { * * @param streamId stream id. * @param epoch stream epoch. - * @return {@link StreamMetadata} + * @return {@link OpenStreamMetadata} */ - CompletableFuture openStream(long streamId, long epoch); + CompletableFuture openStream(long streamId, long epoch); /** * Trim stream to new start offset. diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java new file mode 100644 index 0000000000..85abd92a9d --- /dev/null +++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.automq.elasticstream.client.api.AppendResult; +import com.automq.elasticstream.client.api.CreateStreamOptions; +import com.automq.elasticstream.client.api.FetchResult; +import com.automq.elasticstream.client.api.RecordBatch; +import com.automq.elasticstream.client.api.RecordBatchWithContext; +import com.automq.elasticstream.client.api.Stream; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import kafka.log.s3.cache.DefaultS3BlockCache; +import kafka.log.s3.cache.S3BlockCache; +import kafka.log.s3.memory.MemoryMetadataManager; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.operator.MemoryS3Operator; +import kafka.log.s3.operator.S3Operator; +import kafka.log.s3.streams.StreamManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Tag("S3Unit") +public class S3StreamMemoryTest { + + static class MockRecordBatch implements RecordBatch { + + ByteBuffer byteBuffer; + + int count; + + public MockRecordBatch(String payload, int count) { + this.byteBuffer = ByteBuffer.wrap(payload.getBytes()); + this.count = count; + } + + @Override + public int count() { + return count; + } + + @Override + public long baseTimestamp() { + return 0; + } + + @Override + public Map properties() { + return Collections.emptyMap(); + } + + @Override + public ByteBuffer rawPayload() { + return this.byteBuffer.duplicate(); + } + } + + private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamMemoryTest.class); + MemoryMetadataManager manager; + Wal wal; + S3BlockCache blockCache; + S3Operator operator; + StreamManager streamManager; + ObjectManager objectManager; + S3StreamClient streamClient; + private final static long MAX_APPENDED_OFFSET = 1000; + + Random random = new Random(); + + @BeforeEach + public void setUp() { + manager = new MemoryMetadataManager(); + manager.start(); + streamManager = manager; + objectManager = manager; + operator = new MemoryS3Operator(); + wal = new S3Wal(objectManager, operator); + blockCache = new DefaultS3BlockCache(objectManager, operator); + streamClient = new S3StreamClient(streamManager, wal, blockCache, objectManager); + } + + @Test + public void testBasic() throws Exception { + CreateStreamOptions options = CreateStreamOptions.newBuilder().epoch(1L).build(); + Stream stream = this.streamClient.createAndOpenStream(options).get(); + RecordBatch recordBatch = new MockRecordBatch("hello", 1); + CompletableFuture append0 = stream.append(recordBatch); + RecordBatch recordBatch1 = new 
MockRecordBatch("world", 1); + CompletableFuture append1 = stream.append(recordBatch1); + CompletableFuture.allOf(append0, append1).get(); + // fetch + FetchResult result0 = stream.fetch(0, 1, 100).get(); + assertEquals(1, result0.recordBatchList().size()); + RecordBatchWithContext record0 = result0.recordBatchList().get(0); + assertEquals(0, record0.baseOffset()); + assertEquals(1, record0.lastOffset()); + assertEquals("hello", new String(record0.rawPayload().array())); + FetchResult result1 = stream.fetch(1, 2, 100).get(); + assertEquals(1, result1.recordBatchList().size()); + RecordBatchWithContext record1 = result1.recordBatchList().get(0); + assertEquals(1, record1.baseOffset()); + assertEquals(2, record1.lastOffset()); + assertEquals("world", new String(record1.rawPayload().array())); + // fetch all + FetchResult result = stream.fetch(0, 2, 100000).get(); + assertEquals(2, result.recordBatchList().size()); + RecordBatchWithContext record = result.recordBatchList().get(0); + assertEquals("hello", new String(record.rawPayload().array())); + RecordBatchWithContext record2 = result.recordBatchList().get(1); + assertEquals("world", new String(record2.rawPayload().array())); + } + + @Test + public void testPressure() throws Exception { + CreateStreamOptions options = CreateStreamOptions.newBuilder().epoch(1L).build(); + Stream stream0 = this.streamClient.createAndOpenStream(options).get(); + Stream stream1 = this.streamClient.createAndOpenStream(options).get(); + Stream stream2 = this.streamClient.createAndOpenStream(options).get(); + List streams = List.of(stream0, stream1, stream2); + CountDownLatch latch = new CountDownLatch(1 * 3 + 5 * 3); + CyclicBarrier barrier = new CyclicBarrier(1 * 3 + 5 * 3); + for (int i = 0; i < 3; i++) { + AtomicLong appendedOffset = new AtomicLong(-1); + final Stream stream = streams.get(i); + new Thread(() -> { + Producer producer = new Producer(stream, latch, appendedOffset); + try { + barrier.await(); + producer.run(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (BrokenBarrierException e) { + throw new RuntimeException(e); + } + + }).start(); + for (int j = 0; j < 5; j++) { + final int id = j; + new Thread(() -> { + Consumer consumer = new Consumer(id, stream, latch, appendedOffset); + try { + barrier.await(); + consumer.run(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (BrokenBarrierException e) { + throw new RuntimeException(e); + } + + }).start(); + } + } + latch.await(); + } + + + static class Producer implements Runnable { + + private long nextAppendOffset = 0; + private Stream stream; + private CountDownLatch latch; + private AtomicLong appendedOffset; + private Random random = new Random(); + private volatile boolean start = true; + + public Producer(Stream stream, CountDownLatch latch, AtomicLong appendedOffset) { + this.stream = stream; + this.latch = latch; + this.appendedOffset = appendedOffset; + } + + + @Override + public void run() { + while (start) { + try { + append(); + } catch (Exception e) { + LOGGER.error("Error in producer", e); + } + } + latch.countDown(); + } + + public void append() throws InterruptedException { + if (nextAppendOffset > MAX_APPENDED_OFFSET) { + start = false; + return; + } + MockRecordBatch recordBatch = new MockRecordBatch("hello[" + stream.streamId() + "][" + nextAppendOffset++ + "]", 1); + stream.append(recordBatch).whenCompleteAsync((result, error) -> { + assertNull(error); + LOGGER.info("[Producer-{}]: produce: {}", stream.streamId(), 
result.baseOffset()); + this.appendedOffset.incrementAndGet(); + }); + Thread.sleep(random.nextInt(30)); + } + } + + static class Consumer implements Runnable { + + private long consumeOffset = 0; + private int id; + private Stream stream; + private CountDownLatch latch; + private AtomicLong appendedOffset; + private Random random = new Random(); + private volatile boolean start = true; + + public Consumer(int id, Stream stream, CountDownLatch latch, AtomicLong appendedOffset) { + this.id = id; + this.stream = stream; + this.latch = latch; + this.appendedOffset = appendedOffset; + } + + @Override + public void run() { + while (start) { + try { + fetch(); + } catch (Exception e) { + LOGGER.error("Error in consumer", e); + } + } + latch.countDown(); + } + + public void fetch() throws InterruptedException, ExecutionException { + if (consumeOffset >= MAX_APPENDED_OFFSET) { + start = false; + return; + } + Thread.sleep(random.nextInt(200)); + long appendEndOffset = appendedOffset.get(); + if (consumeOffset > appendEndOffset) { + return; + } + FetchResult result = stream.fetch(consumeOffset, appendEndOffset + 1, Integer.MAX_VALUE).get(); + LOGGER.info("[Consumer-{}-{}] fetch records: {}", stream.streamId(), id, result.recordBatchList().size()); + result.recordBatchList().forEach( + record -> { + long offset = record.baseOffset(); + assertEquals("hello[" + stream.streamId() + "][" + offset + "]", new String(record.rawPayload().array())); + LOGGER.info("[Consumer-{}-{}] consume: {}", stream.streamId(), id, offset); + consumeOffset = Math.max(consumeOffset, offset + 1); + } + ); + } + + } + +} diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java index 5d44b65330..9b902d5ff7 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java @@ -22,8 +22,6 @@ import com.automq.elasticstream.client.api.RecordBatch; import kafka.log.s3.cache.ReadDataBlock; import kafka.log.s3.cache.S3BlockCache; -import kafka.log.s3.model.RangeMetadata; -import kafka.log.s3.model.StreamMetadata; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.streams.StreamManager; import org.junit.jupiter.api.BeforeEach; @@ -53,12 +51,7 @@ public void setup() { wal = mock(Wal.class); blockCache = mock(S3BlockCache.class); streamManager = mock(StreamManager.class); - StreamMetadata metadata = new StreamMetadata(); - metadata.setStreamId(233); - metadata.setEpoch(1); - metadata.setStartOffset(100); - metadata.setRanges(List.of(new RangeMetadata(1, 50, -1, 10))); - stream = new S3Stream(metadata, wal, blockCache, streamManager); + stream = new S3Stream(233, 1, 100, 233, wal, blockCache, streamManager); } @Test diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index f65d6a02dd..4a48dff9ef 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -58,6 +58,8 @@ */ public class StreamControlManager { + // TODO: assigned record + // TODO: timeline check public static class S3StreamMetadata { // current epoch, when created but not open, use 0 represent private long currentEpoch; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java index 
6611e2bc8a..64ce5f1df5 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java @@ -29,9 +29,9 @@ public class S3WALObject { private final long objectId; private final int brokerId; - private Map streamsIndex; + private final Map streamsIndex; - private S3ObjectType objectType = S3ObjectType.UNKNOWN; + private final S3ObjectType objectType = S3ObjectType.UNKNOWN; public S3WALObject(long objectId, int brokerId, final Map streamsIndex) { this.objectId = objectId; @@ -39,6 +39,17 @@ public S3WALObject(long objectId, int brokerId, final Map= streamIndex.getEndOffset()) { + return false; + } + return true; + } + public ApiMessageAndVersion toRecord() { return new ApiMessageAndVersion(new WALObjectRecord() .setObjectId(objectId) @@ -68,6 +79,10 @@ public Long objectId() { return objectId; } + public S3ObjectType objectType() { + return objectType; + } + @Override public boolean equals(Object o) { if (this == o) { From 37f310111134e2eb8879ce147beda95b0fcaa805 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 25 Aug 2023 15:53:33 +0800 Subject: [PATCH 2/2] fix(s3): fix confirmOffset's cas function 1. fix confirmOffset's cas function Signed-off-by: TheR1sing3un --- core/src/main/scala/kafka/log/s3/S3Stream.java | 2 +- core/src/test/java/kafka/log/s3/S3StreamTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java index 5239605e80..99b83225d1 100644 --- a/core/src/main/scala/kafka/log/s3/S3Stream.java +++ b/core/src/main/scala/kafka/log/s3/S3Stream.java @@ -151,7 +151,7 @@ public CompletableFuture destroy() { private void updateConfirmOffset(long newOffset) { for (; ; ) { long oldConfirmOffset = confirmOffset.get(); - if (oldConfirmOffset <= newOffset) { + if (oldConfirmOffset >= newOffset) { break; } if (confirmOffset.compareAndSet(oldConfirmOffset, newOffset)) { diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java index 9b902d5ff7..614e1a4f6b 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java @@ -67,7 +67,7 @@ public void testFetch() throws Throwable { boolean isException = false; try { stream.fetch(120, 140, 100).get(); - }catch (ExecutionException e) { + } catch (ExecutionException e) { if (e.getCause() instanceof ElasticStreamClientException) { isException = true; }
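
The guard flipped in PATCH 2/2 is what makes the confirm offset monotonic: the retry loop should bail out when the stored offset is already at or past the new value, and otherwise CAS it forward. A standalone sketch of that pattern, with illustrative names (MonotonicOffset/advanceTo are not part of the patch):

import java.util.concurrent.atomic.AtomicLong;

class MonotonicOffset {
    private final AtomicLong confirmOffset = new AtomicLong(0);

    void advanceTo(long newOffset) {
        for (;;) {
            long current = confirmOffset.get();
            if (current >= newOffset) {
                break; // already confirmed at least this far; never move backwards
            }
            if (confirmOffset.compareAndSet(current, newOffset)) {
                break; // advanced successfully
            }
            // a concurrent writer won the CAS; reload and retry
        }
    }
}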
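
All mutations in MemoryMetadataManager go through submitEvent, so shared state is only ever touched by the single EventDriver thread. A condensed, self-contained sketch of that single-writer pattern, simplified from the classes added in PATCH 1/2 (the queue size and poll timeout mirror the patch; the interrupt handling here is an illustrative simplification):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class SingleWriterExecutor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1024);
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void start() {
        worker.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Runnable event = queue.poll(5, TimeUnit.SECONDS);
                    if (event != null) {
                        event.run(); // completes the caller's future on this thread only
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    <T> CompletableFuture<T> submit(Supplier<T> handler) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        if (!queue.offer(() -> cf.complete(handler.get()))) {
            throw new RuntimeException("Offer event failed");
        }
        return cf;
    }

    void shutdown() {
        worker.shutdownNow();
    }
}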
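
Putting the two patches together, the mock can be exercised end-to-end roughly as S3StreamMemoryTest does; a trimmed sketch of that wiring (MemoryS3Operator, S3Wal and DefaultS3BlockCache are pre-existing classes the test reuses, and MockRecordBatch is the test-local RecordBatch defined in the test above):

// MemoryMetadataManager serves as both StreamManager and ObjectManager.
MemoryMetadataManager manager = new MemoryMetadataManager();
manager.start();
S3Operator operator = new MemoryS3Operator();
Wal wal = new S3Wal(manager, operator);
S3BlockCache blockCache = new DefaultS3BlockCache(manager, operator);
S3StreamClient streamClient = new S3StreamClient(manager, wal, blockCache, manager);

// Create a stream, append a batch, and read it back, as in testBasic.
Stream stream = streamClient.createAndOpenStream(
        CreateStreamOptions.newBuilder().epoch(1L).build()).get();
stream.append(new MockRecordBatch("hello", 1)).get();
FetchResult fetched = stream.fetch(0, 1, 100).get();
assertEquals(1, fetched.recordBatchList().size());

// Stop the mock's event driver when done.
manager.shutdown();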