Skip to content

Commit

Permalink
[SPARK-39196][CORE][SQL][K8S] Replace getOrElse(null) with orNull
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR aims to replace occurrences of `getOrElse(null)` with the equivalent, more idiomatic `Option.orNull` across the codebase.

### Why are the changes needed?

Code simplification.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the existing GitHub Actions (GA) CI checks; no new tests are needed since this is a behavior-preserving refactor.

Closes #36567 from dcoliversun/SPARK-39196.

Authored-by: Qian.Sun <qian.sun2020@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
dcoliversun authored and srowen committed May 17, 2022
1 parent 98fad57 commit b4c0196
Show file tree
Hide file tree
Showing 14 changed files with 17 additions and 18 deletions.
Expand Up @@ -162,7 +162,7 @@ private[kafka010] class KafkaSource(
}

override def reportLatestOffset(): streaming.Offset = {
latestPartitionOffsets.map(KafkaSourceOffset(_)).getOrElse(null)
latestPartitionOffsets.map(KafkaSourceOffset(_)).orNull
}

override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
Expand Down
Expand Up @@ -556,7 +556,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
val obj = new Array[Byte](exLength)
stream.readFully(obj)
new PythonException(new String(obj, StandardCharsets.UTF_8),
writerThread.exception.getOrElse(null))
writerThread.exception.orNull)
}

protected def handleEndOfDataSection(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Expand Up @@ -1816,7 +1816,7 @@ abstract class RDD[T: ClassTag](
*/
@Experimental
@Since("3.1.0")
def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null)
def getResourceProfile(): ResourceProfile = resourceProfile.orNull

// =======================================================================
// Other internal methods and fields
Expand Down
Expand Up @@ -1272,7 +1272,7 @@ private[spark] object JsonProtocol {
val properties = new Properties
mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
properties
}.getOrElse(null)
}.orNull
}

def UUIDFromJson(json: JValue): UUID = {
Expand Down
Expand Up @@ -81,7 +81,7 @@ object TriangleCount {

// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
(vid, _, optSet) => optSet.orNull
}

// Edge function computes intersection of smaller vertex with larger vertex
Expand Down
Expand Up @@ -88,7 +88,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
// Start from an auto-configured config with the desired context
// Fabric 8 uses null to indicate that the users current context should be used so if no
// explicit setting pass null
val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
val config = new ConfigBuilder(autoConfigure(kubeContext.orNull))
.withApiVersion("v1")
.withMasterUrl(master)
.withRequestTimeout(clientType.requestTimeout(sparkConf))
Expand Down
Expand Up @@ -156,8 +156,8 @@ class KubernetesSuite extends SparkFunSuite
// Try the spark test home
sys.props("spark.test.home")
)
val sparkDirProp = possible_spark_dirs.filter(x =>
new File(Paths.get(x).toFile, "bin/spark-submit").exists).headOption.getOrElse(null)
val sparkDirProp = possible_spark_dirs.find(x =>
new File(Paths.get(x).toFile, "bin/spark-submit").exists).orNull
require(sparkDirProp != null,
s"Spark home directory must be provided in system properties tested $possible_spark_dirs")
sparkHomeDir = Paths.get(sparkDirProp)
Expand Down
Expand Up @@ -43,8 +43,7 @@ private[spark] class KubeConfigBackend(var context: String)

// If an explicit master URL was specified then override that detected from the
// K8S config if it is different
var masterUrl = Option(System.getProperty(TestConstants.CONFIG_KEY_KUBE_MASTER_URL))
.getOrElse(null)
var masterUrl = Option(System.getProperty(TestConstants.CONFIG_KEY_KUBE_MASTER_URL)).orNull
if (StringUtils.isNotBlank(masterUrl)) {
// Clean up master URL which would have been specified in Spark format into a normal
// K8S master URL
Expand Down
Expand Up @@ -1123,8 +1123,8 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression)
val valueData2 = mapData2.valueArray()
var i = 0
for ((key, Array(index1, index2)) <- keysWithIndexes) {
val v1 = index1.map(valueData1.get(_, leftValueType)).getOrElse(null)
val v2 = index2.map(valueData2.get(_, rightValueType)).getOrElse(null)
val v1 = index1.map(valueData1.get(_, leftValueType)).orNull
val v2 = index2.map(valueData2.get(_, rightValueType)).orNull
keyVar.value.set(key)
value1Var.value.set(v1)
value2Var.value.set(v2)
Expand Down
Expand Up @@ -806,7 +806,7 @@ object QueryExecutionErrors extends QueryErrorsBase {
|Could not execute broadcast in $timeout secs. You can increase the timeout
|for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or disable broadcast join
|by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1
""".stripMargin.replaceAll("\n", " "), ex.getOrElse(null))
""".stripMargin.replaceAll("\n", " "), ex.orNull)
}

def cannotCompareCostWithTargetCostError(cost: String): Throwable = {
Expand Down
Expand Up @@ -66,7 +66,7 @@ class AnsiTypeCoercionSuite extends TypeCoercionSuiteBase {
val input = Literal("123")
val castResult = AnsiTypeCoercion.implicitCast(input, to)
assert(DataType.equalsIgnoreCaseAndNullability(
castResult.map(_.dataType).getOrElse(null), expected),
castResult.map(_.dataType).orNull, expected),
s"Failed to cast String literal to $to")
}

Expand Down
Expand Up @@ -47,13 +47,13 @@ abstract class TypeCoercionSuiteBase extends AnalysisTest {
// Check default value
val castDefault = implicitCast(default(from), to)
assert(DataType.equalsIgnoreCompatibleNullability(
castDefault.map(_.dataType).getOrElse(null), expected),
castDefault.map(_.dataType).orNull, expected),
s"Failed to cast $from to $to")

// Check null value
val castNull = implicitCast(createNull(from), to)
assert(DataType.equalsIgnoreCaseAndNullability(
castNull.map(_.dataType).getOrElse(null), expected),
castNull.map(_.dataType).orNull, expected),
s"Failed to cast $from to $to")
}

Expand Down
Expand Up @@ -38,7 +38,7 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")

val content = store.synchronized { // make sure all parts in this page are consistent
val sessionStat = store.getSession(parameterId).getOrElse(null)
val sessionStat = store.getSession(parameterId).orNull
require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")

generateBasicStats() ++
Expand Down
Expand Up @@ -190,7 +190,7 @@ private[streaming] abstract class ReceiverSupervisor(
// This is a blocking action so we should use "futureExecutionContext" which is a cached
// thread pool.
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
error.orNull)
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
logDebug("Sleeping for " + delay)
Thread.sleep(delay)
Expand Down

0 comments on commit b4c0196

Please sign in to comment.