Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Build
on:
pull_request:
types: [opened, reopened, synchronize]
push:
branches: ["master", "develop"]

jobs:
paths-filter:
runs-on: ubuntu-latest
outputs:
es-unit-test: ${{ steps.filter.outputs.es-unit-test }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
id: filter
with:
filters: |
es-unit-test:
- '.github/workflows/**'
- 'core/**'
- 'metadata/**'
es-unit-test:
needs: [paths-filter]
if: ${{ needs.paths-filter.outputs.es-unit-test == 'true' || github.event_name == 'push' }}
uses: ./.github/workflows/es_unit_tests.yml
build-result:
runs-on: ubuntu-latest
needs: [es-unit-test]
if: ${{ always() }}
steps:
- uses: actions/checkout@v3
- name: Collect build result
run: |
if echo es-unit-test-${{ needs.es-unit-test.result }} | grep -E 'cancelled|failure' -o > null
then
echo "There are failed/cancelled builds"
exit 1
else
echo "All builds are successful/skipped"
exit 0
fi
35 changes: 35 additions & 0 deletions .github/workflows/es_unit_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
# This workflow will build a Java project with Gradle and cache/restore any dependencies to improve the workflow execution time
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-gradle

name: ES Unit Tests

on:
workflow_call:

permissions:
contents: read

jobs:
build:
name: "${{ matrix.os }}, jdk-${{ matrix.jdk }}"
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-22.04]
jdk: [11, 17]
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up JDK ${{ matrix.jdk }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.jdk }}
distribution: "zulu"
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Execute Gradle build
run: ./gradlew metadata:esUnitTest core:esUnitTest
17 changes: 17 additions & 0 deletions .github/workflows/validate_pr_title.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: "Lint PR"

on:
pull_request_target:
types:
- opened
- edited
- synchronize

jobs:
main:
name: Validate PR title
runs-on: ubuntu-latest
steps:
- uses: amannn/action-semantic-pull-request@v5
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors.es;

import org.apache.kafka.common.errors.RetriableException;

/**
* Indicates that the fetch request was too slow to be served. The request should be served in separated thread pool.
*/
public class SlowFetchHintException extends RetriableException {
private static final long serialVersionUID = 1L;
public SlowFetchHintException() { super();}

public SlowFetchHintException(String message) { super(message); }

public SlowFetchHintException(Throwable cause) { super(cause); }

public SlowFetchHintException(String message, Throwable cause) { super(message, cause); }

}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamS
protected var _maxEntries: Int = adjustedMaxIndexSize / entrySize

@volatile
protected var _entries: Int = (stream.nextOffset() / entrySize).toInt
protected var _entries: Int = stream.nextOffset().toInt

@volatile
protected var cache: MappedByteBuffer = {
Expand Down
37 changes: 34 additions & 3 deletions core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.Stream;
import com.automq.elasticstream.client.api.StreamClient;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -123,6 +127,8 @@ private void openStream0(long streamId, OpenStreamOptions options, CompletableFu
static class StreamImpl implements Stream {
private final Stream stream;
private volatile boolean closed = false;
private final Map<String, Boolean> slowFetchingOffsetMap = new ConcurrentHashMap<>();
private final long SLOW_FETCH_TIMEOUT_MILLIS = 10;

public StreamImpl(Stream stream) {
this.stream = stream;
Expand Down Expand Up @@ -160,22 +166,47 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
String slowFetchKey = startOffset + "-" + endOffset;
CompletableFuture<FetchResult> cf = new CompletableFuture<>();
fetch0(startOffset, endOffset, maxBytesHint, cf);
// If it is recorded as slowFetching, then skip timeout check.
if (slowFetchingOffsetMap.containsKey(slowFetchKey)) {
fetch0(startOffset, endOffset, maxBytesHint, cf, slowFetchKey);
} else {
// Try to have a quick stream. If fetching is timeout, then complete with SlowFetchHintException.
stream.fetch(startOffset, endOffset, maxBytesHint)
.orTimeout(SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
.whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
if (ex != null) {
if (closed) {
cf.completeExceptionally(new IllegalStateException("stream already closed"));
} else if (ex instanceof TimeoutException){
LOGGER.info("Fetch stream[{}] [{},{}) timeout for {} ms, retry with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS);
cf.completeExceptionally(new SlowFetchHintException("fetch data too slowly, retry with slow fetching"));
slowFetchingOffsetMap.put(slowFetchKey, true);
} else {
cf.completeExceptionally(ex);
}
} else {
slowFetchingOffsetMap.remove(slowFetchKey);
cf.complete(rst);
}
}, LOGGER));
}
return cf;
}

private void fetch0(long startOffset, long endOffset, int maxBytesHint, CompletableFuture<FetchResult> cf) {
private void fetch0(long startOffset, long endOffset, int maxBytesHint, CompletableFuture<FetchResult> cf, String slowFetchKey) {
stream.fetch(startOffset, endOffset, maxBytesHint).whenCompleteAsync((rst, ex) -> {
FutureUtil.suppress(() -> {
if (ex != null) {
LOGGER.error("Fetch stream[{}] [{},{}) fail, retry later", streamId(), startOffset, endOffset);
if (!closed) {
FETCH_RETRY_SCHEDULER.schedule(() -> fetch0(startOffset, endOffset, maxBytesHint, cf), 3, TimeUnit.SECONDS);
FETCH_RETRY_SCHEDULER.schedule(() -> fetch0(startOffset, endOffset, maxBytesHint, cf, slowFetchKey), 3, TimeUnit.SECONDS);
} else {
cf.completeExceptionally(new IllegalStateException("stream already closed"));
}
} else {
slowFetchingOffsetMap.remove(slowFetchKey);
cf.complete(rst);
}
}, LOGGER);
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.Utils;

public class DefaultElasticStreamSlice implements ElasticStreamSlice {
/**
Expand Down Expand Up @@ -71,11 +74,17 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
}

@Override
public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) {
public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) throws SlowFetchHintException {
long fixedStartOffset = Utils.max(startOffset, 0);
try {
return stream.fetch(startOffsetInStream + startOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get();
} catch (Throwable e) {
// TODO: specific exception
return stream.fetch(startOffsetInStream + fixedStartOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof SlowFetchHintException) {
throw (SlowFetchHintException)(e.getCause());
} else {
throw new RuntimeException(e.getCause());
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.ConvertedRecords;
Expand Down Expand Up @@ -94,15 +95,15 @@ public long appendedOffset() {
return nextOffset.get() - baseOffset;
}

public Records read(long startOffset, long maxOffset, int maxSize) {
public Records read(long startOffset, long maxOffset, int maxSize) throws SlowFetchHintException {
if (ReadManualReleaseHint.isMarked()) {
return readAll0(startOffset, maxOffset, maxSize);
} else {
return new BatchIteratorRecordsAdaptor(this, startOffset, maxOffset, maxSize);
}
}

private Records readAll0(long startOffset, long maxOffset, int maxSize) {
private Records readAll0(long startOffset, long maxOffset, int maxSize) throws SlowFetchHintException {
int readSize = 0;
long nextFetchOffset = startOffset - baseOffset;
long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset;
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/es/ElasticLogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ object ElasticLogManager {
.build())
INSTANCE = Some(new ElasticLogManager(streamClient))
} else if (endpoint.startsWith(MEMORY_ENDPOINT_PREFIX)) {
INSTANCE = Some(new ElasticLogManager(new MemoryClient()))
val streamClient = new AlwaysSuccessClient(new MemoryClient())
INSTANCE = Some(new ElasticLogManager(streamClient))
} else if (endpoint.startsWith(REDIS_ENDPOINT_PREFIX)) {
INSTANCE = Some(new ElasticLogManager(new ElasticRedisClient(endpoint.substring(REDIS_ENDPOINT_PREFIX.length))))
} else {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/log/es/ElasticStreamSlice.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import com.automq.elasticstream.client.api.Stream;

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.errors.es.SlowFetchHintException;

/**
* Elastic stream slice is a slice from elastic stream, the position of slice is start from 0.
* Elastic stream slice is a slice from elastic stream, the offset of slice starts from 0.
* In the same time, there is only one writable slice in a stream, and the writable slice is always the last slice.
*/
public interface ElasticStreamSlice {
Expand All @@ -45,9 +46,9 @@ public interface ElasticStreamSlice {
* @param maxBytesHint max fetch data size hint, the real return data size may be larger than maxBytesHint.
* @return {@link FetchResult}
*/
FetchResult fetch(long startOffset, long endOffset, int maxBytesHint);
FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) throws SlowFetchHintException;

default FetchResult fetch(long startOffset, long endOffset) {
default FetchResult fetch(long startOffset, long endOffset) throws SlowFetchHintException {
return fetch(startOffset, endOffset, (int) (endOffset - startOffset));
}

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier,
stream = streamSliceSupplier.reset()
_entries = 0
_lastEntry = lastEntryFromIndexFile
resize(maxIndexSize)
}

def truncate(): Unit = {
Expand Down
Loading