From a760b8d5d4f3cea9528bde3ac86d88c8903aaa5b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 21 Jun 2021 01:53:53 +0800 Subject: [PATCH] [SPARK-35832][CORE][ML][K8S][TESTS] Add LocalRootDirsTest trait ### What changes were proposed in this pull request? To make the test suite more robust, this PR aims to add a new trait, `LocalRootDirsTest`, by refactoring `SortShuffleSuite`'s helper functions and applying it to the following: - ShuffleNettySuite - ShuffleOldFetchProtocolSuite - ExternalShuffleServiceSuite - KubernetesLocalDiskShuffleDataIOSuite - LocalDirsSuite - RDDCleanerSuite - ALSCleanerSuite In addition, this fixes a UT in `KubernetesLocalDiskShuffleDataIOSuite`. ### Why are the changes needed? `ShuffleSuite` is extended by four classes but only `SortShuffleSuite` does the clean-up correctly. ``` ShuffleSuite - SortShuffleSuite - ShuffleNettySuite - ShuffleOldFetchProtocolSuite - ExternalShuffleServiceSuite ``` Since `KubernetesLocalDiskShuffleDataIOSuite` is looking for the other storage directory, the leftover of `ShuffleSuite` causes flakiness. - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2/2649/testReport/junit/org.apache.spark.shuffle/KubernetesLocalDiskShuffleDataIOSuite/recompute_is_not_blocked_by_the_recovery/ ``` org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 1.0 (TID 3) had a not serializable result: org.apache.spark.ShuffleSuite$NonJavaSerializableClass ... org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIOSuite.$anonfun$new$2(KubernetesLocalDiskShuffleDataIOSuite.scala:52) ``` For the other suites, the clean-up implementation is used but not complete. So, they are refactored to use new trait. ### Does this PR introduce _any_ user-facing change? No, this is a test-only change. ### How was this patch tested? Pass the CIs. Closes #32986 from dongjoon-hyun/SPARK-35832. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/LocalRootDirsTest.scala | 55 +++++++++++++++++++ .../scala/org/apache/spark/ShuffleSuite.scala | 4 +- .../org/apache/spark/SortShuffleSuite.scala | 27 +-------- .../apache/spark/rdd/RDDCleanerSuite.scala | 15 +---- .../apache/spark/storage/LocalDirsSuite.scala | 14 +---- .../spark/ml/recommendation/ALSSuite.scala | 15 +---- 6 files changed, 62 insertions(+), 68 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/LocalRootDirsTest.scala diff --git a/core/src/test/scala/org/apache/spark/LocalRootDirsTest.scala b/core/src/test/scala/org/apache/spark/LocalRootDirsTest.scala new file mode 100644 index 0000000000000..3a813f4d8b53c --- /dev/null +++ b/core/src/test/scala/org/apache/spark/LocalRootDirsTest.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.File +import java.util.UUID + +import org.apache.spark.util.Utils + + +trait LocalRootDirsTest extends SparkFunSuite with LocalSparkContext { + + val conf = new SparkConf(loadDefaults = false) + + protected var tempDir: File = _ + + override def beforeAll(): Unit = { + super.beforeAll() + // Once 'spark.local.dir' is set, it is cached. Unless this is manually cleared + // before/after a test, it could return the same directory even if this property + // is configured. + Utils.clearLocalRootDirs() + } + + override def beforeEach(): Unit = { + super.beforeEach() + tempDir = Utils.createTempDir(namePrefix = "local") + conf.set("spark.local.dir", + tempDir.getAbsolutePath + File.separator + UUID.randomUUID().toString) + } + + override def afterEach(): Unit = { + try { + Utils.deleteRecursively(tempDir) + Utils.clearLocalRootDirs() + } finally { + super.afterEach() + } + } +} diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 31cddc4c96491..b1239ed2a73fc 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -39,9 +39,7 @@ import org.apache.spark.shuffle.ShuffleWriter import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId} import org.apache.spark.util.MutablePair -abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext { - - val conf = new SparkConf(loadDefaults = false) +abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalRootDirsTest { // Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately // test that the shuffle works (rather than retrying until all blocks are local to one Executor). diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala index 3d853ff4294be..571110784818f 100644 --- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala @@ -26,40 +26,17 @@ import org.apache.commons.io.filefilter.TrueFileFilter import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers._ -import org.apache.spark.internal.config +import org.apache.spark.internal.config.SHUFFLE_MANAGER import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.util.Utils class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { // This test suite should run all tests in ShuffleSuite with sort-based shuffle. - - private var tempDir: File = _ - override def beforeAll(): Unit = { super.beforeAll() - // Once 'spark.local.dir' is set, it is cached. Unless this is manually cleared - // before/after a test, it could return the same directory even if this property - // is configured. - Utils.clearLocalRootDirs() - conf.set(config.SHUFFLE_MANAGER, "sort") - } - - override def beforeEach(): Unit = { - super.beforeEach() - tempDir = Utils.createTempDir() - conf.set("spark.local.dir", tempDir.getAbsolutePath) - } - - override def afterEach(): Unit = { - try { - Utils.deleteRecursively(tempDir) - Utils.clearLocalRootDirs() - } finally { - super.afterEach() - } + conf.set(SHUFFLE_MANAGER, "sort") } test("SortShuffleManager properly cleans up files for shuffles that use the serialized path") { diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDCleanerSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDCleanerSuite.scala index 05240f95e616d..cfd646999eb6b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDCleanerSuite.scala @@ -23,25 +23,12 @@ import scala.collection.JavaConverters._ import org.apache.commons.io.FileUtils import org.apache.commons.io.filefilter.TrueFileFilter -import org.scalatest.BeforeAndAfterEach import org.apache.spark._ import org.apache.spark.util.Utils -class RDDCleanerSuite extends SparkFunSuite with BeforeAndAfterEach { - override def beforeEach(): Unit = { - super.beforeEach() - // Once `Utils.getOrCreateLocalRootDirs` is called, it is cached in `Utils.localRootDirs`. - // Unless this is manually cleared before and after a test, it returns the same directory - // set before even if 'spark.local.dir' is configured afterwards. - Utils.clearLocalRootDirs() - } - - override def afterEach(): Unit = { - Utils.clearLocalRootDirs() - super.afterEach() - } +class RDDCleanerSuite extends SparkFunSuite with LocalRootDirsTest { test("RDD shuffle cleanup standalone") { val conf = new SparkConf() diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala index 6883eb211efd6..4b22ec334e84b 100644 --- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala @@ -19,23 +19,13 @@ package org.apache.spark.storage import java.io.{File, IOException} -import org.scalatest.BeforeAndAfter - -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{LocalRootDirsTest, SparkConf, SparkFunSuite} import org.apache.spark.util.{SparkConfWithEnv, Utils} /** * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options. */ -class LocalDirsSuite extends SparkFunSuite with BeforeAndAfter { - - before { - Utils.clearLocalRootDirs() - } - - after { - Utils.clearLocalRootDirs() - } +class LocalDirsSuite extends SparkFunSuite with LocalRootDirsTest { private def assumeNonExistentAndNotCreatable(f: File): Unit = { try { diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 28275eb06cf0d..6fe16f7b5ac87 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -27,7 +27,6 @@ import scala.collection.mutable.{ArrayBuffer, WrappedArray} import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.apache.commons.io.FileUtils import org.apache.commons.io.filefilter.TrueFileFilter -import org.scalatest.BeforeAndAfterEach import org.apache.spark._ import org.apache.spark.internal.Logging @@ -970,19 +969,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { } } -class ALSCleanerSuite extends SparkFunSuite with BeforeAndAfterEach { - override def beforeEach(): Unit = { - super.beforeEach() - // Once `Utils.getOrCreateLocalRootDirs` is called, it is cached in `Utils.localRootDirs`. - // Unless this is manually cleared before and after a test, it returns the same directory - // set before even if 'spark.local.dir' is configured afterwards. - Utils.clearLocalRootDirs() - } - - override def afterEach(): Unit = { - Utils.clearLocalRootDirs() - super.afterEach() - } +class ALSCleanerSuite extends SparkFunSuite with LocalRootDirsTest { test("ALS shuffle cleanup in algorithm") { val conf = new SparkConf()