Conversation

yx9o (Contributor) commented Nov 24, 2025

Background
Hi community, we ran into a problem in production. When the system restarted automatically after an anomaly, the shutdown blocked and the broker failed to restart successfully. The jstack output shows the block at com.automq.opentelemetry.exporter.s3.S3MetricsExporter#flush.
jstack:

"main" #1 prio=5 os_prio=0 cpu=13296.86ms elapsed=41088.92s tid=0x00007f7bf002e160 nid=0x536 waiting for monitor entry  [0x00007f7bf817f000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at com.automq.shell.metrics.S3MetricsExporter.export(S3MetricsExporter.java:244)
	- waiting to lock <0x00001000079b1ed8> (a io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf)
	at io.opentelemetry.sdk.metrics.export.PeriodicMetricReader$Scheduled.doRun(PeriodicMetricReader.java:167)
	at io.opentelemetry.sdk.metrics.export.PeriodicMetricReader.shutdown(PeriodicMetricReader.java:97)
	at io.opentelemetry.sdk.metrics.SdkMeterProvider.shutdown(SdkMeterProvider.java:159)
	at io.opentelemetry.sdk.OpenTelemetrySdk.shutdown(OpenTelemetrySdk.java:112)
	at io.opentelemetry.sdk.OpenTelemetrySdk.close(OpenTelemetrySdk.java:119)
	at kafka.log.stream.s3.telemetry.TelemetryManager.shutdown(TelemetryManager.java:254)
	at kafka.server.SharedServer.$anonfun$stop$12(SharedServer.scala:415)
	at kafka.server.SharedServer$$Lambda$3293/0x00007f793892a800.apply$mcV$sp(Unknown Source)
	at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
	at kafka.server.SharedServer.stop(SharedServer.scala:415)
	- locked <0x00001000004367b0> (a kafka.server.SharedServer)
	at kafka.server.SharedServer.stopForBroker(SharedServer.scala:180)
	- locked <0x00001000004367b0> (a kafka.server.SharedServer)
	at kafka.server.BrokerServer.shutdown(BrokerServer.scala:808)
	at kafka.server.KafkaBroker.shutdown(KafkaBroker.scala:99)
	at kafka.server.KafkaBroker.shutdown$(KafkaBroker.scala:99)
	at kafka.server.BrokerServer.shutdown(BrokerServer.scala:77)
	at kafka.server.BrokerServer.startup(BrokerServer.scala:624)
	at kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:88)
	at kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:88)
	at kafka.server.KafkaRaftServer$$Lambda$567/0x00007f7938388678.apply(Unknown Source)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:88)
	at kafka.Kafka$.main(Kafka.scala:153)
	at kafka.Kafka.main(Kafka.scala)

"s3-metrics-exporter-upload-thread" #39 daemon prio=5 os_prio=0 cpu=15.69ms elapsed=41082.92s tid=0x00007f7bf220c820 nid=0x56b waiting on condition  [0x00007f7934efb000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.8/Native Method)
	- parking to wait for  <0x00001000de20bf60> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.8/LockSupport.java:211)
	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.8/CompletableFuture.java:1864)
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.8/ForkJoinPool.java:3465)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.8/ForkJoinPool.java:3436)
	at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.8/CompletableFuture.java:1898)
	at java.util.concurrent.CompletableFuture.get(java.base@17.0.8/CompletableFuture.java:2072)
	at com.automq.shell.metrics.S3MetricsExporter.flush(S3MetricsExporter.java:263)
	- locked <0x00001000079b1ed8> (a io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf)
	at com.automq.shell.metrics.S3MetricsExporter$UploadTask.run(S3MetricsExporter.java:126)
	at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)

Root Cause
flush() performs blocking work while holding the uploadBuffer lock:

  1. Synchronous I/O: objectStorage.write(...).get()
  2. The lock is held for the entire duration of the upload, so other threads (export / shutdown / the periodic metrics reader) block while trying to enter the same uploadBuffer critical section.
  3. The upload thread itself is parked waiting for the I/O to complete (Future.get), so if the upload stalls, the main thread stays blocked and shutdown cannot make progress (see the sketch below).
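For context, the shape of the blocking code looks roughly like the following. This is a simplified sketch, not the actual S3MetricsExporter source; objectPath, the write(...) signature, and the Utils.compress(...) helper are placeholders based on the description in this issue.

```java
// Simplified sketch of the problematic pattern (assumed, not the real source):
// compression and the blocking S3 upload both run inside the uploadBuffer
// monitor, so export()/shutdown() wait on the same lock until the I/O returns.
private void flush() throws Exception {
    synchronized (uploadBuffer) {                            // lock acquired here...
        ByteBuf compressed = Utils.compress(uploadBuffer);   // CPU work under the lock
        objectStorage.write(objectPath, compressed).get();   // ...and held across blocking S3 I/O
        uploadBuffer.clear();
        compressed.release();
    }
}
```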

Solution

  • Move time-consuming operations out of the critical section:
    Inside the lock, only do:
    readable = uploadBuffer.readableBytes()
    slice = uploadBuffer.readRetainedSlice(readable)
    uploadBuffer.clear()
    Outside the lock, do:
    compressed = Utils.compress(slice)
    objectStorage.write(...).whenComplete(...) (the upload completes asynchronously)

  • In the completion callback:
    On success, update lastUploadTimestamp and nextUploadInterval, and call result.succeed().
    On failure, log the error and call result.fail().
    In both cases, release finalCompressed and the slice to avoid memory leaks (see the sketch below).
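A minimal sketch of the restructured flush() under these assumptions: the field and helper names (uploadBuffer, objectStorage, Utils.compress, lastUploadTimestamp, nextUploadInterval) follow the description above, while objectPath, the write(...) signature, LOGGER, baseUploadInterval, and the use of a plain CompletableFuture in place of result.succeed()/result.fail() are illustrative placeholders rather than the project's actual API.

```java
private CompletableFuture<Void> flush() {
    CompletableFuture<Void> result = new CompletableFuture<>();
    final ByteBuf slice;
    // Critical section: only cheap buffer bookkeeping under the lock.
    synchronized (uploadBuffer) {
        int readable = uploadBuffer.readableBytes();
        if (readable == 0) {
            result.complete(null);
            return result;
        }
        slice = uploadBuffer.readRetainedSlice(readable);
        uploadBuffer.clear();
    }
    // Slow work (compression + S3 upload) runs outside the lock.
    final ByteBuf compressed = Utils.compress(slice);
    objectStorage.write(objectPath, compressed).whenComplete((ignored, ex) -> {
        try {
            if (ex == null) {
                lastUploadTimestamp = System.currentTimeMillis();
                nextUploadInterval = baseUploadInterval;   // placeholder for the reset logic
                result.complete(null);
            } else {
                LOGGER.error("Failed to upload metrics to S3", ex);
                result.completeExceptionally(ex);
            }
        } finally {
            // Release on both paths to avoid leaking direct buffers.
            compressed.release();
            slice.release();
        }
    });
    return result;
}
```

With this shape, export() and shutdown() only ever contend for the short buffer-bookkeeping section, and a stalled S3 upload can no longer pin the main thread during shutdown.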

superhx (Collaborator) commented Nov 25, 2025

@yx9o Are there any other error logs when the shutdown gets stuck? Currently, based on the provided stack, there is no deadlock, and theoretically, the shutdown should proceed normally.

yx9o (Contributor, Author) commented Nov 25, 2025

[Screenshot: WeCom log capture] The logs at the time showed S3-related exceptions, all of them related to shutdown. The symptom was that the container stayed unhealthy: port 9092 was not being listened on, but the process was still running.

superhx (Collaborator) commented Nov 25, 2025

@yx9o It seems another node took over (failed over) node-60101's workload while node-60101 was still recovering.

There is a PR to fix the conflict.

yx9o (Contributor, Author) commented Nov 25, 2025

Okay, I will update it and verify. Thank you.

superhx (Collaborator) commented Nov 25, 2025

> Okay, I will update it and verify. Thank you.

@yx9o Typically, if a process shuts down gracefully, it will not trigger the failover process. You can check your shutdown script.

yx9o (Contributor, Author) commented Nov 25, 2025

[Screenshot: exception log] When we encounter a `WALFencedException` during the append process (not an OOM exception), the process will exit, and the container will automatically restart. Does this trigger a failover, and why does the process exit?

superhx (Collaborator) commented Nov 26, 2025

> When we encounter a WALFencedException during the append process (not an OOM exception), the process will exit, and the container will automatically restart. Does this trigger a failover, and why does the process exit?

@yx9o When the controller detects that the broker's heartbeat session has timed out, it triggers the failover logic. Once a node has been failed over, it becomes unwritable and exits automatically.

yx9o (Contributor, Author) commented Nov 26, 2025

[Screenshots: controller load report and partition status]

Indeed, we can see here that the load information was not reported. The exception log only shows WALFencedException, which indicates that the broker was not working properly, and the partition status also shows an anomaly.

yx9o closed this Nov 26, 2025