Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2079,7 +2079,7 @@ private[spark] object Utils

val CONNECT_EXECUTE_THREAD_PREFIX = "SparkConnectExecuteThread"

private val threadInfoOrdering = Ordering.fromLessThan {
private[spark] val threadInfoOrdering = Ordering.fromLessThan {
(threadTrace1: ThreadInfo, threadTrace2: ThreadInfo) => {
def priority(ti: ThreadInfo): Int = ti.getThreadName match {
case name if name.startsWith(TASK_THREAD_NAME_PREFIX) => 100
Expand Down
51 changes: 24 additions & 27 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
import org.apache.logging.log4j.Level
import org.mockito.Mockito.doReturn
import org.scalatest.PrivateMethodTester
import org.mockito.Mockito.when
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
Expand All @@ -51,7 +50,7 @@ import org.apache.spark.scheduler.SparkListener
import org.apache.spark.util.collection.Utils.createArray
import org.apache.spark.util.io.ChunkedByteBufferInputStream

class UtilsSuite extends SparkFunSuite with ResetSystemProperties with PrivateMethodTester {
class UtilsSuite extends SparkFunSuite with ResetSystemProperties {

test("timeConversion") {
// Test -1
Expand Down Expand Up @@ -1132,37 +1131,35 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with PrivateMe

test("ThreadInfoOrdering") {
val task1T = mock[ThreadInfo]
doReturn(11L).when(task1T).getThreadId
doReturn("Executor task launch worker for task 1.0 in stage 1.0 (TID 11)")
.when(task1T).getThreadName
doReturn("Executor task launch worker for task 1.0 in stage 1.0 (TID 11)")
.when(task1T).toString
when(task1T.getThreadId).thenReturn(11L)
when(task1T.getThreadName)
.thenReturn("Executor task launch worker for task 1.0 in stage 1.0 (TID 11)")
when(task1T.toString)
.thenReturn("Executor task launch worker for task 1.0 in stage 1.0 (TID 11)")

val task2T = mock[ThreadInfo]
doReturn(12L).when(task2T).getThreadId
doReturn("Executor task launch worker for task 2.0 in stage 1.0 (TID 22)")
.when(task2T).getThreadName
doReturn("Executor task launch worker for task 2.0 in stage 1.0 (TID 22)")
.when(task2T).toString
when(task2T.getThreadId).thenReturn(12L)
when(task2T.getThreadName)
.thenReturn("Executor task launch worker for task 2.0 in stage 1.0 (TID 22)")
when(task2T.toString)
.thenReturn("Executor task launch worker for task 2.0 in stage 1.0 (TID 22)")

val connectExecuteOp1T = mock[ThreadInfo]
doReturn(21L).when(connectExecuteOp1T).getThreadId
doReturn("SparkConnectExecuteThread_opId=16148fb4-4189-43c3-b8d4-8b3b6ddd41c7")
.when(connectExecuteOp1T).getThreadName
doReturn("SparkConnectExecuteThread_opId=16148fb4-4189-43c3-b8d4-8b3b6ddd41c7")
.when(connectExecuteOp1T).toString
when(connectExecuteOp1T.getThreadId).thenReturn(21L)
when(connectExecuteOp1T.getThreadName)
.thenReturn("SparkConnectExecuteThread_opId=16148fb4-4189-43c3-b8d4-8b3b6ddd41c7")
when(connectExecuteOp1T.toString)
.thenReturn("SparkConnectExecuteThread_opId=16148fb4-4189-43c3-b8d4-8b3b6ddd41c7")

val connectExecuteOp2T = mock[ThreadInfo]
doReturn(22L).when(connectExecuteOp2T).getThreadId
doReturn("SparkConnectExecuteThread_opId=4e4d1cac-ffde-46c1-b7c2-808b726cb47e")
.when(connectExecuteOp2T).getThreadName
doReturn("SparkConnectExecuteThread_opId=4e4d1cac-ffde-46c1-b7c2-808b726cb47e")
.when(connectExecuteOp2T).toString

val threadInfoOrderingMethod =
PrivateMethod[Ordering[ThreadInfo]](Symbol("threadInfoOrdering"))
when(connectExecuteOp2T.getThreadId).thenReturn(22L)
when(connectExecuteOp2T.getThreadName)
.thenReturn("SparkConnectExecuteThread_opId=4e4d1cac-ffde-46c1-b7c2-808b726cb47e")
when(connectExecuteOp2T.toString)
.thenReturn("SparkConnectExecuteThread_opId=4e4d1cac-ffde-46c1-b7c2-808b726cb47e")

val sorted = Seq(connectExecuteOp1T, connectExecuteOp2T, task1T, task2T)
.sorted(Utils.invokePrivate(threadInfoOrderingMethod()))
.sorted(Utils.threadInfoOrdering)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... this looks simple, but I was told to prefer using invokePrivate to avoid exposing private methods/fields, not sure which is the best practice for this case.

cc @LuciferYang for more inputs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[spark] is not public.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to be a new piece of code from the 4.1 cycle rather than legacy code. If we also consider it acceptable for third-party libraries to use threadInfoOrdering within the spark package, then this modification is acceptable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

third-party lib can do whatever they want using reflection, the point is what Spark guarantees/promises. Apparently, there is no compatibility guarantee/promise for private[spark] APIs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning is correct.

assert(sorted === Seq(task1T, task2T, connectExecuteOp1T, connectExecuteOp2T))
}

Expand Down