Commit

add test
Ngone51 committed Apr 28, 2021
1 parent 3ad3b52 commit 38976b7
Showing 1 changed file with 34 additions and 1 deletion.
35 changes: 34 additions & 1 deletion core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -17,7 +17,8 @@

package org.apache.spark

-import java.io.File
+import java.io.{File, FileOutputStream}
+import java.nio.ByteBuffer
import java.util.{Locale, Properties}
import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService}

@@ -302,6 +303,38 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext
    rdd.count()
  }

test("SPARK-18188: shuffle checksum detect disk corruption") {
conf.set(config.SHUFFLE_CHECKSUM, true)
sc = new SparkContext("local-cluster[2, 1, 2048]", "test", conf)
val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
// materialize the shuffle map outputs
rdd.count()

sc.parallelize(1 to 10, 2).barrier().mapPartitions { iter =>
var dataFile = SparkEnv.get.blockManager
.diskBlockManager.getFile(ShuffleDataBlockId(0, 0, 0))
if (!dataFile.exists()) {
dataFile = SparkEnv.get.blockManager
.diskBlockManager.getFile(ShuffleDataBlockId(0, 1, 0))
}

if (dataFile.exists()) {
val f = new FileOutputStream(dataFile, true)
val ch = f.getChannel
// corrupt the shuffle data files by writing some arbitrary bytes
ch.write(ByteBuffer.wrap(Array[Byte](12)), 0)
ch.close()
}
BarrierTaskContext.get().barrier()
iter
}.collect()

val e = intercept[SparkException] {
rdd.count()
}
assert(e.getMessage.contains("corrupted due to DISK issue"))
}

test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") {
sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
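For context, the mechanism this test exercises can be illustrated with a small, self-contained sketch: record a checksum when data is written, recompute it from the on-disk bytes later, and treat a mismatch as disk corruption. This is not Spark's shuffle checksum implementation; ChecksumSketch and checksumOf are hypothetical names, and Adler-32 is just one possible checksum choice.

import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.file.Files
import java.util.zip.Adler32

object ChecksumSketch {
  // Checksum over a file's bytes, standing in for the per-partition
  // checksums a shuffle writer could persist alongside the data.
  def checksumOf(file: File): Long = {
    val adler = new Adler32()
    adler.update(Files.readAllBytes(file.toPath))
    adler.getValue
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("shuffle-data", ".bin")
    val out = new FileOutputStream(file)
    out.write(Array.tabulate[Byte](1024)(i => (i % 128).toByte))
    out.close()

    val recorded = checksumOf(file)   // what the writer would persist

    // Overwrite the first byte in place, mimicking the corruption the
    // test injects into the shuffle data file.
    val raf = new RandomAccessFile(file, "rw")
    raf.seek(0)
    raf.writeByte(12)
    raf.close()

    val recomputed = checksumOf(file) // what a later reader would see
    assert(recomputed != recorded, "checksum should detect the changed byte")
    println(s"recorded=$recorded, recomputed=$recomputed -> corruption detected")
  }
}

Because the recomputed checksum disagrees with the recorded one for the same on-disk bytes, the fault can be attributed to the disk rather than the network, which matches the "corrupted due to DISK issue" message the test asserts on.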
