From aace904cabcf0d6dfe169b61ed1a5d058feb47de Mon Sep 17 00:00:00 2001 From: Minchu Yang Date: Fri, 10 Sep 2021 18:45:09 -0700 Subject: [PATCH 1/3] [SPARK-36705] Disable push based shuffle when IO encryption is enabled or serializer is not relocatable --- .../scala/org/apache/spark/util/Utils.scala | 29 ++++++++++++++----- .../org/apache/spark/util/UtilsSuite.scala | 7 +++++ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5bbb47908c1ce..f68a845dde834 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -70,7 +70,7 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.JavaUtils -import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} import org.apache.spark.status.api.v1.{StackTrace, ThreadStackTrace} import org.apache.spark.util.io.ChunkedByteBufferOutputStream @@ -2597,14 +2597,29 @@ private[spark] object Utils extends Logging { } /** - * Push based shuffle can only be enabled when the application is submitted - * to run in YARN mode, with external shuffle service enabled + * Push based shuffle can only be enabled when below conditions are met: + * - the application is submitted to run in YARN mode + * - external shuffle service enabled + * - IO encryption disabled + * - serializer(such as KryoSerializer) supports relocation of serialized objects */ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { - conf.get(PUSH_BASED_SHUFFLE_ENABLED) && - (conf.get(IS_TESTING).getOrElse(false) || - (conf.get(SHUFFLE_SERVICE_ENABLED) && - conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn")) + val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf]) + .newInstance(conf).asInstanceOf[Serializer] + val doPushBasedShuffle = + conf.get(PUSH_BASED_SHUFFLE_ENABLED) && + (conf.get(IS_TESTING).getOrElse(false) || + (conf.get(SHUFFLE_SERVICE_ENABLED) && + conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" && + !conf.get(IO_ENCRYPTION_ENABLED) && + serializer.supportsRelocationOfSerializedObjects)) + + if (!doPushBasedShuffle) { + logWarning("Push-based shuffle can only be enabled when the application is submitted" + + "to run in YARN mode, with external shuffle service enabled, IO encryption disabled, and" + + "relocation of serialized objects supported.") + } + doPushBasedShuffle } /** diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index c1b7b5f6775df..aa35708dfd62e 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1510,9 +1510,16 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set(SHUFFLE_SERVICE_ENABLED, true) conf.set(SparkLauncher.SPARK_MASTER, "yarn") conf.set("spark.yarn.maxAttempts", "1") + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") assert(Utils.isPushBasedShuffleEnabled(conf) === true) conf.set("spark.yarn.maxAttempts", "2") assert(Utils.isPushBasedShuffleEnabled(conf) === true) + conf.set(IO_ENCRYPTION_ENABLED, true) + assert(Utils.isPushBasedShuffleEnabled(conf) === false) + conf.set(IO_ENCRYPTION_ENABLED, false) + assert(Utils.isPushBasedShuffleEnabled(conf) === true) + conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer") + assert(Utils.isPushBasedShuffleEnabled(conf) === false) } } From 9f4e4f445c8f8f4b66162b401ac0047f936ed2dd Mon Sep 17 00:00:00 2001 From: Minchu Yang Date: Fri, 10 Sep 2021 18:45:09 -0700 Subject: [PATCH 2/3] [SPARK-36705] Disable push based shuffle when IO encryption is enabled or serializer is not relocatable --- core/src/main/scala/org/apache/spark/util/Utils.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f68a845dde834..bbff56c1172e9 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2606,20 +2606,21 @@ private[spark] object Utils extends Logging { def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf]) .newInstance(conf).asInstanceOf[Serializer] - val doPushBasedShuffle = + val canDoPushBasedShuffle = conf.get(PUSH_BASED_SHUFFLE_ENABLED) && (conf.get(IS_TESTING).getOrElse(false) || (conf.get(SHUFFLE_SERVICE_ENABLED) && conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" && + // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle !conf.get(IO_ENCRYPTION_ENABLED) && serializer.supportsRelocationOfSerializedObjects)) - if (!doPushBasedShuffle) { + if (!canDoPushBasedShuffle) { logWarning("Push-based shuffle can only be enabled when the application is submitted" + "to run in YARN mode, with external shuffle service enabled, IO encryption disabled, and" + "relocation of serialized objects supported.") } - doPushBasedShuffle + canDoPushBasedShuffle } /** From 0e5e229f87898e7d3f4c854cc4d171ab5e971fe9 Mon Sep 17 00:00:00 2001 From: Minchu Yang Date: Fri, 10 Sep 2021 18:45:09 -0700 Subject: [PATCH 3/3] [SPARK-36705] Disable push based shuffle when IO encryption is enabled or serializer is not relocatable In addition to disable push based shuffle when IO encryption is enabled or serializer is not relocatable, addressed a minor issue in the UT that key name for MAX_APP_ATTEMPTS is not right. --- core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index aa35708dfd62e..a4df5cdea9c70 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1509,10 +1509,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.isPushBasedShuffleEnabled(conf) === false) conf.set(SHUFFLE_SERVICE_ENABLED, true) conf.set(SparkLauncher.SPARK_MASTER, "yarn") - conf.set("spark.yarn.maxAttempts", "1") + conf.set("spark.yarn.maxAppAttempts", "1") conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") assert(Utils.isPushBasedShuffleEnabled(conf) === true) - conf.set("spark.yarn.maxAttempts", "2") + conf.set("spark.yarn.maxAppAttempts", "2") assert(Utils.isPushBasedShuffleEnabled(conf) === true) conf.set(IO_ENCRYPTION_ENABLED, true) assert(Utils.isPushBasedShuffleEnabled(conf) === false)