From cf0c350daf12ce80fc781fd17fd15506d83c6d02 Mon Sep 17 00:00:00 2001 From: liulijia Date: Tue, 10 Oct 2017 18:31:47 +0800 Subject: [PATCH 1/8] allow user to filter out empty split in HadoopRDD --- .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 5 ++++- .../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 5 ++++- core/src/test/scala/org/apache/spark/FileSuite.scala | 12 ++++++++++++ docs/configuration.md | 8 ++++++++ 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 23b344230e490..a824fccc77004 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -196,7 +196,10 @@ class HadoopRDD[K, V]( // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) - val inputSplits = inputFormat.getSplits(jobConf, minPartitions) + var inputSplits = inputFormat.getSplits(jobConf, minPartitions) + if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) { + inputSplits = inputSplits.filter(_.getLength>0) + } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 482875e6c1ac5..20e81223bb0bd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -122,7 +122,10 @@ class NewHadoopRDD[K, V]( case _ => } val jobContext = new JobContextImpl(_conf, jobId) - val rawSplits = inputFormat.getSplits(jobContext).toArray + var rawSplits = inputFormat.getSplits(jobContext).toArray(Array.empty[InputSplit]) + if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) { + rawSplits = rawSplits.filter(_.getLength>0) + } val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 02728180ac82d..c50ccf4e68e79 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -510,4 +510,16 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } } + test("spark.hadoop.filterOutEmptySplit") { + val sf = new SparkConf() + sf.setAppName("test").setMaster("local").set("spark.hadoop.filterOutEmptySplit", "true") + sc = new SparkContext(sf) + val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1) + emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-00000").exists() === true) + + val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-00000") + assert(hadoopRDD.partitions.length === 0) + } + } diff --git a/docs/configuration.md b/docs/configuration.md index 6e9fe591b70a3..9148c0b6af79c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1210,6 +1210,14 @@ Apart from these, the following properties are also available, and may be useful This setting is ignored for jobs generated through Spark Streaming's StreamingContext, since 
data may need to be rewritten to pre-existing output directories during checkpoint recovery. + + spark.hadoop.filterOutEmptySplit + false + If set to true, HadoopRDD will not handle the split which its lenghth is 0. Maybe you will read an empty + hive table but has many empty files. If set to false, Spark generates many tasks to handle these empty files. + Sometimes, users maybe want to use SparkContext#textFile to handle a file stored in hadoop, and they don't + want to generate any task when this file is empty, they can set this configuration to true. + spark.storage.memoryMapThreshold 2m From 31a5d303f91839124f8957f75d4077be5410524c Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 12 Oct 2017 17:13:39 +0800 Subject: [PATCH 2/8] change spark.hadoop.filterOutEmptySplit to spark.files.filterOutEmptySplit and add it to internal/config; add test case --- .../spark/internal/config/package.scala | 9 +++ .../org/apache/spark/rdd/HadoopRDD.scala | 9 +-- .../org/apache/spark/rdd/NewHadoopRDD.scala | 10 +-- .../scala/org/apache/spark/FileSuite.scala | 61 +++++++++++++++++-- docs/configuration.md | 16 ++--- 5 files changed, 83 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5278e5e0fb270..f497850053ae2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -270,6 +270,15 @@ package object config { .longConf .createWithDefault(4 * 1024 * 1024) + private [spark] val FILTER_OUT_EMPTY_SPLIT = ConfigBuilder("spark.files.filterOutEmptySplit") + .doc("If set to true, HadoopRDD/NewHadoopRDD will not handle the split which its length is 0." + + "Maybe you will read an empty hive table but has many empty files. If set to false, Spark " + + "generates many tasks to handle these empty files. 
Sometimes, users maybe want to use " + + "SparkContext#textFile to handle a file stored in hadoop, and they don't want to generate " + + "any task when this file is empty, they can set this configuration to true.") + .booleanConf + .createWithDefault(false) + private[spark] val SECRET_REDACTION_PATTERN = ConfigBuilder("spark.redaction.regex") .doc("Regex to decide which Spark configuration properties and environment variables in " + diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index a824fccc77004..30d6d236e2606 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES +import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES} import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation} import org.apache.spark.storage.StorageLevel @@ -196,9 +196,10 @@ class HadoopRDD[K, V]( // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) - var inputSplits = inputFormat.getSplits(jobConf, minPartitions) - if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) { - inputSplits = inputSplits.filter(_.getLength>0) + val inputSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) { + inputFormat.getSplits(jobConf, minPartitions).filter(_.getLength > 0) + } else { + inputFormat.getSplits(jobConf, minPartitions) } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 20e81223bb0bd..f43e5bea8c94e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -21,6 +21,7 @@ import java.io.IOException import java.text.SimpleDateFormat import java.util.{Date, Locale} +import scala.collection.JavaConverters.asScalaBufferConverter import scala.reflect.ClassTag import org.apache.hadoop.conf.{Configurable, Configuration} @@ -34,7 +35,7 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES +import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES} import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager} @@ -122,9 +123,10 @@ class NewHadoopRDD[K, V]( case _ => } val jobContext = new JobContextImpl(_conf, jobId) - var rawSplits = inputFormat.getSplits(jobContext).toArray(Array.empty[InputSplit]) - if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) { - rawSplits = rawSplits.filter(_.getLength>0) + val rawSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) { + inputFormat.getSplits(jobContext).asScala.filter(_.getLength > 0) + } 
else { + inputFormat.getSplits(jobContext).asScala } val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index c50ccf4e68e79..d28861224d1f3 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} -import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES +import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES} import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -347,7 +347,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } } - test ("allow user to disable the output directory existence checking (old Hadoop API") { + test ("allow user to disable the output directory existence checking (old Hadoop API)") { val sf = new SparkConf() sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") sc = new SparkContext(sf) @@ -510,16 +510,65 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } } - test("spark.hadoop.filterOutEmptySplit") { + test("allow user to filter out empty split (old Hadoop API)") { val sf = new SparkConf() - sf.setAppName("test").setMaster("local").set("spark.hadoop.filterOutEmptySplit", "true") + sf.setAppName("test").setMaster("local").set(FILTER_OUT_EMPTY_SPLIT, true) sc = new SparkContext(sf) + + // Ensure that if all of the splits are empty, we remove the splits correctly val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1) emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output") assert(new File(tempDir.getPath + "/output/part-00000").exists() === true) + val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-*") + assert(hadoopRDD.partitions.length === 0) + + // Ensure that if no split is empty, we don't lose any splits + val randomRDD = sc.parallelize( + Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 2) + randomRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output1") + assert(new File(tempDir.getPath + "/output1/part-00001").exists() === true) + val hadoopRDD1 = sc.textFile(tempDir.getPath + "/output1/part-*") + assert(hadoopRDD1.partitions.length === 2) + + // Ensure that if part of the splits are empty, we remove the splits correctly + val randomRDD2 = sc.parallelize( + Array(("key1", "a"), ("key2", "a")), 5) + randomRDD2.saveAsHadoopFile[TextOutputFormat[String, String]]( + tempDir.getPath + "/output2") + assert(new File(tempDir.getPath + "/output2/part-00004").exists() === true) + val hadoopRDD2 = sc.textFile(tempDir.getPath + "/output2/part-*") + assert(hadoopRDD2.partitions.length === 2) + } + + test("allow user to filter out empty split (new Hadoop API)") { + val sf = new SparkConf() + sf.setAppName("test").setMaster("local").set(FILTER_OUT_EMPTY_SPLIT, true) + sc = new SparkContext(sf) - val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-00000") + // Ensure that if all of the splits are empty, we remove the splits correctly + val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1) + 
emptyRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]]( + tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true) + val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-r-*") assert(hadoopRDD.partitions.length === 0) - } + // Ensure that if no split is empty, we don't lose any splits + val randomRDD1 = sc.parallelize( + Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 2) + randomRDD1.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]]( + tempDir.getPath + "/output1") + assert(new File(tempDir.getPath + "/output1/part-r-00001").exists() === true) + val hadoopRDD1 = sc.textFile(tempDir.getPath + "/output1/part-r-*") + assert(hadoopRDD1.partitions.length === 2) + + // Ensure that if part of the splits are empty, we remove the splits correctly + val randomRDD2 = sc.parallelize( + Array(("key1", "a"), ("key2", "a")), 5) + randomRDD2.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]]( + tempDir.getPath + "/output2") + assert(new File(tempDir.getPath + "/output2/part-r-00004").exists() === true) + val hadoopRDD2 = sc.textFile(tempDir.getPath + "/output2/part-r-*") + assert(hadoopRDD2.partitions.length === 2) + } } diff --git a/docs/configuration.md b/docs/configuration.md index 9148c0b6af79c..c42d206aefab6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1191,6 +1191,14 @@ Apart from these, the following properties are also available, and may be useful then the partitions with small files will be faster than partitions with bigger files. + + spark.files.filterOutEmptySplit + false + If set to true, HadoopRDD/NewHadoopRDD will not handle the split which its length is 0. Maybe you will read an empty + hive table but has many empty files. If set to false, Spark generates many tasks to handle these empty files. + Sometimes, users maybe want to use SparkContext#textFile to handle a file stored in hadoop, and they don't + want to generate any task when this file is empty, they can set this configuration to true. + spark.hadoop.cloneConf false @@ -1210,14 +1218,6 @@ Apart from these, the following properties are also available, and may be useful This setting is ignored for jobs generated through Spark Streaming's StreamingContext, since data may need to be rewritten to pre-existing output directories during checkpoint recovery. - - spark.hadoop.filterOutEmptySplit - false - If set to true, HadoopRDD will not handle the split which its lenghth is 0. Maybe you will read an empty - hive table but has many empty files. If set to false, Spark generates many tasks to handle these empty files. - Sometimes, users maybe want to use SparkContext#textFile to handle a file stored in hadoop, and they don't - want to generate any task when this file is empty, they can set this configuration to true. 
- spark.storage.memoryMapThreshold 2m From 4dcfd83612661ce47e8c2c1f33590c61dfe4e473 Mon Sep 17 00:00:00 2001 From: liulijia Date: Fri, 13 Oct 2017 01:01:37 +0800 Subject: [PATCH 3/8] simpfy code; use spark.files.ignoreEmptySplits as config name --- .../spark/internal/config/package.scala | 9 +- .../org/apache/spark/rdd/HadoopRDD.scala | 12 ++- .../org/apache/spark/rdd/NewHadoopRDD.scala | 12 ++- .../scala/org/apache/spark/FileSuite.scala | 99 +++++++++---------- docs/configuration.md | 8 -- 5 files changed, 61 insertions(+), 79 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f497850053ae2..7dcef0067063a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -270,12 +270,9 @@ package object config { .longConf .createWithDefault(4 * 1024 * 1024) - private [spark] val FILTER_OUT_EMPTY_SPLIT = ConfigBuilder("spark.files.filterOutEmptySplit") - .doc("If set to true, HadoopRDD/NewHadoopRDD will not handle the split which its length is 0." + - "Maybe you will read an empty hive table but has many empty files. If set to false, Spark " + - "generates many tasks to handle these empty files. Sometimes, users maybe want to use " + - "SparkContext#textFile to handle a file stored in hadoop, and they don't want to generate " + - "any task when this file is empty, they can set this configuration to true.") + private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits") + .doc("If true, methods like that use HadoopRDD and NewHadoopRDD such as " + + "SparkContext.textFiles will not create a partition for input splits that are empty.") .booleanConf .createWithDefault(false) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 30d6d236e2606..1f33c0a2b709f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES} +import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS} import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation} import org.apache.spark.storage.StorageLevel @@ -134,6 +134,8 @@ class HadoopRDD[K, V]( private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES) + private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS) + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. 
protected def getJobConf(): JobConf = { val conf: Configuration = broadcastedConf.value.value @@ -195,11 +197,11 @@ class HadoopRDD[K, V]( val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) - val inputFormat = getInputFormat(jobConf) - val inputSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) { - inputFormat.getSplits(jobConf, minPartitions).filter(_.getLength > 0) + val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) + val inputSplits = if (ignoreEmptySplits) { + allInputSplits.filter(_.getLength > 0) } else { - inputFormat.getSplits(jobConf, minPartitions) + allInputSplits } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index f43e5bea8c94e..db4eac1d0a775 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -35,7 +35,7 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES} +import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS} import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager} @@ -90,6 +90,8 @@ class NewHadoopRDD[K, V]( private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES) + private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS) + def getConf: Configuration = { val conf: Configuration = confBroadcast.value.value if (shouldCloneJobConf) { @@ -122,11 +124,11 @@ class NewHadoopRDD[K, V]( configurable.setConf(_conf) case _ => } - val jobContext = new JobContextImpl(_conf, jobId) - val rawSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) { - inputFormat.getSplits(jobContext).asScala.filter(_.getLength > 0) + val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala + val rawSplits = if (ignoreEmptySplits) { + allRowSplits.filter(_.getLength > 0) } else { - inputFormat.getSplits(jobContext).asScala + allRowSplits } val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index d28861224d1f3..300d15807db8d 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} -import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES} +import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS} import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -348,9 +348,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } test ("allow user to disable 
the output directory existence checking (old Hadoop API)") { - val sf = new SparkConf() - sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") - sc = new SparkContext(sf) + val conf = new SparkConf() + conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") + sc = new SparkContext(conf) val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) randomRDD.saveAsTextFile(tempDir.getPath + "/output") assert(new File(tempDir.getPath + "/output/part-00000").exists() === true) @@ -380,9 +380,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } test ("allow user to disable the output directory existence checking (new Hadoop API") { - val sf = new SparkConf() - sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") - sc = new SparkContext(sf) + val conf = new SparkConf() + conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") + sc = new SparkContext(conf) val randomRDD = sc.parallelize( Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]]( @@ -510,65 +510,54 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } } - test("allow user to filter out empty split (old Hadoop API)") { - val sf = new SparkConf() - sf.setAppName("test").setMaster("local").set(FILTER_OUT_EMPTY_SPLIT, true) - sc = new SparkContext(sf) + test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") { + val conf = new SparkConf() + conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true) + sc = new SparkContext(conf) + + def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int, + outputSuffix: Int, checkPart: String, partitionLength: Int): Unit = { + val dataRDD = sc.parallelize(data, numSlices) + val output = new File(tempDir, "output" + outputSuffix) + dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath) + assert(new File(output, checkPart).exists() === true) + val hadoopRDD = sc.textFile(new File(output, "part-*").getPath) + assert(hadoopRDD.partitions.length === partitionLength) + } // Ensure that if all of the splits are empty, we remove the splits correctly - val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1) - emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output") - assert(new File(tempDir.getPath + "/output/part-00000").exists() === true) - val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-*") - assert(hadoopRDD.partitions.length === 0) + testIgnoreEmptySplits(Array.empty[Tuple2[String, String]], 1, 0, "part-00000", 0) // Ensure that if no split is empty, we don't lose any splits - val randomRDD = sc.parallelize( - Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 2) - randomRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output1") - assert(new File(tempDir.getPath + "/output1/part-00001").exists() === true) - val hadoopRDD1 = sc.textFile(tempDir.getPath + "/output1/part-*") - assert(hadoopRDD1.partitions.length === 2) + testIgnoreEmptySplits(Array(("key1", "a"), ("key2", "a"), ("key3", "b")), 2, 1, "part-00001", 2) // Ensure that if part of the splits are empty, we remove the splits correctly - val randomRDD2 = sc.parallelize( - Array(("key1", "a"), ("key2", "a")), 5) - randomRDD2.saveAsHadoopFile[TextOutputFormat[String, String]]( - tempDir.getPath + "/output2") - 
assert(new File(tempDir.getPath + "/output2/part-00004").exists() === true) - val hadoopRDD2 = sc.textFile(tempDir.getPath + "/output2/part-*") - assert(hadoopRDD2.partitions.length === 2) - } - - test("allow user to filter out empty split (new Hadoop API)") { - val sf = new SparkConf() - sf.setAppName("test").setMaster("local").set(FILTER_OUT_EMPTY_SPLIT, true) - sc = new SparkContext(sf) + testIgnoreEmptySplits(Array(("key1", "a"), ("key2", "a")), 5, 2, "part-00004", 2) + } + + test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") { + val conf = new SparkConf() + conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true) + sc = new SparkContext(conf) + + def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int, + outputSuffix: Int, checkPart: String, partitionLength: Int): Unit = { + val dataRDD = sc.parallelize(data, numSlices) + val output = new File(tempDir, "output" + outputSuffix) + dataRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath) + assert(new File(output, checkPart).exists() === true) + val hadoopRDD = sc.textFile(new File(output, "part-r-*").getPath) + assert(hadoopRDD.partitions.length === partitionLength) + } // Ensure that if all of the splits are empty, we remove the splits correctly - val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1) - emptyRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]]( - tempDir.getPath + "/output") - assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true) - val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-r-*") - assert(hadoopRDD.partitions.length === 0) + testIgnoreEmptySplits(Array.empty[Tuple2[String, String]], 1, 0, "part-r-00000", 0) // Ensure that if no split is empty, we don't lose any splits - val randomRDD1 = sc.parallelize( - Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 2) - randomRDD1.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]]( - tempDir.getPath + "/output1") - assert(new File(tempDir.getPath + "/output1/part-r-00001").exists() === true) - val hadoopRDD1 = sc.textFile(tempDir.getPath + "/output1/part-r-*") - assert(hadoopRDD1.partitions.length === 2) + testIgnoreEmptySplits(Array(("key1", "a"), ("key2", "a"), ("key3", "b")), 2, 1, "part-r-00001", + 2) // Ensure that if part of the splits are empty, we remove the splits correctly - val randomRDD2 = sc.parallelize( - Array(("key1", "a"), ("key2", "a")), 5) - randomRDD2.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]]( - tempDir.getPath + "/output2") - assert(new File(tempDir.getPath + "/output2/part-r-00004").exists() === true) - val hadoopRDD2 = sc.textFile(tempDir.getPath + "/output2/part-r-*") - assert(hadoopRDD2.partitions.length === 2) + testIgnoreEmptySplits(Array(("key1", "a"), ("key2", "a")), 5, 2, "part-r-00004", 2) } } diff --git a/docs/configuration.md b/docs/configuration.md index c42d206aefab6..6e9fe591b70a3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1191,14 +1191,6 @@ Apart from these, the following properties are also available, and may be useful then the partitions with small files will be faster than partitions with bigger files. - - spark.files.filterOutEmptySplit - false - If set to true, HadoopRDD/NewHadoopRDD will not handle the split which its length is 0. Maybe you will read an empty - hive table but has many empty files. If set to false, Spark generates many tasks to handle these empty files. 
- Sometimes, users maybe want to use SparkContext#textFile to handle a file stored in hadoop, and they don't - want to generate any task when this file is empty, they can set this configuration to true. - spark.hadoop.cloneConf false From 527b367ea482261f6afbb7cdf339495f77c4e7f2 Mon Sep 17 00:00:00 2001 From: liulijia Date: Fri, 13 Oct 2017 01:26:27 +0800 Subject: [PATCH 4/8] optimize code. --- .../scala/org/apache/spark/FileSuite.scala | 51 +++++++++++++++---- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 300d15807db8d..88cac2335a1e2 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -516,23 +516,38 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext(conf) def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int, - outputSuffix: Int, checkPart: String, partitionLength: Int): Unit = { + outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = { val dataRDD = sc.parallelize(data, numSlices) val output = new File(tempDir, "output" + outputSuffix) dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath) assert(new File(output, checkPart).exists() === true) val hadoopRDD = sc.textFile(new File(output, "part-*").getPath) - assert(hadoopRDD.partitions.length === partitionLength) + assert(hadoopRDD.partitions.length === expectedPartitionNum) } // Ensure that if all of the splits are empty, we remove the splits correctly - testIgnoreEmptySplits(Array.empty[Tuple2[String, String]], 1, 0, "part-00000", 0) + testIgnoreEmptySplits( + data = Array.empty[Tuple2[String, String]], + numSlices = 1, + outputSuffix = 0, + checkPart = "part-00000", + expectedPartitionNum = 0) // Ensure that if no split is empty, we don't lose any splits - testIgnoreEmptySplits(Array(("key1", "a"), ("key2", "a"), ("key3", "b")), 2, 1, "part-00001", 2) + testIgnoreEmptySplits( + data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")), + numSlices = 2, + outputSuffix = 1, + checkPart = "part-00001", + expectedPartitionNum = 2) // Ensure that if part of the splits are empty, we remove the splits correctly - testIgnoreEmptySplits(Array(("key1", "a"), ("key2", "a")), 5, 2, "part-00004", 2) + testIgnoreEmptySplits( + data = Array(("key1", "a"), ("key2", "a")), + numSlices = 5, + outputSuffix = 2, + checkPart = "part-00004", + expectedPartitionNum = 2) } test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") { @@ -541,23 +556,37 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext(conf) def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int, - outputSuffix: Int, checkPart: String, partitionLength: Int): Unit = { + outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = { val dataRDD = sc.parallelize(data, numSlices) val output = new File(tempDir, "output" + outputSuffix) dataRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath) assert(new File(output, checkPart).exists() === true) val hadoopRDD = sc.textFile(new File(output, "part-r-*").getPath) - assert(hadoopRDD.partitions.length === partitionLength) + assert(hadoopRDD.partitions.length === expectedPartitionNum) } // Ensure that if all of the splits are empty, we remove the splits correctly - testIgnoreEmptySplits(Array.empty[Tuple2[String, String]], 1, 0, "part-r-00000", 
0) + testIgnoreEmptySplits( + data = Array.empty[Tuple2[String, String]], + numSlices = 1, + outputSuffix = 0, + checkPart = "part-r-00000", + expectedPartitionNum = 0) // Ensure that if no split is empty, we don't lose any splits - testIgnoreEmptySplits(Array(("key1", "a"), ("key2", "a"), ("key3", "b")), 2, 1, "part-r-00001", - 2) + testIgnoreEmptySplits( + data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")), + numSlices = 2, + outputSuffix = 1, + checkPart = "part-r-00001", + expectedPartitionNum = 2) // Ensure that if part of the splits are empty, we remove the splits correctly - testIgnoreEmptySplits(Array(("key1", "a"), ("key2", "a")), 5, 2, "part-r-00004", 2) + testIgnoreEmptySplits( + data = Array(("key1", "a"), ("key2", "a")), + numSlices = 5, + outputSuffix = 2, + checkPart = "part-r-00004", + expectedPartitionNum = 2) } } From 25f98d0d89e4566339d9ba7701975af4e175c918 Mon Sep 17 00:00:00 2001 From: liulijia Date: Fri, 13 Oct 2017 11:40:52 +0800 Subject: [PATCH 5/8] test read data by NewHadoopRDD. --- .../scala/org/apache/spark/internal/config/package.scala | 2 +- core/src/test/scala/org/apache/spark/FileSuite.scala | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7dcef0067063a..659d3006ebe28 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -271,7 +271,7 @@ package object config { .createWithDefault(4 * 1024 * 1024) private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits") - .doc("If true, methods like that use HadoopRDD and NewHadoopRDD such as " + + .doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " + "SparkContext.textFiles will not create a partition for input splits that are empty.") .booleanConf .createWithDefault(false) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 88cac2335a1e2..cc312864b0c91 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -561,7 +561,10 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { val output = new File(tempDir, "output" + outputSuffix) dataRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath) assert(new File(output, checkPart).exists() === true) - val hadoopRDD = sc.textFile(new File(output, "part-r-*").getPath) + + val hadoopRDD = sc.newAPIHadoopFile(new File(output, "part-r-*").getPath, + classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]) + .asInstanceOf[NewHadoopRDD[_, _]] assert(hadoopRDD.partitions.length === expectedPartitionNum) } @@ -575,7 +578,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { // Ensure that if no split is empty, we don't lose any splits testIgnoreEmptySplits( - data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")), + data = Array(("1", "a"), ("2", "a"), ("3", "b")), numSlices = 2, outputSuffix = 1, checkPart = "part-r-00001", @@ -583,7 +586,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { // Ensure that if part of the splits are empty, we remove the splits correctly testIgnoreEmptySplits( - data = Array(("key1", "a"), ("key2", "a")), + data = Array(("1", "a"), ("2", "b")), numSlices = 5, outputSuffix = 2, checkPart = "part-r-00004", From 
534d8fbcd7dfbdc9af06a4d926f6a353f429fce8 Mon Sep 17 00:00:00 2001 From: liulijia Date: Fri, 13 Oct 2017 15:31:24 +0800 Subject: [PATCH 6/8] optimize code. --- .../scala/org/apache/spark/FileSuite.scala | 63 ++++++++++--------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index cc312864b0c91..8534867593223 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -515,38 +515,39 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true) sc = new SparkContext(conf) - def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int, - outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = { - val dataRDD = sc.parallelize(data, numSlices) - val output = new File(tempDir, "output" + outputSuffix) - dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath) - assert(new File(output, checkPart).exists() === true) + def testIgnoreEmptySplits( + data: Array[Tuple2[String, String]], + actualPartitionNum: Int, + expectedPart: String, + expectedPartitionNum: Int): Unit = { + val output = new File(tempDir, "output") + sc.parallelize(data, actualPartitionNum) + .saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath) + assert(new File(output, expectedPart).exists() === true) val hadoopRDD = sc.textFile(new File(output, "part-*").getPath) assert(hadoopRDD.partitions.length === expectedPartitionNum) + Utils.deleteRecursively(output) } // Ensure that if all of the splits are empty, we remove the splits correctly testIgnoreEmptySplits( data = Array.empty[Tuple2[String, String]], - numSlices = 1, - outputSuffix = 0, - checkPart = "part-00000", + actualPartitionNum = 1, + expectedPart = "part-00000", expectedPartitionNum = 0) // Ensure that if no split is empty, we don't lose any splits testIgnoreEmptySplits( data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")), - numSlices = 2, - outputSuffix = 1, - checkPart = "part-00001", + actualPartitionNum = 2, + expectedPart = "part-00001", expectedPartitionNum = 2) // Ensure that if part of the splits are empty, we remove the splits correctly testIgnoreEmptySplits( data = Array(("key1", "a"), ("key2", "a")), - numSlices = 5, - outputSuffix = 2, - checkPart = "part-00004", + actualPartitionNum = 5, + expectedPart = "part-00004", expectedPartitionNum = 2) } @@ -555,41 +556,41 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true) sc = new SparkContext(conf) - def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int, - outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = { - val dataRDD = sc.parallelize(data, numSlices) - val output = new File(tempDir, "output" + outputSuffix) - dataRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath) - assert(new File(output, checkPart).exists() === true) - + def testIgnoreEmptySplits( + data: Array[Tuple2[String, String]], + actualPartitionNum: Int, + expectedPart: String, + expectedPartitionNum: Int): Unit = { + val output = new File(tempDir, "output") + sc.parallelize(data, actualPartitionNum) + .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath) + assert(new File(output, expectedPart).exists() === true) val hadoopRDD = 
sc.newAPIHadoopFile(new File(output, "part-r-*").getPath, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]) .asInstanceOf[NewHadoopRDD[_, _]] assert(hadoopRDD.partitions.length === expectedPartitionNum) + Utils.deleteRecursively(output) } // Ensure that if all of the splits are empty, we remove the splits correctly testIgnoreEmptySplits( data = Array.empty[Tuple2[String, String]], - numSlices = 1, - outputSuffix = 0, - checkPart = "part-r-00000", + actualPartitionNum = 1, + expectedPart = "part-r-00000", expectedPartitionNum = 0) // Ensure that if no split is empty, we don't lose any splits testIgnoreEmptySplits( data = Array(("1", "a"), ("2", "a"), ("3", "b")), - numSlices = 2, - outputSuffix = 1, - checkPart = "part-r-00001", + actualPartitionNum = 2, + expectedPart = "part-r-00001", expectedPartitionNum = 2) // Ensure that if part of the splits are empty, we remove the splits correctly testIgnoreEmptySplits( data = Array(("1", "a"), ("2", "b")), - numSlices = 5, - outputSuffix = 2, - checkPart = "part-r-00004", + actualPartitionNum = 5, + expectedPart = "part-r-00004", expectedPartitionNum = 2) } } From a6818b60adef7bec35b002846a3a504ae53dd9f9 Mon Sep 17 00:00:00 2001 From: liulijia Date: Sat, 14 Oct 2017 10:22:41 +0800 Subject: [PATCH 7/8] Adjust code to conform to the code style. --- .../scala/org/apache/spark/FileSuite.scala | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 8534867593223..4da4323ceb5c8 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -516,14 +516,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext(conf) def testIgnoreEmptySplits( - data: Array[Tuple2[String, String]], - actualPartitionNum: Int, - expectedPart: String, - expectedPartitionNum: Int): Unit = { + data: Array[Tuple2[String, String]], + actualPartitionNum: Int, + expectedPartitionNum: Int): Unit = { val output = new File(tempDir, "output") sc.parallelize(data, actualPartitionNum) .saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath) - assert(new File(output, expectedPart).exists() === true) + for (i <- 0 until actualPartitionNum) { + assert(new File(output, s"part-0000$i").exists() === true) + } val hadoopRDD = sc.textFile(new File(output, "part-*").getPath) assert(hadoopRDD.partitions.length === expectedPartitionNum) Utils.deleteRecursively(output) @@ -533,21 +534,18 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { testIgnoreEmptySplits( data = Array.empty[Tuple2[String, String]], actualPartitionNum = 1, - expectedPart = "part-00000", expectedPartitionNum = 0) // Ensure that if no split is empty, we don't lose any splits testIgnoreEmptySplits( data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")), actualPartitionNum = 2, - expectedPart = "part-00001", expectedPartitionNum = 2) // Ensure that if part of the splits are empty, we remove the splits correctly testIgnoreEmptySplits( data = Array(("key1", "a"), ("key2", "a")), actualPartitionNum = 5, - expectedPart = "part-00004", expectedPartitionNum = 2) } @@ -557,14 +555,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext(conf) def testIgnoreEmptySplits( - data: Array[Tuple2[String, String]], - actualPartitionNum: Int, - expectedPart: String, - expectedPartitionNum: Int): Unit = { + data: 
Array[Tuple2[String, String]], + actualPartitionNum: Int, + expectedPartitionNum: Int): Unit = { val output = new File(tempDir, "output") sc.parallelize(data, actualPartitionNum) .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath) - assert(new File(output, expectedPart).exists() === true) + for (i <- 0 until actualPartitionNum) { + assert(new File(output, s"part-r-0000$i").exists() === true) + } val hadoopRDD = sc.newAPIHadoopFile(new File(output, "part-r-*").getPath, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]) .asInstanceOf[NewHadoopRDD[_, _]] @@ -576,21 +575,18 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { testIgnoreEmptySplits( data = Array.empty[Tuple2[String, String]], actualPartitionNum = 1, - expectedPart = "part-r-00000", expectedPartitionNum = 0) // Ensure that if no split is empty, we don't lose any splits testIgnoreEmptySplits( data = Array(("1", "a"), ("2", "a"), ("3", "b")), actualPartitionNum = 2, - expectedPart = "part-r-00001", expectedPartitionNum = 2) // Ensure that if part of the splits are empty, we remove the splits correctly testIgnoreEmptySplits( data = Array(("1", "a"), ("2", "b")), actualPartitionNum = 5, - expectedPart = "part-r-00004", expectedPartitionNum = 2) } } From 9f42f9f412652f8a5259b3b34f48330c48caedd5 Mon Sep 17 00:00:00 2001 From: liulijia Date: Sat, 14 Oct 2017 16:40:31 +0800 Subject: [PATCH 8/8] code format adjustment. --- core/src/test/scala/org/apache/spark/FileSuite.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 4da4323ceb5c8..b86a8858ebd09 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -564,9 +564,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { for (i <- 0 until actualPartitionNum) { assert(new File(output, s"part-r-0000$i").exists() === true) } - val hadoopRDD = sc.newAPIHadoopFile(new File(output, "part-r-*").getPath, - classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]) - .asInstanceOf[NewHadoopRDD[_, _]] + val hadoopRDD = sc.newAPIHadoopFile( + new File(output, "part-r-*").getPath, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]).asInstanceOf[NewHadoopRDD[_, _]] assert(hadoopRDD.partitions.length === expectedPartitionNum) Utils.deleteRecursively(output) }
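For context, here is a minimal usage sketch of the flag introduced by this series. It assumes a Spark build that already contains these patches and uses the property name adopted from the third commit onward, spark.files.ignoreEmptySplits (default false); the input path and data shown are illustrative only and are not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}

object IgnoreEmptySplitsExample {
  def main(args: Array[String]): Unit = {
    // Enable the flag added by this series; it defaults to false.
    val conf = new SparkConf()
      .setAppName("ignore-empty-splits-example")
      .setMaster("local")
      .set("spark.files.ignoreEmptySplits", "true")
    val sc = new SparkContext(conf)

    // Illustrative path: a directory that may contain zero-length part files,
    // e.g. the output of a job in which some partitions produced no records.
    val input = "/tmp/example-output/part-*"

    // sc.textFile builds a HadoopRDD; with the flag enabled, getPartitions
    // drops zero-length input splits, so empty files contribute no partitions
    // and therefore no tasks.
    val rdd = sc.textFile(input)
    println(s"partitions after ignoring empty splits: ${rdd.partitions.length}")

    sc.stop()
  }
}

Because the filtering happens in getPartitions of both HadoopRDD and NewHadoopRDD, enabling the flag changes the partition count of the resulting RDD up front rather than scheduling tasks that read nothing.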