From 9631107661914b19fcd75037fbe00a1a59ce49de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 25 Apr 2026 22:40:25 +0800 Subject: [PATCH] fix: stabilise JDK 25 nightly flakes --- .github/workflows/nightly-builds.yml | 23 ++++++++++++++- CONTRIBUTING.md | 28 +++++++++++++++++++ docs/src/main/paradox/dispatchers.md | 10 +++++-- docs/src/main/paradox/typed/dispatchers.md | 14 ++++++++-- ...urableStateBehaviorStashOverflowSpec.scala | 2 +- .../src/test/resources/application.conf | 3 +- .../scaladsl/BoundedSourceQueueSpec.scala | 12 ++++---- 7 files changed, 78 insertions(+), 14 deletions(-) diff --git a/.github/workflows/nightly-builds.yml b/.github/workflows/nightly-builds.yml index 5d4ff1bbe6d..8ac060072e3 100644 --- a/.github/workflows/nightly-builds.yml +++ b/.github/workflows/nightly-builds.yml @@ -150,15 +150,36 @@ jobs: # note that this is not running any multi-jvm tests because multi-in-test=false # JDK 21+ ForkJoinPool scheduling changes need a higher timefactor and a larger # minimum-runnable to avoid compensation-thread starvation (see #2573, #2870). + # Run the internal fork-join dispatchers virtualized in nightly builds so + # blocking work can unmount from carrier threads. JDK 25 is more sensitive + # on the GitHub runners and needs the capped minimum-runnable value. run: |- EXTRA_JVM_OPTS="" + VIRTUAL_THREAD_JVM_OPTS="" + MINIMUM_RUNNABLE=4 if [ "${{ matrix.javaVersion }}" -ge 25 ]; then TIMEFACTOR=4 + MINIMUM_RUNNABLE=8 else TIMEFACTOR=2 fi if [ "${{ matrix.javaVersion }}" -ge 21 ]; then - EXTRA_JVM_OPTS="-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=4 -Dpekko.remote.default-remote-dispatcher.fork-join-executor.minimum-runnable=4" + VIRTUAL_THREAD_JVM_OPTS="--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" + export JDK_JAVA_OPTIONS="${JDK_JAVA_OPTIONS:-} $VIRTUAL_THREAD_JVM_OPTS" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.actor.internal-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.remote.default-remote-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.test.stream-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.actor.internal-dispatcher.fork-join-executor.virtualize=on" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.remote.default-remote-dispatcher.fork-join-executor.virtualize=on" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.virtualize=on" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.virtualize=on" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.virtualize=on" + EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.test.stream-dispatcher.fork-join-executor.virtualize=on" fi sbt \ -Dpekko.cluster.assert=on \ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 2c84774d180..0521bb95fc9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -200,6 +200,34 @@ Pekko, like most Scala projects, compiles faster with the Graal JIT enabled. The * Use a JDK > 10 * Use the following JVM options for SBT e.g. by adding them to the `SBT_OPTS` environment variable: `-XX:+UnlockExperimentalVMOptions -XX:+EnableJVMCI -XX:+UseJVMCICompiler` +### JDK 21+ Nightly Virtual Threads + +The JDK nightly build enables virtual threads for selected dispatchers on JDK 21 and newer with system properties in +`.github/workflows/nightly-builds.yml`. This is a CI-only opt-in; local runs, PR validation, and the reference +configuration keep `virtualize = off` unless explicitly overridden. +For fork-join dispatchers, `virtualize = on` uses each dispatcher's own fork-join pool as the virtual-thread scheduler, +preserving dispatcher isolation rather than routing the selected dispatchers through the JVM-wide default virtual-thread +scheduler. + +The nightly override covers: + +* `pekko.actor.default-dispatcher.fork-join-executor.virtualize=on` +* `pekko.actor.internal-dispatcher.fork-join-executor.virtualize=on` +* `pekko.remote.default-remote-dispatcher.fork-join-executor.virtualize=on` +* `pekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.virtualize=on` +* `pekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.virtualize=on` +* `pekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.virtualize=on` +* `pekko.test.stream-dispatcher.fork-join-executor.virtualize=on` + +When reproducing this workflow locally on JDK 21+, use `cp .jvmopts-ci .jvmopts` for the CI launcher settings and add +the same virtual-thread `--add-opens` options that the nightly workflow exports through `JDK_JAVA_OPTIONS`: + +* `--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED` +* `--add-opens=java.base/java.lang=ALL-UNNAMED` + +The nightly build also keeps the JDK 21+ `minimum-runnable` override because virtualized fork-join dispatchers still use +carrier threads. + ### The `validatePullRequest` task The Pekko build includes a special task called `validatePullRequest`, which investigates the changes made as well as dirty diff --git a/docs/src/main/paradox/dispatchers.md b/docs/src/main/paradox/dispatchers.md index 27fbd428038..ba28c97da1f 100644 --- a/docs/src/main/paradox/dispatchers.md +++ b/docs/src/main/paradox/dispatchers.md @@ -53,8 +53,13 @@ Set `minimum-runnable` explicitly (any non-negative integer) to opt out of the a `minimum-runnable = 1` restores the previous default, and `minimum-runnable = 0` disables compensation entirely. **Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature. -When using virtual threads, all virtual threads will use the same `unparker`, so you may want to -increase the number of `jdk.unparker.maxPoolSize`. +When `virtualize=on` is used with a dispatcher executor such as `fork-join-executor` or `thread-pool-executor`, Pekko +uses that dispatcher's own executor as the virtual-thread scheduler. This preserves dispatcher isolation instead of +routing all virtual threads through the JVM-wide default virtual-thread scheduler. The JVM still uses a shared +`unparker`, so you may want to increase the number of `jdk.unparker.maxPoolSize`. +The setting can also be enabled with a system property such as +`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. Pekko's reference configuration keeps it `off` by +default; use an explicit application or test configuration when opting in. #### Requirements @@ -107,4 +112,3 @@ So in this example it's a top-level section, but you could for instance put it a where you'd use periods to denote sub-sections, like this: `"foo.bar.my-dispatcher"` @@@ - diff --git a/docs/src/main/paradox/typed/dispatchers.md b/docs/src/main/paradox/typed/dispatchers.md index abbb6945864..dc547e4ec09 100644 --- a/docs/src/main/paradox/typed/dispatchers.md +++ b/docs/src/main/paradox/typed/dispatchers.md @@ -138,8 +138,13 @@ Set `minimum-runnable` explicitly (any non-negative integer) to opt out of the a `minimum-runnable = 1` restores the previous default, and `minimum-runnable = 0` disables compensation entirely. **Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature. -When using virtual threads, all virtual threads will use the same `unparker`, so you may want to -increase the number of `jdk.unparker.maxPoolSize`. +When `virtualize=on` is used with a dispatcher executor such as `fork-join-executor` or `thread-pool-executor`, Pekko +uses that dispatcher's own executor as the virtual-thread scheduler. This preserves dispatcher isolation instead of +routing all virtual threads through the JVM-wide default virtual-thread scheduler. The JVM still uses a shared +`unparker`, so you may want to increase the number of `jdk.unparker.maxPoolSize`. +The setting can also be enabled with a system property such as +`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. Pekko's reference configuration keeps it `off` by +default; use an explicit application or test configuration when opting in. #### Requirements @@ -387,7 +392,10 @@ Configuring a dispatcher with virtual threads, requires Java 21 or above: @@snip [DispatcherDocSpec.scala](/docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala) { #virtual-thread-dispatcher-config } With this, an actor will run in a virtual thread, so you may want to configure it further with : -`jdk.virtualThreadScheduler.parallelism` ,`jdk.virtualThreadScheduler.maxPoolSize` and `jdk.unparker.maxPoolSize`. +dispatcher pool settings such as `parallelism-min` and `parallelism-max`, and with `jdk.unparker.maxPoolSize` if the +shared unparker becomes a bottleneck. +You can enable the same setting from the command line with a system property such as +`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. ### Fixed pool size diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala index dfbf1ed3e7e..e710612d590 100644 --- a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala +++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala @@ -129,7 +129,7 @@ class DurableStateBehaviorStashOverflowSpec journal.completeUpsertFuture() // exactly how many is racy but at least the first stash buffer full should complete - probe.receiveMessages(stashCapacity) + probe.receiveMessages(stashCapacity, 30.seconds) } } } diff --git a/stream-tests/src/test/resources/application.conf b/stream-tests/src/test/resources/application.conf index 98f92a5d505..c5f24920a53 100644 --- a/stream-tests/src/test/resources/application.conf +++ b/stream-tests/src/test/resources/application.conf @@ -1,7 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 # Test-only: Virtual thread configuration. -# Default: off for local development. Nightly CI sets PEKKO_VIRTUALIZE_DISPATCHER=on for JDK 21+. +# Default: off for local development. Nightly CI enables this with explicit system properties for JDK 21+. +# This environment variable remains available for module-local experiments. pekko.test.stream-dispatcher.fork-join-executor { # Default to off, then allow env var to override when set. diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala index 1c25fe16977..cb858c897ee 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala @@ -22,6 +22,7 @@ import org.apache.pekko import pekko.stream.QueueOfferResult import pekko.stream.testkit.{ StreamSpec, TestSubscriber } import pekko.stream.testkit.scaladsl.TestSink +import pekko.testkit.TestDuration import pekko.testkit.WithLogCapturing class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug @@ -143,9 +144,9 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug class QueueingThread extends Thread { override def run(): Unit = { - var numElemsEnqueued = 0 - var numElemsDropped = 0 - def runLoop(): Unit = { + def runLoop(): (Int, Int) = { + var numElemsEnqueued = 0 + var numElemsDropped = 0 val r = ThreadLocalRandom.current() var done = false @@ -164,11 +165,12 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug done = true } else if (i % 100 == 0) Thread.sleep(1) // probabilistic producer throttling delay } + (numElemsEnqueued, numElemsDropped) } startBarrier.countDown() startBarrier.await() // wait for all threads being in this state before starting race - runLoop() + val (numElemsEnqueued, numElemsDropped) = runLoop() stopBarrier.countDown() log.debug( f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped: $numElemsDropped%7d before completion") @@ -204,7 +206,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug if (sendQueue.offer(round * 1000 + n) != QueueOfferResult.Enqueued) fail(s"offer failed at round $round message $n") } - downstream.expectNext((1 to burstSize).map(_ + round * 1000).toList) + downstream.expectNext(30.seconds.dilated, (1 to burstSize).map(_ + round * 1000).toList) downstream.request(1) }