diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java index 7e1111535a..2d67975210 100644 --- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java +++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java @@ -48,7 +48,7 @@ public class AlwaysSuccessClient implements Client { private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class); public static final Set HALT_ERROR_CODES = Set.of(ErrorCode.EXPIRED_STREAM_EPOCH, ErrorCode.STREAM_ALREADY_CLOSED); - public static final long SLOW_FETCH_TIMEOUT_MILLIS = 10; + public static final long DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS = 10; private final ScheduledExecutorService streamManagerRetryScheduler = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory("stream-manager-retry-%d", true)); private final ExecutorService streamManagerCallbackExecutors = Executors.newFixedThreadPool(1, @@ -77,16 +77,22 @@ public class AlwaysSuccessClient implements Client { * due to the delay in updating the committed offset. */ private final boolean appendCallbackAsync; + private final long slowFetchTimeoutMillis; public AlwaysSuccessClient(Client client) { - this(client, true); + this(client, true, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS); } public AlwaysSuccessClient(Client client, boolean appendCallbackAsync) { + this(client, appendCallbackAsync, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS); + } + + public AlwaysSuccessClient(Client client, boolean appendCallbackAsync, long slowFetchTimeoutMillis) { this.streamClient = new StreamClientImpl(client.streamClient()); this.kvClient = client.kvClient(); this.delayer = new Delayer(delayFetchScheduler); this.appendCallbackAsync = appendCallbackAsync; + this.slowFetchTimeoutMillis = slowFetchTimeoutMillis; } @Override @@ -316,11 +322,11 @@ public CompletableFuture fetch(long startOffset, long endOffset, in CompletableFuture firstFetchFuture = new CompletableFuture<>(); fetch0(startOffset, endOffset, maxBytesHint, firstFetchFuture); // Try to have a quick fetch. If the first fetching is timeout, then complete with SlowFetchHintException. - timeoutAndStoreFuture(holdUpKey, firstFetchFuture, SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) + timeoutAndStoreFuture(holdUpKey, firstFetchFuture, slowFetchTimeoutMillis, TimeUnit.MILLISECONDS) .whenComplete((rst, ex) -> FutureUtil.suppress(() -> { if (ex != null) { if (ex instanceof SlowFetchHintException) { - LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS); + LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS); cf.completeExceptionally(ex); } else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) { cf.completeExceptionally(ex); diff --git a/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java b/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java index 926942c131..932eb677dd 100644 --- a/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java +++ b/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java @@ -26,7 +26,6 @@ import com.automq.elasticstream.client.api.RecordBatchWithContext; import com.automq.elasticstream.client.api.Stream; import com.automq.elasticstream.client.api.StreamClient; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -41,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.kafka.common.errors.es.SlowFetchHintException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -54,7 +52,6 @@ @Tag("esUnit") class AlwaysSuccessClientTest { - private static final long SLOW_FETCH_TIMEOUT_MILLIS = AlwaysSuccessClient.SLOW_FETCH_TIMEOUT_MILLIS; private AlwaysSuccessClient client; @BeforeEach @@ -72,14 +69,14 @@ public void teardown() { public void basicAppendAndFetch() throws ExecutionException, InterruptedException { client = new AlwaysSuccessClient(new MemoryClient()); Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .get(); + .streamClient() + .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) + .get(); List payloads = List.of("hello".getBytes(), "world".getBytes()); CompletableFuture.allOf( - payloads - .stream() - .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) + payloads + .stream() + .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) ).get(); FetchResult fetched = stream.fetch(0, 100, 1000).get(); @@ -91,25 +88,26 @@ public void basicAppendAndFetch() throws ExecutionException, InterruptedExceptio @Test public void testQuickFetch() throws ExecutionException, InterruptedException { MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay(); - client = new AlwaysSuccessClient(memoryClientWithDelay); - List quickFetchDelayMillisList = List.of(1L, SLOW_FETCH_TIMEOUT_MILLIS / 2); + long slowFetchTimeoutMillis = 1000 * 2; + client = new AlwaysSuccessClient(memoryClientWithDelay, false, slowFetchTimeoutMillis); + List quickFetchDelayMillisList = List.of(1L, slowFetchTimeoutMillis / 2); List payloads = List.of("hello".getBytes(), "world".getBytes()); // test quick fetch for (Long delay : quickFetchDelayMillisList) { memoryClientWithDelay.setDelayMillis(delay); Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .get(); + .streamClient() + .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) + .get(); CompletableFuture.allOf( - payloads - .stream() - .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) + payloads + .stream() + .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) ).get(); FetchResult fetched = stream.fetch(0, 100, 1000) - .orTimeout(delay + 100, TimeUnit.MILLISECONDS) - .get(); + .orTimeout(delay + slowFetchTimeoutMillis / 2, TimeUnit.MILLISECONDS) + .get(); checkAppendAndFetch(payloads, fetched); stream.destroy(); } @@ -118,27 +116,28 @@ public void testQuickFetch() throws ExecutionException, InterruptedException { @Test public void testSlowFetch() throws ExecutionException, InterruptedException { MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay(); - client = new AlwaysSuccessClient(memoryClientWithDelay); + long slowFetchTimeoutMillis = 1000 * 2; + client = new AlwaysSuccessClient(memoryClientWithDelay, false, slowFetchTimeoutMillis); List payloads = List.of("hello".getBytes(), "world".getBytes()); - long slowFetchDelay = 500 + SLOW_FETCH_TIMEOUT_MILLIS + SLOW_FETCH_TIMEOUT_MILLIS / 2; + long slowFetchDelay = slowFetchTimeoutMillis * 3 / 2; memoryClientWithDelay.setDelayMillis(slowFetchDelay); Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .get(); + .streamClient() + .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) + .get(); CompletableFuture.allOf( - payloads - .stream() - .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) + payloads + .stream() + .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) ).get(); FetchResult fetched = null; AtomicBoolean gotSlowFetchHintException = new AtomicBoolean(false); try { fetched = stream.fetch(0, 100, 1000) - .orTimeout(SLOW_FETCH_TIMEOUT_MILLIS * 2, TimeUnit.MILLISECONDS) - .get(); + .orTimeout(slowFetchDelay, TimeUnit.MILLISECONDS) + .get(); checkAppendAndFetch(payloads, fetched); } catch (ExecutionException e) { // should throw SlowFetchHintException after SLOW_FETCH_TIMEOUT_MILLIS ms @@ -147,8 +146,8 @@ public void testSlowFetch() throws ExecutionException, InterruptedException { SeparateSlowAndQuickFetchHint.reset(); // It should reuse the fetching future above, therefore only (SLOW_FETCH_TIMEOUT_MILLIS / 2) ms is tolerable. fetched = stream.fetch(0, 100, 1000) - .orTimeout(slowFetchDelay + 100, TimeUnit.MILLISECONDS) - .get(); + .orTimeout(slowFetchTimeoutMillis - 200, TimeUnit.MILLISECONDS) + .get(); } checkAppendAndFetch(payloads, fetched); assertTrue(gotSlowFetchHintException.get(), "should throw SlowFetchHintException"); @@ -163,12 +162,12 @@ public void testOpenStream() { AtomicBoolean exceptionThrown = new AtomicBoolean(false); openStream(1) - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionThrown.set(true); - return null; - }) - .join(); + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionThrown.set(true); + return null; + }) + .join(); assertTrue(exceptionThrown.get(), "should throw IOException"); } @@ -180,40 +179,40 @@ public void testStreamOperationHalt() { client = new AlwaysSuccessClient(memoryClientWithDelay); Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .join(); + .streamClient() + .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) + .join(); AtomicInteger exceptionCount = new AtomicInteger(0); stream - .append(RawPayloadRecordBatch.of(ByteBuffer.wrap("hello".getBytes()))) - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionCount.incrementAndGet(); - return null; - }) - .join(); + .append(RawPayloadRecordBatch.of(ByteBuffer.wrap("hello".getBytes()))) + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionCount.incrementAndGet(); + return null; + }) + .join(); stream.fetch(0, 100, 1000) - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionCount.incrementAndGet(); - return null; - }) - .join(); + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionCount.incrementAndGet(); + return null; + }) + .join(); stream.trim(0) - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionCount.incrementAndGet(); - return null; - }) - .join(); + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionCount.incrementAndGet(); + return null; + }) + .join(); stream.close() - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionCount.incrementAndGet(); - return null; - }) - .join(); + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionCount.incrementAndGet(); + return null; + }) + .join(); assertEquals(4, exceptionCount.get(), "should throw IOException 4 times"); stream.destroy(); } @@ -252,8 +251,8 @@ public void testNormalExceptionHandling() { private CompletableFuture openStream(long streamId) { return client - .streamClient() - .openStream(streamId, OpenStreamOptions.newBuilder().epoch(1).build()); + .streamClient() + .openStream(streamId, OpenStreamOptions.newBuilder().epoch(1).build()); } private void checkAppendAndFetch(List rawPayloads, FetchResult fetched) { @@ -267,7 +266,6 @@ static final class MemoryClientWithDelay extends MemoryClient { /** * Set the additional fetching delay - * * @param delayMillis */ public void setDelayMillis(long delayMillis) { @@ -426,9 +424,9 @@ static enum ExceptionHint { OK; private static final List OTHER_EXCEPTION_LIST = List.of( - new IOException("io exception"), - new RuntimeException("runtime exception"), - new ElasticStreamClientException(-1, "other exception") + new IOException("io exception"), + new RuntimeException("runtime exception"), + new ElasticStreamClientException(-1, "other exception") ); public Exception generateException() {