diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
index acd1e72aa7..d5fce42e15 100644
--- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
+++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
@@ -26,12 +26,13 @@
 import com.automq.elasticstream.client.api.RecordBatch;
 import com.automq.elasticstream.client.api.Stream;
 import com.automq.elasticstream.client.api.StreamClient;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.function.BiConsumer;
-import org.apache.kafka.common.errors.es.SlowFetchHintException;
+
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +57,7 @@ public class AlwaysSuccessClient implements Client {
     private static final ExecutorService FETCH_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(4,
             ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
     private static final ScheduledExecutorService DELAY_FETCH_SCHEDULER = Executors.newScheduledThreadPool(1,
-        ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
+            ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
     private final StreamClient streamClient;
     private final KVClient kvClient;
@@ -128,7 +129,7 @@ static class StreamImpl implements Stream {
         private final Stream stream;
         private volatile boolean closed = false;
         private final Map<String, CompletableFuture<?>> holdUpFetchingFutureMap = new ConcurrentHashMap<>();
-        private final long SLOW_FETCH_TIMEOUT_MILLIS = 10;
+        private static final long SLOW_FETCH_TIMEOUT_MILLIS = 10;
 
         public StreamImpl(Stream stream) {
             this.stream = stream;
@@ -163,44 +164,6 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
             return cf;
         }
 
-        /**
-         * Get a new CompletableFuture with
-         * a {@link SlowFetchHintException} if not otherwise completed
-         * before the given timeout.
-         * @param id the id of rawFuture in holdUpFetchingFutureMap
-         * @param rawFuture the raw future
-         * @param timeout how long to wait before completing exceptionally
-         *        with a SlowFetchHintException, in units of {@code unit}
-         * @param unit a {@code TimeUnit} determining how to interpret the
-         *        {@code timeout} parameter
-         * @return a new CompletableFuture with completed results of the rawFuture if the raw future is done before timeout,
-         *         otherwise a new CompletableFuture with a {@link SlowFetchHintException}
-         */
-        private <T> CompletableFuture<T> timeoutAndStoreFuture(String id, CompletableFuture<T> rawFuture, long timeout,
-            TimeUnit unit) {
-            if (unit == null) {
-                throw new NullPointerException();
-            }
-
-            if (!rawFuture.isDone()) {
-                final CompletableFuture<T> cf = new CompletableFuture<>();
-                rawFuture.whenComplete(new Canceller(Delayer.delay(() -> {
-                    if (rawFuture == null) {
-                        return;
-                    }
-                    if (rawFuture.isDone()) {
-                        rawFuture.thenAccept(cf::complete);
-                    } else {
-                        holdUpFetchingFutureMap.putIfAbsent(id, rawFuture);
-                        cf.completeExceptionally(new SlowFetchHintException());
-                    }
-                },
-                timeout, unit)));
-                return cf;
-            }
-            return rawFuture;
-        }
-
         @Override
         public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
             CompletableFuture<FetchResult> cf = new CompletableFuture<>();
@@ -274,7 +237,7 @@ public CompletableFuture<Void> destroy() {
 
     static final class Delayer {
         static ScheduledFuture<?> delay(Runnable command, long delay,
-            TimeUnit unit) {
+                                        TimeUnit unit) {
             return DELAY_FETCH_SCHEDULER.schedule(command, delay, unit);
         }
     }
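Note on the removal above: timeoutAndStoreFuture raced a fetch future against a scheduled timer, completing exceptionally with SlowFetchHintException when the fetch ran long and parking the raw future in holdUpFetchingFutureMap for later reuse. For review context, a minimal self-contained sketch of that race pattern; the class and exception names here are illustrative, not part of this patch:

    import java.util.concurrent.*;

    final class TimeoutRaceSketch {
        // Stand-in for SlowFetchHintException; illustrative only.
        static class SlowHintException extends RuntimeException { }

        private static final ScheduledExecutorService SCHEDULER =
                Executors.newSingleThreadScheduledExecutor();

        // Return the raw result if it arrives before the deadline; otherwise
        // complete exceptionally so the caller can fall back to a slow path.
        static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> raw, long timeout, TimeUnit unit) {
            if (raw.isDone()) {
                return raw;
            }
            CompletableFuture<T> cf = new CompletableFuture<>();
            raw.whenComplete((value, error) -> {
                if (error != null) {
                    cf.completeExceptionally(error);
                } else {
                    cf.complete(value);
                }
            });
            // Whichever side completes cf first wins; the loser's completion is a no-op.
            SCHEDULER.schedule(() -> cf.completeExceptionally(new SlowHintException()), timeout, unit);
            return cf;
        }
    }

On JDK 9+, CompletableFuture.orTimeout provides a similar deadline, though it cannot substitute a custom hint exception by itself.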
diff --git a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala
index 48ab36b8f7..f902e57dda 100644
--- a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala
+++ b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala
@@ -91,6 +91,16 @@ object ElasticLogManager {
     if (!config.elasticStreamEnabled) {
       return false
     }
+
+    // TODO: modify the kafka-on-es repo to support SPI
+    // FIXME: S3Client will cause ElasticLogSegmentTest to fail
+    // if (true) {
+    //   val streamClient = new AlwaysSuccessClient(new S3Client());
+    //   INSTANCE = Some(new ElasticLogManager(streamClient))
+    //   return true
+    // }
+
+
     val endpoint = config.elasticStreamEndpoint
     if (endpoint == null) {
       return false
diff --git a/core/src/main/scala/kafka/log/es/MemoryClient.java b/core/src/main/scala/kafka/log/es/MemoryClient.java
index 8131d86962..50c8272c3d 100644
--- a/core/src/main/scala/kafka/log/es/MemoryClient.java
+++ b/core/src/main/scala/kafka/log/es/MemoryClient.java
@@ -128,7 +128,7 @@ public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions ope
         }
     }
 
-    static class KVClientImpl implements KVClient {
+    public static class KVClientImpl implements KVClient {
         private final Map<String, ByteBuffer> store = new ConcurrentHashMap<>();
 
         @Override
diff --git a/core/src/main/scala/kafka/log/es/MetaStream.java b/core/src/main/scala/kafka/log/es/MetaStream.java
index f409d68af3..d454363486 100644
--- a/core/src/main/scala/kafka/log/es/MetaStream.java
+++ b/core/src/main/scala/kafka/log/es/MetaStream.java
@@ -22,7 +22,6 @@
 import com.automq.elasticstream.client.api.RecordBatch;
 import com.automq.elasticstream.client.api.RecordBatchWithContext;
 import com.automq.elasticstream.client.api.Stream;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
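The commented-out block in ElasticLogManager.scala above hard-wires new AlwaysSuccessClient(new S3Client()); its TODO points at selecting the client through Java's SPI instead. A rough sketch of what ServiceLoader-based discovery could look like (the ClientProvider interface is hypothetical; nothing in this patch defines it):

    import java.util.ServiceLoader;
    import com.automq.elasticstream.client.api.Client;

    // Hypothetical SPI hook: implementations are listed under
    // META-INF/services/kafka.log.es.ClientProvider and discovered at runtime,
    // so ElasticLogManager no longer needs a hard-coded client class.
    interface ClientProvider {
        Client buildClient(String endpoint);
    }

    final class ClientLoader {
        static Client load(String endpoint) {
            // Take the first provider on the classpath; a fuller version might
            // match providers against the endpoint scheme (e.g. "s3://").
            for (ClientProvider provider : ServiceLoader.load(ClientProvider.class)) {
                return provider.buildClient(endpoint);
            }
            throw new IllegalStateException("no ClientProvider found on classpath");
        }
    }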
diff --git a/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java b/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java
index 03b19cdee1..a233d67deb 100644
--- a/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java
+++ b/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package kafka.log.es;
 
 import io.netty.util.concurrent.FastThreadLocal;
diff --git a/core/src/main/scala/kafka/log/s3/S3Client.java b/core/src/main/scala/kafka/log/s3/S3Client.java
index f6153344e4..41a80820e4 100644
--- a/core/src/main/scala/kafka/log/s3/S3Client.java
+++ b/core/src/main/scala/kafka/log/s3/S3Client.java
@@ -20,15 +20,34 @@
 import com.automq.elasticstream.client.api.Client;
 import com.automq.elasticstream.client.api.KVClient;
 import com.automq.elasticstream.client.api.StreamClient;
+import kafka.log.es.MemoryClient;
+import kafka.log.s3.cache.DefaultS3BlockCache;
+import kafka.log.s3.cache.S3BlockCache;
+import kafka.log.s3.memory.MemoryMetadataManager;
+import kafka.log.s3.operator.MemoryS3Operator;
+import kafka.log.s3.operator.S3Operator;
 
 public class S3Client implements Client {
+    private final StreamClient streamClient;
+    private final KVClient kvClient;
+
+    public S3Client() {
+        MemoryMetadataManager manager = new MemoryMetadataManager();
+        manager.start();
+        S3Operator s3Operator = new MemoryS3Operator();
+        Wal wal = new S3Wal(manager, s3Operator);
+        S3BlockCache blockCache = new DefaultS3BlockCache(manager, s3Operator);
+        this.streamClient = new S3StreamClient(manager, wal, blockCache, manager);
+        this.kvClient = new MemoryClient.KVClientImpl();
+    }
+
     @Override
     public StreamClient streamClient() {
-        return null;
+        return streamClient;
     }
 
     @Override
     public KVClient kvClient() {
-        return null;
+        return kvClient;
     }
 }
diff --git a/core/src/main/scala/kafka/log/s3/S3StreamClient.java b/core/src/main/scala/kafka/log/s3/S3StreamClient.java
index 1903267abf..bdf58baffa 100644
--- a/core/src/main/scala/kafka/log/s3/S3StreamClient.java
+++ b/core/src/main/scala/kafka/log/s3/S3StreamClient.java
@@ -29,13 +29,13 @@
 public class S3StreamClient implements StreamClient {
 
-    private final StreamManager streamController;
+    private final StreamManager streamManager;
     private final Wal wal;
     private final S3BlockCache blockCache;
     private final ObjectManager objectManager;
 
-    public S3StreamClient(StreamManager streamController, Wal wal, S3BlockCache blockCache, ObjectManager objectManager) {
-        this.streamController = streamController;
+    public S3StreamClient(StreamManager streamManager, Wal wal, S3BlockCache blockCache, ObjectManager objectManager) {
+        this.streamManager = streamManager;
         this.wal = wal;
         this.blockCache = blockCache;
         this.objectManager = objectManager;
@@ -43,7 +43,7 @@ public S3StreamClient(StreamManager streamController, Wal wal, S3BlockCache bloc
 
     @Override
     public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
-        return streamController.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch()));
+        return streamManager.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch()));
     }
 
     @Override
@@ -52,10 +52,10 @@ public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions ope
 
     private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
-        return streamController.openStream(streamId, epoch).
+        return streamManager.openStream(streamId, epoch).
                 thenApply(metadata -> new S3Stream(
                         metadata.getStreamId(), metadata.getEpoch(), metadata.getStartOffset(), metadata.getNextOffset(),
-                        wal, blockCache, streamController));
+                        wal, blockCache, streamManager));
     }
 }
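With the constructor added above, S3Client is now a self-contained in-memory composition: MemoryMetadataManager is passed to S3StreamClient twice because it fills both the StreamManager and ObjectManager roles, and MemoryS3Operator stands in for real S3 access. A small usage sketch under those assumptions (the CreateStreamOptions builder calls mirror the elastic-stream client API and should be treated as assumptions, not copied from this patch):

    import com.automq.elasticstream.client.api.Client;
    import com.automq.elasticstream.client.api.CreateStreamOptions;
    import com.automq.elasticstream.client.api.Stream;
    import kafka.log.s3.S3Client;

    // Sketch: exercising the in-memory wiring introduced by this patch.
    public class S3ClientDemo {
        public static void main(String[] args) throws Exception {
            Client client = new S3Client();
            Stream stream = client.streamClient()
                    .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(1).build())
                    .get(); // all components are in-memory, so this resolves promptly
            System.out.println("opened stream " + stream.streamId());
        }
    }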
diff --git a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
index 8886c0159a..b18c7aecd5 100644
--- a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
@@ -24,6 +24,7 @@
 import kafka.log.s3.operator.S3Operator;
 import org.apache.kafka.common.utils.CloseableIterator;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -39,6 +40,9 @@ public DefaultS3BlockCache(ObjectManager objectManager, S3Operator s3Operator) {
 
     @Override
     public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
+        if (startOffset >= endOffset || maxBytes <= 0) {
+            return CompletableFuture.completedFuture(new ReadDataBlock(Collections.emptyList()));
+        }
         List<S3ObjectMetadata> objects = objectManager.getObjects(streamId, startOffset, endOffset, 2);
         ReadContext context = new ReadContext(objects, startOffset, maxBytes);
         return read0(streamId, endOffset, context);
diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
index bbad82112f..c8c6fe1afb 100644
--- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
+++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
@@ -269,6 +269,7 @@ public CompletableFuture<Long> createStream() {
     @Override
     public CompletableFuture<OpenStreamMetadata> openStream(long streamId, long epoch) {
         return this.submitEvent(() -> {
+            // TODO: all tasks should be wrapped in try-catch so a throwing task cannot leave its future incomplete
             // verify stream exist
             if (!this.streamsMetadata.containsKey(streamId)) {
                 throw new StreamNotExistException("Stream " + streamId + " does not exist");
@@ -280,7 +281,13 @@ public CompletableFuture<OpenStreamMetadata> openStream(long streamId, long epoc
             }
             if (streamMetadata.getEpoch() == epoch) {
                 // get active range
-                long endOffset = streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).getEndOffset();
+                int rangesCount = streamMetadata.getRanges().size();
+                long endOffset = 0;
+                if (rangesCount != 0) {
+                    endOffset = streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).getEndOffset();
+                } else {
+                    streamMetadata.getRanges().add(new RangeMetadata(0, 0, 0, MOCK_BROKER_ID));
+                }
                 return new OpenStreamMetadata(streamId, epoch, streamMetadata.getStartOffset(), endOffset);
             }
             // create new range
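The TODO added to openStream flags a real hazard: submitEvent runs tasks on an event loop, and a task that throws before completing its future leaves the caller waiting forever (the StreamNotExistException throw above is exactly such a path unless the event loop catches it). One way to implement the suggested wrapping, sketched with an assumed Supplier-based submitEvent signature rather than the repo's actual one:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Supplier;

    // Sketch of the try-catch the TODO asks for: any exception thrown by the
    // task completes the future exceptionally instead of leaving it pending.
    final class EventLoopSketch {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        <T> CompletableFuture<T> submitEvent(Supplier<T> task) {
            CompletableFuture<T> cf = new CompletableFuture<>();
            executor.submit(() -> {
                try {
                    cf.complete(task.get());
                } catch (Throwable t) {
                    // Without this branch a throwing task would strand cf forever.
                    cf.completeExceptionally(t);
                }
            });
            return cf;
        }
    }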