Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 6 additions & 30 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3297,21 +3297,8 @@ private[spark] object Utils extends Logging {
}

private[util] object CallerContext extends Logging {
val callerContextSupported: Boolean = {
SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false) && {
try {
Utils.classForName("org.apache.hadoop.ipc.CallerContext")
Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
true
} catch {
case _: ClassNotFoundException =>
false
case NonFatal(e) =>
logWarning("Fail to load the CallerContext class", e)
false
}
}
}
val callerContextEnabled: Boolean =
SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false)
}

/**
Expand Down Expand Up @@ -3371,22 +3358,11 @@ private[spark] class CallerContext(

/**
* Set up the caller context [[context]] by invoking Hadoop CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
* [[org.apache.hadoop.ipc.CallerContext]].
*/
def setCurrentContext(): Unit = {
if (CallerContext.callerContextSupported) {
try {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
val builder: Class[AnyRef] =
Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
val builderInst = builder.getConstructor(classOf[String]).newInstance(context)
val hdfsContext = builder.getMethod("build").invoke(builderInst)
callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
} catch {
case NonFatal(e) =>
logWarning("Fail to set Spark caller context", e)
}
}
def setCurrentContext(): Unit = if (CallerContext.callerContextEnabled) {
val hdfsContext = new org.apache.hadoop.ipc.CallerContext.Builder(context).build()
org.apache.hadoop.ipc.CallerContext.setCurrent(hdfsContext)
Copy link
Contributor

@mridulm mridulm Apr 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Instead of using the fully qualified name (which made sense in reflection code earlier), we should have used a renamed import statement.
Something like:

import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
import org.apache.hadoop.ipc.CallerContext.{Builder => HadoopCallerContextBuilder}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mridulm, opened #40891 to address it.

}
}

Expand Down
6 changes: 2 additions & 4 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
test("Set Spark CallerContext") {
val context = "test"
new CallerContext(context).setCurrentContext()
if (CallerContext.callerContextSupported) {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
assert(s"SPARK_$context" ===
callerContext.getMethod("getCurrent").invoke(null).toString)
if (CallerContext.callerContextEnabled) {
assert(s"SPARK_$context" === org.apache.hadoop.ipc.CallerContext.getCurrent.toString)
}
}

Expand Down