[SPARK-35832][CORE][ML][K8S][TESTS] Add LocalRootDirsTest trait

### What changes were proposed in this pull request?

To make the test suites more robust, this PR adds a new trait, `LocalRootDirsTest`, by refactoring `SortShuffleSuite`'s helper functions, and applies it to the following suites:
- ShuffleNettySuite
- ShuffleOldFetchProtocolSuite
- ExternalShuffleServiceSuite
- KubernetesLocalDiskShuffleDataIOSuite
- LocalDirsSuite
- RDDCleanerSuite
- ALSCleanerSuite

In addition, this fixes a unit test in `KubernetesLocalDiskShuffleDataIOSuite`.
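
As a rough illustration (the suite name and test body below are hypothetical, not part of this commit), any suite that needs an isolated `spark.local.dir` can mix in the new trait and inherit the per-test temp directory and clean-up:

```scala
package org.apache.spark

// Hypothetical example suite: `conf` comes from LocalRootDirsTest and `sc`
// from LocalSparkContext, which the trait extends.
class ExampleShuffleCleanupSuite extends SparkFunSuite with LocalRootDirsTest {

  test("shuffle files land under the per-test local dir") {
    // beforeEach() has already pointed 'spark.local.dir' at a fresh temp
    // directory; afterEach() deletes it and clears the cached root dirs.
    sc = new SparkContext("local", "test", conf)
    val sum = sc.parallelize(1 to 100, 4)
      .map(i => (i % 10, i))
      .reduceByKey(_ + _) // triggers a shuffle that writes local files
      .values
      .sum()
    assert(sum == 5050)
  }
}
```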

### Why are the changes needed?

`ShuffleSuite` is extended by four classes but only `SortShuffleSuite` does the clean-up correctly.
```
ShuffleSuite
- SortShuffleSuite
- ShuffleNettySuite
- ShuffleOldFetchProtocolSuite
- ExternalShuffleServiceSuite
```

Since `KubernetesLocalDiskShuffleDataIOSuite` looks for the other suites' storage directories, leftover files from `ShuffleSuite` cause flakiness, for example:
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2/2649/testReport/junit/org.apache.spark.shuffle/KubernetesLocalDiskShuffleDataIOSuite/recompute_is_not_blocked_by_the_recovery/
```
org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 1.0 (TID 3) had a not serializable result: org.apache.spark.ShuffleSuite$NonJavaSerializableClass
...
org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIOSuite.$anonfun$new$2(KubernetesLocalDiskShuffleDataIOSuite.scala:52)
```

For the other suites, a clean-up implementation exists but is incomplete, so they are refactored to use the new trait.
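
The underlying issue is that the resolved local root directories are cached inside `Utils` after the first lookup, so a later change to `spark.local.dir` is silently ignored. Below is a minimal sketch of that behavior, assuming the package-private helpers `Utils.getOrCreateLocalRootDirs(conf)` and `Utils.clearLocalRootDirs()` referenced in the diff below (the paths are placeholders):

```scala
package org.apache.spark // the helpers below are private[spark]

import org.apache.spark.util.Utils

object LocalRootDirsCacheSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false).set("spark.local.dir", "/tmp/suite-a")
    val first = Utils.getOrCreateLocalRootDirs(conf)  // resolved under /tmp/suite-a and cached

    conf.set("spark.local.dir", "/tmp/suite-b")
    val second = Utils.getOrCreateLocalRootDirs(conf) // cache hit: still under /tmp/suite-a

    Utils.clearLocalRootDirs()                        // what LocalRootDirsTest does around each test
    val third = Utils.getOrCreateLocalRootDirs(conf)  // now re-resolved under /tmp/suite-b

    println(s"first=${first.mkString(",")} second=${second.mkString(",")} third=${third.mkString(",")}")
  }
}
```

This is why the trait calls `Utils.clearLocalRootDirs()` in `beforeAll()`/`afterEach()` and points `spark.local.dir` at a unique temp directory in `beforeEach()`.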

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Pass the CIs.

Closes apache#32986 from dongjoon-hyun/SPARK-35832.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun authored and JQ-Cao committed Sep 8, 2021
1 parent ba6d017 commit a760b8d
Showing 6 changed files with 62 additions and 68 deletions.
55 changes: 55 additions & 0 deletions core/src/test/scala/org/apache/spark/LocalRootDirsTest.scala
@@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.File
import java.util.UUID

import org.apache.spark.util.Utils


trait LocalRootDirsTest extends SparkFunSuite with LocalSparkContext {

  val conf = new SparkConf(loadDefaults = false)

  protected var tempDir: File = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Once 'spark.local.dir' is set, it is cached. Unless this is manually cleared
    // before/after a test, it could return the same directory even if this property
    // is configured.
    Utils.clearLocalRootDirs()
  }

  override def beforeEach(): Unit = {
    super.beforeEach()
    tempDir = Utils.createTempDir(namePrefix = "local")
    conf.set("spark.local.dir",
      tempDir.getAbsolutePath + File.separator + UUID.randomUUID().toString)
  }

  override def afterEach(): Unit = {
    try {
      Utils.deleteRecursively(tempDir)
      Utils.clearLocalRootDirs()
    } finally {
      super.afterEach()
    }
  }
}
4 changes: 1 addition & 3 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -39,9 +39,7 @@ import org.apache.spark.shuffle.ShuffleWriter
 import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
 import org.apache.spark.util.MutablePair

-abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
-
-  val conf = new SparkConf(loadDefaults = false)
+abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalRootDirsTest {

   // Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
   // test that the shuffle works (rather than retrying until all blocks are local to one Executor).
27 changes: 2 additions & 25 deletions core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -26,40 +26,17 @@ import org.apache.commons.io.filefilter.TrueFileFilter
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.matchers.should.Matchers._

-import org.apache.spark.internal.config
+import org.apache.spark.internal.config.SHUFFLE_MANAGER
 import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.util.Utils

 class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {

   // This test suite should run all tests in ShuffleSuite with sort-based shuffle.

-  private var tempDir: File = _
-
   override def beforeAll(): Unit = {
     super.beforeAll()
-    // Once 'spark.local.dir' is set, it is cached. Unless this is manually cleared
-    // before/after a test, it could return the same directory even if this property
-    // is configured.
-    Utils.clearLocalRootDirs()
-    conf.set(config.SHUFFLE_MANAGER, "sort")
-  }
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    tempDir = Utils.createTempDir()
-    conf.set("spark.local.dir", tempDir.getAbsolutePath)
-  }
-
-  override def afterEach(): Unit = {
-    try {
-      Utils.deleteRecursively(tempDir)
-      Utils.clearLocalRootDirs()
-    } finally {
-      super.afterEach()
-    }
+    conf.set(SHUFFLE_MANAGER, "sort")
   }

   test("SortShuffleManager properly cleans up files for shuffles that use the serialized path") {
15 changes: 1 addition & 14 deletions core/src/test/scala/org/apache/spark/rdd/RDDCleanerSuite.scala
@@ -23,25 +23,12 @@ import scala.collection.JavaConverters._

 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.filefilter.TrueFileFilter
-import org.scalatest.BeforeAndAfterEach

 import org.apache.spark._
 import org.apache.spark.util.Utils


-class RDDCleanerSuite extends SparkFunSuite with BeforeAndAfterEach {
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    // Once `Utils.getOrCreateLocalRootDirs` is called, it is cached in `Utils.localRootDirs`.
-    // Unless this is manually cleared before and after a test, it returns the same directory
-    // set before even if 'spark.local.dir' is configured afterwards.
-    Utils.clearLocalRootDirs()
-  }
-
-  override def afterEach(): Unit = {
-    Utils.clearLocalRootDirs()
-    super.afterEach()
-  }
+class RDDCleanerSuite extends SparkFunSuite with LocalRootDirsTest {

   test("RDD shuffle cleanup standalone") {
     val conf = new SparkConf()
14 changes: 2 additions & 12 deletions core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -19,23 +19,13 @@ package org.apache.spark.storage

 import java.io.{File, IOException}

-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{LocalRootDirsTest, SparkConf, SparkFunSuite}
 import org.apache.spark.util.{SparkConfWithEnv, Utils}

 /**
  * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
  */
-class LocalDirsSuite extends SparkFunSuite with BeforeAndAfter {
-
-  before {
-    Utils.clearLocalRootDirs()
-  }
-
-  after {
-    Utils.clearLocalRootDirs()
-  }
+class LocalDirsSuite extends SparkFunSuite with LocalRootDirsTest {

   private def assumeNonExistentAndNotCreatable(f: File): Unit = {
     try {
15 changes: 1 addition & 14 deletions mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -27,7 +27,6 @@ import scala.collection.mutable.{ArrayBuffer, WrappedArray}
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.filefilter.TrueFileFilter
-import org.scalatest.BeforeAndAfterEach

 import org.apache.spark._
 import org.apache.spark.internal.Logging
@@ -970,19 +969,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging {
   }
 }

-class ALSCleanerSuite extends SparkFunSuite with BeforeAndAfterEach {
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    // Once `Utils.getOrCreateLocalRootDirs` is called, it is cached in `Utils.localRootDirs`.
-    // Unless this is manually cleared before and after a test, it returns the same directory
-    // set before even if 'spark.local.dir' is configured afterwards.
-    Utils.clearLocalRootDirs()
-  }
-
-  override def afterEach(): Unit = {
-    Utils.clearLocalRootDirs()
-    super.afterEach()
-  }
+class ALSCleanerSuite extends SparkFunSuite with LocalRootDirsTest {

   test("ALS shuffle cleanup in algorithm") {
     val conf = new SparkConf()
