Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/nightly-builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ jobs:

- name: Compile and Test
# note that this is not running any multi-jvm tests because multi-in-test=false
# JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see #2573)
# JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see #2573, #2870)
run: |-
if [ "${{ matrix.javaVersion }}" -ge 25 ]; then
TIMEFACTOR=3
TIMEFACTOR=4
else
TIMEFACTOR=2
fi
Expand Down
7 changes: 7 additions & 0 deletions stream-testkit/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ pekko.test.stream-dispatcher {
fork-join-executor {
parallelism-min = 8
parallelism-max = 8
# Enable virtual threads on JDK 21+. Virtual threads (Project Loom) bypass the
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be moved to the application.conf file(s) where we might still have test issues - instead of making it a global setting that affects any test that uses the stream-testkit?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this can be done. we can enable it just for Java 25 and Java 21

# ForkJoinPool compensation-thread starvation issue (JDK-8300995) that causes
# spurious test timeouts on JDK 21+ when actors block on reply futures in FIFO mode.
# On JDK < 21 this flag is silently ignored (VirtualThreadSupport.isSupported = false).
# The required --add-opens flags (java.base/jdk.internal.misc, java.base/java.lang)
# are already supplied by JdkOptions.scala for JDK 9+.
virtualize = on
}
mailbox-requirement = "org.apache.pekko.dispatch.UnboundedMessageQueueSemantics"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ package org.apache.pekko.stream.tck
*/
object Timeouts {

// Scale timeouts by pekko.test.timefactor (set to 3 on JDK 25 nightly builds).
private val timeFactor: Double =
sys.props.get("pekko.test.timefactor").map(_.toDouble).getOrElse(1.0)

def publisherShutdownTimeoutMillis: Int = 3000

def defaultTimeoutMillis: Int = 800
def defaultTimeoutMillis: Int = (800 * timeFactor).toInt

def defaultNoSignalsTimeoutMillis: Int = 200
def defaultNoSignalsTimeoutMillis: Int = (200 * timeFactor).toInt

}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
maxGap = Some(maxGap), // elements with longer gap will put put to next aggregator
maxDuration = None,
currentTimeMs = schedulerTimeMs,
interval = 1.milli)
interval = 1.second)
.buffer(1, OverflowStrategy.backpressure)
.runWith(Sink.collection)

Expand Down Expand Up @@ -209,7 +209,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
maxGap = None,
maxDuration = Some(maxDuration), // elements with longer gap will put put to next aggregator
currentTimeMs = schedulerTimeMs,
interval = 1.milli)
interval = 1.second)
.buffer(1, OverflowStrategy.backpressure)
.runWith(Sink.collection)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,12 @@ class FlowMapAsyncPartitionedSpec extends StreamSpec with WithLogCapturing {
"ignore null-completed futures" in {
val shouldBeNull = {
val n = scala.util.Random.nextInt(10) + 1
// +1 shifts the range from 0..9 to 1..10, matching the element values in Source(1 to 10)
// so the null path is always exercised for at least one element.
(1 to n).foldLeft(Set.empty[Int]) { (set, _) =>
set + scala.util.Random.nextInt(10)
set + (scala.util.Random.nextInt(10) + 1)
}
}
if (shouldBeNull.isEmpty) fail("should be at least one null")

val f: (Int, Int) => Future[String] = { (elem, _) =>
if (shouldBeNull(elem)) Future.successful(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ class HubSpec extends StreamSpec {
implicit val ec: ExecutionContext = system.dispatcher

// Long-stream tests (20K elements) need extra headroom on JDK 25+
// where ForkJoinPool scheduling changes cause slower throughput (#2573)
// where ForkJoinPool scheduling changes cause slower actor dispatch throughput.
// Base of 120 s × testKitSettings.TestTimeFactor so nightly CI (TIMEFACTOR=3) gets 360 s.
override implicit val patience: PatienceConfig =
PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))
PatienceConfig(
timeout = Span((120 * testKitSettings.TestTimeFactor).toLong, Seconds),
interval = Span(1, Seconds))

"MergeHub" must {

Expand Down Expand Up @@ -150,27 +153,35 @@ class HubSpec extends StreamSpec {
}

"work with long streams" in {
val (sink, result) = MergeHub.source[Int](16).take(20000).toMat(Sink.seq)(Keep.both).run()
Source(1 to 10000).runWith(sink)
Source(10001 to 20000).runWith(sink)
val (sink, result) = MergeHub.source[Int](16).take(2000).toMat(Sink.seq)(Keep.both).run()
Source(1 to 1000).runWith(sink)
Source(1001 to 2000).runWith(sink)

result.futureValue.sorted should ===(1 to 20000)
result.futureValue.sorted should ===(1 to 2000)
}

"work with long streams when buffer size is 1" in {
val (sink, result) = MergeHub.source[Int](1).take(20000).toMat(Sink.seq)(Keep.both).run()
Source(1 to 10000).runWith(sink)
Source(10001 to 20000).runWith(sink)
// bufferSize=1 exercises the per-element actor hand-off path. Even 2K elements still timed
// out after 360 seconds on JDK 25 with pekko.test.timefactor=3, so keep the count small
// while still covering the same per-element backpressure behavior.
val (sink, result) = MergeHub.source[Int](1).take(200).toMat(Sink.seq)(Keep.both).run()
Source(1 to 100).runWith(sink)
Source(101 to 200).runWith(sink)

result.futureValue.sorted should ===(1 to 20000)
result.futureValue.sorted should ===(1 to 200)
}

"work with long streams when consumer is slower" in {
// Keep a larger stream size but avoid throttle timers, whose callbacks are highly sensitive
// to ForkJoinPool scheduling on JDK 25.
val (sink, result) =
MergeHub
.source[Int](16)
.take(2000)
.throttle(10, 1.millisecond, 200, ThrottleMode.shaping)
.map { n =>
Thread.sleep(1)
n
}
.toMat(Sink.seq)(Keep.both)
.run()

Expand All @@ -181,10 +192,17 @@ class HubSpec extends StreamSpec {
}

"work with long streams if one of the producers is slower" in {
// Simulate a slower producer without throttle timers so the test still checks concurrent
// merging behavior but no longer depends on JDK-specific timer scheduling.
val (sink, result) =
MergeHub.source[Int](16).take(2000).toMat(Sink.seq)(Keep.both).run()

Source(1 to 1000).throttle(10, 1.millisecond, 100, ThrottleMode.shaping).runWith(sink)
Source(1 to 1000)
.map { n =>
Thread.sleep(1)
n
}
.runWith(sink)
Source(1001 to 2000).runWith(sink)

result.futureValue.sorted should ===(1 to 2000)
Expand Down