diff --git a/core/src/main/java/kafka/automq/zerozone/ZeroZoneTrafficInterceptor.java b/core/src/main/java/kafka/automq/zerozone/ZeroZoneTrafficInterceptor.java index 9185d263f6..694673f034 100644 --- a/core/src/main/java/kafka/automq/zerozone/ZeroZoneTrafficInterceptor.java +++ b/core/src/main/java/kafka/automq/zerozone/ZeroZoneTrafficInterceptor.java @@ -148,6 +148,7 @@ public ZeroZoneTrafficInterceptor( public void close() { if (closed.compareAndSet(false, true)) { committedEpochManager.close(); + snapshotReadPartitionsManager.close(); } } diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala index c516bfd1cb..fb0712ce4f 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala @@ -827,9 +827,10 @@ class ElasticKafkaApis( } override def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = { + val cf = snapshotAwaitReadySupplier.get() offsetForLeaderEpochExecutor.execute(() => { // Await new snapshots to be applied to avoid consumers finding the endOffset jumping back when the snapshot-read partition leader changes. - snapshotAwaitReadySupplier.get().join() + cf.join() super.handleOffsetForLeaderEpochRequest(request) }) }