5 changes: 2 additions & 3 deletions build.gradle
@@ -1014,12 +1014,11 @@ project(':core') {
implementation libs.commonsCli

// implementation libs.elasticstream
implementation(libs.esClient) {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
implementation 'redis.clients:jedis:4.3.1'
implementation libs.slf4jlog4j
implementation libs.s3Client
implementation libs.commonLang
implementation libs.nettyAll

implementation libs.zstd

83 changes: 43 additions & 40 deletions core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
@@ -17,17 +17,22 @@

package kafka.log.es;

import com.automq.elasticstream.client.api.AppendResult;
import com.automq.elasticstream.client.api.Client;
import com.automq.elasticstream.client.api.CreateStreamOptions;
import com.automq.elasticstream.client.api.ElasticStreamClientException;
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.KVClient;
import com.automq.elasticstream.client.api.OpenStreamOptions;
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.Stream;
import com.automq.elasticstream.client.api.StreamClient;
import com.automq.elasticstream.client.flatc.header.ErrorCode;
import kafka.log.es.api.AppendResult;
import kafka.log.es.api.Client;
import kafka.log.es.api.CreateStreamOptions;
import kafka.log.es.api.ElasticStreamClientException;
import kafka.log.es.api.ErrorCode;
import kafka.log.es.api.FetchResult;
import kafka.log.es.api.KVClient;
import kafka.log.es.api.OpenStreamOptions;
import kafka.log.es.api.RecordBatch;
import kafka.log.es.api.Stream;
import kafka.log.es.api.StreamClient;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
@@ -39,34 +44,30 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AlwaysSuccessClient implements Client {

private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class);
public static final Set<Short> HALT_ERROR_CODES = Set.of(ErrorCode.EXPIRED_STREAM_EPOCH, ErrorCode.STREAM_ALREADY_CLOSED);
public static final long DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS = 10;
private final ScheduledExecutorService streamManagerRetryScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
private final ExecutorService streamManagerCallbackExecutors = Executors.newFixedThreadPool(1,
ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true));
ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true));
private final ScheduledExecutorService appendRetryScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("append-retry-scheduler-%d", true));
ThreadUtils.createThreadFactory("append-retry-scheduler-%d", true));
private final ScheduledExecutorService fetchRetryScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true));
ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true));
private final ScheduledExecutorService generalRetryScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("general-retry-scheduler-%d", true));
ThreadUtils.createThreadFactory("general-retry-scheduler-%d", true));
private final ExecutorService generalCallbackExecutors = Executors.newFixedThreadPool(4,
ThreadUtils.createThreadFactory("general-callback-scheduler-%d", true));
ThreadUtils.createThreadFactory("general-callback-scheduler-%d", true));
private final ExecutorService appendCallbackExecutors = Executors.newFixedThreadPool(4,
ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true));
ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true));
private final ExecutorService fetchCallbackExecutors = Executors.newFixedThreadPool(4,
ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
private final ScheduledExecutorService delayFetchScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
private final StreamClient streamClient;
private final KVClient kvClient;
private final Delayer delayer;
@@ -119,6 +120,7 @@ public void shutdownNow() {

/**
* Check if the exception is an ElasticStreamClientException with a halt error code.
*
* @param t the exception
* @return true if the exception is an ElasticStreamClientException with a halt error code, otherwise false
*/
@@ -235,7 +237,7 @@ private void append0(RecordBatch recordBatch, CompletableFuture<AppendResult> cf
stream.append(recordBatch).whenCompleteAsync((rst, ex) -> FutureUtil.suppress(() -> {
if (ex != null) {
if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) {
LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex);
LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex);
appendRetryScheduler.schedule(() -> append0(recordBatch, cf), 3, TimeUnit.SECONDS);
}
} else {
@@ -247,14 +249,15 @@ private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture<AppendResult> cf
/**
* Append to stream without using async callback threadPools.
* <strong> Used for tests only.</strong>
*
* @param recordBatch
* @param cf
*/
private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture<AppendResult> cf) {
stream.append(recordBatch).whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
if (ex != null) {
if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) {
LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex);
LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex);
appendRetryScheduler.schedule(() -> append0(recordBatch, cf), 3, TimeUnit.SECONDS);
}
} else {
Expand All @@ -274,8 +277,8 @@ private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture<
* CompletableFuture with a {@link SlowFetchHintException}
*/
private CompletableFuture<FetchResult> timeoutAndStoreFuture(String id,
CompletableFuture<FetchResult> rawFuture, long timeout,
TimeUnit unit) {
CompletableFuture<FetchResult> rawFuture, long timeout,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException();
}
@@ -323,18 +326,18 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
fetch0(startOffset, endOffset, maxBytesHint, firstFetchFuture);
// Try to have a quick fetch. If the first fetching is timeout, then complete with SlowFetchHintException.
timeoutAndStoreFuture(holdUpKey, firstFetchFuture, slowFetchTimeoutMillis, TimeUnit.MILLISECONDS)
.whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
if (ex != null) {
if (ex instanceof SlowFetchHintException) {
LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
cf.completeExceptionally(ex);
} else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) {
cf.completeExceptionally(ex);
.whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
if (ex != null) {
if (ex instanceof SlowFetchHintException) {
LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS);
cf.completeExceptionally(ex);
} else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) {
cf.completeExceptionally(ex);
}
} else {
cf.complete(rst);
}
} else {
cf.complete(rst);
}
}, LOGGER));
}, LOGGER));
}
return cf;
}
@@ -426,7 +429,7 @@ public Delayer(ScheduledExecutorService delayFetchScheduler) {
}

public ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
TimeUnit unit) {
return delayFetchScheduler.schedule(command, delay, unit);
}
}
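Note on the fetch path above: fetch() first attempts a quick fetch and, if the underlying future does not complete within slowFetchTimeoutMillis, completes the caller's future exceptionally with a SlowFetchHintException so the caller can retry on a slow-fetch path. Below is a minimal sketch of that timeout-then-hint pattern using only java.util.concurrent; the class name QuickFetchSketch and the nested SlowFetchHint exception are illustrative stand-ins, not part of this PR.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class QuickFetchSketch {
    // Illustrative stand-in for org.apache.kafka.common.errors.es.SlowFetchHintException.
    static class SlowFetchHint extends Exception {
        SlowFetchHint(String message) { super(message); }
    }

    private static final ScheduledExecutorService DELAYER = Executors.newSingleThreadScheduledExecutor();

    // Complete the returned future with the raw result if it arrives in time,
    // otherwise complete it exceptionally with a slow-fetch hint; the first completion wins.
    static <T> CompletableFuture<T> timeoutOrHint(CompletableFuture<T> raw, long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        DELAYER.schedule(() -> result.completeExceptionally(
                new SlowFetchHint("no result within " + timeout + " " + unit)), timeout, unit);
        raw.whenComplete((rst, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
            } else {
                result.complete(rst);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> slowFetch = new CompletableFuture<>();
        DELAYER.schedule(() -> slowFetch.complete("records"), 100, TimeUnit.MILLISECONDS);
        timeoutOrHint(slowFetch, 10, TimeUnit.MILLISECONDS)
                .whenComplete((rst, ex) -> System.out.println(ex != null ? "fall back to slow fetch" : rst))
                .exceptionally(ex -> null)
                .join();
        DELAYER.shutdownNow();
    }
}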
38 changes: 38 additions & 0 deletions core/src/main/scala/kafka/log/es/DefaultAppendResult.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.es;

import kafka.log.es.api.AppendResult;

public class DefaultAppendResult implements AppendResult {
private final long baseOffset;

public DefaultAppendResult(long baseOffset) {
this.baseOffset = baseOffset;
}

@Override
public long baseOffset() {
return baseOffset;
}

public String toString() {
return "AppendResult(baseOffset=" + baseOffset + ")";
}
}
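A trivial usage sketch for the new result type; the offset value is hypothetical and would normally come from the underlying stream append.

import kafka.log.es.DefaultAppendResult;
import kafka.log.es.api.AppendResult;

public class AppendResultExample {
    public static void main(String[] args) {
        AppendResult result = new DefaultAppendResult(42L); // hypothetical base offset
        System.out.println(result.baseOffset()); // 42
        System.out.println(result);              // AppendResult(baseOffset=42)
    }
}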

14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java
@@ -17,11 +17,13 @@

package kafka.log.es;

import com.automq.elasticstream.client.api.AppendResult;
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;
import kafka.log.es.api.AppendResult;
import kafka.log.es.api.FetchResult;
import kafka.log.es.api.RecordBatch;
import kafka.log.es.api.RecordBatchWithContext;
import kafka.log.es.api.Stream;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.Utils;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -30,8 +32,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.Utils;

public class DefaultElasticStreamSlice implements ElasticStreamSlice {
/**
61 changes: 61 additions & 0 deletions core/src/main/scala/kafka/log/es/DefaultRecordBatch.java
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.es;

import kafka.log.es.api.RecordBatch;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

public class DefaultRecordBatch implements RecordBatch {
private final int count;
private final long baseTimestamp;
private final Map<String, String> properties;
private final ByteBuffer rawPayload;

public DefaultRecordBatch(int count, long baseTimestamp, Map<String, String> properties, ByteBuffer rawPayload) {
this.count = count;
this.baseTimestamp = baseTimestamp;
this.properties = properties;
this.rawPayload = rawPayload;
}

@Override
public int count() {
return count;
}

@Override
public long baseTimestamp() {
return baseTimestamp;
}

@Override
public Map<String, String> properties() {
if (properties == null) {
return Collections.emptyMap();
}
return properties;
}

@Override
public ByteBuffer rawPayload() {
return rawPayload.duplicate();
}
}
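A short usage sketch for the new batch type, assuming the kafka.log.es classes added in this PR are on the classpath; the payload and properties values are made up for illustration.

import kafka.log.es.DefaultRecordBatch;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class DefaultRecordBatchExample {
    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        DefaultRecordBatch batch = new DefaultRecordBatch(
                1,                              // record count
                System.currentTimeMillis(),     // base timestamp
                Map.of("source", "example"),    // optional properties; null yields an empty map
                payload);

        // rawPayload() returns a duplicate, so reading it does not disturb the original buffer's position.
        ByteBuffer view = batch.rawPayload();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        System.out.println(batch.count() + " record(s): " + new String(bytes, StandardCharsets.UTF_8));
    }
}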
@@ -23,13 +23,13 @@ import kafka.server.epoch.EpochEntry

import scala.jdk.CollectionConverters.{ListHasAsScala, SeqHasAsJava}

class ElasticLeaderEpochCheckpoint(val meta: ElasticLeaderEpochCheckpointMeta, val saveFunc: ElasticLeaderEpochCheckpointMeta => Unit) extends LeaderEpochCheckpoint{
override def write(epochs: Iterable[EpochEntry]): Unit = this.synchronized {
meta.setEntries(epochs.toList.asJava)
saveFunc(meta)
}
class ElasticLeaderEpochCheckpoint(val meta: ElasticLeaderEpochCheckpointMeta, val saveFunc: ElasticLeaderEpochCheckpointMeta => Unit) extends LeaderEpochCheckpoint {
override def write(epochs: Iterable[EpochEntry]): Unit = this.synchronized {
meta.setEntries(epochs.toList.asJava)
saveFunc(meta)
}

override def read(): collection.Seq[EpochEntry] = this.synchronized {
meta.entries().asScala.toSeq
}
override def read(): collection.Seq[EpochEntry] = this.synchronized {
meta.entries().asScala.toSeq
}
}
@@ -17,10 +17,11 @@

package kafka.log.es;

import kafka.server.epoch.EpochEntry;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import kafka.server.epoch.EpochEntry;

public class ElasticLeaderEpochCheckpointMeta {
private final int version;
@@ -33,11 +34,11 @@ public ElasticLeaderEpochCheckpointMeta(int version, List<EpochEntry> entries) {

public byte[] encode() {
int totalLength = 4 // version
+ 4 // following entries size
+ 12 * entries.size(); // all entries
+ 4 // following entries size
+ 12 * entries.size(); // all entries
ByteBuffer buffer = ByteBuffer.allocate(totalLength)
.putInt(version)
.putInt(entries.size());
.putInt(version)
.putInt(entries.size());
entries.forEach(entry -> buffer.putInt(entry.epoch()).putLong(entry.startOffset()));
buffer.flip();
return buffer.array();
@@ -51,7 +52,7 @@ public static ElasticLeaderEpochCheckpointMeta decode(ByteBuffer buffer) {
entryList.add(new EpochEntry(buffer.getInt(), buffer.getLong()));
}
if (entryList.size() != entryCount) {
throw new RuntimeException("expect entry count:" + entryCount + ", decoded " + entryList.size() + " entries");
throw new RuntimeException("expect entry count:" + entryCount + ", decoded " + entryList.size() + " entries");
}
return new ElasticLeaderEpochCheckpointMeta(version, entryList);
}
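The encode/decode pair above defines a small fixed-width layout: a 4-byte version, a 4-byte entry count, then 12 bytes per entry (4-byte epoch plus 8-byte start offset). A round-trip sketch, assuming the classes in this PR are on the classpath; the version and epoch values are arbitrary.

import kafka.log.es.ElasticLeaderEpochCheckpointMeta;
import kafka.server.epoch.EpochEntry;

import java.nio.ByteBuffer;
import java.util.List;

public class CheckpointMetaRoundTrip {
    public static void main(String[] args) {
        ElasticLeaderEpochCheckpointMeta meta = new ElasticLeaderEpochCheckpointMeta(
                0, List.of(new EpochEntry(1, 0L), new EpochEntry(2, 100L)));

        byte[] bytes = meta.encode();
        System.out.println("encoded length = " + bytes.length); // 4 + 4 + 2 * 12 = 32

        ElasticLeaderEpochCheckpointMeta decoded =
                ElasticLeaderEpochCheckpointMeta.decode(ByteBuffer.wrap(bytes));
        System.out.println("entries = " + decoded.entries());
    }
}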