From 195b7def1dfa95535469e70bf8dc378cb6cb5735 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Sun, 7 Jun 2026 21:58:48 -0700 Subject: [PATCH 1/3] fix(amber): prevent ConcurrentModificationException in ControllerRpcProbe ControllerRpcProbe stored controller-to-worker RPCs in a mutable ArrayBuffer that the actor/scheduler thread appended to while the test thread iterated it through the read helpers (methodTrace, endWorkerCalls, etc.). With no synchronization, an append racing a filter/map surfaced as a hard ConcurrentModificationException under Scala 2.13's MutationTracker, causing non-deterministic failures in RegionExecutionCoordinatorSpec. Guard the buffer with a dedicated lock: appends synchronize on it and readers take an immutable snapshot under the same lock before filtering. The lock is released before fulfill() runs, so no callback executes while the lock is held. Add ControllerRpcProbeSpec, a stress test that races concurrent appends against reads; it reproduces the CME deterministically before the fix and passes after it. --- .../scheduling/ControllerRpcProbeSpec.scala | 92 +++++++++++++++++++ .../RegionCoordinatorTestSupport.scala | 25 +++-- 2 files changed, 109 insertions(+), 8 deletions(-) create mode 100644 amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala new file mode 100644 index 00000000000..0757f29f734 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.scheduling + +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + ControlInvocation, + EmptyRequest +} +import org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._ +import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER +import org.scalatest.flatspec.AnyFlatSpec + +import java.util.concurrent.atomic.AtomicReference + +/** + * Concurrency regression test for `ControllerRpcProbe`. + * + * In the real coordinator specs the probe's `calls` buffer is appended to on a Pekko + * scheduler/actor thread (via the output-gateway callback) while the test thread reads it through + * the helper methods. When those two collide, Scala 2.13's `MutationTracker` turns the unsynchronized + * read into a hard `ConcurrentModificationException`. This test forces that collision directly so the + * race is caught deterministically rather than as a non-deterministic CI flake. + */ +class ControllerRpcProbeSpec extends AnyFlatSpec { + + "ControllerRpcProbe" should "tolerate reads racing with concurrent appends" in { + // Hold every endWorker pending so appends never trigger a fulfill side effect. + val probe = new ControllerRpcProbe(_ => None) + val appends = 20000 + val failure = new AtomicReference[Throwable]() + + // Writer mimics the actor side: each sendTo drives handleOutput -> calls += call. + val writer = new Thread(() => { + try { + var i = 0 + while (i < appends) { + probe.outputGateway.sendTo( + CONTROLLER, + ControlInvocation(EndWorker, EmptyRequest(), AsyncRPCContext(CONTROLLER, CONTROLLER), i.toLong) + ) + i += 1 + } + } catch { + case t: Throwable => failure.compareAndSet(null, t) + } + }) + + // Reader mimics the test side: iterate the buffer through every helper while appends are in flight. + val reader = new Thread(() => { + try { + while (writer.isAlive) { + probe.endWorkerCalls + probe.methodTrace + probe.initializedWorkers + probe.startedWorkers + } + } catch { + case t: Throwable => failure.compareAndSet(null, t) + } + }) + + writer.start() + reader.start() + writer.join(testTimeout.inMilliseconds) + reader.join(testTimeout.inMilliseconds) + + assert(!writer.isAlive && !reader.isAlive, "stress threads did not finish within the deadline") + assert( + failure.get() == null, + s"concurrent access to the probe threw ${failure.get()}" + ) + assert(probe.endWorkerCalls.size == appends) + } +} diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala index 5673c02691f..e311349077e 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala @@ -91,25 +91,34 @@ object RegionCoordinatorTestSupport { * hold termination pending, fail an attempt, or allow it to succeed. */ class ControllerRpcProbe(endWorkerResponse: WorkerRpcCall => Option[ControlReturn]) { - val calls: mutable.ArrayBuffer[WorkerRpcCall] = mutable.ArrayBuffer() + // `calls` is appended to on the actor/scheduler thread (via the output-gateway callback) while + // test assertions read it from the test thread. Guard every access with `callsLock`, and hand + // readers an immutable snapshot so iteration never races a concurrent append — which Scala + // 2.13's MutationTracker would otherwise surface as a ConcurrentModificationException. + private val callsLock = new Object + private val calls: mutable.ArrayBuffer[WorkerRpcCall] = mutable.ArrayBuffer() + val inputGateway = new NetworkInputGateway(CONTROLLER) val outputGateway = new NetworkOutputGateway(CONTROLLER, handleOutput) val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, CONTROLLER) - def methodTrace: Seq[String] = calls.map(_.methodName).toSeq + private def callsSnapshot: Seq[WorkerRpcCall] = callsLock.synchronized(calls.toSeq) + + def methodTrace: Seq[String] = callsSnapshot.map(_.methodName) def initializedWorkers: Seq[ActorVirtualIdentity] = - calls.filter(_.methodName == InitializeExecutor).map(_.receiver).toSeq + callsSnapshot.filter(_.methodName == InitializeExecutor).map(_.receiver) def startedWorkers: Seq[ActorVirtualIdentity] = - calls.filter(_.methodName == StartWorker).map(_.receiver).toSeq + callsSnapshot.filter(_.methodName == StartWorker).map(_.receiver) def endWorkerCalls: Seq[WorkerRpcCall] = - calls.filter(_.methodName == EndWorker).toSeq + callsSnapshot.filter(_.methodName == EndWorker) def onlyEndWorkerCall: WorkerRpcCall = { - assert(endWorkerCalls.size == 1) - endWorkerCalls.head + val ends = endWorkerCalls + assert(ends.size == 1) + ends.head } def fulfill(call: WorkerRpcCall, returnValue: ControlReturn): Unit = { @@ -131,7 +140,7 @@ object RegionCoordinatorTestSupport { receiver = invocation.context.receiver, commandId = invocation.commandId ) - calls += call + callsLock.synchronized(calls += call) immediateReturn(call).foreach(fulfill(call, _)) } From 348a74c30bf68c11e4195a3459fbd1a72d13a2b0 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Sun, 7 Jun 2026 22:12:47 -0700 Subject: [PATCH 2/3] test(amber): format ControllerRpcProbeSpec and gate stress threads The new spec's ControlInvocation construction exceeded scalafmt's maxColumn=100, failing the scalafmtCheckAll CI gate; reflow it across multiple lines so the format check passes. Also add a CountDownLatch start gate so the writer cannot run its appends to completion before the reader thread is scheduled, guaranteeing the read-vs-append race window is exercised on every run. --- .../scheduling/ControllerRpcProbeSpec.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala index 0757f29f734..8a6809b6df9 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala @@ -28,6 +28,7 @@ import org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorT import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.scalatest.flatspec.AnyFlatSpec +import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference /** @@ -46,15 +47,25 @@ class ControllerRpcProbeSpec extends AnyFlatSpec { val probe = new ControllerRpcProbe(_ => None) val appends = 20000 val failure = new AtomicReference[Throwable]() + // Release both threads together so the reader is guaranteed to be polling while the writer is + // still appending. Without this gate a writer that ran to completion before the reader thread + // was scheduled would leave the read-vs-append window untested. + val startGate = new CountDownLatch(1) // Writer mimics the actor side: each sendTo drives handleOutput -> calls += call. val writer = new Thread(() => { try { + startGate.await() var i = 0 while (i < appends) { probe.outputGateway.sendTo( CONTROLLER, - ControlInvocation(EndWorker, EmptyRequest(), AsyncRPCContext(CONTROLLER, CONTROLLER), i.toLong) + ControlInvocation( + EndWorker, + EmptyRequest(), + AsyncRPCContext(CONTROLLER, CONTROLLER), + i.toLong + ) ) i += 1 } @@ -66,6 +77,7 @@ class ControllerRpcProbeSpec extends AnyFlatSpec { // Reader mimics the test side: iterate the buffer through every helper while appends are in flight. val reader = new Thread(() => { try { + startGate.await() while (writer.isAlive) { probe.endWorkerCalls probe.methodTrace @@ -79,6 +91,7 @@ class ControllerRpcProbeSpec extends AnyFlatSpec { writer.start() reader.start() + startGate.countDown() writer.join(testTimeout.inMilliseconds) reader.join(testTimeout.inMilliseconds) From 53d0c18fb397bd786240a26e226ece1a8a30f964 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Sun, 7 Jun 2026 22:19:56 -0700 Subject: [PATCH 3/3] test(amber): simplify ControllerRpcProbeSpec comments Tighten the wording, make the writer/reader comments parallel, and bring the comment lines within the 100-column limit. Comment-only; no behavior change. --- .../scheduling/ControllerRpcProbeSpec.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala index 8a6809b6df9..46828fc158c 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala @@ -34,25 +34,23 @@ import java.util.concurrent.atomic.AtomicReference /** * Concurrency regression test for `ControllerRpcProbe`. * - * In the real coordinator specs the probe's `calls` buffer is appended to on a Pekko - * scheduler/actor thread (via the output-gateway callback) while the test thread reads it through - * the helper methods. When those two collide, Scala 2.13's `MutationTracker` turns the unsynchronized - * read into a hard `ConcurrentModificationException`. This test forces that collision directly so the - * race is caught deterministically rather than as a non-deterministic CI flake. + * The probe's `calls` buffer is appended to on a scheduler thread while the test thread reads it. + * Before the fix, a read racing an append tripped Scala 2.13's `MutationTracker` and threw a + * `ConcurrentModificationException`. This test forces that race so it fails deterministically + * rather than as a CI flake. */ class ControllerRpcProbeSpec extends AnyFlatSpec { "ControllerRpcProbe" should "tolerate reads racing with concurrent appends" in { - // Hold every endWorker pending so appends never trigger a fulfill side effect. + // Hold every endWorker pending so appends have no fulfill side effects. val probe = new ControllerRpcProbe(_ => None) val appends = 20000 val failure = new AtomicReference[Throwable]() - // Release both threads together so the reader is guaranteed to be polling while the writer is - // still appending. Without this gate a writer that ran to completion before the reader thread - // was scheduled would leave the read-vs-append window untested. + // Release both threads at once so the reader polls while the writer is still appending; + // otherwise the writer could finish before the reader starts and miss the race entirely. val startGate = new CountDownLatch(1) - // Writer mimics the actor side: each sendTo drives handleOutput -> calls += call. + // Writer: the actor side. Each sendTo drives handleOutput -> calls += call. val writer = new Thread(() => { try { startGate.await() @@ -74,7 +72,7 @@ class ControllerRpcProbeSpec extends AnyFlatSpec { } }) - // Reader mimics the test side: iterate the buffer through every helper while appends are in flight. + // Reader: the test side. Read through every helper while appends are in flight. val reader = new Thread(() => { try { startGate.await()