diff --git a/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala new file mode 100644 index 00000000000..5bf7c122559 --- /dev/null +++ b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import java.util.concurrent.ThreadFactory + +import org.apache.pekko +import pekko.testkit.PekkoSpec +import pekko.util.JavaVersion + +import com.typesafe.config.{ Config, ConfigFactory } + +object ForkJoinExecutorConfiguratorSpec { + // Keep the root config explicit so the dispatcher-config integration checks below + // see exactly the values this spec is asserting on (and not whatever the project's + // global test configuration happens to set). + val config: Config = ConfigFactory.parseString(""" + |fj-auto-default-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 1.0 + | parallelism-max = 64 + | } + |} + |fj-auto-small-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 1 + | parallelism-factor = 1.0 + | parallelism-max = 1 + | } + |} + |fj-explicit-zero-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 1.0 + | parallelism-max = 64 + | minimum-runnable = 0 + | } + |} + |fj-explicit-seven-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 1.0 + | parallelism-max = 64 + | minimum-runnable = 7 + | } + |} + """.stripMargin) +} + +class ForkJoinExecutorConfiguratorSpec extends PekkoSpec(ForkJoinExecutorConfiguratorSpec.config) { + + import ForkJoinExecutorConfigurator.resolveMinimumRunnable + + "ForkJoinExecutorConfigurator.resolveMinimumRunnable" must { + + "honour explicit zero (compensation disabled)" in { + resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion = 25) shouldBe 0 + resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion = 17) shouldBe 0 + } + + "honour explicit positive overrides verbatim" in { + resolveMinimumRunnable(configured = 1, parallelism = 16, jdkMajorVersion = 25) shouldBe 1 + resolveMinimumRunnable(configured = 7, parallelism = 16, jdkMajorVersion = 25) shouldBe 7 + resolveMinimumRunnable(configured = 100, parallelism = 16, jdkMajorVersion = 25) shouldBe 100 + } + + "auto-resolve to 1 on JDK < 25 regardless of parallelism (preserves legacy behaviour)" in { + // JDK 21 keeps the legacy default per reviewer guidance: only JDK 25 nightlies + // showed the compensation-thread regression badly enough to warrant a default change. + resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion = 17) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion = 17) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 64, jdkMajorVersion = 11) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 16, jdkMajorVersion = 21) shouldBe 1 + } + + "auto-resolve using parallelism / 2 on JDK 25+ with min cap 1 and max cap 8" in { + resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion = 25) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 2, jdkMajorVersion = 25) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 4, jdkMajorVersion = 25) shouldBe 2 + resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion = 25) shouldBe 4 + resolveMinimumRunnable(configured = -1, parallelism = 16, jdkMajorVersion = 25) shouldBe 8 + resolveMinimumRunnable(configured = -1, parallelism = 64, jdkMajorVersion = 25) shouldBe 8 + } + + "produce a strictly higher value on JDK 25+ than on JDK 21 for plausible dispatcher sizes" in { + // Directional check: the auto policy must move the needle on the JDK line that needs it + // (and only on that line — JDK 21 stays at the legacy default per reviewer guidance). + for (parallelism <- Seq(4, 8, 16, 32, 64)) { + val legacy = resolveMinimumRunnable(configured = -1, parallelism, jdkMajorVersion = 21) + val modern = resolveMinimumRunnable(configured = -1, parallelism, jdkMajorVersion = 25) + withClue(s"parallelism=$parallelism legacy=$legacy modern=$modern: ") { + modern should be > legacy + } + } + } + + "never exceed the documented max cap of 8" in { + for (parallelism <- 1 to 256; jdk <- Seq(21, 25, 30)) { + resolveMinimumRunnable(configured = -1, parallelism, jdk) should be <= 8 + } + } + } + + "ForkJoinExecutorConfigurator wiring" must { + + // Build a factory from a real dispatcher config and return the resolved + // minimum-runnable. This proves the config value actually reaches the + // ForkJoinExecutorServiceFactory — guarding against the trivial regression + // of reverting resolveMinimumRunnable to a direct `config.getInt` read. + def resolvedMinimumRunnable(dispatcherId: String): Int = { + // `system.dispatchers.config(id)` resolves the dispatcher's full config with + // reference.conf defaults applied (so `virtualize`, `minimum-runnable`, etc. + // all have values). + val dispatcherConfig = system.dispatchers.config(dispatcherId) + val configurator = new ForkJoinExecutorConfigurator( + dispatcherConfig.getConfig("fork-join-executor"), + system.dispatchers.prerequisites) + val tf: ThreadFactory = system.dispatchers.prerequisites.threadFactory + val factory = configurator + .createExecutorServiceFactory(dispatcherId, tf) + .asInstanceOf[configurator.ForkJoinExecutorServiceFactory] + factory.minimumRunnable + } + + "respect explicit minimum-runnable = 0" in { + resolvedMinimumRunnable("fj-explicit-zero-dispatcher") shouldBe 0 + } + + "respect explicit minimum-runnable = 7" in { + resolvedMinimumRunnable("fj-explicit-seven-dispatcher") shouldBe 7 + } + + "auto-scale the default (minimum-runnable not set) on JDK 25+" in { + if (JavaVersion.majorVersion < 25) pending + + val resolved = resolvedMinimumRunnable("fj-auto-default-dispatcher") + // The dispatcher declares parallelism-min = 8 so effective parallelism is at + // least 8; auto = min(8, max(1, parallelism/2)) must be at least 4 and never + // exceed the documented cap of 8. + resolved should be >= 4 + resolved should be <= 8 + } + + "keep the legacy value of 1 on JDK < 25 when the default is left untouched" in { + if (JavaVersion.majorVersion >= 25) pending + + resolvedMinimumRunnable("fj-auto-default-dispatcher") shouldBe 1 + } + + "never drop below 1 even for parallelism = 1 dispatchers" in { + val resolved = resolvedMinimumRunnable("fj-auto-small-dispatcher") + // parallelism = 1 implies parallelism/2 = 0, which the min-cap must lift to 1. + // On JDK < 25 the legacy value of 1 is already the expected answer. + resolved shouldBe 1 + } + } +} diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index b2afe5f374c..fb2fc0a7d0d 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -490,12 +490,16 @@ pekko { # When blocked worker count causes active threads to drop below this threshold, the pool # may create a compensation thread to maintain progress. # - # The default value of 1 matches the JDK ForkJoinPool behaviour prior to this setting being - # exposed. Higher values (e.g. parallelism/2) can reduce starvation risk for actor-heavy - # workloads on JDK 21+ where ForkJoinPool asyncMode (FIFO) compensation-thread creation - # became more conservative (see JDK-8300995 / JDK-8321335). - # Set to 0 to disable compensation entirely. - minimum-runnable = 1 + # The special value -1 (default) selects a JDK-aware policy: + # * JDK 25+ : effective value = min(8, max(1, parallelism / 2)) + # * JDK < 25: effective value = 1 (preserves the JDK behaviour prior to this setting) + # Auto-selection on JDK 25+ mitigates the asyncMode (FIFO) compensation-thread regression + # tracked in JDK-8300995 / JDK-8321335 that, in Pekko nightly tests, surfaces most clearly + # on the JDK 25 line and can cause actor-heavy workloads to starve. + # + # Set explicitly to 0 to disable compensation entirely. Set to any non-negative integer + # to override the auto-selection (e.g. 1 to restore the previous default behaviour). + minimum-runnable = -1 # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, # When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown. diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index 9dc2c953187..7135594d356 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -17,8 +17,32 @@ import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, Threa import com.typesafe.config.Config +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.util.JavaVersion + object ForkJoinExecutorConfigurator { + /** + * INTERNAL API + * + * Resolves the effective `minimum-runnable` value for a fork-join dispatcher. + * + * A negative value (default `-1` in reference.conf) selects the JDK-aware policy: + * on JDK 25+ the value is `min(8, max(1, parallelism / 2))` to mitigate the + * asyncMode (FIFO) compensation-thread regression tracked in + * JDK-8300995 / JDK-8321335 (the impact is most visible on the JDK 25 line in + * Pekko nightly tests); on older JDKs the value stays at `1` to preserve the + * pre-existing behaviour. Non-negative configured values are honoured verbatim, + * so `0` still disables compensation entirely. + */ + @InternalApi private[pekko] def resolveMinimumRunnable( + configured: Int, + parallelism: Int, + jdkMajorVersion: Int): Int = + if (configured >= 0) configured + else if (jdkMajorVersion >= 25) math.min(8, math.max(1, parallelism / 2)) + else 1 + /** * INTERNAL PEKKO USAGE ONLY */ @@ -140,16 +164,21 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer """"task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""") } + val parallelism = ThreadPoolConfig.scaledPoolSize( + config.getInt("parallelism-min"), + config.getDouble("parallelism-factor"), + config.getInt("parallelism-max")) + new ForkJoinExecutorServiceFactory( id, validate(tf), - ThreadPoolConfig.scaledPoolSize( - config.getInt("parallelism-min"), - config.getDouble("parallelism-factor"), - config.getInt("parallelism-max")), + parallelism, asyncMode, config.getInt("maximum-pool-size"), - config.getInt("minimum-runnable") + ForkJoinExecutorConfigurator.resolveMinimumRunnable( + config.getInt("minimum-runnable"), + parallelism, + JavaVersion.majorVersion) ) } } diff --git a/docs/src/main/paradox/dispatchers.md b/docs/src/main/paradox/dispatchers.md index 37cd6eb1b62..27fbd428038 100644 --- a/docs/src/main/paradox/dispatchers.md +++ b/docs/src/main/paradox/dispatchers.md @@ -44,6 +44,14 @@ You can read more about parallelism in the JDK's [ForkJoinPool documentation](ht When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool. +When running on Java 25+, the `minimum-runnable` setting for the `fork-join-executor` defaults to a JDK-aware value +(`min(8, max(1, parallelism / 2))`) instead of the historical `1`. This raises the number of compensation threads the +pool will create when workers block, mitigating the asyncMode (FIFO) compensation regression tracked in +[JDK-8300995](https://bugs.openjdk.org/browse/JDK-8300995) / [JDK-8321335](https://bugs.openjdk.org/browse/JDK-8321335) +that, in Pekko nightly tests, surfaces most clearly on the JDK 25 line and can cause actor-heavy workloads to starve. +Set `minimum-runnable` explicitly (any non-negative integer) to opt out of the auto-selection — for example +`minimum-runnable = 1` restores the previous default, and `minimum-runnable = 0` disables compensation entirely. + **Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature. When using virtual threads, all virtual threads will use the same `unparker`, so you may want to increase the number of `jdk.unparker.maxPoolSize`. diff --git a/docs/src/main/paradox/typed/dispatchers.md b/docs/src/main/paradox/typed/dispatchers.md index c1506af836f..abbb6945864 100644 --- a/docs/src/main/paradox/typed/dispatchers.md +++ b/docs/src/main/paradox/typed/dispatchers.md @@ -129,6 +129,14 @@ You can read more about parallelism in the JDK's [ForkJoinPool documentation](ht When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool. +When running on Java 25+, the `minimum-runnable` setting for the `fork-join-executor` defaults to a JDK-aware value +(`min(8, max(1, parallelism / 2))`) instead of the historical `1`. This raises the number of compensation threads the +pool will create when workers block, mitigating the asyncMode (FIFO) compensation regression tracked in +[JDK-8300995](https://bugs.openjdk.org/browse/JDK-8300995) / [JDK-8321335](https://bugs.openjdk.org/browse/JDK-8321335) +that, in Pekko nightly tests, surfaces most clearly on the JDK 25 line and can cause actor-heavy workloads to starve. +Set `minimum-runnable` explicitly (any non-negative integer) to opt out of the auto-selection — for example +`minimum-runnable = 1` restores the previous default, and `minimum-runnable = 0` disables compensation entirely. + **Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature. When using virtual threads, all virtual threads will use the same `unparker`, so you may want to increase the number of `jdk.unparker.maxPoolSize`.