From 73e1f16b4ab57f708d2cac1442574479bda9b4f1 Mon Sep 17 00:00:00 2001
From: yongtang
Date: Sun, 26 Apr 2015 11:44:01 -0700
Subject: [PATCH 1/3] [SPARK-7155] [CORE] Allow newAPIHadoopFile to support
 comma-separated list of files as input.

---
 .../scala/org/apache/spark/SparkContext.scala |  4 ++-
 .../org/apache/spark/SparkContextSuite.scala  | 30 ++++++++++++++++++-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 86269eac52db0..c0251fa4b336e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -926,7 +926,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use addInputPaths so that newAPIHadoopFile aligns with hadoopFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.addInputPaths(job, path)
     val updatedConf = job.getConfiguration
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 728558a424780..b353ec1cd5315 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -25,7 +25,9 @@ import com.google.common.io.Files
 
 import org.scalatest.FunSuite
 
-import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
 import org.apache.spark.util.Utils
 
 import scala.concurrent.Await
@@ -213,4 +215,30 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("Comma separated paths could be used for hadoopFile and newAPIHadoopFile (SPARK-7155)") {
+    // Regression test for SPARK-7155
+    val dir = Utils.createTempDir()
+
+    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
+    val absolutePath1 = file1.getAbsolutePath
+
+    val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
+    val absolutePath2 = file2.getAbsolutePath
+
+    try {
+      // Create two text files.
+      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8)
+      Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8)
+
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+
+      // Test textFile, hadoopFile, and newAPIHadoopFile
+      assert(sc.textFile(absolutePath1+","+absolutePath2).count() == 5L)
+      assert(sc.hadoopFile(absolutePath1+","+absolutePath2, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+      assert(sc.newAPIHadoopFile(absolutePath1+","+absolutePath2, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+    } finally {
+      sc.stop()
+    }
+  }
 }
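After patch 1, the new-API reader accepts the same comma-separated path syntax that hadoopFile and textFile already handle, because addInputPaths splits the string on commas before registering each path. A minimal usage sketch, not part of the patch itself; the file names are placeholders and the context is created locally just for illustration:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat
  import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

  val sc = new SparkContext(new SparkConf().setAppName("comma-paths").setMaster("local"))

  // Old-API reader: comma-separated input has always worked here.
  val oldApi = sc.hadoopFile("/data/part-00000,/data/part-00001",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

  // New-API reader: with this patch the same string is split into two input
  // paths instead of being treated as one literal file name.
  val newApi = sc.newAPIHadoopFile("/data/part-00000,/data/part-00001",
    classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])

  assert(oldApi.count() == newApi.count())
  sc.stop()
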
From 26faa6ac6f3f034b517b1ba1d55c452cb09c1c93 Mon Sep 17 00:00:00 2001
From: yongtang
Date: Sun, 26 Apr 2015 23:06:19 -0700
Subject: [PATCH 2/3] [SPARK-7155] [CORE] Support comma-separated list of
 files as input for newAPIHadoopFile, wholeTextFiles, and binaryFiles. Use
 setInputPaths for consistency.

---
 .../scala/org/apache/spark/SparkContext.scala | 12 ++--
 .../org/apache/spark/SparkContextSuite.scala  | 72 +++++++++++++++----
 2 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c0251fa4b336e..67e61f4e4dc5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -704,7 +704,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       RDD[(String, String)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new WholeTextFileRDD(
       this,
@@ -750,7 +752,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       RDD[(String, PortableDataStream)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new BinaryFileRDD(
       this,
@@ -926,9 +930,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
-    // Use addInputPaths so that newAPIHadoopFile aligns with hadoopFile in taking
+    // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
-    NewFileInputFormat.addInputPaths(job, path)
+    NewFileInputFormat.setInputPaths(job, path)
     val updatedConf = job.getConfiguration
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index b353ec1cd5315..47c4adc12d0e0 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -216,29 +216,77 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test("Comma separated paths could be used for hadoopFile and newAPIHadoopFile (SPARK-7155)") {
+  test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") {
     // Regression test for SPARK-7155
-    val dir = Utils.createTempDir()
+    // dir1 and dir2 are used for wholeTextFiles and binaryFiles
+    val dir1 = Utils.createTempDir()
+    val dir2 = Utils.createTempDir()
+
+    val dirpath1=dir1.getAbsolutePath
+    val dirpath2=dir2.getAbsolutePath
+
+    // file1 and file2 are placed inside dir1, they are also used for
+    // textFile, hadoopFile, and newAPIHadoopFile
+    // file3, file4 and file5 are placed inside dir2, they are used for
+    // textFile, hadoopFile, and newAPIHadoopFile as well
+    val file1 = new File(dir1, "part-00000")
+    val file2 = new File(dir1, "part-00001")
+    val file3 = new File(dir2, "part-00000")
+    val file4 = new File(dir2, "part-00001")
+    val file5 = new File(dir2, "part-00002")
+
+    val filepath1=file1.getAbsolutePath
+    val filepath2=file2.getAbsolutePath
+    val filepath3=file3.getAbsolutePath
+    val filepath4=file4.getAbsolutePath
+    val filepath5=file5.getAbsolutePath
 
-    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
-    val absolutePath1 = file1.getAbsolutePath
-
-    val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
-    val absolutePath2 = file2.getAbsolutePath
 
     try {
-      // Create two text files.
+      // Create 5 text files.
Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8) Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8) + Files.write("someline1 in file3", file3, UTF_8) + Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8) + Files.write("someline1 in file2\nsomeline2 in file5", file5, UTF_8) sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) - // Test textFile, hadoopFile, and newAPIHadoopFile - assert(sc.textFile(absolutePath1+","+absolutePath2).count() == 5L) - assert(sc.hadoopFile(absolutePath1+","+absolutePath2, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) - assert(sc.newAPIHadoopFile(absolutePath1+","+absolutePath2, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2 + assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L) + assert(sc.hadoopFile(filepath1 + "," + filepath2, + classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2, + classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + + // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5 + assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L) + assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5, + classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5, + classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + + // Test wholeTextFiles, and binaryFiles for dir1 and dir2 + assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L) + assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L) + } finally { sc.stop() + if (file1.exists()) { + file1.delete() + } + if (file2.exists()) { + file2.delete() + } + if (file3.exists()) { + file3.delete() + } + if (file4.exists()) { + file4.delete() + } + if (file5.exists()) { + file5.delete() + } } } } From 654c80c4b197682020e5d54ea6d3742bb829aba1 Mon Sep 17 00:00:00 2001 From: yongtang Date: Mon, 27 Apr 2015 08:03:32 -0700 Subject: [PATCH 3/3] [SPARK-7155] [CORE] Remove unneeded temp file deletion in unit test as parent dir is already temporary. --- .../org/apache/spark/SparkContextSuite.scala | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 47c4adc12d0e0..9049db7755358 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -272,21 +272,6 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { } finally { sc.stop() - if (file1.exists()) { - file1.delete() - } - if (file2.exists()) { - file2.delete() - } - if (file3.exists()) { - file3.delete() - } - if (file4.exists()) { - file4.delete() - } - if (file5.exists()) { - file5.delete() - } } } }