Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class AlwaysSuccessClient implements Client {

private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class);
public static final Set<Short> HALT_ERROR_CODES = Set.of(ErrorCode.EXPIRED_STREAM_EPOCH, ErrorCode.STREAM_ALREADY_CLOSED);
public static final long SLOW_FETCH_TIMEOUT_MILLIS = 10;
public static final long DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS = 10;
private final ScheduledExecutorService streamManagerRetryScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
private final ExecutorService streamManagerCallbackExecutors = Executors.newFixedThreadPool(1,
Expand Down Expand Up @@ -77,16 +77,22 @@ public class AlwaysSuccessClient implements Client {
* due to the delay in updating the committed offset.
*/
private final boolean appendCallbackAsync;
private final long slowFetchTimeoutMillis;

public AlwaysSuccessClient(Client client) {
this(client, true);
this(client, true, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
}

public AlwaysSuccessClient(Client client, boolean appendCallbackAsync) {
this(client, appendCallbackAsync, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
}

public AlwaysSuccessClient(Client client, boolean appendCallbackAsync, long slowFetchTimeoutMillis) {
this.streamClient = new StreamClientImpl(client.streamClient());
this.kvClient = client.kvClient();
this.delayer = new Delayer(delayFetchScheduler);
this.appendCallbackAsync = appendCallbackAsync;
this.slowFetchTimeoutMillis = slowFetchTimeoutMillis;
}

@Override
Expand Down Expand Up @@ -316,11 +322,11 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
CompletableFuture<FetchResult> firstFetchFuture = new CompletableFuture<>();
fetch0(startOffset, endOffset, maxBytesHint, firstFetchFuture);
// Try to have a quick fetch. If the first fetching is timeout, then complete with SlowFetchHintException.
timeoutAndStoreFuture(holdUpKey, firstFetchFuture, SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
timeoutAndStoreFuture(holdUpKey, firstFetchFuture, slowFetchTimeoutMillis, TimeUnit.MILLISECONDS)
.whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
if (ex != null) {
if (ex instanceof SlowFetchHintException) {
LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS);
LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
cf.completeExceptionally(ex);
} else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) {
cf.completeExceptionally(ex);
Expand Down
140 changes: 69 additions & 71 deletions core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;
import com.automq.elasticstream.client.api.StreamClient;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -41,7 +40,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -54,7 +52,6 @@

@Tag("esUnit")
class AlwaysSuccessClientTest {
private static final long SLOW_FETCH_TIMEOUT_MILLIS = AlwaysSuccessClient.SLOW_FETCH_TIMEOUT_MILLIS;
private AlwaysSuccessClient client;

@BeforeEach
Expand All @@ -72,14 +69,14 @@ public void teardown() {
public void basicAppendAndFetch() throws ExecutionException, InterruptedException {
client = new AlwaysSuccessClient(new MemoryClient());
Stream stream = client
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.get();
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.get();
List<byte[]> payloads = List.of("hello".getBytes(), "world".getBytes());
CompletableFuture.allOf(
payloads
.stream()
.map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new)
payloads
.stream()
.map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new)
).get();

FetchResult fetched = stream.fetch(0, 100, 1000).get();
Expand All @@ -91,25 +88,26 @@ public void basicAppendAndFetch() throws ExecutionException, InterruptedExceptio
@Test
public void testQuickFetch() throws ExecutionException, InterruptedException {
MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay();
client = new AlwaysSuccessClient(memoryClientWithDelay);
List<Long> quickFetchDelayMillisList = List.of(1L, SLOW_FETCH_TIMEOUT_MILLIS / 2);
long slowFetchTimeoutMillis = 1000 * 2;
client = new AlwaysSuccessClient(memoryClientWithDelay, false, slowFetchTimeoutMillis);
List<Long> quickFetchDelayMillisList = List.of(1L, slowFetchTimeoutMillis / 2);
List<byte[]> payloads = List.of("hello".getBytes(), "world".getBytes());

// test quick fetch
for (Long delay : quickFetchDelayMillisList) {
memoryClientWithDelay.setDelayMillis(delay);
Stream stream = client
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.get();
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.get();
CompletableFuture.allOf(
payloads
.stream()
.map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new)
payloads
.stream()
.map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new)
).get();
FetchResult fetched = stream.fetch(0, 100, 1000)
.orTimeout(delay + 100, TimeUnit.MILLISECONDS)
.get();
.orTimeout(delay + slowFetchTimeoutMillis / 2, TimeUnit.MILLISECONDS)
.get();
checkAppendAndFetch(payloads, fetched);
stream.destroy();
}
Expand All @@ -118,27 +116,28 @@ public void testQuickFetch() throws ExecutionException, InterruptedException {
@Test
public void testSlowFetch() throws ExecutionException, InterruptedException {
MemoryClientWithDelay memoryClientWithDelay = new MemoryClientWithDelay();
client = new AlwaysSuccessClient(memoryClientWithDelay);
long slowFetchTimeoutMillis = 1000 * 2;
client = new AlwaysSuccessClient(memoryClientWithDelay, false, slowFetchTimeoutMillis);
List<byte[]> payloads = List.of("hello".getBytes(), "world".getBytes());

long slowFetchDelay = 500 + SLOW_FETCH_TIMEOUT_MILLIS + SLOW_FETCH_TIMEOUT_MILLIS / 2;
long slowFetchDelay = slowFetchTimeoutMillis * 3 / 2;
memoryClientWithDelay.setDelayMillis(slowFetchDelay);
Stream stream = client
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.get();
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.get();
CompletableFuture.allOf(
payloads
.stream()
.map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new)
payloads
.stream()
.map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new)
).get();

FetchResult fetched = null;
AtomicBoolean gotSlowFetchHintException = new AtomicBoolean(false);
try {
fetched = stream.fetch(0, 100, 1000)
.orTimeout(SLOW_FETCH_TIMEOUT_MILLIS * 2, TimeUnit.MILLISECONDS)
.get();
.orTimeout(slowFetchDelay, TimeUnit.MILLISECONDS)
.get();
checkAppendAndFetch(payloads, fetched);
} catch (ExecutionException e) {
// should throw SlowFetchHintException after SLOW_FETCH_TIMEOUT_MILLIS ms
Expand All @@ -147,8 +146,8 @@ public void testSlowFetch() throws ExecutionException, InterruptedException {
SeparateSlowAndQuickFetchHint.reset();
// It should reuse the fetching future above, therefore only (SLOW_FETCH_TIMEOUT_MILLIS / 2) ms is tolerable.
fetched = stream.fetch(0, 100, 1000)
.orTimeout(slowFetchDelay + 100, TimeUnit.MILLISECONDS)
.get();
.orTimeout(slowFetchTimeoutMillis - 200, TimeUnit.MILLISECONDS)
.get();
}
checkAppendAndFetch(payloads, fetched);
assertTrue(gotSlowFetchHintException.get(), "should throw SlowFetchHintException");
Expand All @@ -163,12 +162,12 @@ public void testOpenStream() {

AtomicBoolean exceptionThrown = new AtomicBoolean(false);
openStream(1)
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionThrown.set(true);
return null;
})
.join();
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionThrown.set(true);
return null;
})
.join();

assertTrue(exceptionThrown.get(), "should throw IOException");
}
Expand All @@ -180,40 +179,40 @@ public void testStreamOperationHalt() {
client = new AlwaysSuccessClient(memoryClientWithDelay);

Stream stream = client
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.join();
.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
.join();

AtomicInteger exceptionCount = new AtomicInteger(0);
stream
.append(RawPayloadRecordBatch.of(ByteBuffer.wrap("hello".getBytes())))
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionCount.incrementAndGet();
return null;
})
.join();
.append(RawPayloadRecordBatch.of(ByteBuffer.wrap("hello".getBytes())))
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionCount.incrementAndGet();
return null;
})
.join();
stream.fetch(0, 100, 1000)
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionCount.incrementAndGet();
return null;
})
.join();
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionCount.incrementAndGet();
return null;
})
.join();
stream.trim(0)
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionCount.incrementAndGet();
return null;
})
.join();
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionCount.incrementAndGet();
return null;
})
.join();
stream.close()
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionCount.incrementAndGet();
return null;
})
.join();
.exceptionally(e -> {
assertEquals(IOException.class, e.getClass());
exceptionCount.incrementAndGet();
return null;
})
.join();
assertEquals(4, exceptionCount.get(), "should throw IOException 4 times");
stream.destroy();
}
Expand Down Expand Up @@ -252,8 +251,8 @@ public void testNormalExceptionHandling() {

private CompletableFuture<Stream> openStream(long streamId) {
return client
.streamClient()
.openStream(streamId, OpenStreamOptions.newBuilder().epoch(1).build());
.streamClient()
.openStream(streamId, OpenStreamOptions.newBuilder().epoch(1).build());
}

private void checkAppendAndFetch(List<byte[]> rawPayloads, FetchResult fetched) {
Expand All @@ -267,7 +266,6 @@ static final class MemoryClientWithDelay extends MemoryClient {

/**
* Set the additional fetching delay
*
* @param delayMillis
*/
public void setDelayMillis(long delayMillis) {
Expand Down Expand Up @@ -426,9 +424,9 @@ static enum ExceptionHint {
OK;

private static final List<Exception> OTHER_EXCEPTION_LIST = List.of(
new IOException("io exception"),
new RuntimeException("runtime exception"),
new ElasticStreamClientException(-1, "other exception")
new IOException("io exception"),
new RuntimeException("runtime exception"),
new ElasticStreamClientException(-1, "other exception")
);

public Exception generateException() {
Expand Down