15 changes: 15 additions & 0 deletions .github/workflows/build.yml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 name: Build
 on:
   pull_request:
17 changes: 17 additions & 0 deletions .github/workflows/es_unit_tests.yml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # This workflow uses actions that are not certified by GitHub.
 # They are provided by a third-party and are governed by
 # separate terms of service, privacy policy, and support
@@ -31,5 +46,7 @@ jobs:
distribution: "zulu"
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Checkstyle and Spotbugs
run: ./gradlew rat checkstyleMain checkstyleTest spotbugsMain spotbugsTest
- name: Execute Gradle build
run: ./gradlew metadata:esUnitTest core:esUnitTest metadata:S3UnitTest core:S3UnitTest
15 changes: 15 additions & 0 deletions .github/workflows/validate_pr_title.yml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 name: "Lint PR"
 
 on:
2 changes: 1 addition & 1 deletion checkstyle/import-control-core.xml
@@ -51,8 +51,8 @@
<allow pkg="io.netty" />
<allow pkg="redis.clients.jedis" />
<allow pkg="kafka.server" />
<allow pkg="kafka.log.s3" />
<allow pkg="kafka.log.es" />
<allow pkg="kafka.log.s3" />
<allow pkg="com.google.flatbuffers" />
<allow pkg="org.apache.kafka.metadata" />
</subpackage>
.../org/apache/kafka/common/errors/es/SlowFetchHintException.java
@@ -22,7 +22,9 @@
  * Indicates that the fetch request was too slow to be served. The request should be served in a separate thread pool.
  */
 public class SlowFetchHintException extends RetriableException {
 
+    private static final long serialVersionUID = 1L;
+
     public SlowFetchHintException() {
         super();
     }
136 changes: 114 additions & 22 deletions core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
@@ -28,34 +28,36 @@
 import com.automq.elasticstream.client.api.StreamClient;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
 import org.apache.kafka.common.errors.es.SlowFetchHintException;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 @SuppressWarnings("uncheck")
 public class AlwaysSuccessClient implements Client {
+
     private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class);
+    public static final long SLOW_FETCH_TIMEOUT_MILLIS = 10;
     private static final ScheduledExecutorService STREAM_MANAGER_RETRY_SCHEDULER = Executors.newScheduledThreadPool(1,
-        ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
+            ThreadUtils.createThreadFactory("stream-manager-retry-%d", true));
     private static final ExecutorService STREAM_MANAGER_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(1,
-        ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true));
+            ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true));
     private static final ScheduledExecutorService FETCH_RETRY_SCHEDULER = Executors.newScheduledThreadPool(1,
-        ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true));
+            ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true));
     private static final ExecutorService APPEND_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(4,
-        ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true));
+            ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true));
     private static final ExecutorService FETCH_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(4,
-        ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
+            ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
     private static final ScheduledExecutorService DELAY_FETCH_SCHEDULER = Executors.newScheduledThreadPool(1,
             ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
     private final StreamClient streamClient;
@@ -78,6 +80,7 @@ public KVClient kvClient() {
 
     // TODO: do not retry when stream closed.
     static class StreamClientImpl implements StreamClient {
+
         private final StreamClient streamClient;
 
         public StreamClientImpl(StreamClient streamClient) {
@@ -126,10 +129,10 @@ private void openStream0(long streamId, OpenStreamOptions options, CompletableFu
    }
 
    static class StreamImpl implements Stream {
+
        private final Stream stream;
        private volatile boolean closed = false;
        private final Map<String, CompletableFuture<FetchResult>> holdUpFetchingFutureMap = new ConcurrentHashMap<>();
-       private static final long SLOW_FETCH_TIMEOUT_MILLIS = 10;
 
        public StreamImpl(Stream stream) {
            this.stream = stream;
@@ -154,20 +157,89 @@ public long nextOffset() {
        public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
            CompletableFuture<AppendResult> cf = new CompletableFuture<>();
            stream.append(recordBatch)
-               .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
-                   if (ex != null) {
-                       cf.completeExceptionally(ex);
-                   } else {
-                       cf.complete(rst);
-                   }
-               }, LOGGER));
+                   .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
+                       if (ex != null) {
+                           cf.completeExceptionally(ex);
+                       } else {
+                           cf.complete(rst);
+                       }
+                   }, LOGGER));
            return cf;
        }
 
+       /**
+        * Get a new CompletableFuture with a {@link SlowFetchHintException} if not otherwise completed before the given timeout.
+        *
+        * @param id the id of rawFuture in holdUpFetchingFutureMap
+        * @param rawFuture the raw future
+        * @param timeout how long to wait before completing exceptionally with a SlowFetchHintException, in units of {@code unit}
+        * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
+        * @return a new CompletableFuture with the completed results of the rawFuture if the raw future is done before the timeout, otherwise a new
+        * CompletableFuture completed exceptionally with a {@link SlowFetchHintException}
+        */
+       private CompletableFuture<FetchResult> timeoutAndStoreFuture(String id,
+               CompletableFuture<FetchResult> rawFuture, long timeout,
+               TimeUnit unit) {
+           if (unit == null) {
+               throw new NullPointerException();
+           }
+
+           if (rawFuture.isDone()) {
+               return rawFuture;
+           }
+
+           final CompletableFuture<FetchResult> cf = new CompletableFuture<>();
+           rawFuture.whenComplete(new CompleteFetchingFutureAndCancelTimeoutCheck(Delayer.delay(() -> {
+               if (rawFuture == null) {
+                   return;
+               }
+
+               // If rawFuture is done, then complete the cf with the result of rawFuture.
+               if (rawFuture.isDone()) {
+                   rawFuture.whenComplete((result, exception) -> {
+                       if (exception != null) {
+                           cf.completeExceptionally(exception);
+                       } else {
+                           cf.complete(result);
+                       }
+                   });
+               } else { // else, complete the cf with a SlowFetchHintException and store the rawFuture for slow fetching.
+                   holdUpFetchingFutureMap.putIfAbsent(id, rawFuture);
+                   cf.completeExceptionally(new SlowFetchHintException());
+               }
+           }, timeout, unit), cf));
+           return cf;
+       }
 
        @Override
        public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
+           String holdUpKey = startOffset + "-" + endOffset + "-" + maxBytesHint;
            CompletableFuture<FetchResult> cf = new CompletableFuture<>();
-           fetch0(startOffset, endOffset, maxBytesHint, cf);
+           // If this thread is not marked, then just fetch data.
+           if (!SeparateSlowAndQuickFetchHint.isMarked()) {
+               if (holdUpFetchingFutureMap.containsKey(holdUpKey)) {
+                   holdUpFetchingFutureMap.remove(holdUpKey).thenAccept(cf::complete);
+               } else {
+                   fetch0(startOffset, endOffset, maxBytesHint, cf);
+               }
+           } else {
+               // Try to have a quick fetch. If the fetch times out, complete with a SlowFetchHintException.
+               timeoutAndStoreFuture(holdUpKey, stream.fetch(startOffset, endOffset, maxBytesHint), SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
+                   .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
+                       if (ex != null) {
+                           if (closed) {
+                               cf.completeExceptionally(new IllegalStateException("stream already closed"));
+                           } else if (ex instanceof SlowFetchHintException) {
+                               LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS);
+                               cf.completeExceptionally(ex);
+                           } else {
+                               cf.completeExceptionally(ex);
+                           }
+                       } else {
+                           cf.complete(rst);
+                       }
+                   }, LOGGER));
+           }
            return cf;
        }
 
@@ -236,22 +308,42 @@ public CompletableFuture<Void> destroy() {
    }
 
    static final class Delayer {
+
        static ScheduledFuture<?> delay(Runnable command, long delay,
                TimeUnit unit) {
            return DELAY_FETCH_SCHEDULER.schedule(command, delay, unit);
        }
    }
 
-   static final class Canceller implements BiConsumer<Object, Throwable> {
-       final Future<?> f;
+   /**
+    * A BiConsumer that completes the FetchResult future and cancels the timeout check task.
+    */
+   static final class CompleteFetchingFutureAndCancelTimeoutCheck implements BiConsumer<FetchResult, Throwable> {
+       /**
+        * A ScheduledFuture that represents the timeout check task.
+        */
+       final ScheduledFuture<?> f;
+       /**
+        * A CompletableFuture waiting for the fetching result.
+        */
+       final CompletableFuture<FetchResult> waitingFuture;
 
-       Canceller(Future<?> f) {
+       CompleteFetchingFutureAndCancelTimeoutCheck(ScheduledFuture<?> f, CompletableFuture<FetchResult> waitingFuture) {
            this.f = f;
+           this.waitingFuture = waitingFuture;
        }
 
-       public void accept(Object ignore, Throwable ex) {
+       public void accept(FetchResult result, Throwable ex) {
+           // cancels the timeout check task.
            if (ex == null && f != null && !f.isDone())
                f.cancel(false);
+
+           // completes the waiting future right now.
+           if (ex == null) {
+               waitingFuture.complete(result);
+           } else {
+               waitingFuture.completeExceptionally(ex);
+           }
        }
    }
 }
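
Note on intended usage (not part of the diff): a fetch issued from a thread where SeparateSlowAndQuickFetchHint.isMarked() returns true fails fast with SlowFetchHintException, and the pending result is parked in holdUpFetchingFutureMap; a later fetch for the same range from an unmarked thread resumes the parked future instead of re-reading. A minimal caller-side sketch, assuming a hypothetical SlowFetchAwareReader class and a dedicated slowFetchPool (neither appears in this PR):

import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.Stream;
import org.apache.kafka.common.errors.es.SlowFetchHintException;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only; illustrates how a caller might react to the slow-fetch hint.
public class SlowFetchAwareReader {

    // Assumed dedicated pool for fetches that missed the quick-fetch deadline.
    private final ExecutorService slowFetchPool = Executors.newFixedThreadPool(2);

    public CompletableFuture<FetchResult> read(Stream stream, long start, long end, int maxBytesHint) {
        CompletableFuture<FetchResult> result = new CompletableFuture<>();
        stream.fetch(start, end, maxBytesHint).whenComplete((rst, ex) -> {
            if (ex instanceof SlowFetchHintException) {
                // The quick fetch timed out and the raw future was parked in
                // holdUpFetchingFutureMap. Re-issue the same fetch from a thread
                // without the quick-fetch mark; it resumes the parked future.
                slowFetchPool.execute(() -> stream.fetch(start, end, maxBytesHint)
                        .whenComplete((r, e) -> {
                            if (e != null) {
                                result.completeExceptionally(e);
                            } else {
                                result.complete(r);
                            }
                        }));
            } else if (ex != null) {
                result.completeExceptionally(ex);
            } else {
                result.complete(rst);
            }
        });
        return result;
    }
}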
31 changes: 6 additions & 25 deletions core/src/main/scala/kafka/log/es/ElasticLogManager.scala
@@ -17,14 +17,14 @@
 
 package kafka.log.es
 
-import com.automq.elasticstream.client.DefaultClientBuilder
-import kafka.log._
+import com.automq.elasticstream.client.api.Client
+import kafka.log.{LogConfig, ProducerStateManagerConfig}
 import kafka.log.es.ElasticLogManager.NAMESPACE
+import kafka.log.es.client.{ClientFactoryProxy, Context}
 import kafka.server.{KafkaConfig, LogDirFailureChannel}
 import kafka.utils.Scheduler
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
-import com.automq.elasticstream.client.api.Client
 
 import java.io.File
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
@@ -80,10 +80,6 @@ class ElasticLogManager(val client: Client) {
 }
 
 object ElasticLogManager {
-  private val ES_ENDPOINT_PREFIX = "es://"
-  private val MEMORY_ENDPOINT_PREFIX = "memory://"
-  private val REDIS_ENDPOINT_PREFIX = "redis://"
-
   var INSTANCE: Option[ElasticLogManager] = None
   var NAMESPACE = ""
 
@@ -105,24 +101,9 @@ object ElasticLogManager {
     if (endpoint == null) {
       return false
     }
-    if (endpoint.startsWith(ES_ENDPOINT_PREFIX)) {
-      val kvEndpoint = config.elasticStreamKvEndpoint;
-      if (!kvEndpoint.startsWith(ES_ENDPOINT_PREFIX)) {
-        throw new IllegalArgumentException(s"Elastic stream endpoint and kvEndpoint must be the same protocol: $endpoint $kvEndpoint")
-      }
-      val streamClient = new AlwaysSuccessClient(new DefaultClientBuilder()
-        .endpoint(endpoint.substring(ES_ENDPOINT_PREFIX.length))
-        .kvEndpoint(kvEndpoint.substring(ES_ENDPOINT_PREFIX.length))
-        .build())
-      INSTANCE = Some(new ElasticLogManager(streamClient))
-    } else if (endpoint.startsWith(MEMORY_ENDPOINT_PREFIX)) {
-      val streamClient = new AlwaysSuccessClient(new MemoryClient())
-      INSTANCE = Some(new ElasticLogManager(streamClient))
-    } else if (endpoint.startsWith(REDIS_ENDPOINT_PREFIX)) {
-      INSTANCE = Some(new ElasticLogManager(new ElasticRedisClient(endpoint.substring(REDIS_ENDPOINT_PREFIX.length))))
-    } else {
-      return false
-    }
+    val context = new Context()
+    context.config = config
+    INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context)))
 
     val namespace = config.elasticStreamNamespace
     NAMESPACE = if (namespace == null || namespace.isEmpty) {
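
The Context type used above is introduced elsewhere in this PR but its body is not shown in this diff; judging from context.config = config it is presumably just a mutable holder, along the lines of this sketch (an assumption, not the actual class):

package kafka.log.es.client;

import kafka.server.KafkaConfig;

// Assumed shape of the holder implied by `context.config = config` above.
public class Context {
    public KafkaConfig config;
}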
41 changes: 41 additions & 0 deletions core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.es.client;
+
+import com.automq.elasticstream.client.api.Client;
+
+import java.lang.reflect.Method;
+
+public class ClientFactoryProxy {
+    private static final String PROTOCOL_SEPERATOR = ":";
+    private static final String FACTORY_CLASS_FORMAT = "kafka.log.es.client.%s.ClientFactory";
+
+    public static Client get(Context context) {
+        String endpoint = context.config.elasticStreamEndpoint();
+        String protocal = endpoint.split(PROTOCOL_SEPERATOR)[0];
+        String className = String.format(FACTORY_CLASS_FORMAT, protocal);
+        try {
+            Class<?> clazz = Class.forName(className);
+            Method method = clazz.getDeclaredMethod("get", Context.class);
+            return (Client) method.invoke(null, context);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
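
FACTORY_CLASS_FORMAT implies one factory class per endpoint protocol, resolved reflectively from the endpoint scheme (for example, kafka.log.es.client.es.ClientFactory for an es:// endpoint). As a sketch, a memory:// factory mirroring the MemoryClient branch removed from ElasticLogManager could look like this (assumed, not shown in this diff):

package kafka.log.es.client.memory;

import com.automq.elasticstream.client.api.Client;
import kafka.log.es.AlwaysSuccessClient;
import kafka.log.es.MemoryClient;
import kafka.log.es.client.Context;

// Looked up as kafka.log.es.client.memory.ClientFactory and invoked via
// ClientFactoryProxy.get(Context); must expose a static get(Context) method.
public class ClientFactory {

    public static Client get(Context context) {
        return new AlwaysSuccessClient(new MemoryClient());
    }
}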