Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion .github/workflows/nightly-builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,36 @@ jobs:
# note that this is not running any multi-jvm tests because multi-in-test=false
# JDK 21+ ForkJoinPool scheduling changes need a higher timefactor and a larger
# minimum-runnable to avoid compensation-thread starvation (see #2573, #2870).
# Run the internal fork-join dispatchers virtualized in nightly builds so
# blocking work can unmount from carrier threads. JDK 25 is more sensitive
# on the GitHub runners and needs the capped minimum-runnable value.
run: |-
EXTRA_JVM_OPTS=""
VIRTUAL_THREAD_JVM_OPTS=""
MINIMUM_RUNNABLE=4
if [ "${{ matrix.javaVersion }}" -ge 25 ]; then
TIMEFACTOR=4
MINIMUM_RUNNABLE=8
else
TIMEFACTOR=2
fi
if [ "${{ matrix.javaVersion }}" -ge 21 ]; then
EXTRA_JVM_OPTS="-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=4 -Dpekko.remote.default-remote-dispatcher.fork-join-executor.minimum-runnable=4"
VIRTUAL_THREAD_JVM_OPTS="--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"
export JDK_JAVA_OPTIONS="${JDK_JAVA_OPTIONS:-} $VIRTUAL_THREAD_JVM_OPTS"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.actor.internal-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.remote.default-remote-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.test.stream-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.actor.internal-dispatcher.fork-join-executor.virtualize=on"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.remote.default-remote-dispatcher.fork-join-executor.virtualize=on"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.virtualize=on"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.virtualize=on"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.virtualize=on"
EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS -Dpekko.test.stream-dispatcher.fork-join-executor.virtualize=on"
fi
sbt \
-Dpekko.cluster.assert=on \
Expand Down
28 changes: 28 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,34 @@ Pekko, like most Scala projects, compiles faster with the Graal JIT enabled. The
* Use a JDK > 10
* Use the following JVM options for SBT e.g. by adding them to the `SBT_OPTS` environment variable: `-XX:+UnlockExperimentalVMOptions -XX:+EnableJVMCI -XX:+UseJVMCICompiler`

### JDK 21+ Nightly Virtual Threads

The JDK nightly build enables virtual threads for selected dispatchers on JDK 21 and newer with system properties in
`.github/workflows/nightly-builds.yml`. This is a CI-only opt-in; local runs, PR validation, and the reference
configuration keep `virtualize = off` unless explicitly overridden.
For fork-join dispatchers, `virtualize = on` uses each dispatcher's own fork-join pool as the virtual-thread scheduler,
preserving dispatcher isolation rather than routing the selected dispatchers through the JVM-wide default virtual-thread
scheduler.

The nightly override covers:

* `pekko.actor.default-dispatcher.fork-join-executor.virtualize=on`
* `pekko.actor.internal-dispatcher.fork-join-executor.virtualize=on`
* `pekko.remote.default-remote-dispatcher.fork-join-executor.virtualize=on`
* `pekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.virtualize=on`
* `pekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.virtualize=on`
* `pekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.virtualize=on`
* `pekko.test.stream-dispatcher.fork-join-executor.virtualize=on`

When reproducing this workflow locally on JDK 21+, use `cp .jvmopts-ci .jvmopts` for the CI launcher settings and add
the same virtual-thread `--add-opens` options that the nightly workflow exports through `JDK_JAVA_OPTIONS`:

* `--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED`
* `--add-opens=java.base/java.lang=ALL-UNNAMED`

The nightly build also keeps the JDK 21+ `minimum-runnable` override because virtualized fork-join dispatchers still use
carrier threads.

### The `validatePullRequest` task

The Pekko build includes a special task called `validatePullRequest`, which investigates the changes made as well as dirty
Expand Down
10 changes: 7 additions & 3 deletions docs/src/main/paradox/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ Set `minimum-runnable` explicitly (any non-negative integer) to opt out of the a
`minimum-runnable = 1` restores the previous default, and `minimum-runnable = 0` disables compensation entirely.

**Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature.
When using virtual threads, all virtual threads will use the same `unparker`, so you may want to
increase the number of `jdk.unparker.maxPoolSize`.
When `virtualize=on` is used with a dispatcher executor such as `fork-join-executor` or `thread-pool-executor`, Pekko
uses that dispatcher's own executor as the virtual-thread scheduler. This preserves dispatcher isolation instead of
routing all virtual threads through the JVM-wide default virtual-thread scheduler. The JVM still uses a shared
`unparker`, so you may want to increase the number of `jdk.unparker.maxPoolSize`.
The setting can also be enabled with a system property such as
`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. Pekko's reference configuration keeps it `off` by
default; use an explicit application or test configuration when opting in.

#### Requirements

Expand Down Expand Up @@ -107,4 +112,3 @@ So in this example it's a top-level section, but you could for instance put it a
where you'd use periods to denote sub-sections, like this: `"foo.bar.my-dispatcher"`

@@@

14 changes: 11 additions & 3 deletions docs/src/main/paradox/typed/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,13 @@ Set `minimum-runnable` explicitly (any non-negative integer) to opt out of the a
`minimum-runnable = 1` restores the previous default, and `minimum-runnable = 0` disables compensation entirely.

**Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature.
When using virtual threads, all virtual threads will use the same `unparker`, so you may want to
increase the number of `jdk.unparker.maxPoolSize`.
When `virtualize=on` is used with a dispatcher executor such as `fork-join-executor` or `thread-pool-executor`, Pekko
uses that dispatcher's own executor as the virtual-thread scheduler. This preserves dispatcher isolation instead of
routing all virtual threads through the JVM-wide default virtual-thread scheduler. The JVM still uses a shared
`unparker`, so you may want to increase the number of `jdk.unparker.maxPoolSize`.
The setting can also be enabled with a system property such as
`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. Pekko's reference configuration keeps it `off` by
default; use an explicit application or test configuration when opting in.

#### Requirements

Expand Down Expand Up @@ -387,7 +392,10 @@ Configuring a dispatcher with virtual threads, requires Java 21 or above:
@@snip [DispatcherDocSpec.scala](/docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala) { #virtual-thread-dispatcher-config }

With this, an actor will run in a virtual thread, so you may want to configure it further with :
`jdk.virtualThreadScheduler.parallelism` ,`jdk.virtualThreadScheduler.maxPoolSize` and `jdk.unparker.maxPoolSize`.
dispatcher pool settings such as `parallelism-min` and `parallelism-max`, and with `jdk.unparker.maxPoolSize` if the
shared unparker becomes a bottleneck.
You can enable the same setting from the command line with a system property such as
`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`.

### Fixed pool size

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class DurableStateBehaviorStashOverflowSpec
journal.completeUpsertFuture()

// exactly how many is racy but at least the first stash buffer full should complete
probe.receiveMessages(stashCapacity)
probe.receiveMessages(stashCapacity, 30.seconds)
}
}
}
3 changes: 2 additions & 1 deletion stream-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# SPDX-License-Identifier: Apache-2.0

# Test-only: Virtual thread configuration.
# Default: off for local development. Nightly CI sets PEKKO_VIRTUALIZE_DISPATCHER=on for JDK 21+.
# Default: off for local development. Nightly CI enables this with explicit system properties for JDK 21+.
# This environment variable remains available for module-local experiments.

pekko.test.stream-dispatcher.fork-join-executor {
# Default to off, then allow env var to override when set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.pekko
import pekko.stream.QueueOfferResult
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.TestDuration
import pekko.testkit.WithLogCapturing

class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
Expand Down Expand Up @@ -143,9 +144,9 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug

class QueueingThread extends Thread {
override def run(): Unit = {
var numElemsEnqueued = 0
var numElemsDropped = 0
def runLoop(): Unit = {
def runLoop(): (Int, Int) = {
var numElemsEnqueued = 0
var numElemsDropped = 0
val r = ThreadLocalRandom.current()

var done = false
Expand All @@ -164,11 +165,12 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
done = true
} else if (i % 100 == 0) Thread.sleep(1) // probabilistic producer throttling delay
}
(numElemsEnqueued, numElemsDropped)
}

startBarrier.countDown()
startBarrier.await() // wait for all threads being in this state before starting race
runLoop()
val (numElemsEnqueued, numElemsDropped) = runLoop()
stopBarrier.countDown()
log.debug(
f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped: $numElemsDropped%7d before completion")
Expand Down Expand Up @@ -204,7 +206,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
if (sendQueue.offer(round * 1000 + n) != QueueOfferResult.Enqueued)
fail(s"offer failed at round $round message $n")
}
downstream.expectNext((1 to burstSize).map(_ + round * 1000).toList)
downstream.expectNext(30.seconds.dilated, (1 to burstSize).map(_ + round * 1000).toList)
downstream.request(1)
}

Expand Down