Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.engine.architecture.scheduling

import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
ControlInvocation,
EmptyRequest
}
import org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
import org.scalatest.flatspec.AnyFlatSpec

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

/**
  * Concurrency regression test for `ControllerRpcProbe`.
  *
  * The probe's `calls` buffer is appended to on a scheduler thread while the test thread reads it.
  * Before the fix, a read racing an append tripped Scala 2.13's `MutationTracker` and threw a
  * `ConcurrentModificationException`. This test forces that race so it fails deterministically
  * rather than as a CI flake.
  *
  * Both stress threads are daemons and watch a shared stop latch, so a run that misses the
  * deadline fails its assertion without leaving a busy-spinning reader (or a non-daemon thread
  * that could hold the JVM open) behind for the rest of the test run.
  */
class ControllerRpcProbeSpec extends AnyFlatSpec {

  "ControllerRpcProbe" should "tolerate reads racing with concurrent appends" in {
    // Hold every endWorker pending so appends have no fulfill side effects.
    val probe = new ControllerRpcProbe(_ => None)
    val appends = 20000
    // First throwable raised on either stress thread; stays null when both run cleanly.
    val failure = new AtomicReference[Throwable]()
    // Release both threads at once so the reader polls while the writer is still appending;
    // otherwise the writer could finish before the reader starts and miss the race entirely.
    val startGate = new CountDownLatch(1)
    // Opened in the `finally` below, i.e. only after the joins and assertions have run. On the
    // passing path the writer has already finished by then, so this never shortens a healthy run.
    val stopGate = new CountDownLatch(1)
    def stopRequested: Boolean = stopGate.getCount == 0L

    // Writer: the actor side. Each sendTo drives handleOutput -> calls += call.
    val writer = new Thread(() => {
      try {
        startGate.await()
        var i = 0
        while (i < appends && !stopRequested) {
          probe.outputGateway.sendTo(
            CONTROLLER,
            ControlInvocation(
              EndWorker,
              EmptyRequest(),
              AsyncRPCContext(CONTROLLER, CONTROLLER),
              i.toLong
            )
          )
          i += 1
        }
      } catch {
        // Deliberately broad: whatever kills this thread must be reported by the test below.
        case t: Throwable => failure.compareAndSet(null, t)
      }
    })

    // Reader: the test side. Read through every helper while appends are in flight.
    val reader = new Thread(() => {
      try {
        startGate.await()
        // Without the stop check a stalled writer would keep this loop spinning forever.
        while (writer.isAlive && !stopRequested) {
          probe.endWorkerCalls
          probe.methodTrace
          probe.initializedWorkers
          probe.startedWorkers
        }
      } catch {
        // Deliberately broad: whatever kills this thread must be reported by the test below.
        case t: Throwable => failure.compareAndSet(null, t)
      }
    })

    // Daemon threads cannot keep the test JVM alive if one of them is stuck inside the probe.
    writer.setDaemon(true)
    reader.setDaemon(true)
    writer.start()
    reader.start()
    try {
      startGate.countDown()
      writer.join(testTimeout.inMilliseconds)
      reader.join(testTimeout.inMilliseconds)

      assert(
        !writer.isAlive && !reader.isAlive,
        "stress threads did not finish within the deadline"
      )
      assert(
        failure.get() == null,
        s"concurrent access to the probe threw ${failure.get()}"
      )
      assert(probe.endWorkerCalls.size == appends)
    } finally {
      // Always tell both loops to wind down, including when an assertion above has failed.
      stopGate.countDown()
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,34 @@ object RegionCoordinatorTestSupport {
* hold termination pending, fail an attempt, or allow it to succeed.
*/
class ControllerRpcProbe(endWorkerResponse: WorkerRpcCall => Option[ControlReturn]) {
val calls: mutable.ArrayBuffer[WorkerRpcCall] = mutable.ArrayBuffer()
// `calls` is appended to on the actor/scheduler thread (via the output-gateway callback) while
// test assertions read it from the test thread. Guard every access with `callsLock`, and hand
// readers an immutable snapshot so iteration never races a concurrent append — which Scala
// 2.13's MutationTracker would otherwise surface as a ConcurrentModificationException.
private val callsLock = new Object
private val calls: mutable.ArrayBuffer[WorkerRpcCall] = mutable.ArrayBuffer()

val inputGateway = new NetworkInputGateway(CONTROLLER)
val outputGateway = new NetworkOutputGateway(CONTROLLER, handleOutput)
val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, CONTROLLER)

def methodTrace: Seq[String] = calls.map(_.methodName).toSeq
private def callsSnapshot: Seq[WorkerRpcCall] = callsLock.synchronized(calls.toSeq)

def methodTrace: Seq[String] = callsSnapshot.map(_.methodName)

def initializedWorkers: Seq[ActorVirtualIdentity] =
calls.filter(_.methodName == InitializeExecutor).map(_.receiver).toSeq
callsSnapshot.filter(_.methodName == InitializeExecutor).map(_.receiver)

def startedWorkers: Seq[ActorVirtualIdentity] =
calls.filter(_.methodName == StartWorker).map(_.receiver).toSeq
callsSnapshot.filter(_.methodName == StartWorker).map(_.receiver)

def endWorkerCalls: Seq[WorkerRpcCall] =
calls.filter(_.methodName == EndWorker).toSeq
callsSnapshot.filter(_.methodName == EndWorker)

def onlyEndWorkerCall: WorkerRpcCall = {
assert(endWorkerCalls.size == 1)
endWorkerCalls.head
val ends = endWorkerCalls
assert(ends.size == 1)
ends.head
}

def fulfill(call: WorkerRpcCall, returnValue: ControlReturn): Unit = {
Expand All @@ -131,7 +140,7 @@ object RegionCoordinatorTestSupport {
receiver = invocation.context.receiver,
commandId = invocation.commandId
)
calls += call
callsLock.synchronized(calls += call)
immediateReturn(call).foreach(fulfill(call, _))
}

Expand Down
Loading