Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
Comment thread
pjfanning marked this conversation as resolved.
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import java.util.concurrent.ThreadFactory

import org.apache.pekko
import pekko.testkit.PekkoSpec
import pekko.util.JavaVersion

import com.typesafe.config.{ Config, ConfigFactory }

object ForkJoinExecutorConfiguratorSpec {
// Keep the root config explicit so the dispatcher-config integration checks below
// see exactly the values this spec is asserting on (and not whatever the project's
// global test configuration happens to set).
val config: Config = ConfigFactory.parseString("""
|fj-auto-default-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = 8
| parallelism-factor = 1.0
| parallelism-max = 64
| }
|}
|fj-auto-small-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = 1
| parallelism-factor = 1.0
| parallelism-max = 1
| }
|}
|fj-explicit-zero-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = 8
| parallelism-factor = 1.0
| parallelism-max = 64
| minimum-runnable = 0
| }
|}
|fj-explicit-seven-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = 8
| parallelism-factor = 1.0
| parallelism-max = 64
| minimum-runnable = 7
| }
|}
""".stripMargin)
}

class ForkJoinExecutorConfiguratorSpec extends PekkoSpec(ForkJoinExecutorConfiguratorSpec.config) {

import ForkJoinExecutorConfigurator.resolveMinimumRunnable

"ForkJoinExecutorConfigurator.resolveMinimumRunnable" must {

"honour explicit zero (compensation disabled)" in {
resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion = 25) shouldBe 0
resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion = 17) shouldBe 0
}

"honour explicit positive overrides verbatim" in {
resolveMinimumRunnable(configured = 1, parallelism = 16, jdkMajorVersion = 25) shouldBe 1
resolveMinimumRunnable(configured = 7, parallelism = 16, jdkMajorVersion = 25) shouldBe 7
resolveMinimumRunnable(configured = 100, parallelism = 16, jdkMajorVersion = 25) shouldBe 100
}

"auto-resolve to 1 on JDK < 25 regardless of parallelism (preserves legacy behaviour)" in {
// JDK 21 keeps the legacy default per reviewer guidance: only JDK 25 nightlies
// showed the compensation-thread regression badly enough to warrant a default change.
resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion = 17) shouldBe 1
resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion = 17) shouldBe 1
resolveMinimumRunnable(configured = -1, parallelism = 64, jdkMajorVersion = 11) shouldBe 1
resolveMinimumRunnable(configured = -1, parallelism = 16, jdkMajorVersion = 21) shouldBe 1
}

"auto-resolve using parallelism / 2 on JDK 25+ with min cap 1 and max cap 8" in {
resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion = 25) shouldBe 1
resolveMinimumRunnable(configured = -1, parallelism = 2, jdkMajorVersion = 25) shouldBe 1
resolveMinimumRunnable(configured = -1, parallelism = 4, jdkMajorVersion = 25) shouldBe 2
resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion = 25) shouldBe 4
resolveMinimumRunnable(configured = -1, parallelism = 16, jdkMajorVersion = 25) shouldBe 8
resolveMinimumRunnable(configured = -1, parallelism = 64, jdkMajorVersion = 25) shouldBe 8
}

"produce a strictly higher value on JDK 25+ than on JDK 21 for plausible dispatcher sizes" in {
// Directional check: the auto policy must move the needle on the JDK line that needs it
// (and only on that line — JDK 21 stays at the legacy default per reviewer guidance).
for (parallelism <- Seq(4, 8, 16, 32, 64)) {
val legacy = resolveMinimumRunnable(configured = -1, parallelism, jdkMajorVersion = 21)
val modern = resolveMinimumRunnable(configured = -1, parallelism, jdkMajorVersion = 25)
withClue(s"parallelism=$parallelism legacy=$legacy modern=$modern: ") {
modern should be > legacy
}
}
}

"never exceed the documented max cap of 8" in {
for (parallelism <- 1 to 256; jdk <- Seq(21, 25, 30)) {
resolveMinimumRunnable(configured = -1, parallelism, jdk) should be <= 8
}
}
}

"ForkJoinExecutorConfigurator wiring" must {

// Build a factory from a real dispatcher config and return the resolved
// minimum-runnable. This proves the config value actually reaches the
// ForkJoinExecutorServiceFactory — guarding against the trivial regression
// of reverting resolveMinimumRunnable to a direct `config.getInt` read.
def resolvedMinimumRunnable(dispatcherId: String): Int = {
// `system.dispatchers.config(id)` resolves the dispatcher's full config with
// reference.conf defaults applied (so `virtualize`, `minimum-runnable`, etc.
// all have values).
val dispatcherConfig = system.dispatchers.config(dispatcherId)
val configurator = new ForkJoinExecutorConfigurator(
dispatcherConfig.getConfig("fork-join-executor"),
system.dispatchers.prerequisites)
val tf: ThreadFactory = system.dispatchers.prerequisites.threadFactory
val factory = configurator
.createExecutorServiceFactory(dispatcherId, tf)
.asInstanceOf[configurator.ForkJoinExecutorServiceFactory]
factory.minimumRunnable
}

"respect explicit minimum-runnable = 0" in {
resolvedMinimumRunnable("fj-explicit-zero-dispatcher") shouldBe 0
}

"respect explicit minimum-runnable = 7" in {
resolvedMinimumRunnable("fj-explicit-seven-dispatcher") shouldBe 7
}

"auto-scale the default (minimum-runnable not set) on JDK 25+" in {
if (JavaVersion.majorVersion < 25) pending

val resolved = resolvedMinimumRunnable("fj-auto-default-dispatcher")
// The dispatcher declares parallelism-min = 8 so effective parallelism is at
// least 8; auto = min(8, max(1, parallelism/2)) must be at least 4 and never
// exceed the documented cap of 8.
resolved should be >= 4
resolved should be <= 8
}

"keep the legacy value of 1 on JDK < 25 when the default is left untouched" in {
if (JavaVersion.majorVersion >= 25) pending

resolvedMinimumRunnable("fj-auto-default-dispatcher") shouldBe 1
}

"never drop below 1 even for parallelism = 1 dispatchers" in {
val resolved = resolvedMinimumRunnable("fj-auto-small-dispatcher")
// parallelism = 1 implies parallelism/2 = 0, which the min-cap must lift to 1.
// On JDK < 25 the legacy value of 1 is already the expected answer.
resolved shouldBe 1
}
}
}
16 changes: 10 additions & 6 deletions actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -490,12 +490,16 @@ pekko {
# When blocked worker count causes active threads to drop below this threshold, the pool
# may create a compensation thread to maintain progress.
#
# The default value of 1 matches the JDK ForkJoinPool behaviour prior to this setting being
# exposed. Higher values (e.g. parallelism/2) can reduce starvation risk for actor-heavy
# workloads on JDK 21+ where ForkJoinPool asyncMode (FIFO) compensation-thread creation
# became more conservative (see JDK-8300995 / JDK-8321335).
# Set to 0 to disable compensation entirely.
minimum-runnable = 1
# The special value -1 (default) selects a JDK-aware policy:
# * JDK 25+ : effective value = min(8, max(1, parallelism / 2))
# * JDK < 25: effective value = 1 (preserves the JDK behaviour prior to this setting)
# Auto-selection on JDK 25+ mitigates the asyncMode (FIFO) compensation-thread regression
# tracked in JDK-8300995 / JDK-8321335 that, in Pekko nightly tests, surfaces most clearly
# on the JDK 25 line and can cause actor-heavy workloads to starve.
#
# Set explicitly to 0 to disable compensation entirely. Set to any non-negative integer
# to override the auto-selection (e.g. 1 to restore the previous default behaviour).
minimum-runnable = -1

# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,32 @@ import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, Threa

import com.typesafe.config.Config

import org.apache.pekko.annotation.InternalApi
import org.apache.pekko.util.JavaVersion

object ForkJoinExecutorConfigurator {

/**
* INTERNAL API
*
* Resolves the effective `minimum-runnable` value for a fork-join dispatcher.
*
* A negative value (default `-1` in reference.conf) selects the JDK-aware policy:
* on JDK 25+ the value is `min(8, max(1, parallelism / 2))` to mitigate the
* asyncMode (FIFO) compensation-thread regression tracked in
* JDK-8300995 / JDK-8321335 (the impact is most visible on the JDK 25 line in
* Pekko nightly tests); on older JDKs the value stays at `1` to preserve the
* pre-existing behaviour. Non-negative configured values are honoured verbatim,
* so `0` still disables compensation entirely.
*/
@InternalApi private[pekko] def resolveMinimumRunnable(
configured: Int,
parallelism: Int,
jdkMajorVersion: Int): Int =
if (configured >= 0) configured
else if (jdkMajorVersion >= 25) math.min(8, math.max(1, parallelism / 2))
else 1

/**
* INTERNAL PEKKO USAGE ONLY
*/
Expand Down Expand Up @@ -140,16 +164,21 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
""""task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
}

val parallelism = ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max"))

new ForkJoinExecutorServiceFactory(
id,
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
parallelism,
asyncMode,
config.getInt("maximum-pool-size"),
config.getInt("minimum-runnable")
ForkJoinExecutorConfigurator.resolveMinimumRunnable(
config.getInt("minimum-runnable"),
parallelism,
JavaVersion.majorVersion)
)
}
}
8 changes: 8 additions & 0 deletions docs/src/main/paradox/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ You can read more about parallelism in the JDK's [ForkJoinPool documentation](ht

When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool.

When running on Java 25+, the `minimum-runnable` setting for the `fork-join-executor` defaults to a JDK-aware value
(`min(8, max(1, parallelism / 2))`) instead of the historical `1`. This raises the number of compensation threads the
pool will create when workers block, mitigating the asyncMode (FIFO) compensation regression tracked in
[JDK-8300995](https://bugs.openjdk.org/browse/JDK-8300995) / [JDK-8321335](https://bugs.openjdk.org/browse/JDK-8321335)
that, in Pekko nightly tests, surfaces most clearly on the JDK 25 line and can cause actor-heavy workloads to starve.
Set `minimum-runnable` explicitly (any non-negative integer) to opt out of the auto-selection — for example
`minimum-runnable = 1` restores the previous default, and `minimum-runnable = 0` disables compensation entirely.

**Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature.
When using virtual threads, all virtual threads will use the same `unparker`, so you may want to
increase the number of `jdk.unparker.maxPoolSize`.
Expand Down
8 changes: 8 additions & 0 deletions docs/src/main/paradox/typed/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ You can read more about parallelism in the JDK's [ForkJoinPool documentation](ht

When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool.

When running on Java 25+, the `minimum-runnable` setting for the `fork-join-executor` defaults to a JDK-aware value
(`min(8, max(1, parallelism / 2))`) instead of the historical `1`. This raises the number of compensation threads the
pool will create when workers block, mitigating the asyncMode (FIFO) compensation regression tracked in
[JDK-8300995](https://bugs.openjdk.org/browse/JDK-8300995) / [JDK-8321335](https://bugs.openjdk.org/browse/JDK-8321335)
that, in Pekko nightly tests, surfaces most clearly on the JDK 25 line and can cause actor-heavy workloads to starve.
Set `minimum-runnable` explicitly (any non-negative integer) to opt out of the auto-selection — for example
`minimum-runnable = 1` restores the previous default, and `minimum-runnable = 0` disables compensation entirely.

**Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature.
When using virtual threads, all virtual threads will use the same `unparker`, so you may want to
increase the number of `jdk.unparker.maxPoolSize`.
Expand Down