Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package kafka.automq.zerozone;

import kafka.automq.partition.snapshot.SnapshotOperation;
import kafka.cluster.Partition;
import kafka.cluster.PartitionSnapshot;
import kafka.log.streamaspect.LazyStream;
Expand Down Expand Up @@ -139,6 +138,14 @@ public void setVersion(AutoMQVersion newVersion) {
}
}

public synchronized CompletableFuture<Void> nextSnapshotCf() {
return CompletableFuture.allOf(subscribers.values().stream()
.map(Subscriber::nextSnapshotCf)
.toList()
.toArray(new CompletableFuture<?>[0])
);
}

private synchronized void triggerSubscribersApply() {
subscribers.forEach((nodeId, subscriber) -> subscriber.apply());
}
Expand Down Expand Up @@ -290,6 +297,13 @@ public void requestCommit() {
}
}

/**
* Get the next snapshot future. The future will be completed after the next sync snapshots have been applied.
*/
public CompletableFuture<Void> nextSnapshotCf() {
return requester.nextSnapshotCf();
}

public CompletableFuture<Void> close() {
LOGGER.info("[SNAPSHOT_READ_UNSUBSCRIBE],node={}", node);
CompletableFuture<Void> cf = new CompletableFuture<>();
Expand All @@ -301,6 +315,7 @@ public CompletableFuture<Void> close() {
partitions.clear();
snapshotWithOperations.clear();
replayer.close();
requester.nextSnapshotCf().complete(null);
cf.complete(null);
} catch (Throwable e) {
cf.completeExceptionally(e);
Expand Down Expand Up @@ -358,6 +373,11 @@ void onNewOperationBatch(OperationBatch batch) {
void applySnapshot() {
while (!snapshotWithOperations.isEmpty()) {
SnapshotWithOperation snapshotWithOperation = snapshotWithOperations.peek();
if (snapshotWithOperation.isSnapshotMark()) {
snapshotWithOperations.poll();
snapshotWithOperation.snapshotCf.complete(null);
continue;
}
TopicIdPartition topicIdPartition = snapshotWithOperation.topicIdPartition;

switch (snapshotWithOperation.operation) {
Expand Down Expand Up @@ -460,7 +480,8 @@ static boolean checkBatchMetadataReady0(OperationBatch batch, MetadataCache meta
if (batch.readyIndex == batch.operations.size() - 1) {
return true;
}
if (isMetadataUnready(batch.operations.get(batch.readyIndex + 1).snapshot.streamEndOffsets(), metadataCache)) {
SnapshotWithOperation operation = batch.operations.get(batch.readyIndex + 1);
if (!operation.isSnapshotMark() && isMetadataUnready(operation.snapshot.streamEndOffsets(), metadataCache)) {
return false;
}
batch.readyIndex = batch.readyIndex + 1;
Expand Down Expand Up @@ -521,25 +542,4 @@ public WaitingDataLoadTask(long timestamp, List<OperationBatch> operationBatchLi
}
}

static class SnapshotWithOperation {
final TopicIdPartition topicIdPartition;
final PartitionSnapshot snapshot;
final SnapshotOperation operation;

public SnapshotWithOperation(TopicIdPartition topicIdPartition, PartitionSnapshot snapshot,
SnapshotOperation operation) {
this.topicIdPartition = topicIdPartition;
this.snapshot = snapshot;
this.operation = operation;
}

@Override
public String toString() {
return "SnapshotWithOperation{" +
"topicIdPartition=" + topicIdPartition +
", snapshot=" + snapshot +
", operation=" + operation +
'}';
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.automq.zerozone;

import kafka.automq.partition.snapshot.SnapshotOperation;
import kafka.cluster.PartitionSnapshot;

import org.apache.kafka.common.TopicIdPartition;

import java.util.concurrent.CompletableFuture;

public class SnapshotWithOperation {
final TopicIdPartition topicIdPartition;
final PartitionSnapshot snapshot;
final SnapshotOperation operation;
final CompletableFuture<Void> snapshotCf;

public SnapshotWithOperation(TopicIdPartition topicIdPartition, PartitionSnapshot snapshot,
SnapshotOperation operation) {
this(topicIdPartition, snapshot, operation, null);
}

public SnapshotWithOperation(TopicIdPartition topicIdPartition, PartitionSnapshot snapshot,
SnapshotOperation operation, CompletableFuture<Void> snapshotCf) {
this.topicIdPartition = topicIdPartition;
this.snapshot = snapshot;
this.operation = operation;
this.snapshotCf = snapshotCf;
}

public static SnapshotWithOperation snapshotMark(CompletableFuture<Void> cf) {
return new SnapshotWithOperation(null, null, null, cf);
}

public boolean isSnapshotMark() {
return snapshotCf != null;
}

@Override
public String toString() {
return "SnapshotWithOperation{" +
"topicIdPartition=" + topicIdPartition +
", snapshot=" + snapshot +
", operation=" + operation +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import org.apache.kafka.storage.internals.log.TimestampOffset;

import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.Threads;
import com.automq.stream.utils.threads.EventLoop;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -60,6 +62,7 @@
private int sessionEpoch;
boolean requestCommit = false;
boolean requestReset = false;
private CompletableFuture<Void> nextSnapshotCf = new CompletableFuture<>();

private final SnapshotReadPartitionsManager.Subscriber subscriber;
private final Node node;
Expand Down Expand Up @@ -89,6 +92,10 @@ public void close() {
closed = true;
}

public CompletableFuture<Void> nextSnapshotCf() {
return nextSnapshotCf;
}

private void request() {
eventLoop.execute(this::request0);
}
Expand All @@ -97,6 +104,12 @@ private void request0() {
if (closed) {
return;
}
// The snapshotCf will be completed after all snapshots in the response have been applied.
CompletableFuture<Void> snapshotCf = this.nextSnapshotCf;
this.nextSnapshotCf = new CompletableFuture<>();
// The request may fail. So when the nextSnapshotCf complete, we will complete the current snapshotCf.
FutureUtil.propagate(nextSnapshotCf, snapshotCf);

tryReset0();
lastRequestTime = time.milliseconds();
AutomqGetPartitionSnapshotRequestData data = new AutomqGetPartitionSnapshotRequestData().setSessionId(sessionId).setSessionEpoch(sessionEpoch).setVersion((short) 1);
Expand All @@ -113,7 +126,7 @@ private void request0() {
AutomqGetPartitionSnapshotRequest.Builder builder = new AutomqGetPartitionSnapshotRequest.Builder(data);
asyncSender.sendRequest(node, builder)
.thenAcceptAsync(rst -> {
handleResponse(rst);
handleResponse(rst, snapshotCf);
subscriber.unsafeRun();
}, eventLoop)
.exceptionally(ex -> {
Expand All @@ -129,7 +142,7 @@ private void request0() {
});
}

private void handleResponse(ClientResponse clientResponse) {
private void handleResponse(ClientResponse clientResponse, CompletableFuture<Void> snapshotCf) {
if (closed) {
return;
}
Expand Down Expand Up @@ -175,6 +188,7 @@ private void handleResponse(ClientResponse clientResponse) {
return c1 - c2;
});
subscriber.onNewWalEndOffset(resp.confirmWalConfig(), DefaultRecordOffset.of(Unpooled.wrappedBuffer(resp.confirmWalEndOffset())));
batch.operations.add(SnapshotWithOperation.snapshotMark(snapshotCf));
subscriber.onNewOperationBatch(batch);
sessionId = resp.sessionId();
sessionEpoch = resp.sessionEpoch();
Expand All @@ -191,7 +205,7 @@ private boolean tryReset0() {
}
}

static SnapshotReadPartitionsManager.SnapshotWithOperation convert(TopicIdPartition topicIdPartition,
static SnapshotWithOperation convert(TopicIdPartition topicIdPartition,
AutomqGetPartitionSnapshotResponseData.PartitionSnapshot src) {
PartitionSnapshot.Builder snapshot = PartitionSnapshot.builder();
snapshot.leaderEpoch(src.leaderEpoch());
Expand All @@ -202,7 +216,7 @@ static SnapshotReadPartitionsManager.SnapshotWithOperation convert(TopicIdPartit
snapshot.lastTimestampOffset(convertTimestampOffset(src.lastTimestampOffset()));

SnapshotOperation operation = SnapshotOperation.parse(src.operation());
return new SnapshotReadPartitionsManager.SnapshotWithOperation(topicIdPartition, snapshot.build(), operation);
return new SnapshotWithOperation(topicIdPartition, snapshot.build(), operation);
}

static ElasticLogMeta convert(AutomqGetPartitionSnapshotResponseData.LogMetadata src) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public ZeroZoneTrafficInterceptor(
this.snapshotReadPartitionsManager = new SnapshotReadPartitionsManager(kafkaConfig, kafkaApis.metrics(), time, confirmWALProvider,
(ElasticReplicaManager) kafkaApis.replicaManager(), kafkaApis.metadataCache(), replayer);
this.snapshotReadPartitionsManager.setVersion(version);
kafkaApis.setSnapshotAwaitReadyProvider(this.snapshotReadPartitionsManager::nextSnapshotCf);
replayer.setCacheEventListener(this.snapshotReadPartitionsManager.cacheEventListener());
mapping.registerListener(snapshotReadPartitionsManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka.server.streamaspect

import com.automq.stream.s3.metrics.TimerUtil
import com.automq.stream.s3.network.{GlobalNetworkBandwidthLimiters, ThrottleStrategy}
import com.automq.stream.utils.Threads
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
import com.yammer.metrics.core.Histogram
import kafka.automq.interceptor.{ClientIdMetadata, NoopTrafficInterceptor, ProduceRequestArgs, TrafficInterceptor}
Expand Down Expand Up @@ -38,10 +39,12 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.slf4j.LoggerFactory

import java.util
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ExecutorService, TimeUnit}
import java.util.concurrent.{CompletableFuture, ExecutorService, TimeUnit}
import java.util.function.Supplier
import java.util.stream.IntStream
import java.util.{Collections, Optional}
import scala.annotation.nowarn
Expand Down Expand Up @@ -84,7 +87,10 @@ class ElasticKafkaApis(
autoTopicCreationManager, brokerId, config, configRepository, metadataCache, metrics, authorizer, quotas,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager, clientMetricsManager) {

private val offsetForLeaderEpochExecutor: ExecutorService = Threads.newFixedFastThreadLocalThreadPoolWithMonitor(1, "kafka-apis-offset-for-leader-epoch-handle-executor", true, LoggerFactory.getLogger(ElasticKafkaApis.getClass))

private var trafficInterceptor: TrafficInterceptor = new NoopTrafficInterceptor(this, metadataCache)
private var snapshotAwaitReadySupplier: Supplier[CompletableFuture[Void]] = () => CompletableFuture.completedFuture(null)

/**
* Generate a map of topic -> [(partitionId, epochId)] based on provided topicsRequestData.
Expand Down Expand Up @@ -820,6 +826,14 @@ class ElasticKafkaApis(
listOffsetHandleExecutor.execute(() => super.handleListOffsetRequest(request))
}

override def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
offsetForLeaderEpochExecutor.execute(() => {
// Await new snapshots to be applied to avoid consumers finding the endOffset jumping back when the snapshot-read partition leader changes.
snapshotAwaitReadySupplier.get().join()
super.handleOffsetForLeaderEpochRequest(request)
})
}

override protected def metadataTopicsInterceptor(clientId: ClientIdMetadata, listenerName: String, topics: util.List[MetadataResponseData.MetadataResponseTopic]): util.List[MetadataResponseData.MetadataResponseTopic] = {
trafficInterceptor.handleMetadataResponse(clientId, topics)
}
Expand All @@ -838,4 +852,8 @@ class ElasticKafkaApis(
this.trafficInterceptor = trafficInterceptor
}

def setSnapshotAwaitReadyProvider(supplier: Supplier[CompletableFuture[Void]]): Unit = {
this.snapshotAwaitReadySupplier = supplier
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import kafka.automq.partition.snapshot.SnapshotOperation;
import kafka.automq.zerozone.SnapshotReadPartitionsManager.OperationBatch;
import kafka.automq.zerozone.SnapshotReadPartitionsManager.SnapshotWithOperation;
import kafka.automq.zerozone.SnapshotReadPartitionsManager.Subscriber;
import kafka.cluster.Partition;
import kafka.cluster.PartitionSnapshot;
Expand Down
Loading