diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 69afab19cb..9abeb21143 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
name: Build
on:
pull_request:
diff --git a/.github/workflows/es_unit_tests.yml b/.github/workflows/es_unit_tests.yml
index c94fbe2cb6..c6dc0f3b1b 100644
--- a/.github/workflows/es_unit_tests.yml
+++ b/.github/workflows/es_unit_tests.yml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
@@ -31,5 +46,7 @@ jobs:
distribution: "zulu"
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
+ - name: Checkstyle and Spotbugs
+ run: ./gradlew rat checkstyleMain checkstyleTest spotbugsMain spotbugsTest
- name: Execute Gradle build
run: ./gradlew metadata:esUnitTest core:esUnitTest metadata:S3UnitTest core:S3UnitTest
diff --git a/.github/workflows/validate_pr_title.yml b/.github/workflows/validate_pr_title.yml
index d2c4216123..51e7932735 100644
--- a/.github/workflows/validate_pr_title.yml
+++ b/.github/workflows/validate_pr_title.yml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
name: "Lint PR"
on:
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index e491b58a00..c70dd6c115 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -51,8 +51,8 @@
-
+
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java
index fb51a5a900..69b057307e 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java
@@ -22,7 +22,9 @@
 * Indicates that the fetch request was too slow to be served. The request should be served in a separate thread pool.
*/
public class SlowFetchHintException extends RetriableException {
+
private static final long serialVersionUID = 1L;
+
public SlowFetchHintException() {
super();
}
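
For context on how such a retriable hint is typically consumed, below is a minimal sketch that is not part of this patch; the names quickFetch, slowFetch, and slowFetchPool are hypothetical. It shows a caller reacting to a SlowFetchHintException by re-running the fetch on a dedicated slow-fetch pool, as the javadoc above suggests.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.common.errors.es.SlowFetchHintException;

class SlowFetchFallbackSketch {
    // Hypothetical pool for requests that were too slow for the quick path.
    private final ExecutorService slowFetchPool = Executors.newFixedThreadPool(2);

    CompletableFuture<byte[]> fetch(long offset) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        quickFetch(offset).whenComplete((data, ex) -> {
            Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
            if (ex == null) {
                result.complete(data);
            } else if (cause instanceof SlowFetchHintException) {
                // The quick path gave up; serve the request from the slow-fetch pool instead.
                slowFetchPool.submit(() -> {
                    try {
                        result.complete(slowFetch(offset));
                    } catch (Exception e) {
                        result.completeExceptionally(e);
                    }
                });
            } else {
                result.completeExceptionally(ex);
            }
        });
        return result;
    }

    // Placeholders standing in for the real quick and slow fetch paths.
    private CompletableFuture<byte[]> quickFetch(long offset) {
        return CompletableFuture.completedFuture(new byte[0]);
    }

    private byte[] slowFetch(long offset) {
        return new byte[0];
    }
}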
diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
index d5fce42e15..e446cb2ffa 100644
--- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
+++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
@@ -28,34 +28,36 @@
import com.automq.elasticstream.client.api.StreamClient;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
@SuppressWarnings("unchecked")
public class AlwaysSuccessClient implements Client {
+
private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class);
+ public static final long SLOW_FETCH_TIMEOUT_MILLIS = 10;
private static final ScheduledExecutorService STREAM_MANAGER_RETRY_SCHEDULER = Executors.newScheduledThreadPool(1,
- ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
+ ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
private static final ExecutorService STREAM_MANAGER_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(1,
- ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true));
+ ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true));
private static final ScheduledExecutorService FETCH_RETRY_SCHEDULER = Executors.newScheduledThreadPool(1,
- ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true));
+ ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true));
private static final ExecutorService APPEND_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(4,
- ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true));
+ ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true));
private static final ExecutorService FETCH_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(4,
- ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
+ ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
private static final ScheduledExecutorService DELAY_FETCH_SCHEDULER = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
private final StreamClient streamClient;
@@ -78,6 +80,7 @@ public KVClient kvClient() {
// TODO: do not retry when stream closed.
static class StreamClientImpl implements StreamClient {
+
private final StreamClient streamClient;
public StreamClientImpl(StreamClient streamClient) {
@@ -126,10 +129,10 @@ private void openStream0(long streamId, OpenStreamOptions options, CompletableFu
}
static class StreamImpl implements Stream {
+
private final Stream stream;
private volatile boolean closed = false;
private final Map<String, CompletableFuture<FetchResult>> holdUpFetchingFutureMap = new ConcurrentHashMap<>();
- private static final long SLOW_FETCH_TIMEOUT_MILLIS = 10;
public StreamImpl(Stream stream) {
this.stream = stream;
@@ -154,20 +157,89 @@ public long nextOffset() {
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
    CompletableFuture<AppendResult> cf = new CompletableFuture<>();
stream.append(recordBatch)
- .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
- if (ex != null) {
- cf.completeExceptionally(ex);
+ .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
+ if (ex != null) {
+ cf.completeExceptionally(ex);
+ } else {
+ cf.complete(rst);
+ }
+ }, LOGGER));
+ return cf;
+ }
+
+ /**
+ * Returns a new CompletableFuture that completes exceptionally with a {@link SlowFetchHintException} if the raw future does not complete before the given timeout.
+ *
+ * @param id the id of rawFuture in holdUpFetchingFutureMap
+ * @param rawFuture the raw future
+ * @param timeout how long to wait before completing exceptionally with a SlowFetchHintException, in units of {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
+ * @return a new CompletableFuture completed with the result of the rawFuture if it finishes before the timeout, otherwise a new
+ * CompletableFuture completed exceptionally with a {@link SlowFetchHintException}
+ */
+        private CompletableFuture<FetchResult> timeoutAndStoreFuture(String id,
+            CompletableFuture<FetchResult> rawFuture, long timeout,
+            TimeUnit unit) {
+ if (unit == null) {
+ throw new NullPointerException();
+ }
+
+ if (rawFuture.isDone()) {
+ return rawFuture;
+ }
+
+            final CompletableFuture<FetchResult> cf = new CompletableFuture<>();
+ rawFuture.whenComplete(new CompleteFetchingFutureAndCancelTimeoutCheck(Delayer.delay(() -> {
+ if (rawFuture == null) {
+ return;
+ }
+
+ // If rawFuture is done, then complete the cf with the result of rawFuture.
+ if (rawFuture.isDone()) {
+ rawFuture.whenComplete((result, exception) -> {
+ if (exception != null) {
+ cf.completeExceptionally(exception);
} else {
- cf.complete(rst);
+ cf.complete(result);
}
- }, LOGGER));
+ });
+ } else { // else, complete the cf with a SlowFetchHintException and store the rawFuture for slow fetching.
+ holdUpFetchingFutureMap.putIfAbsent(id, rawFuture);
+ cf.completeExceptionally(new SlowFetchHintException());
+ }
+ }, timeout, unit), cf));
return cf;
}
@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
+ String holdUpKey = startOffset + "-" + endOffset + "-" + maxBytesHint;
CompletableFuture<FetchResult> cf = new CompletableFuture<>();
- fetch0(startOffset, endOffset, maxBytesHint, cf);
+ // If this thread is not marked, then just fetch data.
+ if (!SeparateSlowAndQuickFetchHint.isMarked()) {
+ if (holdUpFetchingFutureMap.containsKey(holdUpKey)) {
+ holdUpFetchingFutureMap.remove(holdUpKey).thenAccept(cf::complete);
+ } else {
+ fetch0(startOffset, endOffset, maxBytesHint, cf);
+ }
+ } else {
+            // Try a quick fetch first. If it times out, complete the future with a SlowFetchHintException.
+ timeoutAndStoreFuture(holdUpKey, stream.fetch(startOffset, endOffset, maxBytesHint), SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
+ .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
+ if (ex != null) {
+ if (closed) {
+ cf.completeExceptionally(new IllegalStateException("stream already closed"));
+ } else if (ex instanceof SlowFetchHintException) {
+                        LOGGER.debug("Fetch stream[{}] [{},{}) timed out after {} ms, will retry with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS);
+ cf.completeExceptionally(ex);
+ } else {
+ cf.completeExceptionally(ex);
+ }
+ } else {
+ cf.complete(rst);
+ }
+ }, LOGGER));
+ }
return cf;
}
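
To make the timeout-and-store flow above easier to follow outside the diff, here is a minimal, self-contained sketch of the same idea; the class and method names are illustrative, and it deliberately omits the timeout-cancellation step that CompleteFetchingFutureAndCancelTimeoutCheck performs in the real code. The quick path races the raw fetch against a scheduled check: if the check fires first, the caller receives a SlowFetchHintException and the raw future is parked under its key so that a later slow-path call can reuse it instead of re-issuing the fetch.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.errors.es.SlowFetchHintException;

class TimeoutAndStoreSketch<T> {
    private final ScheduledExecutorService timeoutScheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<String, CompletableFuture<T>> heldUp = new ConcurrentHashMap<>();

    CompletableFuture<T> withSlowFetchHint(String key, CompletableFuture<T> raw,
                                           long timeout, TimeUnit unit) {
        if (raw.isDone()) {
            return raw; // fast path: nothing to race against
        }
        CompletableFuture<T> cf = new CompletableFuture<>();
        // If the raw future has not finished by the deadline, surface the hint
        // and park the raw future so a later slow-path call can pick it up.
        timeoutScheduler.schedule(() -> {
            if (!raw.isDone()) {
                heldUp.putIfAbsent(key, raw);
                cf.completeExceptionally(new SlowFetchHintException());
            }
        }, timeout, unit);
        // If the raw future finishes first, forward its outcome to the caller.
        raw.whenComplete((result, ex) -> {
            if (ex != null) {
                cf.completeExceptionally(ex);
            } else {
                cf.complete(result);
            }
        });
        return cf;
    }

    // The slow path looks up the parked future by the same key; null means
    // no quick attempt was parked and a fresh fetch is needed.
    CompletableFuture<T> resume(String key) {
        return heldUp.remove(key);
    }
}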
@@ -236,22 +308,42 @@ public CompletableFuture destroy() {
}
static final class Delayer {
+
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
return DELAY_FETCH_SCHEDULER.schedule(command, delay, unit);
}
}
- static final class Canceller implements BiConsumer<FetchResult, Throwable>