diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala new file mode 100644 index 00000000000..46828fc158c --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ControllerRpcProbeSpec.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.scheduling + +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + ControlInvocation, + EmptyRequest +} +import org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._ +import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER +import org.scalatest.flatspec.AnyFlatSpec + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference + +/** + * Concurrency regression test for `ControllerRpcProbe`. + * + * The probe's `calls` buffer is appended to on a scheduler thread while the test thread reads it. 
+ * Before the fix, a read racing an append tripped Scala 2.13's `MutationTracker` and threw a + * `ConcurrentModificationException`. This test provokes that race by polling the read helpers + * while another thread does 20000 appends; a regression should fail here, not as a CI flake. + */ +class ControllerRpcProbeSpec extends AnyFlatSpec { + + "ControllerRpcProbe" should "tolerate reads racing with concurrent appends" in { + // Hold every endWorker pending so appends have no fulfill side effects. + val probe = new ControllerRpcProbe(_ => None) + val appends = 20000 + val failure = new AtomicReference[Throwable]() + // Release both threads at once so the reader polls while the writer is still appending; + // otherwise the writer could finish before the reader starts and miss the race entirely. + val startGate = new CountDownLatch(1) + + // Writer: the actor side. Each sendTo drives handleOutput -> calls += call. + val writer = new Thread(() => { + try { + startGate.await() + var i = 0 + while (i < appends) { + probe.outputGateway.sendTo( + CONTROLLER, + ControlInvocation( + EndWorker, + EmptyRequest(), + AsyncRPCContext(CONTROLLER, CONTROLLER), + i.toLong + ) + ) + i += 1 + } + } catch { + case t: Throwable => failure.compareAndSet(null, t) + } + }) + + // Reader: the test side. Read through every helper while appends are in flight. 
+ val reader = new Thread(() => { + try { + startGate.await() + while (writer.isAlive) { + probe.endWorkerCalls + probe.methodTrace + probe.initializedWorkers + probe.startedWorkers + } + } catch { + case t: Throwable => failure.compareAndSet(null, t) + } + }) + + writer.start() + reader.start() + startGate.countDown() + writer.join(testTimeout.inMilliseconds) + reader.join(testTimeout.inMilliseconds) + + assert(!writer.isAlive && !reader.isAlive, "stress threads did not finish within the deadline") + assert( + failure.get() == null, + s"concurrent access to the probe threw ${failure.get()}" + ) + assert(probe.endWorkerCalls.size == appends) + } +} diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala index 5673c02691f..e311349077e 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala @@ -91,25 +91,34 @@ object RegionCoordinatorTestSupport { * hold termination pending, fail an attempt, or allow it to succeed. */ class ControllerRpcProbe(endWorkerResponse: WorkerRpcCall => Option[ControlReturn]) { - val calls: mutable.ArrayBuffer[WorkerRpcCall] = mutable.ArrayBuffer() + // `calls` is appended to on the actor/scheduler thread (via the output-gateway callback) while + // test assertions read it from the test thread. Guard every access with `callsLock`, and hand + // readers an immutable snapshot so iteration never races a concurrent append — which Scala + // 2.13's MutationTracker would otherwise surface as a ConcurrentModificationException. 
+ private val callsLock = new Object + private val calls: mutable.ArrayBuffer[WorkerRpcCall] = mutable.ArrayBuffer() + val inputGateway = new NetworkInputGateway(CONTROLLER) val outputGateway = new NetworkOutputGateway(CONTROLLER, handleOutput) val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, CONTROLLER) - def methodTrace: Seq[String] = calls.map(_.methodName).toSeq + private def callsSnapshot: Seq[WorkerRpcCall] = callsLock.synchronized(calls.toSeq) + + def methodTrace: Seq[String] = callsSnapshot.map(_.methodName) def initializedWorkers: Seq[ActorVirtualIdentity] = - calls.filter(_.methodName == InitializeExecutor).map(_.receiver).toSeq + callsSnapshot.filter(_.methodName == InitializeExecutor).map(_.receiver) def startedWorkers: Seq[ActorVirtualIdentity] = - calls.filter(_.methodName == StartWorker).map(_.receiver).toSeq + callsSnapshot.filter(_.methodName == StartWorker).map(_.receiver) def endWorkerCalls: Seq[WorkerRpcCall] = - calls.filter(_.methodName == EndWorker).toSeq + callsSnapshot.filter(_.methodName == EndWorker) def onlyEndWorkerCall: WorkerRpcCall = { - assert(endWorkerCalls.size == 1) - endWorkerCalls.head + val ends = endWorkerCalls + assert(ends.size == 1) + ends.head } def fulfill(call: WorkerRpcCall, returnValue: ControlReturn): Unit = { @@ -131,7 +140,7 @@ object RegionCoordinatorTestSupport { receiver = invocation.context.receiver, commandId = invocation.commandId ) - calls += call + callsLock.synchronized(calls += call) immediateReturn(call).foreach(fulfill(call, _)) }