From 37923157f3317f49cf588698352f9acb58f150ad Mon Sep 17 00:00:00 2001 From: TheR1sing3un <87409330+TheR1sing3un@users.noreply.github.com> Date: Fri, 25 Aug 2023 18:32:33 +0800 Subject: [PATCH] fix: fix checkstyle and add ci flow for checkstyle and spotbugs (#106) ci(s3): add checkstyle and spotbugs in ci 1. add checkstyle and spotbugs in ci 2. fix to pass checkstyle Signed-off-by: TheR1sing3un fix: fix fetching problem brought by thread pool separating; fix style problems; add more thread for fetching and appending thread pool (#107) * fix: fix fetching problem brought by thread pool separating; fix style problems; add more thread for fetching and appending thread pool Signed-off-by: Curtis Wan * remove 'SLOW_FETCH_TIMEOUT_MILLIS - 1' case in quickFetch test and change 2 -> 'SLOW_FETCH_TIMEOUT_MILLIS / 2' in slowFetch test Signed-off-by: Curtis Wan * refactor: add more comments to make logic more clear Signed-off-by: Curtis Wan --------- Signed-off-by: Curtis Wan refactor: close #105; add more threads for partition opening or closing (#109) Signed-off-by: Curtis Wan feat(es): client factory SPI (#112) Signed-off-by: Robin Han fix: fix spotbugs Signed-off-by: Robin Han --- .github/workflows/build.yml | 15 ++ .github/workflows/es_unit_tests.yml | 17 ++ .github/workflows/validate_pr_title.yml | 15 ++ checkstyle/import-control-core.xml | 2 +- .../errors/es/SlowFetchHintException.java | 2 + .../kafka/log/es/AlwaysSuccessClient.java | 136 ++++++++-- .../kafka/log/es/ElasticLogManager.scala | 31 +-- .../log/es/client/ClientFactoryProxy.java | 41 +++ .../scala/kafka/log/es/client/Context.java | 24 ++ .../kafka/log/es/client/es/ClientFactory.java | 41 +++ .../log/es/client/memory/ClientFactory.java | 31 +++ .../log/es/client/redis/ClientFactory.java | 32 +++ .../log/s3/memory/MemoryMetadataManager.java | 6 +- .../main/scala/kafka/server/KafkaApis.scala | 4 +- .../scala/kafka/server/ReplicaManager.scala | 5 +- .../kafka/log/es/AlwaysSuccessClientTest.java | 241 ++++++++++++++++++ .../kafka/log/es/ElasticLogReopenTester.scala | 1 - tests/docker/es/docker-compose.yaml | 15 ++ tests/esk_test_suite.yml | 15 ++ 19 files changed, 620 insertions(+), 54 deletions(-) create mode 100644 core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java create mode 100644 core/src/main/scala/kafka/log/es/client/Context.java create mode 100644 core/src/main/scala/kafka/log/es/client/es/ClientFactory.java create mode 100644 core/src/main/scala/kafka/log/es/client/memory/ClientFactory.java create mode 100644 core/src/main/scala/kafka/log/es/client/redis/ClientFactory.java create mode 100644 core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 69afab19cb..9abeb21143 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,3 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + name: Build on: pull_request: diff --git a/.github/workflows/es_unit_tests.yml b/.github/workflows/es_unit_tests.yml index c94fbe2cb6..c6dc0f3b1b 100644 --- a/.github/workflows/es_unit_tests.yml +++ b/.github/workflows/es_unit_tests.yml @@ -1,3 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + # This workflow uses actions that are not certified by GitHub. # They are provided by a third-party and are governed by # separate terms of service, privacy policy, and support @@ -31,5 +46,7 @@ jobs: distribution: "zulu" - name: Setup Gradle uses: gradle/gradle-build-action@v2 + - name: Checkstyle and Spotbugs + run: ./gradlew rat checkstyleMain checkstyleTest spotbugsMain spotbugsTest - name: Execute Gradle build run: ./gradlew metadata:esUnitTest core:esUnitTest metadata:S3UnitTest core:S3UnitTest diff --git a/.github/workflows/validate_pr_title.yml b/.github/workflows/validate_pr_title.yml index d2c4216123..51e7932735 100644 --- a/.github/workflows/validate_pr_title.yml +++ b/.github/workflows/validate_pr_title.yml @@ -1,3 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + name: "Lint PR" on: diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index e491b58a00..c70dd6c115 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -51,8 +51,8 @@ - + diff --git a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java index fb51a5a900..69b057307e 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java @@ -22,7 +22,9 @@ * Indicates that the fetch request was too slow to be served. The request should be served in separated thread pool. 
*/ public class SlowFetchHintException extends RetriableException { + private static final long serialVersionUID = 1L; + public SlowFetchHintException() { super(); } diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java index d5fce42e15..e446cb2ffa 100644 --- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java +++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java @@ -28,34 +28,36 @@ import com.automq.elasticstream.client.api.StreamClient; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import org.apache.kafka.common.errors.es.SlowFetchHintException; import org.apache.kafka.common.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; @SuppressWarnings("uncheck") public class AlwaysSuccessClient implements Client { + private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class); + public static final long SLOW_FETCH_TIMEOUT_MILLIS = 10; private static final ScheduledExecutorService STREAM_MANAGER_RETRY_SCHEDULER = Executors.newScheduledThreadPool(1, - ThreadUtils.createThreadFactory("stream-manager-retry-%d", true)); + ThreadUtils.createThreadFactory("stream-manager-retry-%d", true)); private static final ExecutorService STREAM_MANAGER_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(1, - ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true)); + ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true)); private static final ScheduledExecutorService FETCH_RETRY_SCHEDULER = Executors.newScheduledThreadPool(1, - ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true)); + ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true)); private static final ExecutorService APPEND_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(4, - ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true)); + ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true)); private static final ExecutorService FETCH_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(4, - ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true)); + ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true)); private static final ScheduledExecutorService DELAY_FETCH_SCHEDULER = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory("fetch-delayer-%d", true)); private final StreamClient streamClient; @@ -78,6 +80,7 @@ public KVClient kvClient() { // TODO: do not retry when stream closed. 
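     // StreamClientImpl decorates the raw StreamClient: a failed stream open/create is re-submitted
     // to STREAM_MANAGER_RETRY_SCHEDULER (see openStream0 below) instead of surfacing the error, so
     // callers only ever observe a future that eventually completes successfully.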
static class StreamClientImpl implements StreamClient { + private final StreamClient streamClient; public StreamClientImpl(StreamClient streamClient) { @@ -126,10 +129,10 @@ private void openStream0(long streamId, OpenStreamOptions options, CompletableFu } static class StreamImpl implements Stream { + private final Stream stream; private volatile boolean closed = false; private final Map> holdUpFetchingFutureMap = new ConcurrentHashMap<>(); - private static final long SLOW_FETCH_TIMEOUT_MILLIS = 10; public StreamImpl(Stream stream) { this.stream = stream; @@ -154,20 +157,89 @@ public long nextOffset() { public CompletableFuture append(RecordBatch recordBatch) { CompletableFuture cf = new CompletableFuture<>(); stream.append(recordBatch) - .whenComplete((rst, ex) -> FutureUtil.suppress(() -> { - if (ex != null) { - cf.completeExceptionally(ex); + .whenComplete((rst, ex) -> FutureUtil.suppress(() -> { + if (ex != null) { + cf.completeExceptionally(ex); + } else { + cf.complete(rst); + } + }, LOGGER)); + return cf; + } + + /** + * Get a new CompletableFuture with a {@link SlowFetchHintException} if not otherwise completed before the given timeout. + * + * @param id the id of rawFuture in holdUpFetchingFutureMap + * @param rawFuture the raw future + * @param timeout how long to wait before completing exceptionally with a SlowFetchHintException, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @return a new CompletableFuture with completed results of the rawFuture if the raw future is done before timeout, otherwise a new + * CompletableFuture with a {@link SlowFetchHintException} + */ + private CompletableFuture timeoutAndStoreFuture(String id, + CompletableFuture rawFuture, long timeout, + TimeUnit unit) { + if (unit == null) { + throw new NullPointerException(); + } + + if (rawFuture.isDone()) { + return rawFuture; + } + + final CompletableFuture cf = new CompletableFuture<>(); + rawFuture.whenComplete(new CompleteFetchingFutureAndCancelTimeoutCheck(Delayer.delay(() -> { + if (rawFuture == null) { + return; + } + + // If rawFuture is done, then complete the cf with the result of rawFuture. + if (rawFuture.isDone()) { + rawFuture.whenComplete((result, exception) -> { + if (exception != null) { + cf.completeExceptionally(exception); } else { - cf.complete(rst); + cf.complete(result); } - }, LOGGER)); + }); + } else { // else, complete the cf with a SlowFetchHintException and store the rawFuture for slow fetching. + holdUpFetchingFutureMap.putIfAbsent(id, rawFuture); + cf.completeExceptionally(new SlowFetchHintException()); + } + }, timeout, unit), cf)); return cf; } @Override public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint) { + String holdUpKey = startOffset + "-" + endOffset + "-" + maxBytesHint; CompletableFuture cf = new CompletableFuture<>(); - fetch0(startOffset, endOffset, maxBytesHint, cf); + // If this thread is not marked, then just fetch data. + if (!SeparateSlowAndQuickFetchHint.isMarked()) { + if (holdUpFetchingFutureMap.containsKey(holdUpKey)) { + holdUpFetchingFutureMap.remove(holdUpKey).thenAccept(cf::complete); + } else { + fetch0(startOffset, endOffset, maxBytesHint, cf); + } + } else { + // Try to have a quick fetch. If fetching is timeout, then complete with SlowFetchHintException. 
+                timeoutAndStoreFuture(holdUpKey, stream.fetch(startOffset, endOffset, maxBytesHint), SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
+                        .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
+                            if (ex != null) {
+                                if (closed) {
+                                    cf.completeExceptionally(new IllegalStateException("stream already closed"));
+                                } else if (ex instanceof SlowFetchHintException) {
+                                    LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS);
+                                    cf.completeExceptionally(ex);
+                                } else {
+                                    cf.completeExceptionally(ex);
+                                }
+                            } else {
+                                cf.complete(rst);
+                            }
+                        }, LOGGER));
+            }
             return cf;
         }
 
@@ -236,22 +308,42 @@ public CompletableFuture<Void> destroy() {
         }
 
         static final class Delayer {
+
             static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
                 return DELAY_FETCH_SCHEDULER.schedule(command, delay, unit);
             }
         }
 
-        static final class Canceller implements BiConsumer<Object, Throwable> {
-            final Future<?> f;
+        /**
+         * A BiConsumer that completes the FetchResult future and cancels the timeout check task.
+         */
+        static final class CompleteFetchingFutureAndCancelTimeoutCheck implements BiConsumer<FetchResult, Throwable> {
+
+            /**
+             * A ScheduledFuture that represents the timeout check task.
+             */
+            final ScheduledFuture<?> f;
+            /**
+             * A CompletableFuture waiting for the fetching result.
+             */
+            final CompletableFuture<FetchResult> waitingFuture;
 
-            Canceller(Future<?> f) {
+            CompleteFetchingFutureAndCancelTimeoutCheck(ScheduledFuture<?> f, CompletableFuture<FetchResult> waitingFuture) {
                 this.f = f;
+                this.waitingFuture = waitingFuture;
             }
 
-            public void accept(Object ignore, Throwable ex) {
+            public void accept(FetchResult result, Throwable ex) {
+                // cancels the timeout check task.
                 if (ex == null && f != null && !f.isDone())
                     f.cancel(false);
+
+                // completes the waiting future right now.
+ if (ex == null) { + waitingFuture.complete(result); + } else { + waitingFuture.completeExceptionally(ex); + } } } } diff --git a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala index f902e57dda..c3438f3398 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala @@ -17,14 +17,14 @@ package kafka.log.es -import com.automq.elasticstream.client.DefaultClientBuilder -import kafka.log._ +import com.automq.elasticstream.client.api.Client +import kafka.log.{LogConfig, ProducerStateManagerConfig} import kafka.log.es.ElasticLogManager.NAMESPACE +import kafka.log.es.client.{ClientFactoryProxy, Context} import kafka.server.{KafkaConfig, LogDirFailureChannel} import kafka.utils.Scheduler import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Time -import com.automq.elasticstream.client.api.Client import java.io.File import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} @@ -80,10 +80,6 @@ class ElasticLogManager(val client: Client) { } object ElasticLogManager { - private val ES_ENDPOINT_PREFIX = "es://" - private val MEMORY_ENDPOINT_PREFIX = "memory://" - private val REDIS_ENDPOINT_PREFIX = "redis://" - var INSTANCE: Option[ElasticLogManager] = None var NAMESPACE = "" @@ -105,24 +101,9 @@ object ElasticLogManager { if (endpoint == null) { return false } - if (endpoint.startsWith(ES_ENDPOINT_PREFIX)) { - val kvEndpoint = config.elasticStreamKvEndpoint; - if (!kvEndpoint.startsWith(ES_ENDPOINT_PREFIX)) { - throw new IllegalArgumentException(s"Elastic stream endpoint and kvEndpoint must be the same protocol: $endpoint $kvEndpoint") - } - val streamClient = new AlwaysSuccessClient(new DefaultClientBuilder() - .endpoint(endpoint.substring(ES_ENDPOINT_PREFIX.length)) - .kvEndpoint(kvEndpoint.substring(ES_ENDPOINT_PREFIX.length)) - .build()) - INSTANCE = Some(new ElasticLogManager(streamClient)) - } else if (endpoint.startsWith(MEMORY_ENDPOINT_PREFIX)) { - val streamClient = new AlwaysSuccessClient(new MemoryClient()) - INSTANCE = Some(new ElasticLogManager(streamClient)) - } else if (endpoint.startsWith(REDIS_ENDPOINT_PREFIX)) { - INSTANCE = Some(new ElasticLogManager(new ElasticRedisClient(endpoint.substring(REDIS_ENDPOINT_PREFIX.length)))) - } else { - return false - } + val context = new Context() + context.config = config + INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context))) val namespace = config.elasticStreamNamespace NAMESPACE = if (namespace == null || namespace.isEmpty) { diff --git a/core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java b/core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java new file mode 100644 index 0000000000..0da3758894 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.es.client;
+
+import com.automq.elasticstream.client.api.Client;
+
+import java.lang.reflect.Method;
+
+public class ClientFactoryProxy {
+
+    private static final String PROTOCOL_SEPARATOR = ":";
+    private static final String FACTORY_CLASS_FORMAT = "kafka.log.es.client.%s.ClientFactory";
+
+    public static Client get(Context context) {
+        String endpoint = context.config.elasticStreamEndpoint();
+        String protocol = endpoint.split(PROTOCOL_SEPARATOR)[0];
+        String className = String.format(FACTORY_CLASS_FORMAT, protocol);
+        try {
+            Class<?> clazz = Class.forName(className);
+            Method method = clazz.getDeclaredMethod("get", Context.class);
+            return (Client) method.invoke(null, context);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git a/core/src/main/scala/kafka/log/es/client/Context.java b/core/src/main/scala/kafka/log/es/client/Context.java
new file mode 100644
index 0000000000..9cda65b588
--- /dev/null
+++ b/core/src/main/scala/kafka/log/es/client/Context.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.es.client;
+
+import kafka.server.KafkaConfig;
+
+public class Context {
+    public KafkaConfig config;
+}
diff --git a/core/src/main/scala/kafka/log/es/client/es/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/es/ClientFactory.java
new file mode 100644
index 0000000000..c9e242f7b0
--- /dev/null
+++ b/core/src/main/scala/kafka/log/es/client/es/ClientFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package kafka.log.es.client.es; + +import com.automq.elasticstream.client.DefaultClientBuilder; +import com.automq.elasticstream.client.api.Client; +import kafka.log.es.AlwaysSuccessClient; +import kafka.log.es.client.Context; + +public class ClientFactory { + private static final String ES_ENDPOINT_PREFIX = "es://"; + + public static Client get(Context context) { + String endpoint = context.config.elasticStreamEndpoint(); + String kvEndpoint = context.config.elasticStreamKvEndpoint(); + if (!kvEndpoint.startsWith(ES_ENDPOINT_PREFIX)) { + throw new IllegalArgumentException("Elastic stream endpoint and kvEndpoint must be the same protocol: " + endpoint + " " + kvEndpoint); + } + Client client = new DefaultClientBuilder() + .endpoint(endpoint.substring(ES_ENDPOINT_PREFIX.length())) + .kvEndpoint(kvEndpoint.substring(ES_ENDPOINT_PREFIX.length())) + .build(); + return new AlwaysSuccessClient(client); + } + +} diff --git a/core/src/main/scala/kafka/log/es/client/memory/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/memory/ClientFactory.java new file mode 100644 index 0000000000..01e6d02dec --- /dev/null +++ b/core/src/main/scala/kafka/log/es/client/memory/ClientFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.client.memory; + +import com.automq.elasticstream.client.api.Client; +import kafka.log.es.client.Context; +import kafka.log.es.AlwaysSuccessClient; +import kafka.log.es.MemoryClient; + +public class ClientFactory { + + public static Client get(Context context) { + return new AlwaysSuccessClient(new MemoryClient()); + } + +} diff --git a/core/src/main/scala/kafka/log/es/client/redis/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/redis/ClientFactory.java new file mode 100644 index 0000000000..fe9c03a886 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/client/redis/ClientFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.es.client.redis; + +import com.automq.elasticstream.client.api.Client; +import kafka.log.es.ElasticRedisClient; +import kafka.log.es.client.Context; + +public class ClientFactory { + public static final String REDIS_ENDPOINT_PREFIX = "redis://"; + + public static Client get(Context context) { + String endpoint = context.config.elasticStreamEndpoint(); + return new ElasticRedisClient(endpoint.substring(REDIS_ENDPOINT_PREFIX.length())); + } + +} diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index 71452c3afb..5fc0ecc879 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -62,7 +63,7 @@ public class MemoryMetadataManager implements StreamManager, ObjectManager { private final Map brokerWALMetadata; private static class MemoryStreamMetadata { - private long streamId; + private final long streamId; private long epoch; private long startOffset; private long endOffset; @@ -76,6 +77,9 @@ public MemoryStreamMetadata(long streamId, long epoch, long startOffset, long en } public void addStreamObject(S3StreamObject object) { + if (streamObjects == null) { + streamObjects = new LinkedList<>(); + } streamObjects.add(object); } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b252b17916..ef22ed86db 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -122,8 +122,8 @@ class KafkaApis(val requestChannel: RequestChannel, val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config) val configManager = new ConfigAdminManager(brokerId, config, configRepository) // These two executors separate the handling of `produce` and `fetch` requests in case of throttling. 
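  // Sized up from 2 to 4 threads below, per #107 ("add more thread for fetching and appending thread pool").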
- val appendingExecutors = Executors.newFixedThreadPool(2, ThreadUtils.createThreadFactory("kafka-apis-appending-executor-%d", true)) - val fetchingExecutors = Executors.newFixedThreadPool(2, ThreadUtils.createThreadFactory("kafka-apis-fetching-executor-%d", true)) + val appendingExecutors = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-appending-executor-%d", true)) + val fetchingExecutors = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fetching-executor-%d", true)) def close(): Unit = { aclApis.close() diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b4e9aab50e..4eddc2cbe2 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -64,7 +64,7 @@ import java.util import java.util.Optional import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.Lock -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} import scala.collection.{Map, Seq, Set, mutable} import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ @@ -223,7 +223,8 @@ class ReplicaManager(val config: KafkaConfig, // This threadPool is used to separate slow fetches from quick fetches. val slowFetchExecutors = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("slow-fetch-executor-%d", true)) // This threadPool is used to handle partition open/close in case of throttling metadata replay. - val partitionOpenCloseExecutors = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("partition-open-close-executor-%d", true)) + val partitionOpenCloseExecutors = new ThreadPoolExecutor(4, 32, 30, TimeUnit.SECONDS, new util.concurrent.LinkedBlockingQueue[Runnable](32), + ThreadUtils.createThreadFactory("partition-open-close-executor-%d", true)) /* epoch of the controller that last changed the leader */ @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch diff --git a/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java b/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java new file mode 100644 index 0000000000..18e5bc05a4 --- /dev/null +++ b/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.log.es;
+
+import com.automq.elasticstream.client.api.AppendResult;
+import com.automq.elasticstream.client.api.CreateStreamOptions;
+import com.automq.elasticstream.client.api.FetchResult;
+import com.automq.elasticstream.client.api.OpenStreamOptions;
+import com.automq.elasticstream.client.api.RecordBatch;
+import com.automq.elasticstream.client.api.RecordBatchWithContext;
+import com.automq.elasticstream.client.api.Stream;
+import com.automq.elasticstream.client.api.StreamClient;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.common.errors.es.SlowFetchHintException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("esUnit")
+class AlwaysSuccessClientTest {
+
+    private static final long SLOW_FETCH_TIMEOUT_MILLIS = AlwaysSuccessClient.SLOW_FETCH_TIMEOUT_MILLIS;
+
+    @BeforeEach
+    public void setup() {
+        SeparateSlowAndQuickFetchHint.mark();
+    }
+
+    @AfterEach
+    public void teardown() {
+        SeparateSlowAndQuickFetchHint.reset();
+    }
+
+    @Test
+    public void basicAppendAndFetch() throws ExecutionException, InterruptedException {
+        AlwaysSuccessClient client = new AlwaysSuccessClient(new MemoryClient());
+        Stream stream = client
+            .streamClient()
+            .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
+            .get();
+        List<byte[]> payloads = List.of("hello".getBytes(), "world".getBytes());
+        payloads.forEach(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload))));
+
+        FetchResult fetched = stream.fetch(0, 100, 1000).get();
+        checkAppendAndFetch(payloads, fetched);
+
+        stream.destroy();
+    }
+
+    @Test
+    public void testQuickFetch() throws ExecutionException, InterruptedException {
+        MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay();
+        AlwaysSuccessClient client = new AlwaysSuccessClient(memoryClientWithDelay);
+        List<Long> quickFetchDelayMillisList = List.of(1L, SLOW_FETCH_TIMEOUT_MILLIS / 2);
+        List<byte[]> payloads = List.of("hello".getBytes(), "world".getBytes());
+
+        // test quick fetch
+        for (Long delay : quickFetchDelayMillisList) {
+            memoryClientWithDelay.setDelayMillis(delay);
+            Stream stream = client
+                .streamClient()
+                .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
+                .get();
+            payloads.forEach(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload))));
+            FetchResult fetched = stream.fetch(0, 100, 1000)
+                .orTimeout(delay + 3, TimeUnit.MILLISECONDS)
+                .get();
+            checkAppendAndFetch(payloads, fetched);
+            stream.destroy();
+        }
+    }
+
+    @Test
+    public void testSlowFetch() throws ExecutionException, InterruptedException {
+        MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay();
+        AlwaysSuccessClient client = new AlwaysSuccessClient(memoryClientWithDelay);
+        List<byte[]> payloads = List.of("hello".getBytes(), "world".getBytes());
+
+        long slowFetchDelay = SLOW_FETCH_TIMEOUT_MILLIS + 1;
+        memoryClientWithDelay.setDelayMillis(slowFetchDelay);
+        Stream stream = client
+            .streamClient()
+            .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
+            .get();
+        payloads.forEach(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload))));
+
+        FetchResult fetched = null;
+        try {
+            fetched = stream.fetch(0, 100, 1000)
+                .orTimeout(slowFetchDelay + 3, TimeUnit.MILLISECONDS)
+                .get();
+            checkAppendAndFetch(payloads, fetched);
+        } catch (ExecutionException e) {
+            // should throw SlowFetchHintException after SLOW_FETCH_TIMEOUT_MILLIS ms
+            assertEquals(SlowFetchHintException.class, e.getCause().getClass());
+            SeparateSlowAndQuickFetchHint.reset();
+            // It should reuse the fetching future above, therefore only (SLOW_FETCH_TIMEOUT_MILLIS / 2) ms is tolerable.
+            fetched = stream.fetch(0, 100, 1000)
+                .orTimeout(SLOW_FETCH_TIMEOUT_MILLIS / 2, TimeUnit.MILLISECONDS)
+                .get();
+        }
+        checkAppendAndFetch(payloads, fetched);
+        stream.destroy();
+    }
+
+    private void checkAppendAndFetch(List<byte[]> rawPayloads, FetchResult fetched) {
+        for (int i = 0; i < fetched.recordBatchList().size(); i++) {
+            assertArrayEquals(rawPayloads.get(i), fetched.recordBatchList().get(i).rawPayload().array());
+        }
+    }
+
+    static final class MemoryClientWithDelay extends MemoryClient {
+
+        private final StreamClientImpl streamClient = new StreamClientImpl();
+
+        public void setDelayMillis(long delayMillis) {
+            streamClient.setDelayMillis(delayMillis);
+        }
+
+        @Override
+        public StreamClient streamClient() {
+            return streamClient;
+        }
+
+        static class StreamClientImpl implements StreamClient {
+
+            private final AtomicLong streamIdAlloc = new AtomicLong();
+            private long delayMillis = 0;
+
+            public StreamClientImpl() {
+            }
+
+            public void setDelayMillis(long delayMillis) {
+                this.delayMillis = delayMillis;
+            }
+
+            @Override
+            public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions createStreamOptions) {
+                return CompletableFuture.completedFuture(new TestStreamImpl(streamIdAlloc.incrementAndGet(), delayMillis));
+            }
+
+            @Override
+            public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
+                return CompletableFuture.completedFuture(new TestStreamImpl(streamId, delayMillis));
+            }
+        }
+
+        static class TestStreamImpl implements Stream {
+
+            private final AtomicLong nextOffsetAlloc = new AtomicLong();
+            private NavigableMap<Long, RecordBatchWithContext> recordMap = new ConcurrentSkipListMap<>();
+            private final long streamId;
+            /**
+             * The additional fetching delay
+             */
+            private long delayMillis;
+
+            public TestStreamImpl(long streamId, long delayMillis) {
+                this.streamId = streamId;
+                this.delayMillis = delayMillis;
+            }
+
+            @Override
+            public long streamId() {
+                return streamId;
+            }
+
+            @Override
+            public long startOffset() {
+                return 0;
+            }
+
+            @Override
+            public long nextOffset() {
+                return nextOffsetAlloc.get();
+            }
+
+            @Override
+            public synchronized CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
+                long baseOffset = nextOffsetAlloc.getAndAdd(recordBatch.count());
+                recordMap.put(baseOffset, new RecordBatchWithContextWrapper(recordBatch, baseOffset));
+                return CompletableFuture.completedFuture(() -> baseOffset);
+            }
+
+            @Override
+            public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxSizeHint) {
+                Long floorKey = recordMap.floorKey(startOffset);
+                if (floorKey == null) {
+                    return CompletableFuture.completedFuture(ArrayList::new);
+                }
+                List<RecordBatchWithContext> records = new ArrayList<>(recordMap.subMap(floorKey, endOffset).values());
+                return CompletableFuture.supplyAsync(() -> {
+                    try {
+                        Thread.sleep(delayMillis);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    return () -> records;
+                });
+            }
+
+            @Override
+            public CompletableFuture<Void> trim(long newStartOffset) {
+                recordMap = new ConcurrentSkipListMap<>(recordMap.tailMap(newStartOffset));
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public CompletableFuture<Void> close() {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public CompletableFuture<Void> destroy() {
+                recordMap.clear();
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala b/core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala
index 33a7e3d2a7..ef9899ea0b 100644
--- a/core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala
+++ b/core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-
 package kafka.log.es
 
 import joptsimple.OptionParser
diff --git a/tests/docker/es/docker-compose.yaml b/tests/docker/es/docker-compose.yaml
index f2a37904b3..66103aeeed 100644
--- a/tests/docker/es/docker-compose.yaml
+++ b/tests/docker/es/docker-compose.yaml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 version: "3"
 services:
   range-server:
diff --git a/tests/esk_test_suite.yml b/tests/esk_test_suite.yml
index c853c93751..2a3cebb583 100644
--- a/tests/esk_test_suite.yml
+++ b/tests/esk_test_suite.yml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # list all tests that are part of the suite under the test suite name:
 esk_test_suite:
   included:
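
For reference, the contract ClientFactoryProxy expects from a backend is purely convention-based: the scheme in front of "://" in the configured elastic stream endpoint is substituted into "kafka.log.es.client.%s.ClientFactory", and the resulting class must expose a static get(Context) method. Below is a minimal sketch of a new backend (the "mock" scheme and class are hypothetical, shown for illustration only, with MemoryClient reused as a stand-in):

    package kafka.log.es.client.mock;

    import com.automq.elasticstream.client.api.Client;
    import kafka.log.es.AlwaysSuccessClient;
    import kafka.log.es.MemoryClient;
    import kafka.log.es.client.Context;

    public class ClientFactory {

        // ClientFactoryProxy resolves this class with Class.forName and calls this method
        // reflectively via getDeclaredMethod("get", Context.class).invoke(null, context).
        public static Client get(Context context) {
            // Wrapped in AlwaysSuccessClient, matching the es and memory factories above.
            return new AlwaysSuccessClient(new MemoryClient());
        }
    }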