@@ -73,7 +73,7 @@ class HeartbeatReceiverSuite
.setMaster("local[2]")
.setAppName("test")
.set(DYN_ALLOCATION_TESTING, true)
-    sc = spy(new SparkContext(conf))
+    sc = spy[SparkContext](new SparkContext(conf))
Contributor Author commented: Why change it? Without the explicit type parameter, the following compile error is reported: [image]
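As an illustration of the pattern this PR applies throughout, here is a minimal, self-contained sketch (the Greeter class is hypothetical, not from the PR; the exact error in the screenshot is not recoverable here, but it is presumably a type-inference or overload-resolution failure that the explicit type argument avoids):

import org.mockito.Mockito.{spy, when}

class Greeter {
  def greet(name: String): String = s"Hello, $name"
}

object SpyExample {
  def main(args: Array[String]): Unit = {
    // Explicit type argument, as in the diff; a bare spy(new Greeter)
    // reportedly no longer compiles after the Mockito upgrade.
    val greeter = spy[Greeter](new Greeter)
    when(greeter.greet("world")).thenReturn("mocked")
    assert(greeter.greet("world") == "mocked")       // stubbed call
    assert(greeter.greet("spark") == "Hello, spark") // real method still runs
  }
}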

scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
when(scheduler.excludedNodes).thenReturn(Predef.Set[String]())
@@ -22,6 +22,7 @@ import java.io.Serializable
import org.mockito.Mockito._

import org.apache.spark.SparkFunSuite
+import org.apache.spark.api.java.JavaUtils.SerializableMapWrapper


class JavaUtilsSuite extends SparkFunSuite {
@@ -33,7 +34,8 @@ class JavaUtilsSuite extends SparkFunSuite {

src.put(key, "42")

-    val map: java.util.Map[Double, String] = spy(JavaUtils.mapAsSerializableJavaMap(src))
+    val map: java.util.Map[Double, String] = spy[SerializableMapWrapper[Double, String]](
+      JavaUtils.mapAsSerializableJavaMap(src))
Contributor Author commented: Ditto (the same compile error as above is reported without the explicit type parameter).
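The new import of SerializableMapWrapper appears because the explicit type argument forces the test to name the concrete wrapper type that JavaUtils.mapAsSerializableJavaMap returns, which type inference previously left unwritten. A sketch of the same situation with hypothetical stand-in types:

import org.mockito.Mockito.spy

// Hypothetical stand-ins for JavaUtils and SerializableMapWrapper:
class Wrapper[K, V](val underlying: Map[K, V]) {
  def containsKey(k: K): Boolean = underlying.contains(k)
}

object Utils {
  def wrap[K, V](m: Map[K, V]): Wrapper[K, V] = new Wrapper(m)
}

object WrapperSpyExample {
  def main(args: Array[String]): Unit = {
    // Previously spy(Utils.wrap(...)) inferred Wrapper[Int, String] silently;
    // now the type argument is spelled out, so Wrapper must be in scope,
    // which is why the extra import shows up in the real diff.
    val w = spy[Wrapper[Int, String]](Utils.wrap(Map(1 -> "one")))
    assert(w.containsKey(1)) // the spy delegates to the real method
  }
}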


assert(map.containsKey(key))

@@ -788,7 +788,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
}

test("provider correctly checks whether fs is in safe mode") {
-    val provider = spy(new FsHistoryProvider(createTestConf()))
+    val provider = spy[FsHistoryProvider](new FsHistoryProvider(createTestConf()))
val dfs = mock(classOf[DistributedFileSystem])
// Asserts that safe mode is false because we can't really control the return value of the mock,
// since the API is different between hadoop 1 and 2.
@@ -1032,7 +1032,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
withTempDir { storeDir =>
val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
val clock = new ManualClock()
-      val provider = spy(new FsHistoryProvider(conf, clock))
+      val provider = spy[FsHistoryProvider](new FsHistoryProvider(conf, clock))
val appId = "new1"

// Write logs for two app attempts.
@@ -1196,11 +1196,11 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
var isReadable = false
-    val mockedFs = spy(provider.fs)
+    val mockedFs = spy[FileSystem](provider.fs)
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == "accessdenied" &&
!isReadable))
-    val mockedProvider = spy(provider)
+    val mockedProvider = spy[FsHistoryProvider](provider)
when(mockedProvider.fs).thenReturn(mockedFs)
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
@@ -1225,7 +1225,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
test("check in-progress event logs absolute length") {
val path = new Path("testapp.inprogress")
val provider = new FsHistoryProvider(createTestConf())
-    val mockedProvider = spy(provider)
+    val mockedProvider = spy[FsHistoryProvider](provider)
val mockedFs = mock(classOf[FileSystem])
val in = mock(classOf[FSDataInputStream])
val dfsIn = mock(classOf[DFSInputStream])
@@ -62,7 +62,8 @@ abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAn

private def mockManager(): HistoryServerDiskManager = {
val conf = new SparkConf().set(MAX_LOCAL_DISK_USAGE, MAX_USAGE)
-    val manager = spy(new HistoryServerDiskManager(conf, testDir, store, new ManualClock()))
+    val manager = spy[HistoryServerDiskManager](
+      new HistoryServerDiskManager(conf, testDir, store, new ManualClock()))
doAnswer(AdditionalAnswers.returnsFirstArg[Long]()).when(manager)
.approximateSize(anyLong(), anyBoolean())
manager
@@ -39,7 +39,7 @@ class DriverRunnerTest extends SparkFunSuite {
val conf = new SparkConf()
val worker = mock(classOf[RpcEndpointRef])
doNothing().when(worker).send(any())
-    spy(new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
+    spy[DriverRunner](new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
driverDescription, worker, "spark://1.2.3.4/worker/", "http://publicAddress:80",
new SecurityManager(conf)))
}
@@ -408,7 +408,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val executor = backend.executor
// Mock the executor.
when(executor.threadPool).thenReturn(threadPool)
-    val runningTasks = spy(new ConcurrentHashMap[Long, Executor#TaskRunner])
+    val runningTasks = spy[ConcurrentHashMap[Long, Executor#TaskRunner]](
+      new ConcurrentHashMap[Long, Executor#TaskRunner])
when(executor.runningTasks).thenAnswer(_ => runningTasks)
when(executor.conf).thenReturn(conf)
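For generic classes such as the ConcurrentHashMap above, the explicit type argument has to spell out the full set of type parameters; spy[ConcurrentHashMap](...) alone would not compile. A runnable sketch of the same idea (hypothetical example, not from the PR):

import java.util.concurrent.ConcurrentHashMap
import org.mockito.Mockito.{spy, verify}

object GenericSpyExample {
  def main(args: Array[String]): Unit = {
    // The type argument carries the full generic type of the spied object.
    val tasks = spy[ConcurrentHashMap[Long, String]](
      new ConcurrentHashMap[Long, String])
    tasks.put(1L, "task-1")
    assert(tasks.get(1L) == "task-1") // real map behavior is preserved
    verify(tasks).put(1L, "task-1")   // and interactions are recorded
  }
}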

@@ -496,7 +497,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val executor = backend.executor
// Mock the executor.
when(executor.threadPool).thenReturn(threadPool)
-    val runningTasks = spy(new ConcurrentHashMap[Long, Executor#TaskRunner])
+    val runningTasks = spy[ConcurrentHashMap[Long, Executor#TaskRunner]](
+      new ConcurrentHashMap[Long, Executor#TaskRunner])
when(executor.runningTasks).thenAnswer(_ => runningTasks)
when(executor.conf).thenReturn(conf)

@@ -43,7 +43,7 @@ class ProcfsMetricsGetterSuite extends SparkFunSuite {

test("SPARK-34845: partial metrics shouldn't be returned") {
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
-    val mockedP = spy(p)
+    val mockedP = spy[ProcfsMetricsGetter](p)

var ptree: Set[Int] = Set(26109, 22763)
when(mockedP.computeProcessTree).thenReturn(ptree)
@@ -307,14 +307,14 @@ class TestSparkPlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = {
val p = new TestDriverPlugin()
require(TestSparkPlugin.driverPlugin == null, "Driver plugin already initialized.")
-    TestSparkPlugin.driverPlugin = spy(p)
+    TestSparkPlugin.driverPlugin = spy[TestDriverPlugin](p)
TestSparkPlugin.driverPlugin
}

override def executorPlugin(): ExecutorPlugin = {
val p = new TestExecutorPlugin()
require(TestSparkPlugin.executorPlugin == null, "Executor plugin already initialized.")
-    TestSparkPlugin.executorPlugin = spy(p)
+    TestSparkPlugin.executorPlugin = spy[TestExecutorPlugin](p)
TestSparkPlugin.executorPlugin
}

@@ -402,10 +402,11 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
results.clear()
securityMgr = new SecurityManager(sc.getConf)
broadcastManager = new BroadcastManager(true, sc.getConf)
-    mapOutputTracker = spy(new MyMapOutputTrackerMaster(sc.getConf, broadcastManager))
-    blockManagerMaster = spy(new MyBlockManagerMaster(sc.getConf))
+    mapOutputTracker = spy[MyMapOutputTrackerMaster](
+      new MyMapOutputTrackerMaster(sc.getConf, broadcastManager))
+    blockManagerMaster = spy[MyBlockManagerMaster](new MyBlockManagerMaster(sc.getConf))
doNothing().when(blockManagerMaster).updateRDDBlockVisibility(any(), any())
-    scheduler = spy(new MyDAGScheduler(
+    scheduler = spy[MyDAGScheduler](new MyDAGScheduler(
sc,
taskScheduler,
sc.listenerBus,
@@ -87,15 +87,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
outputCommitCoordinator =
-      spy(new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
+      spy[OutputCommitCoordinator](
+        new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
// Use Mockito.spy() to maintain the default infrastructure everywhere else.
// This mocking allows us to control the coordinator responses in test cases.
SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
}
}
// Use Mockito.spy() to maintain the default infrastructure everywhere else
-    val mockTaskScheduler = spy(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl])
+    val mockTaskScheduler = spy[TaskSchedulerImpl](
+      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl])

doAnswer { (invoke: InvocationOnMock) =>
// Submit the tasks, then force the task scheduler to dequeue the
@@ -145,7 +145,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
test("handling total size of results larger than maxResultSize") {
sc = new SparkContext("local", "test", conf)
val scheduler = new DummyTaskSchedulerImpl(sc)
-    val spyScheduler = spy(scheduler)
+    val spyScheduler = spy[DummyTaskSchedulerImpl](scheduler)
val resultGetter = new TaskResultGetter(sc.env, spyScheduler)
scheduler.taskResultGetter = resultGetter
val myTsm = new TaskSetManager(spyScheduler, FakeTask.createTaskSet(2), 1) {
@@ -258,7 +258,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
// Set up custom TaskResultGetter and TaskSchedulerImpl spy
sc = new SparkContext("local", "test", conf)
val scheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
-    val spyScheduler = spy(scheduler)
+    val spyScheduler = spy[TaskSchedulerImpl](scheduler)
val resultGetter = new MyTaskResultGetter(sc.env, spyScheduler)
val newDAGScheduler = new DAGScheduler(sc, spyScheduler)
scheduler.taskResultGetter = resultGetter
@@ -109,7 +109,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
val tsm = super.createTaskSetManager(taskSet, maxFailures)
// we need to create a spied tsm just so we can set the TaskSetExcludelist
-      val tsmSpy = spy(tsm)
+      val tsmSpy = spy[TaskSetManager](tsm)
val taskSetExcludelist = mock[TaskSetExcludelist]
when(tsmSpy.taskSetExcludelistHelperOpt).thenReturn(Some(taskSetExcludelist))
stageToMockTaskSetManager(taskSet.stageId) = tsmSpy
@@ -1946,7 +1946,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
val tsm = super.createTaskSetManager(taskSet, maxFailures)
// we need to create a spied tsm so that we can see the copies running
-      val tsmSpy = spy(tsm)
+      val tsmSpy = spy[TaskSetManager](tsm)
stageToMockTaskSetManager(taskSet.stageId) = tsmSpy
tsmSpy
}
@@ -389,7 +389,7 @@ class TaskSetManagerSuite
manager.isZombie = false

// offers not accepted due to excludelist are not delay schedule rejects
-    val tsmSpy = spy(manager)
+    val tsmSpy = spy[TaskSetManager](manager)
val excludelist = mock(classOf[TaskSetExcludelist])
when(tsmSpy.taskSetExcludelistHelperOpt).thenReturn(Some(excludelist))
when(excludelist.isNodeExcludedForTaskSet(any())).thenReturn(true)
@@ -1416,7 +1416,7 @@ class TaskSetManagerSuite
val taskSet = FakeTask.createTaskSet(4)
val tsm = new TaskSetManager(sched, taskSet, 4)
// we need a spy so we can attach our mock excludelist
-    val tsmSpy = spy(tsm)
+    val tsmSpy = spy[TaskSetManager](tsm)
val excludelist = mock(classOf[TaskSetExcludelist])
when(tsmSpy.taskSetExcludelistHelperOpt).thenReturn(Some(excludelist))

@@ -1497,7 +1497,7 @@ class TaskSetManagerSuite
val mockListenerBus = mock(classOf[LiveListenerBus])
val healthTracker = new HealthTracker(mockListenerBus, conf, None, clock)
val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(healthTracker))
-    val taskSetManagerSpy = spy(taskSetManager)
+    val taskSetManagerSpy = spy[TaskSetManager](taskSetManager)

val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)._1

@@ -275,7 +275,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite

// create 1 faulty block manager by injecting faulty memory manager
val memManager = UnifiedMemoryManager(conf, numCores = 1)
-    val mockedMemoryManager = spy(memManager)
+    val mockedMemoryManager = spy[UnifiedMemoryManager](memManager)
doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
val store2 = makeBlockManager(10000, "host-2", Some(mockedMemoryManager))

@@ -186,8 +186,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
when(sc.conf).thenReturn(conf)

val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
-    liveListenerBus = spy(new LiveListenerBus(conf))
-    master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
+    liveListenerBus = spy[LiveListenerBus](new LiveListenerBus(conf))
+    master = spy[BlockManagerMaster](new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
liveListenerBus, None, blockManagerInfo, mapOutputTracker, shuffleManager,
isDriver = true)),
@@ -873,7 +873,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
conf.set("spark.shuffle.io.maxRetries", "0")
val sameHostBm = makeBlockManager(8000, "sameHost", master)

-    val otherHostTransferSrv = spy(sameHostBm.blockTransferService)
+    val otherHostTransferSrv = spy[BlockTransferService](sameHostBm.blockTransferService)
doAnswer { _ =>
"otherHost"
}.when(otherHostTransferSrv).hostName
@@ -888,7 +888,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
val blockId = "list"
bmToPutBlock.putIterator(blockId, List(array).iterator, storageLevel, tellMaster = true)

-    val sameHostTransferSrv = spy(sameHostBm.blockTransferService)
+    val sameHostTransferSrv = spy[BlockTransferService](sameHostBm.blockTransferService)
doAnswer { _ =>
fail("Fetching over network is not expected when the block is requested from same host")
}.when(sameHostTransferSrv).fetchBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
@@ -935,7 +935,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
}
}
val store1 = makeBlockManager(8000, "executor1", this.master, Some(mockTransferService))
-    val spiedStore1 = spy(store1)
+    val spiedStore1 = spy[BlockManager](store1)
doAnswer { inv =>
val blockId = inv.getArguments()(0).asInstanceOf[BlockId]
val localDirs = inv.getArguments()(1).asInstanceOf[Array[String]]
@@ -974,7 +974,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
}

test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
-    val store = spy(makeBlockManager(8000, "executor1"))
+    val store = spy[BlockManager](makeBlockManager(8000, "executor1"))
val store2 = makeBlockManager(8000, "executor2")
val list1 = List(new Array[Byte](4000))
val blockId = RDDBlockId(0, 0)
@@ -58,18 +58,22 @@ class PartiallySerializedBlockSuite
numItemsToBuffer: Int): PartiallySerializedBlock[T] = {

val bbos: ChunkedByteBufferOutputStream = {
-      val spy = Mockito.spy(new ChunkedByteBufferOutputStream(128, ByteBuffer.allocate))
+      val spy = Mockito.spy[ChunkedByteBufferOutputStream](
+        new ChunkedByteBufferOutputStream(128, ByteBuffer.allocate))
Mockito.doAnswer { (invocationOnMock: InvocationOnMock) =>
-        Mockito.spy(invocationOnMock.callRealMethod().asInstanceOf[ChunkedByteBuffer])
+        Mockito.spy[ChunkedByteBuffer](
+          invocationOnMock.callRealMethod().asInstanceOf[ChunkedByteBuffer])
}.when(spy).toChunkedByteBuffer
spy
}

val serializer = serializerManager
.getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
-    val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
+    val redirectableOutputStream = Mockito.spy[RedirectableOutputStream](
+      new RedirectableOutputStream)
redirectableOutputStream.setOutputStream(bbos)
-    val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
+    val serializationStream = Mockito.spy[SerializationStream](
+      serializer.serializeStream(redirectableOutputStream))

(1 to numItemsToBuffer).foreach { _ =>
assert(iter.hasNext)
@@ -170,7 +174,7 @@ class PartiallySerializedBlockSuite

test(s"$testCaseName with finishWritingToStream() and numBuffered = $numItemsToBuffer") {
val partiallySerializedBlock = partiallyUnroll(items.iterator, numItemsToBuffer)
-    val bbos = Mockito.spy(new ByteBufferOutputStream())
+    val bbos = Mockito.spy[ByteBufferOutputStream](new ByteBufferOutputStream())
partiallySerializedBlock.finishWritingToStream(bbos)

Mockito.verify(memoryStore).releaseUnrollMemoryForThisTask(
dev/deps/spark-deps-hadoop-3-hive-2.3 (1 addition, 1 deletion)
@@ -201,7 +201,7 @@ netty-transport-native-kqueue/4.1.92.Final/osx-aarch_64/netty-transport-native-k
netty-transport-native-kqueue/4.1.92.Final/osx-x86_64/netty-transport-native-kqueue-4.1.92.Final-osx-x86_64.jar
netty-transport-native-unix-common/4.1.92.Final//netty-transport-native-unix-common-4.1.92.Final.jar
netty-transport/4.1.92.Final//netty-transport-4.1.92.Final.jar
-objenesis/3.2//objenesis-3.2.jar
+objenesis/3.3//objenesis-3.3.jar
Contributor Author commented: Change brought about by the Mockito version change.
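For context: Mockito instantiates mock and spy classes through Objenesis, which allocates objects without running their constructors, so upgrading Mockito pulls in the matching Objenesis release. A small sketch of what Objenesis itself does (the Audited class is hypothetical):

import org.objenesis.ObjenesisStd

// A class whose constructor has a visible side effect.
class Audited(val id: Int) {
  println(s"constructor ran with id=$id")
}

object ObjenesisExample {
  def main(args: Array[String]): Unit = {
    val objenesis = new ObjenesisStd
    // Allocates an instance without invoking the constructor:
    // nothing is printed and fields keep their JVM defaults.
    val a = objenesis.newInstance(classOf[Audited])
    assert(a.id == 0)
  }
}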

okhttp/3.12.12//okhttp-3.12.12.jar
okio/1.15.0//okio-1.15.0.jar
opencsv/2.3//opencsv-2.3.jar