From 3c41c8eedc8a74ca50f6a399926da139903aed9b Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Fri, 25 Aug 2023 11:58:45 +0800
Subject: [PATCH 1/2] ~

Signed-off-by: Robin Han
---
 core/src/main/scala/kafka/log/s3/S3Stream.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java
index 1505ebec11..4c3dfa8492 100644
--- a/core/src/main/scala/kafka/log/s3/S3Stream.java
+++ b/core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -38,6 +38,7 @@ public class S3Stream implements Stream {
     private final StreamMetadata metadata;
     private final long streamId;
     private final long epoch;
+    private final AtomicLong confirmOffset;
     private final AtomicLong nextOffset;
     private final Wal wal;
     private final S3BlockCache blockCache;
@@ -48,6 +49,7 @@ public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, Strea
         this.streamId = metadata.getStreamId();
         this.epoch = metadata.getEpoch();
         this.nextOffset = new AtomicLong(metadata.getRanges().get(metadata.getRanges().size() - 1).getStartOffset());
+        this.confirmOffset = new AtomicLong(nextOffset.get());
         this.wal = wal;
         this.blockCache = blockCache;
         this.streamManager = streamManager;

From fa440207ce2faf3c43b0c110b58c4214291b60d7 Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Fri, 25 Aug 2023 14:53:01 +0800
Subject: [PATCH 2/2] feat(stream-client): add confirm offset to stream

Signed-off-by: Robin Han
---
 .../src/main/scala/kafka/log/s3/S3Stream.java | 91 +++++++++++++++++--
 .../log/s3/SingleWalObjectWriteTask.java      |  5 +-
 .../s3/exception/StreamFencedException.java   | 21 -----
 .../test/java/kafka/log/s3/S3StreamTest.java  | 14 +++
 4 files changed, 102 insertions(+), 29 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/log/s3/exception/StreamFencedException.java

diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java
index 4c3dfa8492..0843004bd6 100644
--- a/core/src/main/scala/kafka/log/s3/S3Stream.java
+++ b/core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -19,37 +19,48 @@
 
 import com.automq.elasticstream.client.DefaultAppendResult;
 import com.automq.elasticstream.client.api.AppendResult;
+import com.automq.elasticstream.client.api.ElasticStreamClientException;
 import com.automq.elasticstream.client.api.FetchResult;
 import com.automq.elasticstream.client.api.RecordBatch;
 import com.automq.elasticstream.client.api.RecordBatchWithContext;
 import com.automq.elasticstream.client.api.Stream;
+import com.automq.elasticstream.client.flatc.header.ErrorCode;
+import kafka.log.es.FutureUtil;
 import kafka.log.es.RecordBatchWithContextWrapper;
 import kafka.log.s3.cache.S3BlockCache;
 import kafka.log.s3.model.StreamMetadata;
 import kafka.log.s3.model.StreamRecordBatch;
 import kafka.log.s3.streams.StreamManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class S3Stream implements Stream {
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3Stream.class);
     private final StreamMetadata metadata;
+    private final String logIdent;
     private final long streamId;
     private final long epoch;
-    private final AtomicLong confirmOffset;
+    final AtomicLong confirmOffset;
     private final AtomicLong nextOffset;
     private final Wal wal;
     private final S3BlockCache blockCache;
     private final StreamManager streamManager;
+    private final Status status;
 
     public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager) {
         this.metadata = metadata;
+        this.logIdent = "[Stream id=" + metadata.getStreamId() + " epoch=" + metadata.getEpoch() + "]";
         this.streamId = metadata.getStreamId();
         this.epoch = metadata.getEpoch();
         this.nextOffset = new AtomicLong(metadata.getRanges().get(metadata.getRanges().size() - 1).getStartOffset());
         this.confirmOffset = new AtomicLong(nextOffset.get());
+        this.status = new Status();
         this.wal = wal;
         this.blockCache = blockCache;
         this.streamManager = streamManager;
@@ -72,14 +83,42 @@ public long nextOffset() {
 
     @Override
     public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
-        long offset = nextOffset.getAndIncrement();
+        if (!status.isWritable()) {
+            return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable"));
+        }
+        long offset = nextOffset.getAndAdd(recordBatch.count());
         StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch);
-        return wal.append(streamRecordBatch).thenApply(nil -> new DefaultAppendResult(offset));
+        CompletableFuture<AppendResult> cf = wal.append(streamRecordBatch).thenApply(nil -> {
+            updateConfirmOffset(offset + recordBatch.count());
+            return new DefaultAppendResult(offset);
+        });
+        return cf.whenComplete((rst, ex) -> {
+            if (ex == null) {
+                return;
+            }
+            // The WAL keeps retrying an append until the stream is fenced or the WAL is closed, so mark the stream fenced on failure.
+            status.markFenced();
+            if (ex instanceof ElasticStreamClientException && ((ElasticStreamClientException) ex).getCode() == ErrorCode.EXPIRED_STREAM_EPOCH) {
+                LOGGER.info("{} stream append, stream is fenced", logIdent);
+            } else {
+                LOGGER.warn("{} stream append fail", logIdent, ex);
+            }
+        });
     }
 
     @Override
     public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
-        //TODO: bound check
+        if (status.isClosed()) {
+            return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed"));
+        }
+        long confirmOffset = this.confirmOffset.get();
+        if (startOffset < metadata.getStartOffset() || endOffset > confirmOffset) {
+            return FutureUtil.failedFuture(
+                    new ElasticStreamClientException(
+                            ErrorCode.OFFSET_OUT_OF_RANGE_BOUNDS,
+                            String.format("fetch range[%s, %s) is out of stream bound [%s, %s)", startOffset, endOffset, metadata.getStartOffset(), confirmOffset)
+                    ));
+        }
         return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> {
             List<RecordBatchWithContext> records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList());
             return new DefaultFetchResult(records);
@@ -98,16 +137,29 @@ public CompletableFuture<Void> trim(long newStartOffset) {
 
     @Override
     public CompletableFuture<Void> close() {
-        // TODO: add stream status to fence future access.
+        status.markClosed();
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> destroy() {
-        // TODO: add stream status to fence future access.
+        status.markDestroy();
+        metadata.setStartOffset(this.confirmOffset.get());
         return streamManager.deleteStream(streamId, epoch);
     }
 
+    private void updateConfirmOffset(long newOffset) {
+        for (; ; ) {
+            long oldConfirmOffset = confirmOffset.get();
+            if (oldConfirmOffset >= newOffset) {
+                break;
+            }
+            if (confirmOffset.compareAndSet(oldConfirmOffset, newOffset)) {
+                break;
+            }
+        }
+    }
+
     static class DefaultFetchResult implements FetchResult {
 
         private final List<RecordBatchWithContext> records;
@@ -120,4 +172,31 @@ public List<RecordBatchWithContext> recordBatchList() {
             return records;
         }
     }
+
+    static class Status {
+        private static final int CLOSED_MARK = 1;
+        private static final int FENCED_MARK = 1 << 1;
+        private static final int DESTROY_MARK = 1 << 2;
+        private final AtomicInteger status = new AtomicInteger();
+
+        public void markFenced() {
+            status.getAndUpdate(operand -> operand | FENCED_MARK);
+        }
+
+        public void markClosed() {
+            status.getAndUpdate(operand -> operand | CLOSED_MARK);
+        }
+
+        public void markDestroy() {
+            status.getAndUpdate(operand -> operand | DESTROY_MARK);
+        }
+
+        public boolean isClosed() {
+            return (status.get() & CLOSED_MARK) != 0;
+        }
+
+        public boolean isWritable() {
+            return status.get() == 0;
+        }
+    }
 }
diff --git a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
index 94104d9bad..7a7fa69f8e 100644
--- a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
+++ b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
@@ -17,7 +17,8 @@
 
 package kafka.log.s3;
 
-import kafka.log.s3.exception.StreamFencedException;
+import com.automq.elasticstream.client.api.ElasticStreamClientException;
+import com.automq.elasticstream.client.flatc.header.ErrorCode;
 import kafka.log.s3.objects.CommitWalObjectRequest;
 import kafka.log.s3.objects.CommitWalObjectResponse;
 import kafka.log.s3.objects.ObjectManager;
@@ -87,7 +88,7 @@ public void ack() {
         for (WalWriteRequest request : requests) {
             long streamId = request.record.getStreamId();
             if (failedStreamId.contains(streamId)) {
-                request.cf.completeExceptionally(new StreamFencedException());
+                request.cf.completeExceptionally(new ElasticStreamClientException(ErrorCode.EXPIRED_STREAM_EPOCH, "Stream " + streamId + " epoch expired"));
             } else {
                 request.cf.complete(null);
             }
diff --git a/core/src/main/scala/kafka/log/s3/exception/StreamFencedException.java b/core/src/main/scala/kafka/log/s3/exception/StreamFencedException.java
deleted file mode 100644
index 5a447924f2..0000000000
--- a/core/src/main/scala/kafka/log/s3/exception/StreamFencedException.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log.s3.exception;
-
-public class StreamFencedException extends Exception {
-}
diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java
index 84c50c3c0b..5d44b65330 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java
@@ -17,6 +17,7 @@
 
 package kafka.log.s3;
 
+import com.automq.elasticstream.client.api.ElasticStreamClientException;
 import com.automq.elasticstream.client.api.FetchResult;
 import com.automq.elasticstream.client.api.RecordBatch;
 import kafka.log.s3.cache.ReadDataBlock;
@@ -31,9 +32,11 @@
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -60,12 +63,23 @@ public void setup() {
 
     @Test
     public void testFetch() throws Throwable {
+        stream.confirmOffset.set(120L);
         when(blockCache.read(eq(233L), eq(110L), eq(120L), eq(100)))
                 .thenReturn(CompletableFuture.completedFuture(newReadDataBlock(110, 115, 110)));
         FetchResult rst = stream.fetch(110, 120, 100).get(1, TimeUnit.SECONDS);
         assertEquals(1, rst.recordBatchList().size());
         assertEquals(110, rst.recordBatchList().get(0).baseOffset());
         assertEquals(115, rst.recordBatchList().get(0).lastOffset());
+
+        boolean isException = false;
+        try {
+            stream.fetch(120, 140, 100).get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof ElasticStreamClientException) {
+                isException = true;
+            }
+        }
+        assertTrue(isException);
     }
 
     ReadDataBlock newReadDataBlock(long start, long end, int size) {
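
Note on the pattern the patch introduces: append() advances nextOffset as soon as a batch is handed to the WAL, but fetch() only serves offsets below confirmOffset, which moves forward after the WAL acknowledges the write, and the closed/fenced/destroyed states are folded into a single bit-set status word. The standalone sketch below is a minimal illustration of those two building blocks; the class and method names in it are made up for this note and are not part of the patch or the elastic-stream client API.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch of the confirm-offset / status-word pattern, not patch code.
    class ConfirmOffsetSketch {
        private final AtomicLong confirmOffset = new AtomicLong(0);

        private static final int CLOSED_MARK = 1;
        private static final int FENCED_MARK = 1 << 1;
        private final AtomicInteger status = new AtomicInteger();

        // Move the confirm offset forward monotonically; concurrent WAL acks may
        // race, so retry the CAS until the stored value already covers newOffset.
        void advanceConfirmOffset(long newOffset) {
            for (;;) {
                long old = confirmOffset.get();
                if (old >= newOffset) {
                    return; // another ack already confirmed at least this far
                }
                if (confirmOffset.compareAndSet(old, newOffset)) {
                    return;
                }
            }
        }

        // Reads are bounded by the confirm offset, not by the next (unacknowledged) offset.
        boolean fetchInBounds(long startOffset, long endOffset, long streamStartOffset) {
            return startOffset >= streamStartOffset && endOffset <= confirmOffset.get();
        }

        // The stream is writable only while no mark is set.
        void markFenced() {
            status.getAndUpdate(s -> s | FENCED_MARK);
        }

        boolean isWritable() {
            return status.get() == 0;
        }

        boolean isClosed() {
            return (status.get() & CLOSED_MARK) != 0;
        }
    }

For example, two WAL acks completing out of order, advanceConfirmOffset(20) then advanceConfirmOffset(10), leave the confirm offset at 20, so a fetch of [0, 20) stays in bounds while [0, 25) is rejected, which is the behaviour the new bound check in fetch() and the added testFetch assertions rely on.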