From 0fd4ca92d0bf9c0b1fe601bae4d4c8b44834c3c6 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Mon, 21 Aug 2023 09:51:39 +0800 Subject: [PATCH 1/7] feat: separate handlers of appending, fetching and partition-open-or-close; separate quick fetching and slow fetchin (#84) Signed-off-by: Curtis Wan --- .../errors/es/SlowFetchHintException.java | 34 ++++++++ .../kafka/log/es/AlwaysSuccessClient.java | 37 +++++++- .../log/es/DefaultElasticStreamSlice.java | 13 ++- .../kafka/log/es/ElasticLogFileRecords.java | 5 +- .../kafka/log/es/ElasticLogManager.scala | 3 +- .../kafka/log/es/ElasticStreamSlice.java | 5 +- .../main/scala/kafka/server/KafkaApis.scala | 67 +++++++++------ .../scala/kafka/server/ReplicaManager.scala | 86 +++++++++++++------ 8 files changed, 189 insertions(+), 61 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java diff --git a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java new file mode 100644 index 0000000000..6d6deb8c9f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors.es; + +import org.apache.kafka.common.errors.RetriableException; + +/** + * Indicates that the fetch request was too slow to be served. The request should be served in separated thread pool. + */ +public class SlowFetchHintException extends RetriableException { + private static final long serialVersionUID = 1L; + public SlowFetchHintException() { super();} + + public SlowFetchHintException(String message) { super(message); } + + public SlowFetchHintException(Throwable cause) { super(cause); } + + public SlowFetchHintException(String message, Throwable cause) { super(message, cause); } + +} diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java index efc5deb553..c4083e14c7 100644 --- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java +++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java @@ -26,6 +26,10 @@ import com.automq.elasticstream.client.api.RecordBatch; import com.automq.elasticstream.client.api.Stream; import com.automq.elasticstream.client.api.StreamClient; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.common.errors.es.SlowFetchHintException; import org.apache.kafka.common.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,6 +127,8 @@ private void openStream0(long streamId, OpenStreamOptions options, CompletableFu static class StreamImpl implements Stream { private final Stream stream; private volatile boolean closed = false; + private final Map slowFetchingOffsetMap = new ConcurrentHashMap<>(); + private final long SLOW_FETCH_TIMEOUT_MILLIS = 10; public StreamImpl(Stream stream) { this.stream = stream; @@ -160,22 +166,47 @@ public CompletableFuture append(RecordBatch recordBatch) { 
@Override public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint) { + String slowFetchKey = startOffset + "-" + endOffset; CompletableFuture cf = new CompletableFuture<>(); - fetch0(startOffset, endOffset, maxBytesHint, cf); + // If it is recorded as slowFetching, then skip timeout check. + if (slowFetchingOffsetMap.containsKey(slowFetchKey)) { + fetch0(startOffset, endOffset, maxBytesHint, cf, slowFetchKey); + } else { + // Try to have a quick stream. If fetching is timeout, then complete with SlowFetchHintException. + stream.fetch(startOffset, endOffset, maxBytesHint) + .orTimeout(SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) + .whenComplete((rst, ex) -> FutureUtil.suppress(() -> { + if (ex != null) { + if (closed) { + cf.completeExceptionally(new IllegalStateException("stream already closed")); + } else if (ex instanceof TimeoutException){ + LOGGER.info("Fetch stream[{}] [{},{}) timeout for {} ms, retry with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS); + cf.completeExceptionally(new SlowFetchHintException("fetch data too slowly, retry with slow fetching")); + slowFetchingOffsetMap.put(slowFetchKey, true); + } else { + cf.completeExceptionally(ex); + } + } else { + slowFetchingOffsetMap.remove(slowFetchKey); + cf.complete(rst); + } + }, LOGGER)); + } return cf; } - private void fetch0(long startOffset, long endOffset, int maxBytesHint, CompletableFuture cf) { + private void fetch0(long startOffset, long endOffset, int maxBytesHint, CompletableFuture cf, String slowFetchKey) { stream.fetch(startOffset, endOffset, maxBytesHint).whenCompleteAsync((rst, ex) -> { FutureUtil.suppress(() -> { if (ex != null) { LOGGER.error("Fetch stream[{}] [{},{}) fail, retry later", streamId(), startOffset, endOffset); if (!closed) { - FETCH_RETRY_SCHEDULER.schedule(() -> fetch0(startOffset, endOffset, maxBytesHint, cf), 3, TimeUnit.SECONDS); + FETCH_RETRY_SCHEDULER.schedule(() -> fetch0(startOffset, endOffset, 
maxBytesHint, cf, slowFetchKey), 3, TimeUnit.SECONDS); } else { cf.completeExceptionally(new IllegalStateException("stream already closed")); } } else { + slowFetchingOffsetMap.remove(slowFetchKey); cf.complete(rst); } }, LOGGER); diff --git a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java index 9c2a92fcc7..802db655f5 100644 --- a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java @@ -27,7 +27,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.kafka.common.errors.es.SlowFetchHintException; public class DefaultElasticStreamSlice implements ElasticStreamSlice { /** @@ -71,11 +73,16 @@ public CompletableFuture append(RecordBatch recordBatch) { } @Override - public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) { + public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) throws SlowFetchHintException { try { return stream.fetch(startOffsetInStream + startOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get(); - } catch (Throwable e) { - // TODO: specific exception + } catch (ExecutionException e) { + if (e.getCause() instanceof SlowFetchHintException) { + throw (SlowFetchHintException)(e.getCause()); + } else { + throw new RuntimeException(e.getCause()); + } + } catch (InterruptedException e) { throw new RuntimeException(e); } } diff --git a/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java index 2807bb293e..7996ff7c6a 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java @@ -21,6 +21,7 @@ import 
com.automq.elasticstream.client.api.RecordBatchWithContext; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import org.apache.kafka.common.errors.es.SlowFetchHintException; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.ConvertedRecords; @@ -94,7 +95,7 @@ public long appendedOffset() { return nextOffset.get() - baseOffset; } - public Records read(long startOffset, long maxOffset, int maxSize) { + public Records read(long startOffset, long maxOffset, int maxSize) throws SlowFetchHintException { if (ReadManualReleaseHint.isMarked()) { return readAll0(startOffset, maxOffset, maxSize); } else { @@ -102,7 +103,7 @@ public Records read(long startOffset, long maxOffset, int maxSize) { } } - private Records readAll0(long startOffset, long maxOffset, int maxSize) { + private Records readAll0(long startOffset, long maxOffset, int maxSize) throws SlowFetchHintException { int readSize = 0; long nextFetchOffset = startOffset - baseOffset; long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset; diff --git a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala index 4f46095704..f8d94ded05 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala @@ -104,7 +104,8 @@ object ElasticLogManager { .build()) INSTANCE = Some(new ElasticLogManager(streamClient)) } else if (endpoint.startsWith(MEMORY_ENDPOINT_PREFIX)) { - INSTANCE = Some(new ElasticLogManager(new MemoryClient())) + val streamClient = new AlwaysSuccessClient(new MemoryClient()) + INSTANCE = Some(new ElasticLogManager(streamClient)) } else if (endpoint.startsWith(REDIS_ENDPOINT_PREFIX)) { INSTANCE = Some(new ElasticLogManager(new ElasticRedisClient(endpoint.substring(REDIS_ENDPOINT_PREFIX.length)))) } else { diff --git 
a/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java index d0d58eca5e..3f164116ec 100644 --- a/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java @@ -23,6 +23,7 @@ import com.automq.elasticstream.client.api.Stream; import java.util.concurrent.CompletableFuture; +import org.apache.kafka.common.errors.es.SlowFetchHintException; /** * Elastic stream slice is a slice from elastic stream, the position of slice is start from 0. @@ -45,9 +46,9 @@ public interface ElasticStreamSlice { * @param maxBytesHint max fetch data size hint, the real return data size may be larger than maxBytesHint. * @return {@link FetchResult} */ - FetchResult fetch(long startOffset, long endOffset, int maxBytesHint); + FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) throws SlowFetchHintException; - default FetchResult fetch(long startOffset, long endOffset) { + default FetchResult fetch(long startOffset, long endOffset) throws SlowFetchHintException { return fetch(startOffset, endOffset, (int) (endOffset - startOffset)); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a703195541..ad4b4972c9 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -70,7 +70,7 @@ import org.apache.kafka.common.resource.ResourceType._ import org.apache.kafka.common.resource.{Resource, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} -import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time} +import org.apache.kafka.common.utils.{ProducerIdAndEpoch, ThreadUtils, Time} import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.server.authorizer._ 
import org.apache.kafka.server.common.MetadataVersion @@ -79,7 +79,7 @@ import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_I import java.lang.{Long => JLong} import java.nio.ByteBuffer import java.util -import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} +import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Executors} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import java.util.{Collections, Optional} import scala.annotation.nowarn @@ -121,6 +121,9 @@ class KafkaApis(val requestChannel: RequestChannel, val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time) val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config) val configManager = new ConfigAdminManager(brokerId, config, configRepository) + // These two executors separate the handling of `produce` and `fetch` requests in case of throttling. + val appendingExecutors = Executors.newFixedThreadPool(2, ThreadUtils.createThreadFactory("kafka-apis-appending-executor-%d", true)) + val fetchingExecutors = Executors.newFixedThreadPool(2, ThreadUtils.createThreadFactory("kafka-apis-fetching-executor-%d", true)) def close(): Unit = { aclApis.close() @@ -743,21 +746,28 @@ class KafkaApis(val requestChannel: RequestChannel, else { val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId - // call the replica manager to append messages to the replicas - replicaManager.appendRecords( - timeout = produceRequest.timeout.toLong, - requiredAcks = produceRequest.acks, - internalTopicsAllowed = internalTopicsAllowed, - origin = AppendOrigin.Client, - entriesPerPartition = authorizedRequestInfo, - requestLocal = requestLocal, - responseCallback = sendResponseCallback, - recordConversionStatsCallback = processingStatsCallback) - - // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; - // hence we clear its data here in order to let GC 
reclaim its memory since it is already appended to log - produceRequest.clearPartitionRecords() - PRODUCE_TIMER.update(System.nanoTime() - startNanos) + // elastic stream inject start + // The appending is done is a separate thread pool to avoid blocking io thread + appendingExecutors.submit(new Runnable { + override def run(): Unit = { + // call the replica manager to append messages to the replicas + replicaManager.appendRecords( + timeout = produceRequest.timeout.toLong, + requiredAcks = produceRequest.acks, + internalTopicsAllowed = internalTopicsAllowed, + origin = AppendOrigin.Client, + entriesPerPartition = authorizedRequestInfo, + requestLocal = requestLocal, + responseCallback = sendResponseCallback, + recordConversionStatsCallback = processingStatsCallback) + + // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; + // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log + produceRequest.clearPartitionRecords() + PRODUCE_TIMER.update(System.nanoTime() - startNanos) + } + }) + // elastic stream inject end } } @@ -1058,15 +1068,20 @@ class KafkaApis(val requestChannel: RequestChannel, ) // elastic stream inject start - ReadManualReleaseHint.mark() - // call the replica manager to fetch messages from the local replica - replicaManager.fetchMessages( - params = params, - fetchInfos = interesting, - quota = replicationQuota(fetchRequest), - responseCallback = processResponseCallback, - ) - ReadManualReleaseHint.reset() + // The fetching is done is a separate thread pool to avoid blocking io thread. 
+ fetchingExecutors.submit(new Runnable { + override def run(): Unit = { + ReadManualReleaseHint.mark() + // call the replica manager to fetch messages from the local replica + replicaManager.fetchMessages( + params = params, + fetchInfos = interesting, + quota = replicationQuota(fetchRequest), + responseCallback = processResponseCallback, + ) + ReadManualReleaseHint.reset() + } + }) // elastic stream inject end } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 865217bf06..6582c9b341 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,36 +16,31 @@ */ package kafka.server -import java.io.File -import java.util.Optional -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.Lock import com.yammer.metrics.core.Meter import kafka.api._ import kafka.cluster.{BrokerEndPoint, Partition} import kafka.common.RecordValidationException import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ +import kafka.log.es.ReadManualReleaseHint import kafka.metrics.KafkaMetricsGroup import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} import kafka.server.metadata.ZkMetadataCache -import kafka.utils._ import kafka.utils.Implicits._ +import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.errors._ +import org.apache.kafka.common.errors.es.SlowFetchHintException import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult -import org.apache.kafka.common.message.{DescribeLogDirsResponseData, DescribeProducersResponseData, FetchResponseData, LeaderAndIsrResponseData} -import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError -import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError +import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState +import org.apache.kafka.common.message.LeaderAndIsrResponseData.{LeaderAndIsrPartitionError, LeaderAndIsrTopicError} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult} import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState +import org.apache.kafka.common.message.{DescribeLogDirsResponseData, DescribeProducersResponseData, FetchResponseData, LeaderAndIsrResponseData} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors @@ -53,20 +48,26 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView -import org.apache.kafka.common.replica.{ClientMetadata, _} +import org.apache.kafka.common.replica._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{ThreadUtils, Time} +import org.apache.kafka.common._ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import 
org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.server.common.MetadataVersion._ +import java.io.File import java.nio.file.{Files, Paths} import java.util -import scala.jdk.CollectionConverters._ +import java.util.Optional +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.Lock +import java.util.concurrent.{Executors, TimeUnit} import scala.collection.{Map, Seq, Set, mutable} import scala.compat.java8.OptionConverters._ +import scala.jdk.CollectionConverters._ /* * Result metadata of a log append operation on the log @@ -219,6 +220,10 @@ class ReplicaManager(val config: KafkaConfig, val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse( DelayedOperationPurgatory[DelayedElectLeader]( purgatoryName = "ElectLeader", brokerId = config.brokerId)) + // This threadPool is used to separate slow fetches from quick fetches. + val slowFetchExecutors = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("slow-fetch-executor-%d", true)) + // This threadPool is used to handle partition open/close in case of throttling metadata replay. 
+ val partitionOpenCloseExecutors = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("partition-open-close-executor-%d", true)) /* epoch of the controller that last changed the leader */ @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch @@ -1037,9 +1042,14 @@ class ReplicaManager(val config: KafkaConfig, var hasPreferredReadReplica = false val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult] + var containsSlowFetchHint = false + logReadResults.foreach { case (topicIdPartition, logReadResult) => brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark() brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() + if (logReadResult.exception.isDefined && logReadResult.exception.get.isInstanceOf[KafkaStorageException]) { + containsSlowFetchHint = true + } if (logReadResult.error != Errors.NONE) errorReadingData = true if (logReadResult.divergingEpoch.nonEmpty) @@ -1050,6 +1060,25 @@ class ReplicaManager(val config: KafkaConfig, logReadResultMap.put(topicIdPartition, logReadResult) } + // elastic stream inject start + // If there is any slow fetch hint, we will read from local log in a separate thread. 
+ if (containsSlowFetchHint) { + slowFetchExecutors.submit(new Runnable { + override def run(): Unit = { + ReadManualReleaseHint.mark() + val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false) + ReadManualReleaseHint.reset() + val fetchPartitionData = logReadResults.map { case (tp, result) => + val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId) + tp -> result.toFetchPartitionData(isReassignmentFetch) + } + responseCallback(fetchPartitionData) + } + }) + return + } + // elastic stream inject end + // respond immediately if 1) fetch request does not want to wait // 2) fetch request does not require any data // 3) has enough data to respond @@ -1186,6 +1215,7 @@ class ReplicaManager(val config: KafkaConfig, _: ReplicaNotAvailableException | _: KafkaStorageException | _: OffsetOutOfRangeException | + _: SlowFetchHintException | _: InconsistentTopicIdException) => LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), divergingEpoch = None, @@ -2107,16 +2137,20 @@ class ReplicaManager(val config: KafkaConfig, // create new partitions with the same names as the ones we are deleting here. 
if (!localChanges.deletes.isEmpty) { val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap - stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).") - stopPartitions(deletes).forKeyValue { (topicPartition, e) => - if (e.isInstanceOf[KafkaStorageException]) { - stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + - "the local replica for the partition is in an offline log directory") - } else { - stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + - s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}") + partitionOpenCloseExecutors.submit(new Runnable { + override def run(): Unit = { + stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).") + stopPartitions(deletes).forKeyValue { (topicPartition, e) => + if (e.isInstanceOf[KafkaStorageException]) { + stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + + "the local replica for the partition is in an offline log directory") + } else { + stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + + s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}") + } + } } - } + }) } // Handle partitions which we are now the leader or follower for. 
@@ -2124,7 +2158,11 @@ class ReplicaManager(val config: KafkaConfig, val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints) val changedPartitions = new mutable.HashSet[Partition] if (!localChanges.leaders.isEmpty) { - applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala) + partitionOpenCloseExecutors.submit(new Runnable { + override def run(): Unit = { + applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala) + } + }) } if (!localChanges.followers.isEmpty) { applyLocalFollowersDelta(changedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala) From af66f1a81b8bc9d75f96a3d5e1d7a7f3ba652577 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Mon, 21 Aug 2023 16:16:57 +0800 Subject: [PATCH 2/7] fix: #87; expecting throw IOException now (#89) Signed-off-by: Curtis Wan --- core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala b/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala index e941fc53ff..92d2b71a83 100644 --- a/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Tag, Test} -import java.io.File +import java.io.{File, IOException} import java.nio.charset.StandardCharsets import java.util.regex.Pattern import java.util.{Collections, Properties} @@ -222,7 +222,7 @@ class ElasticLogTest { val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE")) appendRecords(kvsToRecords(keyValues)) log.close() - assertThrows(classOf[IllegalStateException], () => appendRecords(kvsToRecords(keyValues), initialOffset = 2L)) + 
assertThrows(classOf[IOException], () => appendRecords(kvsToRecords(keyValues), initialOffset = 2L)) } @Test @@ -245,7 +245,7 @@ class ElasticLogTest { val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE")) appendRecords(kvsToRecords(keyValues)) log.closeHandlers() - assertThrows(classOf[IllegalStateException], + assertThrows(classOf[IOException], () => appendRecords(kvsToRecords(keyValues), initialOffset = 2L)) } From b0f9a1924add303f957c55f468d693a57f2c84d8 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Mon, 21 Aug 2023 16:25:59 +0800 Subject: [PATCH 3/7] fix: #88; resize to maxIndexSize when resetting (#91) Signed-off-by: Curtis Wan --- core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala b/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala index 5e299dbc20..83cbb2a6d9 100644 --- a/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala +++ b/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala @@ -137,6 +137,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier, stream = streamSliceSupplier.reset() _entries = 0 _lastEntry = lastEntryFromIndexFile + resize(maxIndexSize) } def truncate(): Unit = { From 7dc6da5b6b5fe8aa6432fc54624da4a98ba914b9 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Mon, 21 Aug 2023 16:46:47 +0800 Subject: [PATCH 4/7] fix: #85; init with stream-slice's nextOffset (#93) Signed-off-by: Curtis Wan --- core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala b/core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala index 00b88e4c45..847ee8d240 100644 --- a/core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala +++ b/core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala @@ -49,7 +49,7 @@ abstract class AbstractStreamIndex(_file: File, val 
streamSliceSupplier: StreamS protected var _maxEntries: Int = adjustedMaxIndexSize / entrySize @volatile - protected var _entries: Int = (stream.nextOffset() / entrySize).toInt + protected var _entries: Int = stream.nextOffset().toInt @volatile protected var cache: MappedByteBuffer = { From bb8d4c997c29fe6cdb5bdba401024456642c6305 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Mon, 21 Aug 2023 16:47:35 +0800 Subject: [PATCH 5/7] fix: #86; make sure stream slice is fetched from at least offset 0 (#92) Signed-off-by: Curtis Wan --- .../main/scala/kafka/log/es/DefaultElasticStreamSlice.java | 4 +++- core/src/main/scala/kafka/log/es/ElasticStreamSlice.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java index 802db655f5..6dc8725f29 100644 --- a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.kafka.common.errors.es.SlowFetchHintException; +import org.apache.kafka.common.utils.Utils; public class DefaultElasticStreamSlice implements ElasticStreamSlice { /** @@ -74,8 +75,9 @@ public CompletableFuture append(RecordBatch recordBatch) { @Override public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) throws SlowFetchHintException { + long fixedStartOffset = Utils.max(startOffset, 0); try { - return stream.fetch(startOffsetInStream + startOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get(); + return stream.fetch(startOffsetInStream + fixedStartOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get(); } catch (ExecutionException e) { if (e.getCause() instanceof SlowFetchHintException) { throw 
(SlowFetchHintException)(e.getCause()); diff --git a/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java index 3f164116ec..ab4d6ba2d7 100644 --- a/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java @@ -26,7 +26,7 @@ import org.apache.kafka.common.errors.es.SlowFetchHintException; /** - * Elastic stream slice is a slice from elastic stream, the position of slice is start from 0. + * Elastic stream slice is a slice from elastic stream, the offset of slice starts from 0. * In the same time, there is only one writable slice in a stream, and the writable slice is always the last slice. */ public interface ElasticStreamSlice { From 170ded3cada1dbac55847eec3a5b4f9019517b07 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Mon, 21 Aug 2023 17:40:37 +0800 Subject: [PATCH 6/7] fix: #94; adapt for jdk11+ (#95) Signed-off-by: Curtis Wan --- .../apache/kafka/trogdor/common/JsonSerializationTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java index 8e53516a11..20ef0fd680 100644 --- a/trogdor/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java +++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.lang.reflect.Modifier; import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec; import org.apache.kafka.trogdor.fault.Kibosh; import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec; @@ -73,9 +74,11 @@ private void verify(T val1) throws Exception { Class clazz = (Class) val1.getClass(); T val2 = JsonUtil.JSON_SERDE.readValue(bytes, clazz); for (Field field : clazz.getDeclaredFields()) { - boolean wasAccessible 
= field.isAccessible(); + // for static field, we have to pass null to field.canAccess() + T maybeNullObject = Modifier.isStatic(field.getModifiers()) ? null : val2; + boolean wasAccessible = field.canAccess(maybeNullObject); field.setAccessible(true); - assertNotNull(field.get(val2), "Field " + field + " was null."); + assertNotNull(field.get(maybeNullObject), "Field " + field + " was null."); field.setAccessible(wasAccessible); } } From 8ca15272cce9688d460b07975e1407a279ddbe41 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Mon, 21 Aug 2023 20:17:43 +0800 Subject: [PATCH 7/7] chore: to #96; add es unit tests for PR, add PR title check (#97) * feat: add pr title check and unit test check Signed-off-by: Curtis Wan * feat: add es unit tests check Signed-off-by: Curtis Wan * accelerate actions Signed-off-by: Curtis Wan * fix filter output Signed-off-by: Curtis Wan --------- Signed-off-by: Curtis Wan --- .github/workflows/build.yml | 42 +++++++++++++++++++++++++ .github/workflows/es_unit_tests.yml | 35 +++++++++++++++++++++ .github/workflows/validate_pr_title.yml | 17 ++++++++++ 3 files changed, 94 insertions(+) create mode 100644 .github/workflows/build.yml create mode 100644 .github/workflows/es_unit_tests.yml create mode 100644 .github/workflows/validate_pr_title.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000000..69afab19cb --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,42 @@ +name: Build +on: + pull_request: + types: [opened, reopened, synchronize] + push: + branches: ["master", "develop"] + +jobs: + paths-filter: + runs-on: ubuntu-latest + outputs: + es-unit-test: ${{ steps.filter.outputs.es-unit-test }} + steps: + - uses: actions/checkout@v3 + - uses: dorny/paths-filter@v2 + id: filter + with: + filters: | + es-unit-test: + - '.github/workflows/**' + - 'core/**' + - 'metadata/**' + es-unit-test: + needs: [paths-filter] + if: ${{ needs.paths-filter.outputs.es-unit-test == 'true' || 
github.event_name == 'push' }} + uses: ./.github/workflows/es_unit_tests.yml + build-result: + runs-on: ubuntu-latest + needs: [es-unit-test] + if: ${{ always() }} + steps: + - uses: actions/checkout@v3 + - name: Collect build result + run: | + if echo es-unit-test-${{ needs.es-unit-test.result }} | grep -E 'cancelled|failure' -o > /dev/null + then + echo "There are failed/cancelled builds" + exit 1 + else + echo "All builds are successful/skipped" + exit 0 + fi diff --git a/.github/workflows/es_unit_tests.yml b/.github/workflows/es_unit_tests.yml new file mode 100644 index 0000000000..5fd32fb44e --- /dev/null +++ b/.github/workflows/es_unit_tests.yml @@ -0,0 +1,35 @@ +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. +# This workflow will build a Java project with Gradle and cache/restore any dependencies to improve the workflow execution time +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-gradle + +name: ES Unit Tests + +on: + workflow_call: + +permissions: + contents: read + +jobs: + build: + name: "${{ matrix.os }}, jdk-${{ matrix.jdk }}" + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-22.04] + jdk: [11, 17] + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up JDK ${{ matrix.jdk }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.jdk }} + distribution: "zulu" + - name: Setup Gradle + uses: gradle/gradle-build-action@v2 + - name: Execute Gradle build + run: ./gradlew metadata:esUnitTest core:esUnitTest diff --git a/.github/workflows/validate_pr_title.yml b/.github/workflows/validate_pr_title.yml new file mode 100644 index 0000000000..d2c4216123 --- /dev/null +++ b/.github/workflows/validate_pr_title.yml @@ -0,0 +1,17 @@ +name: "Lint PR" + +on: + pull_request_target: + types: + - opened + - edited 
+ - synchronize + +jobs: + main: + name: Validate PR title + runs-on: ubuntu-latest + steps: + - uses: amannn/action-semantic-pull-request@v5 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}