diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5bbb47908c1ce..bbff56c1172e9 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -70,7 +70,7 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.JavaUtils -import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} import org.apache.spark.status.api.v1.{StackTrace, ThreadStackTrace} import org.apache.spark.util.io.ChunkedByteBufferOutputStream @@ -2597,14 +2597,30 @@ private[spark] object Utils extends Logging { } /** - * Push based shuffle can only be enabled when the application is submitted - * to run in YARN mode, with external shuffle service enabled + * Push based shuffle can only be enabled when below conditions are met: + * - the application is submitted to run in YARN mode + * - external shuffle service enabled + * - IO encryption disabled + * - serializer(such as KryoSerializer) supports relocation of serialized objects */ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { - conf.get(PUSH_BASED_SHUFFLE_ENABLED) && - (conf.get(IS_TESTING).getOrElse(false) || - (conf.get(SHUFFLE_SERVICE_ENABLED) && - conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn")) + val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf]) + .newInstance(conf).asInstanceOf[Serializer] + val canDoPushBasedShuffle = + conf.get(PUSH_BASED_SHUFFLE_ENABLED) && + (conf.get(IS_TESTING).getOrElse(false) || + (conf.get(SHUFFLE_SERVICE_ENABLED) && + conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" && + // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle + !conf.get(IO_ENCRYPTION_ENABLED) && + serializer.supportsRelocationOfSerializedObjects)) + + if (!canDoPushBasedShuffle) { + logWarning("Push-based shuffle can only be enabled when the application is submitted" + + "to run in YARN mode, with external shuffle service enabled, IO encryption disabled, and" + + "relocation of serialized objects supported.") + } + canDoPushBasedShuffle } /** diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index c1b7b5f6775df..a4df5cdea9c70 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1509,10 +1509,17 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.isPushBasedShuffleEnabled(conf) === false) conf.set(SHUFFLE_SERVICE_ENABLED, true) conf.set(SparkLauncher.SPARK_MASTER, "yarn") - conf.set("spark.yarn.maxAttempts", "1") + conf.set("spark.yarn.maxAppAttempts", "1") + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") assert(Utils.isPushBasedShuffleEnabled(conf) === true) - conf.set("spark.yarn.maxAttempts", "2") + conf.set("spark.yarn.maxAppAttempts", "2") assert(Utils.isPushBasedShuffleEnabled(conf) === true) + conf.set(IO_ENCRYPTION_ENABLED, true) + assert(Utils.isPushBasedShuffleEnabled(conf) === false) + conf.set(IO_ENCRYPTION_ENABLED, false) + assert(Utils.isPushBasedShuffleEnabled(conf) === true) + conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer") + assert(Utils.isPushBasedShuffleEnabled(conf) === false) } }